taolib.testing.email_service.queue.redis_queue#

Redis 邮件队列实现。

基于 Redis List 的优先级队列,使用 LPUSH/BRPOP 模式。

Attributes#

Classes#

RedisEmailQueue

Redis 邮件队列。

Module Contents#

taolib.testing.email_service.queue.redis_queue._QUEUE_KEYS#
taolib.testing.email_service.queue.redis_queue._PRIORITY_ORDER#
class taolib.testing.email_service.queue.redis_queue.RedisEmailQueue(redis_client: redis.asyncio.Redis)#

Redis 邮件队列。

使用三个 Redis List 实现优先级队列: - email:queue:high (高优先级) - email:queue:normal (普通优先级) - email:queue:low (低优先级)

BRPOP 按顺序检查三个列表,自然实现优先级。

_redis#
async enqueue(email_id: str, priority: taolib.testing.email_service.models.enums.EmailPriority = EmailPriority.NORMAL) None#

将邮件 ID 加入队列。

async enqueue_bulk(email_ids: list[str], priority: taolib.testing.email_service.models.enums.EmailPriority = EmailPriority.NORMAL) None#

批量加入队列。

async dequeue(timeout: int = 5) str | None#

从队列取出一封邮件 ID(按优先级)。

使用 BRPOP 按 high → normal → low 顺序检查。

async size() dict[str, int]#

获取各优先级队列大小。