taolib.testing.data_sync

目录

taolib.testing.data_sync#

数据同步管道模块。

MongoDB 到 MongoDB 的数据同步管道,支持: - 增量/全量同步 - 自定义 Python 转换函数 - 检查点恢复 - 失败记录追踪 - 定时/手动触发

使用方式:

from taolib.testing.data_sync.services import SyncOrchestrator, SyncJobService from taolib.testing.data_sync.repository import SyncJobRepository

# 创建服务 job_service = SyncJobService(job_repo) orchestrator = SyncOrchestrator(...)

# 运行同步作业 log = await orchestrator.run_job(job_id)

启动 Web 服务器:

from taolib.testing.data_sync.server.app import create_app

app = create_app() # 使用 uvicorn.run(app, host="0.0.0.0", port=8001)

Submodules#

Attributes#

Exceptions#

SyncAbortError

当 failure_action == abort 时达到阈值触发。

SyncCheckpointError

检查点损坏或更新失败。

SyncConnectionError

无法连接到源/目标 MongoDB。

SyncError

所有同步错误的基类。

SyncJobNotFoundError

同步作业不存在或已禁用。

SyncTransformError

用户转换函数抛出不可恢复的错误。

Classes#

FailureAction

失败处理动作。

FailureRecordDocument

失败记录文档模型。

SyncCheckpoint

同步检查点模型。

SyncConnectionConfig

数据库连接配置。

SyncJobCreate

创建同步作业的输入模型。

SyncJobDocument

同步作业的 MongoDB 文档模型。

SyncJobResponse

同步作业的 API 响应模型。

SyncJobUpdate

更新同步作业的输入模型(所有字段可选)。

SyncLogCreate

创建同步日志的输入模型。

SyncLogDocument

同步日志的 MongoDB 文档模型。

SyncLogResponse

同步日志的 API 响应模型。

SyncMetrics

同步指标。

SyncMode

同步模式。

SyncScope

同步范围。

SyncStatus

同步状态。

Package Contents#

exception taolib.testing.data_sync.SyncAbortError#

Bases: SyncError

当 failure_action == abort 时达到阈值触发。

exception taolib.testing.data_sync.SyncCheckpointError#

Bases: SyncError

检查点损坏或更新失败。

exception taolib.testing.data_sync.SyncConnectionError#

Bases: SyncError

无法连接到源/目标 MongoDB。

exception taolib.testing.data_sync.SyncError#

Bases: Exception

所有同步错误的基类。

exception taolib.testing.data_sync.SyncJobNotFoundError#

Bases: SyncError

同步作业不存在或已禁用。

exception taolib.testing.data_sync.SyncTransformError#

Bases: SyncError

用户转换函数抛出不可恢复的错误。

class taolib.testing.data_sync.FailureAction#

Bases: enum.StrEnum

失败处理动作。

SKIP = 'skip'#
RETRY = 'retry'#
ABORT = 'abort'#
class taolib.testing.data_sync.FailureRecordDocument#

Bases: pydantic.BaseModel

失败记录文档模型。

id: str#
job_id: str#
log_id: str#
collection_name: str#
document_id: str#
phase: str#
error_type: str#
error_message: str#
document_snapshot: dict | None#
retry_count: int#
created_at: datetime.datetime#
model_config#
class taolib.testing.data_sync.SyncCheckpoint#

Bases: pydantic.BaseModel

同步检查点模型。

id: str#
job_id: str#
collection_name: str#
last_synced_timestamp: datetime.datetime | None#
last_synced_id: str | None#
total_synced: int#
updated_at: datetime.datetime#
model_config#
class taolib.testing.data_sync.SyncConnectionConfig#

Bases: pydantic.BaseModel

数据库连接配置。

mongo_url: str = 'mongodb://localhost:27017'#
database: str#
collections: list[str] | None = None#
class taolib.testing.data_sync.SyncJobCreate#

Bases: SyncJobBase

创建同步作业的输入模型。

class taolib.testing.data_sync.SyncJobDocument#

Bases: SyncJobBase

同步作业的 MongoDB 文档模型。

id: str#
created_by: str = 'system'#
updated_by: str = 'system'#
created_at: datetime.datetime#
updated_at: datetime.datetime#
last_run_at: datetime.datetime | None = None#
last_run_status: str | None = None#
model_config#
to_response() SyncJobResponse#

转换为 API 响应。

class taolib.testing.data_sync.SyncJobResponse#

Bases: SyncJobBase

同步作业的 API 响应模型。

id: str#
created_by: str#
updated_by: str#
created_at: datetime.datetime#
updated_at: datetime.datetime#
last_run_at: datetime.datetime | None = None#
last_run_status: str | None = None#
model_config#
class taolib.testing.data_sync.SyncJobUpdate#

Bases: pydantic.BaseModel

更新同步作业的输入模型(所有字段可选)。

description: str | None = None#
scope: taolib.testing.data_sync.models.enums.SyncScope | None = None#
mode: taolib.testing.data_sync.models.enums.SyncMode | None = None#
source: SyncConnectionConfig | None = None#
target: SyncConnectionConfig | None = None#
transform_module: str | None = None#
field_mapping: dict[str, str] | None = None#
filter_query: dict[str, Any] | None = None#
schedule_cron: str | None = None#
batch_size: int | None = None#
failure_action: taolib.testing.data_sync.models.enums.FailureAction | None = None#
max_retries: int | None = None#
enabled: bool | None = None#
tags: list[str] | None = None#
class taolib.testing.data_sync.SyncLogCreate#

Bases: SyncLogBase

创建同步日志的输入模型。

class taolib.testing.data_sync.SyncLogDocument#

Bases: SyncLogBase

同步日志的 MongoDB 文档模型。

id: str#
model_config#
to_response() SyncLogResponse#

转换为 API 响应。

class taolib.testing.data_sync.SyncLogResponse#

Bases: SyncLogBase

同步日志的 API 响应模型。

id: str#
model_config#
class taolib.testing.data_sync.SyncMetrics#

Bases: pydantic.BaseModel

同步指标。

total_extracted: int = 0#
total_transformed: int = 0#
total_loaded: int = 0#
total_skipped: int = 0#
total_failed: int = 0#
bytes_transferred: int = 0#
class taolib.testing.data_sync.SyncMode#

Bases: enum.StrEnum

同步模式。

FULL = 'full'#
INCREMENTAL = 'incremental'#
class taolib.testing.data_sync.SyncScope#

Bases: enum.StrEnum

同步范围。

CONFIG_CENTER = 'config_center'#
DATABASE = 'database'#
FULL = 'full'#
class taolib.testing.data_sync.SyncStatus#

Bases: enum.StrEnum

同步状态。

PENDING = 'pending'#
RUNNING = 'running'#
COMPLETED = 'completed'#
FAILED = 'failed'#
CANCELLED = 'cancelled'#
taolib.testing.data_sync.__version__ = '0.1.0'#