taolib.testing.data_sync.services.orchestrator#

同步管道编排器。

核心 ETL 流程控制:Extract → Transform → Validate → Load

Attributes#

Classes#

SyncOrchestrator

同步管道编排器。

Module Contents#

taolib.testing.data_sync.services.orchestrator.logger#
class taolib.testing.data_sync.services.orchestrator.SyncOrchestrator(job_repo: taolib.testing.data_sync.repository.job_repo.SyncJobRepository, log_repo: taolib.testing.data_sync.repository.log_repo.SyncLogRepository, checkpoint_repo: taolib.testing.data_sync.repository.checkpoint_repo.CheckpointRepository, failure_repo: taolib.testing.data_sync.repository.failure_repo.FailureRecordRepository, validator: taolib.testing.data_sync.pipeline.protocols.ValidatorProtocol | None = None)#

同步管道编排器。

负责协调整个 ETL 流程: 1. 加载作业配置 2. 连接源/目标 MongoDB 3. 执行 Extract → Transform → Validate → Load 4. 更新检查点 5. 记录日志和失败

_job_repo#
_log_repo#
_checkpoint_repo#
_failure_repo#
_validator = None#
async run_job(job_id: str) taolib.testing.data_sync.models.SyncLogResponse#

运行同步作业。

参数:

job_id -- 作业 ID

返回:

同步日志响应

抛出:

SyncJobNotFoundError -- 作业不存在或已禁用

async _execute_etl(job: taolib.testing.data_sync.models.SyncJobDocument, log_doc: taolib.testing.data_sync.models.SyncLogDocument) None#

执行 ETL 流程。

async _sync_collection(source_collection: Any, target_collection: Any, collection_name: str, job: taolib.testing.data_sync.models.SyncJobDocument, log_id: str) dict[str, int]#

同步单个集合。

返回:

统计字典

async _retry_transform(original_batch: list[dict[str, Any]], failures: list[dict[str, Any]], transformer: taolib.testing.data_sync.pipeline.transformer.TransformChain, context: taolib.testing.data_sync.pipeline.protocols.TransformContext, max_retries: int) taolib.testing.data_sync.pipeline.protocols.TransformResult#

对 transform 失败的文档进行重试。

参数:
  • original_batch -- 原始文档批次

  • failures -- 失败记录列表

  • transformer -- 转换器实例

  • context -- 转换上下文

  • max_retries -- 最大重试次数

返回:

重试后的转换结果

async _retry_load(docs_to_load: list[dict[str, Any]], failures: list[dict[str, Any]], loader: taolib.testing.data_sync.pipeline.loader.MongoLoader, target_collection: Any, max_retries: int) taolib.testing.data_sync.pipeline.protocols.LoadResult#

对 load 失败的文档进行重试。

参数:
  • docs_to_load -- 原始待加载文档

  • failures -- 失败记录列表

  • loader -- 加载器实例

  • target_collection -- 目标集合

  • max_retries -- 最大重试次数

返回:

重试后的加载结果

async _record_failures(failures: list[dict[str, Any]], job_id: str, log_id: str, collection_name: str, phase: str) None#

记录失败到 failure_repo。

参数:
  • failures -- 失败记录列表

  • job_id -- 作业 ID

  • log_id -- 日志 ID

  • collection_name -- 集合名称

  • phase -- 失败阶段

async _record_load_failures(load_failures: list[dict[str, Any]], docs_to_load: list[dict[str, Any]], job_id: str, log_id: str, collection_name: str) None#

记录 load 阶段失败到 failure_repo。

参数:
  • load_failures -- 加载失败记录列表

  • docs_to_load -- 待加载文档列表(用于快照)

  • job_id -- 作业 ID

  • log_id -- 日志 ID

  • collection_name -- 集合名称

async _create_log(job: taolib.testing.data_sync.models.SyncJobDocument) taolib.testing.data_sync.models.SyncLogDocument#

创建日志记录。

async _fail_log(log_id: str, error_message: str) None#

标记日志失败。