taolib.testing.config_center.server.websocket.models

目录

taolib.testing.config_center.server.websocket.models#

推送服务数据模型模块。

定义消息、连接信息、在线状态等核心数据类型。

Classes#

MessagePriority

消息优先级。

ConnectionStatus

连接状态。

MessageType

消息类型。

PushMessage

推送消息。

ConnectionInfo

WebSocket 连接上下文信息。

ConnectionStats

连接统计信息。

UserPresence

用户在线状态。

Module Contents#

class taolib.testing.config_center.server.websocket.models.MessagePriority#

Bases: enum.StrEnum

消息优先级。

HIGH = 'high'#
NORMAL = 'normal'#
LOW = 'low'#
class taolib.testing.config_center.server.websocket.models.ConnectionStatus#

Bases: enum.StrEnum

连接状态。

ONLINE = 'online'#
OFFLINE = 'offline'#
RECONNECTING = 'reconnecting'#
class taolib.testing.config_center.server.websocket.models.MessageType#

Bases: enum.StrEnum

消息类型。

PUSH = 'push'#
ACK = 'ack'#
HEARTBEAT = 'heartbeat'#
PING = 'ping'#
PONG = 'pong'#
SUBSCRIBE = 'subscribe'#
UNSUBSCRIBE = 'unsubscribe'#
ERROR = 'error'#
SYSTEM = 'system'#
CONFIG_CHANGED = 'config_changed'#
class taolib.testing.config_center.server.websocket.models.PushMessage#

推送消息。

id#

唯一消息 ID

channel#

目标频道

event_type#

事件类型

data#

消息负载

priority#

优先级

timestamp#

创建时间戳

requires_ack#

是否需要客户端确认

retry_count#

当前重试次数

max_retries#

最大重试次数

sender_id#

发送者标识(服务实例 ID)

channel: str#
event_type: str#
data: dict[str, Any]#
id: str#
priority: MessagePriority#
timestamp: datetime.datetime#
requires_ack: bool = False#
retry_count: int = 0#
max_retries: int = 3#
sender_id: str = ''#
to_dict() dict[str, Any]#

序列化为可发送的字典格式。

classmethod from_dict(raw: dict[str, Any]) PushMessage#

从字典反序列化。

class taolib.testing.config_center.server.websocket.models.ConnectionInfo#

WebSocket 连接上下文信息。

user_id#

用户唯一标识

connected_at#

连接建立时间

last_heartbeat#

最后心跳时间

status#

当前连接状态

channels#

已订阅频道集合

pending_acks#

等待确认的消息 (message_id -> PushMessage)

message_buffer#

离线消息缓冲队列

metadata#

额外上下文(环境、服务列表等)

user_id: str#
connected_at: datetime.datetime#
last_heartbeat: datetime.datetime#
status: ConnectionStatus#
channels: set[str]#
pending_acks: dict[str, PushMessage]#
message_buffer: collections.deque[PushMessage]#
metadata: dict[str, Any]#
class taolib.testing.config_center.server.websocket.models.ConnectionStats#

连接统计信息。

total_connections: int = 0#
active_connections: int = 0#
total_channels: int = 0#
total_messages_sent: int = 0#
total_messages_failed: int = 0#
total_acks_received: int = 0#
total_acks_timeout: int = 0#
uptime_seconds: float = 0.0#
online_users: int = 0#
reconnecting_users: int = 0#
to_dict() dict[str, Any]#

序列化为字典。

class taolib.testing.config_center.server.websocket.models.UserPresence#

用户在线状态。

user_id: str#
status: ConnectionStatus#
last_seen: datetime.datetime#
connection_count: int = 0#
active_channels: list[str] = []#
to_dict() dict[str, Any]#

序列化为字典。