taolib.testing.oauth.repository.activity_repo#

OAuth 活动日志 Repository 模块。

提供 OAuth 活动日志的数据访问操作。

Classes#

OAuthActivityLogRepository

OAuth 活动日志数据访问层。

Module Contents#

class taolib.testing.oauth.repository.activity_repo.OAuthActivityLogRepository(collection)#

Bases: taolib.testing._base.repository.AsyncRepository[taolib.testing.oauth.models.activity.OAuthActivityLogDocument]

OAuth 活动日志数据访问层。

async log_activity(*, action: taolib.testing.oauth.models.enums.OAuthActivityAction, status: taolib.testing.oauth.models.enums.OAuthActivityStatus, provider: taolib.testing.oauth.models.enums.OAuthProvider | None = None, user_id: str | None = None, connection_id: str | None = None, ip_address: str = '', user_agent: str = '', metadata: dict[str, Any] | None = None) taolib.testing.oauth.models.activity.OAuthActivityLogDocument#

记录一条活动日志。

参数:
  • action -- 操作类型

  • status -- 操作状态

  • provider -- OAuth 提供商

  • user_id -- 用户 ID

  • connection_id -- 连接 ID

  • ip_address -- 客户端 IP

  • user_agent -- 客户端 User-Agent

  • metadata -- 额外上下文

返回:

活动日志文档

async query_logs(*, user_id: str | None = None, provider: str | None = None, action: str | None = None, status: str | None = None, time_from: datetime.datetime | None = None, time_to: datetime.datetime | None = None, skip: int = 0, limit: int = 50) list[taolib.testing.oauth.models.activity.OAuthActivityLogDocument]#

查询活动日志。

参数:
  • user_id -- 按用户 ID 过滤

  • provider -- 按提供商过滤

  • action -- 按操作类型过滤

  • status -- 按状态过滤

  • time_from -- 起始时间

  • time_to -- 结束时间

  • skip -- 跳过记录数

  • limit -- 限制记录数

返回:

活动日志列表

async get_stats() dict[str, Any]#

获取活动统计摘要。

返回:

统计数据字典

async create_indexes() None#

创建 MongoDB 索引。