taolib.harness.core.state#

统一状态管理器 - 跨 LangGraph Checkpointer 和 Metaflow Artifact Store 的一致性状态视图。

本模块提供 Harness 系统的核心状态抽象,聚合 LangGraph 与 Metaflow 两侧的 状态源,对外暴露线程安全的、按 thread_id 分区的统一视图,并通过观察者 模式向上层广播状态变更事件。

Attributes#

StateListener

状态监听器签名:接收变更事件,可同步或异步执行。

Classes#

StateChangeEvent

状态变更事件 - 由 UnifiedStateManager 广播给所有监听器。

StateChangeKind

状态变更事件类型。

StateSnapshot

状态快照 - 不可变的状态视图。

StateView

状态视图接口 - 所有底层状态源(LangGraph/Metaflow)的统一抽象。

UnifiedStateManager

统一状态管理器 - 聚合多个 StateView 并广播变更事件。

Module Contents#

class taolib.harness.core.state.StateChangeEvent#

Bases: pydantic.BaseModel

状态变更事件 - 由 UnifiedStateManager 广播给所有监听器。

kind: StateChangeKind#
model_config#
snapshot: StateSnapshot#
thread_id: ThreadId#
timestamp: float#
version: int#
class taolib.harness.core.state.StateChangeKind#

Bases: enum.StrEnum

状态变更事件类型。

Initialize self. See help(type(self)) for accurate signature.

CREATED = 'created'#
DELETED = 'deleted'#
SNAPSHOT = 'snapshot'#
UPDATED = 'updated'#
class taolib.harness.core.state.StateSnapshot#

Bases: pydantic.BaseModel

状态快照 - 不可变的状态视图。

用于持久化、调试与时间回溯,包含完整的状态负载、版本号与时间戳。

created_at: float#
model_config#
payload: StatePayload#
source: str#
thread_id: ThreadId#
version: int#
class taolib.harness.core.state.StateView#

Bases: Protocol

状态视图接口 - 所有底层状态源(LangGraph/Metaflow)的统一抽象。

实现者需提供按 thread_id 维度的读取、写入、快照能力。

async read(thread_id: ThreadId) StatePayload#

读取指定线程的当前状态。

async snapshot(thread_id: ThreadId) StateSnapshot#

生成指定线程的不可变快照。

async write(thread_id: ThreadId, payload: StatePayload) None#

写入指定线程的状态。

class taolib.harness.core.state.UnifiedStateManager(*, primary_source: str = 'langgraph', views: dict[str, StateView] | None = None)#

统一状态管理器 - 聚合多个 StateView 并广播变更事件。

典型用法:

manager = UnifiedStateManager()
manager.attach_view("langgraph", lg_view)
manager.attach_view("metaflow", mf_view)
manager.subscribe(my_listener)
await manager.update("thread-1", {"step": "started"})

构造统一状态管理器。

参数:
  • primary_source -- 主状态源名称,read 优先从该源读取。

  • views -- 初始注入的状态视图映射,键为来源名,值为 StateView 实现。

attach_view(name: str, view: StateView) None#

注册或替换一个底层状态视图。

async delete(thread_id: ThreadId) None#

删除指定线程在所有状态源中的记录(若底层支持)。

detach_view(name: str) StateView | None#

注销指定的状态视图,返回原视图(若存在)。

known_threads() collections.abc.Iterable[ThreadId]#

返回当前已记录版本号的所有线程 ID。

async read(thread_id: ThreadId, *, source: str | None = None) StatePayload#

读取指定线程的状态。

参数:
  • thread_id -- 线程分区标识。

  • source -- 指定状态源名称;缺省时使用主状态源。

async snapshot(thread_id: ThreadId) StateSnapshot#

生成跨所有状态源聚合后的快照视图。

subscribe(listener: StateListener) collections.abc.Callable[[], None]#

订阅状态变更事件,返回用于取消订阅的回调。

async update(thread_id: ThreadId, delta: StatePayload, *, source: str | None = None) StateSnapshot#

以增量方式合并状态:在原状态基础上做浅合并后写回。

async write(thread_id: ThreadId, payload: StatePayload, *, source: str | None = None, broadcast: bool = True) StateSnapshot#

写入指定线程的状态并广播事件。

参数:
  • thread_id -- 线程分区标识。

  • payload -- 待写入的状态负载。

  • source -- 写入到哪个状态源;缺省写主源。

  • broadcast -- 是否广播变更事件。

property sources: list[str]#

所有已注册的状态源名称。

type taolib.harness.core.state.StateListener = Callable[[StateChangeEvent], Awaitable[None] | None]#

状态监听器签名:接收变更事件,可同步或异步执行。