taolib.symphony.orchestrator

目录

taolib.symphony.orchestrator#

Symphony 编排器子包。

提供编排引擎的核心实现,包括状态机、调度引擎和并发控制。

主要导出: - Orchestrator:编排引擎主类,负责轮询、对账、分派与重试 - OrchestratorState:编排器单一权威内存状态 - Scheduler:候选排序和并发控制

Submodules#

Exceptions#

DispatchValidationError

分派预检验证失败。

Classes#

Orchestrator

Symphony 编排引擎。

Scheduler

候选排序和并发控制。

OrchestratorState

编排器单一权威内存状态。

Package Contents#

exception taolib.symphony.orchestrator.DispatchValidationError(reasons: list[str])#

Bases: Exception

分派预检验证失败。

reasons#
class taolib.symphony.orchestrator.Orchestrator(config: taolib.symphony.config.schema.SymphonyConfig, tracker: taolib.symphony.tracker.base.TrackerClient, workspace_manager: taolib.symphony.workspace.manager.WorkspaceManager, agent_runner: Any, *, scheduler: taolib.symphony.orchestrator.scheduler.Scheduler | None = None)#

Symphony 编排引擎。

负责轮询 → 对账 → 分派 → 监控的完整生命周期。 单线程 asyncio 事件循环,所有状态变更串行化。

对应规范 §7(编排状态机)和 §8(轮询、调度和对账)。

用法:

config = SymphonyConfig()
tracker = LinearTrackerClient(...)
workspace_mgr = WorkspaceManager(...)
agent_runner = AgentRunner(...)

orch = Orchestrator(config, tracker, workspace_mgr, agent_runner)
await orch.run()  # 阻塞直到 shutdown
_config#
_tracker#
_workspace_manager#
_agent_runner#
_scheduler#
_running = False#
state#
async run() None#

服务主循环:启动验证 → 终态清理 → 首次 tick → 周期性 tick。

阻塞调用,直到 shutdown() 被调用。

对应规范 §16.1 服务启动: 1. 配置验证 2. 启动终态工作区清理 3. 即时首次 tick 4. 周期性 tick

async shutdown() None#

优雅关闭:停止新分派,等待活跃 worker,清理重试计时器。

对应规范 §14 操作员干预 — 重启服务场景。

async _tick() None#

轮询和分派节拍。

对应规范 §16.2 轮询和分派节拍: 1. 对账运行中的问题(停顿检测 + 跟踪器状态刷新) 2. 运行分派预检验证 3. 使用活跃状态从跟踪器获取候选问题 4. 按分派优先级排序问题 5. 在槽位可用时分派符合条件的问题

_dispatch_issue(issue: taolib.symphony.tracker.models.Issue, attempt: int | None) None#

分派一个 issue,创建命名 worker task。

对应规范 §16.4 分派一个问题。

参数:
  • issue -- 待分派的问题

  • attempt -- 重试尝试序号(首次为 None)

async _run_worker(issue: taolib.symphony.tracker.models.Issue, attempt: int | None) None#

Worker 协程:运行智能体尝试。

对应规范 §16.5 Worker 尝试(工作区 + 提示词 + 智能体)。

参数:
  • issue -- 待处理的问题

  • attempt -- 重试尝试序号

_make_update_callback(issue_id: str)#

创建 Codex 事件更新回调。

回调在编排器事件循环线程中同步执行, 因此可以安全修改 state。

参数:

issue_id -- 问题 ID

返回:

回调函数

_on_worker_done(issue_id: str, task: asyncio.Task) None#

Worker 完成回调:续运重试或故障指数退避重试。

对应规范 §7.3 转换触发器 — Worker Exit: - 正常退出:安排续运重试(delay_type="continuation",1 秒延迟) - 异常退出:安排指数退避重试(delay_type="failure")

此回调在事件循环线程中执行,可安全修改 state。

参数:
  • issue_id -- 问题 ID

  • task -- 完成的 worker task

_schedule_retry(issue_id: str, attempt: int, meta: taolib.symphony.orchestrator.state.RetryMeta) None#

安排一次重试。

对应规范 §8.4 重试和退避。

退避公式: - 续运重试(delay_type="continuation"):固定 1000 ms 延迟 - 故障重试(delay_type="failure"):delay = min(10000 * 2^(attempt-1), max_retry_backoff_ms)

retry_token 用于防止过期重试(参考 Elixir make_ref() 模式): 当同一 Issue 被重新调度时,旧计时器的 token 将不匹配, 避免过期回调触发重复分派。

参数:
  • issue_id -- 问题 ID

  • attempt -- 重试尝试序号

  • meta -- 重试调度元数据

_compute_retry_delay_ms(attempt: int, is_continuation: bool) int#

计算重试延迟。

对应规范 §8.4 退避公式: - 续运重试:1000 ms 固定延迟 - 故障重试:delay = min(10000 * 2^(attempt-1), max_retry_backoff_ms)

参数:
  • attempt -- 重试尝试序号(从 1 开始)

  • is_continuation -- 是否为续运重试

返回:

延迟毫秒数

async _on_retry_timer(issue_id: str, retry_token: uuid.UUID) None#

重试计时器触发回调。

对应规范 §16.6 on_retry_timer: 1. 验证 retry_token(防过期重试) 2. 获取活跃候选问题 3. 按 issue_id 查找特定问题 4. 如果未找到,释放声明 5. 如果找到且仍符合候选条件,分派 6. 如果找到但不再活跃,释放声明 7. 如果槽位不可用,以错误重新排队

参数:
  • issue_id -- 触发重试的问题 ID

  • retry_token -- 重试令牌,不匹配则跳过(防止过期重试)

async _reconcile_running_issues() None#

对账活跃运行。

对应规范 §8.5 活跃运行对账,包含两个部分: A. 停顿检测 B. 跟踪器状态刷新

async _reconcile_stalled_runs() None#

停顿检测。

对应规范 §8.5 部分 A: - 对于每个运行中的问题,计算 elapsed_ms - 基于 last_codex_timestamp(如已看到任何事件),否则 started_at - 如果 elapsed_ms > stall_timeout_ms,终止 Worker 并排队重试 - 如果 stall_timeout_ms <= 0,跳过停顿检测

async _reconcile_tracker_states() None#

跟踪器状态刷新。

对应规范 §8.5 部分 B 和 §16.3: - 获取所有运行中问题 ID 的当前问题状态 - 终态:终止 Worker 并清理工作区 - 活跃:更新内存中的问题快照 - 既非活跃也非终态:终止 Worker 但不清理工作区 - 如果状态刷新失败,保持 Worker 运行

async _terminate_running_issue(issue_id: str, entry: taolib.symphony.orchestrator.state.RunningEntry, *, cleanup_workspace: bool) None#

终止运行中的问题。

参数:
  • issue_id -- 问题 ID

  • entry -- 运行条目

  • cleanup_workspace -- 是否清理工作区

async _startup_terminal_cleanup() None#

启动终态工作区清理。

对应规范 §8.6: 1. 查询跟踪器中处于终态的问题 2. 移除对应的工作区目录 3. 如果终态问题获取失败,记录警告并继续启动

_refresh_runtime_config() None#

从最新配置刷新运行时参数(支持 WORKFLOW.md 热重载)。

配置由 WorkflowStore/Watcher 管理热重载, 此方法将最新配置值同步到编排器状态。

snapshot() dict[str, Any]#

可观测性:生成当前状态快照(供 HTTP API 和仪表板使用)。

委托 SnapshotGenerator 生成快照,包含: - 运行中的 worker 信息 - 重试队列条目 - 令牌汇总 - 配置参数 - 轮询状态

_revalidate_issue_for_dispatch(issue: taolib.symphony.tracker.models.Issue) bool#

分派前重新验证 Issue 有效性(防竞态调度)。

在 _dispatch_issue 中调用,确保从 filter_dispatchable 到 实际分派之间状态未发生变化。

参数:

issue -- 待验证的问题

返回:

True 表示验证通过,可以分派

_select_worker_host() str | None#

SSH 场景下最小负载主机选择。

从配置的 ssh_hosts 中选择当前运行 worker 数最少的主机。 本地执行时返回 None。

返回:

选中的主机地址,或 None 表示本地执行

_validate_dispatch_config() None#

分派预检验证。

对应规范 §6.3: - tracker.kind 存在且受支持 - tracker.api_key 在 $ 解析后存在 - tracker.project_slug 存在(当 tracker.kind == "linear" 时) - codex.command 存在且非空

抛出:

DispatchValidationError -- 验证失败

class taolib.symphony.orchestrator.Scheduler#

候选排序和并发控制。

无状态的工具类,所有方法接受 state 参数进行判断。 对齐规范 §8.2(候选选择规则)和 §8.3(并发控制)。

sort_for_dispatch(issues: list[taolib.symphony.tracker.models.Issue]) list[taolib.symphony.tracker.models.Issue]#

按分派优先级排序候选问题列表。

排序键(稳定排序意图): 1. priority 升序(1..4 优先;null/未知排最后,映射为 999) 2. created_at 最旧优先(null 映射为 datetime.max) 3. identifier 字典序决胜

参数:

issues -- 候选问题列表

返回:

排序后的新列表(不修改原列表)

available_slots(state: taolib.symphony.orchestrator.state.OrchestratorState) int#

计算全局可用槽位。

参数:

state -- 编排器当前状态

返回:

可用槽位数(>= 0)

available_slots_for_state(issue_state: str, state: taolib.symphony.orchestrator.state.OrchestratorState, config: taolib.symphony.config.schema.AgentConfig) int#

计算指定状态的可用槽位。

当 max_concurrent_agents_by_state 中存在该状态的配置时, 使用该配置的上限;否则回退到全局限制。

状态键按小写规范化后查找(规范 §5.3.5)。

参数:
  • issue_state -- 问题跟踪器状态名称

  • state -- 编排器当前状态

  • config -- 智能体配置

返回:

该状态下的可用槽位数(>= 0)

should_dispatch(issue: taolib.symphony.tracker.models.Issue, state: taolib.symphony.orchestrator.state.OrchestratorState) bool#

判断是否应分派该 issue。

检查规则(规范 §8.2 候选选择规则): 1. issue 具有 id、identifier、title 和 state 2. 不已在 running 中 3. 不已在 claimed 中 4. 不已在 retry_attempts 中

注意:Todo 状态的阻塞规则检查需要额外的阻塞项状态信息, 此处基于 Issue.blocked_by(阻塞方 ID 列表)做保守判断—— 当 blocked_by 非空时,由编排器在分派前通过对账确认阻塞项状态。 全局和每状态槽位检查由调用方在循环中执行。

参数:
  • issue -- 待判断的候选问题

  • state -- 编排器当前状态

返回:

True 表示应分派

filter_dispatchable(issues: list[taolib.symphony.tracker.models.Issue], state: taolib.symphony.orchestrator.state.OrchestratorState, config: taolib.symphony.config.schema.AgentConfig) list[taolib.symphony.tracker.models.Issue]#

从候选列表中筛选可分派的问题,按优先级排序。

依次检查全局槽位、每状态槽位和分派资格, 返回在当前槽位限制下可分派的问题子列表。

参数:
  • issues -- 候选问题列表

  • state -- 编排器当前状态

  • config -- 智能体配置

返回:

可分派的问题列表(已排序,数量不超过可用槽位)

class taolib.symphony.orchestrator.OrchestratorState#

编排器单一权威内存状态。

对应规范 §4.1.8。所有状态变更由编排器串行化执行, 避免并发修改导致的重复分派或状态不一致。

poll_interval_ms#

当前有效轮询间隔

max_concurrent_agents#

当前有效全局并发限制

running#

issue_id -> RunningEntry 映射

claimed#

已保留/运行中/重试中的问题 ID 集合

retry_attempts#

issue_id -> RetryEntry 映射

completed#

已完成的问题 ID 集合(仅记录,不控制分派)

codex_totals#

聚合令牌 + 运行秒数

codex_rate_limits#

来自智能体事件的最新速率限制快照

poll_interval_ms: int = 30000#
max_concurrent_agents: int = 10#
poll_check_in_progress: bool = False#
next_poll_due_at_ms: float | None = None#
running: dict[str, RunningEntry]#
claimed: set[str]#
retry_attempts: dict[str, RetryEntry]#
completed: set[str]#
codex_totals: CodexTotals#
codex_rate_limits: dict[str, Any] | None = None#