taolib.testing.config_center.server.websocket.manager#
WebSocket 连接管理器模块。
管理所有 WebSocket 连接、频道订阅、消息投递、心跳检测和 ACK 重传。 支持数万级并发连接和分布式多实例部署。
Attributes#
Classes#
高性能 WebSocket 连接管理器。 |
Module Contents#
- taolib.testing.config_center.server.websocket.manager.logger#
- class taolib.testing.config_center.server.websocket.manager.WebSocketConnectionManager(presence_tracker: taolib.testing.config_center.server.websocket.protocols.PresenceTrackerProtocol | None = None, message_buffer: taolib.testing.config_center.server.websocket.protocols.MessageBufferProtocol | None = None, *, instance_id: str = '', heartbeat_interval: int = 30, heartbeat_timeout: int = 70, ack_timeout: int = 10, max_retries: int = 3)#
高性能 WebSocket 连接管理器。
核心职责: - 连接池管理:accept / disconnect / 多设备支持 - 频道订阅:subscribe / unsubscribe / broadcast - 消息投递:优先级排序、ACK 追踪、超时重传 - 心跳检测:通过 HeartbeatMonitor 检测僵尸连接 - 在线状态:通过 PresenceTracker 跨实例同步 - 离线缓冲:通过 MessageBuffer 保证 at-least-once 投递 - 统计监控:连接数 / 消息数 / ACK 统计
- _connections: dict[fastapi.WebSocket, taolib.testing.config_center.server.websocket.models.ConnectionInfo]#
- _presence = None#
- _buffer = None#
- _instance_id = ''#
- _ack_timeout = 10#
- _max_retries = 3#
- _stats#
- _started_at#
- _heartbeat#
- _ack_task: asyncio.Task[None] | None = None#
- _lock#
- async connect(websocket: fastapi.WebSocket, user_id: str, environments: list[str] | None = None, services: list[str] | None = None, metadata: dict[str, Any] | None = None) None#
接受 WebSocket 连接并初始化。
- 参数:
websocket -- WebSocket 连接
user_id -- 用户 ID
environments -- 订阅的环境列表
services -- 订阅的服务列表
metadata -- 额外上下文信息
- async broadcast(channel: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) int#
向频道所有订阅者广播消息。
- 返回:
成功投递的连接数
- async send_to_user(user_id: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) int#
向特定用户的所有连接发送消息。
若用户离线,消息存入离线缓冲。
- 返回:
成功投递的连接数
- async _deliver_message(ws: fastapi.WebSocket, info: taolib.testing.config_center.server.websocket.models.ConnectionInfo, msg: taolib.testing.config_center.server.websocket.models.PushMessage) bool#
原子消息投递:发送 + ACK 追踪。
- async handle_client_message(websocket: fastapi.WebSocket, raw_data: str) None#
处理来自客户端的消息。
支持的消息类型:ack, pong, subscribe, unsubscribe
- get_stats() taolib.testing.config_center.server.websocket.models.ConnectionStats#
获取当前连接统计快照。
- get_user_presence(user_id: str) taolib.testing.config_center.server.websocket.models.UserPresence | None#
获取用户本地连接的在线状态。
- taolib.testing.config_center.server.websocket.manager.manager#