taolib.harness.pipelines.flow_base#

Metaflow Flow 基类 - 与 Harness 系统集成的 FlowSpec 扩展。

本模块定义两个核心抽象:

  • FlowConfig —— 通过 Pydantic v2 定义的 Flow 元配置(名称、版本、 描述、资源要求、环境变量等);

  • HarnessFlow —— 与 FlowRegistry 自动集成的 Flow 基类, 内置状态回调钩子、产物访问与状态查询方法,并提供 as_tool() 将 Flow 包装为 LangGraph Agent 可调用的工具(Flow-as-Tool)。

并附带预置模板 EvalFlow:以 start → prepare_data → run_agent → collect_metrics → report → end 六步骨架承载评估场景;每步暴露明确的输入/输出协议,便于子类按需重写。

装饰器风格示例:

from taolib.harness.core import register_flow

@register_flow(name="rag-eval", version="1.0.0", tags=("eval",))
class RagEvalFlow(EvalFlow): ...

Metaflow 在当前环境可能未安装,本模块通过 TYPE_CHECKING 守卫与运行时 探测实现可插拔接入:未安装时仍可在内存中模拟 Step 顺序执行。

Attributes#

StepFn

Step 函数签名:接收当前 Flow 上下文,返回需要合并的更新字典(同步或异步)。

Classes#

EvalFlow

评估 Flow 模板。

FlowArtifact

Flow 产物的标准包装 - 对齐 Metaflow Artifact 概念。

FlowConfig

Flow 元配置。

FlowResources

Flow 资源声明 - 用于在 Metaflow @resources 装饰器中映射。

FlowStatus

Flow 运行状态。

HarnessFlow

Metaflow Flow 基类(Harness 增强版)。

StepResult

单个 Step 执行结果。

Module Contents#

class taolib.harness.pipelines.flow_base.EvalFlow(config: FlowConfig | collections.abc.Mapping[str, Any], *, registry: taolib.harness.core.registry.FlowRegistry | None = None, auto_register: bool = True)#

Bases: HarnessFlow

评估 Flow 模板。

标准骨架:

start → prepare_data → run_agent → collect_metrics → report → end

每步遵守明确的数据协议:

  • start:初始化运行上下文,写入 run_idstarted_at

  • prepare_data:产出 dataset(评估样本列表);

  • run_agent:在 dataset 上运行被评 Agent,产出 predictions

  • collect_metrics:汇总 predictionsmetrics 字典;

  • report:基于 metrics 产出最终 report

  • end:标记完成并落盘汇总产物。

子类通常重写 prepare_data() / run_agent() / collect_metrics() 三个方法即可定制完整评估链路。

构造 Flow。

参数:
  • config -- FlowConfig 实例或可被其校验的字典。

  • registry -- 自定义 Flow 注册表;缺省使用全局默认注册表。

  • auto_register -- 是否在构造时自动注册到 registry

async collect_metrics(context: dict[str, Any]) dict[str, Any]#

汇总评估指标。子类应在此返回 {"metrics": {...}}

async end(context: dict[str, Any]) dict[str, Any]#

落盘汇总产物并标记完成。

async prepare_data(context: dict[str, Any]) dict[str, Any]#

准备评估数据集。子类应在此返回 {"dataset": [...]}

async report(context: dict[str, Any]) dict[str, Any]#

生成最终评估报告。子类可在此渲染表格 / 写入文件等。

async run_agent(context: dict[str, Any]) dict[str, Any]#

在数据集上运行被评 Agent。子类应在此返回 {"predictions": [...]}

async start(context: dict[str, Any]) dict[str, Any]#

初始化评估上下文。

steps() list[tuple[str, StepFn]]#

声明评估骨架的 Step 序列。

class taolib.harness.pipelines.flow_base.FlowArtifact#

Bases: pydantic.BaseModel

Flow 产物的标准包装 - 对齐 Metaflow Artifact 概念。

created_at: float#
metadata: dict[str, Any]#
model_config#
name: str#
step: str#
value: Any = None#
class taolib.harness.pipelines.flow_base.FlowConfig#

Bases: pydantic.BaseModel

Flow 元配置。

description: str#
env: dict[str, str]#
max_retries: int#
model_config#
name: str#
resources: FlowResources#
tags: tuple[str, Ellipsis]#
version: str#
class taolib.harness.pipelines.flow_base.FlowResources#

Bases: pydantic.BaseModel

Flow 资源声明 - 用于在 Metaflow @resources 装饰器中映射。

cpu: int#
disk_mb: int | None#
gpu: int#
memory_mb: int#
model_config#
class taolib.harness.pipelines.flow_base.FlowStatus#

Bases: enum.StrEnum

Flow 运行状态。

Initialize self. See help(type(self)) for accurate signature.

CANCELLED = 'cancelled'#
FAILED = 'failed'#
PENDING = 'pending'#
RUNNING = 'running'#
SUCCEEDED = 'succeeded'#
class taolib.harness.pipelines.flow_base.HarnessFlow(config: FlowConfig | collections.abc.Mapping[str, Any], *, registry: taolib.harness.core.registry.FlowRegistry | None = None, auto_register: bool = True)#

Metaflow Flow 基类(Harness 增强版)。

生命周期:

  1. 实例化时自动向 FlowRegistry 注册自身(除非显式禁用);

  2. steps() 子类定义有序的 Step 列表((name, callable) 元组);

  3. run() / arun() 同步/异步执行整个 Flow;

  4. as_tool() 将 Flow 包装为 LangGraph Agent 可调用的工具;

  5. get_artifacts() / get_status() 查询执行产物与运行状态。

钩子:on_step_starton_step_endon_completeon_failure

构造 Flow。

参数:
  • config -- FlowConfig 实例或可被其校验的字典。

  • registry -- 自定义 Flow 注册表;缺省使用全局默认注册表。

  • auto_register -- 是否在构造时自动注册到 registry

async arun(inputs: collections.abc.Mapping[str, Any] | None = None, *, run_id: str | None = None) list[StepResult]#

异步执行 Flow,返回所有 Step 的结果列表。

as_tool(*, name: str | None = None, description: str | None = None) taolib.harness.agents.graph_agent.AgentTool#

将 Flow 包装为 LangGraph Agent 可调用的工具(Flow-as-Tool)。

返回的 AgentTool 接受任意输入字典,触发 arun() 并返回 最终上下文字典(包含所有 Step 累积输出)。

emit_artifact(name: str, value: Any, *, step: str, **metadata: Any) FlowArtifact#

显式记录一个 Step 产出的数据卡片。

get_artifacts(*, step: str | None = None) list[FlowArtifact]#

获取 Flow 产出的数据卡片(可按 Step 过滤)。

get_status() FlowStatus#

查询 Flow 当前运行状态。

run(inputs: collections.abc.Mapping[str, Any] | None = None, *, run_id: str | None = None) list[StepResult]#

同步执行 Flow。

abstractmethod steps() list[tuple[str, StepFn]]#

返回 Flow 的有序 Step 列表,子类必须重写。

config#
config_class: ClassVar[type[FlowConfig]]#
on_complete: collections.abc.Callable[[list[StepResult]], collections.abc.Awaitable[None] | None] | None = None#
on_failure: collections.abc.Callable[[StepResult], collections.abc.Awaitable[None] | None] | None = None#
on_step_end: collections.abc.Callable[[StepResult], collections.abc.Awaitable[None] | None] | None = None#
on_step_start: collections.abc.Callable[[str, dict[str, Any]], collections.abc.Awaitable[None] | None] | None = None#
property registry_entry: taolib.harness.core.registry.RegistryEntry[Any] | None#

注册条目(若已注册)。

property run_id: str#

当前运行 ID。

class taolib.harness.pipelines.flow_base.StepResult#

Bases: pydantic.BaseModel

单个 Step 执行结果。

error: str | None = None#
finished_at: float | None = None#
model_config#
output: dict[str, Any]#
started_at: float#
status: FlowStatus#
step: str#
type taolib.harness.pipelines.flow_base.StepFn = Callable[[dict[str, Any]], dict[str, Any] | Awaitable[dict[str, Any]]]#

Step 函数签名:接收当前 Flow 上下文,返回需要合并的更新字典(同步或异步)。