taolib.testing.multi_agent.models.protocol#

智能体间通信协议。

定义智能体之间通信的标准协议和消息格式。

Classes#

ProtocolVersion

协议版本。

ProtocolHeader

协议头部。

ProtocolMessage

协议消息。

TaskRequestPayload

任务请求载荷。

TaskResponsePayload

任务响应载荷。

SkillRequestPayload

技能请求载荷。

SkillResponsePayload

技能响应载荷。

StatusUpdatePayload

状态更新载荷。

HeartbeatPayload

心跳载荷。

Functions#

create_task_request(→ ProtocolMessage)

创建任务请求消息。

create_task_response(→ ProtocolMessage)

创建任务响应消息。

create_heartbeat(→ ProtocolMessage)

创建心跳消息。

Module Contents#

class taolib.testing.multi_agent.models.protocol.ProtocolVersion#

Bases: enum.StrEnum

协议版本。

V1 = '1.0.0'#
class taolib.testing.multi_agent.models.protocol.ProtocolHeader#

Bases: pydantic.BaseModel

协议头部。

version: ProtocolVersion#
message_id: str#
timestamp: datetime.datetime#
ttl: int#
class taolib.testing.multi_agent.models.protocol.ProtocolMessage#

Bases: pydantic.BaseModel

协议消息。

header: ProtocolHeader#
message_type: taolib.testing.multi_agent.models.enums.MessageType#
sender: str#
receiver: str | None#
body: dict[str, Any]#
metadata: dict[str, Any]#
is_expired() bool#

检查消息是否已过期。

to_dict() dict[str, Any]#

转换为字典格式。

classmethod from_dict(data: dict[str, Any]) ProtocolMessage#

从字典创建协议消息。

class taolib.testing.multi_agent.models.protocol.TaskRequestPayload#

Bases: pydantic.BaseModel

任务请求载荷。

task_id: str#
task_description: str#
required_skills: list[str]#
parameters: dict[str, Any]#
deadline: datetime.datetime | None#
class taolib.testing.multi_agent.models.protocol.TaskResponsePayload#

Bases: pydantic.BaseModel

任务响应载荷。

task_id: str#
status: str#
result: dict[str, Any] | None#
error: str | None#
progress: float#
class taolib.testing.multi_agent.models.protocol.SkillRequestPayload#

Bases: pydantic.BaseModel

技能请求载荷。

skill_id: str#
skill_name: str#
input_data: dict[str, Any]#
class taolib.testing.multi_agent.models.protocol.SkillResponsePayload#

Bases: pydantic.BaseModel

技能响应载荷。

skill_id: str#
success: bool#
output_data: dict[str, Any] | None#
error: str | None#
class taolib.testing.multi_agent.models.protocol.StatusUpdatePayload#

Bases: pydantic.BaseModel

状态更新载荷。

agent_id: str#
status: str#
current_task: str | None#
resources: dict[str, Any]#
class taolib.testing.multi_agent.models.protocol.HeartbeatPayload#

Bases: pydantic.BaseModel

心跳载荷。

agent_id: str#
timestamp: datetime.datetime#
load: float#
status: str#
taolib.testing.multi_agent.models.protocol.create_task_request(sender: str, receiver: str | None, task_id: str, task_description: str, required_skills: list[str] | None = None, parameters: dict[str, Any] | None = None, deadline: datetime.datetime | None = None) ProtocolMessage#

创建任务请求消息。

taolib.testing.multi_agent.models.protocol.create_task_response(sender: str, receiver: str, task_id: str, status: str, result: dict[str, Any] | None = None, error: str | None = None, progress: float = 0.0) ProtocolMessage#

创建任务响应消息。

taolib.testing.multi_agent.models.protocol.create_heartbeat(agent_id: str, load: float = 0.0, status: str = 'idle') ProtocolMessage#

创建心跳消息。