通过文件样式接口流式传输数据#
本示例展示如何解码流式数据:当文件不在本地时,只下载为解码所需的那部分数据。我们利用 Python 的 "类文件对象" 来实现。
示例使用视频文件,因此采用 :class:torchcodec.decoders.VideoDecoder 进行解码;这些方法同样适用于音频文件以及 :class:torchcodec.decoders.AudioDecoder。
预备函数:下载与简单基准#
import torch
import httpx
from time import perf_counter_ns
def get_url_content(url):
response = httpx.get(url, headers={"User-Agent": ""}, follow_redirects=True)
if response.status_code != 200:
raise RuntimeError(f"Failed to download video. status_code = {response.status_code}.")
return response.content
def bench(f, average_over=10, warmup=2):
for _ in range(warmup):
f()
times = []
for _ in range(average_over):
start = perf_counter_ns()
f()
end = perf_counter_ns()
times.append(end - start)
times = torch.tensor(times) * 1e-6 # ns to ms
std = times.std().item()
med = times.median().item()
print(f"{med = :.2f}ms +- {std:.2f}")
性能对比:先完整下载 vs 边下边解码#
我们研究三种场景下,仅解码"第一帧"的耗时:
从已下载的内存字节解码(下载成本为 0,作为基线)
解码前先整段下载
直接把 URL 传给解码器,由 FFmpeg 决定下载多少
注意:均设置 seek_mode="approximate",以避免初始化时扫描完整视频(这会触发整段下载)。
from torchcodec.decoders import VideoDecoder
nasa_url = "https://download.pytorch.org/torchaudio/tutorial-assets/stream-api/NASAs_Most_Scientifically_Complex_Space_Observatory_Requires_Precision-MP4.mp4"
pre_downloaded_raw_video_bytes = get_url_content(nasa_url)
decoder = VideoDecoder(pre_downloaded_raw_video_bytes)
print(f"Video size in MB: {len(pre_downloaded_raw_video_bytes) // 1024 // 1024}")
print(decoder.metadata)
视频约 253MB,分辨率 1920×1080,约 30 FPS,时长将近 3.5 分钟。若只需第一帧,显然无需整段下载。
def decode_from_existing_download():
decoder = VideoDecoder(
source=pre_downloaded_raw_video_bytes,
seek_mode="approximate",
)
return decoder[0]
def download_before_decode():
raw_video_bytes = get_url_content(nasa_url)
decoder = VideoDecoder(
source=raw_video_bytes,
seek_mode="approximate",
)
return decoder[0]
def direct_url_to_ffmpeg():
decoder = VideoDecoder(
source=nasa_url,
seek_mode="approximate",
)
return decoder[0]
print("Decode from existing download:")
bench(decode_from_existing_download)
print()
print("Download before decode:")
bench(download_before_decode)
print()
print("Direct url to FFmpeg:")
bench(direct_url_to_ffmpeg)
从已下载内容解码最快;每次都完整下载再解码要慢很多;直接 URL 更好,但仍可能多下载。
进一步优化的方法是使用仅按需下载的"类文件对象"(实现自定义 read/seek),例如 fsspec 提供的对象。该能力需要 aiohttp 支持,可通过 pip install fsspec aiohttp 安装。
try:
import fsspec
def stream_while_decode():
with fsspec.open(nasa_url, client_kwargs={'trust_env': True}) as file_like:
decoder = VideoDecoder(file_like, seek_mode="approximate")
return decoder[0]
except ImportError:
import httpx
class HTTPXFileLike:
def __init__(self, url, headers=None):
self._url = url
self._headers = headers or {}
self._client = httpx.Client(follow_redirects=True)
self._pos = 0
try:
head = self._client.head(self._url, headers=self._headers)
self._length = int(head.headers.get("Content-Length")) if head.headers.get("Content-Length") else None
except Exception:
self._length = None
def read(self, size: int) -> bytes:
start = self._pos
end = self._pos + max(size, 0) - 1 if size is not None and size >= 0 else None
headers = dict(self._headers)
if end is not None:
headers["Range"] = f"bytes={start}-{end}"
r = self._client.get(self._url, headers=headers)
data = r.content
self._pos += len(data)
return data
def seek(self, offset: int, whence: int) -> int:
if whence == 0:
self._pos = offset
elif whence == 1:
self._pos += offset
elif whence == 2:
if self._length is None:
raise OSError("unknown length")
self._pos = self._length + offset
return self._pos
def close(self):
self._client.close()
def stream_while_decode():
file_like = HTTPXFileLike(nasa_url)
decoder = VideoDecoder(file_like, seek_mode="approximate")
return decoder[0]
print("Stream while decode: ")
bench(stream_while_decode)
通过类文件对象进行流式解码更快,同时更通用:VideoDecoder 能接受直接 URL 是因为底层 FFmpeg 支持,但协议种类受 FFmpeg 版本限制;而类文件对象可以适配任意资源(包括你的私有协议/存储),无需 FFmpeg 认识。
工作原理#
在 Python 中,"类文件对象"是指暴露读/写/定位(seek)等特殊方法的对象;并不一定要由真实文件驱动。库只需面向"文件样式接口",其他库即可通过包装自身资源来适配。
对我们的解码场景,仅需 read 与 seek 方法。下面通过包装本地文件并统计调用次数来演示。
from pathlib import Path
import tempfile
# 创建本地临时文件
temp_dir = tempfile.mkdtemp()
nasa_video_path = Path(temp_dir) / "nasa_video.mp4"
with open(nasa_video_path, "wb") as f:
f.write(pre_downloaded_raw_video_bytes)
# 包装真实文件,统计 read/seek 次数
class FileOpCounter:
def __init__(self, file):
self._file = file
self.num_reads = 0
self.num_seeks = 0
def read(self, size: int) -> bytes:
self.num_reads += 1
return self._file.read(size)
def seek(self, offset: int, whence: int) -> int:
self.num_seeks += 1
return self._file.seek(offset, whence)
file_op_counter = FileOpCounter(open(nasa_video_path, "rb"))
counter_decoder = VideoDecoder(file_op_counter, seek_mode="approximate")
print("Decoder initialization required "
f"{file_op_counter.num_reads} reads and "
f"{file_op_counter.num_seeks} seeks.")
init_reads = file_op_counter.num_reads
init_seeks = file_op_counter.num_seeks
first_frame = counter_decoder[0]
print("Decoding the first frame required "
f"{file_op_counter.num_reads - init_reads} additional reads and "
f"{file_op_counter.num_seeks - init_seeks} additional seeks.")
初始化解码器的读/seek 次数多于解码第一帧,是因为实现中会调用 FFmpeg 的特殊函数以解码前几帧,来获取更健壮的元数据。
另外,Python 的类文件接口只是故事的一半;FFmpeg 也有自己的回调机制来驱动读/seek。VideoDecoder 负责把你在 Python 中定义的方法桥接到 FFmpeg。
性能:本地文件路径 vs 本地已打开文件对象#
既然我们已创建了本地文件,比较两种提供方式:
直接传入路径(解码器内部打开文件)
自行打开文件并传入类文件对象
下面进行对比测试。
def decode_from_existing_file_path():
decoder = VideoDecoder(nasa_video_path, seek_mode="approximate")
return decoder[0]
def decode_from_existing_open_file_object():
with open(nasa_video_path, "rb") as file:
decoder = VideoDecoder(file, seek_mode="approximate")
return decoder[0]
print("Decode from existing file path:")
bench(decode_from_existing_file_path)
print()
print("Decode from existing open file object:")
bench(decode_from_existing_open_file_object)
两种方式在本地文件场景下耗时基本一致;可在实际代码中选你更方便的方式。这也说明数据的读与拷贝成本主导了整体耗时,而通过 Python 方法的调用开销并不显著。
清理临时资源#
import shutil
shutil.rmtree(temp_dir)