taolib.testing.email_service.queue.memory_queue#
内存邮件队列实现。
用于测试环境,无需 Redis 依赖。
Attributes#
Classes#
内存邮件队列。 |
Module Contents#
- taolib.testing.email_service.queue.memory_queue._PRIORITY_MAP#
- class taolib.testing.email_service.queue.memory_queue.InMemoryEmailQueue#
内存邮件队列。
使用 asyncio.PriorityQueue 实现,仅用于测试。
- _queue: asyncio.PriorityQueue[tuple[int, str]]#
- async enqueue(email_id: str, priority: taolib.testing.email_service.models.enums.EmailPriority = EmailPriority.NORMAL) None#
将邮件 ID 加入队列。
- async enqueue_bulk(email_ids: list[str], priority: taolib.testing.email_service.models.enums.EmailPriority = EmailPriority.NORMAL) None#
批量加入队列。