taolib.symphony.agent

目录

taolib.symphony.agent#

智能体子包。

提供 Codex 智能体的运行管理、传输层抽象、 事件解析、工具定义和 app-server 通信。

Submodules#

Exceptions#

AppServerError

AppServer 通信错误。

Classes#

AppServerClient

Codex app-server stdio 客户端。

AppServerEvent

AppServer 事件。

TokenAccounting

令牌使用追踪器。

TurnResult

轮次执行结果。

AgentRunner

智能体运行器。

IssueInfo

Issue 信息的协议接口。

RunnerConfig

运行器配置。

DynamicTool

动态工具基类。

LinearGraphQLTool

Linear GraphQL 查询工具。

ToolResult

工具执行结果。

AgentProcess

智能体子进程的 I/O 句柄。

AgentTransport

传输层抽象基类。

LocalTransport

本地传输实现。

SSHTransport

SSH 远程传输实现。

Functions#

parse_event(→ AppServerEvent)

将原始 JSON 字典解析为 AppServerEvent。

Package Contents#

class taolib.symphony.agent.AppServerClient(transport: taolib.symphony.agent.transport.AgentTransport)#

Codex app-server stdio 客户端。

通过 stdin/stdout JSON 行协议与 Codex app-server 子进程通信, 实现会话初始化、轮次运行、流式事件解析等核心交互。

_transport#
_process: taolib.symphony.agent.transport.AgentProcess | None = None#
_token_accounting#
_session_id: str | None = None#
property token_accounting: taolib.symphony.agent.events.TokenAccounting#

令牌使用追踪器。

property session_id: str | None#

当前会话 ID。

async start(command: str, workspace_path: str, *, approval_policy: str = 'never', sandbox_policy: dict | None = None) None#

启动 app-server 进程。

参数:
  • command -- 启动 app-server 的命令。

  • workspace_path -- 工作区路径(作为 cwd)。

  • approval_policy -- 审批策略(默认 never)。

  • sandbox_policy -- 沙箱策略配置。

async initialize_session(config: dict[str, Any] | None = None) str#

发送 initialize 消息,返回 session_id。

参数:

config -- 初始化配置(工具定义、审批策略等)。

返回:

会话 ID 字符串。

抛出:

AppServerError -- 初始化失败或通信错误。

async run_turn(session_id: str, prompt: str, on_event: collections.abc.Callable[[taolib.symphony.agent.events.AppServerEvent], None] | None = None) taolib.symphony.agent.events.TurnResult#

运行一个轮次,流式处理事件。

参数:
  • session_id -- 会话 ID。

  • prompt -- 轮次提示词。

  • on_event -- 事件回调函数。

返回:

轮次执行结果。

抛出:

AppServerError -- 通信错误。

async stop(session_id: str) None#

停止会话并关闭进程。

参数:

session_id -- 会话 ID。

async _send(msg: dict) None#

发送 JSON 行。

async _recv() dict#

接收一行 JSON。

async _stream_events()#

异步迭代接收事件。

exception taolib.symphony.agent.AppServerError#

Bases: taolib.symphony.errors.AgentError

AppServer 通信错误。

class taolib.symphony.agent.AppServerEvent#

AppServer 事件。

type: str#

事件类型(如 message, tool_call, turn_complete, error)。

data: dict#

事件附加数据。

raw: dict#

原始 JSON 字典。

class taolib.symphony.agent.TokenAccounting#

令牌使用追踪器。

增量跟踪每个轮次和会话的令牌消耗, 支持按轮次和总量查询。

_per_turn: list[int] = []#
_total: int = 0#
property total: int#

累计使用的令牌总数。

property turns: int#

已记录的轮次数。

record(tokens: int) None#

记录一个轮次的令牌使用。

参数:

tokens -- 本轮使用的令牌数。

get_turn_usage(turn_index: int) int#

获取指定轮次的令牌使用量。

参数:

turn_index -- 轮次索引(从 0 开始)。

返回:

令牌使用量,索引越界返回 0。

reset() None#

重置所有计数。

class taolib.symphony.agent.TurnResult#

轮次执行结果。

success: bool#

轮次是否成功完成。

token_usage: int = 0#

本轮使用的令牌数。

error: str | None = None#

错误信息(如果轮次失败)。

taolib.symphony.agent.parse_event(raw: dict) AppServerEvent#

将原始 JSON 字典解析为 AppServerEvent。

参数:

raw -- 从 app-server 接收的原始 JSON 行。

返回:

结构化的事件对象。

class taolib.symphony.agent.AgentRunner(transport: taolib.symphony.agent.transport.AgentTransport, config: RunnerConfig | None = None)#

智能体运行器。

编排单个 Issue 的完整运行生命周期: 创建工作区 → 执行 before_run 钩子 → 启动 app-server → 轮次循环 → 停止 app-server → 执行 after_run 钩子。

_transport#
_config#
_client#
property client: taolib.symphony.agent.appserver.AppServerClient#

底层 AppServer 客户端。

async run_attempt(issue: IssueInfo, attempt: int, workspace_path: str, *, build_prompt: collections.abc.Callable[[IssueInfo, int, int], str], check_issue_status: collections.abc.Callable[[IssueInfo], asyncio.Coroutine[Any, Any, str | None]] | None = None, on_event: collections.abc.Callable[[taolib.symphony.agent.events.AppServerEvent], None] | None = None) taolib.symphony.agent.events.TurnResult#

执行一次完整的运行尝试。

流程: 1. 执行 before_run 钩子 2. 启动 app-server 进程 3. 初始化会话 4. 轮次循环(构建提示词 → 运行轮次 → 检查状态) 5. 停止 app-server 6. 执行 after_run 钩子

参数:
  • issue -- Issue 信息。

  • attempt -- 尝试序号(从 0 开始)。

  • workspace_path -- 工作区路径。

  • build_prompt -- 提示词构建回调。

  • check_issue_status -- Issue 状态检查回调(可选)。

  • on_event -- 事件回调(可选)。

返回:

最终轮次结果。

class taolib.symphony.agent.IssueInfo#

Bases: Protocol

Issue 信息的协议接口。

property identifier: str#
property title: str#
property status: str#
model_dump() dict[str, Any]#
class taolib.symphony.agent.RunnerConfig#

运行器配置。

command: str = 'codex'#

启动 app-server 的命令。

max_turns: int = 10#

单次运行最大轮次数。

approval_policy: str = 'never'#

审批策略。

stall_timeout_ms: int = 300000#

停滞检测超时(毫秒)。

before_run: str | None = None#

运行前钩子脚本。

after_run: str | None = None#

运行后钩子脚本。

hook_timeout_ms: int = 30000#

钩子超时(毫秒)。

class taolib.symphony.agent.DynamicTool#

动态工具基类。

所有客户端侧工具的抽象基类, 定义工具名称和调用接口。

property name: str#
Abstractmethod:

工具名称。

property description: str#
Abstractmethod:

工具描述。

abstractmethod handle(arguments: dict) ToolResult#
Async:

处理工具调用。

参数:

arguments -- 工具调用参数。

返回:

工具执行结果。

to_tool_definition() dict#

生成 Codex 工具定义字典。

class taolib.symphony.agent.LinearGraphQLTool(api_key: str, endpoint: str = 'https://api.linear.app/graphql')#

Bases: DynamicTool

Linear GraphQL 查询工具。

允许 Codex 智能体在工作区内查询 Linear API, 复用 Symphony 的 Linear 认证信息。

_api_key#
_endpoint = 'https://api.linear.app/graphql'#
property name: str#

工具名称。

property description: str#

工具描述。

async handle(arguments: dict) ToolResult#

执行 Linear GraphQL 查询。

参数:

arguments -- 包含 query 和可选 variables 的字典。

返回:

GraphQL 查询结果。

to_tool_definition() dict#

生成包含参数 schema 的工具定义。

class taolib.symphony.agent.ToolResult#

工具执行结果。

success: bool#

是否成功。

output: str = ''#

工具输出的文本内容。

error: str | None = None#

错误信息。

class taolib.symphony.agent.AgentProcess#

智能体子进程的 I/O 句柄。

stdin: asyncio.StreamWriter#
stdout: asyncio.StreamReader#
stderr: asyncio.StreamReader#
pid: str | None = None#
class taolib.symphony.agent.AgentTransport#

Bases: abc.ABC

传输层抽象基类。

定义启动智能体进程和执行钩子脚本的接口, 使编排器可与本地或远程执行环境解耦。

abstractmethod start_process(command: str, cwd: str) AgentProcess#
Async:

启动智能体进程。

参数:
  • command -- 要执行的命令。

  • cwd -- 工作目录。

返回:

进程 I/O 句柄。

abstractmethod run_hook(script: str, cwd: str, timeout_ms: int) int#
Async:

执行钩子脚本。

参数:
  • script -- Shell 命令。

  • cwd -- 工作目录。

  • timeout_ms -- 超时毫秒数。

返回:

进程退出码。

class taolib.symphony.agent.LocalTransport#

Bases: AgentTransport

本地传输实现。

在本机以 asyncio 子进程方式启动智能体和执行钩子。

async start_process(command: str, cwd: str) AgentProcess#

在本机启动子进程。

async run_hook(script: str, cwd: str, timeout_ms: int) int#

在本机执行钩子脚本。

class taolib.symphony.agent.SSHTransport(host: str, **ssh_opts: object)#

Bases: AgentTransport

SSH 远程传输实现。

通过 asyncssh 连接远程主机,在远端启动智能体进程和执行钩子。 使用前必须先调用 connect() 建立连接。

_host#
_ssh_opts#
_conn: asyncssh.SSHClientConnection | None = None#
async connect() None#

建立 SSH 连接。

async start_process(command: str, cwd: str) AgentProcess#

在远程主机启动子进程。

async run_hook(script: str, cwd: str, timeout_ms: int) int#

在远程主机执行钩子脚本。

async disconnect() None#

关闭 SSH 连接。