taolib.testing.multi_agent.models.protocol#
智能体间通信协议。
定义智能体之间通信的标准协议和消息格式。
Classes#
协议版本。 |
|
协议头部。 |
|
协议消息。 |
|
任务请求载荷。 |
|
任务响应载荷。 |
|
技能请求载荷。 |
|
技能响应载荷。 |
|
状态更新载荷。 |
|
心跳载荷。 |
Functions#
|
创建任务请求消息。 |
|
创建任务响应消息。 |
|
创建心跳消息。 |
Module Contents#
- class taolib.testing.multi_agent.models.protocol.ProtocolVersion#
Bases:
enum.StrEnum协议版本。
- V1 = '1.0.0'#
- class taolib.testing.multi_agent.models.protocol.ProtocolHeader#
Bases:
pydantic.BaseModel协议头部。
- version: ProtocolVersion#
- timestamp: datetime.datetime#
- class taolib.testing.multi_agent.models.protocol.ProtocolMessage#
Bases:
pydantic.BaseModel协议消息。
- header: ProtocolHeader#
- message_type: taolib.testing.multi_agent.models.enums.MessageType#
- classmethod from_dict(data: dict[str, Any]) ProtocolMessage#
从字典创建协议消息。
- class taolib.testing.multi_agent.models.protocol.TaskRequestPayload#
Bases:
pydantic.BaseModel任务请求载荷。
- deadline: datetime.datetime | None#
- class taolib.testing.multi_agent.models.protocol.TaskResponsePayload#
Bases:
pydantic.BaseModel任务响应载荷。
- class taolib.testing.multi_agent.models.protocol.SkillRequestPayload#
Bases:
pydantic.BaseModel技能请求载荷。
- class taolib.testing.multi_agent.models.protocol.SkillResponsePayload#
Bases:
pydantic.BaseModel技能响应载荷。
- class taolib.testing.multi_agent.models.protocol.StatusUpdatePayload#
Bases:
pydantic.BaseModel状态更新载荷。
- class taolib.testing.multi_agent.models.protocol.HeartbeatPayload#
Bases:
pydantic.BaseModel心跳载荷。
- timestamp: datetime.datetime#
- taolib.testing.multi_agent.models.protocol.create_task_request(sender: str, receiver: str | None, task_id: str, task_description: str, required_skills: list[str] | None = None, parameters: dict[str, Any] | None = None, deadline: datetime.datetime | None = None) ProtocolMessage#
创建任务请求消息。
- taolib.testing.multi_agent.models.protocol.create_task_response(sender: str, receiver: str, task_id: str, status: str, result: dict[str, Any] | None = None, error: str | None = None, progress: float = 0.0) ProtocolMessage#
创建任务响应消息。
- taolib.testing.multi_agent.models.protocol.create_heartbeat(agent_id: str, load: float = 0.0, status: str = 'idle') ProtocolMessage#
创建心跳消息。