taolib.testing.config_center.server.websocket.manager#

WebSocket 连接管理器模块。

管理所有 WebSocket 连接、频道订阅、消息投递、心跳检测和 ACK 重传。 支持数万级并发连接和分布式多实例部署。

Attributes#

Classes#

WebSocketConnectionManager

高性能 WebSocket 连接管理器。

Module Contents#

taolib.testing.config_center.server.websocket.manager.logger#
class taolib.testing.config_center.server.websocket.manager.WebSocketConnectionManager(presence_tracker: taolib.testing.config_center.server.websocket.protocols.PresenceTrackerProtocol | None = None, message_buffer: taolib.testing.config_center.server.websocket.protocols.MessageBufferProtocol | None = None, *, instance_id: str = '', heartbeat_interval: int = 30, heartbeat_timeout: int = 70, ack_timeout: int = 10, max_retries: int = 3)#

高性能 WebSocket 连接管理器。

核心职责: - 连接池管理:accept / disconnect / 多设备支持 - 频道订阅:subscribe / unsubscribe / broadcast - 消息投递:优先级排序、ACK 追踪、超时重传 - 心跳检测:通过 HeartbeatMonitor 检测僵尸连接 - 在线状态:通过 PresenceTracker 跨实例同步 - 离线缓冲:通过 MessageBuffer 保证 at-least-once 投递 - 统计监控:连接数 / 消息数 / ACK 统计

_connections: dict[fastapi.WebSocket, taolib.testing.config_center.server.websocket.models.ConnectionInfo]#
_subscriptions: dict[str, set[fastapi.WebSocket]]#
_user_connections: dict[str, set[fastapi.WebSocket]]#
_presence = None#
_buffer = None#
_instance_id = ''#
_ack_timeout = 10#
_max_retries = 3#
_stats#
_started_at#
_heartbeat#
_ack_task: asyncio.Task[None] | None = None#
_lock#
async start() None#

启动心跳检测和 ACK 清理后台任务。

async stop() None#

优雅关闭:停止后台任务,断开所有连接。

async connect(websocket: fastapi.WebSocket, user_id: str, environments: list[str] | None = None, services: list[str] | None = None, metadata: dict[str, Any] | None = None) None#

接受 WebSocket 连接并初始化。

参数:
  • websocket -- WebSocket 连接

  • user_id -- 用户 ID

  • environments -- 订阅的环境列表

  • services -- 订阅的服务列表

  • metadata -- 额外上下文信息

disconnect(websocket: fastapi.WebSocket) None#

断开 WebSocket 连接并清理所有关联状态。

async subscribe(websocket: fastapi.WebSocket, channel: str) None#

订阅频道。

unsubscribe(websocket: fastapi.WebSocket, channel: str) None#

取消订阅频道。

async broadcast(channel: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) int#

向频道所有订阅者广播消息。

返回:

成功投递的连接数

async send_to_user(user_id: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) int#

向特定用户的所有连接发送消息。

若用户离线,消息存入离线缓冲。

返回:

成功投递的连接数

async send_personal(websocket: fastapi.WebSocket, message: dict[str, Any]) None#

发送个人消息(向后兼容接口)。

async _deliver_message(ws: fastapi.WebSocket, info: taolib.testing.config_center.server.websocket.models.ConnectionInfo, msg: taolib.testing.config_center.server.websocket.models.PushMessage) bool#

原子消息投递:发送 + ACK 追踪。

async handle_client_message(websocket: fastapi.WebSocket, raw_data: str) None#

处理来自客户端的消息。

支持的消息类型:ack, pong, subscribe, unsubscribe

_handle_ack(websocket: fastapi.WebSocket, message_id: str) None#

处理客户端消息确认。

static _send_error(websocket: fastapi.WebSocket, detail: str) None#
Async:

向客户端发送错误消息。

async _run_ack_cleanup_loop() None#

定期检查 ACK 超时,执行重传或离线缓冲。

async _cleanup_pending_acks() None#

扫描所有连接的 pending_acks,处理超时消息。

async _handle_stale_connection(websocket: fastapi.WebSocket) None#

处理心跳超时的僵尸连接。

get_stats() taolib.testing.config_center.server.websocket.models.ConnectionStats#

获取当前连接统计快照。

get_user_presence(user_id: str) taolib.testing.config_center.server.websocket.models.UserPresence | None#

获取用户本地连接的在线状态。

taolib.testing.config_center.server.websocket.manager.manager#