taolib.testing.config_center.server.websocket.message_buffer#

离线消息缓冲模块。

基于 Redis LIST 实现的消息缓冲,支持用户离线消息暂存和频道消息轮询。

Attributes#

Classes#

RedisMessageBuffer

基于 Redis LIST 的消息缓冲实现。

InMemoryMessageBuffer

内存消息缓冲实现(用于测试)。

Module Contents#

taolib.testing.config_center.server.websocket.message_buffer.logger#
class taolib.testing.config_center.server.websocket.message_buffer.RedisMessageBuffer(redis_client: redis.asyncio.Redis, *, max_user_messages: int = 1000, max_channel_messages: int = 5000, user_buffer_ttl: int = 86400, channel_buffer_ttl: int = 3600)#

基于 Redis LIST 的消息缓冲实现。

_redis#
_max_user = 1000#
_max_channel = 5000#
_user_ttl = 86400#
_channel_ttl = 3600#
_user_key(user_id: str) str#
_channel_key(channel: str) str#
async push(user_id: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) None#

缓冲一条用户离线消息。

async push_to_channel(channel: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) None#

缓冲一条频道消息(供 HTTP 轮询使用)。

async flush(user_id: str, limit: int = 100) list[taolib.testing.config_center.server.websocket.models.PushMessage]#

取出并清除用户的离线消息。

使用 LRANGE + LTRIM 保证原子性读取。 返回消息按时间正序排列(最旧的在前)。

async get_recent(channel: str, since: datetime.datetime, limit: int = 50) list[taolib.testing.config_center.server.websocket.models.PushMessage]#

获取频道自指定时间以来的消息。

class taolib.testing.config_center.server.websocket.message_buffer.InMemoryMessageBuffer(*, max_messages: int = 1000)#

内存消息缓冲实现(用于测试)。

_user_buffers: dict[str, collections.deque[taolib.testing.config_center.server.websocket.models.PushMessage]]#
_channel_buffers: dict[str, collections.deque[taolib.testing.config_center.server.websocket.models.PushMessage]]#
_max = 1000#
async push(user_id: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) None#
async push_to_channel(channel: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) None#
async flush(user_id: str, limit: int = 100) list[taolib.testing.config_center.server.websocket.models.PushMessage]#
async get_recent(channel: str, since: datetime.datetime, limit: int = 50) list[taolib.testing.config_center.server.websocket.models.PushMessage]#