taolib.testing.file_storage.repository#
Repository 模块。
导出所有 Repository 类。
Submodules#
- taolib.testing.file_storage.repository.bucket_repo
- taolib.testing.file_storage.repository.chunk_repo
- taolib.testing.file_storage.repository.file_repo
- taolib.testing.file_storage.repository.thumbnail_repo
- taolib.testing.file_storage.repository.upload_repo
- taolib.testing.file_storage.repository.version_repo
Classes#
存储桶 Repository。 |
|
分片记录 Repository。 |
|
文件元数据 Repository。 |
|
缩略图 Repository。 |
|
上传会话 Repository。 |
|
文件版本 Repository。 |
Package Contents#
- class taolib.testing.file_storage.repository.BucketRepository(collection)#
Bases:
taolib.testing._base.repository.AsyncRepository[taolib.testing.file_storage.models.bucket.BucketDocument]存储桶 Repository。
- async find_by_name(name: str) taolib.testing.file_storage.models.bucket.BucketDocument | None#
根据名称查找存储桶。
- async find_by_tags(tags: list[str]) list[taolib.testing.file_storage.models.bucket.BucketDocument]#
根据标签查找存储桶。
- class taolib.testing.file_storage.repository.ChunkRepository(collection)#
Bases:
taolib.testing._base.repository.AsyncRepository[taolib.testing.file_storage.models.upload.ChunkRecord]分片记录 Repository。
- async find_by_session(session_id: str) list[taolib.testing.file_storage.models.upload.ChunkRecord]#
查找会话的所有分片。
- async find_by_session_and_index(session_id: str, chunk_index: int) taolib.testing.file_storage.models.upload.ChunkRecord | None#
查找指定分片。
- class taolib.testing.file_storage.repository.FileRepository(collection)#
Bases:
taolib.testing._base.repository.AsyncRepository[taolib.testing.file_storage.models.file.FileMetadataDocument]文件元数据 Repository。
- async find_by_bucket(bucket_id: str, prefix: str | None = None, skip: int = 0, limit: int = 100) list[taolib.testing.file_storage.models.file.FileMetadataDocument]#
查找桶内文件。
- async find_by_object_key(bucket_id: str, object_key: str) taolib.testing.file_storage.models.file.FileMetadataDocument | None#
根据桶 ID 和对象键查找文件。
- async find_expired_files(before: datetime.datetime) list[taolib.testing.file_storage.models.file.FileMetadataDocument]#
查找已过期文件。
- async find_by_tags(tags: list[str], bucket_id: str | None = None) list[taolib.testing.file_storage.models.file.FileMetadataDocument]#
根据标签查找文件。
- async find_by_media_type(media_type: taolib.testing.file_storage.models.enums.MediaType, bucket_id: str | None = None) list[taolib.testing.file_storage.models.file.FileMetadataDocument]#
根据媒体类型查找文件。
- async update_status(file_id: str, status: taolib.testing.file_storage.models.enums.FileStatus) taolib.testing.file_storage.models.file.FileMetadataDocument | None#
更新文件状态。
- class taolib.testing.file_storage.repository.ThumbnailRepository(collection)#
Bases:
taolib.testing._base.repository.AsyncRepository[taolib.testing.file_storage.models.thumbnail.ThumbnailDocument]缩略图 Repository。
- async find_by_file(file_id: str) list[taolib.testing.file_storage.models.thumbnail.ThumbnailDocument]#
查找文件的所有缩略图。
- async find_by_file_and_size(file_id: str, size: taolib.testing.file_storage.models.enums.ThumbnailSize) taolib.testing.file_storage.models.thumbnail.ThumbnailDocument | None#
查找指定文件的指定规格缩略图。
- class taolib.testing.file_storage.repository.UploadSessionRepository(collection)#
Bases:
taolib.testing._base.repository.AsyncRepository[taolib.testing.file_storage.models.upload.UploadSessionDocument]上传会话 Repository。
- async find_active_by_user(user_id: str) list[taolib.testing.file_storage.models.upload.UploadSessionDocument]#
查找用户的活跃上传会话。
- async find_expired_sessions(before: datetime.datetime) list[taolib.testing.file_storage.models.upload.UploadSessionDocument]#
查找已过期的上传会话。
- async update_status(session_id: str, status: taolib.testing.file_storage.models.enums.UploadStatus) taolib.testing.file_storage.models.upload.UploadSessionDocument | None#
更新上传状态。
- class taolib.testing.file_storage.repository.FileVersionRepository(collection)#
Bases:
taolib.testing._base.repository.AsyncRepository[taolib.testing.file_storage.models.version.FileVersionDocument]文件版本 Repository。
- async find_by_file(file_id: str, skip: int = 0, limit: int = 50) list[taolib.testing.file_storage.models.version.FileVersionDocument]#
查找文件的版本历史(按版本号降序)。
- async find_latest(file_id: str) taolib.testing.file_storage.models.version.FileVersionDocument | None#
获取最新版本。