Streaming data through a file-like interface

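This example shows how to decode streaming data: when the file is not stored locally, we download only the portion of the data that decoding actually needs. We do this with Python's file-like objects.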

The example uses a video file, so decoding is done with torchcodec.decoders.VideoDecoder; the same approach applies to audio files and torchcodec.decoders.AudioDecoder.

Preliminaries: download helper and a simple benchmark

import torch
import httpx
from time import perf_counter_ns

def get_url_content(url):
    response = httpx.get(url, headers={"User-Agent": ""}, follow_redirects=True)
    if response.status_code != 200:
        raise RuntimeError(f"Failed to download video. status_code = {response.status_code}.")
    return response.content

def bench(f, average_over=10, warmup=2):
    for _ in range(warmup):
        f()

    times = []
    for _ in range(average_over):
        start = perf_counter_ns()
        f()
        end = perf_counter_ns()
        times.append(end - start)

    times = torch.tensor(times) * 1e-6  # ns to ms
    std = times.std().item()
    med = times.median().item()
    print(f"{med = :.2f}ms +- {std:.2f}")

Performance: downloading first vs. streaming while decoding

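We measure how long it takes to decode only the first frame in three scenarios:

  1. Decoding from bytes already downloaded into memory (zero download cost, used as the baseline)

  2. Downloading the entire video before decoding

  3. Passing the URL directly to the decoder and letting FFmpeg decide how much to download

Note: every scenario sets seek_mode="approximate" so that the decoder does not scan the whole video during initialization, which would force a full download.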

from torchcodec.decoders import VideoDecoder

nasa_url = "https://download.pytorch.org/torchaudio/tutorial-assets/stream-api/NASAs_Most_Scientifically_Complex_Space_Observatory_Requires_Precision-MP4.mp4"

pre_downloaded_raw_video_bytes = get_url_content(nasa_url)
decoder = VideoDecoder(pre_downloaded_raw_video_bytes)

print(f"Video size in MB: {len(pre_downloaded_raw_video_bytes) // 1024 // 1024}")
print(decoder.metadata)

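The video is about 253 MB, with a resolution of 1920×1080 at roughly 30 FPS, and it runs for almost three and a half minutes. If we only need the first frame, downloading the whole file is clearly unnecessary.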

def decode_from_existing_download():
    decoder = VideoDecoder(
        source=pre_downloaded_raw_video_bytes,
        seek_mode="approximate",
    )
    return decoder[0]

def download_before_decode():
    raw_video_bytes = get_url_content(nasa_url)
    decoder = VideoDecoder(
        source=raw_video_bytes,
        seek_mode="approximate",
    )
    return decoder[0]

def direct_url_to_ffmpeg():
    decoder = VideoDecoder(
        source=nasa_url,
        seek_mode="approximate",
    )
    return decoder[0]

print("Decode from existing download:")
bench(decode_from_existing_download)
print()

print("Download before decode:")
bench(download_before_decode)
print()

print("Direct url to FFmpeg:")
bench(direct_url_to_ffmpeg)

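Decoding from the already downloaded bytes is the fastest. Downloading the full video every time before decoding is much slower. Passing the URL directly does better, but it may still download more than it needs.

We can improve on this with a file-like object that implements its own read and seek methods and downloads data only on demand, such as the objects provided by fsspec. This fsspec capability requires aiohttp; both can be installed with pip install fsspec aiohttp. If they are not installed, the code below falls back to a small httpx-based file-like object that issues HTTP range requests.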

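try:
    # fsspec's HTTP file system needs aiohttp, so check for both up front.
    import fsspec
    import aiohttp  # noqa: F401

    def stream_while_decode():
        # fsspec fetches byte ranges on demand as the decoder reads and seeks.
        with fsspec.open(nasa_url, client_kwargs={'trust_env': True}) as file_like:
            decoder = VideoDecoder(file_like, seek_mode="approximate")
            return decoder[0]
except ImportError:
    # Fallback: a minimal read-only file-like object built on HTTP range requests.
    class HTTPXFileLike:
        def __init__(self, url, headers=None):
            self._url = url
            self._headers = headers or {}
            self._client = httpx.Client(follow_redirects=True)
            self._pos = 0
            # Ask for the total size so that seeking relative to the end works.
            try:
                head = self._client.head(self._url, headers=self._headers)
                length = head.headers.get("Content-Length")
                self._length = int(length) if length else None
            except Exception:
                self._length = None

        def read(self, size: int = -1) -> bytes:
            # Nothing to fetch for an empty read or at/after the end of the file.
            if size == 0 or (self._length is not None and self._pos >= self._length):
                return b""
            headers = dict(self._headers)
            if size is None or size < 0:
                headers["Range"] = f"bytes={self._pos}-"  # read to the end
            else:
                headers["Range"] = f"bytes={self._pos}-{self._pos + size - 1}"
            r = self._client.get(self._url, headers=headers)
            if r.status_code == 416:  # range starts past the end: treat as EOF
                return b""
            if r.status_code != 206:  # server ignored or rejected the range
                raise OSError(f"Range request failed. status_code = {r.status_code}.")
            self._pos += len(r.content)
            return r.content

        def seek(self, offset: int, whence: int = 0) -> int:
            if whence == 0:  # absolute position
                self._pos = offset
            elif whence == 1:  # relative to the current position
                self._pos += offset
            elif whence == 2:  # relative to the end of the file
                if self._length is None:
                    raise OSError("unknown length")
                self._pos = self._length + offset
            else:
                raise ValueError(f"invalid whence: {whence}")
            return self._pos

        def close(self):
            self._client.close()

    def stream_while_decode():
        file_like = HTTPXFileLike(nasa_url)
        try:
            decoder = VideoDecoder(file_like, seek_mode="approximate")
            return decoder[0]
        finally:
            file_like.close()  # release the HTTP connection pool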

print("Stream while decode: ")
bench(stream_while_decode)

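Streaming through a file-like object is faster than downloading the whole video first, and it is also more general than passing a URL. VideoDecoder accepts a direct URL only because the underlying FFmpeg supports it, and the set of supported protocols depends on the FFmpeg version. A file-like object can adapt any resource, including private protocols or storage systems that FFmpeg knows nothing about.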
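
To make the "any resource" point concrete, here is a minimal sketch of a file-like adapter over a chunked store. Everything in it is hypothetical and exists only for illustration: the ChunkedStoreReader class, the fetch_chunk callable, and the in-memory dict standing in for a private storage backend. It shows only the read/seek bookkeeping, not a real storage client.

```python
import io


class ChunkedStoreReader:
    """Read-only file-like view over a hypothetical store of fixed-size chunks."""

    def __init__(self, fetch_chunk, total_size, chunk_size):
        self._fetch_chunk = fetch_chunk  # hypothetical callable: chunk index -> bytes
        self._total_size = total_size
        self._chunk_size = chunk_size
        self._pos = 0
        self.chunks_fetched = 0

    def read(self, size: int = -1) -> bytes:
        if size is None or size < 0:
            size = self._total_size - self._pos  # read everything that remains
        end = min(self._pos + size, self._total_size)
        parts = []
        while self._pos < end:
            index, offset = divmod(self._pos, self._chunk_size)
            chunk = self._fetch_chunk(index)
            self.chunks_fetched += 1
            piece = chunk[offset:offset + (end - self._pos)]
            if not piece:  # store returned less data than expected; stop early
                break
            parts.append(piece)
            self._pos += len(piece)
        return b"".join(parts)

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        if whence == io.SEEK_SET:
            new_pos = offset
        elif whence == io.SEEK_CUR:
            new_pos = self._pos + offset
        elif whence == io.SEEK_END:
            new_pos = self._total_size + offset
        else:
            raise ValueError(f"unsupported whence: {whence}")
        if new_pos < 0:
            raise ValueError("negative seek position")
        self._pos = new_pos
        return self._pos


# Stand-in backend: a dict of 100-byte chunks holding made-up data.
payload = bytes(range(256)) * 4
CHUNK = 100
chunks = {i: payload[i * CHUNK:(i + 1) * CHUNK]
          for i in range((len(payload) + CHUNK - 1) // CHUNK)}

reader = ChunkedStoreReader(chunks.__getitem__, len(payload), CHUNK)
reader.seek(250)
data = reader.read(120)  # spans two chunks, so two fetches are needed
print(len(data), reader.chunks_fetched)
```

Because the object exposes read and seek, something shaped like this could in principle be handed to VideoDecoder in place of a URL, provided the store actually holds a valid media file.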

How it works

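In Python, a file-like object is any object that exposes methods for reading, writing, and seeking; it does not have to be backed by a real file. A library only needs to program against this file-like interface, and other libraries can then plug in by wrapping their own resources.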
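
As a small illustration that a file-like object need not be backed by a real file, the sketch below uses the standard library's io.BytesIO, which serves read and seek calls from an in-memory buffer. The payload bytes are made up for the example.

```python
import io

# An in-memory buffer; no file on disk is involved.
buffer = io.BytesIO(b"0123456789")

first = buffer.read(4)                   # reads bytes 0..3 and advances the position
position = buffer.seek(-2, io.SEEK_END)  # jump to 2 bytes before the end
tail = buffer.read()                     # read everything that remains

print(first, position, tail)
```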

For decoding, only the read and seek methods are required. The code below demonstrates this by wrapping a local file and counting the calls.

from pathlib import Path
import tempfile

# Create a local temporary file
temp_dir = tempfile.mkdtemp()
nasa_video_path = Path(temp_dir) / "nasa_video.mp4"
with open(nasa_video_path, "wb") as f:
    f.write(pre_downloaded_raw_video_bytes)

# Wrap a real file and count the read/seek calls
class FileOpCounter:
    def __init__(self, file):
        self._file = file
        self.num_reads = 0
        self.num_seeks = 0

    def read(self, size: int) -> bytes:
        self.num_reads += 1
        return self._file.read(size)

    def seek(self, offset: int, whence: int) -> int:
        self.num_seeks += 1
        return self._file.seek(offset, whence)

file_op_counter = FileOpCounter(open(nasa_video_path, "rb"))
counter_decoder = VideoDecoder(file_op_counter, seek_mode="approximate")

print("Decoder initialization required "
      f"{file_op_counter.num_reads} reads and "
      f"{file_op_counter.num_seeks} seeks.")

init_reads = file_op_counter.num_reads
init_seeks = file_op_counter.num_seeks

first_frame = counter_decoder[0]

print("Decoding the first frame required "
      f"{file_op_counter.num_reads - init_reads} additional reads and "
      f"{file_op_counter.num_seeks - init_seeks} additional seeks.")

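Initializing the decoder takes more reads and seeks than decoding the first frame because the implementation calls a special FFmpeg function that decodes the first few frames in order to return more robust metadata.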

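The Python file-like interface is also only half of the story: FFmpeg has its own callback mechanism for driving reads and seeks. VideoDecoder does the work of bridging the methods you define in Python to FFmpeg.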
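
Call counts alone do not say how much data was moved. As a possible extension of the counting idea, the sketch below also totals the bytes returned by read. The ByteCounter name is an assumption made for this example, and it wraps an io.BytesIO with made-up data so that it runs without a video file.

```python
import io


class ByteCounter:
    """Wrap a file-like object and track how many bytes are actually read."""

    def __init__(self, file):
        self._file = file
        self.num_reads = 0
        self.num_seeks = 0
        self.bytes_read = 0

    def read(self, size: int = -1) -> bytes:
        data = self._file.read(size)
        self.num_reads += 1
        self.bytes_read += len(data)  # count what was returned, not what was asked for
        return data

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        self.num_seeks += 1
        return self._file.seek(offset, whence)


# Stand-in data; with a real video, wrap open(path, "rb") instead.
counter = ByteCounter(io.BytesIO(b"x" * 1000))
counter.read(64)
counter.seek(900)
counter.read(200)  # only 100 bytes remain, so this read comes back short
print(counter.num_reads, counter.num_seeks, counter.bytes_read)
```

Passing such a wrapper to a decoder would show roughly how many bytes an operation pulls from the source, which matters most when every read is a network request.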

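Performance: local file path vs. open local file object

Now that we have a local file, we can compare two ways of providing it:

  1. Passing the path directly (the decoder opens the file internally)

  2. Opening the file ourselves and passing the file-like object

The benchmark below compares the two.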

def decode_from_existing_file_path():
    decoder = VideoDecoder(nasa_video_path, seek_mode="approximate")
    return decoder[0]

def decode_from_existing_open_file_object():
    with open(nasa_video_path, "rb") as file:
        decoder = VideoDecoder(file, seek_mode="approximate")
        return decoder[0]

print("Decode from existing file path:")
bench(decode_from_existing_file_path)
print()

print("Decode from existing open file object:")
bench(decode_from_existing_open_file_object)

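For a local file, the two approaches take about the same time, so use whichever is more convenient in your code. This also suggests that reading and copying the data dominates the total time, while the overhead of calling through Python methods is not significant.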

Cleaning up temporary resources

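import shutil

# Close the file wrapped by the counter first; an open handle can block
# directory removal on some platforms.
file_op_counter._file.close()
shutil.rmtree(temp_dir)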