taolib.testing.email_service.queue#
邮件队列层。
Submodules#
Classes#
内存邮件队列。 |
|
邮件队列协议。 |
|
Redis 邮件队列。 |
Package Contents#
- class taolib.testing.email_service.queue.InMemoryEmailQueue#
内存邮件队列。
使用 asyncio.PriorityQueue 实现,仅用于测试。
- _queue: asyncio.PriorityQueue[tuple[int, str]]#
- async enqueue(email_id: str, priority: taolib.testing.email_service.models.enums.EmailPriority = EmailPriority.NORMAL) None#
将邮件 ID 加入队列。
- async enqueue_bulk(email_ids: list[str], priority: taolib.testing.email_service.models.enums.EmailPriority = EmailPriority.NORMAL) None#
批量加入队列。
- class taolib.testing.email_service.queue.EmailQueueProtocol#
Bases:
Protocol邮件队列协议。
所有队列实现必须符合此协议。
- async enqueue(email_id: str, priority: taolib.testing.email_service.models.enums.EmailPriority = EmailPriority.NORMAL) None#
将邮件 ID 加入队列。
- 参数:
email_id -- 邮件 ID
priority -- 优先级
- async enqueue_bulk(email_ids: list[str], priority: taolib.testing.email_service.models.enums.EmailPriority = EmailPriority.NORMAL) None#
批量加入队列。
- 参数:
email_ids -- 邮件 ID 列表
priority -- 优先级
- class taolib.testing.email_service.queue.RedisEmailQueue(redis_client: redis.asyncio.Redis)#
Redis 邮件队列。
使用三个 Redis List 实现优先级队列: - email:queue:high (高优先级) - email:queue:normal (普通优先级) - email:queue:low (低优先级)
BRPOP 按顺序检查三个列表,自然实现优先级。
- _redis#
- async enqueue(email_id: str, priority: taolib.testing.email_service.models.enums.EmailPriority = EmailPriority.NORMAL) None#
将邮件 ID 加入队列。
- async enqueue_bulk(email_ids: list[str], priority: taolib.testing.email_service.models.enums.EmailPriority = EmailPriority.NORMAL) None#
批量加入队列。