taolib.harness.core.registry#

注册表 - 管理 Agent 图和 Metaflow Flow 的注册、发现和生命周期。

本模块提供三个层次的注册能力:

注册条目通过 RegistryEntry 描述元数据(版本、标签、描述等), 支持装饰器风格的便捷注册:@register_agent(name=..., version=...)

Classes#

AgentRegistry

LangGraph Agent / 子图定义注册表。

FlowRegistry

Metaflow Flow 模板注册表。

RegistryEntry

注册表条目元数据。

UnifiedRegistry

统一注册表 - 组合 AgentRegistryFlowRegistry

Functions#

register_agent(, description, registry, overwrite, T])

装饰器:将被装饰对象注册为 LangGraph Agent。

register_flow(, description, registry, overwrite, T])

装饰器:将被装饰对象注册为 Metaflow Flow。

Module Contents#

class taolib.harness.core.registry.AgentRegistry#

Bases: _BaseRegistry[Any]

LangGraph Agent / 子图定义注册表。

get_component(name: str, version: str | None = None) Any#

直接返回注册条目中的 component 对象。

class taolib.harness.core.registry.FlowRegistry#

Bases: _BaseRegistry[Any]

Metaflow Flow 模板注册表。

get_component(name: str, version: str | None = None) Any#

直接返回注册条目中的 component 对象。

class taolib.harness.core.registry.RegistryEntry#

Bases: pydantic.BaseModel, Generic[T]

注册表条目元数据。

component: Any#
description: str = ''#
model_config#
name: str#
tags: tuple[str, Ellipsis]#
version: str#
class taolib.harness.core.registry.UnifiedRegistry(*, agents: AgentRegistry | None = None, flows: FlowRegistry | None = None)#

统一注册表 - 组合 AgentRegistryFlowRegistry

通过 discover() 可在两侧同时检索;通过 agents()/flows() 访问各自的子注册表。

discover(*, tags: collections.abc.Iterable[str] | None = None, name_prefix: str | None = None) dict[str, list[RegistryEntry[Any]]]#

跨注册表发现组件,返回按类型分组的结果。

stats() dict[str, int]#

返回各子注册表的条目数量。

property agents: AgentRegistry#

Agent 子注册表。

property flows: FlowRegistry#

Flow 子注册表。

taolib.harness.core.registry.register_agent(*, name: str, version: str = '0.1.0', tags: collections.abc.Iterable[str] = (), description: str = '', registry: UnifiedRegistry | None = None, overwrite: bool = False) collections.abc.Callable[[T], T]#

装饰器:将被装饰对象注册为 LangGraph Agent。

Example:

@register_agent(name="planner", version="1.0.0", tags=("plan",))
def build_planner_graph(): ...
taolib.harness.core.registry.register_flow(*, name: str, version: str = '0.1.0', tags: collections.abc.Iterable[str] = (), description: str = '', registry: UnifiedRegistry | None = None, overwrite: bool = False) collections.abc.Callable[[T], T]#

装饰器:将被装饰对象注册为 Metaflow Flow。