taolib.testing.oauth.integration.config_center#

Config Center 集成桥接模块。

提供 OAuth 模块与 config_center 用户系统和 JWT 体系的桥接。

Classes#

ConfigCenterIntegration

Config Center 集成桥接。

Module Contents#

class taolib.testing.oauth.integration.config_center.ConfigCenterIntegration(user_repo, role_collection, jwt_secret: str, jwt_algorithm: str = 'HS256')#

Config Center 集成桥接。

连接 OAuth 模块与 config_center 的用户管理和 JWT Token 系统。

参数:
  • user_repo -- config_center 的 UserRepository

  • role_collection -- MongoDB 角色集合

  • jwt_secret -- JWT 密钥

  • jwt_algorithm -- JWT 算法

_user_repo#
_role_collection#
_jwt_secret#
_jwt_algorithm = 'HS256'#
async find_user_by_email(email: str)#

按邮箱查找 config_center 用户。

参数:

email -- 邮箱地址

返回:

UserDocument 或 None

async create_user_from_oauth(user_info: taolib.testing.oauth.models.profile.OAuthUserInfo, onboarding: taolib.testing.oauth.models.profile.OnboardingData)#

通过 OAuth 信息创建 config_center 用户。

创建一个无密码的用户(使用占位密码哈希),仅通过 OAuth 认证。

参数:
  • user_info -- OAuth 提供商的用户信息

  • onboarding -- 用户引导数据

返回:

新创建的 UserDocument

async get_user_roles(user_id: str) list[str]#

获取用户的角色名称列表。

参数:

user_id -- 用户 ID

返回:

角色名称列表

async complete_onboarding(connection_id: str, onboarding: taolib.testing.oauth.models.profile.OnboardingData, user_info: taolib.testing.oauth.models.profile.OAuthUserInfo, connection_repo: taolib.testing.oauth.repository.connection_repo.OAuthConnectionRepository) dict[str, Any]#

完成新用户引导流程。

  1. 按邮箱查找已有用户或创建新用户

  2. 更新 OAuth 连接,关联到用户

  3. 返回用户信息和角色

参数:
  • connection_id -- OAuth 连接 ID

  • onboarding -- 引导数据

  • user_info -- OAuth 用户信息

  • connection_repo -- OAuth 连接仓储

返回:

包含 user_id 和 roles 的字典