taolib.harness.core.bridge#

Bridge 层 - 实现 LangGraph 与 Metaflow 之间的双向适配和通信。

本模块提供两侧执行单元(LangGraph Node ↔ Metaflow Step)的双向适配器、 跨层事件总线以及统一的桥接配置/错误模型。设计目标是让两类异构运行时 能以最小耦合互调:图节点可将耗时任务卸载到 Metaflow,Flow 步骤的产出 也可注入回 LangGraph 的子图继续推进。

Exceptions#

BridgeError

桥接层统一错误类型。

Classes#

BridgeConfig

桥接配置。

BridgeEvent

跨层事件。

BridgeEventBus

跨层事件总线。

BridgeEventKind

桥接事件类型。

NodeOutput

LangGraph 节点输出的标准包装。

NodeToStepAdapter

将 LangGraph Node 输出适配为 Metaflow Step 输入并触发执行。

SerializationFormat

序列化格式枚举。

StepResult

Metaflow Step 结果的标准包装。

StepToGraphAdapter

将 Metaflow Step 结果注入回 LangGraph 子图继续执行。

Module Contents#

exception taolib.harness.core.bridge.BridgeError(message: str, *, layer: str = 'bridge', cause: BaseException | None = None)#

Bases: Exception

桥接层统一错误类型。

变量:
  • layer -- 错误来源层(langgraph / metaflow / bridge)。

  • cause -- 触发本异常的原始异常对象(可为 None)。

Initialize self. See help(type(self)) for accurate signature.

cause = None#
layer = 'bridge'#
class taolib.harness.core.bridge.BridgeConfig#

Bases: pydantic.BaseModel

桥接配置。

enable_async_offload: bool#
event_buffer_size: int#
max_retries: int#
model_config#
retry_backoff_seconds: float#
serialization: SerializationFormat#
timeout_seconds: float#
class taolib.harness.core.bridge.BridgeEvent#

Bases: pydantic.BaseModel

跨层事件。

kind: BridgeEventKind#
model_config#
payload: dict[str, Any]#
source: str#
target: str | None = None#
timestamp: float#
class taolib.harness.core.bridge.BridgeEventBus(*, buffer_size: int = 1024)#

跨层事件总线。

内部使用 asyncio.Queue 缓冲事件,并将其分发给所有已订阅的 监听器。支持背压(队列满时阻塞 publish)与广播取消。

async close() None#

关闭总线并等待事件分发完成。

async publish(event: BridgeEvent) None#

发布一个事件到总线。

subscribe(listener: EventListener) collections.abc.Callable[[], None]#

订阅事件,返回用于取消订阅的回调。

class taolib.harness.core.bridge.BridgeEventKind#

Bases: enum.StrEnum

桥接事件类型。

Initialize self. See help(type(self)) for accurate signature.

GRAPH_RESUMED = 'graph_resumed'#
NODE_DISPATCHED = 'node_dispatched'#
STEP_COMPLETED = 'step_completed'#
STEP_FAILED = 'step_failed'#
class taolib.harness.core.bridge.NodeOutput#

Bases: pydantic.BaseModel, Generic[T_out]

LangGraph 节点输出的标准包装。

metadata: dict[str, Any]#
model_config#
node_name: str#
payload: Any = None#
class taolib.harness.core.bridge.NodeToStepAdapter(*, config: BridgeConfig | None = None, event_bus: BridgeEventBus | None = None)#

将 LangGraph Node 输出适配为 Metaflow Step 输入并触发执行。

适配器负责:

  1. 序列化 Node 输出 → 调用 Step;

  2. BridgeConfig 控制下做超时/重试;

  3. 通过 BridgeEventBus 广播事件。

async dispatch(step: _StepCallable, node_output: NodeOutput[Any], *, step_name: str, flow_name: str = 'harness-flow', **kwargs: Any) StepResult[Any]#

将节点输出转换为参数并异步触发 Step 执行。

参数:
  • step -- 实现 _StepCallable 协议的异步函数(占位 Metaflow Step)。

  • node_output -- 上游 LangGraph Node 的输出。

  • step_name -- 目标 Step 名称。

  • flow_name -- 所属 Flow 名称。

  • **kwargs -- 额外透传给 Step 的参数。

property event_bus: BridgeEventBus#

关联的事件总线。

class taolib.harness.core.bridge.SerializationFormat#

Bases: enum.StrEnum

序列化格式枚举。

Initialize self. See help(type(self)) for accurate signature.

JSON = 'json'#
MSGPACK = 'msgpack'#
PICKLE = 'pickle'#
class taolib.harness.core.bridge.StepResult#

Bases: pydantic.BaseModel, Generic[T_out]

Metaflow Step 结果的标准包装。

error: str | None = None#
finished_at: float | None = None#
flow_name: str#
model_config#
payload: Any = None#
run_id: str | None = None#
started_at: float#
step_name: str#
success: bool = True#
class taolib.harness.core.bridge.StepToGraphAdapter(*, config: BridgeConfig | None = None, event_bus: BridgeEventBus | None = None)#

将 Metaflow Step 结果注入回 LangGraph 子图继续执行。

async resume(node: _NodeCallable, step_result: StepResult[Any], *, node_name: str, **kwargs: Any) NodeOutput[Any]#

step_result 注入到 node 中并执行,返回新的 NodeOutput

参数:
  • node -- 接收 Step 结果的 LangGraph 节点(异步可调用)。

  • step_result -- 上游 Step 的执行结果。

  • node_name -- 当前节点名称(用于事件追踪)。

property event_bus: BridgeEventBus#

关联的事件总线。