taolib.testing.config_center.server.dependencies#

依赖注入模块。

提供 FastAPI 依赖注入函数。

Attributes#

Functions#

get_mongo_client(→ motor.motor_asyncio.AsyncIOMotorClient)

获取 MongoDB 客户端。

get_mongo_db([client])

获取 MongoDB 数据库。

get_user_collection(...)

获取用户集合。

get_config_collection(...)

获取配置集合。

get_version_collection(...)

获取版本集合。

get_audit_collection(...)

获取审计日志集合。

get_role_collection(...)

获取角色集合。

get_user_repo(...)

获取用户 Repository。

get_config_repo(...)

获取配置 Repository。

get_version_repo(...)

获取版本 Repository。

get_audit_repo(...)

获取审计日志 Repository。

get_role_repo(...)

获取角色 Repository。

get_cache(...)

获取配置缓存。

get_current_user(, user_repo)

获取当前用户。

require_permission(resource, action)

权限检查依赖。

require_environment_access(environment)

环境访问检查依赖。

get_event_publisher(...)

获取事件发布器(从 app.state)。

Module Contents#

taolib.testing.config_center.server.dependencies.rbac_service#
async taolib.testing.config_center.server.dependencies.get_mongo_client() motor.motor_asyncio.AsyncIOMotorClient#

获取 MongoDB 客户端。

async taolib.testing.config_center.server.dependencies.get_mongo_db(client: motor.motor_asyncio.AsyncIOMotorClient = Depends(get_mongo_client))#

获取 MongoDB 数据库。

async taolib.testing.config_center.server.dependencies.get_user_collection(db=Depends(get_mongo_db)) motor.motor_asyncio.AsyncIOMotorCollection#

获取用户集合。

async taolib.testing.config_center.server.dependencies.get_config_collection(db=Depends(get_mongo_db)) motor.motor_asyncio.AsyncIOMotorCollection#

获取配置集合。

async taolib.testing.config_center.server.dependencies.get_version_collection(db=Depends(get_mongo_db)) motor.motor_asyncio.AsyncIOMotorCollection#

获取版本集合。

async taolib.testing.config_center.server.dependencies.get_audit_collection(db=Depends(get_mongo_db)) motor.motor_asyncio.AsyncIOMotorCollection#

获取审计日志集合。

async taolib.testing.config_center.server.dependencies.get_role_collection(db=Depends(get_mongo_db)) motor.motor_asyncio.AsyncIOMotorCollection#

获取角色集合。

async taolib.testing.config_center.server.dependencies.get_user_repo(collection: motor.motor_asyncio.AsyncIOMotorCollection = Depends(get_user_collection)) taolib.testing.config_center.repository.user_repo.UserRepository#

获取用户 Repository。

async taolib.testing.config_center.server.dependencies.get_config_repo(collection: motor.motor_asyncio.AsyncIOMotorCollection = Depends(get_config_collection)) taolib.testing.config_center.repository.config_repo.ConfigRepository#

获取配置 Repository。

async taolib.testing.config_center.server.dependencies.get_version_repo(collection: motor.motor_asyncio.AsyncIOMotorCollection = Depends(get_version_collection)) taolib.testing.config_center.repository.version_repo.VersionRepository#

获取版本 Repository。

async taolib.testing.config_center.server.dependencies.get_audit_repo(collection: motor.motor_asyncio.AsyncIOMotorCollection = Depends(get_audit_collection)) taolib.testing.config_center.repository.audit_repo.AuditLogRepository#

获取审计日志 Repository。

async taolib.testing.config_center.server.dependencies.get_role_repo(collection: motor.motor_asyncio.AsyncIOMotorCollection = Depends(get_role_collection)) taolib.testing.config_center.repository.role_repo.RoleRepository#

获取角色 Repository。

async taolib.testing.config_center.server.dependencies.get_cache() taolib.testing.config_center.cache.config_cache.ConfigCacheProtocol#

获取配置缓存。

async taolib.testing.config_center.server.dependencies.get_current_user(token: str = Depends(oauth2_scheme), user_repo: taolib.testing.config_center.repository.user_repo.UserRepository = Depends(get_user_repo)) taolib.testing.config_center.models.user.UserDocument#

获取当前用户。

参数:
  • token -- JWT Token

  • user_repo -- 用户 Repository

返回:

用户文档实例

抛出:

HTTPException -- 如果 Token 无效或用户不存在

taolib.testing.config_center.server.dependencies.require_permission(resource: str, action: str)#

权限检查依赖。

参数:
  • resource -- 资源类型

  • action -- 操作类型

返回:

依赖注入函数

taolib.testing.config_center.server.dependencies.require_environment_access(environment: taolib.testing.config_center.models.enums.Environment)#

环境访问检查依赖。

参数:

environment -- 环境类型

返回:

依赖注入函数

async taolib.testing.config_center.server.dependencies.get_event_publisher(request: fastapi.Request) taolib.testing.config_center.events.publisher.EventPublisher | None#

获取事件发布器(从 app.state)。