taolib.testing.config_center.events.publisher#

事件发布器模块。

实现消息的发布功能,支持优先级、批量发布和离线消息缓冲, 确保 at-least-once 投递保证。

Attributes#

Classes#

EventPublisher

事件发布器。

Module Contents#

taolib.testing.config_center.events.publisher.logger#
class taolib.testing.config_center.events.publisher.EventPublisher(redis_client: redis.asyncio.Redis, message_buffer: taolib.testing.config_center.server.websocket.protocols.MessageBufferProtocol | None = None, instance_id: str = '')#

事件发布器。

职责: - 构建带唯一 ID 的 PushMessage - 发布到 Redis Pub/Sub 供跨实例广播 - 写入 MessageBuffer 保证离线用户可达 - 支持批量发布和消息优先级

BROADCAST_PREFIX = 'push:broadcast:'#
_redis#
_buffer = None#
_instance_id#
async publish_config_changed(event: taolib.testing.config_center.events.types.ConfigChangedEvent) str#

发布配置变更事件。

构建 HIGH 优先级、需要 ACK 确认的消息。

参数:

event -- 配置变更事件

返回:

消息 ID

async publish(channel: str, event_type: str, data: dict[str, Any], *, priority: taolib.testing.config_center.server.websocket.models.MessagePriority = MessagePriority.NORMAL, requires_ack: bool = False) str#

发布通用事件。

参数:
  • channel -- 目标频道

  • event_type -- 事件类型

  • data -- 消息负载

  • priority -- 消息优先级

  • requires_ack -- 是否需要客户端确认

返回:

消息 ID

async publish_batch(messages: list[taolib.testing.config_center.server.websocket.models.PushMessage]) list[str]#

批量发布消息(使用 Redis pipeline 优化)。

参数:

messages -- 消息列表

返回:

消息 ID 列表

async publish_to_user(user_id: str, event_type: str, data: dict[str, Any], *, priority: taolib.testing.config_center.server.websocket.models.MessagePriority = MessagePriority.NORMAL, requires_ack: bool = False) str#

发布用户直达消息。

async publish_system_alert(data: dict[str, Any], *, priority: taolib.testing.config_center.server.websocket.models.MessagePriority = MessagePriority.HIGH) str#

发布系统告警。

async _publish(message: taolib.testing.config_center.server.websocket.models.PushMessage) None#

核心发布动作:Redis PUBLISH + 消息缓冲。