taolib.harness.runtime.checkpointer#

跨层持久化适配器 - 实现 LangGraph Checkpointer 接口,底层桥接多种存储后端。

本模块提供 HarnessCheckpointer

  • 在结构上与 LangGraph 1.x 的 BaseCheckpointSaver 接口对齐(同步 + 异步 方法对:put / aputget_tuple / aget_tuple 等);

  • 支持 memory``(默认)与 ``redis 两种后端,通过配置切换;

  • 序列化采用 JSON(Pydantic model_dump_json / model_validate_json)。

LangGraph 与 redis 在当前环境可能未安装,本模块通过运行时探测决定是否 启用对应后端,不强依赖第三方库。

Classes#

CheckpointBackend

检查点后端类型。

CheckpointMetadata

检查点附加元数据。

CheckpointRecord

检查点记录 - 持久化的最小单元。

CheckpointerConfig

检查点适配器配置。

HarnessCheckpointer

Harness 跨层检查点适配器。

MemoryBackend

基于进程内字典的内存后端(默认/测试用)。

RedisBackend

Redis 后端 - 延迟导入 redis.asyncio,未安装时抛出明确错误。

Module Contents#

class taolib.harness.runtime.checkpointer.CheckpointBackend#

Bases: enum.StrEnum

检查点后端类型。

Initialize self. See help(type(self)) for accurate signature.

MEMORY = 'memory'#
REDIS = 'redis'#
class taolib.harness.runtime.checkpointer.CheckpointMetadata#

Bases: pydantic.BaseModel

检查点附加元数据。

model_config#
parents: dict[str, str]#
source_layer: str#
step: int#
timestamp: float#
writes: dict[str, Any]#
class taolib.harness.runtime.checkpointer.CheckpointRecord#

Bases: pydantic.BaseModel

检查点记录 - 持久化的最小单元。

checkpoint: dict[str, Any]#
checkpoint_id: str#
checkpoint_ns: str = ''#
metadata: CheckpointMetadata#
model_config#
parent_checkpoint_id: str | None = None#
thread_id: str#
class taolib.harness.runtime.checkpointer.CheckpointerConfig#

Bases: pydantic.BaseModel

检查点适配器配置。

backend: CheckpointBackend#
key_prefix: str#
model_config#
redis_url: str#
ttl_seconds: int | None#
class taolib.harness.runtime.checkpointer.HarnessCheckpointer(config: CheckpointerConfig | None = None, *, backend: _Backend | None = None)#

Harness 跨层检查点适配器。

在 LangGraph 1.x 的 BaseCheckpointSaver 之上提供统一封装, 可被 StateGraph.compile(checkpointer=...) 直接接入。本实现不 强依赖 langgraph_checkpoint 包,未安装时也能独立使用。

async adelete_thread(thread_id: str) int#

删除指定线程下的全部检查点。

async aget_tuple(config: dict[str, Any]) CheckpointRecord | None#

获取指定 config 对应的检查点记录。

async alist(config: dict[str, Any], *, limit: int | None = None) collections.abc.AsyncIterator[CheckpointRecord]#

异步遍历指定线程的检查点(按时间倒序)。

async aput(config: dict[str, Any], checkpoint: dict[str, Any], metadata: CheckpointMetadata | dict[str, Any] | None = None, new_versions: dict[str, Any] | None = None) dict[str, Any]#

写入一个检查点并返回新的 config 字典(含 checkpoint_id)。

delete_thread(thread_id: str) int#

同步版本的 adelete_thread()

get_tuple(config: dict[str, Any]) CheckpointRecord | None#

同步版本的 aget_tuple()

list(config: dict[str, Any], *, limit: int | None = None) collections.abc.Iterator[CheckpointRecord]#

同步版本的 alist()

put(config: dict[str, Any], checkpoint: dict[str, Any], metadata: CheckpointMetadata | dict[str, Any] | None = None, new_versions: dict[str, Any] | None = None) dict[str, Any]#

同步版本的 aput()

property backend: _Backend#

底层后端实例。

property config: CheckpointerConfig#

当前配置。

class taolib.harness.runtime.checkpointer.MemoryBackend#

Bases: _Backend

基于进程内字典的内存后端(默认/测试用)。

async adelete(thread_id: str) int#
async aget(thread_id: str, checkpoint_id: str | None) CheckpointRecord | None#
async alist(thread_id: str, *, limit: int | None = None) list[CheckpointRecord]#
async aput(record: CheckpointRecord) None#
class taolib.harness.runtime.checkpointer.RedisBackend(config: CheckpointerConfig)#

Bases: _Backend

Redis 后端 - 延迟导入 redis.asyncio,未安装时抛出明确错误。

数据组织:

  • {prefix}{thread_id}:index —— 有序集合,按时间戳存放 checkpoint_id;

  • {prefix}{thread_id}:item:{checkpoint_id} —— 检查点 JSON 字符串。

async adelete(thread_id: str) int#
async aget(thread_id: str, checkpoint_id: str | None) CheckpointRecord | None#
async alist(thread_id: str, *, limit: int | None = None) list[CheckpointRecord]#
async aput(record: CheckpointRecord) None#