taolib.testing.data_sync.services#

数据同步 Service 层。

导出所有服务类。

Submodules#

Classes#

SyncJobService

同步作业服务。

MetricsService

同步指标服务。

SyncOrchestrator

同步管道编排器。

AsyncScheduler

异步调度器。

Package Contents#

class taolib.testing.data_sync.services.SyncJobService(job_repo: taolib.testing.data_sync.repository.job_repo.SyncJobRepository)#

同步作业服务。

_job_repo#
async create_job(job_create: taolib.testing.data_sync.models.job.SyncJobCreate, user_id: str = 'system') taolib.testing.data_sync.models.job.SyncJobDocument#

创建同步作业。

参数:
  • job_create -- 作业创建数据

  • user_id -- 用户 ID

返回:

创建的作业文档

async get_job(job_id: str) taolib.testing.data_sync.models.job.SyncJobDocument#

获取作业详情。

参数:

job_id -- 作业 ID

返回:

作业文档

抛出:

SyncJobNotFoundError -- 如果作业不存在

async update_job(job_id: str, job_update: taolib.testing.data_sync.models.job.SyncJobUpdate, user_id: str = 'system') taolib.testing.data_sync.models.job.SyncJobDocument#

更新作业配置。

参数:
  • job_id -- 作业 ID

  • job_update -- 更新数据

  • user_id -- 用户 ID

返回:

更新后的作业文档

抛出:

SyncJobNotFoundError -- 如果作业不存在

async delete_job(job_id: str) bool#

删除作业。

参数:

job_id -- 作业 ID

返回:

是否删除成功

async list_jobs(enabled_only: bool = False, skip: int = 0, limit: int = 100) list[taolib.testing.data_sync.models.job.SyncJobDocument]#

列出作业。

参数:
  • enabled_only -- 只返回启用的作业

  • skip -- 跳过记录数

  • limit -- 限制记录数

返回:

作业文档列表

async enable_job(job_id: str, user_id: str = 'system') taolib.testing.data_sync.models.job.SyncJobDocument#

启用作业。

async disable_job(job_id: str, user_id: str = 'system') taolib.testing.data_sync.models.job.SyncJobDocument#

禁用作业。

async reset_checkpoint(job_id: str, checkpoint_repo: Any) int#

重置作业检查点(强制下次全量同步)。

参数:
  • job_id -- 作业 ID

  • checkpoint_repo -- 检查点 Repository

返回:

删除的检查点数量

class taolib.testing.data_sync.services.MetricsService(log_repo: taolib.testing.data_sync.repository.log_repo.SyncLogRepository, failure_repo: taolib.testing.data_sync.repository.failure_repo.FailureRecordRepository, checkpoint_repo: taolib.testing.data_sync.repository.checkpoint_repo.CheckpointRepository | None = None)#

同步指标服务。

_log_repo#
_failure_repo#
_checkpoint_repo = None#
async get_job_summary(job_id: str) dict[str, Any]#

获取作业摘要。

参数:

job_id -- 作业 ID

返回:

作业摘要字典

async get_global_summary() dict[str, Any]#

获取全局摘要。

返回:

全局摘要字典

async get_failure_summary(job_id: str) dict[str, Any]#

获取失败统计。

参数:

job_id -- 作业 ID

返回:

失败统计字典

class taolib.testing.data_sync.services.SyncOrchestrator(job_repo: taolib.testing.data_sync.repository.job_repo.SyncJobRepository, log_repo: taolib.testing.data_sync.repository.log_repo.SyncLogRepository, checkpoint_repo: taolib.testing.data_sync.repository.checkpoint_repo.CheckpointRepository, failure_repo: taolib.testing.data_sync.repository.failure_repo.FailureRecordRepository, validator: taolib.testing.data_sync.pipeline.protocols.ValidatorProtocol | None = None)#

同步管道编排器。

负责协调整个 ETL 流程: 1. 加载作业配置 2. 连接源/目标 MongoDB 3. 执行 Extract → Transform → Validate → Load 4. 更新检查点 5. 记录日志和失败

_job_repo#
_log_repo#
_checkpoint_repo#
_failure_repo#
_validator = None#
async run_job(job_id: str) taolib.testing.data_sync.models.SyncLogResponse#

运行同步作业。

参数:

job_id -- 作业 ID

返回:

同步日志响应

抛出:

SyncJobNotFoundError -- 作业不存在或已禁用

async _execute_etl(job: taolib.testing.data_sync.models.SyncJobDocument, log_doc: taolib.testing.data_sync.models.SyncLogDocument) None#

执行 ETL 流程。

async _sync_collection(source_collection: Any, target_collection: Any, collection_name: str, job: taolib.testing.data_sync.models.SyncJobDocument, log_id: str) dict[str, int]#

同步单个集合。

返回:

统计字典

async _retry_transform(original_batch: list[dict[str, Any]], failures: list[dict[str, Any]], transformer: taolib.testing.data_sync.pipeline.transformer.TransformChain, context: taolib.testing.data_sync.pipeline.protocols.TransformContext, max_retries: int) taolib.testing.data_sync.pipeline.protocols.TransformResult#

对 transform 失败的文档进行重试。

参数:
  • original_batch -- 原始文档批次

  • failures -- 失败记录列表

  • transformer -- 转换器实例

  • context -- 转换上下文

  • max_retries -- 最大重试次数

返回:

重试后的转换结果

async _retry_load(docs_to_load: list[dict[str, Any]], failures: list[dict[str, Any]], loader: taolib.testing.data_sync.pipeline.loader.MongoLoader, target_collection: Any, max_retries: int) taolib.testing.data_sync.pipeline.protocols.LoadResult#

对 load 失败的文档进行重试。

参数:
  • docs_to_load -- 原始待加载文档

  • failures -- 失败记录列表

  • loader -- 加载器实例

  • target_collection -- 目标集合

  • max_retries -- 最大重试次数

返回:

重试后的加载结果

async _record_failures(failures: list[dict[str, Any]], job_id: str, log_id: str, collection_name: str, phase: str) None#

记录失败到 failure_repo。

参数:
  • failures -- 失败记录列表

  • job_id -- 作业 ID

  • log_id -- 日志 ID

  • collection_name -- 集合名称

  • phase -- 失败阶段

async _record_load_failures(load_failures: list[dict[str, Any]], docs_to_load: list[dict[str, Any]], job_id: str, log_id: str, collection_name: str) None#

记录 load 阶段失败到 failure_repo。

参数:
  • load_failures -- 加载失败记录列表

  • docs_to_load -- 待加载文档列表(用于快照)

  • job_id -- 作业 ID

  • log_id -- 日志 ID

  • collection_name -- 集合名称

async _create_log(job: taolib.testing.data_sync.models.SyncJobDocument) taolib.testing.data_sync.models.SyncLogDocument#

创建日志记录。

async _fail_log(log_id: str, error_message: str) None#

标记日志失败。

class taolib.testing.data_sync.services.AsyncScheduler(orchestrator: taolib.testing.data_sync.services.orchestrator.SyncOrchestrator, job_repo: taolib.testing.data_sync.repository.job_repo.SyncJobRepository, check_interval: int = 60)#

异步调度器。

根据作业的 schedule_cron 配置定时运行同步作业。

_orchestrator#
_job_repo#
_check_interval = 60#
_running = False#
_task: asyncio.Task[None] | None = None#
async start() None#

启动调度器。

async stop() None#

停止调度器。

async _run_loop() None#

运行循环。

async _check_and_run() None#

检查并运行到期的作业。

_is_due(cron_expr: str, now: datetime.datetime) bool#

检查是否到期。

简化实现:每分钟检查一次,如果 cron 表达式匹配当前分钟则运行。

async _run_job(job_id: str) None#

运行单个作业。