# YOLO 简介

参考：[ultralytics](https://github.com/ultralytics/ultralytics)

In [None]:
import torch
import sys
sys.path.append("..")
import set_env
from d2py.utils.file import mkdir
# Directory under which this notebook creates its `logs` folder.
root_dir = ".temp"
# `mkdir` is the d2py helper imported above (presumably tolerates an existing directory — TODO confirm).
mkdir(f"{root_dir}/logs")
# from ultralytics import YOLO
# Ask PyTorch to release cached, currently unused CUDA memory.
torch.cuda.empty_cache()

## 测试 YOLOv5n PyTorch 前端

In [None]:
from PIL import Image
import numpy as np
from ultralytics import YOLO

# Run the ultralytics PyTorch model on a sample image and display the annotated result.
input_path = "images/vehicle-jaguar-f-type-car-red-cars-wallpaper.jpg"
im = Image.open(input_path) #.resize((384, 640))
# self = YOLO("yolov8n-p2.yaml")
# self.load("yolov8n.pt")
# NOTE: the model is bound to the module-level name `self`; later cells read
# `self.model.stride` and `self.model.names`, so the name is kept as-is.
self = YOLO("yolov5n.pt")
# results = self.train(data='coco.yaml', epochs=2)

# self.export(format="torchscript")
# results = self(input_path)
# Pass the PIL image itself: ultralytics documents ndarray sources as BGR, so
# feeding np.array(im) (which is RGB) runs inference with red and blue swapped.
results = self(im, conf=0.25)
# results = postprocess(preds, im, [np.ascontiguousarray(image_np)], self.model.names)
# plot() returns a BGR array; reverse the channel axis so PIL shows true colours.
Image.fromarray(results[0].plot()[..., ::-1])

导出 ONNX 模型：

In [None]:
# Export the loaded model to ONNX through the ultralytics exporter.
# NOTE(review): a later cell loads 'yolov5nu.onnx'; presumably that is the file written here — confirm.
self.export(format="onnx")

## YOLOv5n 输入预处理

In [None]:
from PIL import Image
import numpy as np
import torch
from tvm_book.data.augment import LetterBox

In [None]:
# Target size handed to LetterBox (presumably (height, width) — TODO confirm against LetterBox).
imgsz = 640, 640
# Stride(s) reported by the loaded model; forwarded to LetterBox below.
strides = self.model.stride
# Normalisation constants: the mean is subtracted and the result divided by std,
# i.e. pixel values are only rescaled by 1/255.
mean = (0,)
std = (255,)

In [None]:
# Letterbox the source image and build the batched network input `xs`.
letterbox = LetterBox(imgsz, strides=strides, auto=False)

origin_image = np.asanyarray(Image.open(input_path))
letterbox_image = letterbox(image=origin_image)
# Add a leading batch axis; subtracting `mean` promotes the data to an integer/float array.
xs = np.stack([letterbox_image - mean])
# The inner subscripts use single quotes: reusing the outer quote inside an
# f-string expression is only valid from Python 3.12 on.
print(f"数据内存的连续性：{xs.flags['C_CONTIGUOUS']}")
xs = xs.transpose((0, 3, 1, 2))  # BHWC to BCHW, (n, 3, h, w)
print(f"数据内存的连续性(transpose)：{xs.flags['C_CONTIGUOUS']}")
xs = np.ascontiguousarray(xs)  # transpose returns a strided view; make it C-contiguous
print(f"数据内存的连续性：{xs.flags['C_CONTIGUOUS']}")
# Scale to 0.0 - 1.0 and cast last: dividing a float32 array by the int64 array
# built from `std` would silently promote the result to float64.
xs = (xs / std).astype("float32")
# Side-by-side sanity check: letterboxed image next to the de-normalised input.
# Round before the uint8 cast so float error cannot drop a value by one.
Image.fromarray(np.concatenate(
    [letterbox_image, np.rint(xs[0] * std).astype("uint8").transpose((1, 2, 0))],
    axis=1,
))

## 测试 YOLOv5n ONNX Relay 前端

前端导入：

In [None]:
import onnx
from tvm import relay

# Name of the graph input that the shape dictionary and the runtime calls are keyed on.
input_name = "images"
# Load the ONNX file from the working directory (presumably produced by the export cell — confirm).
onnx_model = onnx.load('yolov5nu.onnx')
# Convert to a Relay module, fixing the input shape to that of the preprocessed batch `xs`.
mod, params = relay.frontend.from_onnx(onnx_model, {input_name: xs.shape}, freeze_params=True)

运行时推理：

In [None]:
import tvm

# Compile the Relay module for the local CPU; the AlterOpLayout pass is disabled for this build.
with tvm.transform.PassContext(opt_level=3, disabled_pass={"AlterOpLayout"}):
    lib = relay.build(mod, target="llvm", params=params)
# Fetch the factory function by the library's module name and create a graph executor on CPU 0.
# NOTE(review): `tvm.contrib.graph_executor` is reached through attribute access only;
# presumably it is already loaded as a side effect of importing `tvm.relay` — confirm.
func = lib[lib.libmod_name]
module = tvm.contrib.graph_executor.GraphModule(func(tvm.cpu(0)))
# Bind the preprocessed batch to the graph input and execute once.
module.run(**{input_name: xs})
num_outputs = module.get_num_outputs()
# Copy every output to NumPy; a later cell compares the rewritten model against these.
float_outputs = [module.get_output(k).numpy() for k in range(num_outputs)]

后处理：

In [None]:
from ultralytics.utils import ops
from ultralytics.engine.results import Results

def postprocess(preds, img, orig_imgs, names, input_path, conf_thres=0.25, iou_thres=0.45,):
    """Apply NMS to raw predictions and wrap each image's detections in ``Results``.

    Args:
        preds: raw model output, forwarded unchanged to ``ops.non_max_suppression``.
        img: the batch that was fed to the model; only ``img.shape[2:]`` is read
            (presumably the network input height/width — confirm).
        orig_imgs: original images, indexed in the same order as the NMS output.
        names: class-name mapping handed to every ``Results`` object.
        input_path: path recorded on every ``Results`` object.
        conf_thres: confidence threshold passed to NMS.
        iou_thres: IoU threshold passed to NMS.

    Returns:
        A list with one ``Results`` object per entry of the NMS output.
    """
    # Only the two thresholds are overridden; all other NMS options keep library defaults.
    detections = ops.non_max_suppression(preds, conf_thres=conf_thres, iou_thres=iou_thres)

    wrapped = []
    for index, det in enumerate(detections):
        source = orig_imgs[index]
        # Rescale the first four columns in place from the model-input size to the
        # original image size.
        det[:, :4] = ops.scale_boxes(img.shape[2:], det[:, :4], source.shape)
        wrapped.append(Results(source, path=input_path, names=names, boxes=det))
    return wrapped

In [None]:
# Post-process the TVM float outputs with the same thresholds used for the PyTorch run.
results = postprocess(
    [torch.from_numpy(o) for o in float_outputs], 
    xs, [origin_image], self.model.names, 
    input_path, conf_thres=0.25, iou_thres=0.45,
)
# Render the detections of the first (only) image as the cell output.
Image.fromarray(results[0].plot())

In [None]:
from tvm.relay.dataflow_pattern import rewrite
from tvm_book.transforms.yolo import Dist2xywhSimplify
import tvm

# Apply the Dist2xywhSimplify rewrite to the main function, rebuild, and verify
# that the rewritten model reproduces the reference outputs.
with tvm.transform.PassContext(opt_level=3, disabled_pass={"AlterOpLayout"}):
    run_mod = tvm.IRModule.from_expr(rewrite(Dist2xywhSimplify(), mod["main"]))
    lib = relay.build(run_mod, target="llvm", params=params)

func = lib[lib.libmod_name]
module = tvm.contrib.graph_executor.GraphModule(func(tvm.cpu(0)))
module.run(**{input_name: xs})
num_outputs = module.get_num_outputs()
run_float_outputs = [module.get_output(k).numpy() for k in range(num_outputs)]
# Plain loop instead of a throwaway list of None; strict=True makes a differing
# number of outputs an error instead of silently comparing only the common prefix.
for a, b in zip(float_outputs, run_float_outputs, strict=True):
    np.testing.assert_allclose(a, b, rtol=1e-07, atol=1e-3)
results = postprocess(
    [torch.from_numpy(o) for o in run_float_outputs],
    xs, [origin_image], self.model.names,
    input_path, conf_thres=0.25, iou_thres=0.45,
)
# Render the detections of the first (only) image as the cell output.
Image.fromarray(results[0].plot())

In [None]:
# Print the rewritten Relay module for inspection.
run_mod.show()

## 裁剪 YOLOv5n 以探索计算图分割

In [None]:
# from tvm.relay.dataflow_pattern import (
#     wildcard, is_constant, is_op, is_var, is_tuple, is_tuple_get_item
# )

In [None]:
from copy import deepcopy
from tvm.relay.analysis import extract_intermdeiate_expr
from tvm_book.compiler.utils import merge_compiler

# Work on a copy so the imported module `mod` stays untouched.
run_mod = deepcopy(mod)
# run_mod = extract_intermdeiate_expr(run_mod, 110)
with tvm.transform.PassContext(opt_level=3):
    # Apply the Dist2xywhSimplify rewrite to the main function.
    run_mod["main"] = rewrite(Dist2xywhSimplify(), run_mod["main"])
    # Run TVM's pre-quantization optimisation passes with the parameters bound.
    run_mod = relay.quantize.prerequisite_optimize(run_mod, params)
    # Group operators into functions for the "vta_special" compiler name; the printed
    # module in the next cell shows calls such as `@vta_special.conv2d_0`.
    run_mod = merge_compiler(run_mod, compiler_name="vta_special")

In [18]:
# Print the text form of the partitioned main function.
print(run_mod["main"])

fn (%images: Tensor[(1, 3, 640, 640), float32] /* ty=Tensor[(1, 3, 640, 640), float32] span=/model.0/conv/Conv.images:0:0 */) -> Tensor[(1, 84, 8400), float32] {
  %0 = @vta_special.conv2d_0(%images, meta[relay.Constant][0] /* ty=Tensor[(16, 3, 6, 6), float32] span=/model.0/conv/Conv.model.0.conv.weight:0:0 */, meta[relay.Constant][1] /* ty=Tensor[(16, 1, 1), float32] */) /* ty=Tensor[(1, 16, 320, 320), float32] */;
  %1 = @vta_special.conv2d_1(%0, meta[relay.Constant][2] /* ty=Tensor[(32, 16, 3, 3), float32] span=/model.1/conv/Conv.model.1.conv.weight:0:0 */, meta[relay.Constant][3] /* ty=Tensor[(32, 1, 1), float32] */) /* ty=Tensor[(1, 32, 160, 160), float32] */;
  %2 = @vta_special.conv2d_2(%1, meta[relay.Constant][4] /* ty=Tensor[(16, 32, 1, 1), float32] span=/model.2/cv1/conv/Conv.model.2.cv1.conv.weight:0:0 */, meta[relay.Constant][5] /* ty=Tensor[(16, 1, 1), float32] */) /* ty=Tensor[(1, 16, 160, 160), float32] */;
  %3 = @vta_special.conv2d_3(%2, meta[relay.Constant][6] /* ty=T

In [None]:
from pathlib import Path
from tqdm import tqdm

# Model-input settings read by the preprocessing and calibration helpers below.
ENV = dict(
    model_type="onnx",
    input_name="images",
    channel=3,
    height=640,
    width=640,
    mode="RGB",  # colour mode of the input image
    mean=(0,),
    std=(255,),
)

def letterbox_image(im: Image.Image, dst_width: int, dst_height: int):
    """Scale an image to fit the target size, keeping aspect ratio and padding the rest.

    The image is resized (bicubic) by the largest factor for which it still fits
    inside ``dst_width`` x ``dst_height`` and pasted centred on a grey
    ``(114, 114, 114)`` RGB canvas of exactly the target size.

    Args:
        im: source PIL image.
        dst_width: width of the returned image in pixels.
        dst_height: height of the returned image in pixels.

    Returns:
        A new RGB image of size ``(dst_width, dst_height)``.
    """
    iw, ih = im.size  # original size
    scale = min(dst_width/iw, dst_height/ih)
    # Clamp to one pixel: truncation gives 0 for very elongated inputs, and
    # resize() rejects a zero-sized target.
    nw = max(1, int(iw*scale))
    nh = max(1, int(ih*scale))
    im = im.resize((nw, nh), Image.BICUBIC)
    new_image = Image.new('RGB', (dst_width, dst_height), (114, 114, 114))
    # Integer-centre the resized image on the canvas.
    new_image.paste(im, ((dst_width-nw)//2, (dst_height-nh)//2))
    return new_image

def preprocessing(path: str|None, **ENV: dict):
    """Load (or synthesise) an image and turn it into a normalised NCHW batch.

    Args:
        path: image file to open; when falsy, a random 32x32 RGB image is used.
        **ENV: settings; the keys ``width``, ``height``, ``mode``, ``mean`` and
            ``std`` are read.

    Returns:
        A pair ``(image_np, data_inp)``: the letterboxed image as an NHWC array
        with a leading batch axis, and the float32 NCHW array obtained from
        ``(image_np - mean) / std``. Both are C-contiguous.

    Raises:
        TypeError: if ``ENV["mode"]`` is not one of ``"L"``, ``"RGB"``, ``"BGR"``.
    """
    if path:
        im = Image.open(path)
    else:
        noise = np.random.randint(0, 256, size=(32, 32, 3), dtype="uint8")
        im = Image.fromarray(noise)  # wrap the random array as a PIL image
    im = letterbox_image(im, ENV["width"], ENV["height"])

    mode = ENV["mode"]
    if mode == "L":
        # Greyscale: append a channel axis to obtain HWC.
        img = np.expand_dims(im.convert("L"), axis=-1)
    elif mode in ("RGB", "BGR"):
        img = np.array(im.convert("RGB"))  # HWC
        if mode == "BGR":
            img = img[..., ::-1]  # reverse the channel axis: RGB to BGR
    else:
        raise TypeError(f'暂未支持数据布局 {ENV["mode"]}')

    image_np = np.expand_dims(img, 0)  # NHWC
    normalized = (image_np - ENV["mean"]) / ENV["std"]
    data_inp = normalized.astype(np.float32).transpose(0, 3, 1, 2)  # NHWC to NCHW
    return np.ascontiguousarray(image_np), np.ascontiguousarray(data_inp)

def calibrateset(calibrate_num=2, data_dir="/media/pc/data/lxw/home/data/coco/train2017"):
    """Yield calibration batches for quantization.

    Iterates over the entries of ``data_dir`` (in directory-listing order) and
    yields, for each of the first ``calibrate_num`` entries, a dict mapping the
    configured input name to the preprocessed NCHW array.
    """
    entries = enumerate(Path(data_dir).iterdir())
    progress = tqdm(entries, desc="Calibrate", unit="batch")
    for index, image_path in progress:
        if not index < calibrate_num:
            break
        # preprocessing returns (raw NHWC image, normalised input); only the latter is fed.
        _, model_input = preprocessing(image_path, **ENV)
        yield {ENV["input_name"]: model_input}

In [None]:
# Start again from a fresh copy of the imported module.
run_mod = deepcopy(mod)
with tvm.transform.PassContext(opt_level=3, disabled_pass={"AlterOpLayout"}):
    # Apply the Dist2xywhSimplify rewrite before quantizing.
    run_mod["main"] = rewrite(Dist2xywhSimplify(), run_mod["main"])
    # Quantize with percentile calibration and max-based weight scales,
    # calibrating on the batches yielded by calibrateset().
    with relay.quantize.qconfig(
        calibrate_mode="percentile", weight_scale="max"):
        qmod = relay.quantize.quantize(run_mod, params, dataset=calibrateset())

In [None]:
# Print the quantized Relay module for inspection.
qmod.show()

In [None]:
from tvm.relay.dataflow_pattern import rewrite
from tvm_book.transforms.yolo import Dist2xywhSimplify
import tvm

# Build the quantized module for the local CPU with AlterOpLayout disabled.
with tvm.transform.PassContext(opt_level=3, disabled_pass={"AlterOpLayout"}):
    lib = relay.build(qmod, target="llvm", params=params)

# Instantiate a graph executor on CPU 0 and run it on the same input batch.
func = lib[lib.libmod_name]
module = tvm.contrib.graph_executor.GraphModule(func(tvm.cpu(0)))
module.run(**{input_name: xs})
num_outputs = module.get_num_outputs()
quant_outputs = [module.get_output(k).numpy() for k in range(num_outputs)]
# Post-process with the same thresholds as the float model so the plots are comparable.
results = postprocess(
    [torch.from_numpy(o) for o in quant_outputs], 
    xs, [origin_image], self.model.names, 
    input_path, conf_thres=0.25, iou_thres=0.45,
)
# Render the detections of the first (only) image as the cell output.
Image.fromarray(results[0].plot())

In [None]:
# NOTE(review): `ww` is not defined anywhere in the visible notebook, so evaluating this
# cell raises NameError; presumably a deliberate stop marker for "Run All" or stray input — confirm.
ww

In [None]:
from tvm.relay.analysis import _ffi_api

# NOTE(review): `run_mod` is reassigned by several earlier cells; presumably the
# compiler-partitioned module is intended here — confirm the execution order.
# Map from partitioned function to index triples into the calibration module's outputs.
output_map = _ffi_api.get_calibrate_output_map(run_mod)
# Module derived from run_mod for collecting calibration data.
calibrate_mod = _ffi_api.get_calibrate_module(run_mod)
# Run the Inline pass on the calibration module before executing it.
calibrate_mod = relay.transform.Inline()(calibrate_mod)

In [None]:
# Evaluate the calibration module once on the preprocessed batch with the graph executor on CPU 0.
ref_res = relay.build_module.create_executor("graph", mod=calibrate_mod, device=tvm.cpu(0)).evaluate()(**{input_name: xs})

# Slice the flat result sequence into per-function inputs and outputs.
calib_data = {}
for gvar, indices in output_map.items():
    # Each entry is read as (offset, number of inputs, number of outputs).
    offset = int(indices[0])
    in_len = int(indices[1])
    out_len = int(indices[2])
    value = {
        "inputs": ref_res[offset : offset + in_len],
        "outputs": ref_res[offset + in_len : offset + in_len + out_len],
    }
    calib_data[gvar] = value
# Index the functions by the integer after the last underscore of their name hint.
func_map = {int(kk.name_hint.split("_")[-1]): kk for kk in calib_data.keys()}

In [None]:
# Show the calibration record of the function whose numeric suffix equals len(func_map) - 1
# (the last one if the suffixes run 0..n-1 — TODO confirm).
calib_data[func_map[len(func_map)-1]]