taolib.harness.pipelines.flow_base#
Metaflow Flow 基类 - 与 Harness 系统集成的 FlowSpec 扩展。
本模块定义两个核心抽象:
FlowConfig—— 通过 Pydantic v2 定义的 Flow 元配置(名称、版本、 描述、资源要求、环境变量等);HarnessFlow—— 与FlowRegistry自动集成的 Flow 基类, 内置状态回调钩子、产物访问与状态查询方法,并提供as_tool()将 Flow 包装为 LangGraph Agent 可调用的工具(Flow-as-Tool)。
并附带预置模板 EvalFlow:以
start → prepare_data → run_agent → collect_metrics → report → end
六步骨架承载评估场景;每步暴露明确的输入/输出协议,便于子类按需重写。
装饰器风格示例:
from taolib.harness.core import register_flow
@register_flow(name="rag-eval", version="1.0.0", tags=("eval",))
class RagEvalFlow(EvalFlow): ...
Metaflow 在当前环境可能未安装,本模块通过 TYPE_CHECKING 守卫与运行时
探测实现可插拔接入:未安装时仍可在内存中模拟 Step 顺序执行。
Attributes#
Step 函数签名:接收当前 Flow 上下文,返回需要合并的更新字典(同步或异步)。 |
Classes#
评估 Flow 模板。 |
|
Flow 产物的标准包装 - 对齐 Metaflow |
|
Flow 元配置。 |
|
Flow 资源声明 - 用于在 Metaflow |
|
Flow 运行状态。 |
|
Metaflow Flow 基类(Harness 增强版)。 |
|
单个 Step 执行结果。 |
Module Contents#
- class taolib.harness.pipelines.flow_base.EvalFlow(config: FlowConfig | collections.abc.Mapping[str, Any], *, registry: taolib.harness.core.registry.FlowRegistry | None = None, auto_register: bool = True)#
Bases:
HarnessFlow评估 Flow 模板。
标准骨架:
start → prepare_data → run_agent → collect_metrics → report → end
每步遵守明确的数据协议:
start:初始化运行上下文,写入run_id、started_at;prepare_data:产出dataset(评估样本列表);run_agent:在dataset上运行被评 Agent,产出predictions;collect_metrics:汇总predictions→metrics字典;report:基于metrics产出最终report;end:标记完成并落盘汇总产物。
子类通常重写
prepare_data()/run_agent()/collect_metrics()三个方法即可定制完整评估链路。构造 Flow。
- 参数:
config --
FlowConfig实例或可被其校验的字典。registry -- 自定义 Flow 注册表;缺省使用全局默认注册表。
auto_register -- 是否在构造时自动注册到
registry。
- class taolib.harness.pipelines.flow_base.FlowArtifact#
Bases:
pydantic.BaseModelFlow 产物的标准包装 - 对齐 Metaflow
Artifact概念。- model_config#
- value: Any = None#
- class taolib.harness.pipelines.flow_base.FlowConfig#
Bases:
pydantic.BaseModelFlow 元配置。
- model_config#
- resources: FlowResources#
- class taolib.harness.pipelines.flow_base.FlowResources#
Bases:
pydantic.BaseModelFlow 资源声明 - 用于在 Metaflow
@resources装饰器中映射。- model_config#
- class taolib.harness.pipelines.flow_base.FlowStatus#
Bases:
enum.StrEnumFlow 运行状态。
Initialize self. See help(type(self)) for accurate signature.
- CANCELLED = 'cancelled'#
- FAILED = 'failed'#
- PENDING = 'pending'#
- RUNNING = 'running'#
- SUCCEEDED = 'succeeded'#
- class taolib.harness.pipelines.flow_base.HarnessFlow(config: FlowConfig | collections.abc.Mapping[str, Any], *, registry: taolib.harness.core.registry.FlowRegistry | None = None, auto_register: bool = True)#
Metaflow Flow 基类(Harness 增强版)。
生命周期:
实例化时自动向
FlowRegistry注册自身(除非显式禁用);steps()子类定义有序的 Step 列表((name, callable)元组);as_tool()将 Flow 包装为 LangGraph Agent 可调用的工具;get_artifacts()/get_status()查询执行产物与运行状态。
钩子:
on_step_start、on_step_end、on_complete、on_failure。构造 Flow。
- 参数:
config --
FlowConfig实例或可被其校验的字典。registry -- 自定义 Flow 注册表;缺省使用全局默认注册表。
auto_register -- 是否在构造时自动注册到
registry。
- async arun(inputs: collections.abc.Mapping[str, Any] | None = None, *, run_id: str | None = None) list[StepResult]#
异步执行 Flow,返回所有 Step 的结果列表。
- as_tool(*, name: str | None = None, description: str | None = None) taolib.harness.agents.graph_agent.AgentTool#
将 Flow 包装为 LangGraph Agent 可调用的工具(Flow-as-Tool)。
返回的
AgentTool接受任意输入字典,触发arun()并返回 最终上下文字典(包含所有 Step 累积输出)。
- emit_artifact(name: str, value: Any, *, step: str, **metadata: Any) FlowArtifact#
显式记录一个 Step 产出的数据卡片。
- get_artifacts(*, step: str | None = None) list[FlowArtifact]#
获取 Flow 产出的数据卡片(可按 Step 过滤)。
- get_status() FlowStatus#
查询 Flow 当前运行状态。
- run(inputs: collections.abc.Mapping[str, Any] | None = None, *, run_id: str | None = None) list[StepResult]#
同步执行 Flow。
- config#
- config_class: ClassVar[type[FlowConfig]]#
- on_complete: collections.abc.Callable[[list[StepResult]], collections.abc.Awaitable[None] | None] | None = None#
- on_failure: collections.abc.Callable[[StepResult], collections.abc.Awaitable[None] | None] | None = None#
- on_step_end: collections.abc.Callable[[StepResult], collections.abc.Awaitable[None] | None] | None = None#
- on_step_start: collections.abc.Callable[[str, dict[str, Any]], collections.abc.Awaitable[None] | None] | None = None#
- property registry_entry: taolib.harness.core.registry.RegistryEntry[Any] | None#
注册条目(若已注册)。
- class taolib.harness.pipelines.flow_base.StepResult#
Bases:
pydantic.BaseModel单个 Step 执行结果。
- model_config#
- status: FlowStatus#