taolib.testing.task_queue.worker.worker#

任务工作协程。

单个 Worker 的实现,从 Redis 队列中拉取任务并执行。

Attributes#

Classes#

TaskWorker

任务工作协程。

Module Contents#

taolib.testing.task_queue.worker.worker.logger#
class taolib.testing.task_queue.worker.worker.TaskWorker(worker_id: str, redis_queue: taolib.testing.task_queue.queue.redis_queue.RedisTaskQueue, task_repo: taolib.testing.task_queue.repository.task_repo.TaskRepository, registry: taolib.testing.task_queue.worker.registry.TaskHandlerRegistry)#

任务工作协程。

从 Redis 队列中拉取任务、查找处理器并执行, 处理成功/失败结果和重试逻辑。

_worker_id#
_redis_queue#
_task_repo#
_registry#
_running = False#
_current_task_id: str | None = None#
property worker_id: str#

工作者标识。

property is_running: bool#

是否正在运行。

property current_task_id: str | None#

当前正在执行的任务 ID。

async start() None#

启动工作协程。

stop() None#

请求停止工作协程(完成当前任务后退出)。

async _run_loop() None#

主工作循环。

async _process_task(task_id: str) None#

处理单个任务。

参数:

task_id -- 任务 ID

async _execute_handler(handler: Any, task: taolib.testing.task_queue.models.task.TaskDocument) dict[str, Any]#

执行任务处理器。

参数:
  • handler -- 处理器函数

  • task -- 任务文档

返回:

处理结果字典

async _handle_success(task: taolib.testing.task_queue.models.task.TaskDocument, result: dict[str, Any]) None#

处理任务成功完成。

参数:
  • task -- 任务文档

  • result -- 执行结果

async _handle_failure(task: taolib.testing.task_queue.models.task.TaskDocument, error: Exception) None#

处理任务执行失败。

参数:
  • task -- 任务文档

  • error -- 异常对象