taolib.testing.file_storage.storage.s3_backend#

S3 兼容存储后端。

支持 AWS S3、MinIO、阿里云 OSS 等所有 S3 兼容服务。

Classes#

S3StorageBackend

S3 兼容存储后端实现。

Module Contents#

class taolib.testing.file_storage.storage.s3_backend.S3StorageBackend(endpoint_url: str | None = None, access_key: str = '', secret_key: str = '', region: str = 'us-east-1')#

S3 兼容存储后端实现。

使用 aiobotocore 与 S3 兼容 API 交互。

_endpoint_url = None#
_access_key = ''#
_secret_key = ''#
_region = 'us-east-1'#
_session: Any = None#
_get_session() Any#

延迟初始化 aiobotocore session。

_create_client() Any#

创建 S3 客户端上下文管理器。

async put_object(bucket: str, key: str, data: bytes | collections.abc.AsyncIterator[bytes], content_type: str = 'application/octet-stream', metadata: dict[str, str] | None = None) taolib.testing.file_storage.storage.protocols.PutObjectResult#

上传对象到 S3。

async get_object(bucket: str, key: str) collections.abc.AsyncIterator[bytes]#

从 S3 下载对象(流式)。

async delete_object(bucket: str, key: str) bool#

从 S3 删除对象。

async copy_object(src_bucket: str, src_key: str, dst_bucket: str, dst_key: str) taolib.testing.file_storage.storage.protocols.PutObjectResult#

在 S3 上复制对象。

async head_object(bucket: str, key: str) taolib.testing.file_storage.storage.protocols.ObjectInfo#

获取 S3 对象元信息。

async object_exists(bucket: str, key: str) bool#

检查 S3 对象是否存在。

async list_objects(bucket: str, prefix: str = '', max_keys: int = 1000, continuation_token: str | None = None) tuple[list[taolib.testing.file_storage.storage.protocols.ObjectInfo], str | None]#

列出 S3 对象。

async generate_presigned_url(bucket: str, key: str, expires_in: int = 3600, method: str = 'GET') str#

生成 S3 预签名 URL。

async create_multipart_upload(bucket: str, key: str, content_type: str = 'application/octet-stream') str#

创建 S3 分片上传。

async upload_part(bucket: str, key: str, upload_id: str, part_number: int, data: bytes) taolib.testing.file_storage.storage.protocols.PartInfo#

上传 S3 分片。

async complete_multipart_upload(bucket: str, key: str, upload_id: str, parts: list[taolib.testing.file_storage.storage.protocols.PartInfo]) taolib.testing.file_storage.storage.protocols.PutObjectResult#

完成 S3 分片上传。

async abort_multipart_upload(bucket: str, key: str, upload_id: str) None#

中止 S3 分片上传。

async create_bucket(bucket: str) None#

创建 S3 存储桶。

async delete_bucket(bucket: str) None#

删除 S3 存储桶。

async bucket_exists(bucket: str) bool#

检查 S3 存储桶是否存在。