asyncio 支持的多种队列

asyncio 支持的多种队列#

参考:asyncio 支持的多种队列

在设计应用程序来处理事件或其他类型的数据时,经常需要一种机制来存储这些事件,并将它们分发给一组 worker。然后这些 worker 可根据这些事件同时执行我们需要执行的任何操作,从而节省时间。asyncio 提供了一个异步队列,可以让我们实现这一点,可将数据块添加到队列中,并让多个 worker 同时运行,从队列中提取数据并在可用时对其进行处理。

这些通常称为 producer-consumer 工作流,某些情况会产生我们需要处理的数据或事件,而处理这些工作内容可能需要很长时间。队列负责帮助我们传输长时间运行的任务,同时保持用户界面持续对外界进行响应。我们可将一个项目放在队列中以供日后处理,并通知用户我们已经在后台开始了这项工作。异步队列还有一个额外优势,就是它提供了一种限制并发的机制,因为每个队列通常允许有限数量的 worker 任务。

队列是一种先进先出的数据结构,这与在杂货店结账时的队列没有太大区别。在结账时,你加入队列,并排在队尾,等待收银员为你前面的所有人结账。一旦收银员为前面的顾客结完账,你就会在队列中移动,而在你之后加入的人会在你身后等待。然后,当你排在队列的第一个位置时,收银员将为你结账。结账后,你将离开队列。

正如我们所描述的,结账队列是一个同步工作流,一名收银员一次为一名顾客结账。如果我们重新设计队列,从而更好地利用并发性,并依旧使用超市收银的例子会怎样?这将意味着多个收银员和一个队列,而不是一个收银员。只要有收银员,他们就可以将下一个顾客引导到收银台,这意味着除了多个收银员同时为客户结账,还有多个收银员同时从队列中引导客户。

这是异步队列的核心内容,我们将多个等待处理的工作项添加到队列中,然后让多个 worker 从队列中提取项目并执行。

import asyncio
from dataclasses import dataclass
from asyncio import Queue
from random import sample, randint

@dataclass
class Product:
    """
    商品
    """
    name: str  # 商品名称
    checkout_time: float  # 结算需要的时间

@dataclass
class Customer:
    """
    客户
    """
    customer_id: int  # 客户的 id
    products: list[Product] # 客户购买的商品

async def checkout_customer(queue: Queue, cashier_id: int):
    # 检查队列中是否有客户
    while not queue.empty():
        customer: Customer = await queue.get()
        print(f"收银员 {cashier_id} 开始对客户 {customer.customer_id} 的商品进行结算")
        for product in customer.products:
            print(f"收银员 {cashier_id} 正在结算客户 {customer.customer_id} 的商品: {product.name}")
            await asyncio.sleep(product.checkout_time)
        print(f"收银员 {cashier_id} 已完成对客户 {customer.customer_id} 商品的结算")
        queue.task_done()  # 这行代码后续解释

async def main():
    customer_queue = Queue()
    all_products = [Product("苹果", 2), Product("香蕉", .5),
                    Product("草莓", 1), Product("蓝莓", .2)]
    # 创建 4 个客户,并用随机产品进行填充。
    for i in range(1, 5):
        products = sample(all_products, randint(1, 4))
        await customer_queue.put(Customer(i, products))
    # 创建三个收银员,从队列中取出客户,进行服务
    cashiers = [asyncio.create_task(checkout_customer(customer_queue, i)) for i in range(1, 4)]
    await asyncio.gather(customer_queue.join(), *cashiers)

await main()
Hide code cell output
收银员 1 开始对客户 1 的商品进行结算
收银员 1 正在结算客户 1 的商品: 苹果
收银员 2 开始对客户 2 的商品进行结算
收银员 2 正在结算客户 2 的商品: 蓝莓
收银员 3 开始对客户 3 的商品进行结算
收银员 3 正在结算客户 3 的商品: 蓝莓
收银员 2 正在结算客户 2 的商品: 苹果
收银员 3 正在结算客户 3 的商品: 苹果
收银员 1 已完成对客户 1 商品的结算
收银员 1 开始对客户 4 的商品进行结算
收银员 1 正在结算客户 4 的商品: 蓝莓
收银员 2 正在结算客户 2 的商品: 香蕉
收银员 3 正在结算客户 3 的商品: 香蕉
收银员 1 已完成对客户 4 商品的结算
收银员 2 正在结算客户 2 的商品: 草莓
收银员 3 已完成对客户 3 商品的结算
收银员 2 已完成对客户 2 商品的结算

异步队列的实际使用场景#

当有一个可以在后台运行的潜在耗时操作的时候,队列在 Web 应用程序中将很有帮助。如果在 Web 请求的主协程中运行此操作,将阻止对用户的响应(直到操作完成),这可能会给最终用户留下一个缓慢、无响应的页面,降低用户的使用体验。

设想我们就职于一家电子商务公司,并使用缓慢的订单管理系统进行操作。处理订单可能需要几秒钟的时间,但我们不想让用户在下单时进行等待。此外,订单管理系统不能很好地处理负载,所以我们想限制同时向它发出的请求数量。这种情况下,队列可以解决这两个问题。正如之前看到的,在添加更多块或抛出异常之前,队列可以拥有允许的最大元素数量。这为并发性提供了天然限制。

队列还解决了用户等待响应时间过长的问题,将元素放到队列中是立即发生的,这意味着可通知用户他们的订单已经被接收了,从而提供快捷的用户体验。当然在现实世界中,这可能导致后台任务在没有通知用户的情况下失败,因此需要某种形式的数据持久性和逻辑来应对这种情况。

为验证这一点,我们 FastAPI 创建一个简单的 Web 应用程序,它使用一个队列来运行后台任务,这里通过 asyncio.sleep 来模拟与慢速订单管理系统的交互。在现实世界的微服务体系结构中,你可能通过 aiohttp(或类似的库)调用一个 REST API 进行通信,但为了简单,这里就使用 sleep。

Web 应用所做的事情如下:FastAPI 启动之后通过 hook 创建一个队列和一组 worker 任务,这些任务负责与慢速订单服务交互。然后创建一个 HTTP POST 订单端点,它将在队列上放置一个订单,一旦将订单放入队列中,将返回一个 HTTP 200 和一条消息,表明已经完成下单。此外还将在 FastAPI 的关闭 hook 中添加一些安全的关闭逻辑,因为当应用程序关闭时可能仍有一些订单正在被处理。在关闭 hook 中,将等到所有忙碌的 worker 完成它们的工作