taolib.testing.file_storage.storage.local_backend#

本地文件系统存储后端。

用于开发和测试环境。

Classes#

LocalStorageBackend

本地文件系统存储后端实现。

Module Contents#

class taolib.testing.file_storage.storage.local_backend.LocalStorageBackend(base_path: str = './storage')#

本地文件系统存储后端实现。

_base_path#
_multipart_uploads: dict[str, dict[str, Any]]#
_bucket_path(bucket: str) pathlib.Path#
_object_path(bucket: str, key: str) pathlib.Path#
async put_object(bucket: str, key: str, data: bytes | collections.abc.AsyncIterator[bytes], content_type: str = 'application/octet-stream', metadata: dict[str, str] | None = None) taolib.testing.file_storage.storage.protocols.PutObjectResult#

上传对象到本地文件系统。

async get_object(bucket: str, key: str) collections.abc.AsyncIterator[bytes]#

从本地文件系统读取对象。

async delete_object(bucket: str, key: str) bool#

从本地文件系统删除对象。

async copy_object(src_bucket: str, src_key: str, dst_bucket: str, dst_key: str) taolib.testing.file_storage.storage.protocols.PutObjectResult#

复制本地文件。

async head_object(bucket: str, key: str) taolib.testing.file_storage.storage.protocols.ObjectInfo#

获取本地文件元信息。

async object_exists(bucket: str, key: str) bool#

检查本地文件是否存在。

async list_objects(bucket: str, prefix: str = '', max_keys: int = 1000, continuation_token: str | None = None) tuple[list[taolib.testing.file_storage.storage.protocols.ObjectInfo], str | None]#

列出本地文件。

async generate_presigned_url(bucket: str, key: str, expires_in: int = 3600, method: str = 'GET') str#

生成本地文件伪签名 URL(仅用于开发)。

async create_multipart_upload(bucket: str, key: str, content_type: str = 'application/octet-stream') str#

创建本地分片上传会话。

async upload_part(bucket: str, key: str, upload_id: str, part_number: int, data: bytes) taolib.testing.file_storage.storage.protocols.PartInfo#

上传分片到本地临时目录。

async complete_multipart_upload(bucket: str, key: str, upload_id: str, parts: list[taolib.testing.file_storage.storage.protocols.PartInfo]) taolib.testing.file_storage.storage.protocols.PutObjectResult#

完成本地分片上传(合并文件)。

async abort_multipart_upload(bucket: str, key: str, upload_id: str) None#

中止本地分片上传。

async create_bucket(bucket: str) None#

创建本地存储桶目录。

async delete_bucket(bucket: str) None#

删除本地存储桶目录。

async bucket_exists(bucket: str) bool#

检查本地存储桶目录是否存在。