taolib.testing.task_queue.services.task_service#

任务服务层。

提供任务的业务逻辑操作。

Attributes#

Classes#

TaskService

任务服务。

Module Contents#

taolib.testing.task_queue.services.task_service.logger#
class taolib.testing.task_queue.services.task_service.TaskService(task_repo: taolib.testing.task_queue.repository.task_repo.TaskRepository, redis_queue: taolib.testing.task_queue.queue.redis_queue.RedisTaskQueue)#

任务服务。

提供任务提交、查询、重试、取消等业务逻辑。

_task_repo#
_redis_queue#
async submit_task(task_create: taolib.testing.task_queue.models.task.TaskCreate) taolib.testing.task_queue.models.task.TaskDocument#

提交新任务。

参数:

task_create -- 任务创建数据

返回:

创建的任务文档

抛出:

TaskAlreadyExistsError -- 幂等键冲突

async get_task(task_id: str) taolib.testing.task_queue.models.task.TaskDocument#

获取任务详情。

参数:

task_id -- 任务 ID

返回:

任务文档

抛出:

TaskNotFoundError -- 任务不存在

async retry_task(task_id: str) taolib.testing.task_queue.models.task.TaskDocument#

手动重试失败任务。

重置任务状态并重新入队。

参数:

task_id -- 任务 ID

返回:

更新后的任务文档

抛出:
async cancel_task(task_id: str) taolib.testing.task_queue.models.task.TaskDocument#

取消任务。

仅可取消 PENDING 或 RETRYING 状态的任务。

参数:

task_id -- 任务 ID

返回:

更新后的任务文档

抛出:
async list_tasks(status: taolib.testing.task_queue.models.enums.TaskStatus | None = None, task_type: str | None = None, priority: taolib.testing.task_queue.models.enums.TaskPriority | None = None, skip: int = 0, limit: int = 100) list[taolib.testing.task_queue.models.task.TaskDocument]#

查询任务列表。

参数:
  • status -- 状态过滤

  • task_type -- 类型过滤

  • priority -- 优先级过滤

  • skip -- 跳过记录数

  • limit -- 限制记录数

返回:

任务文档列表

async get_stats() dict[str, Any]#

获取任务统计信息。

合并 Redis 实时数据和 MongoDB 持久统计。

返回:

统计信息字典