Test#
import torch
import torch.nn.functional as F
import numpy as np
import tvm
from tvm.script import ir as I
from tvm.relax import BasePyModule
# --------- Calibration and quantization-parameter computation ---------
def calc_symmetric_qparams(min_v: np.ndarray, max_v: np.ndarray, qmax=127):
    # min_v/max_v: scalars or per-channel vectors
absmax = np.maximum(np.abs(min_v), np.abs(max_v))
    # avoid division by zero
absmax = np.where(absmax == 0, 1e-8, absmax)
scale = absmax / qmax
zp = np.zeros_like(scale, dtype=np.int32)
return scale.astype(np.float32), zp
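# Worked example (illustrative): for min = -0.2 and max = 0.5, absmax = 0.5, so
# scale = 0.5 / 127 ≈ 0.003937 and the zero point stays 0:
#   calc_symmetric_qparams(np.array([-0.2]), np.array([0.5]))  # ~ (array([0.00393701]), array([0]))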
def calibrate_activation_per_tensor(calib_loader):
mins, maxs = [], []
with torch.no_grad():
for x in calib_loader:
mins.append(x.min().item())
maxs.append(x.max().item())
a_min, a_max = float(np.min(mins)), float(np.max(maxs))
s, z = calc_symmetric_qparams(np.array([a_min]), np.array([a_max]))
return torch.tensor(s[0], dtype=torch.float32), torch.tensor(z[0], dtype=torch.int32)
def calibrate_weight_per_channel(w: torch.Tensor, axis: int = 0):
    # w: [oc, ic, kh, kw]; per-channel along out_channels
w_np = w.detach().cpu().numpy()
oc = w_np.shape[axis]
w_flat = w_np.reshape(oc, -1)
w_min = w_flat.min(axis=1)
w_max = w_flat.max(axis=1)
s, z = calc_symmetric_qparams(w_min, w_max)
return torch.tensor(s, dtype=torch.float32), torch.tensor(z, dtype=torch.int32)
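# Quick sanity check (illustrative, not part of the original flow): with symmetric
# scales s = absmax / 127 nothing clips, so the per-channel quant-dequant error of
# every weight is bounded by s / 2.
_w = torch.randn(8, 3, 3, 3)
_s, _ = calibrate_weight_per_channel(_w)
_sv = _s.view(-1, 1, 1, 1)
_w_qdq = torch.clamp(torch.round(_w / _sv), -128, 127) * _sv
assert ((_w - _w_qdq).abs() <= _sv / 2 + 1e-6).all()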
# --------- BasePyModule: quantize-dequantize and inference ---------
@I.ir_module
class PerChannelPTQ(BasePyModule):
"""Per-tensor 激活 + per-channel 权重量化 的 Python 端仿真模块。"""
@I.pyfunc
def quant_dequant_activation(self, x: torch.Tensor, act_scale: torch.Tensor) -> torch.Tensor:
        # symmetric quantization, zero point = 0
q = torch.clamp(torch.round(x / act_scale), -128, 127).to(torch.int8)
return q.to(torch.float32) * act_scale
@I.pyfunc
def quant_dequant_weight_pc(self, w: torch.Tensor, w_scales: torch.Tensor) -> torch.Tensor:
        # w_scales: [oc], reshaped to [oc, 1, 1, 1] for broadcasting
s = w_scales.view(-1, 1, 1, 1)
q = torch.clamp(torch.round(w / s), -128, 127).to(torch.int8)
return q.to(torch.float32) * s
@I.pyfunc
def conv2d_infer(
self,
x: torch.Tensor,
w: torch.Tensor,
bias: torch.Tensor,
stride_h: int,
stride_w: int,
pad_h: int,
pad_w: int,
dil_h: int,
dil_w: int,
groups: int,
) -> torch.Tensor:
return F.conv2d(x, w, bias, stride=(stride_h, stride_w), padding=(pad_h, pad_w),
dilation=(dil_h, dil_w), groups=groups)
@I.pyfunc
def main(
self,
x: torch.Tensor,
w: torch.Tensor,
b: torch.Tensor,
        act_scale: torch.Tensor,  # scalar
        w_scales: torch.Tensor,  # [out_channels]
) -> torch.Tensor:
        # 1) per-tensor fake quantization of the activation
x_qdq = self.quant_dequant_activation(x, act_scale)
        # 2) per-channel fake quantization of the weight (along the oc axis)
w_qdq = self.quant_dequant_weight_pc(w, w_scales)
        # 3) run inference with the quantize-dequantized tensors
y = self.conv2d_infer(
x_qdq, w_qdq, b,
stride_h=1, stride_w=1,
pad_h=0, pad_w=0,
dil_h=1, dil_w=1,
groups=1,
)
return y
# Suppose we have the weight and bias of a single conv layer
conv = torch.nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=True).eval()
w = conv.weight
b = conv.bias
# Prepare calibration data and compute the quantization parameters
def calib_loader(n=8, bs=4):
for _ in range(n // bs):
yield torch.randn(bs, 3, 224, 224)
act_scale, _ = calibrate_activation_per_tensor(calib_loader())
w_scales, _ = calibrate_weight_per_channel(w, axis=0)
# Build a module instance (CPU or CUDA both work)
module = PerChannelPTQ
instance = module(tvm.cpu(0))
# Run one forward pass
x = torch.randn(1, 3, 224, 224)
y = instance.main(x, w, b, act_scale, w_scales)
print(y.shape)  # -> torch.Size([1, 16, 222, 222]): main() hard-codes pad_h = pad_w = 0, so the 3x3 kernel gives (224 - 3) // 1 + 1 = 222
Warning: Failed to compile Relax VM: 'NoneType' object has no attribute 'kind'
torch.Size([1, 16, 222, 222])
import numpy as np
import tvm
from tvm.script import ir as I
from tvm.relax import BasePyModule
# ===== Pure-NumPy quantization utilities =====
def calc_symmetric_qparams(min_v: np.ndarray, max_v: np.ndarray, qmax=127):
absmax = np.maximum(np.abs(min_v), np.abs(max_v))
absmax = np.where(absmax == 0, 1e-8, absmax)
scale = absmax / qmax
zp = np.zeros_like(scale, dtype=np.int32)
return scale.astype(np.float32), zp
def quant_dequant_activation_np(x: np.ndarray, act_scale: float):
q = np.clip(np.round(x / act_scale), -128, 127).astype(np.int8)
return q.astype(np.float32) * act_scale
def quant_dequant_weight_pc_np(w: np.ndarray, w_scales: np.ndarray):
s = w_scales.reshape(-1, 1, 1, 1)
q = np.clip(np.round(w / s), -128, 127).astype(np.int8)
return q.astype(np.float32) * s
def conv2d_nchw_np(x, w, b, stride=1, padding=0):
N, C_in, H, W = x.shape
C_out, _, KH, KW = w.shape
H_out = (H + 2*padding - KH) // stride + 1
W_out = (W + 2*padding - KW) // stride + 1
if padding > 0:
x_padded = np.zeros((N, C_in, H + 2*padding, W + 2*padding), dtype=x.dtype)
x_padded[:, :, padding:padding+H, padding:padding+W] = x
else:
x_padded = x
out = np.zeros((N, C_out, H_out, W_out), dtype=np.float32)
for n in range(N):
for oc in range(C_out):
for i in range(H_out):
for j in range(W_out):
region = x_padded[n, :, i*stride:i*stride+KH, j*stride:j*stride+KW]
out[n, oc, i, j] = np.sum(region * w[oc]) + b[oc]
return out
# ===== Embedded in a BasePyModule =====
@I.ir_module
class NumpyPTQModule(BasePyModule):
"""NumPy 版 PTQ 仿真模块:激活 per-tensor + 权重 per-channel"""
@I.pyfunc
def quant_dequant_activation(self, x: np.ndarray, act_scale: float) -> np.ndarray:
return quant_dequant_activation_np(x, act_scale)
@I.pyfunc
def quant_dequant_weight_pc(self, w: np.ndarray, w_scales: np.ndarray) -> np.ndarray:
return quant_dequant_weight_pc_np(w, w_scales)
@I.pyfunc
def conv2d_infer(
self,
x: np.ndarray,
w: np.ndarray,
b: np.ndarray,
stride_h: int,
stride_w: int,
pad_h: int,
pad_w: int,
) -> np.ndarray:
        # simplification: assumes stride_h == stride_w and pad_h == pad_w
return conv2d_nchw_np(x, w, b, stride=stride_h, padding=pad_h)
@I.pyfunc
def main(
self,
x: np.ndarray,
w: np.ndarray,
b: np.ndarray,
act_scale: float,
w_scales: np.ndarray,
) -> np.ndarray:
        # 1) activation Q/DQ
x_qdq = self.quant_dequant_activation(x, act_scale)
        # 2) weight Q/DQ
w_qdq = self.quant_dequant_weight_pc(w, w_scales)
        # 3) inference
y = self.conv2d_infer(x_qdq, w_qdq, b, 1, 1, 1, 1)
return y
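A minimal usage sketch (illustrative; it assumes the same instantiation pattern as the PyTorch example above and keeps the input small, since conv2d_nchw_np is a plain Python loop):
inst = NumpyPTQModule(tvm.cpu(0))
x = np.random.randn(1, 3, 8, 8).astype(np.float32)
w = np.random.randn(4, 3, 3, 3).astype(np.float32)
b = np.random.randn(4).astype(np.float32)
act_scale, _ = calc_symmetric_qparams(np.array([x.min()]), np.array([x.max()]))
w_scales, _ = calc_symmetric_qparams(w.reshape(4, -1).min(axis=1), w.reshape(4, -1).max(axis=1))
y = inst.main(x, w, b, float(act_scale[0]), w_scales)
print(y.shape)  # (1, 4, 8, 8): stride=1, padding=1 keeps the spatial size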
import numpy as np
import tvm
from tvm.script import ir as I
from tvm.relax import BasePyModule
# ======== Utility: fold BN into the conv (call this first if you start from unfused parameters) ========
def fold_bn_into_conv(w_conv, b_conv, gamma, beta, mean, var, eps=1e-5):
# w_conv: (C_out, C_in, KH, KW), b_conv: (C_out,)
# BN: y = gamma * (x - mean) / sqrt(var+eps) + beta
C_out = w_conv.shape[0]
if b_conv is None:
b_conv = np.zeros(C_out, dtype=np.float32)
denom = gamma / np.sqrt(var + eps) # (C_out,)
w_fused = w_conv * denom.reshape(-1, 1, 1, 1)
b_fused = (b_conv - mean) * denom + beta
return w_fused.astype(np.float32), b_fused.astype(np.float32)
# ======== Quantize / dequantize (NumPy) ========
def qdq_activation(x: np.ndarray, act_scale: float):
q = np.clip(np.round(x / act_scale), -128, 127).astype(np.int8)
return q.astype(np.float32) * act_scale
def qdq_weight_pc(w: np.ndarray, w_scales: np.ndarray):
s = w_scales.reshape(-1, 1, 1, 1)
q = np.clip(np.round(w / s), -128, 127).astype(np.int8)
return q.astype(np.float32) * s
def qdq_weight_pc_linear(w: np.ndarray, w_scales: np.ndarray):
# w: (out_features, in_features); per-output-channel scales
s = w_scales.reshape(-1, 1)
q = np.clip(np.round(w / s), -128, 127).astype(np.int8)
return q.astype(np.float32) * s
# ======== Basic operators (simple and explicit, easy to check against a reference) ========
def conv2d_nchw(x, w, b, stride=1, padding=0):
N, C_in, H, W = x.shape
C_out, _, KH, KW = w.shape
H_out = (H + 2*padding - KH) // stride + 1
W_out = (W + 2*padding - KW) // stride + 1
if padding > 0:
x_pad = np.zeros((N, C_in, H + 2*padding, W + 2*padding), dtype=x.dtype)
x_pad[:, :, padding:padding+H, padding:padding+W] = x
else:
x_pad = x
out = np.zeros((N, C_out, H_out, W_out), dtype=np.float32)
for n in range(N):
for oc in range(C_out):
for i in range(H_out):
for j in range(W_out):
region = x_pad[n, :, i*stride:i*stride+KH, j*stride:j*stride+KW]
out[n, oc, i, j] = np.sum(region * w[oc]) + (0.0 if b is None else b[oc])
return out
def relu(x): return np.maximum(x, 0.0)
def maxpool2d_nchw(x, kernel=3, stride=2, padding=1):
N, C, H, W = x.shape
H_out = (H + 2*padding - kernel) // stride + 1
W_out = (W + 2*padding - kernel) // stride + 1
if padding > 0:
x_pad = np.full((N, C, H + 2*padding, W + 2*padding), -np.inf, dtype=x.dtype)
x_pad[:, :, padding:padding+H, padding:padding+W] = x
else:
x_pad = x
out = np.zeros((N, C, H_out, W_out), dtype=np.float32)
for n in range(N):
for c in range(C):
for i in range(H_out):
for j in range(W_out):
region = x_pad[n, c, i*stride:i*stride+kernel, j*stride:j*stride+kernel]
out[n, c, i, j] = np.max(region)
return out
def global_avg_pool_nchw(x):
# N,C,H,W -> N,C
return x.mean(axis=(2, 3))
def linear(x, w, b):
# x: (N, in_features), w: (out_features, in_features), b: (out_features,)
return x @ w.T + (0.0 if b is None else b)
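# Alignment check (illustrative; assumes PyTorch is available): compare the naive NumPy
# operators above against torch.nn.functional on a small random tensor.
import torch
import torch.nn.functional as F
_x = np.random.randn(1, 2, 6, 6).astype(np.float32)
_w = np.random.randn(3, 2, 3, 3).astype(np.float32)
_b = np.random.randn(3).astype(np.float32)
_ref = F.conv2d(torch.from_numpy(_x), torch.from_numpy(_w), torch.from_numpy(_b), stride=1, padding=1).numpy()
assert np.allclose(conv2d_nchw(_x, _w, _b, stride=1, padding=1), _ref, atol=1e-4)
_ref = F.max_pool2d(torch.from_numpy(_x), kernel_size=3, stride=2, padding=1).numpy()
assert np.allclose(maxpool2d_nchw(_x, kernel=3, stride=2, padding=1), _ref, atol=1e-6)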
# ======== Architecture and naming (matches PyTorch's ResNet18) ========
RESNET18_TOPO = [
# (name, stride, padding)
("conv1", 2, 3),
# layer1 (2 blocks, no downsample)
("layer1.0.conv1", 1, 1),
("layer1.0.conv2", 1, 1),
("layer1.1.conv1", 1, 1),
("layer1.1.conv2", 1, 1),
# layer2 (first block has downsample and stride=2 on conv1)
("layer2.0.conv1", 2, 1),
("layer2.0.conv2", 1, 1),
("layer2.0.downsample.0", 2, 0),
("layer2.1.conv1", 1, 1),
("layer2.1.conv2", 1, 1),
# layer3
("layer3.0.conv1", 2, 1),
("layer3.0.conv2", 1, 1),
("layer3.0.downsample.0", 2, 0),
("layer3.1.conv1", 1, 1),
("layer3.1.conv2", 1, 1),
# layer4
("layer4.0.conv1", 2, 1),
("layer4.0.conv2", 1, 1),
("layer4.0.downsample.0", 2, 0),
("layer4.1.conv1", 1, 1),
("layer4.1.conv2", 1, 1),
]
DOWNSAMPLE_KEYS = {"layer2.0.downsample.0", "layer3.0.downsample.0", "layer4.0.downsample.0"}  # for reference only; not used below
# Block boundaries: drive the residual add and the final ReLU of each block
BLOCK_ENDS = {
# (conv names in block, optional downsample name)
"layer1.0": (["layer1.0.conv1", "layer1.0.conv2"], None),
"layer1.1": (["layer1.1.conv1", "layer1.1.conv2"], None),
"layer2.0": (["layer2.0.conv1", "layer2.0.conv2"], "layer2.0.downsample.0"),
"layer2.1": (["layer2.1.conv1", "layer2.1.conv2"], None),
"layer3.0": (["layer3.0.conv1", "layer3.0.conv2"], "layer3.0.downsample.0"),
"layer3.1": (["layer3.1.conv1", "layer3.1.conv2"], None),
"layer4.0": (["layer4.0.conv1", "layer4.0.conv2"], "layer4.0.downsample.0"),
"layer4.1": (["layer4.1.conv1", "layer4.1.conv2"], None),
}
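# Consistency check (illustrative): every conv referenced in BLOCK_ENDS must have a
# (stride, padding) entry in RESNET18_TOPO, otherwise _sp() below would silently
# fall back to (1, 0).
_topo_names = {k for k, _, _ in RESNET18_TOPO}
for _convs, _down in BLOCK_ENDS.values():
    assert set(_convs) <= _topo_names
    assert _down is None or _down in _topo_names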
# ======== BasePyModule: walk ResNet18 automatically with per-layer Q/DQ + inference ========
@I.ir_module
class NumpyPTQResNet18(BasePyModule):
"""
NumPy 版 ResNet18 PTQ 仿真模块
- set_params: 装入已 BN 融合的所有卷积与 FC 参数
- set_quant: 装入每个卷积/FC 的量化参数(激活 per-tensor,权重 per-channel)
- main: 端到端按拓扑自动遍历、逐层 Q/DQ + 推理
"""
@I.pyfunc
def set_params(self, params: object) -> None:
"""
params: dict[str, np.ndarray]
- 卷积: "<name>.weight" -> (C_out,C_in,KH,KW), "<name>.bias" -> (C_out,)
- FC: "fc.weight" -> (num_classes, 512), "fc.bias" -> (num_classes,)
约定 name 来自 RESNET18_TOPO 中的 key(外加 "fc")
"""
self.params = params
@I.pyfunc
def set_quant(self, w_scales: object, act_scales: object, fc_w_scales: np.ndarray, fc_act_scale: float) -> None:
"""
w_scales: dict[str, np.ndarray] # per-channel, shape (C_out,)
act_scales: dict[str, float] # per-tensor, each conv input
fc_w_scales: (num_classes,)
fc_act_scale: float
"""
self.w_scales = w_scales
self.act_scales = act_scales
self.fc_w_scales = fc_w_scales.astype(np.float32)
self.fc_act_scale = float(fc_act_scale)
@I.pyfunc
def _run_conv_qdq(self, name: str, x: np.ndarray, stride: int, padding: int) -> np.ndarray:
        # activation Q/DQ (the activation feeding this conv)
x_q = qdq_activation(x, float(self.act_scales[name]))
        # weight Q/DQ (per-channel for this conv)
w = self.params[f"{name}.weight"]; b = self.params.get(f"{name}.bias", None)
w_q = qdq_weight_pc(w, self.w_scales[name])
        # convolution
y = conv2d_nchw(x_q, w_q, b, stride=stride, padding=padding)
return y
@I.pyfunc
def _run_block(self, x_in: np.ndarray, block_key: str) -> np.ndarray:
# block_key: "layer{1..4}.{0|1}"
convs, down = BLOCK_ENDS[block_key]
        # keep the residual input
identity = x_in
# conv1
y = self._run_conv_qdq(convs[0], x_in, *self._sp(convs[0]))
y = relu(y)
# conv2
y = self._run_conv_qdq(convs[1], y, *self._sp(convs[1]))
        # downsample branch (if present)
if down is not None:
identity = self._run_conv_qdq(down, identity, *self._sp(down))
        # residual add + ReLU
out = relu(y + identity)
return out
@I.pyfunc
def _sp(self, name: str) -> tuple:
        # returns (stride, padding) for the given conv name
for k, s, p in RESNET18_TOPO:
if k == name:
return (s, p)
        # should not happen
return (1, 0)
@I.pyfunc
def main(self, x: np.ndarray) -> np.ndarray:
# stem
x = self._run_conv_qdq("conv1", x, *self._sp("conv1"))
x = relu(x)
x = maxpool2d_nchw(x, kernel=3, stride=2, padding=1)
# layer1
x = self._run_block(x, "layer1.0")
x = self._run_block(x, "layer1.1")
# layer2
x = self._run_block(x, "layer2.0")
x = self._run_block(x, "layer2.1")
# layer3
x = self._run_block(x, "layer3.0")
x = self._run_block(x, "layer3.1")
# layer4
x = self._run_block(x, "layer4.0")
x = self._run_block(x, "layer4.1")
        # GAP + FC (the FC layer also goes through Q/DQ)
x = global_avg_pool_nchw(x) # (N, 512)
x_q = qdq_activation(x, float(self.fc_act_scale))
w = self.params["fc.weight"]; b = self.params.get("fc.bias", None)
w_q = qdq_weight_pc_linear(w, self.fc_w_scales) # per-out
y = linear(x_q, w_q, b) # (N, num_classes)
return y
import numpy as np
import tvm
# 1) Device and instance
device = tvm.cpu(0)
Module = NumpyPTQResNet18
inst = Module(device)
# 2) Build the weights (example: random placeholders)
def make_shape_map():
return {
"conv1": (64, 3, 7, 7),
"layer1.0.conv1": (64, 64, 3, 3),
"layer1.0.conv2": (64, 64, 3, 3),
"layer1.1.conv1": (64, 64, 3, 3),
"layer1.1.conv2": (64, 64, 3, 3),
"layer2.0.conv1": (128, 64, 3, 3),
"layer2.0.conv2": (128, 128, 3, 3),
"layer2.0.downsample.0": (128, 64, 1, 1),
"layer2.1.conv1": (128, 128, 3, 3),
"layer2.1.conv2": (128, 128, 3, 3),
"layer3.0.conv1": (256, 128, 3, 3),
"layer3.0.conv2": (256, 256, 3, 3),
"layer3.0.downsample.0": (256, 128, 1, 1),
"layer3.1.conv1": (256, 256, 3, 3),
"layer3.1.conv2": (256, 256, 3, 3),
"layer4.0.conv1": (512, 256, 3, 3),
"layer4.0.conv2": (512, 512, 3, 3),
"layer4.0.downsample.0": (512, 256, 1, 1),
"layer4.1.conv1": (512, 512, 3, 3),
"layer4.1.conv2": (512, 512, 3, 3),
}
np.random.seed(0)
shapes = make_shape_map()
params = {}
for name, shape in shapes.items():
params[f"{name}.weight"] = np.random.randn(*shape).astype(np.float32)
params[f"{name}.bias"] = np.random.randn(shape[0]).astype(np.float32)
num_classes = 1000
params["fc.weight"] = np.random.randn(num_classes, 512).astype(np.float32)
params["fc.bias"] = np.random.randn(num_classes).astype(np.float32)
inst.set_params(params)
# 3) Quantization parameters (example: random/constant placeholders; in practice they come from calibration)
act_scales = {}
w_scales = {}
for name, shape in shapes.items():
C_out = shape[0]
act_scales[name] = float(0.05) # per-tensor
    # per-channel scales; keep them away from zero
w_scales[name] = (np.random.rand(C_out).astype(np.float32) * 0.1 + 1e-3)
fc_act_scale = float(0.05)
fc_w_scales = (np.random.rand(num_classes).astype(np.float32) * 0.1 + 1e-3)
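# Illustrative alternative: derive the per-channel weight scales directly from the
# (here random) weights with the same symmetric rule used above, scale = absmax / 127.
# Activation scales still need real calibration data, so the 0.05 placeholder stays.
for name, shape in shapes.items():
    _w = params[f"{name}.weight"]
    _absmax = np.abs(_w.reshape(shape[0], -1)).max(axis=1)
    w_scales[name] = (np.maximum(_absmax, 1e-8) / 127.0).astype(np.float32)
fc_w_scales = (np.maximum(np.abs(params["fc.weight"]).max(axis=1), 1e-8) / 127.0).astype(np.float32)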
inst.set_quant(w_scales, act_scales, fc_w_scales, fc_act_scale)
# 4) End-to-end inference
x = np.random.randn(1, 3, 224, 224).astype(np.float32)
y = inst.main(x)
print(y.shape, y.dtype) # (1, num_classes) float32
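The random placeholders above could be replaced with real weights by folding a pretrained torchvision ResNet-18's BatchNorm layers into the convolutions via fold_bn_into_conv. A minimal sketch, assuming a recent torchvision and its standard ResNet-18 parameter naming (conv1/bn1, layerX.Y.convN/bnN, downsample.0 followed by downsample.1); the helper bn_name_for is introduced here purely for illustration:
import torch
from torchvision.models import resnet18

def bn_name_for(conv_name: str) -> str:
    # Map a conv parameter name to the BatchNorm that follows it in torchvision's ResNet-18.
    if conv_name.endswith("downsample.0"):
        return conv_name[:-1] + "1"            # e.g. layer2.0.downsample.1
    return conv_name.replace("conv", "bn")     # conv1 -> bn1, layer1.0.conv2 -> layer1.0.bn2

sd = {k: v.detach().numpy() for k, v in resnet18(weights="IMAGENET1K_V1").state_dict().items()}
real_params = {}
for name, _, _ in RESNET18_TOPO:
    bn = bn_name_for(name)
    w_f, b_f = fold_bn_into_conv(
        sd[f"{name}.weight"], None,            # torchvision's ResNet convs carry no bias
        sd[f"{bn}.weight"], sd[f"{bn}.bias"],
        sd[f"{bn}.running_mean"], sd[f"{bn}.running_var"],
    )
    real_params[f"{name}.weight"] = w_f
    real_params[f"{name}.bias"] = b_f
real_params["fc.weight"] = sd["fc.weight"]
real_params["fc.bias"] = sd["fc.bias"]
inst.set_params(real_params)
Per-layer activation scales would then still have to be calibrated by running a few batches through the float model and recording the input range of each conv, in the spirit of calibrate_activation_per_tensor above.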