taolib.testing.config_center.server.websocket

目录

taolib.testing.config_center.server.websocket#

WebSocket 实时推送模块。

提供 WebSocket 连接管理、Redis PubSub 桥接、心跳检测、 离线消息缓冲和在线状态追踪等完整的实时推送基础设施。

Submodules#

Classes#

WebSocketConnectionManager

高性能 WebSocket 连接管理器。

ConnectionInfo

WebSocket 连接上下文信息。

ConnectionStats

连接统计信息。

ConnectionStatus

连接状态。

MessagePriority

消息优先级。

MessageType

消息类型。

PushMessage

推送消息。

UserPresence

用户在线状态。

ConnectionManagerProtocol

WebSocket 连接管理器协议。

MessageBufferProtocol

离线消息缓冲协议。

PresenceTrackerProtocol

用户在线状态追踪协议。

PubSubBridge

Redis PubSub 桥接器。

Package Contents#

class taolib.testing.config_center.server.websocket.WebSocketConnectionManager(presence_tracker: taolib.testing.config_center.server.websocket.protocols.PresenceTrackerProtocol | None = None, message_buffer: taolib.testing.config_center.server.websocket.protocols.MessageBufferProtocol | None = None, *, instance_id: str = '', heartbeat_interval: int = 30, heartbeat_timeout: int = 70, ack_timeout: int = 10, max_retries: int = 3)#

高性能 WebSocket 连接管理器。

核心职责: - 连接池管理:accept / disconnect / 多设备支持 - 频道订阅:subscribe / unsubscribe / broadcast - 消息投递:优先级排序、ACK 追踪、超时重传 - 心跳检测:通过 HeartbeatMonitor 检测僵尸连接 - 在线状态:通过 PresenceTracker 跨实例同步 - 离线缓冲:通过 MessageBuffer 保证 at-least-once 投递 - 统计监控:连接数 / 消息数 / ACK 统计

_connections: dict[fastapi.WebSocket, taolib.testing.config_center.server.websocket.models.ConnectionInfo]#
_subscriptions: dict[str, set[fastapi.WebSocket]]#
_user_connections: dict[str, set[fastapi.WebSocket]]#
_presence = None#
_buffer = None#
_instance_id = ''#
_ack_timeout = 10#
_max_retries = 3#
_stats#
_started_at#
_heartbeat#
_ack_task: asyncio.Task[None] | None = None#
_lock#
async start() None#

启动心跳检测和 ACK 清理后台任务。

async stop() None#

优雅关闭:停止后台任务,断开所有连接。

async connect(websocket: fastapi.WebSocket, user_id: str, environments: list[str] | None = None, services: list[str] | None = None, metadata: dict[str, Any] | None = None) None#

接受 WebSocket 连接并初始化。

参数:
  • websocket -- WebSocket 连接

  • user_id -- 用户 ID

  • environments -- 订阅的环境列表

  • services -- 订阅的服务列表

  • metadata -- 额外上下文信息

disconnect(websocket: fastapi.WebSocket) None#

断开 WebSocket 连接并清理所有关联状态。

async subscribe(websocket: fastapi.WebSocket, channel: str) None#

订阅频道。

unsubscribe(websocket: fastapi.WebSocket, channel: str) None#

取消订阅频道。

async broadcast(channel: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) int#

向频道所有订阅者广播消息。

返回:

成功投递的连接数

async send_to_user(user_id: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) int#

向特定用户的所有连接发送消息。

若用户离线,消息存入离线缓冲。

返回:

成功投递的连接数

async send_personal(websocket: fastapi.WebSocket, message: dict[str, Any]) None#

发送个人消息(向后兼容接口)。

async _deliver_message(ws: fastapi.WebSocket, info: taolib.testing.config_center.server.websocket.models.ConnectionInfo, msg: taolib.testing.config_center.server.websocket.models.PushMessage) bool#

原子消息投递:发送 + ACK 追踪。

async handle_client_message(websocket: fastapi.WebSocket, raw_data: str) None#

处理来自客户端的消息。

支持的消息类型:ack, pong, subscribe, unsubscribe

_handle_ack(websocket: fastapi.WebSocket, message_id: str) None#

处理客户端消息确认。

static _send_error(websocket: fastapi.WebSocket, detail: str) None#
Async:

向客户端发送错误消息。

async _run_ack_cleanup_loop() None#

定期检查 ACK 超时,执行重传或离线缓冲。

async _cleanup_pending_acks() None#

扫描所有连接的 pending_acks,处理超时消息。

async _handle_stale_connection(websocket: fastapi.WebSocket) None#

处理心跳超时的僵尸连接。

get_stats() taolib.testing.config_center.server.websocket.models.ConnectionStats#

获取当前连接统计快照。

get_user_presence(user_id: str) taolib.testing.config_center.server.websocket.models.UserPresence | None#

获取用户本地连接的在线状态。

class taolib.testing.config_center.server.websocket.ConnectionInfo#

WebSocket 连接上下文信息。

user_id#

用户唯一标识

connected_at#

连接建立时间

last_heartbeat#

最后心跳时间

status#

当前连接状态

channels#

已订阅频道集合

pending_acks#

等待确认的消息 (message_id -> PushMessage)

message_buffer#

离线消息缓冲队列

metadata#

额外上下文(环境、服务列表等)

user_id: str#
connected_at: datetime.datetime#
last_heartbeat: datetime.datetime#
status: ConnectionStatus#
channels: set[str]#
pending_acks: dict[str, PushMessage]#
message_buffer: collections.deque[PushMessage]#
metadata: dict[str, Any]#
class taolib.testing.config_center.server.websocket.ConnectionStats#

连接统计信息。

total_connections: int = 0#
active_connections: int = 0#
total_channels: int = 0#
total_messages_sent: int = 0#
total_messages_failed: int = 0#
total_acks_received: int = 0#
total_acks_timeout: int = 0#
uptime_seconds: float = 0.0#
online_users: int = 0#
reconnecting_users: int = 0#
to_dict() dict[str, Any]#

序列化为字典。

class taolib.testing.config_center.server.websocket.ConnectionStatus#

Bases: enum.StrEnum

连接状态。

ONLINE = 'online'#
OFFLINE = 'offline'#
RECONNECTING = 'reconnecting'#
class taolib.testing.config_center.server.websocket.MessagePriority#

Bases: enum.StrEnum

消息优先级。

HIGH = 'high'#
NORMAL = 'normal'#
LOW = 'low'#
class taolib.testing.config_center.server.websocket.MessageType#

Bases: enum.StrEnum

消息类型。

PUSH = 'push'#
ACK = 'ack'#
HEARTBEAT = 'heartbeat'#
PING = 'ping'#
PONG = 'pong'#
SUBSCRIBE = 'subscribe'#
UNSUBSCRIBE = 'unsubscribe'#
ERROR = 'error'#
SYSTEM = 'system'#
CONFIG_CHANGED = 'config_changed'#
class taolib.testing.config_center.server.websocket.PushMessage#

推送消息。

id#

唯一消息 ID

channel#

目标频道

event_type#

事件类型

data#

消息负载

priority#

优先级

timestamp#

创建时间戳

requires_ack#

是否需要客户端确认

retry_count#

当前重试次数

max_retries#

最大重试次数

sender_id#

发送者标识(服务实例 ID)

channel: str#
event_type: str#
data: dict[str, Any]#
id: str#
priority: MessagePriority#
timestamp: datetime.datetime#
requires_ack: bool = False#
retry_count: int = 0#
max_retries: int = 3#
sender_id: str = ''#
to_dict() dict[str, Any]#

序列化为可发送的字典格式。

classmethod from_dict(raw: dict[str, Any]) PushMessage#

从字典反序列化。

class taolib.testing.config_center.server.websocket.UserPresence#

用户在线状态。

user_id: str#
status: ConnectionStatus#
last_seen: datetime.datetime#
connection_count: int = 0#
active_channels: list[str] = []#
to_dict() dict[str, Any]#

序列化为字典。

class taolib.testing.config_center.server.websocket.ConnectionManagerProtocol#

Bases: Protocol

WebSocket 连接管理器协议。

async broadcast(channel: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) int#

向频道广播消息,返回投递成功的连接数。

async send_to_user(user_id: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) int#

向特定用户发送消息,返回投递成功的连接数。

get_stats() taolib.testing.config_center.server.websocket.models.ConnectionStats#

获取连接统计信息。

class taolib.testing.config_center.server.websocket.MessageBufferProtocol#

Bases: Protocol

离线消息缓冲协议。

async push(user_id: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) None#

缓冲一条用户离线消息。

async push_to_channel(channel: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) None#

缓冲一条频道消息(供 HTTP 轮询使用)。

async flush(user_id: str, limit: int = 100) list[taolib.testing.config_center.server.websocket.models.PushMessage]#

取出并清除用户的离线消息。

async get_recent(channel: str, since: datetime.datetime, limit: int = 50) list[taolib.testing.config_center.server.websocket.models.PushMessage]#

获取频道自指定时间以来的消息(HTTP 轮询用)。

class taolib.testing.config_center.server.websocket.PresenceTrackerProtocol#

Bases: Protocol

用户在线状态追踪协议。

async set_online(user_id: str, instance_id: str) None#

标记用户在线。

async set_offline(user_id: str, instance_id: str) None#

标记用户离线。

async get_status(user_id: str) taolib.testing.config_center.server.websocket.models.UserPresence | None#

获取用户在线状态。

async get_all_online() list[taolib.testing.config_center.server.websocket.models.UserPresence]#

获取所有在线用户。

class taolib.testing.config_center.server.websocket.PubSubBridge(redis_client: redis.asyncio.Redis, websocket_manager: taolib.testing.config_center.server.websocket.protocols.ConnectionManagerProtocol, *, instance_id: str = '', health_check_interval: int = 15, max_reconnect_delay: int = 30)#

Redis PubSub 桥接器。

通过 PSUBSCRIBE 模式订阅所有推送频道, 将接收到的消息路由到本地 WebSocket 连接管理器。

BROADCAST_PATTERN = 'push:broadcast:*'#
INSTANCE_KEY_PREFIX = 'push:instance:'#
_redis#
_manager#
_instance_id#
_health_interval = 15#
_max_reconnect_delay = 30#
_pubsub: redis.asyncio.client.PubSub | None = None#
_listen_task: asyncio.Task[None] | None = None#
_health_task: asyncio.Task[None] | None = None#
_healthy = False#
_reconnect_attempts = 0#
property healthy: bool#

PubSub 连接是否健康。

property instance_id: str#
async start() None#

启动 PubSub 订阅和健康检查。

async stop() None#

停止 PubSub 订阅和清理资源。

async _setup_pubsub() None#

创建并配置 PubSub 订阅。

async _listen_loop() None#

监听 Redis PubSub 消息并转发到 WebSocket。

async _listen_messages() None#

从 PubSub 读取消息并路由到管理器。

async publish(message: taolib.testing.config_center.server.websocket.models.PushMessage) None#

发布消息到 Redis PubSub。

参数:

message -- 推送消息

async _health_check_loop() None#

定期检查 Redis 连接健康状态并注册实例心跳。

async _reconnect() None#

指数退避重连。