AsyncFileSystem 简介#

fsspec 支持在某些实现上进行异步操作。这允许在批量操作(如同时获取多个文件的内容)中进行并发调用,即使是普通代码也可以这样做,并且可以在不阻塞的情况下直接在异步代码中使用 fsspec。异步实现派生自 fsspec.asyn.AsyncFileSystem 类。可以使用类属性 async_impl 来测试一个实现是否为异步的。

AsyncFileSystem 包含了 AbstractFileSystem 中方法的 async def 协程版本。按照约定,这些方法都以 _ 为前缀,表示它们不应在普通代码中直接调用,只有在你知道自己在做什么时候才调用。在大多数情况下,代码是相同的或稍作修改,将同步调用替换为对异步函数的 await 调用。

fsspec 内置的唯一异步实现是 HTTPFileSystem

同步 API#

AbstractFileSystem 的方法在普通代码中可用并可被调用。它们会调用并等待相应的异步函数。工作在单独的线程中进行,所以如果同时有许多 fsspec 操作在多个线程中启动,它们仍然都会在同一个专用于 IO 的线程上进行处理。

大多数用户不应该意识到他们的代码是异步运行的。

请注意,同步函数使用 sync_wrapper 包装,它会复制 AbstractFileSystem 中的文档字符串,除非在实现中明确给出。

示例:

fs = fsspec.filesystem("http")
out = fs.cat([url1, url2, url3])  # fetches data concurrently

协程批处理#

各种创建许多协程并将其传递给事件循环进行处理的方法可以进行批处理:一次性提交一定数量的协程,并等待它们完成之后再启动更多。这对于解决本地打开文件限制(可能小于100)和避免堆溢出很重要。

fsspec.asyn._run_coros_in_chunks() 控制这个过程,但从用户的角度来看,有三种方式可以影响它。按优先级递增的顺序:

  • 全局变量 fsspec.asyn._DEFAULT_BATCH_SIZEfsspec.asyn._NOFILES_DEFAULT_BATCH_SIZE(分别用于涉及本地文件或不涉及的调用)

  • 配置键 "gather_batch_size""nofiles_gather_batch_size"

  • 异步文件系统的批处理方法接受的 batch_size 关键字。

在 Async 中使用#

可以使用 asynchronous=True 创建文件系统实例。这意味着实例化发生在协程内部,因此可以使用 await 直接调用各种 async 方法,就像在异步代码中一样。

请注意,由于 __init__ 是一个阻塞函数,任何创建异步资源的操作都将被推迟。通常需要显式地等待一个协程来创建它们。由于垃圾回收也发生在阻塞代码中,你可能也希望显式地等待资源析构器。示例:

async def work_coroutine():
    fs = fsspec.filesystem("http", asynchronous=True)
    session = await fs.set_session()  # creates client
    out = await fs._cat([url1, url2, url3])  # fetches data concurrently
    await session.close()  # explicit destructor

asyncio.run(work_coroutine())

自带事件循环#

对于非异步情况,fsspec 通常会在特定线程上创建一个 asyncio 事件循环。然而,调用应用程序可能更希望 IO 进程运行在一个已经存在并正在运行的事件循环上(在另一个线程中)。这个循环需要符合 asyncio 规范,但不一定是一个 asyncio.events.AbstractEventLoop。示例:

loop = ...  # however a loop was made, running on another thread
fs = fsspec.filesystem("http", loop=loop)
out = fs.cat([url1, url2, url3])  # fetches data concurrently

实现新的后端#

异步文件系统应该派生自 AsyncFileSystem,并实现其中的 async def _* 协程。这些函数将自动生成同步版本(如果名称在 async_methods 列表中),或者可以使用 sync_wrapper 直接创建。

class MyFileSystem(AsyncFileSystem):

    async def _my_method(self):
        ...

    my_method = sync_wrapper(_my_method)

这些函数不能调用本身是同步的方法或函数,而应该 await 其他协程。调用不需要同步的方法是可以的,例如 _strip_protocol

请注意,__init__ 不能是异步的,所以可能需要使用同步函数来分配异步资源,但只有在 asynchronous=False 的情况下。如果是 True,你可能需要要求调用者等待一个创建这些资源的协程。同样,任何析构函数(例如 __del__)都将在常规代码中运行,可能在循环已停止/关闭之后。

要调用同步,你需要传递关联的事件循环,它将作为属性 .loop 可用。