taolib.testing.task_queue#

后台任务队列模块。

基于 Redis 的后台任务处理系统,支持: - 三级优先级队列(高/普通/低) - 失败自动重试(递增延迟) - 可配置的并发 Worker - MongoDB 持久化任务记录 - Web 管理仪表板

使用方式:

from taolib.testing.task_queue import TaskCreate, TaskPriority from taolib.testing.task_queue.services.task_service import TaskService from taolib.testing.task_queue.worker.registry import task_handler

# 注册任务处理器 @task_handler("send_email") async def handle_send_email(params: dict) -> dict:

await send_email(params["to"], params["subject"]) return {"sent": True}

# 提交任务 task = await task_service.submit_task(

TaskCreate(task_type="send_email", params={"to": "user@example.com"})

)

启动 Web 服务器:

from taolib.testing.task_queue.server.app import create_app

app = create_app() # 使用 uvicorn.run(app, host="0.0.0.0", port=8002)

Submodules#

Attributes#

Exceptions#

TaskAlreadyExistsError

幂等键冲突,任务已存在。

TaskExecutionError

任务处理器执行时抛出不可恢复的错误。

TaskHandlerNotFoundError

指定任务类型没有注册处理器。

TaskMaxRetriesExceededError

任务重试次数已达上限。

TaskNotFoundError

任务不存在。

TaskQueueConnectionError

Redis 连接失败。

TaskQueueError

所有任务队列错误的基类。

Classes#

TaskCreate

创建任务的输入模型。

TaskDocument

任务的 MongoDB 文档模型。

TaskPriority

任务优先级。

TaskResponse

任务的 API 响应模型。

TaskStatus

任务状态。

TaskUpdate

更新任务的输入模型(所有字段可选)。

Package Contents#

exception taolib.testing.task_queue.TaskAlreadyExistsError#

Bases: TaskQueueError

幂等键冲突,任务已存在。

exception taolib.testing.task_queue.TaskExecutionError#

Bases: TaskQueueError

任务处理器执行时抛出不可恢复的错误。

exception taolib.testing.task_queue.TaskHandlerNotFoundError#

Bases: TaskQueueError

指定任务类型没有注册处理器。

exception taolib.testing.task_queue.TaskMaxRetriesExceededError#

Bases: TaskQueueError

任务重试次数已达上限。

exception taolib.testing.task_queue.TaskNotFoundError#

Bases: TaskQueueError

任务不存在。

exception taolib.testing.task_queue.TaskQueueConnectionError#

Bases: TaskQueueError

Redis 连接失败。

exception taolib.testing.task_queue.TaskQueueError#

Bases: Exception

所有任务队列错误的基类。

class taolib.testing.task_queue.TaskCreate#

Bases: TaskBase

创建任务的输入模型。

class taolib.testing.task_queue.TaskDocument#

Bases: TaskBase

任务的 MongoDB 文档模型。

id: str#
status: taolib.testing.task_queue.models.enums.TaskStatus#
retry_count: int#
result: dict[str, Any] | None = None#
error_message: str | None = None#
error_traceback: str | None = None#
created_at: datetime.datetime#
started_at: datetime.datetime | None = None#
completed_at: datetime.datetime | None = None#
next_retry_at: datetime.datetime | None = None#
model_config#
to_response() TaskResponse#

转换为 API 响应。

class taolib.testing.task_queue.TaskPriority#

Bases: enum.StrEnum

任务优先级。

HIGH = 'high'#
NORMAL = 'normal'#
LOW = 'low'#
class taolib.testing.task_queue.TaskResponse#

Bases: TaskBase

任务的 API 响应模型。

id: str#
status: taolib.testing.task_queue.models.enums.TaskStatus#
retry_count: int#
result: dict[str, Any] | None = None#
error_message: str | None = None#
error_traceback: str | None = None#
created_at: datetime.datetime#
started_at: datetime.datetime | None = None#
completed_at: datetime.datetime | None = None#
next_retry_at: datetime.datetime | None = None#
model_config#
class taolib.testing.task_queue.TaskStatus#

Bases: enum.StrEnum

任务状态。

PENDING = 'pending'#
RUNNING = 'running'#
COMPLETED = 'completed'#
FAILED = 'failed'#
RETRYING = 'retrying'#
CANCELLED = 'cancelled'#
class taolib.testing.task_queue.TaskUpdate#

Bases: pydantic.BaseModel

更新任务的输入模型(所有字段可选)。

status: taolib.testing.task_queue.models.enums.TaskStatus | None = None#
retry_count: int | None = None#
result: dict[str, Any] | None = None#
error_message: str | None = None#
error_traceback: str | None = None#
started_at: datetime.datetime | None = None#
completed_at: datetime.datetime | None = None#
next_retry_at: datetime.datetime | None = None#
taolib.testing.task_queue.__version__ = '0.1.0'#