taolib.testing.data_sync.services.orchestrator#
同步管道编排器。
核心 ETL 流程控制:Extract → Transform → Validate → Load
Attributes#
Classes#
同步管道编排器。 |
Module Contents#
- taolib.testing.data_sync.services.orchestrator.logger#
- class taolib.testing.data_sync.services.orchestrator.SyncOrchestrator(job_repo: taolib.testing.data_sync.repository.job_repo.SyncJobRepository, log_repo: taolib.testing.data_sync.repository.log_repo.SyncLogRepository, checkpoint_repo: taolib.testing.data_sync.repository.checkpoint_repo.CheckpointRepository, failure_repo: taolib.testing.data_sync.repository.failure_repo.FailureRecordRepository, validator: taolib.testing.data_sync.pipeline.protocols.ValidatorProtocol | None = None)#
同步管道编排器。
负责协调整个 ETL 流程: 1. 加载作业配置 2. 连接源/目标 MongoDB 3. 执行 Extract → Transform → Validate → Load 4. 更新检查点 5. 记录日志和失败
- _job_repo#
- _log_repo#
- _checkpoint_repo#
- _failure_repo#
- _validator = None#
- async run_job(job_id: str) taolib.testing.data_sync.models.SyncLogResponse#
运行同步作业。
- 参数:
job_id -- 作业 ID
- 返回:
同步日志响应
- 抛出:
SyncJobNotFoundError -- 作业不存在或已禁用
- async _execute_etl(job: taolib.testing.data_sync.models.SyncJobDocument, log_doc: taolib.testing.data_sync.models.SyncLogDocument) None#
执行 ETL 流程。
- async _sync_collection(source_collection: Any, target_collection: Any, collection_name: str, job: taolib.testing.data_sync.models.SyncJobDocument, log_id: str) dict[str, int]#
同步单个集合。
- 返回:
统计字典
- async _retry_transform(original_batch: list[dict[str, Any]], failures: list[dict[str, Any]], transformer: taolib.testing.data_sync.pipeline.transformer.TransformChain, context: taolib.testing.data_sync.pipeline.protocols.TransformContext, max_retries: int) taolib.testing.data_sync.pipeline.protocols.TransformResult#
对 transform 失败的文档进行重试。
- 参数:
original_batch -- 原始文档批次
failures -- 失败记录列表
transformer -- 转换器实例
context -- 转换上下文
max_retries -- 最大重试次数
- 返回:
重试后的转换结果
- async _retry_load(docs_to_load: list[dict[str, Any]], failures: list[dict[str, Any]], loader: taolib.testing.data_sync.pipeline.loader.MongoLoader, target_collection: Any, max_retries: int) taolib.testing.data_sync.pipeline.protocols.LoadResult#
对 load 失败的文档进行重试。
- 参数:
docs_to_load -- 原始待加载文档
failures -- 失败记录列表
loader -- 加载器实例
target_collection -- 目标集合
max_retries -- 最大重试次数
- 返回:
重试后的加载结果
- async _record_failures(failures: list[dict[str, Any]], job_id: str, log_id: str, collection_name: str, phase: str) None#
记录失败到 failure_repo。
- 参数:
failures -- 失败记录列表
job_id -- 作业 ID
log_id -- 日志 ID
collection_name -- 集合名称
phase -- 失败阶段
- async _record_load_failures(load_failures: list[dict[str, Any]], docs_to_load: list[dict[str, Any]], job_id: str, log_id: str, collection_name: str) None#
记录 load 阶段失败到 failure_repo。
- 参数:
load_failures -- 加载失败记录列表
docs_to_load -- 待加载文档列表(用于快照)
job_id -- 作业 ID
log_id -- 日志 ID
collection_name -- 集合名称
- async _create_log(job: taolib.testing.data_sync.models.SyncJobDocument) taolib.testing.data_sync.models.SyncLogDocument#
创建日志记录。