taolib.symphony.orchestrator#
Symphony 编排器子包。
提供编排引擎的核心实现,包括状态机、调度引擎和并发控制。
主要导出: - Orchestrator:编排引擎主类,负责轮询、对账、分派与重试 - OrchestratorState:编排器单一权威内存状态 - Scheduler:候选排序和并发控制
Submodules#
Exceptions#
分派预检验证失败。 |
Classes#
Symphony 编排引擎。 |
|
候选排序和并发控制。 |
|
编排器单一权威内存状态。 |
Package Contents#
- exception taolib.symphony.orchestrator.DispatchValidationError(reasons: list[str])#
Bases:
Exception分派预检验证失败。
- reasons#
- class taolib.symphony.orchestrator.Orchestrator(config: taolib.symphony.config.schema.SymphonyConfig, tracker: taolib.symphony.tracker.base.TrackerClient, workspace_manager: taolib.symphony.workspace.manager.WorkspaceManager, agent_runner: Any, *, scheduler: taolib.symphony.orchestrator.scheduler.Scheduler | None = None)#
Symphony 编排引擎。
负责轮询 → 对账 → 分派 → 监控的完整生命周期。 单线程 asyncio 事件循环,所有状态变更串行化。
对应规范 §7(编排状态机)和 §8(轮询、调度和对账)。
用法:
config = SymphonyConfig() tracker = LinearTrackerClient(...) workspace_mgr = WorkspaceManager(...) agent_runner = AgentRunner(...) orch = Orchestrator(config, tracker, workspace_mgr, agent_runner) await orch.run() # 阻塞直到 shutdown
- _config#
- _tracker#
- _workspace_manager#
- _agent_runner#
- _scheduler#
- _running = False#
- state#
- async run() None#
服务主循环:启动验证 → 终态清理 → 首次 tick → 周期性 tick。
阻塞调用,直到 shutdown() 被调用。
对应规范 §16.1 服务启动: 1. 配置验证 2. 启动终态工作区清理 3. 即时首次 tick 4. 周期性 tick
- async _tick() None#
轮询和分派节拍。
对应规范 §16.2 轮询和分派节拍: 1. 对账运行中的问题(停顿检测 + 跟踪器状态刷新) 2. 运行分派预检验证 3. 使用活跃状态从跟踪器获取候选问题 4. 按分派优先级排序问题 5. 在槽位可用时分派符合条件的问题
- _dispatch_issue(issue: taolib.symphony.tracker.models.Issue, attempt: int | None) None#
分派一个 issue,创建命名 worker task。
对应规范 §16.4 分派一个问题。
- 参数:
issue -- 待分派的问题
attempt -- 重试尝试序号(首次为 None)
- async _run_worker(issue: taolib.symphony.tracker.models.Issue, attempt: int | None) None#
Worker 协程:运行智能体尝试。
对应规范 §16.5 Worker 尝试(工作区 + 提示词 + 智能体)。
- 参数:
issue -- 待处理的问题
attempt -- 重试尝试序号
- _make_update_callback(issue_id: str)#
创建 Codex 事件更新回调。
回调在编排器事件循环线程中同步执行, 因此可以安全修改 state。
- 参数:
issue_id -- 问题 ID
- 返回:
回调函数
- _on_worker_done(issue_id: str, task: asyncio.Task) None#
Worker 完成回调:续运重试或故障指数退避重试。
对应规范 §7.3 转换触发器 — Worker Exit: - 正常退出:安排续运重试(delay_type="continuation",1 秒延迟) - 异常退出:安排指数退避重试(delay_type="failure")
此回调在事件循环线程中执行,可安全修改 state。
- 参数:
issue_id -- 问题 ID
task -- 完成的 worker task
- _schedule_retry(issue_id: str, attempt: int, meta: taolib.symphony.orchestrator.state.RetryMeta) None#
安排一次重试。
对应规范 §8.4 重试和退避。
退避公式: - 续运重试(delay_type="continuation"):固定 1000 ms 延迟 - 故障重试(delay_type="failure"):delay = min(10000 * 2^(attempt-1), max_retry_backoff_ms)
retry_token 用于防止过期重试(参考 Elixir make_ref() 模式): 当同一 Issue 被重新调度时,旧计时器的 token 将不匹配, 避免过期回调触发重复分派。
- 参数:
issue_id -- 问题 ID
attempt -- 重试尝试序号
meta -- 重试调度元数据
- _compute_retry_delay_ms(attempt: int, is_continuation: bool) int#
计算重试延迟。
对应规范 §8.4 退避公式: - 续运重试:1000 ms 固定延迟 - 故障重试:delay = min(10000 * 2^(attempt-1), max_retry_backoff_ms)
- 参数:
attempt -- 重试尝试序号(从 1 开始)
is_continuation -- 是否为续运重试
- 返回:
延迟毫秒数
- async _on_retry_timer(issue_id: str, retry_token: uuid.UUID) None#
重试计时器触发回调。
对应规范 §16.6 on_retry_timer: 1. 验证 retry_token(防过期重试) 2. 获取活跃候选问题 3. 按 issue_id 查找特定问题 4. 如果未找到,释放声明 5. 如果找到且仍符合候选条件,分派 6. 如果找到但不再活跃,释放声明 7. 如果槽位不可用,以错误重新排队
- 参数:
issue_id -- 触发重试的问题 ID
retry_token -- 重试令牌,不匹配则跳过(防止过期重试)
- async _reconcile_stalled_runs() None#
停顿检测。
对应规范 §8.5 部分 A: - 对于每个运行中的问题,计算 elapsed_ms - 基于 last_codex_timestamp(如已看到任何事件),否则 started_at - 如果 elapsed_ms > stall_timeout_ms,终止 Worker 并排队重试 - 如果 stall_timeout_ms <= 0,跳过停顿检测
- async _reconcile_tracker_states() None#
跟踪器状态刷新。
对应规范 §8.5 部分 B 和 §16.3: - 获取所有运行中问题 ID 的当前问题状态 - 终态:终止 Worker 并清理工作区 - 活跃:更新内存中的问题快照 - 既非活跃也非终态:终止 Worker 但不清理工作区 - 如果状态刷新失败,保持 Worker 运行
- async _terminate_running_issue(issue_id: str, entry: taolib.symphony.orchestrator.state.RunningEntry, *, cleanup_workspace: bool) None#
终止运行中的问题。
- 参数:
issue_id -- 问题 ID
entry -- 运行条目
cleanup_workspace -- 是否清理工作区
- async _startup_terminal_cleanup() None#
启动终态工作区清理。
对应规范 §8.6: 1. 查询跟踪器中处于终态的问题 2. 移除对应的工作区目录 3. 如果终态问题获取失败,记录警告并继续启动
- _refresh_runtime_config() None#
从最新配置刷新运行时参数(支持 WORKFLOW.md 热重载)。
配置由 WorkflowStore/Watcher 管理热重载, 此方法将最新配置值同步到编排器状态。
- snapshot() dict[str, Any]#
可观测性:生成当前状态快照(供 HTTP API 和仪表板使用)。
委托 SnapshotGenerator 生成快照,包含: - 运行中的 worker 信息 - 重试队列条目 - 令牌汇总 - 配置参数 - 轮询状态
- _revalidate_issue_for_dispatch(issue: taolib.symphony.tracker.models.Issue) bool#
分派前重新验证 Issue 有效性(防竞态调度)。
在 _dispatch_issue 中调用,确保从 filter_dispatchable 到 实际分派之间状态未发生变化。
- 参数:
issue -- 待验证的问题
- 返回:
True 表示验证通过,可以分派
- _select_worker_host() str | None#
SSH 场景下最小负载主机选择。
从配置的 ssh_hosts 中选择当前运行 worker 数最少的主机。 本地执行时返回 None。
- 返回:
选中的主机地址,或 None 表示本地执行
- _validate_dispatch_config() None#
分派预检验证。
对应规范 §6.3: - tracker.kind 存在且受支持 - tracker.api_key 在 $ 解析后存在 - tracker.project_slug 存在(当 tracker.kind == "linear" 时) - codex.command 存在且非空
- 抛出:
DispatchValidationError -- 验证失败
- class taolib.symphony.orchestrator.Scheduler#
候选排序和并发控制。
无状态的工具类,所有方法接受 state 参数进行判断。 对齐规范 §8.2(候选选择规则)和 §8.3(并发控制)。
- sort_for_dispatch(issues: list[taolib.symphony.tracker.models.Issue]) list[taolib.symphony.tracker.models.Issue]#
按分派优先级排序候选问题列表。
排序键(稳定排序意图): 1. priority 升序(1..4 优先;null/未知排最后,映射为 999) 2. created_at 最旧优先(null 映射为 datetime.max) 3. identifier 字典序决胜
- 参数:
issues -- 候选问题列表
- 返回:
排序后的新列表(不修改原列表)
- available_slots(state: taolib.symphony.orchestrator.state.OrchestratorState) int#
计算全局可用槽位。
- 参数:
state -- 编排器当前状态
- 返回:
可用槽位数(>= 0)
- available_slots_for_state(issue_state: str, state: taolib.symphony.orchestrator.state.OrchestratorState, config: taolib.symphony.config.schema.AgentConfig) int#
计算指定状态的可用槽位。
当 max_concurrent_agents_by_state 中存在该状态的配置时, 使用该配置的上限;否则回退到全局限制。
状态键按小写规范化后查找(规范 §5.3.5)。
- 参数:
issue_state -- 问题跟踪器状态名称
state -- 编排器当前状态
config -- 智能体配置
- 返回:
该状态下的可用槽位数(>= 0)
- should_dispatch(issue: taolib.symphony.tracker.models.Issue, state: taolib.symphony.orchestrator.state.OrchestratorState) bool#
判断是否应分派该 issue。
检查规则(规范 §8.2 候选选择规则): 1. issue 具有 id、identifier、title 和 state 2. 不已在 running 中 3. 不已在 claimed 中 4. 不已在 retry_attempts 中
注意:Todo 状态的阻塞规则检查需要额外的阻塞项状态信息, 此处基于 Issue.blocked_by(阻塞方 ID 列表)做保守判断—— 当 blocked_by 非空时,由编排器在分派前通过对账确认阻塞项状态。 全局和每状态槽位检查由调用方在循环中执行。
- 参数:
issue -- 待判断的候选问题
state -- 编排器当前状态
- 返回:
True 表示应分派
- filter_dispatchable(issues: list[taolib.symphony.tracker.models.Issue], state: taolib.symphony.orchestrator.state.OrchestratorState, config: taolib.symphony.config.schema.AgentConfig) list[taolib.symphony.tracker.models.Issue]#
从候选列表中筛选可分派的问题,按优先级排序。
依次检查全局槽位、每状态槽位和分派资格, 返回在当前槽位限制下可分派的问题子列表。
- 参数:
issues -- 候选问题列表
state -- 编排器当前状态
config -- 智能体配置
- 返回:
可分派的问题列表(已排序,数量不超过可用槽位)
- class taolib.symphony.orchestrator.OrchestratorState#
编排器单一权威内存状态。
对应规范 §4.1.8。所有状态变更由编排器串行化执行, 避免并发修改导致的重复分派或状态不一致。
- poll_interval_ms#
当前有效轮询间隔
- max_concurrent_agents#
当前有效全局并发限制
- running#
issue_id -> RunningEntry 映射
- claimed#
已保留/运行中/重试中的问题 ID 集合
- retry_attempts#
issue_id -> RetryEntry 映射
- completed#
已完成的问题 ID 集合(仅记录,不控制分派)
- codex_totals#
聚合令牌 + 运行秒数
- codex_rate_limits#
来自智能体事件的最新速率限制快照
- running: dict[str, RunningEntry]#
- retry_attempts: dict[str, RetryEntry]#
- codex_totals: CodexTotals#