taolib.testing.config_center.client#

配置中心客户端 SDK 模块。

提供轻量级客户端,供应用程序获取配置和监听配置变更。

Attributes#

Classes#

ConfigCenterClient

配置中心客户端。

Module Contents#

taolib.testing.config_center.client.logger#
class taolib.testing.config_center.client.ConfigCenterClient(base_url: str, token: str, cache_ttl: int = 60)#

配置中心客户端。

提供同步和异步方式获取配置,以及 WebSocket 监听配置变更。

_base_url#
_token#
_cache_ttl = 60#
_local_cache: dict[str, tuple[Any, float]]#
_headers#
_cache_key(environment: str, service: str, key: str) str#

生成本地缓存 Key。

_get_cached(key: str) Any | None#

获取本地缓存。

_set_cached(key: str, value: Any) None#

设置本地缓存。

get_config(key: str, environment: str, service: str) Any | None#

同步获取配置。

参数:
  • key -- 配置键

  • environment -- 环境类型

  • service -- 服务名称

返回:

配置值,如果不存在则返回 None

async aget_config(key: str, environment: str, service: str) Any | None#

异步获取配置。

参数:
  • key -- 配置键

  • environment -- 环境类型

  • service -- 服务名称

返回:

配置值,如果不存在则返回 None

get_configs(environment: str, service: str) list[dict[str, Any]]#

获取服务的所有配置。

参数:
  • environment -- 环境类型

  • service -- 服务名称

返回:

配置列表

async watch_config(key: str, environment: str, service: str, callback: collections.abc.Callable[[dict[str, Any]], None]) None#

监听配置变更(需要 websockets 依赖)。

参数:
  • key -- 配置键

  • environment -- 环境类型

  • service -- 服务名称

  • callback -- 变更回调函数