taolib.harness.devtools.replay#

执行回放工具 - 录制 Agent 执行轨迹并支持变速 / 步进 / 跳转回放。

提供六类标准事件(节点进入/退出、边遍历、状态更新、工具调用、LLM 调用) 和 ReplayEngine

  • record() - 注入回调收集事件;

  • replay() / areplay() - 顺序回放,支持速度倍率、断点、步进;

  • seek() - 跳转到指定步骤;

  • export() / import_recording() - JSON 持久化。

Attributes#

ReplayCallback

回放回调签名:(事件下标, 事件) -> None

Classes#

ExecutionRecording

一次执行的事件序列录制。

RecordedEvent

单条录制事件。

ReplayConfig

回放配置。

ReplayEngine

执行回放引擎。

ReplayEventType

回放事件类型。

ReplayMode

回放模式。

Module Contents#

class taolib.harness.devtools.replay.ExecutionRecording#

Bases: pydantic.BaseModel

一次执行的事件序列录制。

append(event: RecordedEvent) None#

追加一条事件。

property duration_seconds: float#

录制总时长(秒)。

events: list[RecordedEvent]#
finished_at: float | None = None#
metadata: dict[str, Any]#
model_config#
recording_id: str#
started_at: float#
class taolib.harness.devtools.replay.RecordedEvent#

Bases: pydantic.BaseModel

单条录制事件。

event_id: str#
model_config#
node: str | None = None#
payload: dict[str, Any]#
thread_id: str | None = None#
timestamp: float#
type: ReplayEventType#
class taolib.harness.devtools.replay.ReplayConfig#

Bases: pydantic.BaseModel

回放配置。

breakpoints: list[int]#
end_index: int | None = None#
mode: ReplayMode#
model_config#
preserve_timing: bool = True#
speed: float#
start_index: int = 0#
class taolib.harness.devtools.replay.ReplayEngine#

执行回放引擎。

async arecord(execution: collections.abc.Callable[[_Recorder], collections.abc.Awaitable[Any]], *, metadata: dict[str, Any] | None = None) ExecutionRecording#

异步录制:等待 execution 协程完成后冻结录制。

async areplay(recording: ExecutionRecording, config: ReplayConfig | None = None, *, callback: ReplayCallback | None = None, step_signal: asyncio.Event | None = None) list[RecordedEvent]#

异步回放,可被 step_signal 控制单步步进。

static export(recording: ExecutionRecording, path: str | pathlib.Path | None = None, *, format: str = 'json') str#

导出录制为 JSON 字符串;若提供 path 则同时落盘。

static filter_events(recording: ExecutionRecording, *, types: collections.abc.Iterable[ReplayEventType | str] | None = None, nodes: collections.abc.Iterable[str] | None = None) list[RecordedEvent]#

按类型 / 节点过滤事件。

import_recording(path: str | pathlib.Path) ExecutionRecording#

从 JSON 文件导入录制并缓存。

static merge(recordings: collections.abc.Sequence[ExecutionRecording]) ExecutionRecording#

按时间戳合并多个录制为一份。

record(execution: collections.abc.Callable[[_Recorder], collections.abc.Awaitable[Any] | Any] | None = None, *, metadata: dict[str, Any] | None = None) _Recorder | ExecutionRecording#

录制执行过程。

两种用法:

  • 不传 execution,返回 _Recorder 由调用方主动调用 emit

  • 传入 execution,由引擎驱动其执行,期间通过回调收集事件, 返回最终的 ExecutionRecording

replay(recording: ExecutionRecording, config: ReplayConfig | None = None, *, callback: ReplayCallback | None = None) list[RecordedEvent]#

同步回放(在新事件循环中运行 areplay())。

static seek(recording: ExecutionRecording, step_index: int) RecordedEvent#

跳转到指定下标的事件并返回该事件。

property loaded: dict[str, ExecutionRecording]#

已导入的录制集合。

class taolib.harness.devtools.replay.ReplayEventType#

Bases: enum.StrEnum

回放事件类型。

Initialize self. See help(type(self)) for accurate signature.

EDGE_TRAVERSE = 'edge_traverse'#
LLM_CALL = 'llm_call'#
NODE_ENTER = 'node_enter'#
NODE_EXIT = 'node_exit'#
STATE_UPDATE = 'state_update'#
TOOL_CALL = 'tool_call'#
class taolib.harness.devtools.replay.ReplayMode#

Bases: enum.StrEnum

回放模式。

Initialize self. See help(type(self)) for accurate signature.

BREAKPOINT = 'breakpoint'#
CONTINUOUS = 'continuous'#
STEP = 'step'#
type taolib.harness.devtools.replay.ReplayCallback = Callable[[int, RecordedEvent], Awaitable[None] | None]#

回放回调签名:(事件下标, 事件) -> None