taolib.testing.data_sync.pipeline#

数据同步 Pipeline 层。

导出 Protocol 和实现。

Submodules#

Classes#

ExtractorProtocol

数据提取器 Protocol。

LoaderProtocol

数据加载器 Protocol。

TransformerProtocol

数据转换器 Protocol。

ValidateResult

验证结果。

ValidatorProtocol

数据验证器 Protocol。

Package Contents#

class taolib.testing.data_sync.pipeline.ExtractorProtocol#

Bases: Protocol

数据提取器 Protocol。

async extract(source_collection: motor.motor_asyncio.AsyncIOMotorCollection, checkpoint: taolib.testing.data_sync.models.checkpoint.SyncCheckpoint | None, filter_query: dict[str, Any] | None, batch_size: int) Any#

分批提取源数据。

参数:
  • source_collection -- 源集合

  • checkpoint -- 检查点(增量同步时使用)

  • filter_query -- 过滤查询

  • batch_size -- 批次大小

生成器:

文档批次列表

class taolib.testing.data_sync.pipeline.LoaderProtocol#

Bases: Protocol

数据加载器 Protocol。

async load(target_collection: motor.motor_asyncio.AsyncIOMotorCollection, documents: list[dict[str, Any]]) LoadResult#

加载文档到目标集合。

参数:
  • target_collection -- 目标集合

  • documents -- 文档列表

返回:

加载结果

class taolib.testing.data_sync.pipeline.TransformerProtocol#

Bases: Protocol

数据转换器 Protocol。

async transform(documents: list[dict[str, Any]], context: TransformContext) TransformResult#

转换文档批次。

参数:
  • documents -- 文档列表

  • context -- 转换上下文

返回:

转换结果(成功文档 + 失败记录)

class taolib.testing.data_sync.pipeline.ValidateResult#

验证结果。

valid: list[dict[str, Any]]#
failures: list[dict[str, Any]] = []#
class taolib.testing.data_sync.pipeline.ValidatorProtocol#

Bases: Protocol

数据验证器 Protocol。

async validate(documents: list[dict[str, Any]], context: TransformContext) ValidateResult#

验证文档批次。

参数:
  • documents -- 文档列表

  • context -- 转换上下文

返回:

验证结果(通过的文档 + 失败记录)