taolib.harness.runtime.scheduler#

混合调度器 - 管理 LangGraph 微任务和 Metaflow 宏任务的优先级路由。

调度策略:

  • 短时/对话型 任务(如 LLM 单步调用)路由到 LangGraph 通道;

  • 长时/批处理 任务(如向量索引、数据预处理)卸载到 Metaflow 通道。

支持基于优先级队列的任务出队、基于 depends_on 的拓扑排序与基于 asyncio.Semaphore 的并发上限控制(限制 LLM API QPS)。

Classes#

HybridScheduler

混合调度器。

SchedulableTask

可调度任务统一抽象。

SchedulerConfig

调度器配置。

TaskDescriptor

任务描述符 - SchedulableTask 的标准实现。

TaskPriority

任务优先级(数值越小优先级越高)。

Module Contents#

class taolib.harness.runtime.scheduler.HybridScheduler(executor: taolib.harness.runtime.executor.UnifiedExecutor | None = None, config: SchedulerConfig | None = None)#

混合调度器。

将提交的任务排序后依次执行,遵守依赖、优先级与并发上限。 通过 submit() 注册任务,通过 run_all() 等待全部完成。

failed() list[str]#

失败任务 ID 列表。

on_complete(listener: collections.abc.Callable[[str, taolib.harness.runtime.executor.ExecutionResult], collections.abc.Awaitable[None] | None]) None#

注册任务完成回调。

async run_all(*, base_context: taolib.harness.runtime.executor.ExecutionContext | None = None) dict[str, taolib.harness.runtime.executor.ExecutionResult]#

运行已提交的全部任务,按拓扑顺序与优先级调度。

submit(task: SchedulableTask) str#

提交任务到调度器。

submit_many(tasks: collections.abc.Iterable[SchedulableTask]) list[str]#

批量提交任务。

succeeded() list[str]#

成功任务 ID 列表。

property results: dict[str, taolib.harness.runtime.executor.ExecutionResult]#

已完成任务的结果映射。

class taolib.harness.runtime.scheduler.SchedulableTask#

Bases: Protocol

可调度任务统一抽象。

实现者需暴露 task_id、目标、后端类型、优先级与依赖列表。

async run(executor: taolib.harness.runtime.executor.UnifiedExecutor, context: taolib.harness.runtime.executor.ExecutionContext) taolib.harness.runtime.executor.ExecutionResult#
backend: taolib.harness.runtime.executor.ExecutorBackend#
depends_on: tuple[str, Ellipsis]#
priority: TaskPriority#
task_id: str#
class taolib.harness.runtime.scheduler.SchedulerConfig#

Bases: pydantic.BaseModel

调度器配置。

auto_route: bool#
long_task_threshold_seconds: float#
max_concurrency: int#
max_llm_concurrency: int#
model_config#
class taolib.harness.runtime.scheduler.TaskDescriptor#

Bases: pydantic.BaseModel

任务描述符 - SchedulableTask 的标准实现。

async run(executor: taolib.harness.runtime.executor.UnifiedExecutor, context: taolib.harness.runtime.executor.ExecutionContext) taolib.harness.runtime.executor.ExecutionResult#

通过统一执行器运行该任务。

backend: taolib.harness.runtime.executor.ExecutorBackend#
depends_on: tuple[str, Ellipsis]#
estimated_duration_seconds: float | None#
inputs: dict[str, Any]#
model_config#
name: str = ''#
priority: TaskPriority#
requires_llm: bool#
target: Any = None#
task_id: str#
class taolib.harness.runtime.scheduler.TaskPriority#

Bases: enum.IntEnum

任务优先级(数值越小优先级越高)。

Initialize self. See help(type(self)) for accurate signature.

BACKGROUND = 1000#
CRITICAL = 0#
HIGH = 10#
LOW = 100#
NORMAL = 50#