taolib.testing.audit

目录

taolib.testing.audit#

审计日志模块。

提供审计日志的记录、存储和查询功能。

示例

基本使用:

from taolib.testing.audit import AuditLogger, InMemoryAuditStorage

# 创建存储后端
storage = InMemoryAuditStorage()

# 创建审计日志记录器
audit_logger = AuditLogger(storage)

# 记录日志
await audit_logger.log_create(
    resource_type="user",
    resource_id="user-123",
    user_id="admin",
    details={"name": "John Doe"},
)

FastAPI 中间件:

from fastapi import FastAPI
from taolib.testing.audit import AuditMiddleware, AuditLogger, InMemoryAuditStorage

app = FastAPI()
storage = InMemoryAuditStorage()
audit_logger = AuditLogger(storage)
app.add_middleware(AuditMiddleware, audit_logger=audit_logger)

MongoDB 存储:

from motor.motor_asyncio import AsyncIOMotorClient
from taolib.testing.audit import AuditLogger, MongoDBAuditStorage

client = AsyncIOMotorClient("mongodb://localhost:27017")
storage = MongoDBAuditStorage(client, database_name="myapp")
await storage.create_indexes()
audit_logger = AuditLogger(storage)

Submodules#

Exceptions#

AuditConfigError

审计日志配置错误。

AuditError

所有审计日志错误的基类。

AuditStorageError

审计日志存储错误。

Classes#

AuditLogger

审计日志记录器。

AuditStorageProtocol

审计日志存储后端协议。

FileAuditStorage

文件审计日志存储。

InMemoryAuditStorage

内存审计日志存储。

MongoDBAuditStorage

MongoDB 审计日志存储。

AuditMiddleware

FastAPI 审计日志中间件。

AuditAction

审计操作类型枚举。

AuditLog

审计日志模型。

AuditLogCreate

创建审计日志请求模型。

AuditLogFilter

审计日志查询过滤器。

AuditLogListResponse

审计日志列表响应模型。

AuditLogResponse

审计日志响应模型。

AuditStatus

审计操作状态枚举。

RequestAuditInfo

请求审计信息。

Functions#

extract_client_ip(→ str)

提取客户端 IP 地址。

extract_user_id(→ str | None)

尝试从请求中提取用户 ID。

Package Contents#

exception taolib.testing.audit.AuditConfigError(message: str = '审计日志配置错误')#

Bases: AuditError

审计日志配置错误。

exception taolib.testing.audit.AuditError(message: str = '审计日志错误')#

Bases: Exception

所有审计日志错误的基类。

message = '审计日志错误'#
exception taolib.testing.audit.AuditStorageError(message: str = '审计日志存储失败')#

Bases: AuditError

审计日志存储错误。

class taolib.testing.audit.AuditLogger(storage: AuditStorageProtocol, default_user_id: str | None = None, default_ip_address: str | None = None)#

审计日志记录器。

提供审计日志的记录功能,支持同步和异步操作。

_storage#

存储后端

_default_user_id#

默认用户 ID

_default_ip_address#

默认 IP 地址

_storage#
_default_user_id = None#
_default_ip_address = None#
async log(action: str, resource_type: str, resource_id: str | None = None, user_id: str | None = None, details: dict[str, Any] | None = None, ip_address: str | None = None, user_agent: str | None = None, status: str = 'success', error_message: str | None = None) taolib.testing.audit.models.AuditLog#

记录审计日志。

参数:
  • action -- 操作类型

  • resource_type -- 资源类型

  • resource_id -- 资源 ID

  • user_id -- 用户 ID

  • details -- 操作详情

  • ip_address -- IP 地址

  • user_agent -- User-Agent

  • status -- 操作状态

  • error_message -- 错误信息

返回:

创建的审计日志实例

async log_create(resource_type: str, resource_id: str, user_id: str | None = None, details: dict[str, Any] | None = None, ip_address: str | None = None, user_agent: str | None = None) taolib.testing.audit.models.AuditLog#

记录创建操作。

参数:
  • resource_type -- 资源类型

  • resource_id -- 资源 ID

  • user_id -- 用户 ID

  • details -- 操作详情

  • ip_address -- IP 地址

  • user_agent -- User-Agent

返回:

创建的审计日志实例

async log_update(resource_type: str, resource_id: str, user_id: str | None = None, details: dict[str, Any] | None = None, ip_address: str | None = None, user_agent: str | None = None) taolib.testing.audit.models.AuditLog#

记录更新操作。

参数:
  • resource_type -- 资源类型

  • resource_id -- 资源 ID

  • user_id -- 用户 ID

  • details -- 操作详情

  • ip_address -- IP 地址

  • user_agent -- User-Agent

返回:

创建的审计日志实例

async log_delete(resource_type: str, resource_id: str, user_id: str | None = None, details: dict[str, Any] | None = None, ip_address: str | None = None, user_agent: str | None = None) taolib.testing.audit.models.AuditLog#

记录删除操作。

参数:
  • resource_type -- 资源类型

  • resource_id -- 资源 ID

  • user_id -- 用户 ID

  • details -- 操作详情

  • ip_address -- IP 地址

  • user_agent -- User-Agent

返回:

创建的审计日志实例

async log_login(user_id: str, ip_address: str | None = None, user_agent: str | None = None, success: bool = True, error_message: str | None = None) taolib.testing.audit.models.AuditLog#

记录登录操作。

参数:
  • user_id -- 用户 ID

  • ip_address -- IP 地址

  • user_agent -- User-Agent

  • success -- 是否成功

  • error_message -- 错误信息

返回:

创建的审计日志实例

async log_logout(user_id: str, ip_address: str | None = None, user_agent: str | None = None) taolib.testing.audit.models.AuditLog#

记录登出操作。

参数:
  • user_id -- 用户 ID

  • ip_address -- IP 地址

  • user_agent -- User-Agent

返回:

创建的审计日志实例

async query(filter_params: taolib.testing.audit.models.AuditLogFilter) list[taolib.testing.audit.models.AuditLogResponse]#

查询审计日志。

参数:

filter_params -- 查询过滤器

返回:

匹配的审计日志列表

async count(filter_params: taolib.testing.audit.models.AuditLogFilter) int#

统计审计日志数量。

参数:

filter_params -- 查询过滤器

返回:

匹配的日志数量

async delete_old_logs(days: int = 90) int#

删除旧日志。

参数:

days -- 保留天数

返回:

删除的日志数量

class taolib.testing.audit.AuditStorageProtocol#

Bases: Protocol

审计日志存储后端协议。

定义审计日志存储后端需要实现的所有操作。

async save(log: taolib.testing.audit.models.AuditLog) None#

保存审计日志。

参数:

log -- 审计日志实例

async save_batch(logs: collections.abc.Sequence[taolib.testing.audit.models.AuditLog]) None#

批量保存审计日志。

参数:

logs -- 审计日志列表

async query(filter_params: taolib.testing.audit.models.AuditLogFilter) list[taolib.testing.audit.models.AuditLogResponse]#

查询审计日志。

参数:

filter_params -- 查询过滤器

返回:

匹配的审计日志列表

async count(filter_params: taolib.testing.audit.models.AuditLogFilter) int#

统计审计日志数量。

参数:

filter_params -- 查询过滤器

返回:

匹配的日志数量

async delete_old_logs(before: datetime.datetime) int#

删除旧日志。

参数:

before -- 删除此时间之前的日志

返回:

删除的日志数量

class taolib.testing.audit.FileAuditStorage(file_path: str | pathlib.Path, max_size: int = 100000)#

文件审计日志存储。

将审计日志保存到 JSON 文件中。

_file_path#

日志文件路径

_max_size#

最大存储数量

_file_path#
_max_size = 100000#
_ensure_file_exists() None#

确保日志文件存在。

_read_logs() list[taolib.testing.audit.models.AuditLog]#

读取所有日志。

返回:

日志列表

_write_logs(logs: list[taolib.testing.audit.models.AuditLog]) None#

写入所有日志。

参数:

logs -- 日志列表

async save(log: taolib.testing.audit.models.AuditLog) None#

保存审计日志。

参数:

log -- 审计日志实例

async save_batch(logs: collections.abc.Sequence[taolib.testing.audit.models.AuditLog]) None#

批量保存审计日志。

参数:

logs -- 审计日志列表

async query(filter_params: taolib.testing.audit.models.AuditLogFilter) list[taolib.testing.audit.models.AuditLogResponse]#

查询审计日志。

参数:

filter_params -- 查询过滤器

返回:

匹配的审计日志列表

async count(filter_params: taolib.testing.audit.models.AuditLogFilter) int#

统计审计日志数量。

参数:

filter_params -- 查询过滤器

返回:

匹配的日志数量

async delete_old_logs(before: datetime.datetime) int#

删除旧日志。

参数:

before -- 删除此时间之前的日志

返回:

删除的日志数量

_match_filter(log: taolib.testing.audit.models.AuditLog, filter_params: taolib.testing.audit.models.AuditLogFilter) bool#

检查日志是否匹配过滤器。

class taolib.testing.audit.InMemoryAuditStorage(max_size: int = 10000)#

内存审计日志存储。

适用于测试和开发环境。

_logs#

日志存储列表

_max_size#

最大存储数量

_logs: list[taolib.testing.audit.models.AuditLog] = []#
_max_size = 10000#
async save(log: taolib.testing.audit.models.AuditLog) None#

保存审计日志。

参数:

log -- 审计日志实例

async save_batch(logs: collections.abc.Sequence[taolib.testing.audit.models.AuditLog]) None#

批量保存审计日志。

参数:

logs -- 审计日志列表

async query(filter_params: taolib.testing.audit.models.AuditLogFilter) list[taolib.testing.audit.models.AuditLogResponse]#

查询审计日志。

参数:

filter_params -- 查询过滤器

返回:

匹配的审计日志列表

async count(filter_params: taolib.testing.audit.models.AuditLogFilter) int#

统计审计日志数量。

参数:

filter_params -- 查询过滤器

返回:

匹配的日志数量

async delete_old_logs(before: datetime.datetime) int#

删除旧日志。

参数:

before -- 删除此时间之前的日志

返回:

删除的日志数量

_match_filter(log: taolib.testing.audit.models.AuditLog, filter_params: taolib.testing.audit.models.AuditLogFilter) bool#

检查日志是否匹配过滤器。

参数:
  • log -- 审计日志

  • filter_params -- 过滤器

返回:

是否匹配

class taolib.testing.audit.MongoDBAuditStorage(client: motor.motor_asyncio.AsyncIOMotorClient, database_name: str = 'audit', collection_name: str = 'logs')#

MongoDB 审计日志存储。

使用 MongoDB 作为审计日志存储后端。

_client#

MongoDB 客户端

_database_name#

数据库名称

_collection_name#

集合名称

_client#
_database_name = 'audit'#
_collection_name = 'logs'#
_collection#
async save(log: taolib.testing.audit.models.AuditLog) None#

保存审计日志。

参数:

log -- 审计日志实例

async save_batch(logs: collections.abc.Sequence[taolib.testing.audit.models.AuditLog]) None#

批量保存审计日志。

参数:

logs -- 审计日志列表

async query(filter_params: taolib.testing.audit.models.AuditLogFilter) list[taolib.testing.audit.models.AuditLogResponse]#

查询审计日志。

参数:

filter_params -- 查询过滤器

返回:

匹配的审计日志列表

async count(filter_params: taolib.testing.audit.models.AuditLogFilter) int#

统计审计日志数量。

参数:

filter_params -- 查询过滤器

返回:

匹配的日志数量

async delete_old_logs(before: datetime.datetime) int#

删除旧日志。

参数:

before -- 删除此时间之前的日志

返回:

删除的日志数量

async create_indexes() None#

创建索引。

_build_query(filter_params: taolib.testing.audit.models.AuditLogFilter) dict[str, Any]#

构建 MongoDB 查询条件。

参数:

filter_params -- 过滤器

返回:

MongoDB 查询字典

class taolib.testing.audit.AuditMiddleware(app: Any, audit_logger: taolib.testing.audit.logger.AuditLogger, exclude_paths: set[str] | None = None, include_request_body: bool = False, include_response_body: bool = False, sensitive_body_paths: set[str] | None = None)#

Bases: starlette.middleware.base.BaseHTTPMiddleware

FastAPI 审计日志中间件。

自动记录所有 API 请求的审计日志,包括: - 请求方法和路径 - 客户端 IP 和 User-Agent - 用户 ID(如果已认证) - 响应状态码和响应时间

参数:
  • app -- ASGI 应用

  • audit_logger -- 审计日志记录器

  • exclude_paths -- 排除的路径前缀集合

  • include_request_body -- 是否记录请求体

  • include_response_body -- 是否记录响应体

  • sensitive_body_paths -- 包含敏感数据的路径前缀集合

_audit_logger#
_exclude_paths#
_include_request_body = False#
_include_response_body = False#
_sensitive_body_paths#
_should_skip(path: str) bool#

检查路径是否应跳过审计。

参数:

path -- 请求路径

返回:

是否跳过

_is_sensitive_path(path: str) bool#

检查路径是否包含敏感数据。

参数:

path -- 请求路径

返回:

是否敏感

async dispatch(request: fastapi.Request, call_next: Any) fastapi.Response#

处理请求并记录审计日志。

参数:
  • request -- 接收的请求

  • call_next -- 下一个处理函数

返回:

响应对象

_determine_action(method: str, path: str) taolib.testing.audit.models.AuditAction#

根据请求方法和路径确定操作类型。

参数:
  • method -- HTTP 方法

  • path -- 请求路径

返回:

操作类型

taolib.testing.audit.extract_client_ip(request: fastapi.Request) str#

提取客户端 IP 地址。

优先级: 1. X-Forwarded-For 第一个 IP(代理场景) 2. X-Real-IP 3. request.client.host(直接连接)

参数:

request -- FastAPI 请求对象

返回:

客户端 IP 地址

taolib.testing.audit.extract_user_id(request: fastapi.Request) str | None#

尝试从请求中提取用户 ID。

从 request.state.user 或自定义头 X-User-ID 提取。

参数:

request -- FastAPI 请求对象

返回:

用户 ID 或 None

class taolib.testing.audit.AuditAction#

Bases: enum.StrEnum

审计操作类型枚举。

CREATE = 'create'#
READ = 'read'#
UPDATE = 'update'#
DELETE = 'delete'#
LOGIN = 'login'#
LOGOUT = 'logout'#
LOGIN_FAILED = 'login.failed'#
EXPORT = 'export'#
IMPORT = 'import'#
EXECUTE = 'execute'#
ACCESS = 'access'#
class taolib.testing.audit.AuditLog#

Bases: pydantic.BaseModel

审计日志模型。

记录系统中所有重要操作的审计信息。

id#

日志唯一标识符

timestamp#

操作时间戳

user_id#

操作用户 ID

action#

操作类型

resource_type#

资源类型

resource_id#

资源 ID

details#

操作详情

ip_address#

客户端 IP 地址

user_agent#

客户端 User-Agent

status#

操作状态

error_message#

错误信息(失败时)

id: uuid.UUID#
timestamp: datetime.datetime#
user_id: str | None#
action: AuditAction#
resource_type: str#
resource_id: str | None#
details: dict[str, Any]#
ip_address: str | None#
user_agent: str | None#
status: AuditStatus#
error_message: str | None#
model_config#
class taolib.testing.audit.AuditLogCreate#

Bases: pydantic.BaseModel

创建审计日志请求模型。

user_id#

操作用户 ID

action#

操作类型

resource_type#

资源类型

resource_id#

资源 ID

details#

操作详情

ip_address#

客户端 IP 地址

user_agent#

客户端 User-Agent

status#

操作状态

error_message#

错误信息

user_id: str | None#
action: AuditAction#
resource_type: str#
resource_id: str | None#
details: dict[str, Any]#
ip_address: str | None#
user_agent: str | None#
status: AuditStatus#
error_message: str | None#
class taolib.testing.audit.AuditLogFilter#

Bases: pydantic.BaseModel

审计日志查询过滤器。

user_id#

按用户 ID 过滤

action#

按操作类型过滤

resource_type#

按资源类型过滤

resource_id#

按资源 ID 过滤

status#

按状态过滤

start_time#

开始时间

end_time#

结束时间

ip_address#

按 IP 地址过滤

limit#

返回数量限制

offset#

偏移量

user_id: str | None#
action: AuditAction | None#
resource_type: str | None#
resource_id: str | None#
status: AuditStatus | None#
start_time: datetime.datetime | None#
end_time: datetime.datetime | None#
ip_address: str | None#
limit: int#
offset: int#
class taolib.testing.audit.AuditLogListResponse#

Bases: pydantic.BaseModel

审计日志列表响应模型。

items#

日志列表

total#

总数量

limit#

每页数量

offset#

偏移量

items: list[AuditLogResponse]#
total: int#
limit: int#
offset: int#
class taolib.testing.audit.AuditLogResponse#

Bases: pydantic.BaseModel

审计日志响应模型。

id#

日志唯一标识符

timestamp#

操作时间戳

user_id#

操作用户 ID

action#

操作类型

resource_type#

资源类型

resource_id#

资源 ID

details#

操作详情

ip_address#

客户端 IP 地址

user_agent#

客户端 User-Agent

status#

操作状态

error_message#

错误信息

id: uuid.UUID#
timestamp: datetime.datetime#
user_id: str | None#
action: AuditAction#
resource_type: str#
resource_id: str | None#
details: dict[str, Any]#
ip_address: str | None#
user_agent: str | None#
status: AuditStatus#
error_message: str | None#
model_config#
class taolib.testing.audit.AuditStatus#

Bases: enum.StrEnum

审计操作状态枚举。

SUCCESS = 'success'#
FAILED = 'failed'#
class taolib.testing.audit.RequestAuditInfo#

Bases: pydantic.BaseModel

请求审计信息。

用于中间件自动收集请求信息。

method#

HTTP 方法

path#

请求路径

query_params#

查询参数

path_params#

路径参数

headers#

请求头(敏感信息已过滤)

status_code#

响应状态码

response_time_ms#

响应时间(毫秒)

method: str#
path: str#
query_params: dict[str, str]#
path_params: dict[str, str]#
headers: dict[str, str]#
status_code: int | None#
response_time_ms: float | None#