taolib.testing.config_center.server.websocket.message_buffer#
离线消息缓冲模块。
基于 Redis LIST 实现的消息缓冲,支持用户离线消息暂存和频道消息轮询。
Attributes#
Classes#
基于 Redis LIST 的消息缓冲实现。 |
|
内存消息缓冲实现(用于测试)。 |
Module Contents#
- taolib.testing.config_center.server.websocket.message_buffer.logger#
- class taolib.testing.config_center.server.websocket.message_buffer.RedisMessageBuffer(redis_client: redis.asyncio.Redis, *, max_user_messages: int = 1000, max_channel_messages: int = 5000, user_buffer_ttl: int = 86400, channel_buffer_ttl: int = 3600)#
基于 Redis LIST 的消息缓冲实现。
- _redis#
- _max_user = 1000#
- _max_channel = 5000#
- _user_ttl = 86400#
- _channel_ttl = 3600#
- async push(user_id: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) None#
缓冲一条用户离线消息。
- async push_to_channel(channel: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) None#
缓冲一条频道消息(供 HTTP 轮询使用)。
- async flush(user_id: str, limit: int = 100) list[taolib.testing.config_center.server.websocket.models.PushMessage]#
取出并清除用户的离线消息。
使用 LRANGE + LTRIM 保证原子性读取。 返回消息按时间正序排列(最旧的在前)。
- async get_recent(channel: str, since: datetime.datetime, limit: int = 50) list[taolib.testing.config_center.server.websocket.models.PushMessage]#
获取频道自指定时间以来的消息。
- class taolib.testing.config_center.server.websocket.message_buffer.InMemoryMessageBuffer(*, max_messages: int = 1000)#
内存消息缓冲实现(用于测试)。
- _user_buffers: dict[str, collections.deque[taolib.testing.config_center.server.websocket.models.PushMessage]]#
- _channel_buffers: dict[str, collections.deque[taolib.testing.config_center.server.websocket.models.PushMessage]]#
- _max = 1000#
- async push(user_id: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) None#
- async push_to_channel(channel: str, message: taolib.testing.config_center.server.websocket.models.PushMessage) None#
- async flush(user_id: str, limit: int = 100) list[taolib.testing.config_center.server.websocket.models.PushMessage]#
- async get_recent(channel: str, since: datetime.datetime, limit: int = 50) list[taolib.testing.config_center.server.websocket.models.PushMessage]#