taolib.harness.runtime.checkpointer#
跨层持久化适配器 - 实现 LangGraph Checkpointer 接口,底层桥接多种存储后端。
本模块提供 HarnessCheckpointer:
在结构上与 LangGraph 1.x 的
BaseCheckpointSaver接口对齐(同步 + 异步 方法对:put/aput、get_tuple/aget_tuple等);支持
memory``(默认)与 ``redis两种后端,通过配置切换;序列化采用 JSON(Pydantic
model_dump_json/model_validate_json)。
LangGraph 与 redis 在当前环境可能未安装,本模块通过运行时探测决定是否 启用对应后端,不强依赖第三方库。
Classes#
检查点后端类型。 |
|
检查点附加元数据。 |
|
检查点记录 - 持久化的最小单元。 |
|
检查点适配器配置。 |
|
Harness 跨层检查点适配器。 |
|
基于进程内字典的内存后端(默认/测试用)。 |
|
Redis 后端 - 延迟导入 |
Module Contents#
- class taolib.harness.runtime.checkpointer.CheckpointBackend#
Bases:
enum.StrEnum检查点后端类型。
Initialize self. See help(type(self)) for accurate signature.
- MEMORY = 'memory'#
- REDIS = 'redis'#
- class taolib.harness.runtime.checkpointer.CheckpointMetadata#
Bases:
pydantic.BaseModel检查点附加元数据。
- model_config#
- class taolib.harness.runtime.checkpointer.CheckpointRecord#
Bases:
pydantic.BaseModel检查点记录 - 持久化的最小单元。
- metadata: CheckpointMetadata#
- model_config#
- class taolib.harness.runtime.checkpointer.CheckpointerConfig#
Bases:
pydantic.BaseModel检查点适配器配置。
- backend: CheckpointBackend#
- model_config#
- class taolib.harness.runtime.checkpointer.HarnessCheckpointer(config: CheckpointerConfig | None = None, *, backend: _Backend | None = None)#
Harness 跨层检查点适配器。
在 LangGraph 1.x 的
BaseCheckpointSaver之上提供统一封装, 可被StateGraph.compile(checkpointer=...)直接接入。本实现不 强依赖langgraph_checkpoint包,未安装时也能独立使用。- async aget_tuple(config: dict[str, Any]) CheckpointRecord | None#
获取指定 config 对应的检查点记录。
- async alist(config: dict[str, Any], *, limit: int | None = None) collections.abc.AsyncIterator[CheckpointRecord]#
异步遍历指定线程的检查点(按时间倒序)。
- async aput(config: dict[str, Any], checkpoint: dict[str, Any], metadata: CheckpointMetadata | dict[str, Any] | None = None, new_versions: dict[str, Any] | None = None) dict[str, Any]#
写入一个检查点并返回新的 config 字典(含
checkpoint_id)。
- delete_thread(thread_id: str) int#
同步版本的
adelete_thread()。
- get_tuple(config: dict[str, Any]) CheckpointRecord | None#
同步版本的
aget_tuple()。
- list(config: dict[str, Any], *, limit: int | None = None) collections.abc.Iterator[CheckpointRecord]#
同步版本的
alist()。
- put(config: dict[str, Any], checkpoint: dict[str, Any], metadata: CheckpointMetadata | dict[str, Any] | None = None, new_versions: dict[str, Any] | None = None) dict[str, Any]#
同步版本的
aput()。
- property backend: _Backend#
底层后端实例。
- property config: CheckpointerConfig#
当前配置。
- class taolib.harness.runtime.checkpointer.MemoryBackend#
Bases:
_Backend基于进程内字典的内存后端(默认/测试用)。
- async aput(record: CheckpointRecord) None#
- class taolib.harness.runtime.checkpointer.RedisBackend(config: CheckpointerConfig)#
Bases:
_BackendRedis 后端 - 延迟导入
redis.asyncio,未安装时抛出明确错误。数据组织:
{prefix}{thread_id}:index—— 有序集合,按时间戳存放 checkpoint_id;{prefix}{thread_id}:item:{checkpoint_id}—— 检查点 JSON 字符串。
- async aput(record: CheckpointRecord) None#