taolib.testing.email_service.queue#

邮件队列层。

Submodules#

Classes#

InMemoryEmailQueue

内存邮件队列。

EmailQueueProtocol

邮件队列协议。

RedisEmailQueue

Redis 邮件队列。

Package Contents#

class taolib.testing.email_service.queue.InMemoryEmailQueue#

内存邮件队列。

使用 asyncio.PriorityQueue 实现,仅用于测试。

_queue: asyncio.PriorityQueue[tuple[int, str]]#
_counts: dict[str, int]#
async enqueue(email_id: str, priority: taolib.testing.email_service.models.enums.EmailPriority = EmailPriority.NORMAL) None#

将邮件 ID 加入队列。

async enqueue_bulk(email_ids: list[str], priority: taolib.testing.email_service.models.enums.EmailPriority = EmailPriority.NORMAL) None#

批量加入队列。

async dequeue(timeout: int = 5) str | None#

从队列取出一封邮件 ID。

async size() dict[str, int]#

获取队列大小(近似值)。

class taolib.testing.email_service.queue.EmailQueueProtocol#

Bases: Protocol

邮件队列协议。

所有队列实现必须符合此协议。

async enqueue(email_id: str, priority: taolib.testing.email_service.models.enums.EmailPriority = EmailPriority.NORMAL) None#

将邮件 ID 加入队列。

参数:
  • email_id -- 邮件 ID

  • priority -- 优先级

async enqueue_bulk(email_ids: list[str], priority: taolib.testing.email_service.models.enums.EmailPriority = EmailPriority.NORMAL) None#

批量加入队列。

参数:
  • email_ids -- 邮件 ID 列表

  • priority -- 优先级

async dequeue(timeout: int = 5) str | None#

从队列取出一封邮件 ID。

按优先级顺序(高→普通→低)取出。

参数:

timeout -- 等待超时秒数

返回:

邮件 ID,队列为空时返回 None

async size() dict[str, int]#

获取各优先级队列的大小。

返回:

count} 字典

返回类型:

{priority

class taolib.testing.email_service.queue.RedisEmailQueue(redis_client: redis.asyncio.Redis)#

Redis 邮件队列。

使用三个 Redis List 实现优先级队列: - email:queue:high (高优先级) - email:queue:normal (普通优先级) - email:queue:low (低优先级)

BRPOP 按顺序检查三个列表,自然实现优先级。

_redis#
async enqueue(email_id: str, priority: taolib.testing.email_service.models.enums.EmailPriority = EmailPriority.NORMAL) None#

将邮件 ID 加入队列。

async enqueue_bulk(email_ids: list[str], priority: taolib.testing.email_service.models.enums.EmailPriority = EmailPriority.NORMAL) None#

批量加入队列。

async dequeue(timeout: int = 5) str | None#

从队列取出一封邮件 ID(按优先级)。

使用 BRPOP 按 high → normal → low 顺序检查。

async size() dict[str, int]#

获取各优先级队列大小。