taolib.harness.core.bridge#
Bridge 层 - 实现 LangGraph 与 Metaflow 之间的双向适配和通信。
本模块提供两侧执行单元(LangGraph Node ↔ Metaflow Step)的双向适配器、 跨层事件总线以及统一的桥接配置/错误模型。设计目标是让两类异构运行时 能以最小耦合互调:图节点可将耗时任务卸载到 Metaflow,Flow 步骤的产出 也可注入回 LangGraph 的子图继续推进。
Exceptions#
桥接层统一错误类型。 |
Classes#
桥接配置。 |
|
跨层事件。 |
|
跨层事件总线。 |
|
桥接事件类型。 |
|
LangGraph 节点输出的标准包装。 |
|
将 LangGraph Node 输出适配为 Metaflow Step 输入并触发执行。 |
|
序列化格式枚举。 |
|
Metaflow Step 结果的标准包装。 |
|
将 Metaflow Step 结果注入回 LangGraph 子图继续执行。 |
Module Contents#
- exception taolib.harness.core.bridge.BridgeError(message: str, *, layer: str = 'bridge', cause: BaseException | None = None)#
Bases:
Exception桥接层统一错误类型。
- 变量:
layer -- 错误来源层(
langgraph/metaflow/bridge)。cause -- 触发本异常的原始异常对象(可为 None)。
Initialize self. See help(type(self)) for accurate signature.
- cause = None#
- layer = 'bridge'#
- class taolib.harness.core.bridge.BridgeConfig#
Bases:
pydantic.BaseModel桥接配置。
- model_config#
- serialization: SerializationFormat#
- class taolib.harness.core.bridge.BridgeEvent#
Bases:
pydantic.BaseModel跨层事件。
- kind: BridgeEventKind#
- model_config#
- class taolib.harness.core.bridge.BridgeEventBus(*, buffer_size: int = 1024)#
跨层事件总线。
内部使用
asyncio.Queue缓冲事件,并将其分发给所有已订阅的 监听器。支持背压(队列满时阻塞publish)与广播取消。- async publish(event: BridgeEvent) None#
发布一个事件到总线。
- subscribe(listener: EventListener) collections.abc.Callable[[], None]#
订阅事件,返回用于取消订阅的回调。
- class taolib.harness.core.bridge.BridgeEventKind#
Bases:
enum.StrEnum桥接事件类型。
Initialize self. See help(type(self)) for accurate signature.
- GRAPH_RESUMED = 'graph_resumed'#
- NODE_DISPATCHED = 'node_dispatched'#
- STEP_COMPLETED = 'step_completed'#
- STEP_FAILED = 'step_failed'#
- class taolib.harness.core.bridge.NodeOutput#
Bases:
pydantic.BaseModel,Generic[T_out]LangGraph 节点输出的标准包装。
- model_config#
- payload: Any = None#
- class taolib.harness.core.bridge.NodeToStepAdapter(*, config: BridgeConfig | None = None, event_bus: BridgeEventBus | None = None)#
将 LangGraph Node 输出适配为 Metaflow Step 输入并触发执行。
适配器负责:
序列化 Node 输出 → 调用 Step;
在
BridgeConfig控制下做超时/重试;通过
BridgeEventBus广播事件。
- async dispatch(step: _StepCallable, node_output: NodeOutput[Any], *, step_name: str, flow_name: str = 'harness-flow', **kwargs: Any) StepResult[Any]#
将节点输出转换为参数并异步触发 Step 执行。
- 参数:
step -- 实现
_StepCallable协议的异步函数(占位 Metaflow Step)。node_output -- 上游 LangGraph Node 的输出。
step_name -- 目标 Step 名称。
flow_name -- 所属 Flow 名称。
**kwargs -- 额外透传给 Step 的参数。
- property event_bus: BridgeEventBus#
关联的事件总线。
- class taolib.harness.core.bridge.SerializationFormat#
Bases:
enum.StrEnum序列化格式枚举。
Initialize self. See help(type(self)) for accurate signature.
- JSON = 'json'#
- MSGPACK = 'msgpack'#
- PICKLE = 'pickle'#
- class taolib.harness.core.bridge.StepResult#
Bases:
pydantic.BaseModel,Generic[T_out]Metaflow Step 结果的标准包装。
- model_config#
- payload: Any = None#
- class taolib.harness.core.bridge.StepToGraphAdapter(*, config: BridgeConfig | None = None, event_bus: BridgeEventBus | None = None)#
将 Metaflow Step 结果注入回 LangGraph 子图继续执行。
- async resume(node: _NodeCallable, step_result: StepResult[Any], *, node_name: str, **kwargs: Any) NodeOutput[Any]#
将
step_result注入到node中并执行,返回新的NodeOutput。- 参数:
node -- 接收 Step 结果的 LangGraph 节点(异步可调用)。
step_result -- 上游 Step 的执行结果。
node_name -- 当前节点名称(用于事件追踪)。
- property event_bus: BridgeEventBus#
关联的事件总线。