taolib.testing.file_storage.client#

轻量级客户端 SDK。

提供同步和异步的文件存储访问接口。

Classes#

FileStorageClient

文件存储客户端。

Module Contents#

class taolib.testing.file_storage.client.FileStorageClient(base_url: str = 'http://localhost:8002', token: str | None = None, timeout: float = 30.0)#

文件存储客户端。

_base_url = ''#
_timeout = 30.0#
_headers: dict[str, str]#
_client() httpx.Client#
_async_client() httpx.AsyncClient#
create_bucket(name: str, **kwargs: Any) dict[str, Any]#

创建存储桶。

list_buckets() list[dict[str, Any]]#

列出所有存储桶。

upload_file(bucket_id: str, file_path: str | pathlib.Path, object_key: str, content_type: str | None = None) dict[str, Any]#

上传文件。

自动根据文件大小选择简单上传或分片上传。

download_file(file_id: str, dest_path: str | pathlib.Path) pathlib.Path#

下载文件。

get_file(file_id: str) dict[str, Any]#

获取文件元数据。

delete_file(file_id: str) bool#

删除文件。

list_files(bucket_id: str | None = None, prefix: str | None = None) list[dict[str, Any]]#

列出文件。

get_file_url(file_id: str, expires_in: int = 3600) str#

获取文件访问 URL。

init_upload(bucket_id: str, object_key: str, filename: str, content_type: str, total_size_bytes: int, chunk_size_bytes: int = 5 * 1024 * 1024) dict[str, Any]#

初始化分片上传。

upload_chunk(session_id: str, chunk_index: int, data: bytes, checksum: str | None = None) dict[str, Any]#

上传分片。

complete_upload(session_id: str) dict[str, Any]#

完成分片上传。

abort_upload(session_id: str) bool#

中止分片上传。

get_upload_status(session_id: str) dict[str, Any]#

获取上传状态。