taolib.testing.file_storage.storage.protocols#
存储后端协议和数据类。
定义 StorageBackendProtocol 及相关数据类。
Classes#
存储对象信息。 |
|
分片上传部分信息。 |
|
对象写入结果。 |
|
存储后端协议。 |
Module Contents#
- class taolib.testing.file_storage.storage.protocols.ObjectInfo#
存储对象信息。
- last_modified: datetime.datetime#
- class taolib.testing.file_storage.storage.protocols.PartInfo#
分片上传部分信息。
- class taolib.testing.file_storage.storage.protocols.PutObjectResult#
对象写入结果。
- class taolib.testing.file_storage.storage.protocols.StorageBackendProtocol#
Bases:
Protocol存储后端协议。
定义文件存储后端需要实现的所有操作。 支持 S3 兼容存储、本地文件系统等多种后端。
- async put_object(bucket: str, key: str, data: bytes | collections.abc.AsyncIterator[bytes], content_type: str = 'application/octet-stream', metadata: dict[str, str] | None = None) PutObjectResult#
上传对象。
- async get_object(bucket: str, key: str) collections.abc.AsyncIterator[bytes]#
下载对象(流式)。
- async copy_object(src_bucket: str, src_key: str, dst_bucket: str, dst_key: str) PutObjectResult#
复制对象。
- async head_object(bucket: str, key: str) ObjectInfo#
获取对象元信息。
- async list_objects(bucket: str, prefix: str = '', max_keys: int = 1000, continuation_token: str | None = None) tuple[list[ObjectInfo], str | None]#
列出对象。
- 返回:
(对象列表, 下一页 token 或 None)
- async generate_presigned_url(bucket: str, key: str, expires_in: int = 3600, method: str = 'GET') str#
生成预签名 URL。
- async create_multipart_upload(bucket: str, key: str, content_type: str = 'application/octet-stream') str#
创建分片上传会话。
- 返回:
upload_id
- async upload_part(bucket: str, key: str, upload_id: str, part_number: int, data: bytes) PartInfo#
上传分片。