taolib.testing.task_queue.worker.manager#

工作者管理器。

编排多个 TaskWorker 协程,管理重试轮询和崩溃恢复。

Attributes#

Classes#

WorkerManager

工作者管理器。

Module Contents#

taolib.testing.task_queue.worker.manager.logger#
taolib.testing.task_queue.worker.manager.RETRY_POLL_INTERVAL = 30#
taolib.testing.task_queue.worker.manager.STALE_TASK_TIMEOUT = 1800#
class taolib.testing.task_queue.worker.manager.WorkerManager(redis_queue: taolib.testing.task_queue.queue.redis_queue.RedisTaskQueue, task_repo: taolib.testing.task_queue.repository.task_repo.TaskRepository, registry: taolib.testing.task_queue.worker.registry.TaskHandlerRegistry, num_workers: int = 3)#

工作者管理器。

管理多个 TaskWorker 的生命周期,包括: - 启动/停止工作者 - 重试任务轮询 - 崩溃恢复

_redis_queue#
_task_repo#
_registry#
_num_workers = 3#
_workers: list[taolib.testing.task_queue.worker.worker.TaskWorker] = []#
_worker_tasks: list[asyncio.Task] = []#
_retry_poller_task: asyncio.Task | None = None#
_running = False#
property is_running: bool#

管理器是否正在运行。

property num_workers: int#

工作者数量。

property workers: list[taolib.testing.task_queue.worker.worker.TaskWorker]#

工作者列表。

async start() None#

启动所有工作者和重试轮询。

async stop() None#

优雅停止所有工作者。

等待每个工作者完成当前任务后退出。

async _retry_poll_loop() None#

重试任务轮询循环。

每 30 秒检查到期的重试任务,将其重新入队。

async _recover_running_tasks() None#

崩溃恢复:检查 Redis 中的孤儿任务并重新入队。

启动时调用,处理因进程崩溃而停留在 running 状态的任务。