taolib.testing.auth

目录

taolib.testing.auth#

通用 JWT 认证中间件。

提供无状态、可扩展的 JWT 认证,支持令牌黑名单、API 密钥备选认证和 RBAC。

基础用法:

import os
from taolib.testing.auth import AuthConfig, JWTService

# 生产环境:从环境变量获取密钥(必须 >= 32 字符)
jwt_secret = os.environ.get("JWT_SECRET", "")
if not jwt_secret or len(jwt_secret) < 32:
    raise ValueError("JWT_SECRET must be set and >= 32 characters")

config = AuthConfig(jwt_secret=jwt_secret)
jwt_service = JWTService(config)
token_pair = jwt_service.create_token_pair("user-123", ["admin"])

FastAPI 集成:

import os
from taolib.testing.auth import AuthConfig, JWTService, AuthenticatedUser
from taolib.testing.auth.fastapi.dependencies import create_auth_dependency

jwt_secret = os.environ.get("JWT_SECRET", "")
config = AuthConfig(jwt_secret=jwt_secret)
jwt_service = JWTService(config)
auth = create_auth_dependency(jwt_service)

@app.get("/protected")
async def protected(user: AuthenticatedUser = Depends(auth)):
    return {"user_id": user.user_id}

Submodules#

Exceptions#

APIKeyInvalidError

API 密钥无效或未找到。

AuthError

所有认证错误的基类。

InsufficientPermissionError

权限不足。

TokenBlacklistedError

令牌已被吊销(在黑名单中)。

TokenExpiredError

令牌已过期。

TokenInvalidError

令牌无效(解码失败或签名不匹配)。

Classes#

APIKeyLookupProtocol

API 密钥查找 Protocol。

StaticAPIKeyLookup

基于静态配置的 API 密钥查找。

InMemoryTokenBlacklist

基于内存的令牌黑名单。

NullTokenBlacklist

空操作黑名单。

RedisTokenBlacklist

基于 Redis 的令牌黑名单。

TokenBlacklistProtocol

令牌黑名单 Protocol。

AuthConfig

认证配置。

AuthenticatedUser

已认证的用户信息。

TokenPair

令牌对,包含 Access Token 和 Refresh Token。

TokenPayload

JWT 令牌解码后的 payload。

Permission

权限定义。

RBACPolicy

RBAC 策略引擎。

RoleDefinition

角色定义。

JWTService

JWT 令牌服务。

Package Contents#

class taolib.testing.auth.APIKeyLookupProtocol#

Bases: Protocol

API 密钥查找 Protocol。

定义 API 密钥认证的标准接口,支持多种存储后端。

async lookup(api_key: str) taolib.testing.auth.models.AuthenticatedUser | None#

根据 API 密钥查找对应的用户信息。

参数:

api_key -- API 密钥字符串

返回:

如果密钥有效返回 AuthenticatedUser,否则返回 None

class taolib.testing.auth.StaticAPIKeyLookup(keys: dict[str, taolib.testing.auth.models.AuthenticatedUser])#

基于静态配置的 API 密钥查找。

从构造时传入的字典中查找 API 密钥,适合小规模部署 或配置文件管理的固定密钥。

参数:

keys -- API 密钥到用户信息的映射

_keys#
async lookup(api_key: str) taolib.testing.auth.models.AuthenticatedUser | None#

从静态配置中查找 API 密钥。

class taolib.testing.auth.InMemoryTokenBlacklist#

基于内存的令牌黑名单。

使用字典存储,检查时自动清理过期条目。适用于测试和单进程开发环境。

_store: dict[str, float]#
async add(jti: str, ttl_seconds: int) None#

将令牌加入内存黑名单。

async is_blacklisted(jti: str) bool#

检查令牌是否在内存黑名单中。

class taolib.testing.auth.NullTokenBlacklist#

空操作黑名单。

不执行任何操作,is_blacklisted 始终返回 False。 用于不需要黑名单功能的场景。

async add(jti: str, ttl_seconds: int) None#

空操作。

async is_blacklisted(jti: str) bool#

始终返回 False。

class taolib.testing.auth.RedisTokenBlacklist(redis_client: Any, key_prefix: str = 'taolib:auth:blacklist:')#

基于 Redis 的令牌黑名单。

使用 Redis SET + EX 命令存储,TTL 自动过期确保黑名单不会无限增长。

参数:
  • redis_client -- redis.asyncio.Redis 客户端实例

  • key_prefix -- Redis 键前缀

_redis#
_prefix = 'taolib:auth:blacklist:'#
async add(jti: str, ttl_seconds: int) None#

将令牌加入 Redis 黑名单。

async is_blacklisted(jti: str) bool#

检查令牌是否在 Redis 黑名单中。

class taolib.testing.auth.TokenBlacklistProtocol#

Bases: Protocol

令牌黑名单 Protocol。

定义令牌黑名单的标准接口,支持多种存储后端。

async add(jti: str, ttl_seconds: int) None#

将令牌加入黑名单。

参数:
  • jti -- 令牌的唯一标识(JWT ID)

  • ttl_seconds -- 黑名单条目的存活时间(秒),应与令牌剩余有效期对齐

async is_blacklisted(jti: str) bool#

检查令牌是否在黑名单中。

参数:

jti -- 令牌的唯一标识

返回:

如果令牌已被吊销返回 True

class taolib.testing.auth.AuthConfig#

认证配置。

所有认证参数通过构造函数注入,不依赖全局 settings 单例。

参数:
  • jwt_secret -- JWT 签名密钥(必填)

  • jwt_algorithm -- JWT 签名算法

  • access_token_ttl -- Access Token 有效期

  • refresh_token_ttl -- Refresh Token 有效期

  • token_issuer -- 可选的 JWT iss 声明

  • blacklist_key_prefix -- Redis 黑名单键前缀

jwt_secret: str#
jwt_algorithm: str = 'HS256'#
access_token_ttl: datetime.timedelta#
refresh_token_ttl: datetime.timedelta#
token_issuer: str | None = None#
blacklist_key_prefix: str = 'taolib:auth:blacklist:'#
classmethod from_env(prefix: str = 'TAOLIB_AUTH_') Self#

从环境变量创建配置。

参数:

prefix -- 环境变量前缀

返回:

AuthConfig 实例

抛出:

ValueError -- 如果必填的 JWT_SECRET 未设置

exception taolib.testing.auth.APIKeyInvalidError(message: str = '无效的 API 密钥')#

Bases: AuthError

API 密钥无效或未找到。

exception taolib.testing.auth.AuthError(message: str = '认证错误')#

Bases: Exception

所有认证错误的基类。

message = '认证错误'#
exception taolib.testing.auth.InsufficientPermissionError(message: str = '权限不足')#

Bases: AuthError

权限不足。

exception taolib.testing.auth.TokenBlacklistedError(message: str = '令牌已被吊销')#

Bases: AuthError

令牌已被吊销(在黑名单中)。

exception taolib.testing.auth.TokenExpiredError(message: str = '令牌已过期,请刷新')#

Bases: AuthError

令牌已过期。

exception taolib.testing.auth.TokenInvalidError(message: str = '无效的认证令牌', detail: str | None = None)#

Bases: AuthError

令牌无效(解码失败或签名不匹配)。

detail = None#
class taolib.testing.auth.AuthenticatedUser#

已认证的用户信息。

附加到 request.state.user,供下游处理使用。

参数:
  • user_id -- 用户 ID

  • roles -- 用户角色列表

  • auth_method -- 认证方式("jwt""api_key"

  • metadata -- 扩展信息(如 API Key 名称、JWT 额外声明等)

user_id: str#
roles: list[str]#
auth_method: str#
metadata: dict[str, Any]#
class taolib.testing.auth.TokenPair#

令牌对,包含 Access Token 和 Refresh Token。

参数:
  • access_token -- 访问令牌

  • refresh_token -- 刷新令牌

  • token_type -- 令牌类型标识

  • expires_in -- Access Token 有效秒数

access_token: str#
refresh_token: str#
token_type: str = 'bearer'#
expires_in: int = 3600#
class taolib.testing.auth.TokenPayload#

JWT 令牌解码后的 payload。

参数:
  • sub -- 用户 ID(JWT 标准 subject 声明)

  • roles -- 用户角色列表

  • exp -- 过期时间

  • iat -- 签发时间

  • type -- 令牌类型("access""refresh"

  • jti -- 令牌唯一标识(JWT ID),用于黑名单

sub: str#
roles: list[str]#
exp: datetime.datetime#
iat: datetime.datetime#
type: str#
jti: str#
class taolib.testing.auth.Permission#

权限定义。

参数:
  • resource -- 资源类型

  • action -- 操作类型

resource: str#
action: str#
class taolib.testing.auth.RBACPolicy(roles: dict[str, RoleDefinition])#

RBAC 策略引擎。

提供基于角色的权限检查和作用域验证。

参数:

roles -- 角色名称到 RoleDefinition 的映射

_roles#
get_role(name: str) RoleDefinition | None#

获取角色定义。

参数:

name -- 角色名称

返回:

角色定义,如不存在返回 None

has_permission(user_roles: list[str], resource: str, action: str) bool#

检查用户角色列表中是否有任何角色拥有指定权限。

参数:
  • user_roles -- 用户拥有的角色名称列表

  • resource -- 资源类型

  • action -- 操作类型

返回:

是否有权限

has_scope(user_roles: list[str], scope_type: str, scope_value: str) bool#

检查用户角色列表中是否有任何角色在指定作用域内。

参数:
  • user_roles -- 用户拥有的角色名称列表

  • scope_type -- 作用域类型(如 "environment", "service"

  • scope_value -- 作用域值(如 "production", "auth-service"

返回:

是否在作用域内

classmethod from_dict(data: dict[str, dict[str, Any]]) Self#

从字典构建 RBAC 策略。

兼容现有 RBACService.SYSTEM_ROLES 的数据格式。

参数:

data -- 角色名称到角色数据字典的映射。每个角色数据应包含: - description: 角色描述 - permissions: 权限列表,每项含 resourceactions - 其他以 _scope 结尾的键将被解析为作用域

返回:

RBACPolicy 实例

class taolib.testing.auth.RoleDefinition#

角色定义。

参数:
  • name -- 角色名称

  • description -- 角色描述

  • permissions -- 权限列表

  • scopes -- 作用域映射,键为作用域类型,值为允许的范围列表。 None 值表示该作用域无限制。

name: str#
description: str#
permissions: list[Permission] = []#
scopes: dict[str, list[str] | None]#
class taolib.testing.auth.JWTService(config: taolib.testing.auth.config.AuthConfig)#

JWT 令牌服务。

AuthConfig 注入初始化,提供令牌创建和验证的完整功能。

参数:

config -- 认证配置实例

_config#
property config: taolib.testing.auth.config.AuthConfig#

获取认证配置。

create_access_token(user_id: str, roles: list[str], extra_claims: dict[str, Any] | None = None) str#

生成 Access Token。

参数:
  • user_id -- 用户 ID

  • roles -- 用户角色列表

  • extra_claims -- 额外的 JWT 声明

返回:

JWT Token 字符串

create_refresh_token(user_id: str, extra_claims: dict[str, Any] | None = None) str#

生成 Refresh Token。

Refresh Token 不包含 roles,刷新时应重新从用户系统获取最新角色。

参数:
  • user_id -- 用户 ID

  • extra_claims -- 额外的 JWT 声明

返回:

JWT Token 字符串

create_token_pair(user_id: str, roles: list[str]) taolib.testing.auth.models.TokenPair#

同时生成 Access Token 和 Refresh Token。

参数:
  • user_id -- 用户 ID

  • roles -- 用户角色列表

返回:

TokenPair 实例

decode_token(token: str) taolib.testing.auth.models.TokenPayload#

解码任意类型的 JWT 令牌。

参数:

token -- JWT Token 字符串

返回:

TokenPayload 实例

抛出:
verify_access_token(token: str) taolib.testing.auth.models.TokenPayload#

验证 Access Token。

解码令牌并确认类型为 "access"

参数:

token -- JWT Token 字符串

返回:

TokenPayload 实例

抛出:
verify_refresh_token(token: str) taolib.testing.auth.models.TokenPayload#

验证 Refresh Token。

解码令牌并确认类型为 "refresh"

参数:

token -- JWT Token 字符串

返回:

TokenPayload 实例

抛出:
static _payload_from_dict(data: dict[str, Any]) taolib.testing.auth.models.TokenPayload#

从字典构建 TokenPayload。

兼容旧版令牌(可能缺少 jti/iat 字段)。

参数:

data -- JWT payload 字典

返回:

TokenPayload 实例