Slicing YOLOv8n to Explore Computational Graph Partitioning

from tvm.relay.dataflow_pattern import rewrite
from tvm_book.transforms.yolo import Dist2xywhSimplify
import tvm
from tvm import relay

# `mod`, `params`, `xs`, `input_name` come from the YOLOv8n model loaded earlier
with tvm.transform.PassContext(opt_level=3, disabled_pass={"AlterOpLayout"}):
    # Simplify the distance-to-bbox decoding into xywh form, then build for CPU
    run_mod = tvm.IRModule.from_expr(rewrite(Dist2xywhSimplify(), mod["main"]))
    lib = relay.build(run_mod, target="llvm", params=params)

func = lib[lib.libmod_name]
module = tvm.contrib.graph_executor.GraphModule(func(tvm.cpu(0)))
module.run(**{input_name: xs})
num_outputs = module.get_num_outputs()
run_float_outputs = [module.get_output(k).numpy() for k in range(num_outputs)]
for a, b in zip(float_outputs, run_float_outputs):
    np.testing.assert_allclose(a, b, rtol=1e-07, atol=1e-3)
results = postprocess(
    [torch.from_numpy(o) for o in run_float_outputs], 
    xs, [origin_image], self.model.names, 
    input_path, conf_thres=0.25, iou_thres=0.45,
)
Image.fromarray(results[0].plot())
from copy import deepcopy
# NOTE: the misspelling "extract_intermdeiate_expr" matches TVM's own API name
from tvm.relay.analysis import extract_intermdeiate_expr
from tvm_book.compiler.utils import merge_compiler

run_mod = deepcopy(mod)
# run_mod = extract_intermdeiate_expr(run_mod, 110)
with tvm.transform.PassContext(opt_level=3):
    run_mod["main"] = rewrite(Dist2xywhSimplify(), run_mod["main"])
    run_mod = relay.quantize.prerequisite_optimize(run_mod, params)
    run_mod = merge_compiler(run_mod, compiler_name="vta_special")
print(run_mod["main"])
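
To verify how merge_compiler carved up the graph, you can enumerate the partitioned functions and their Compiler attribute; a minimal sketch using TVM's standard BYOC function attribute (assuming run_mod is the partitioned module printed above):

# List partitioned functions tagged for the "vta_special" compiler
for gvar, func in run_mod.functions.items():
    if func.attrs and "Compiler" in func.attrs:
        print(gvar.name_hint, "->", func.attrs["Compiler"])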
from pathlib import Path
from tqdm import tqdm

ENV = {
    "model_type": "onnx",
    "input_name": "images",
    "channel": 3,
    "height": 640, 
    "width": 640,
    "mode": "RGB", # 输入图片格式
    "mean": (0,),
    "std": (255,)
}
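
With mean=(0,) and std=(255,), the normalization used below simply rescales uint8 pixels into [0, 1]; a quick sanity check:

import numpy as np

# mean=(0,), std=(255,): uint8 pixel values map linearly to [0, 1]
pixels = np.array([0, 114, 255], dtype="uint8")
print((pixels - ENV["mean"][0]) / ENV["std"][0])  # [0.  0.44705882  1.]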

def letterbox_image(im: Image.Image, dst_width: int, dst_height: int):
    '''Scale the image while preserving its aspect ratio, padding the rest

    Args:
        im: source Image
        dst_width: target width
        dst_height: target height
    '''
    iw, ih = im.size # source size
    scale = min(dst_width/iw, dst_height/ih)
    nw = int(iw*scale)
    nh = int(ih*scale)
    im = im.resize((nw, nh), Image.BICUBIC)
    new_image = Image.new('RGB', (dst_width, dst_height), (114, 114, 114))
    new_image.paste(im, ((dst_width-nw)//2, (dst_height-nh)//2))
    return new_image
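
A quick check on a synthetic 1280×720 frame: the scale is min(640/1280, 640/720) = 0.5, so the content becomes 640×360 and is centered with 140-pixel gray bands above and below:

probe = Image.new("RGB", (1280, 720), (0, 0, 0))
boxed = letterbox_image(probe, 640, 640)
print(boxed.size)                  # (640, 640)
print(boxed.getpixel((320, 0)))    # (114, 114, 114): top padding band
print(boxed.getpixel((320, 320)))  # (0, 0, 0): image content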

def preprocessing(path: str|None, **ENV):
    if not path:
        im = np.random.randint(0, 256, size=(32, 32, 3), dtype="uint8")
        im = Image.fromarray(im) # convert to an Image instance
    else:
        im = Image.open(path)
    # im = im.resize((ENV["width"], ENV["height"]), Image.BICUBIC)
    im = letterbox_image(im, ENV["width"], ENV["height"])
    if ENV["mode"] == "L": # convert grayscale to HWC layout
        img = im.convert("L")
        img = np.expand_dims(np.array(img), axis=-1) # to HWC
    elif ENV["mode"] == "RGB":
        img = np.array(im.convert("RGB")) # to HWC layout
    elif ENV["mode"] == "BGR":
        img = np.array(im.convert("RGB")) # to HWC layout
        img = img[..., ::-1] # RGB to BGR
    else:
        raise TypeError(f'unsupported data layout {ENV["mode"]}')
    image_np = np.expand_dims(img, 0) # to NHWC (uint8 data)
    # preprocessed data
    data_inp = ((image_np - ENV["mean"]) / ENV["std"]).astype(np.float32)
    data_inp = data_inp.transpose(0, 3, 1, 2)
    return np.ascontiguousarray(image_np), np.ascontiguousarray(data_inp)
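
A shape check on the two tensors preprocessing returns (NHWC uint8 for drawing, NCHW float32 as network input); with path=None it runs on a random image:

image_np, data_inp = preprocessing(None, **ENV)
print(image_np.shape, image_np.dtype)  # (1, 640, 640, 3) uint8
print(data_inp.shape, data_inp.dtype)  # (1, 3, 640, 640) float32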

def calibrateset(calibrate_num=2, data_dir="/media/pc/data/lxw/home/data/coco/train2017"):
    """用于量化的校准数据集"""
    for k, path in tqdm(enumerate(Path(data_dir).iterdir()), desc="Calibrate", unit="batch"):
        if k >= calibrate_num:
            break
        yield {ENV["input_name"]: preprocessing(path, **ENV)[1]}
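
Pulling a single batch from the generator shows the layout relay.quantize.quantize expects from its dataset argument, i.e. dicts keyed by the input name:

batch = next(calibrateset(calibrate_num=1))
print(list(batch))                     # ['images']
print(batch[ENV["input_name"]].shape)  # (1, 3, 640, 640)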
run_mod = deepcopy(mod)
with tvm.transform.PassContext(opt_level=3, disabled_pass={"AlterOpLayout"}):
    run_mod["main"] = rewrite(Dist2xywhSimplify(), run_mod["main"])
    with relay.quantize.qconfig(
        calibrate_mode="percentile", weight_scale="max"):
        qmod = relay.quantize.quantize(run_mod, params, dataset=calibrateset())
qmod.show()
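
To gauge how much of the graph was rewritten, a sketch that counts the quantization-related operators in qmod with relay's post-order visitor:

from collections import Counter

counts = Counter()
def _visit(expr):
    # Count every operator call in the quantized main function
    if isinstance(expr, relay.Call) and isinstance(expr.op, tvm.ir.Op):
        counts[expr.op.name] += 1
relay.analysis.post_order_visit(qmod["main"], _visit)
print({name: n for name, n in counts.items() if "quantize" in name or "cast" in name})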

with tvm.transform.PassContext(opt_level=3, disabled_pass={"AlterOpLayout"}):
    lib = relay.build(qmod, target="llvm", params=params)

func = lib[lib.libmod_name]
module = tvm.contrib.graph_executor.GraphModule(func(tvm.cpu(0)))
module.run(**{input_name: xs})
num_outputs = module.get_num_outputs()
quant_outputs = [module.get_output(k).numpy() for k in range(num_outputs)]
results = postprocess(
    [torch.from_numpy(o) for o in quant_outputs], 
    xs, [origin_image], self.model.names, 
    input_path, conf_thres=0.25, iou_thres=0.45,
)
Image.fromarray(results[0].plot())
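
Quantization shifts the outputs, so an exact comparison against the float run is not meaningful; a rough drift check against float_outputs from the float build above:

# Report per-output max absolute drift introduced by quantization
for k, (a, b) in enumerate(zip(float_outputs, quant_outputs)):
    print(f"output {k}: max |float - quant| = {np.abs(a - b).max():.4f}")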
from tvm.relay.analysis import _ffi_api

# output_map: gvar -> (offset, number of inputs, number of outputs) into the
# flattened tuple returned by the calibration module
output_map = _ffi_api.get_calibrate_output_map(run_mod)
calibrate_mod = _ffi_api.get_calibrate_module(run_mod)
calibrate_mod = relay.transform.Inline()(calibrate_mod)
ref_res = relay.build_module.create_executor(
    "graph", mod=calibrate_mod, device=tvm.cpu(0)
).evaluate()(**{input_name: xs})

calib_data = {}
for gvar, indices in output_map.items():
    offset = int(indices[0])
    in_len = int(indices[1])
    out_len = int(indices[2])
    value = {
        "inputs": ref_res[offset : offset + in_len],
        "outputs": ref_res[offset + in_len : offset + in_len + out_len],
    }
    calib_data[gvar] = value
# Key the partitioned functions by the numeric suffix of their names
func_map = {int(kk.name_hint.split("_")[-1]): kk for kk in calib_data.keys()}
# Calibration record of the last partitioned function
calib_data[func_map[len(func_map)-1]]
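
Each calib_data entry records the tensors observed at the boundary of one partitioned function; inspecting their shapes makes the in/out signature of every partition explicit:

for gvar, value in calib_data.items():
    in_shapes = [tuple(x.shape) for x in value["inputs"]]
    out_shapes = [tuple(x.shape) for x in value["outputs"]]
    print(f"{gvar.name_hint}: inputs={in_shapes} outputs={out_shapes}")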