taolib.testing.data_sync.repository#

数据同步 Repository 层。

导出所有 Repository 类。

Submodules#

Classes#

CheckpointRepository

同步检查点 Repository。

FailureRecordRepository

同步失败记录 Repository。

SyncJobRepository

同步作业 Repository。

SyncLogRepository

同步日志 Repository。

Package Contents#

class taolib.testing.data_sync.repository.CheckpointRepository(collection: motor.motor_asyncio.AsyncIOMotorCollection)#

Bases: taolib.testing._base.repository.AsyncRepository[taolib.testing.data_sync.models.checkpoint.SyncCheckpoint]

同步检查点 Repository。

async get_or_create(job_id: str, collection_name: str) taolib.testing.data_sync.models.checkpoint.SyncCheckpoint#

获取或创建检查点。

参数:
  • job_id -- 作业 ID

  • collection_name -- 集合名称

返回:

检查点文档

async update_checkpoint(job_id: str, collection_name: str, last_ts: datetime.datetime, last_id: str, count: int) taolib.testing.data_sync.models.checkpoint.SyncCheckpoint | None#

更新检查点。

参数:
  • job_id -- 作业 ID

  • collection_name -- 集合名称

  • last_ts -- 最后同步时间戳

  • last_id -- 最后同步文档 ID

  • count -- 本次同步文档数

返回:

更新后的检查点文档,如果不存在则返回 None

async delete_by_job(job_id: str) int#

删除指定作业的所有检查点。

参数:

job_id -- 作业 ID

返回:

删除的检查点数量

async create_indexes() None#

创建索引。

class taolib.testing.data_sync.repository.FailureRecordRepository(collection: motor.motor_asyncio.AsyncIOMotorCollection)#

Bases: taolib.testing._base.repository.AsyncRepository[taolib.testing.data_sync.models.failure.FailureRecordDocument]

同步失败记录 Repository。

async bulk_create(records: list[dict]) int#

批量创建失败记录。

参数:

records -- 失败记录数据列表

返回:

创建的记录数量

async find_by_log(log_id: str, skip: int = 0, limit: int = 100) list[taolib.testing.data_sync.models.failure.FailureRecordDocument]#

查找指定日志的失败记录。

参数:
  • log_id -- 日志 ID

  • skip -- 跳过记录数

  • limit -- 限制记录数

返回:

失败记录文档列表

async count_by_job(job_id: str) int#

统计指定作业的失败数量。

参数:

job_id -- 作业 ID

返回:

失败记录数量

async create_indexes() None#

创建索引。

class taolib.testing.data_sync.repository.SyncJobRepository(collection: motor.motor_asyncio.AsyncIOMotorCollection)#

Bases: taolib.testing._base.repository.AsyncRepository[taolib.testing.data_sync.models.job.SyncJobDocument]

同步作业 Repository。

async find_by_name(name: str) taolib.testing.data_sync.models.job.SyncJobDocument | None#

根据名称查找作业。

参数:

name -- 作业名称

返回:

作业文档,如果不存在则返回 None

async find_enabled_jobs() list[taolib.testing.data_sync.models.job.SyncJobDocument]#

查找所有启用的作业。

返回:

启用的作业文档列表

async find_by_schedule() list[taolib.testing.data_sync.models.job.SyncJobDocument]#

查找所有有调度配置的作业。

返回:

有 schedule_cron 的作业列表

async update_last_run(job_id: str, status: str, timestamp) taolib.testing.data_sync.models.job.SyncJobDocument | None#

更新最后运行状态。

参数:
  • job_id -- 作业 ID

  • status -- 运行状态

  • timestamp -- 运行时间

返回:

更新后的作业文档,如果不存在则返回 None

async create_indexes() None#

创建索引。

class taolib.testing.data_sync.repository.SyncLogRepository(collection: motor.motor_asyncio.AsyncIOMotorCollection)#

Bases: taolib.testing._base.repository.AsyncRepository[taolib.testing.data_sync.models.log.SyncLogDocument]

同步日志 Repository。

async find_by_job(job_id: str, skip: int = 0, limit: int = 50) list[taolib.testing.data_sync.models.log.SyncLogDocument]#

查找指定作业的日志。

参数:
  • job_id -- 作业 ID

  • skip -- 跳过记录数

  • limit -- 限制记录数

返回:

日志文档列表,按 started_at 降序

async find_recent(skip: int = 0, limit: int = 50) list[taolib.testing.data_sync.models.log.SyncLogDocument]#

查找最近的日志。

参数:
  • skip -- 跳过记录数

  • limit -- 限制记录数

返回:

最近的日志文档列表

async get_aggregate_metrics(job_id: str, days: int = 7) dict[str, Any]#

获取聚合指标。

参数:
  • job_id -- 作业 ID

  • days -- 天数

返回:

聚合指标字典

async create_indexes() None#

创建索引。