taolib.harness.runtime.scheduler#
混合调度器 - 管理 LangGraph 微任务和 Metaflow 宏任务的优先级路由。
调度策略:
短时/对话型 任务(如 LLM 单步调用)路由到 LangGraph 通道;
长时/批处理 任务(如向量索引、数据预处理)卸载到 Metaflow 通道。
支持基于优先级队列的任务出队、基于 depends_on 的拓扑排序与基于
asyncio.Semaphore 的并发上限控制(限制 LLM API QPS)。
Classes#
混合调度器。 |
|
可调度任务统一抽象。 |
|
调度器配置。 |
|
任务描述符 - |
|
任务优先级(数值越小优先级越高)。 |
Module Contents#
- class taolib.harness.runtime.scheduler.HybridScheduler(executor: taolib.harness.runtime.executor.UnifiedExecutor | None = None, config: SchedulerConfig | None = None)#
混合调度器。
将提交的任务排序后依次执行,遵守依赖、优先级与并发上限。 通过
submit()注册任务,通过run_all()等待全部完成。- on_complete(listener: collections.abc.Callable[[str, taolib.harness.runtime.executor.ExecutionResult], collections.abc.Awaitable[None] | None]) None#
注册任务完成回调。
- async run_all(*, base_context: taolib.harness.runtime.executor.ExecutionContext | None = None) dict[str, taolib.harness.runtime.executor.ExecutionResult]#
运行已提交的全部任务,按拓扑顺序与优先级调度。
- submit(task: SchedulableTask) str#
提交任务到调度器。
- submit_many(tasks: collections.abc.Iterable[SchedulableTask]) list[str]#
批量提交任务。
- property results: dict[str, taolib.harness.runtime.executor.ExecutionResult]#
已完成任务的结果映射。
- class taolib.harness.runtime.scheduler.SchedulableTask#
Bases:
Protocol可调度任务统一抽象。
实现者需暴露
task_id、目标、后端类型、优先级与依赖列表。- async run(executor: taolib.harness.runtime.executor.UnifiedExecutor, context: taolib.harness.runtime.executor.ExecutionContext) taolib.harness.runtime.executor.ExecutionResult#
- priority: TaskPriority#
- class taolib.harness.runtime.scheduler.SchedulerConfig#
Bases:
pydantic.BaseModel调度器配置。
- model_config#
- class taolib.harness.runtime.scheduler.TaskDescriptor#
Bases:
pydantic.BaseModel任务描述符 -
SchedulableTask的标准实现。- async run(executor: taolib.harness.runtime.executor.UnifiedExecutor, context: taolib.harness.runtime.executor.ExecutionContext) taolib.harness.runtime.executor.ExecutionResult#
通过统一执行器运行该任务。
- model_config#
- priority: TaskPriority#
- target: Any = None#