taolib.testing.data_sync.services#
数据同步 Service 层。
导出所有服务类。
Submodules#
Classes#
同步作业服务。 |
|
同步指标服务。 |
|
同步管道编排器。 |
|
异步调度器。 |
Package Contents#
- class taolib.testing.data_sync.services.SyncJobService(job_repo: taolib.testing.data_sync.repository.job_repo.SyncJobRepository)#
同步作业服务。
- _job_repo#
- async create_job(job_create: taolib.testing.data_sync.models.job.SyncJobCreate, user_id: str = 'system') taolib.testing.data_sync.models.job.SyncJobDocument#
创建同步作业。
- 参数:
job_create -- 作业创建数据
user_id -- 用户 ID
- 返回:
创建的作业文档
- async get_job(job_id: str) taolib.testing.data_sync.models.job.SyncJobDocument#
获取作业详情。
- 参数:
job_id -- 作业 ID
- 返回:
作业文档
- 抛出:
SyncJobNotFoundError -- 如果作业不存在
- async update_job(job_id: str, job_update: taolib.testing.data_sync.models.job.SyncJobUpdate, user_id: str = 'system') taolib.testing.data_sync.models.job.SyncJobDocument#
更新作业配置。
- 参数:
job_id -- 作业 ID
job_update -- 更新数据
user_id -- 用户 ID
- 返回:
更新后的作业文档
- 抛出:
SyncJobNotFoundError -- 如果作业不存在
- async list_jobs(enabled_only: bool = False, skip: int = 0, limit: int = 100) list[taolib.testing.data_sync.models.job.SyncJobDocument]#
列出作业。
- 参数:
enabled_only -- 只返回启用的作业
skip -- 跳过记录数
limit -- 限制记录数
- 返回:
作业文档列表
- async enable_job(job_id: str, user_id: str = 'system') taolib.testing.data_sync.models.job.SyncJobDocument#
启用作业。
- async disable_job(job_id: str, user_id: str = 'system') taolib.testing.data_sync.models.job.SyncJobDocument#
禁用作业。
- class taolib.testing.data_sync.services.MetricsService(log_repo: taolib.testing.data_sync.repository.log_repo.SyncLogRepository, failure_repo: taolib.testing.data_sync.repository.failure_repo.FailureRecordRepository, checkpoint_repo: taolib.testing.data_sync.repository.checkpoint_repo.CheckpointRepository | None = None)#
同步指标服务。
- _log_repo#
- _failure_repo#
- _checkpoint_repo = None#
- class taolib.testing.data_sync.services.SyncOrchestrator(job_repo: taolib.testing.data_sync.repository.job_repo.SyncJobRepository, log_repo: taolib.testing.data_sync.repository.log_repo.SyncLogRepository, checkpoint_repo: taolib.testing.data_sync.repository.checkpoint_repo.CheckpointRepository, failure_repo: taolib.testing.data_sync.repository.failure_repo.FailureRecordRepository, validator: taolib.testing.data_sync.pipeline.protocols.ValidatorProtocol | None = None)#
同步管道编排器。
负责协调整个 ETL 流程: 1. 加载作业配置 2. 连接源/目标 MongoDB 3. 执行 Extract → Transform → Validate → Load 4. 更新检查点 5. 记录日志和失败
- _job_repo#
- _log_repo#
- _checkpoint_repo#
- _failure_repo#
- _validator = None#
- async run_job(job_id: str) taolib.testing.data_sync.models.SyncLogResponse#
运行同步作业。
- 参数:
job_id -- 作业 ID
- 返回:
同步日志响应
- 抛出:
SyncJobNotFoundError -- 作业不存在或已禁用
- async _execute_etl(job: taolib.testing.data_sync.models.SyncJobDocument, log_doc: taolib.testing.data_sync.models.SyncLogDocument) None#
执行 ETL 流程。
- async _sync_collection(source_collection: Any, target_collection: Any, collection_name: str, job: taolib.testing.data_sync.models.SyncJobDocument, log_id: str) dict[str, int]#
同步单个集合。
- 返回:
统计字典
- async _retry_transform(original_batch: list[dict[str, Any]], failures: list[dict[str, Any]], transformer: taolib.testing.data_sync.pipeline.transformer.TransformChain, context: taolib.testing.data_sync.pipeline.protocols.TransformContext, max_retries: int) taolib.testing.data_sync.pipeline.protocols.TransformResult#
对 transform 失败的文档进行重试。
- 参数:
original_batch -- 原始文档批次
failures -- 失败记录列表
transformer -- 转换器实例
context -- 转换上下文
max_retries -- 最大重试次数
- 返回:
重试后的转换结果
- async _retry_load(docs_to_load: list[dict[str, Any]], failures: list[dict[str, Any]], loader: taolib.testing.data_sync.pipeline.loader.MongoLoader, target_collection: Any, max_retries: int) taolib.testing.data_sync.pipeline.protocols.LoadResult#
对 load 失败的文档进行重试。
- 参数:
docs_to_load -- 原始待加载文档
failures -- 失败记录列表
loader -- 加载器实例
target_collection -- 目标集合
max_retries -- 最大重试次数
- 返回:
重试后的加载结果
- async _record_failures(failures: list[dict[str, Any]], job_id: str, log_id: str, collection_name: str, phase: str) None#
记录失败到 failure_repo。
- 参数:
failures -- 失败记录列表
job_id -- 作业 ID
log_id -- 日志 ID
collection_name -- 集合名称
phase -- 失败阶段
- async _record_load_failures(load_failures: list[dict[str, Any]], docs_to_load: list[dict[str, Any]], job_id: str, log_id: str, collection_name: str) None#
记录 load 阶段失败到 failure_repo。
- 参数:
load_failures -- 加载失败记录列表
docs_to_load -- 待加载文档列表(用于快照)
job_id -- 作业 ID
log_id -- 日志 ID
collection_name -- 集合名称
- async _create_log(job: taolib.testing.data_sync.models.SyncJobDocument) taolib.testing.data_sync.models.SyncLogDocument#
创建日志记录。
- class taolib.testing.data_sync.services.AsyncScheduler(orchestrator: taolib.testing.data_sync.services.orchestrator.SyncOrchestrator, job_repo: taolib.testing.data_sync.repository.job_repo.SyncJobRepository, check_interval: int = 60)#
异步调度器。
根据作业的 schedule_cron 配置定时运行同步作业。
- _orchestrator#
- _job_repo#
- _check_interval = 60#
- _running = False#
- _task: asyncio.Task[None] | None = None#
- _is_due(cron_expr: str, now: datetime.datetime) bool#
检查是否到期。
简化实现:每分钟检查一次,如果 cron 表达式匹配当前分钟则运行。