taolib.symphony.orchestrator.engine#
Symphony 编排引擎 — 轮询、对账、分派与重试的完整生命周期。
Orchestrator 是 Symphony 服务的核心组件,拥有轮询节拍和内存中的运行时状态。 负责决定哪些问题需要分派、重试、停止或释放。
设计原则: - 单线程 asyncio 事件循环,所有状态变更串行化 - asyncio.Task per worker,使用命名 task(Python 3.14 内省) - 通过单一权威状态 (OrchestratorState) 避免重复分派 - 对账在每个节拍的分派之前运行 - 重启恢复由跟踪器驱动和文件系统驱动(无需持久化调度器数据库)
对应规范 §7(编排状态机)、§8(轮询、调度和对账)。
Attributes#
Exceptions#
分派预检验证失败。 |
Classes#
Symphony 编排引擎。 |
Module Contents#
- taolib.symphony.orchestrator.engine.logger#
- exception taolib.symphony.orchestrator.engine.DispatchValidationError(reasons: list[str])#
Bases:
Exception分派预检验证失败。
- reasons#
- class taolib.symphony.orchestrator.engine.Orchestrator(config: taolib.symphony.config.schema.SymphonyConfig, tracker: taolib.symphony.tracker.base.TrackerClient, workspace_manager: taolib.symphony.workspace.manager.WorkspaceManager, agent_runner: Any, *, scheduler: taolib.symphony.orchestrator.scheduler.Scheduler | None = None)#
Symphony 编排引擎。
负责轮询 → 对账 → 分派 → 监控的完整生命周期。 单线程 asyncio 事件循环,所有状态变更串行化。
对应规范 §7(编排状态机)和 §8(轮询、调度和对账)。
用法:
config = SymphonyConfig() tracker = LinearTrackerClient(...) workspace_mgr = WorkspaceManager(...) agent_runner = AgentRunner(...) orch = Orchestrator(config, tracker, workspace_mgr, agent_runner) await orch.run() # 阻塞直到 shutdown
- _config#
- _tracker#
- _workspace_manager#
- _agent_runner#
- _scheduler#
- _running = False#
- state#
- async run() None#
服务主循环:启动验证 → 终态清理 → 首次 tick → 周期性 tick。
阻塞调用,直到 shutdown() 被调用。
对应规范 §16.1 服务启动: 1. 配置验证 2. 启动终态工作区清理 3. 即时首次 tick 4. 周期性 tick
- async _tick() None#
轮询和分派节拍。
对应规范 §16.2 轮询和分派节拍: 1. 对账运行中的问题(停顿检测 + 跟踪器状态刷新) 2. 运行分派预检验证 3. 使用活跃状态从跟踪器获取候选问题 4. 按分派优先级排序问题 5. 在槽位可用时分派符合条件的问题
- _dispatch_issue(issue: taolib.symphony.tracker.models.Issue, attempt: int | None) None#
分派一个 issue,创建命名 worker task。
对应规范 §16.4 分派一个问题。
- 参数:
issue -- 待分派的问题
attempt -- 重试尝试序号(首次为 None)
- async _run_worker(issue: taolib.symphony.tracker.models.Issue, attempt: int | None) None#
Worker 协程:运行智能体尝试。
对应规范 §16.5 Worker 尝试(工作区 + 提示词 + 智能体)。
- 参数:
issue -- 待处理的问题
attempt -- 重试尝试序号
- _make_update_callback(issue_id: str)#
创建 Codex 事件更新回调。
回调在编排器事件循环线程中同步执行, 因此可以安全修改 state。
- 参数:
issue_id -- 问题 ID
- 返回:
回调函数
- _on_worker_done(issue_id: str, task: asyncio.Task) None#
Worker 完成回调:续运重试或故障指数退避重试。
对应规范 §7.3 转换触发器 — Worker Exit: - 正常退出:安排续运重试(delay_type="continuation",1 秒延迟) - 异常退出:安排指数退避重试(delay_type="failure")
此回调在事件循环线程中执行,可安全修改 state。
- 参数:
issue_id -- 问题 ID
task -- 完成的 worker task
- _schedule_retry(issue_id: str, attempt: int, meta: taolib.symphony.orchestrator.state.RetryMeta) None#
安排一次重试。
对应规范 §8.4 重试和退避。
退避公式: - 续运重试(delay_type="continuation"):固定 1000 ms 延迟 - 故障重试(delay_type="failure"):delay = min(10000 * 2^(attempt-1), max_retry_backoff_ms)
retry_token 用于防止过期重试(参考 Elixir make_ref() 模式): 当同一 Issue 被重新调度时,旧计时器的 token 将不匹配, 避免过期回调触发重复分派。
- 参数:
issue_id -- 问题 ID
attempt -- 重试尝试序号
meta -- 重试调度元数据
- _compute_retry_delay_ms(attempt: int, is_continuation: bool) int#
计算重试延迟。
对应规范 §8.4 退避公式: - 续运重试:1000 ms 固定延迟 - 故障重试:delay = min(10000 * 2^(attempt-1), max_retry_backoff_ms)
- 参数:
attempt -- 重试尝试序号(从 1 开始)
is_continuation -- 是否为续运重试
- 返回:
延迟毫秒数
- async _on_retry_timer(issue_id: str, retry_token: uuid.UUID) None#
重试计时器触发回调。
对应规范 §16.6 on_retry_timer: 1. 验证 retry_token(防过期重试) 2. 获取活跃候选问题 3. 按 issue_id 查找特定问题 4. 如果未找到,释放声明 5. 如果找到且仍符合候选条件,分派 6. 如果找到但不再活跃,释放声明 7. 如果槽位不可用,以错误重新排队
- 参数:
issue_id -- 触发重试的问题 ID
retry_token -- 重试令牌,不匹配则跳过(防止过期重试)
- async _reconcile_stalled_runs() None#
停顿检测。
对应规范 §8.5 部分 A: - 对于每个运行中的问题,计算 elapsed_ms - 基于 last_codex_timestamp(如已看到任何事件),否则 started_at - 如果 elapsed_ms > stall_timeout_ms,终止 Worker 并排队重试 - 如果 stall_timeout_ms <= 0,跳过停顿检测
- async _reconcile_tracker_states() None#
跟踪器状态刷新。
对应规范 §8.5 部分 B 和 §16.3: - 获取所有运行中问题 ID 的当前问题状态 - 终态:终止 Worker 并清理工作区 - 活跃:更新内存中的问题快照 - 既非活跃也非终态:终止 Worker 但不清理工作区 - 如果状态刷新失败,保持 Worker 运行
- async _terminate_running_issue(issue_id: str, entry: taolib.symphony.orchestrator.state.RunningEntry, *, cleanup_workspace: bool) None#
终止运行中的问题。
- 参数:
issue_id -- 问题 ID
entry -- 运行条目
cleanup_workspace -- 是否清理工作区
- async _startup_terminal_cleanup() None#
启动终态工作区清理。
对应规范 §8.6: 1. 查询跟踪器中处于终态的问题 2. 移除对应的工作区目录 3. 如果终态问题获取失败,记录警告并继续启动
- _refresh_runtime_config() None#
从最新配置刷新运行时参数(支持 WORKFLOW.md 热重载)。
配置由 WorkflowStore/Watcher 管理热重载, 此方法将最新配置值同步到编排器状态。
- snapshot() dict[str, Any]#
可观测性:生成当前状态快照(供 HTTP API 和仪表板使用)。
委托 SnapshotGenerator 生成快照,包含: - 运行中的 worker 信息 - 重试队列条目 - 令牌汇总 - 配置参数 - 轮询状态
- _revalidate_issue_for_dispatch(issue: taolib.symphony.tracker.models.Issue) bool#
分派前重新验证 Issue 有效性(防竞态调度)。
在 _dispatch_issue 中调用,确保从 filter_dispatchable 到 实际分派之间状态未发生变化。
- 参数:
issue -- 待验证的问题
- 返回:
True 表示验证通过,可以分派
- _select_worker_host() str | None#
SSH 场景下最小负载主机选择。
从配置的 ssh_hosts 中选择当前运行 worker 数最少的主机。 本地执行时返回 None。
- 返回:
选中的主机地址,或 None 表示本地执行
- _validate_dispatch_config() None#
分派预检验证。
对应规范 §6.3: - tracker.kind 存在且受支持 - tracker.api_key 在 $ 解析后存在 - tracker.project_slug 存在(当 tracker.kind == "linear" 时) - codex.command 存在且非空
- 抛出:
DispatchValidationError -- 验证失败