taolib.testing.email_service.repository#

数据访问层。

Submodules#

Classes#

EmailRepository

邮件 Repository。

SubscriptionRepository

订阅 Repository。

TemplateRepository

模板 Repository。

TrackingRepository

追踪事件 Repository。

Package Contents#

class taolib.testing.email_service.repository.EmailRepository(collection)#

Bases: taolib.testing._base.repository.AsyncRepository[taolib.testing.email_service.models.email.EmailDocument]

邮件 Repository。

async find_by_status(status: taolib.testing.email_service.models.enums.EmailStatus, skip: int = 0, limit: int = 100) list[taolib.testing.email_service.models.email.EmailDocument]#

按状态查询邮件。

async find_by_recipient(email: str, skip: int = 0, limit: int = 100) list[taolib.testing.email_service.models.email.EmailDocument]#

按收件人查询邮件。

async find_queued_emails(limit: int = 100) list[taolib.testing.email_service.models.email.EmailDocument]#

查找待发送的队列邮件。

async find_scheduled_ready(limit: int = 50) list[taolib.testing.email_service.models.email.EmailDocument]#

查找到期的计划邮件。

async update_status(email_id: str, status: taolib.testing.email_service.models.enums.EmailStatus, **extra_fields: Any) taolib.testing.email_service.models.email.EmailDocument | None#

更新邮件状态。

async increment_retry(email_id: str) taolib.testing.email_service.models.email.EmailDocument | None#

递增重试计数。

async get_stats(start: datetime.datetime, end: datetime.datetime) dict[str, int]#

获取指定时间范围内的邮件统计。

async create_indexes() None#

创建索引。

class taolib.testing.email_service.repository.SubscriptionRepository(collection)#

Bases: taolib.testing._base.repository.AsyncRepository[taolib.testing.email_service.models.subscription.SubscriptionDocument]

订阅 Repository。

async find_by_email(email: str) taolib.testing.email_service.models.subscription.SubscriptionDocument | None#

按邮箱查找订阅记录。

async find_by_token(token: str) taolib.testing.email_service.models.subscription.SubscriptionDocument | None#

按退订令牌查找订阅记录。

async is_subscribed(email: str) bool#

检查邮箱是否处于订阅状态。

如果没有记录,默认视为已订阅。

async find_unsubscribed(skip: int = 0, limit: int = 100) list[taolib.testing.email_service.models.subscription.SubscriptionDocument]#

查找所有已退订的记录。

async create_indexes() None#

创建索引。

class taolib.testing.email_service.repository.TemplateRepository(collection)#

Bases: taolib.testing._base.repository.AsyncRepository[taolib.testing.email_service.models.template.TemplateDocument]

模板 Repository。

async find_by_name(name: str) taolib.testing.email_service.models.template.TemplateDocument | None#

按名称查找模板。

async find_active(skip: int = 0, limit: int = 100) list[taolib.testing.email_service.models.template.TemplateDocument]#

查找所有激活的模板。

async find_by_email_type(email_type: taolib.testing.email_service.models.enums.EmailType, skip: int = 0, limit: int = 100) list[taolib.testing.email_service.models.template.TemplateDocument]#

按邮件类型查找模板。

async create_indexes() None#

创建索引。

class taolib.testing.email_service.repository.TrackingRepository(collection)#

Bases: taolib.testing._base.repository.AsyncRepository[taolib.testing.email_service.models.tracking.TrackingEventDocument]

追踪事件 Repository。

async find_by_email_id(email_id: str) list[taolib.testing.email_service.models.tracking.TrackingEventDocument]#

查找指定邮件的所有追踪事件。

async find_by_event_type(event_type: taolib.testing.email_service.models.enums.TrackingEventType, start: datetime.datetime, end: datetime.datetime, skip: int = 0, limit: int = 100) list[taolib.testing.email_service.models.tracking.TrackingEventDocument]#

按事件类型和时间范围查询。

async get_event_counts(start: datetime.datetime, end: datetime.datetime) dict[str, int]#

聚合统计各类事件数量。

async get_daily_stats(start: datetime.datetime, end: datetime.datetime) list[dict]#

按天聚合统计。

async create_indexes() None#

创建索引。