taolib.testing.config_center.server.websocket.heartbeat#

WebSocket 心跳检测模块。

独立的心跳监控器,定期向所有连接发送 ping 并检测僵尸连接。

Attributes#

Classes#

HeartbeatMonitor

心跳监控器。

Module Contents#

taolib.testing.config_center.server.websocket.heartbeat.logger#
taolib.testing.config_center.server.websocket.heartbeat.OnStaleCallback#
class taolib.testing.config_center.server.websocket.heartbeat.HeartbeatMonitor(*, interval: int = 30, timeout: int = 70, on_stale: OnStaleCallback | None = None)#

心跳监控器。

定期向所有 WebSocket 连接发送 ping 消息, 检测并回调清理超过阈值未响应的僵尸连接。

_interval = 30#
_timeout = 70#
_on_stale = None#
_task: asyncio.Task[None] | None = None#
async start(get_connections: collections.abc.Callable[[], dict[Any, taolib.testing.config_center.server.websocket.models.ConnectionInfo]]) None#

启动心跳循环。

参数:

get_connections -- 获取当前连接字典的回调

async stop() None#

停止心跳循环。

async _run(get_connections: collections.abc.Callable[[], dict[Any, taolib.testing.config_center.server.websocket.models.ConnectionInfo]]) None#

心跳主循环。

async _check_all(connections: dict[Any, taolib.testing.config_center.server.websocket.models.ConnectionInfo]) None#

检查所有连接并发送 ping / 清理僵尸。

static _send_ping(ws: Any, now: datetime.datetime) None#
Async:

向单个连接发送 ping 消息。