taolib.testing.config_center.server.websocket#
WebSocket 实时推送模块。
提供 WebSocket 连接管理、Redis PubSub 桥接、心跳检测、 离线消息缓冲和在线状态追踪等完整的实时推送基础设施。
Submodules#
- taolib.testing.config_center.server.websocket.heartbeat
- taolib.testing.config_center.server.websocket.manager
- taolib.testing.config_center.server.websocket.message_buffer
- taolib.testing.config_center.server.websocket.models
- taolib.testing.config_center.server.websocket.presence
- taolib.testing.config_center.server.websocket.protocols
- taolib.testing.config_center.server.websocket.pubsub_bridge
Classes#
高性能 WebSocket 连接管理器。 |
|
WebSocket 连接上下文信息。 |
|
连接统计信息。 |
|
连接状态。 |
|
消息优先级。 |
|
消息类型。 |
|
推送消息。 |
|
用户在线状态。 |
|
WebSocket 连接管理器协议。 |
|
离线消息缓冲协议。 |
|
用户在线状态追踪协议。 |
|
Redis PubSub 桥接器。 |
Package Contents#
- class taolib.testing.config_center.server.websocket.WebSocketConnectionManager(presence_tracker: taolib.testing.config_center.server.websocket.protocols.PresenceTrackerProtocol | None = None, message_buffer: taolib.testing.config_center.server.websocket.protocols.MessageBufferProtocol | None = None, *, instance_id: str = '', heartbeat_interval: int = 30, heartbeat_timeout: int = 70, ack_timeout: int = 10, max_retries: int = 3)#
高性能 WebSocket 连接管理器。
核心职责: - 连接池管理:accept / disconnect / 多设备支持 - 频道订阅:subscribe / unsubscribe / broadcast - 消息投递:优先级排序、ACK 追踪、超时重传 - 心跳检测:通过 HeartbeatMonitor 检测僵尸连接 - 在线状态:通过 PresenceTracker 跨实例同步 - 离线缓冲:通过 MessageBuffer 保证 at-least-once 投递 - 统计监控:连接数 / 消息数 / ACK 统计
- _connections: dict[fastapi.WebSocket, taolib.testing.config_center.server.websocket.models.ConnectionInfo]#
- _presence = None#
- _buffer = None#
- _instance_id = ''#
- _ack_timeout = 10#
- _max_retries = 3#
- _stats#
- _started_at#
- _heartbeat#
- _ack_task: asyncio.Task[None] | None = None#
- _lock#
- async connect(websocket: fastapi.WebSocket, user_id: str, environments: list[str] | None = None, services: list[str] | None = None, metadata: dict[str, Any] | None = None) None#
接受 WebSocket 连接并初始化。
- 参数:
websocket -- WebSocket 连接
user_id -- 用户 ID
environments -- 订阅的环境列表
services -- 订阅的服务列表
metadata -- 额外上下文信息
- async broadcast(channel: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) int#
向频道所有订阅者广播消息。
- 返回:
成功投递的连接数
- async send_to_user(user_id: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) int#
向特定用户的所有连接发送消息。
若用户离线,消息存入离线缓冲。
- 返回:
成功投递的连接数
- async _deliver_message(ws: fastapi.WebSocket, info: taolib.testing.config_center.server.websocket.models.ConnectionInfo, msg: taolib.testing.config_center.server.websocket.models.PushMessage) bool#
原子消息投递:发送 + ACK 追踪。
- async handle_client_message(websocket: fastapi.WebSocket, raw_data: str) None#
处理来自客户端的消息。
支持的消息类型:ack, pong, subscribe, unsubscribe
- get_stats() taolib.testing.config_center.server.websocket.models.ConnectionStats#
获取当前连接统计快照。
- get_user_presence(user_id: str) taolib.testing.config_center.server.websocket.models.UserPresence | None#
获取用户本地连接的在线状态。
- class taolib.testing.config_center.server.websocket.ConnectionInfo#
WebSocket 连接上下文信息。
- user_id#
用户唯一标识
- connected_at#
连接建立时间
- last_heartbeat#
最后心跳时间
- status#
当前连接状态
- channels#
已订阅频道集合
- pending_acks#
等待确认的消息 (message_id -> PushMessage)
- message_buffer#
离线消息缓冲队列
- metadata#
额外上下文(环境、服务列表等)
- connected_at: datetime.datetime#
- last_heartbeat: datetime.datetime#
- status: ConnectionStatus#
- pending_acks: dict[str, PushMessage]#
- message_buffer: collections.deque[PushMessage]#
- class taolib.testing.config_center.server.websocket.ConnectionStats#
连接统计信息。
- class taolib.testing.config_center.server.websocket.ConnectionStatus#
Bases:
enum.StrEnum连接状态。
- ONLINE = 'online'#
- OFFLINE = 'offline'#
- RECONNECTING = 'reconnecting'#
- class taolib.testing.config_center.server.websocket.MessagePriority#
Bases:
enum.StrEnum消息优先级。
- HIGH = 'high'#
- NORMAL = 'normal'#
- LOW = 'low'#
- class taolib.testing.config_center.server.websocket.MessageType#
Bases:
enum.StrEnum消息类型。
- PUSH = 'push'#
- ACK = 'ack'#
- HEARTBEAT = 'heartbeat'#
- PING = 'ping'#
- PONG = 'pong'#
- SUBSCRIBE = 'subscribe'#
- UNSUBSCRIBE = 'unsubscribe'#
- ERROR = 'error'#
- SYSTEM = 'system'#
- CONFIG_CHANGED = 'config_changed'#
- class taolib.testing.config_center.server.websocket.PushMessage#
推送消息。
- id#
唯一消息 ID
- channel#
目标频道
- event_type#
事件类型
- data#
消息负载
- priority#
优先级
- timestamp#
创建时间戳
- requires_ack#
是否需要客户端确认
- retry_count#
当前重试次数
- max_retries#
最大重试次数
- sender_id#
发送者标识(服务实例 ID)
- priority: MessagePriority#
- timestamp: datetime.datetime#
- classmethod from_dict(raw: dict[str, Any]) PushMessage#
从字典反序列化。
- class taolib.testing.config_center.server.websocket.UserPresence#
用户在线状态。
- status: ConnectionStatus#
- last_seen: datetime.datetime#
- class taolib.testing.config_center.server.websocket.ConnectionManagerProtocol#
Bases:
ProtocolWebSocket 连接管理器协议。
- async broadcast(channel: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) int#
向频道广播消息,返回投递成功的连接数。
- async send_to_user(user_id: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) int#
向特定用户发送消息,返回投递成功的连接数。
- get_stats() taolib.testing.config_center.server.websocket.models.ConnectionStats#
获取连接统计信息。
- class taolib.testing.config_center.server.websocket.MessageBufferProtocol#
Bases:
Protocol离线消息缓冲协议。
- async push(user_id: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) None#
缓冲一条用户离线消息。
- async push_to_channel(channel: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) None#
缓冲一条频道消息(供 HTTP 轮询使用)。
- async flush(user_id: str, limit: int = 100) list[taolib.testing.config_center.server.websocket.models.PushMessage]#
取出并清除用户的离线消息。
- async get_recent(channel: str, since: datetime.datetime, limit: int = 50) list[taolib.testing.config_center.server.websocket.models.PushMessage]#
获取频道自指定时间以来的消息(HTTP 轮询用)。
- class taolib.testing.config_center.server.websocket.PresenceTrackerProtocol#
Bases:
Protocol用户在线状态追踪协议。
- async get_status(user_id: str) taolib.testing.config_center.server.websocket.models.UserPresence | None#
获取用户在线状态。
- async get_all_online() list[taolib.testing.config_center.server.websocket.models.UserPresence]#
获取所有在线用户。
- class taolib.testing.config_center.server.websocket.PubSubBridge(redis_client: redis.asyncio.Redis, websocket_manager: taolib.testing.config_center.server.websocket.protocols.ConnectionManagerProtocol, *, instance_id: str = '', health_check_interval: int = 15, max_reconnect_delay: int = 30)#
Redis PubSub 桥接器。
通过 PSUBSCRIBE 模式订阅所有推送频道, 将接收到的消息路由到本地 WebSocket 连接管理器。
- BROADCAST_PATTERN = 'push:broadcast:*'#
- INSTANCE_KEY_PREFIX = 'push:instance:'#
- _redis#
- _manager#
- _instance_id#
- _health_interval = 15#
- _max_reconnect_delay = 30#
- _listen_task: asyncio.Task[None] | None = None#
- _health_task: asyncio.Task[None] | None = None#
- _healthy = False#
- _reconnect_attempts = 0#
- async publish(message: taolib.testing.config_center.server.websocket.models.PushMessage) None#
发布消息到 Redis PubSub。
- 参数:
message -- 推送消息