taolib.testing.file_storage.services#
服务层模块。
导出所有服务类。
Submodules#
- taolib.testing.file_storage.services.access_service
- taolib.testing.file_storage.services.bucket_service
- taolib.testing.file_storage.services.file_service
- taolib.testing.file_storage.services.lifecycle_service
- taolib.testing.file_storage.services.stats_service
- taolib.testing.file_storage.services.upload_service
Classes#
访问控制服务。 |
|
存储桶管理服务。 |
|
文件管理服务。 |
|
生命周期管理服务。 |
|
统计服务。 |
|
分片上传服务。 |
Package Contents#
- class taolib.testing.file_storage.services.AccessService(file_repo: taolib.testing.file_storage.repository.file_repo.FileRepository, storage_backend: taolib.testing.file_storage.storage.protocols.StorageBackendProtocol, cdn_provider: taolib.testing.file_storage.cdn.protocols.CDNProviderProtocol | None = None, signed_url_secret: str = '')#
访问控制服务。
- _file_repo#
- _storage_backend#
- _cdn_provider = None#
- _secret = ''#
- class taolib.testing.file_storage.services.BucketService(bucket_repo: taolib.testing.file_storage.repository.bucket_repo.BucketRepository, storage_backend: taolib.testing.file_storage.storage.protocols.StorageBackendProtocol)#
存储桶管理服务。
- _bucket_repo#
- _storage_backend#
- async create_bucket(data: taolib.testing.file_storage.models.bucket.BucketCreate, user_id: str = 'system') taolib.testing.file_storage.models.bucket.BucketResponse#
创建存储桶。
- async get_bucket(bucket_id: str) taolib.testing.file_storage.models.bucket.BucketResponse | None#
获取存储桶详情。
- async update_bucket(bucket_id: str, data: taolib.testing.file_storage.models.bucket.BucketUpdate, user_id: str = 'system') taolib.testing.file_storage.models.bucket.BucketResponse | None#
更新存储桶配置。
- async list_buckets(skip: int = 0, limit: int = 100) list[taolib.testing.file_storage.models.bucket.BucketResponse]#
列出所有存储桶。
- class taolib.testing.file_storage.services.FileService(file_repo: taolib.testing.file_storage.repository.file_repo.FileRepository, bucket_repo: taolib.testing.file_storage.repository.bucket_repo.BucketRepository, thumbnail_repo: taolib.testing.file_storage.repository.thumbnail_repo.ThumbnailRepository, storage_backend: taolib.testing.file_storage.storage.protocols.StorageBackendProtocol, pipeline: taolib.testing.file_storage.processing.pipeline.ProcessingPipeline, cdn_provider: taolib.testing.file_storage.cdn.protocols.CDNProviderProtocol | None = None)#
文件管理服务。
- _file_repo#
- _bucket_repo#
- _thumbnail_repo#
- _storage_backend#
- _pipeline#
- _cdn_provider = None#
- async upload_file(bucket_id: str, object_key: str, data: bytes, filename: str, content_type: str, user_id: str = 'system', access_level: taolib.testing.file_storage.models.enums.AccessLevel = AccessLevel.PRIVATE, tags: list[str] | None = None, custom_metadata: dict[str, str] | None = None) taolib.testing.file_storage.models.file.FileMetadataResponse#
上传文件(简单上传,适用于小文件)。
- async get_file(file_id: str) taolib.testing.file_storage.models.file.FileMetadataResponse | None#
获取文件元数据。
- async download_file(file_id: str) collections.abc.AsyncIterator[bytes]#
下载文件(流式)。
- async update_metadata(file_id: str, data: taolib.testing.file_storage.models.file.FileMetadataUpdate, user_id: str = 'system') taolib.testing.file_storage.models.file.FileMetadataResponse | None#
更新文件元数据。
- class taolib.testing.file_storage.services.LifecycleService(file_repo: taolib.testing.file_storage.repository.file_repo.FileRepository, version_repo: taolib.testing.file_storage.repository.version_repo.FileVersionRepository, thumbnail_repo: taolib.testing.file_storage.repository.thumbnail_repo.ThumbnailRepository, upload_repo: taolib.testing.file_storage.repository.upload_repo.UploadSessionRepository, chunk_repo: taolib.testing.file_storage.repository.chunk_repo.ChunkRepository, bucket_repo: taolib.testing.file_storage.repository.bucket_repo.BucketRepository, storage_backend: taolib.testing.file_storage.storage.protocols.StorageBackendProtocol)#
生命周期管理服务。
- _file_repo#
- _version_repo#
- _thumbnail_repo#
- _upload_repo#
- _chunk_repo#
- _bucket_repo#
- _storage_backend#
- async create_file_version(file_id: str, user_id: str = 'system') taolib.testing.file_storage.models.version.FileVersionResponse#
为当前文件创建版本快照。
- class taolib.testing.file_storage.services.StatsService(bucket_repo: taolib.testing.file_storage.repository.bucket_repo.BucketRepository, file_repo: taolib.testing.file_storage.repository.file_repo.FileRepository, upload_repo: taolib.testing.file_storage.repository.upload_repo.UploadSessionRepository)#
统计服务。
- _bucket_repo#
- _file_repo#
- _upload_repo#
- async get_bucket_stats(bucket_id: str) taolib.testing.file_storage.models.stats.BucketStatsResponse | None#
获取存储桶统计。
- async get_storage_overview() taolib.testing.file_storage.models.stats.StorageOverviewResponse#
获取全局存储概览。
- async get_upload_stats() taolib.testing.file_storage.models.stats.UploadStatsResponse#
获取上传统计。
- class taolib.testing.file_storage.services.UploadService(upload_repo: taolib.testing.file_storage.repository.upload_repo.UploadSessionRepository, chunk_repo: taolib.testing.file_storage.repository.chunk_repo.ChunkRepository, file_repo: taolib.testing.file_storage.repository.file_repo.FileRepository, bucket_repo: taolib.testing.file_storage.repository.bucket_repo.BucketRepository, storage_backend: taolib.testing.file_storage.storage.protocols.StorageBackendProtocol, upload_session_ttl_hours: int = 24)#
分片上传服务。
- _upload_repo#
- _chunk_repo#
- _file_repo#
- _bucket_repo#
- _storage_backend#
- _ttl_hours = 24#
- async init_upload(data: taolib.testing.file_storage.models.upload.UploadSessionCreate, user_id: str = 'system') taolib.testing.file_storage.models.upload.UploadSessionResponse#
初始化分片上传会话。
- async upload_chunk(session_id: str, chunk_index: int, data: bytes, checksum: str | None = None) taolib.testing.file_storage.models.upload.ChunkRecord#
上传分片。
- async complete_upload(session_id: str, user_id: str = 'system') taolib.testing.file_storage.models.file.FileMetadataResponse#
完成分片上传。
- async get_upload_status(session_id: str) taolib.testing.file_storage.models.upload.UploadSessionResponse | None#
获取上传状态。