taolib.harness.pipelines.templates#

Pipeline 模板集合 - 预置可复用的 Flow 结构模板与工厂方法。

通过 FlowTemplate 枚举对外暴露内置模板的标识,并提供 create_from_template() 作为统一的工厂入口。

示例:

from taolib.harness.pipelines.templates import FlowTemplate, create_from_template

flow = create_from_template(
    FlowTemplate.EVAL,
    config={"name": "rag-eval", "version": "0.1.0"},
)
results = flow.run({"dataset": [...]})

新模板可通过 register_template() 注入到全局表中。

Classes#

ETLFlow

ETL 数据处理 Flow 模板。

FlowTemplate

预置 Flow 模板枚举。

TrainingFlow

模型训练 Flow 模板。

Functions#

create_from_template(...)

根据模板枚举构造 Flow 实例。

register_template(→ None)

注册一个新的 Flow 模板(或覆盖已有模板)。

Package Contents#

class taolib.harness.pipelines.templates.ETLFlow(config: FlowConfig | collections.abc.Mapping[str, Any], *, registry: taolib.harness.core.registry.FlowRegistry | None = None, auto_register: bool = True)#

Bases: taolib.harness.pipelines.flow_base.HarnessFlow

ETL 数据处理 Flow 模板。

标准骨架:extract transform load end,子类按需重写各 Step。

构造 Flow。

参数:
  • config -- FlowConfig 实例或可被其校验的字典。

  • registry -- 自定义 Flow 注册表;缺省使用全局默认注册表。

  • auto_register -- 是否在构造时自动注册到 registry

async end(context: dict[str, Any]) dict[str, Any]#
async extract(context: dict[str, Any]) dict[str, Any]#

抽取原始数据。子类应返回 {"raw": [...]}

async load(context: dict[str, Any]) dict[str, Any]#

落盘 / 写库。子类应返回 {"loaded": int}

steps() list[tuple[str, taolib.harness.pipelines.flow_base.StepFn]]#

返回 Flow 的有序 Step 列表,子类必须重写。

async transform(context: dict[str, Any]) dict[str, Any]#

转换数据。子类应返回 {"transformed": [...]}

class taolib.harness.pipelines.templates.FlowTemplate#

Bases: enum.StrEnum

预置 Flow 模板枚举。

Initialize self. See help(type(self)) for accurate signature.

ETL = 'etl'#
EVAL = 'eval'#
TRAINING = 'training'#
class taolib.harness.pipelines.templates.TrainingFlow(config: FlowConfig | collections.abc.Mapping[str, Any], *, registry: taolib.harness.core.registry.FlowRegistry | None = None, auto_register: bool = True)#

Bases: taolib.harness.pipelines.flow_base.HarnessFlow

模型训练 Flow 模板。

标准骨架:prepare train evaluate checkpoint end

构造 Flow。

参数:
  • config -- FlowConfig 实例或可被其校验的字典。

  • registry -- 自定义 Flow 注册表;缺省使用全局默认注册表。

  • auto_register -- 是否在构造时自动注册到 registry

async checkpoint(context: dict[str, Any]) dict[str, Any]#

保存模型检查点。

async end(context: dict[str, Any]) dict[str, Any]#
async evaluate(context: dict[str, Any]) dict[str, Any]#

在验证集上评估。

async prepare(context: dict[str, Any]) dict[str, Any]#

准备训练数据与超参。

steps() list[tuple[str, taolib.harness.pipelines.flow_base.StepFn]]#

返回 Flow 的有序 Step 列表,子类必须重写。

async train(context: dict[str, Any]) dict[str, Any]#

执行训练循环。子类应返回 {"model": ...}

taolib.harness.pipelines.templates.create_from_template(template: FlowTemplate | str, config: taolib.harness.pipelines.flow_base.FlowConfig | collections.abc.Mapping[str, Any], /, **kwargs: Any) taolib.harness.pipelines.flow_base.HarnessFlow#

根据模板枚举构造 Flow 实例。

参数:
  • template -- 模板枚举或其字符串值。

  • config -- Flow 配置(FlowConfig 或可被其校验的字典)。

  • **kwargs -- 透传给具体 Flow 构造函数的额外参数(如 registryauto_register 等)。

抛出:

KeyError -- 当模板未注册时抛出。

taolib.harness.pipelines.templates.register_template(template: FlowTemplate | str, flow_cls: type[taolib.harness.pipelines.flow_base.HarnessFlow]) None#

注册一个新的 Flow 模板(或覆盖已有模板)。