taolib.testing.task_queue.repository.task_repo#
任务 Repository。
提供 Task 的 MongoDB 持久化操作。
Classes#
任务 Repository。 |
Module Contents#
- class taolib.testing.task_queue.repository.task_repo.TaskRepository(collection: motor.motor_asyncio.AsyncIOMotorCollection)#
Bases:
taolib.testing._base.repository.AsyncRepository[taolib.testing.task_queue.models.task.TaskDocument]任务 Repository。
- async find_by_status(status: taolib.testing.task_queue.models.enums.TaskStatus, skip: int = 0, limit: int = 100) list[taolib.testing.task_queue.models.task.TaskDocument]#
根据状态查找任务。
- 参数:
status -- 任务状态
skip -- 跳过记录数
limit -- 限制记录数
- 返回:
任务文档列表
- async find_by_type(task_type: str, skip: int = 0, limit: int = 100) list[taolib.testing.task_queue.models.task.TaskDocument]#
根据任务类型查找任务。
- 参数:
task_type -- 任务类型
skip -- 跳过记录数
limit -- 限制记录数
- 返回:
任务文档列表
- async find_failed_tasks(skip: int = 0, limit: int = 100) list[taolib.testing.task_queue.models.task.TaskDocument]#
查找所有失败的任务。
- 返回:
失败任务列表
- async find_running_tasks() list[taolib.testing.task_queue.models.task.TaskDocument]#
查找所有运行中的任务。
- 返回:
运行中任务列表
- async update_status(task_id: str, status: taolib.testing.task_queue.models.enums.TaskStatus, **extra_fields: Any) taolib.testing.task_queue.models.task.TaskDocument | None#
更新任务状态。
- 参数:
task_id -- 任务 ID
status -- 新状态
**extra_fields -- 额外更新字段
- 返回:
更新后的任务文档,如果不存在则返回 None
- async find_by_idempotency_key(idempotency_key: str) taolib.testing.task_queue.models.task.TaskDocument | None#
根据幂等键查找任务。
- 参数:
idempotency_key -- 幂等键
- 返回:
任务文档,如果不存在则返回 None
- async find_by_filters(status: taolib.testing.task_queue.models.enums.TaskStatus | None = None, task_type: str | None = None, priority: taolib.testing.task_queue.models.enums.TaskPriority | None = None, skip: int = 0, limit: int = 100) list[taolib.testing.task_queue.models.task.TaskDocument]#
根据多个条件过滤查找任务。
- 参数:
status -- 任务状态过滤
task_type -- 任务类型过滤
priority -- 优先级过滤
skip -- 跳过记录数
limit -- 限制记录数
- 返回:
任务文档列表