taolib.harness.runtime.executor#

统一执行引擎 - 封装 LangGraph 图执行和 Metaflow Flow 运行的统一接口。

UnifiedExecutor 根据任务类型路由到 GraphExecutorFlowExecutor,并统一以 ExecutionResult 返回执行产物。 LangGraph / Metaflow 在当前环境可能未安装,本模块通过 TYPE_CHECKING 守卫与 duck typing 实现可插拔接入。

Classes#

ExecutionContext

执行上下文。

ExecutionMode

执行模式。

ExecutionResult

执行结果。

ExecutionStatus

执行状态。

ExecutorBackend

执行后端类型。

FlowExecutor

Metaflow Flow 执行器。

GraphExecutor

LangGraph 图执行器。

UnifiedExecutor

统一执行入口。

Module Contents#

class taolib.harness.runtime.executor.ExecutionContext#

Bases: pydantic.BaseModel

执行上下文。

携带 thread_id、可序列化配置和任意元数据,贯穿单次执行的全生命周期。

config: dict[str, Any]#
metadata: dict[str, Any]#
mode: ExecutionMode#
model_config#
run_id: str#
thread_id: str#
timeout_seconds: float | None#
class taolib.harness.runtime.executor.ExecutionMode#

Bases: enum.StrEnum

执行模式。

Initialize self. See help(type(self)) for accurate signature.

BATCH = 'batch'#
LOCAL = 'local'#
REMOTE = 'remote'#
class taolib.harness.runtime.executor.ExecutionResult#

Bases: pydantic.BaseModel

执行结果。

backend: ExecutorBackend#
property duration_seconds: float | None#

执行耗时(秒)。

error: str | None = None#
finished_at: float | None = None#
metadata: dict[str, Any]#
model_config#
output: Any = None#
run_id: str#
started_at: float#
status: ExecutionStatus#
class taolib.harness.runtime.executor.ExecutionStatus#

Bases: enum.StrEnum

执行状态。

Initialize self. See help(type(self)) for accurate signature.

CANCELLED = 'cancelled'#
FAILED = 'failed'#
PENDING = 'pending'#
RUNNING = 'running'#
SUCCEEDED = 'succeeded'#
class taolib.harness.runtime.executor.ExecutorBackend#

Bases: enum.StrEnum

执行后端类型。

Initialize self. See help(type(self)) for accurate signature.

FLOW = 'flow'#
GRAPH = 'graph'#
class taolib.harness.runtime.executor.FlowExecutor(*, mode: ExecutionMode = ExecutionMode.LOCAL)#

Metaflow Flow 执行器。

在真实环境中应通过 Runner API 触发 Flow,本实现为离线友好的占位: 若目标暴露了异步 run / arun 方法则调用,否则将其作为可调用对象。

async execute(target: Any, inputs: dict[str, Any], context: ExecutionContext) ExecutionResult#

执行 Metaflow Flow。

backend#
class taolib.harness.runtime.executor.GraphExecutor(*, compile_options: dict[str, Any] | None = None)#

LangGraph 图执行器。

支持两类目标:

  • 已编译的 CompiledGraph``(具有 ``ainvoke 方法);

  • StateGraph 实例(先调用 compile()ainvoke)。

若目标不具备上述能力,则回退到将其作为异步可调用对象直接调用。

async execute(target: Any, inputs: dict[str, Any], context: ExecutionContext) ExecutionResult#

执行 LangGraph 图。

backend#
class taolib.harness.runtime.executor.UnifiedExecutor(*, graph_executor: GraphExecutor | None = None, flow_executor: FlowExecutor | None = None)#

统一执行入口。

根据 backend 路由到 GraphExecutorFlowExecutor。 用户也可注入自定义后端实现。

async execute(target: Any, inputs: dict[str, Any] | None = None, *, backend: ExecutorBackend | str, context: ExecutionContext | None = None) ExecutionResult#

执行目标对象。

参数:
  • target -- 待执行的图 / Flow 对象。

  • inputs -- 输入数据,缺省为空字典。

  • backend -- 后端类型(graphflow)。

  • context -- 执行上下文,缺省自动生成。

property flow: FlowExecutor#

Flow 执行器。

property graph: GraphExecutor#

图执行器。