taolib.testing.file_storage.storage.local_backend#
本地文件系统存储后端。
用于开发和测试环境。
Classes#
本地文件系统存储后端实现。 |
Module Contents#
- class taolib.testing.file_storage.storage.local_backend.LocalStorageBackend(base_path: str = './storage')#
本地文件系统存储后端实现。
- _base_path#
- _bucket_path(bucket: str) pathlib.Path#
- _object_path(bucket: str, key: str) pathlib.Path#
- async put_object(bucket: str, key: str, data: bytes | collections.abc.AsyncIterator[bytes], content_type: str = 'application/octet-stream', metadata: dict[str, str] | None = None) taolib.testing.file_storage.storage.protocols.PutObjectResult#
上传对象到本地文件系统。
- async get_object(bucket: str, key: str) collections.abc.AsyncIterator[bytes]#
从本地文件系统读取对象。
- async copy_object(src_bucket: str, src_key: str, dst_bucket: str, dst_key: str) taolib.testing.file_storage.storage.protocols.PutObjectResult#
复制本地文件。
- async head_object(bucket: str, key: str) taolib.testing.file_storage.storage.protocols.ObjectInfo#
获取本地文件元信息。
- async list_objects(bucket: str, prefix: str = '', max_keys: int = 1000, continuation_token: str | None = None) tuple[list[taolib.testing.file_storage.storage.protocols.ObjectInfo], str | None]#
列出本地文件。
- async generate_presigned_url(bucket: str, key: str, expires_in: int = 3600, method: str = 'GET') str#
生成本地文件伪签名 URL(仅用于开发)。
- async create_multipart_upload(bucket: str, key: str, content_type: str = 'application/octet-stream') str#
创建本地分片上传会话。
- async upload_part(bucket: str, key: str, upload_id: str, part_number: int, data: bytes) taolib.testing.file_storage.storage.protocols.PartInfo#
上传分片到本地临时目录。
- async complete_multipart_upload(bucket: str, key: str, upload_id: str, parts: list[taolib.testing.file_storage.storage.protocols.PartInfo]) taolib.testing.file_storage.storage.protocols.PutObjectResult#
完成本地分片上传(合并文件)。