taolib.harness.core.state#
统一状态管理器 - 跨 LangGraph Checkpointer 和 Metaflow Artifact Store 的一致性状态视图。
本模块提供 Harness 系统的核心状态抽象,聚合 LangGraph 与 Metaflow 两侧的
状态源,对外暴露线程安全的、按 thread_id 分区的统一视图,并通过观察者
模式向上层广播状态变更事件。
Attributes#
状态监听器签名:接收变更事件,可同步或异步执行。 |
Classes#
状态变更事件 - 由 |
|
状态变更事件类型。 |
|
状态快照 - 不可变的状态视图。 |
|
状态视图接口 - 所有底层状态源(LangGraph/Metaflow)的统一抽象。 |
|
统一状态管理器 - 聚合多个 |
Module Contents#
- class taolib.harness.core.state.StateChangeEvent#
Bases:
pydantic.BaseModel状态变更事件 - 由
UnifiedStateManager广播给所有监听器。- kind: StateChangeKind#
- model_config#
- snapshot: StateSnapshot#
- thread_id: ThreadId#
- class taolib.harness.core.state.StateChangeKind#
Bases:
enum.StrEnum状态变更事件类型。
Initialize self. See help(type(self)) for accurate signature.
- CREATED = 'created'#
- DELETED = 'deleted'#
- SNAPSHOT = 'snapshot'#
- UPDATED = 'updated'#
- class taolib.harness.core.state.StateSnapshot#
Bases:
pydantic.BaseModel状态快照 - 不可变的状态视图。
用于持久化、调试与时间回溯,包含完整的状态负载、版本号与时间戳。
- model_config#
- payload: StatePayload#
- thread_id: ThreadId#
- class taolib.harness.core.state.StateView#
Bases:
Protocol状态视图接口 - 所有底层状态源(LangGraph/Metaflow)的统一抽象。
实现者需提供按
thread_id维度的读取、写入、快照能力。- async read(thread_id: ThreadId) StatePayload#
读取指定线程的当前状态。
- async snapshot(thread_id: ThreadId) StateSnapshot#
生成指定线程的不可变快照。
- class taolib.harness.core.state.UnifiedStateManager(*, primary_source: str = 'langgraph', views: dict[str, StateView] | None = None)#
统一状态管理器 - 聚合多个
StateView并广播变更事件。典型用法:
manager = UnifiedStateManager() manager.attach_view("langgraph", lg_view) manager.attach_view("metaflow", mf_view) manager.subscribe(my_listener) await manager.update("thread-1", {"step": "started"})
构造统一状态管理器。
- 参数:
primary_source -- 主状态源名称,
read优先从该源读取。views -- 初始注入的状态视图映射,键为来源名,值为
StateView实现。
- known_threads() collections.abc.Iterable[ThreadId]#
返回当前已记录版本号的所有线程 ID。
- async read(thread_id: ThreadId, *, source: str | None = None) StatePayload#
读取指定线程的状态。
- 参数:
thread_id -- 线程分区标识。
source -- 指定状态源名称;缺省时使用主状态源。
- async snapshot(thread_id: ThreadId) StateSnapshot#
生成跨所有状态源聚合后的快照视图。
- subscribe(listener: StateListener) collections.abc.Callable[[], None]#
订阅状态变更事件,返回用于取消订阅的回调。
- async update(thread_id: ThreadId, delta: StatePayload, *, source: str | None = None) StateSnapshot#
以增量方式合并状态:在原状态基础上做浅合并后写回。
- async write(thread_id: ThreadId, payload: StatePayload, *, source: str | None = None, broadcast: bool = True) StateSnapshot#
写入指定线程的状态并广播事件。
- 参数:
thread_id -- 线程分区标识。
payload -- 待写入的状态负载。
source -- 写入到哪个状态源;缺省写主源。
broadcast -- 是否广播变更事件。
- type taolib.harness.core.state.StateListener = Callable[[StateChangeEvent], Awaitable[None] | None]#
状态监听器签名:接收变更事件,可同步或异步执行。