taolib.testing.file_storage.events#

事件模块。

Submodules#

Classes#

StorageEventPublisher

文件存储事件发布器。

BucketCreatedEvent

存储桶创建事件。

FileDeletedEvent

文件删除事件。

FileExpiredEvent

文件过期事件。

FileUploadedEvent

文件上传完成事件。

UploadSessionCompletedEvent

上传会话完成事件。

Package Contents#

class taolib.testing.file_storage.events.StorageEventPublisher(redis_client: Any | None = None)#

文件存储事件发布器。

CHANNEL_PREFIX = 'storage'#
_redis = None#
async publish(event_type: str, event_data: dict) None#

发布事件。

参数:
  • event_type -- 事件类型(如 'file.uploaded')

  • event_data -- 事件数据字典

async publish_file_uploaded(event) None#

发布文件上传事件。

async publish_file_deleted(event) None#

发布文件删除事件。

async publish_upload_completed(event) None#

发布上传完成事件。

async publish_bucket_created(event) None#

发布存储桶创建事件。

class taolib.testing.file_storage.events.BucketCreatedEvent#

存储桶创建事件。

bucket_id: str#
bucket_name: str#
created_by: str#
timestamp: datetime.datetime#
to_dict() dict#

转换为字典。

class taolib.testing.file_storage.events.FileDeletedEvent#

文件删除事件。

file_id: str#
bucket_id: str#
object_key: str#
deleted_by: str#
timestamp: datetime.datetime#
to_dict() dict#

转换为字典。

class taolib.testing.file_storage.events.FileExpiredEvent#

文件过期事件。

file_id: str#
bucket_id: str#
expired_at: datetime.datetime#
timestamp: datetime.datetime#
to_dict() dict#

转换为字典。

class taolib.testing.file_storage.events.FileUploadedEvent#

文件上传完成事件。

file_id: str#
bucket_id: str#
object_key: str#
content_type: str#
size_bytes: int#
uploaded_by: str#
timestamp: datetime.datetime#
to_dict() dict#

转换为字典。

class taolib.testing.file_storage.events.UploadSessionCompletedEvent#

上传会话完成事件。

session_id: str#
file_id: str#
total_chunks: int#
total_bytes: int#
timestamp: datetime.datetime#
to_dict() dict#

转换为字典。