taolib.testing.email_service.repository.tracking_repo#

追踪事件 Repository。

提供邮件追踪事件的数据访问和聚合分析操作。

Classes#

TrackingRepository

追踪事件 Repository。

Module Contents#

class taolib.testing.email_service.repository.tracking_repo.TrackingRepository(collection)#

Bases: taolib.testing._base.repository.AsyncRepository[taolib.testing.email_service.models.tracking.TrackingEventDocument]

追踪事件 Repository。

async find_by_email_id(email_id: str) list[taolib.testing.email_service.models.tracking.TrackingEventDocument]#

查找指定邮件的所有追踪事件。

async find_by_event_type(event_type: taolib.testing.email_service.models.enums.TrackingEventType, start: datetime.datetime, end: datetime.datetime, skip: int = 0, limit: int = 100) list[taolib.testing.email_service.models.tracking.TrackingEventDocument]#

按事件类型和时间范围查询。

async get_event_counts(start: datetime.datetime, end: datetime.datetime) dict[str, int]#

聚合统计各类事件数量。

async get_daily_stats(start: datetime.datetime, end: datetime.datetime) list[dict]#

按天聚合统计。

async create_indexes() None#

创建索引。