import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor
class AsyncThreadManager:
def __init__(self):
self._loop = None
self._loop_thread = None
self._lock = threading.Lock()
self._executor = ThreadPoolExecutor(max_workers=1)
self._running = False
def start_loop(self):
"""启动事件循环线程"""
with self._lock:
if self._loop is not None:
return
self._loop = asyncio.new_event_loop()
self._loop_thread = threading.Thread(target=self._run_loop, daemon=True)
self._running = True
self._loop_thread.start()
# 等待事件循环初始化完成
while not self._loop.is_running():
time.sleep(0.01)
def _run_loop(self):
"""在线程中运行事件循环"""
asyncio.set_event_loop(self._loop)
try:
self._loop.run_forever()
finally:
self._loop.close()
async def run_coroutine(self, coro):
"""在事件循环中运行协程并返回结果"""
return await asyncio.wrap_future(
asyncio.run_coroutine_threadsafe(coro, self._loop)
)
def submit_task(self, coro):
"""提交协程任务到线程池执行"""
return self._executor.submit(asyncio.run, self.run_coroutine(coro))
def stop(self):
"""优雅地停止事件循环和线程池"""
with self._lock:
if not self._running:
return
self._running = False
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)
self._executor.shutdown(wait=True)
if self._loop_thread:
self._loop_thread.join(timeout=1.0)
def run_task(coro):
manager = AsyncThreadManager()
try:
# 启动事件循环线程
manager.start_loop()
# 提交协程任务
future = manager.submit_task(coro)
# 等待任务完成并获取结果
result = future.result()
# print(f"任务结果: {result}")
finally:
# 确保资源被正确释放
manager.stop()
return result