taolib.testing.analytics.repository.session_repo#

会话 Repository 模块。

提供会话数据的访问和聚合查询。

Classes#

SessionRepository

会话数据访问层。

Module Contents#

class taolib.testing.analytics.repository.session_repo.SessionRepository(collection: motor.motor_asyncio.AsyncIOMotorCollection)#

Bases: taolib.testing._base.repository.AsyncRepository[taolib.testing.analytics.models.event.SessionDocument]

会话数据访问层。

async upsert_session(session_data: dict[str, Any]) taolib.testing.analytics.models.event.SessionDocument#

更新或创建会话记录。

参数:

session_data -- 会话数据字典,必须包含 _id (session_id)

返回:

更新后的会话文档

async find_by_app(app_id: str, start: datetime.datetime, end: datetime.datetime, skip: int = 0, limit: int = 100) list[taolib.testing.analytics.models.event.SessionDocument]#

按应用和时间范围查询会话。

参数:
  • app_id -- 应用标识

  • start -- 开始时间

  • end -- 结束时间

  • skip -- 跳过记录数

  • limit -- 限制记录数

返回:

会话文档列表

async get_session_stats(app_id: str, start: datetime.datetime, end: datetime.datetime) dict[str, Any]#

获取会话统计数据。

参数:
  • app_id -- 应用标识

  • start -- 开始时间

  • end -- 结束时间

返回:

会话统计字典(平均时长、平均页面数、跳出率)

async create_indexes() None#

创建所有必要的索引。