taolib.testing.config_center.server.websocket.pubsub_bridge#

Redis PubSub 桥接模块。

桥接 Redis PubSub 与 WebSocket Manager,实现多实例部署时的消息广播。 支持模式订阅、健康监控和自动指数退避重连。

Attributes#

Classes#

PubSubBridge

Redis PubSub 桥接器。

Module Contents#

taolib.testing.config_center.server.websocket.pubsub_bridge.logger#
class taolib.testing.config_center.server.websocket.pubsub_bridge.PubSubBridge(redis_client: redis.asyncio.Redis, websocket_manager: taolib.testing.config_center.server.websocket.protocols.ConnectionManagerProtocol, *, instance_id: str = '', health_check_interval: int = 15, max_reconnect_delay: int = 30)#

Redis PubSub 桥接器。

通过 PSUBSCRIBE 模式订阅所有推送频道, 将接收到的消息路由到本地 WebSocket 连接管理器。

BROADCAST_PATTERN = 'push:broadcast:*'#
INSTANCE_KEY_PREFIX = 'push:instance:'#
_redis#
_manager#
_instance_id#
_health_interval = 15#
_max_reconnect_delay = 30#
_pubsub: redis.asyncio.client.PubSub | None = None#
_listen_task: asyncio.Task[None] | None = None#
_health_task: asyncio.Task[None] | None = None#
_healthy = False#
_reconnect_attempts = 0#
property healthy: bool#

PubSub 连接是否健康。

property instance_id: str#
async start() None#

启动 PubSub 订阅和健康检查。

async stop() None#

停止 PubSub 订阅和清理资源。

async _setup_pubsub() None#

创建并配置 PubSub 订阅。

async _listen_loop() None#

监听 Redis PubSub 消息并转发到 WebSocket。

async _listen_messages() None#

从 PubSub 读取消息并路由到管理器。

async publish(message: taolib.testing.config_center.server.websocket.models.PushMessage) None#

发布消息到 Redis PubSub。

参数:

message -- 推送消息

async _health_check_loop() None#

定期检查 Redis 连接健康状态并注册实例心跳。

async _reconnect() None#

指数退避重连。