taolib.testing.task_queue#
后台任务队列模块。
基于 Redis 的后台任务处理系统,支持: - 三级优先级队列(高/普通/低) - 失败自动重试(递增延迟) - 可配置的并发 Worker - MongoDB 持久化任务记录 - Web 管理仪表板
使用方式:
from taolib.testing.task_queue import TaskCreate, TaskPriority from taolib.testing.task_queue.services.task_service import TaskService from taolib.testing.task_queue.worker.registry import task_handler
# 注册任务处理器 @task_handler("send_email") async def handle_send_email(params: dict) -> dict:
await send_email(params["to"], params["subject"]) return {"sent": True}
# 提交任务 task = await task_service.submit_task(
TaskCreate(task_type="send_email", params={"to": "user@example.com"})
)
启动 Web 服务器:
from taolib.testing.task_queue.server.app import create_app
app = create_app() # 使用 uvicorn.run(app, host="0.0.0.0", port=8002)
Submodules#
Attributes#
Exceptions#
幂等键冲突,任务已存在。 |
|
任务处理器执行时抛出不可恢复的错误。 |
|
指定任务类型没有注册处理器。 |
|
任务重试次数已达上限。 |
|
任务不存在。 |
|
Redis 连接失败。 |
|
所有任务队列错误的基类。 |
Classes#
创建任务的输入模型。 |
|
任务的 MongoDB 文档模型。 |
|
任务优先级。 |
|
任务的 API 响应模型。 |
|
任务状态。 |
|
更新任务的输入模型(所有字段可选)。 |
Package Contents#
- exception taolib.testing.task_queue.TaskAlreadyExistsError#
Bases:
TaskQueueError幂等键冲突,任务已存在。
- exception taolib.testing.task_queue.TaskExecutionError#
Bases:
TaskQueueError任务处理器执行时抛出不可恢复的错误。
- exception taolib.testing.task_queue.TaskHandlerNotFoundError#
Bases:
TaskQueueError指定任务类型没有注册处理器。
- exception taolib.testing.task_queue.TaskMaxRetriesExceededError#
Bases:
TaskQueueError任务重试次数已达上限。
- exception taolib.testing.task_queue.TaskNotFoundError#
Bases:
TaskQueueError任务不存在。
- exception taolib.testing.task_queue.TaskQueueConnectionError#
Bases:
TaskQueueErrorRedis 连接失败。
- class taolib.testing.task_queue.TaskCreate#
Bases:
TaskBase创建任务的输入模型。
- class taolib.testing.task_queue.TaskDocument#
Bases:
TaskBase任务的 MongoDB 文档模型。
- created_at: datetime.datetime#
- started_at: datetime.datetime | None = None#
- completed_at: datetime.datetime | None = None#
- next_retry_at: datetime.datetime | None = None#
- model_config#
- to_response() TaskResponse#
转换为 API 响应。
- class taolib.testing.task_queue.TaskPriority#
Bases:
enum.StrEnum任务优先级。
- HIGH = 'high'#
- NORMAL = 'normal'#
- LOW = 'low'#
- class taolib.testing.task_queue.TaskResponse#
Bases:
TaskBase任务的 API 响应模型。
- created_at: datetime.datetime#
- started_at: datetime.datetime | None = None#
- completed_at: datetime.datetime | None = None#
- next_retry_at: datetime.datetime | None = None#
- model_config#
- class taolib.testing.task_queue.TaskStatus#
Bases:
enum.StrEnum任务状态。
- PENDING = 'pending'#
- RUNNING = 'running'#
- COMPLETED = 'completed'#
- FAILED = 'failed'#
- RETRYING = 'retrying'#
- CANCELLED = 'cancelled'#
- class taolib.testing.task_queue.TaskUpdate#
Bases:
pydantic.BaseModel更新任务的输入模型(所有字段可选)。
- status: taolib.testing.task_queue.models.enums.TaskStatus | None = None#
- started_at: datetime.datetime | None = None#
- completed_at: datetime.datetime | None = None#
- next_retry_at: datetime.datetime | None = None#
- taolib.testing.task_queue.__version__ = '0.1.0'#