taolib.testing.task_queue.worker.registry#

任务处理器注册表。

提供装饰器式的任务处理器注册和查找功能。

Attributes#

Classes#

TaskHandlerRegistry

任务处理器注册表。

Functions#

task_handler(→ collections.abc.Callable)

装饰器:在默认注册表中注册任务处理器。

get_default_registry(→ TaskHandlerRegistry)

获取默认注册表。

Module Contents#

class taolib.testing.task_queue.worker.registry.TaskHandlerRegistry#

任务处理器注册表。

管理 task_type → handler 的映射关系。 支持同步和异步处理器。

使用示例:

registry = TaskHandlerRegistry()

@registry.handler("send_email")
async def handle_send_email(params: dict) -> dict:
    # 发送邮件逻辑
    return {"sent": True}
_handlers: dict[str, collections.abc.Callable[Ellipsis, Any]]#
register(task_type: str, handler: collections.abc.Callable[Ellipsis, Any]) None#

注册任务处理器。

参数:
  • task_type -- 任务类型

  • handler -- 处理器函数(同步或异步)

get(task_type: str) collections.abc.Callable[Ellipsis, Any] | None#

获取任务处理器。

参数:

task_type -- 任务类型

返回:

处理器函数,未注册返回 None

has(task_type: str) bool#

检查是否已注册指定类型的处理器。

参数:

task_type -- 任务类型

返回:

是否已注册

list_types() list[str]#

列出所有已注册的任务类型。

返回:

任务类型列表

handler(task_type: str) collections.abc.Callable#

装饰器:注册任务处理器。

参数:

task_type -- 任务类型

返回:

装饰器函数

使用示例:

@registry.handler("generate_report")
async def handle_report(params: dict) -> dict:
    ...
static is_async_handler(handler: collections.abc.Callable[Ellipsis, Any]) bool#

判断处理器是否为异步函数。

参数:

handler -- 处理器函数

返回:

是否为异步函数

taolib.testing.task_queue.worker.registry._default_registry#
taolib.testing.task_queue.worker.registry.task_handler(task_type: str) collections.abc.Callable#

装饰器:在默认注册表中注册任务处理器。

参数:

task_type -- 任务类型

返回:

装饰器函数

使用示例:

@task_handler("send_email")
async def handle_send_email(params: dict) -> dict:
    await send_email(params["to"], params["subject"])
    return {"sent": True}
taolib.testing.task_queue.worker.registry.get_default_registry() TaskHandlerRegistry#

获取默认注册表。

返回:

默认 TaskHandlerRegistry 实例