taolib.testing._base#
共享基础模块。
提供跨模块复用的基础设施组件:泛型 Repository 基类和 Redis 连接池管理。
Submodules#
Classes#
异步 Repository 基类。 |
Functions#
|
关闭 Redis 连接。 |
|
获取 Redis 客户端(单例模式)。 |
Package Contents#
- async taolib.testing._base.get_redis_client(redis_url: str = 'redis://localhost:6379') redis.asyncio.Redis#
获取 Redis 客户端(单例模式)。
- 参数:
redis_url -- Redis 连接字符串
- 返回:
Redis 异步客户端实例
- class taolib.testing._base.AsyncRepository(collection: motor.motor_asyncio.AsyncIOMotorCollection, model_class: type[T])#
Bases:
abc.ABC,Generic[T]异步 Repository 基类。
- _collection#
- _model_class#
- async get_by_id(doc_id: str) T | None#
根据 ID 获取文档。
- 参数:
doc_id -- 文档 ID
- 返回:
Pydantic 模型实例,如果不存在则返回 None
- async update(doc_id: str, updates: dict[str, Any]) T | None#
更新文档。
- 参数:
doc_id -- 文档 ID
updates -- 更新字段字典
- 返回:
更新后的 Pydantic 模型实例,如果不存在则返回 None