taolib.testing.config_center.events.publisher#
事件发布器模块。
实现消息的发布功能,支持优先级、批量发布和离线消息缓冲, 确保 at-least-once 投递保证。
Attributes#
Classes#
事件发布器。 |
Module Contents#
- taolib.testing.config_center.events.publisher.logger#
- class taolib.testing.config_center.events.publisher.EventPublisher(redis_client: redis.asyncio.Redis, message_buffer: taolib.testing.config_center.server.websocket.protocols.MessageBufferProtocol | None = None, instance_id: str = '')#
事件发布器。
职责: - 构建带唯一 ID 的 PushMessage - 发布到 Redis Pub/Sub 供跨实例广播 - 写入 MessageBuffer 保证离线用户可达 - 支持批量发布和消息优先级
- BROADCAST_PREFIX = 'push:broadcast:'#
- _redis#
- _buffer = None#
- _instance_id#
- async publish_config_changed(event: taolib.testing.config_center.events.types.ConfigChangedEvent) str#
发布配置变更事件。
构建 HIGH 优先级、需要 ACK 确认的消息。
- 参数:
event -- 配置变更事件
- 返回:
消息 ID
- async publish(channel: str, event_type: str, data: dict[str, Any], *, priority: taolib.testing.config_center.server.websocket.models.MessagePriority = MessagePriority.NORMAL, requires_ack: bool = False) str#
发布通用事件。
- 参数:
channel -- 目标频道
event_type -- 事件类型
data -- 消息负载
priority -- 消息优先级
requires_ack -- 是否需要客户端确认
- 返回:
消息 ID
- async publish_batch(messages: list[taolib.testing.config_center.server.websocket.models.PushMessage]) list[str]#
批量发布消息(使用 Redis pipeline 优化)。
- 参数:
messages -- 消息列表
- 返回:
消息 ID 列表
- async publish_to_user(user_id: str, event_type: str, data: dict[str, Any], *, priority: taolib.testing.config_center.server.websocket.models.MessagePriority = MessagePriority.NORMAL, requires_ack: bool = False) str#
发布用户直达消息。
- async publish_system_alert(data: dict[str, Any], *, priority: taolib.testing.config_center.server.websocket.models.MessagePriority = MessagePriority.HIGH) str#
发布系统告警。
- async _publish(message: taolib.testing.config_center.server.websocket.models.PushMessage) None#
核心发布动作:Redis PUBLISH + 消息缓冲。