taolib.testing.task_queue.queue.redis_queue#

Redis 任务队列。

基于 Redis 的任务队列实现,支持优先级、重试调度和实时统计。

Classes#

RedisTaskQueue

Redis 任务队列。

Module Contents#

class taolib.testing.task_queue.queue.redis_queue.RedisTaskQueue(redis: redis.asyncio.Redis, key_prefix: str = 'tq')#

Redis 任务队列。

使用 Redis List 实现优先级队列,Sorted Set 实现重试调度。

Redis 键结构:

{prefix}:queue:high - LIST: 高优先级待处理任务 {prefix}:queue:normal - LIST: 普通优先级待处理任务 {prefix}:queue:low - LIST: 低优先级待处理任务 {prefix}:running - SET: 运行中任务 ID {prefix}:completed - LIST: 最近完成的任务 ID(上限 1000) {prefix}:failed - SET: 失败任务 ID {prefix}:retry - ZSET: 重试调度(score = retry_at 时间戳) {prefix}:task:{id} - HASH: 任务元数据缓存 {prefix}:stats - HASH: 全局计数器

_redis#
_prefix = 'tq'#
_key(suffix: str) str#

生成 Redis 键。

property _queue_keys: list[str]#

优先级队列键列表(高 → 普通 → 低)。

async enqueue(task_id: str, priority: taolib.testing.task_queue.models.enums.TaskPriority, task_meta: dict[str, str]) None#

将任务加入队列。

参数:
  • task_id -- 任务 ID

  • priority -- 任务优先级

  • task_meta -- 任务元数据(用于 Redis 缓存)

async dequeue(timeout: float = 5.0) str | None#

从队列中取出一个任务(按优先级顺序)。

使用 BRPOP 阻塞等待,优先消费高优先级队列。

参数:

timeout -- 阻塞等待超时时间(秒)

返回:

任务 ID,超时返回 None

async ack(task_id: str) None#

确认任务完成(成功)。

参数:

task_id -- 任务 ID

async nack(task_id: str, *, schedule_retry: bool, retry_at: float | None = None) None#

标记任务失败。

参数:
  • task_id -- 任务 ID

  • schedule_retry -- 是否调度重试

  • retry_at -- 重试时间戳(仅 schedule_retry=True 时有效)

async poll_retries() list[str]#

轮询到期的重试任务并重新入队。

返回:

重新入队的任务 ID 列表

async get_task_meta(task_id: str) dict[str, str] | None#

获取任务元数据缓存。

参数:

task_id -- 任务 ID

返回:

任务元数据字典,不存在返回 None

async set_task_meta(task_id: str, meta: dict[str, str]) None#

设置任务元数据缓存。

参数:
  • task_id -- 任务 ID

  • meta -- 元数据字典

async get_stats() dict[str, Any]#

获取队列统计信息。

返回:

包含各项统计指标的字典

async get_queue_lengths() dict[str, int]#

获取各优先级队列长度。

返回:

各队列长度字典

async get_running_task_ids() set[str]#

获取运行中任务 ID 集合。

返回:

任务 ID 集合

async remove_from_running(task_id: str) None#

从运行中集合移除任务。

参数:

task_id -- 任务 ID

async remove_from_failed(task_id: str) None#

从失败集合移除任务。

参数:

task_id -- 任务 ID