taolib.testing.task_queue.queue.redis_queue#
Redis 任务队列。
基于 Redis 的任务队列实现,支持优先级、重试调度和实时统计。
Classes#
Redis 任务队列。 |
Module Contents#
- class taolib.testing.task_queue.queue.redis_queue.RedisTaskQueue(redis: redis.asyncio.Redis, key_prefix: str = 'tq')#
Redis 任务队列。
使用 Redis List 实现优先级队列,Sorted Set 实现重试调度。
- Redis 键结构:
{prefix}:queue:high - LIST: 高优先级待处理任务 {prefix}:queue:normal - LIST: 普通优先级待处理任务 {prefix}:queue:low - LIST: 低优先级待处理任务 {prefix}:running - SET: 运行中任务 ID {prefix}:completed - LIST: 最近完成的任务 ID(上限 1000) {prefix}:failed - SET: 失败任务 ID {prefix}:retry - ZSET: 重试调度(score = retry_at 时间戳) {prefix}:task:{id} - HASH: 任务元数据缓存 {prefix}:stats - HASH: 全局计数器
- _redis#
- _prefix = 'tq'#
- async enqueue(task_id: str, priority: taolib.testing.task_queue.models.enums.TaskPriority, task_meta: dict[str, str]) None#
将任务加入队列。
- 参数:
task_id -- 任务 ID
priority -- 任务优先级
task_meta -- 任务元数据(用于 Redis 缓存)
- async dequeue(timeout: float = 5.0) str | None#
从队列中取出一个任务(按优先级顺序)。
使用 BRPOP 阻塞等待,优先消费高优先级队列。
- 参数:
timeout -- 阻塞等待超时时间(秒)
- 返回:
任务 ID,超时返回 None
- async nack(task_id: str, *, schedule_retry: bool, retry_at: float | None = None) None#
标记任务失败。
- 参数:
task_id -- 任务 ID
schedule_retry -- 是否调度重试
retry_at -- 重试时间戳(仅 schedule_retry=True 时有效)
- async get_task_meta(task_id: str) dict[str, str] | None#
获取任务元数据缓存。
- 参数:
task_id -- 任务 ID
- 返回:
任务元数据字典,不存在返回 None