# AsyncThreadManager

import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class AsyncThreadManager:
    """Run coroutines on a private asyncio event loop hosted in a daemon thread.

    The manager owns two resources:

    * an event loop that runs forever in its own daemon thread, and
    * a single-worker thread pool whose worker blocks on each submitted
      coroutine, so tasks passed to ``submit_task`` are processed one at a time.

    Typical use is ``start_loop()``, one or more ``submit_task()`` calls, then
    ``stop()``. The manager is not restartable: once ``stop()`` has run, the
    thread pool is shut down and the loop is closed.
    """

    def __init__(self):
        self._loop = None  # event loop, created by start_loop()
        self._loop_thread = None  # daemon thread that runs the loop
        self._lock = threading.Lock()  # serializes start_loop() and stop()
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._running = False  # True between start_loop() and stop()

    def start_loop(self):
        """Start the event-loop thread and block until the loop is running.

        Calling this again after a loop has already been created is a no-op.
        """
        with self._lock:
            if self._loop is not None:
                return

            self._loop = asyncio.new_event_loop()

            # Queue a callback before the loop starts: it fires on the loop's
            # first iteration, i.e. once run_forever() is actually live. This
            # replaces polling is_running() with a sleep.
            ready = threading.Event()
            self._loop.call_soon(ready.set)

            self._loop_thread = threading.Thread(target=self._run_loop, daemon=True)
            self._running = True
            self._loop_thread.start()

            ready.wait()

    def _run_loop(self):
        """Thread target: install the loop in this thread and run it until stopped."""
        asyncio.set_event_loop(self._loop)
        try:
            self._loop.run_forever()
        finally:
            # Always release the loop's resources once run_forever() returns.
            self._loop.close()

    async def run_coroutine(self, coro):
        """Schedule ``coro`` on the manager's loop and await its result.

        Args:
            coro: Coroutine object to run on the manager's event loop.

        Returns:
            Whatever ``coro`` returns.

        Raises:
            RuntimeError: If ``start_loop()`` has not been called yet.
        """
        loop = self._loop
        if loop is None:
            raise RuntimeError("event loop is not started; call start_loop() first")
        # run_coroutine_threadsafe yields a concurrent.futures.Future;
        # wrap_future makes it awaitable from the calling loop.
        return await asyncio.wrap_future(asyncio.run_coroutine_threadsafe(coro, loop))

    def submit_task(self, coro):
        """Hand ``coro`` to the thread pool and return a future for its result.

        The pool worker drives ``run_coroutine(coro)`` with ``asyncio.run`` and
        therefore blocks until the coroutine finishes on the manager's loop.

        Args:
            coro: Coroutine object to execute.

        Returns:
            A ``concurrent.futures.Future`` that resolves to the coroutine's
            result, or carries the exception it raised.

        Raises:
            RuntimeError: If the thread pool has already been shut down.
        """
        wrapper = self.run_coroutine(coro)
        try:
            return self._executor.submit(asyncio.run, wrapper)
        except RuntimeError:
            # The pool rejects work after shutdown; close the wrapper so it
            # does not linger as a "coroutine was never awaited" warning.
            wrapper.close()
            raise

    def stop(self):
        """Gracefully stop the thread pool and the event loop.

        Work already submitted is allowed to finish before the loop is
        stopped. Calling ``stop()`` when the manager is not running is a no-op.
        """
        with self._lock:
            if not self._running:
                return

            self._running = False

            # Drain the pool while the loop is still alive. Stopping the loop
            # first would strand any queued or in-flight task, and this
            # shutdown(wait=True) would then block forever.
            self._executor.shutdown(wait=True)

            if self._loop and self._loop.is_running():
                self._loop.call_soon_threadsafe(self._loop.stop)

            if self._loop_thread:
                self._loop_thread.join(timeout=1.0)

def run_task(coro):
    """Run ``coro`` to completion on a throwaway manager and return its result.

    A fresh ``AsyncThreadManager`` is created, its loop is started, the
    coroutine is submitted, and the call blocks until the result is available.
    The manager is stopped afterwards whether the task succeeded or raised.

    Args:
        coro: Coroutine object to execute.

    Returns:
        The value produced by ``coro``.
    """
    mgr = AsyncThreadManager()
    try:
        mgr.start_loop()
        # Block on the future here; the finally clause still runs before the
        # value is handed back to the caller.
        return mgr.submit_task(coro).result()
    finally:
        # Release the loop thread and the pool in every case.
        mgr.stop()