taolib.testing.email_service.repository#
数据访问层。
Submodules#
Classes#
邮件 Repository。 |
|
订阅 Repository。 |
|
模板 Repository。 |
|
追踪事件 Repository。 |
Package Contents#
- class taolib.testing.email_service.repository.EmailRepository(collection)#
Bases:
taolib.testing._base.repository.AsyncRepository[taolib.testing.email_service.models.email.EmailDocument]邮件 Repository。
- async find_by_status(status: taolib.testing.email_service.models.enums.EmailStatus, skip: int = 0, limit: int = 100) list[taolib.testing.email_service.models.email.EmailDocument]#
按状态查询邮件。
- async find_by_recipient(email: str, skip: int = 0, limit: int = 100) list[taolib.testing.email_service.models.email.EmailDocument]#
按收件人查询邮件。
- async find_queued_emails(limit: int = 100) list[taolib.testing.email_service.models.email.EmailDocument]#
查找待发送的队列邮件。
- async find_scheduled_ready(limit: int = 50) list[taolib.testing.email_service.models.email.EmailDocument]#
查找到期的计划邮件。
- async update_status(email_id: str, status: taolib.testing.email_service.models.enums.EmailStatus, **extra_fields: Any) taolib.testing.email_service.models.email.EmailDocument | None#
更新邮件状态。
- async increment_retry(email_id: str) taolib.testing.email_service.models.email.EmailDocument | None#
递增重试计数。
- async get_stats(start: datetime.datetime, end: datetime.datetime) dict[str, int]#
获取指定时间范围内的邮件统计。
- class taolib.testing.email_service.repository.SubscriptionRepository(collection)#
Bases:
taolib.testing._base.repository.AsyncRepository[taolib.testing.email_service.models.subscription.SubscriptionDocument]订阅 Repository。
- async find_by_email(email: str) taolib.testing.email_service.models.subscription.SubscriptionDocument | None#
按邮箱查找订阅记录。
- async find_by_token(token: str) taolib.testing.email_service.models.subscription.SubscriptionDocument | None#
按退订令牌查找订阅记录。
- async find_unsubscribed(skip: int = 0, limit: int = 100) list[taolib.testing.email_service.models.subscription.SubscriptionDocument]#
查找所有已退订的记录。
- class taolib.testing.email_service.repository.TemplateRepository(collection)#
Bases:
taolib.testing._base.repository.AsyncRepository[taolib.testing.email_service.models.template.TemplateDocument]模板 Repository。
- async find_by_name(name: str) taolib.testing.email_service.models.template.TemplateDocument | None#
按名称查找模板。
- async find_active(skip: int = 0, limit: int = 100) list[taolib.testing.email_service.models.template.TemplateDocument]#
查找所有激活的模板。
- async find_by_email_type(email_type: taolib.testing.email_service.models.enums.EmailType, skip: int = 0, limit: int = 100) list[taolib.testing.email_service.models.template.TemplateDocument]#
按邮件类型查找模板。
- class taolib.testing.email_service.repository.TrackingRepository(collection)#
Bases:
taolib.testing._base.repository.AsyncRepository[taolib.testing.email_service.models.tracking.TrackingEventDocument]追踪事件 Repository。
- async find_by_email_id(email_id: str) list[taolib.testing.email_service.models.tracking.TrackingEventDocument]#
查找指定邮件的所有追踪事件。
- async find_by_event_type(event_type: taolib.testing.email_service.models.enums.TrackingEventType, start: datetime.datetime, end: datetime.datetime, skip: int = 0, limit: int = 100) list[taolib.testing.email_service.models.tracking.TrackingEventDocument]#
按事件类型和时间范围查询。
- async get_event_counts(start: datetime.datetime, end: datetime.datetime) dict[str, int]#
聚合统计各类事件数量。
- async get_daily_stats(start: datetime.datetime, end: datetime.datetime) list[dict]#
按天聚合统计。