taolib.testing.task_queue.repository.task_repo#

任务 Repository。

提供 Task 的 MongoDB 持久化操作。

Classes#

TaskRepository

任务 Repository。

Module Contents#

class taolib.testing.task_queue.repository.task_repo.TaskRepository(collection: motor.motor_asyncio.AsyncIOMotorCollection)#

Bases: taolib.testing._base.repository.AsyncRepository[taolib.testing.task_queue.models.task.TaskDocument]

任务 Repository。

async find_by_status(status: taolib.testing.task_queue.models.enums.TaskStatus, skip: int = 0, limit: int = 100) list[taolib.testing.task_queue.models.task.TaskDocument]#

根据状态查找任务。

参数:
  • status -- 任务状态

  • skip -- 跳过记录数

  • limit -- 限制记录数

返回:

任务文档列表

async find_by_type(task_type: str, skip: int = 0, limit: int = 100) list[taolib.testing.task_queue.models.task.TaskDocument]#

根据任务类型查找任务。

参数:
  • task_type -- 任务类型

  • skip -- 跳过记录数

  • limit -- 限制记录数

返回:

任务文档列表

async find_failed_tasks(skip: int = 0, limit: int = 100) list[taolib.testing.task_queue.models.task.TaskDocument]#

查找所有失败的任务。

返回:

失败任务列表

async find_running_tasks() list[taolib.testing.task_queue.models.task.TaskDocument]#

查找所有运行中的任务。

返回:

运行中任务列表

async update_status(task_id: str, status: taolib.testing.task_queue.models.enums.TaskStatus, **extra_fields: Any) taolib.testing.task_queue.models.task.TaskDocument | None#

更新任务状态。

参数:
  • task_id -- 任务 ID

  • status -- 新状态

  • **extra_fields -- 额外更新字段

返回:

更新后的任务文档,如果不存在则返回 None

async find_by_idempotency_key(idempotency_key: str) taolib.testing.task_queue.models.task.TaskDocument | None#

根据幂等键查找任务。

参数:

idempotency_key -- 幂等键

返回:

任务文档,如果不存在则返回 None

async find_by_filters(status: taolib.testing.task_queue.models.enums.TaskStatus | None = None, task_type: str | None = None, priority: taolib.testing.task_queue.models.enums.TaskPriority | None = None, skip: int = 0, limit: int = 100) list[taolib.testing.task_queue.models.task.TaskDocument]#

根据多个条件过滤查找任务。

参数:
  • status -- 任务状态过滤

  • task_type -- 任务类型过滤

  • priority -- 优先级过滤

  • skip -- 跳过记录数

  • limit -- 限制记录数

返回:

任务文档列表

async create_indexes() None#

创建索引。