taolib.testing.file_storage.services

目录

taolib.testing.file_storage.services#

服务层模块。

导出所有服务类。

Submodules#

Classes#

AccessService

访问控制服务。

BucketService

存储桶管理服务。

FileService

文件管理服务。

LifecycleService

生命周期管理服务。

StatsService

统计服务。

UploadService

分片上传服务。

Package Contents#

class taolib.testing.file_storage.services.AccessService(file_repo: taolib.testing.file_storage.repository.file_repo.FileRepository, storage_backend: taolib.testing.file_storage.storage.protocols.StorageBackendProtocol, cdn_provider: taolib.testing.file_storage.cdn.protocols.CDNProviderProtocol | None = None, signed_url_secret: str = '')#

访问控制服务。

_file_repo#
_storage_backend#
_cdn_provider = None#
_secret = ''#
async generate_signed_url(file_id: str, expires_in: int = 3600, method: str = 'GET') str#

生成签名 URL。

generate_token(file_id: str, expires_in: int = 3600) dict[str, str | int]#

生成签名 token。

validate_token(file_id: str, expires: int, signature: str) bool#

验证签名 token。

async check_access(file_id: str, user_id: str | None = None, action: str = 'read') bool#

检查访问权限。

class taolib.testing.file_storage.services.BucketService(bucket_repo: taolib.testing.file_storage.repository.bucket_repo.BucketRepository, storage_backend: taolib.testing.file_storage.storage.protocols.StorageBackendProtocol)#

存储桶管理服务。

_bucket_repo#
_storage_backend#
async create_bucket(data: taolib.testing.file_storage.models.bucket.BucketCreate, user_id: str = 'system') taolib.testing.file_storage.models.bucket.BucketResponse#

创建存储桶。

async get_bucket(bucket_id: str) taolib.testing.file_storage.models.bucket.BucketResponse | None#

获取存储桶详情。

async update_bucket(bucket_id: str, data: taolib.testing.file_storage.models.bucket.BucketUpdate, user_id: str = 'system') taolib.testing.file_storage.models.bucket.BucketResponse | None#

更新存储桶配置。

async delete_bucket(bucket_id: str, force: bool = False) bool#

删除存储桶。

async list_buckets(skip: int = 0, limit: int = 100) list[taolib.testing.file_storage.models.bucket.BucketResponse]#

列出所有存储桶。

class taolib.testing.file_storage.services.FileService(file_repo: taolib.testing.file_storage.repository.file_repo.FileRepository, bucket_repo: taolib.testing.file_storage.repository.bucket_repo.BucketRepository, thumbnail_repo: taolib.testing.file_storage.repository.thumbnail_repo.ThumbnailRepository, storage_backend: taolib.testing.file_storage.storage.protocols.StorageBackendProtocol, pipeline: taolib.testing.file_storage.processing.pipeline.ProcessingPipeline, cdn_provider: taolib.testing.file_storage.cdn.protocols.CDNProviderProtocol | None = None)#

文件管理服务。

_file_repo#
_bucket_repo#
_thumbnail_repo#
_storage_backend#
_pipeline#
_cdn_provider = None#
async upload_file(bucket_id: str, object_key: str, data: bytes, filename: str, content_type: str, user_id: str = 'system', access_level: taolib.testing.file_storage.models.enums.AccessLevel = AccessLevel.PRIVATE, tags: list[str] | None = None, custom_metadata: dict[str, str] | None = None) taolib.testing.file_storage.models.file.FileMetadataResponse#

上传文件(简单上传,适用于小文件)。

async get_file(file_id: str) taolib.testing.file_storage.models.file.FileMetadataResponse | None#

获取文件元数据。

async download_file(file_id: str) collections.abc.AsyncIterator[bytes]#

下载文件(流式)。

async update_metadata(file_id: str, data: taolib.testing.file_storage.models.file.FileMetadataUpdate, user_id: str = 'system') taolib.testing.file_storage.models.file.FileMetadataResponse | None#

更新文件元数据。

async delete_file(file_id: str, user_id: str = 'system') bool#

删除文件。

async list_files(bucket_id: str | None = None, prefix: str | None = None, tags: list[str] | None = None, media_type: str | None = None, skip: int = 0, limit: int = 100) list[taolib.testing.file_storage.models.file.FileMetadataResponse]#

列出文件。

async get_file_url(file_id: str, expires_in: int = 3600) str#

获取文件访问 URL。

class taolib.testing.file_storage.services.LifecycleService(file_repo: taolib.testing.file_storage.repository.file_repo.FileRepository, version_repo: taolib.testing.file_storage.repository.version_repo.FileVersionRepository, thumbnail_repo: taolib.testing.file_storage.repository.thumbnail_repo.ThumbnailRepository, upload_repo: taolib.testing.file_storage.repository.upload_repo.UploadSessionRepository, chunk_repo: taolib.testing.file_storage.repository.chunk_repo.ChunkRepository, bucket_repo: taolib.testing.file_storage.repository.bucket_repo.BucketRepository, storage_backend: taolib.testing.file_storage.storage.protocols.StorageBackendProtocol)#

生命周期管理服务。

_file_repo#
_version_repo#
_thumbnail_repo#
_upload_repo#
_chunk_repo#
_bucket_repo#
_storage_backend#
async expire_files() int#

处理已过期文件。

async create_file_version(file_id: str, user_id: str = 'system') taolib.testing.file_storage.models.version.FileVersionResponse#

为当前文件创建版本快照。

async rollback_to_version(file_id: str, version_number: int, user_id: str = 'system') taolib.testing.file_storage.models.version.FileVersionResponse | None#

回滚到指定版本。

async gc_deleted_files() int#

清理已删除状态的文件记录。

class taolib.testing.file_storage.services.StatsService(bucket_repo: taolib.testing.file_storage.repository.bucket_repo.BucketRepository, file_repo: taolib.testing.file_storage.repository.file_repo.FileRepository, upload_repo: taolib.testing.file_storage.repository.upload_repo.UploadSessionRepository)#

统计服务。

_bucket_repo#
_file_repo#
_upload_repo#
async get_bucket_stats(bucket_id: str) taolib.testing.file_storage.models.stats.BucketStatsResponse | None#

获取存储桶统计。

async get_storage_overview() taolib.testing.file_storage.models.stats.StorageOverviewResponse#

获取全局存储概览。

async get_upload_stats() taolib.testing.file_storage.models.stats.UploadStatsResponse#

获取上传统计。

class taolib.testing.file_storage.services.UploadService(upload_repo: taolib.testing.file_storage.repository.upload_repo.UploadSessionRepository, chunk_repo: taolib.testing.file_storage.repository.chunk_repo.ChunkRepository, file_repo: taolib.testing.file_storage.repository.file_repo.FileRepository, bucket_repo: taolib.testing.file_storage.repository.bucket_repo.BucketRepository, storage_backend: taolib.testing.file_storage.storage.protocols.StorageBackendProtocol, upload_session_ttl_hours: int = 24)#

分片上传服务。

_upload_repo#
_chunk_repo#
_file_repo#
_bucket_repo#
_storage_backend#
_ttl_hours = 24#
async init_upload(data: taolib.testing.file_storage.models.upload.UploadSessionCreate, user_id: str = 'system') taolib.testing.file_storage.models.upload.UploadSessionResponse#

初始化分片上传会话。

async upload_chunk(session_id: str, chunk_index: int, data: bytes, checksum: str | None = None) taolib.testing.file_storage.models.upload.ChunkRecord#

上传分片。

async complete_upload(session_id: str, user_id: str = 'system') taolib.testing.file_storage.models.file.FileMetadataResponse#

完成分片上传。

async abort_upload(session_id: str) bool#

中止分片上传。

async get_upload_status(session_id: str) taolib.testing.file_storage.models.upload.UploadSessionResponse | None#

获取上传状态。

async cleanup_expired_sessions() int#

清理过期的上传会话。