taolib.symphony.orchestrator.engine#

Symphony 编排引擎 — 轮询、对账、分派与重试的完整生命周期。

Orchestrator 是 Symphony 服务的核心组件,拥有轮询节拍和内存中的运行时状态。 负责决定哪些问题需要分派、重试、停止或释放。

设计原则: - 单线程 asyncio 事件循环,所有状态变更串行化 - asyncio.Task per worker,使用命名 task(Python 3.14 内省) - 通过单一权威状态 (OrchestratorState) 避免重复分派 - 对账在每个节拍的分派之前运行 - 重启恢复由跟踪器驱动和文件系统驱动(无需持久化调度器数据库)

对应规范 §7(编排状态机)、§8(轮询、调度和对账)。

Attributes#

Exceptions#

DispatchValidationError

分派预检验证失败。

Classes#

Orchestrator

Symphony 编排引擎。

Module Contents#

taolib.symphony.orchestrator.engine.logger#
exception taolib.symphony.orchestrator.engine.DispatchValidationError(reasons: list[str])#

Bases: Exception

分派预检验证失败。

reasons#
class taolib.symphony.orchestrator.engine.Orchestrator(config: taolib.symphony.config.schema.SymphonyConfig, tracker: taolib.symphony.tracker.base.TrackerClient, workspace_manager: taolib.symphony.workspace.manager.WorkspaceManager, agent_runner: Any, *, scheduler: taolib.symphony.orchestrator.scheduler.Scheduler | None = None)#

Symphony 编排引擎。

负责轮询 → 对账 → 分派 → 监控的完整生命周期。 单线程 asyncio 事件循环,所有状态变更串行化。

对应规范 §7(编排状态机)和 §8(轮询、调度和对账)。

用法:

config = SymphonyConfig()
tracker = LinearTrackerClient(...)
workspace_mgr = WorkspaceManager(...)
agent_runner = AgentRunner(...)

orch = Orchestrator(config, tracker, workspace_mgr, agent_runner)
await orch.run()  # 阻塞直到 shutdown
_config#
_tracker#
_workspace_manager#
_agent_runner#
_scheduler#
_running = False#
state#
async run() None#

服务主循环:启动验证 → 终态清理 → 首次 tick → 周期性 tick。

阻塞调用,直到 shutdown() 被调用。

对应规范 §16.1 服务启动: 1. 配置验证 2. 启动终态工作区清理 3. 即时首次 tick 4. 周期性 tick

async shutdown() None#

优雅关闭:停止新分派,等待活跃 worker,清理重试计时器。

对应规范 §14 操作员干预 — 重启服务场景。

async _tick() None#

轮询和分派节拍。

对应规范 §16.2 轮询和分派节拍: 1. 对账运行中的问题(停顿检测 + 跟踪器状态刷新) 2. 运行分派预检验证 3. 使用活跃状态从跟踪器获取候选问题 4. 按分派优先级排序问题 5. 在槽位可用时分派符合条件的问题

_dispatch_issue(issue: taolib.symphony.tracker.models.Issue, attempt: int | None) None#

分派一个 issue,创建命名 worker task。

对应规范 §16.4 分派一个问题。

参数:
  • issue -- 待分派的问题

  • attempt -- 重试尝试序号(首次为 None)

async _run_worker(issue: taolib.symphony.tracker.models.Issue, attempt: int | None) None#

Worker 协程:运行智能体尝试。

对应规范 §16.5 Worker 尝试(工作区 + 提示词 + 智能体)。

参数:
  • issue -- 待处理的问题

  • attempt -- 重试尝试序号

_make_update_callback(issue_id: str)#

创建 Codex 事件更新回调。

回调在编排器事件循环线程中同步执行, 因此可以安全修改 state。

参数:

issue_id -- 问题 ID

返回:

回调函数

_on_worker_done(issue_id: str, task: asyncio.Task) None#

Worker 完成回调:续运重试或故障指数退避重试。

对应规范 §7.3 转换触发器 — Worker Exit: - 正常退出:安排续运重试(delay_type="continuation",1 秒延迟) - 异常退出:安排指数退避重试(delay_type="failure")

此回调在事件循环线程中执行,可安全修改 state。

参数:
  • issue_id -- 问题 ID

  • task -- 完成的 worker task

_schedule_retry(issue_id: str, attempt: int, meta: taolib.symphony.orchestrator.state.RetryMeta) None#

安排一次重试。

对应规范 §8.4 重试和退避。

退避公式: - 续运重试(delay_type="continuation"):固定 1000 ms 延迟 - 故障重试(delay_type="failure"):delay = min(10000 * 2^(attempt-1), max_retry_backoff_ms)

retry_token 用于防止过期重试(参考 Elixir make_ref() 模式): 当同一 Issue 被重新调度时,旧计时器的 token 将不匹配, 避免过期回调触发重复分派。

参数:
  • issue_id -- 问题 ID

  • attempt -- 重试尝试序号

  • meta -- 重试调度元数据

_compute_retry_delay_ms(attempt: int, is_continuation: bool) int#

计算重试延迟。

对应规范 §8.4 退避公式: - 续运重试:1000 ms 固定延迟 - 故障重试:delay = min(10000 * 2^(attempt-1), max_retry_backoff_ms)

参数:
  • attempt -- 重试尝试序号(从 1 开始)

  • is_continuation -- 是否为续运重试

返回:

延迟毫秒数

async _on_retry_timer(issue_id: str, retry_token: uuid.UUID) None#

重试计时器触发回调。

对应规范 §16.6 on_retry_timer: 1. 验证 retry_token(防过期重试) 2. 获取活跃候选问题 3. 按 issue_id 查找特定问题 4. 如果未找到,释放声明 5. 如果找到且仍符合候选条件,分派 6. 如果找到但不再活跃,释放声明 7. 如果槽位不可用,以错误重新排队

参数:
  • issue_id -- 触发重试的问题 ID

  • retry_token -- 重试令牌,不匹配则跳过(防止过期重试)

async _reconcile_running_issues() None#

对账活跃运行。

对应规范 §8.5 活跃运行对账,包含两个部分: A. 停顿检测 B. 跟踪器状态刷新

async _reconcile_stalled_runs() None#

停顿检测。

对应规范 §8.5 部分 A: - 对于每个运行中的问题,计算 elapsed_ms - 基于 last_codex_timestamp(如已看到任何事件),否则 started_at - 如果 elapsed_ms > stall_timeout_ms,终止 Worker 并排队重试 - 如果 stall_timeout_ms <= 0,跳过停顿检测

async _reconcile_tracker_states() None#

跟踪器状态刷新。

对应规范 §8.5 部分 B 和 §16.3: - 获取所有运行中问题 ID 的当前问题状态 - 终态:终止 Worker 并清理工作区 - 活跃:更新内存中的问题快照 - 既非活跃也非终态:终止 Worker 但不清理工作区 - 如果状态刷新失败,保持 Worker 运行

async _terminate_running_issue(issue_id: str, entry: taolib.symphony.orchestrator.state.RunningEntry, *, cleanup_workspace: bool) None#

终止运行中的问题。

参数:
  • issue_id -- 问题 ID

  • entry -- 运行条目

  • cleanup_workspace -- 是否清理工作区

async _startup_terminal_cleanup() None#

启动终态工作区清理。

对应规范 §8.6: 1. 查询跟踪器中处于终态的问题 2. 移除对应的工作区目录 3. 如果终态问题获取失败,记录警告并继续启动

_refresh_runtime_config() None#

从最新配置刷新运行时参数(支持 WORKFLOW.md 热重载)。

配置由 WorkflowStore/Watcher 管理热重载, 此方法将最新配置值同步到编排器状态。

snapshot() dict[str, Any]#

可观测性:生成当前状态快照(供 HTTP API 和仪表板使用)。

委托 SnapshotGenerator 生成快照,包含: - 运行中的 worker 信息 - 重试队列条目 - 令牌汇总 - 配置参数 - 轮询状态

_revalidate_issue_for_dispatch(issue: taolib.symphony.tracker.models.Issue) bool#

分派前重新验证 Issue 有效性(防竞态调度)。

在 _dispatch_issue 中调用,确保从 filter_dispatchable 到 实际分派之间状态未发生变化。

参数:

issue -- 待验证的问题

返回:

True 表示验证通过,可以分派

_select_worker_host() str | None#

SSH 场景下最小负载主机选择。

从配置的 ssh_hosts 中选择当前运行 worker 数最少的主机。 本地执行时返回 None。

返回:

选中的主机地址,或 None 表示本地执行

_validate_dispatch_config() None#

分派预检验证。

对应规范 §6.3: - tracker.kind 存在且受支持 - tracker.api_key 在 $ 解析后存在 - tracker.project_slug 存在(当 tracker.kind == "linear" 时) - codex.command 存在且非空

抛出:

DispatchValidationError -- 验证失败