测试

测试#

import torch
import torch.nn.functional as F
import numpy as np
import tvm
from tvm.script import ir as I
from tvm.relax import BasePyModule

# --------- 校准与量化参数计算 ---------

def calc_symmetric_qparams(min_v: np.ndarray, max_v: np.ndarray, qmax=127):
    # min_v/max_v: 可以是标量或向量(per-channel)
    absmax = np.maximum(np.abs(min_v), np.abs(max_v))
    # 防止除零
    absmax = np.where(absmax == 0, 1e-8, absmax)
    scale = absmax / qmax
    zp = np.zeros_like(scale, dtype=np.int32)
    return scale.astype(np.float32), zp

def calibrate_activation_per_tensor(calib_loader):
    mins, maxs = [], []
    with torch.no_grad():
        for x in calib_loader:
            mins.append(x.min().item())
            maxs.append(x.max().item())
    a_min, a_max = float(np.min(mins)), float(np.max(maxs))
    s, z = calc_symmetric_qparams(np.array([a_min]), np.array([a_max]))
    return torch.tensor(s[0], dtype=torch.float32), torch.tensor(z[0], dtype=torch.int32)

def calibrate_weight_per_channel(w: torch.Tensor, axis: int = 0):
    # w: [oc, ic, kh, kw],按 out_channels 做 per-channel
    w_np = w.detach().cpu().numpy()
    oc = w_np.shape[axis]
    w_flat = w_np.reshape(oc, -1)
    w_min = w_flat.min(axis=1)
    w_max = w_flat.max(axis=1)
    s, z = calc_symmetric_qparams(w_min, w_max)
    return torch.tensor(s, dtype=torch.float32), torch.tensor(z, dtype=torch.int32)

# --------- BasePyModule:量化-反量化与推理 ---------

@I.ir_module
class PerChannelPTQ(BasePyModule):
    """Per-tensor 激活 + per-channel 权重量化 的 Python 端仿真模块。"""

    @I.pyfunc
    def quant_dequant_activation(self, x: torch.Tensor, act_scale: torch.Tensor) -> torch.Tensor:
        # 对称量化,zp=0
        q = torch.clamp(torch.round(x / act_scale), -128, 127).to(torch.int8)
        return q.to(torch.float32) * act_scale

    @I.pyfunc
    def quant_dequant_weight_pc(self, w: torch.Tensor, w_scales: torch.Tensor) -> torch.Tensor:
        # w_scales: [oc],对齐到 [oc,1,1,1]
        s = w_scales.view(-1, 1, 1, 1)
        q = torch.clamp(torch.round(w / s), -128, 127).to(torch.int8)
        return q.to(torch.float32) * s

    @I.pyfunc
    def conv2d_infer(
        self,
        x: torch.Tensor,
        w: torch.Tensor,
        bias: torch.Tensor,
        stride_h: int,
        stride_w: int,
        pad_h: int,
        pad_w: int,
        dil_h: int,
        dil_w: int,
        groups: int,
    ) -> torch.Tensor:
        return F.conv2d(x, w, bias, stride=(stride_h, stride_w), padding=(pad_h, pad_w),
                        dilation=(dil_h, dil_w), groups=groups)

    @I.pyfunc
    def main(
        self,
        x: torch.Tensor,
        w: torch.Tensor,
        b: torch.Tensor,
        act_scale: torch.Tensor,    # 标量
        w_scales: torch.Tensor,     # [out_channels]
    ) -> torch.Tensor:
        # 1) 激活 per-tensor 假量化
        x_qdq = self.quant_dequant_activation(x, act_scale)

        # 2) 权重 per-channel 假量化(沿 oc 维度)
        w_qdq = self.quant_dequant_weight_pc(w, w_scales)

        # 3) 用量化-反量化后的张量进行推理
        y = self.conv2d_infer(
            x_qdq, w_qdq, b,
            stride_h=1, stride_w=1,
            pad_h=0, pad_w=0,
            dil_h=1, dil_w=1,
            groups=1,
        )
        return y
# 假设我们有一层 conv 的权重与偏置
conv = torch.nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=True).eval()
w = conv.weight
b = conv.bias

# 准备校准数据并计算量化参数
def calib_loader(n=8, bs=4):
    for _ in range(n // bs):
        yield torch.randn(bs, 3, 224, 224)

act_scale, _ = calibrate_activation_per_tensor(calib_loader())
w_scales, _ = calibrate_weight_per_channel(w, axis=0)

# 构建模块实例(CPU / CUDA 都行)
module = PerChannelPTQ
instance = module(tvm.cpu(0))

# 运行一次前向
x = torch.randn(1, 3, 224, 224)
y = instance.main(x, w, b, act_scale, w_scales)
print(y.shape)  # -> torch.Size([1, 16, 224, 224])
Warning: Failed to compile Relax VM: 'NoneType' object has no attribute 'kind'
torch.Size([1, 16, 222, 222])
import numpy as np
import tvm
from tvm.script import ir as I
from tvm.relax import BasePyModule

# ===== 纯 numpy 量化工具函数 =====
def calc_symmetric_qparams(min_v: np.ndarray, max_v: np.ndarray, qmax=127):
    absmax = np.maximum(np.abs(min_v), np.abs(max_v))
    absmax = np.where(absmax == 0, 1e-8, absmax)
    scale = absmax / qmax
    zp = np.zeros_like(scale, dtype=np.int32)
    return scale.astype(np.float32), zp

def quant_dequant_activation_np(x: np.ndarray, act_scale: float):
    q = np.clip(np.round(x / act_scale), -128, 127).astype(np.int8)
    return q.astype(np.float32) * act_scale

def quant_dequant_weight_pc_np(w: np.ndarray, w_scales: np.ndarray):
    s = w_scales.reshape(-1, 1, 1, 1)
    q = np.clip(np.round(w / s), -128, 127).astype(np.int8)
    return q.astype(np.float32) * s

def conv2d_nchw_np(x, w, b, stride=1, padding=0):
    N, C_in, H, W = x.shape
    C_out, _, KH, KW = w.shape
    H_out = (H + 2*padding - KH) // stride + 1
    W_out = (W + 2*padding - KW) // stride + 1
    if padding > 0:
        x_padded = np.zeros((N, C_in, H + 2*padding, W + 2*padding), dtype=x.dtype)
        x_padded[:, :, padding:padding+H, padding:padding+W] = x
    else:
        x_padded = x
    out = np.zeros((N, C_out, H_out, W_out), dtype=np.float32)
    for n in range(N):
        for oc in range(C_out):
            for i in range(H_out):
                for j in range(W_out):
                    region = x_padded[n, :, i*stride:i*stride+KH, j*stride:j*stride+KW]
                    out[n, oc, i, j] = np.sum(region * w[oc]) + b[oc]
    return out

# ===== 嵌入到 BasePyModule =====
@I.ir_module
class NumpyPTQModule(BasePyModule):
    """NumPy 版 PTQ 仿真模块:激活 per-tensor + 权重 per-channel"""

    @I.pyfunc
    def quant_dequant_activation(self, x: np.ndarray, act_scale: float) -> np.ndarray:
        return quant_dequant_activation_np(x, act_scale)

    @I.pyfunc
    def quant_dequant_weight_pc(self, w: np.ndarray, w_scales: np.ndarray) -> np.ndarray:
        return quant_dequant_weight_pc_np(w, w_scales)

    @I.pyfunc
    def conv2d_infer(
        self,
        x: np.ndarray,
        w: np.ndarray,
        b: np.ndarray,
        stride_h: int,
        stride_w: int,
        pad_h: int,
        pad_w: int,
    ) -> np.ndarray:
        # 简化:stride_h == stride_w, pad_h == pad_w
        return conv2d_nchw_np(x, w, b, stride=stride_h, padding=pad_h)

    @I.pyfunc
    def main(
        self,
        x: np.ndarray,
        w: np.ndarray,
        b: np.ndarray,
        act_scale: float,
        w_scales: np.ndarray,
    ) -> np.ndarray:
        # 1) 激活 Q/DQ
        x_qdq = self.quant_dequant_activation(x, act_scale)
        # 2) 权重 Q/DQ
        w_qdq = self.quant_dequant_weight_pc(w, w_scales)
        # 3) 推理
        y = self.conv2d_infer(x_qdq, w_qdq, b, 1, 1, 1, 1)
        return y
import numpy as np
import tvm
from tvm.script import ir as I
from tvm.relax import BasePyModule

# ======== 工具:BN 融合到卷积(若你提供未融合参数,可先调用该函数) ========
def fold_bn_into_conv(w_conv, b_conv, gamma, beta, mean, var, eps=1e-5):
    # w_conv: (C_out, C_in, KH, KW), b_conv: (C_out,)
    # BN: y = gamma * (x - mean) / sqrt(var+eps) + beta
    C_out = w_conv.shape[0]
    if b_conv is None:
        b_conv = np.zeros(C_out, dtype=np.float32)
    denom = gamma / np.sqrt(var + eps)  # (C_out,)
    w_fused = w_conv * denom.reshape(-1, 1, 1, 1)
    b_fused = (b_conv - mean) * denom + beta
    return w_fused.astype(np.float32), b_fused.astype(np.float32)

# ======== 量化/反量化(NumPy) ========
def qdq_activation(x: np.ndarray, act_scale: float):
    q = np.clip(np.round(x / act_scale), -128, 127).astype(np.int8)
    return q.astype(np.float32) * act_scale

def qdq_weight_pc(w: np.ndarray, w_scales: np.ndarray):
    s = w_scales.reshape(-1, 1, 1, 1)
    q = np.clip(np.round(w / s), -128, 127).astype(np.int8)
    return q.astype(np.float32) * s

def qdq_weight_pc_linear(w: np.ndarray, w_scales: np.ndarray):
    # w: (out_features, in_features); per-output-channel scales
    s = w_scales.reshape(-1, 1)
    q = np.clip(np.round(w / s), -128, 127).astype(np.int8)
    return q.astype(np.float32) * s

# ======== 基础算子(简明直观、便于对齐验证) ========
def conv2d_nchw(x, w, b, stride=1, padding=0):
    N, C_in, H, W = x.shape
    C_out, _, KH, KW = w.shape
    H_out = (H + 2*padding - KH) // stride + 1
    W_out = (W + 2*padding - KW) // stride + 1
    if padding > 0:
        x_pad = np.zeros((N, C_in, H + 2*padding, W + 2*padding), dtype=x.dtype)
        x_pad[:, :, padding:padding+H, padding:padding+W] = x
    else:
        x_pad = x
    out = np.zeros((N, C_out, H_out, W_out), dtype=np.float32)
    for n in range(N):
        for oc in range(C_out):
            for i in range(H_out):
                for j in range(W_out):
                    region = x_pad[n, :, i*stride:i*stride+KH, j*stride:j*stride+KW]
                    out[n, oc, i, j] = np.sum(region * w[oc]) + (0.0 if b is None else b[oc])
    return out

def relu(x): return np.maximum(x, 0.0)

def maxpool2d_nchw(x, kernel=3, stride=2, padding=1):
    N, C, H, W = x.shape
    H_out = (H + 2*padding - kernel) // stride + 1
    W_out = (W + 2*padding - kernel) // stride + 1
    if padding > 0:
        x_pad = np.full((N, C, H + 2*padding, W + 2*padding), -np.inf, dtype=x.dtype)
        x_pad[:, :, padding:padding+H, padding:padding+W] = x
    else:
        x_pad = x
    out = np.zeros((N, C, H_out, W_out), dtype=np.float32)
    for n in range(N):
        for c in range(C):
            for i in range(H_out):
                for j in range(W_out):
                    region = x_pad[n, c, i*stride:i*stride+kernel, j*stride:j*stride+kernel]
                    out[n, c, i, j] = np.max(region)
    return out

def global_avg_pool_nchw(x):
    # N,C,H,W -> N,C
    return x.mean(axis=(2, 3))

def linear(x, w, b):
    # x: (N, in_features), w: (out_features, in_features), b: (out_features,)
    return x @ w.T + (0.0 if b is None else b)

# ======== 架构与命名(与 PyTorch ResNet18 一致) ========
RESNET18_TOPO = [
    # (name, stride, padding)
    ("conv1", 2, 3),
    # layer1 (2 blocks, no downsample)
    ("layer1.0.conv1", 1, 1),
    ("layer1.0.conv2", 1, 1),
    ("layer1.1.conv1", 1, 1),
    ("layer1.1.conv2", 1, 1),
    # layer2 (first block has downsample and stride=2 on conv1)
    ("layer2.0.conv1", 2, 1),
    ("layer2.0.conv2", 1, 1),
    ("layer2.0.downsample.0", 2, 0),
    ("layer2.1.conv1", 1, 1),
    ("layer2.1.conv2", 1, 1),
    # layer3
    ("layer3.0.conv1", 2, 1),
    ("layer3.0.conv2", 1, 1),
    ("layer3.0.downsample.0", 2, 0),
    ("layer3.1.conv1", 1, 1),
    ("layer3.1.conv2", 1, 1),
    # layer4
    ("layer4.0.conv1", 2, 1),
    ("layer4.0.conv2", 1, 1),
    ("layer4.0.downsample.0", 2, 0),
    ("layer4.1.conv1", 1, 1),
    ("layer4.1.conv2", 1, 1),
]
DOWNSAMPLE_KEYS = {"layer2.0.downsample.0", "layer3.0.downsample.0", "layer4.0.downsample.0"}

# 将每个 block 的边界用于残差加法与 ReLU
BLOCK_ENDS = {
    # (conv names in block, optional downsample name)
    "layer1.0": (["layer1.0.conv1", "layer1.0.conv2"], None),
    "layer1.1": (["layer1.1.conv1", "layer1.1.conv2"], None),
    "layer2.0": (["layer2.0.conv1", "layer2.0.conv2"], "layer2.0.downsample.0"),
    "layer2.1": (["layer2.1.conv1", "layer2.1.conv2"], None),
    "layer3.0": (["layer3.0.conv1", "layer3.0.conv2"], "layer3.0.downsample.0"),
    "layer3.1": (["layer3.1.conv1", "layer3.1.conv2"], None),
    "layer4.0": (["layer4.0.conv1", "layer4.0.conv2"], "layer4.0.downsample.0"),
    "layer4.1": (["layer4.1.conv1", "layer4.1.conv2"], None),
}

# ======== BasePyModule:自动遍历 ResNet18,逐层 Q/DQ + 推理 ========
@I.ir_module
class NumpyPTQResNet18(BasePyModule):
    """
    NumPy 版 ResNet18 PTQ 仿真模块
    - set_params: 装入已 BN 融合的所有卷积与 FC 参数
    - set_quant:  装入每个卷积/FC 的量化参数(激活 per-tensor,权重 per-channel)
    - main:       端到端按拓扑自动遍历、逐层 Q/DQ + 推理
    """

    @I.pyfunc
    def set_params(self, params: object) -> None:
        """
        params: dict[str, np.ndarray]
          - 卷积:  "<name>.weight" -> (C_out,C_in,KH,KW), "<name>.bias" -> (C_out,)
          - FC:   "fc.weight" -> (num_classes, 512), "fc.bias" -> (num_classes,)
          约定 name 来自 RESNET18_TOPO 中的 key(外加 "fc")
        """
        self.params = params

    @I.pyfunc
    def set_quant(self, w_scales: object, act_scales: object, fc_w_scales: np.ndarray, fc_act_scale: float) -> None:
        """
        w_scales: dict[str, np.ndarray]     # per-channel, shape (C_out,)
        act_scales: dict[str, float]        # per-tensor, each conv input
        fc_w_scales: (num_classes,)
        fc_act_scale: float
        """
        self.w_scales = w_scales
        self.act_scales = act_scales
        self.fc_w_scales = fc_w_scales.astype(np.float32)
        self.fc_act_scale = float(fc_act_scale)

    @I.pyfunc
    def _run_conv_qdq(self, name: str, x: np.ndarray, stride: int, padding: int) -> np.ndarray:
        # 激活 Q/DQ(输入到该卷积的激活)
        x_q = qdq_activation(x, float(self.act_scales[name]))
        # 权重 Q/DQ(该卷积 per-channel)
        w = self.params[f"{name}.weight"]; b = self.params.get(f"{name}.bias", None)
        w_q = qdq_weight_pc(w, self.w_scales[name])
        # 卷积
        y = conv2d_nchw(x_q, w_q, b, stride=stride, padding=padding)
        return y

    @I.pyfunc
    def _run_block(self, x_in: np.ndarray, block_key: str) -> np.ndarray:
        # block_key: "layer{1..4}.{0|1}"
        convs, down = BLOCK_ENDS[block_key]
        # 保存残差输入
        identity = x_in

        # conv1
        y = self._run_conv_qdq(convs[0], x_in, *self._sp(convs[0]))
        y = relu(y)
        # conv2
        y = self._run_conv_qdq(convs[1], y, *self._sp(convs[1]))

        # 下采样分支(若有)
        if down is not None:
            identity = self._run_conv_qdq(down, identity, *self._sp(down))

        # 残差相加 + ReLU
        out = relu(y + identity)
        return out

    @I.pyfunc
    def _sp(self, name: str) -> tuple:
        # 返回 (stride, padding)
        for k, s, p in RESNET18_TOPO:
            if k == name:
                return (s, p)
        # 不应发生
        return (1, 0)

    @I.pyfunc
    def main(self, x: np.ndarray) -> np.ndarray:
        # stem
        x = self._run_conv_qdq("conv1", x, *self._sp("conv1"))
        x = relu(x)
        x = maxpool2d_nchw(x, kernel=3, stride=2, padding=1)

        # layer1
        x = self._run_block(x, "layer1.0")
        x = self._run_block(x, "layer1.1")
        # layer2
        x = self._run_block(x, "layer2.0")
        x = self._run_block(x, "layer2.1")
        # layer3
        x = self._run_block(x, "layer3.0")
        x = self._run_block(x, "layer3.1")
        # layer4
        x = self._run_block(x, "layer4.0")
        x = self._run_block(x, "layer4.1")

        # GAP + FC(FC 也做 Q/DQ)
        x = global_avg_pool_nchw(x)              # (N, 512)
        x_q = qdq_activation(x, float(self.fc_act_scale))
        w = self.params["fc.weight"]; b = self.params.get("fc.bias", None)
        w_q = qdq_weight_pc_linear(w, self.fc_w_scales)  # per-out
        y = linear(x_q, w_q, b)                  # (N, num_classes)
        return y
import numpy as np
import tvm
from tvm.script import ir as I
from tvm.relax import BasePyModule

# ========= 可选:BN 融合到卷积 =========
def fold_bn_into_conv(w_conv, b_conv, gamma, beta, mean, var, eps=1e-5):
    # w_conv: (C_out, C_in, KH, KW), b_conv: (C_out,)
    C_out = w_conv.shape[0]
    if b_conv is None:
        b_conv = np.zeros(C_out, dtype=np.float32)
    denom = gamma / np.sqrt(var + eps)  # (C_out,)
    w_fused = w_conv * denom.reshape(-1, 1, 1, 1)
    b_fused = (b_conv - mean) * denom + beta
    return w_fused.astype(np.float32), b_fused.astype(np.float32)

# ========= 量化/反量化(NumPy) =========
def qdq_activation(x: np.ndarray, act_scale: float):
    q = np.clip(np.round(x / act_scale), -128, 127).astype(np.int8)
    return q.astype(np.float32) * act_scale

def qdq_weight_pc(w: np.ndarray, w_scales: np.ndarray):
    s = w_scales.reshape(-1, 1, 1, 1)  # per-out-channel
    q = np.clip(np.round(w / s), -128, 127).astype(np.int8)
    return q.astype(np.float32) * s

def qdq_weight_pc_linear(w: np.ndarray, w_scales: np.ndarray):
    s = w_scales.reshape(-1, 1)  # per-out-channel
    q = np.clip(np.round(w / s), -128, 127).astype(np.int8)
    return q.astype(np.float32) * s

# ========= 基础算子(NumPy) =========
def conv2d_nchw(x, w, b, stride=1, padding=0):
    N, C_in, H, W = x.shape
    C_out, _, KH, KW = w.shape
    H_out = (H + 2*padding - KH) // stride + 1
    W_out = (W + 2*padding - KW) // stride + 1
    if padding > 0:
        x_pad = np.zeros((N, C_in, H + 2*padding, W + 2*padding), dtype=x.dtype)
        x_pad[:, :, padding:padding+H, padding:padding+W] = x
    else:
        x_pad = x
    out = np.zeros((N, C_out, H_out, W_out), dtype=np.float32)
    for n in range(N):
        for oc in range(C_out):
            for i in range(H_out):
                for j in range(W_out):
                    region = x_pad[n, :, i*stride:i*stride+KH, j*stride:j*stride+KW]
                    out[n, oc, i, j] = np.sum(region * w[oc]) + (0.0 if b is None else b[oc])
    return out

def relu(x): return np.maximum(x, 0.0)

def maxpool2d_nchw(x, kernel=3, stride=2, padding=1):
    N, C, H, W = x.shape
    H_out = (H + 2*padding - kernel) // stride + 1
    W_out = (W + 2*padding - kernel) // stride + 1
    if padding > 0:
        x_pad = np.full((N, C, H + 2*padding, W + 2*padding), -np.inf, dtype=x.dtype)
        x_pad[:, :, padding:padding+H, padding:padding+W] = x
    else:
        x_pad = x
    out = np.zeros((N, C, H_out, W_out), dtype=np.float32)
    for n in range(N):
        for c in range(C):
            for i in range(H_out):
                for j in range(W_out):
                    region = x_pad[n, c, i*stride:i*stride+kernel, j*stride:j*stride+kernel]
                    out[n, c, i, j] = np.max(region)
    return out

def global_avg_pool_nchw(x):
    # N,C,H,W -> N,C
    return x.mean(axis=(2, 3))

def linear(x, w, b):
    # x: (N, in_features), w: (out_features, in_features)
    return x @ w.T + (0.0 if b is None else b)

# ========= ResNet18 拓扑与 block 边界 =========
RESNET18_TOPO = [
    ("conv1", 2, 3),
    # layer1
    ("layer1.0.conv1", 1, 1),
    ("layer1.0.conv2", 1, 1),
    ("layer1.1.conv1", 1, 1),
    ("layer1.1.conv2", 1, 1),
    # layer2
    ("layer2.0.conv1", 2, 1),
    ("layer2.0.conv2", 1, 1),
    ("layer2.0.downsample.0", 2, 0),
    ("layer2.1.conv1", 1, 1),
    ("layer2.1.conv2", 1, 1),
    # layer3
    ("layer3.0.conv1", 2, 1),
    ("layer3.0.conv2", 1, 1),
    ("layer3.0.downsample.0", 2, 0),
    ("layer3.1.conv1", 1, 1),
    ("layer3.1.conv2", 1, 1),
    # layer4
    ("layer4.0.conv1", 2, 1),
    ("layer4.0.conv2", 1, 1),
    ("layer4.0.downsample.0", 2, 0),
    ("layer4.1.conv1", 1, 1),
    ("layer4.1.conv2", 1, 1),
]

BLOCK_ENDS = {
    "layer1.0": (["layer1.0.conv1", "layer1.0.conv2"], None),
    "layer1.1": (["layer1.1.conv1", "layer1.1.conv2"], None),
    "layer2.0": (["layer2.0.conv1", "layer2.0.conv2"], "layer2.0.downsample.0"),
    "layer2.1": (["layer2.1.conv1", "layer2.1.conv2"], None),
    "layer3.0": (["layer3.0.conv1", "layer3.0.conv2"], "layer3.0.downsample.0"),
    "layer3.1": (["layer3.1.conv1", "layer3.1.conv2"], None),
    "layer4.0": (["layer4.0.conv1", "layer4.0.conv2"], "layer4.0.downsample.0"),
    "layer4.1": (["layer4.1.conv1", "layer4.1.conv2"], None),
}

# ========= BasePyModule:自动遍历 + 逐层 Q/DQ =========
@I.ir_module
class NumpyPTQResNet18(BasePyModule):
    """
    NumPy 版 ResNet18 PTQ 仿真
    - set_params: 装入已 BN 融合的所有卷积与 FC 参数
    - set_quant:  装入每个卷积/FC 的量化参数(激活 per-tensor,权重 per-channel)
    - main:       端到端按拓扑自动遍历、逐层 Q/DQ + 推理
    """

    @I.pyfunc
    def set_params(self, params: object) -> None:
        """
        params: dict[str, np.ndarray]
          卷积:  "<name>.weight" (C_out,C_in,KH,KW), "<name>.bias" (C_out,)
          FC:    "fc.weight" (num_classes, 512), "fc.bias" (num_classes,)
        """
        self.params = params

    @I.pyfunc
    def set_quant(
        self,
        w_scales: object,
        act_scales: object,
        fc_w_scales: np.ndarray,
        fc_act_scale: float
    ) -> None:
        """
        w_scales:   dict[str, np.ndarray]  # per-channel, shape (C_out,)
        act_scales: dict[str, float]       # per-tensor, 每个卷积输入
        fc_w_scales: np.ndarray            # (num_classes,)
        fc_act_scale: float
        """
        self.w_scales = w_scales
        self.act_scales = act_scales
        self.fc_w_scales = fc_w_scales.astype(np.float32)
        self.fc_act_scale = float(fc_act_scale)

    @I.pyfunc
    def _sp(self, name: str) -> tuple:
        # 返回 (stride, padding)
        for k, s, p in RESNET18_TOPO:
            if k == name:
                return (s, p)
        return (1, 0)

    @I.pyfunc
    def _run_conv_qdq(self, name: str, x: np.ndarray, stride: int, padding: int) -> np.ndarray:
        # 激活 Q/DQ(输入到该卷积的激活)
        x_q = qdq_activation(x, float(self.act_scales[name]))
        # 权重 Q/DQ(该卷积 per-channel)
        w = self.params[f"{name}.weight"]; b = self.params.get(f"{name}.bias", None)
        w_q = qdq_weight_pc(w, self.w_scales[name])
        # 卷积
        y = conv2d_nchw(x_q, w_q, b, stride=stride, padding=padding)
        return y

    @I.pyfunc
    def _run_block(self, x_in: np.ndarray, block_key: str) -> np.ndarray:
        # block_key: "layer{1..4}.{0|1}"
        convs, down = BLOCK_ENDS[block_key]
        identity = x_in

        # conv1
        y = self._run_conv_qdq(convs[0], x_in, *self._sp(convs[0]))
        y = relu(y)
        # conv2
        y = self._run_conv_qdq(convs[1], y, *self._sp(convs[1]))

        # 下采样分支(可选)
        if down is not None:
            identity = self._run_conv_qdq(down, identity, *self._sp(down))

        # 残差相加 + ReLU
        out = relu(y + identity)
        return out

    @I.pyfunc
    def main(self, x: np.ndarray) -> np.ndarray:
        # stem
        x = self._run_conv_qdq("conv1", x, *self._sp("conv1"))
        x = relu(x)
        x = maxpool2d_nchw(x, kernel=3, stride=2, padding=1)

        # layer1
        x = self._run_block(x, "layer1.0")
        x = self._run_block(x, "layer1.1")
        # layer2
        x = self._run_block(x, "layer2.0")
        x = self._run_block(x, "layer2.1")
        # layer3
        x = self._run_block(x, "layer3.0")
        x = self._run_block(x, "layer3.1")
        # layer4
        x = self._run_block(x, "layer4.0")
        x = self._run_block(x, "layer4.1")

        # GAP + FC(FC 也做 Q/DQ)
        x = global_avg_pool_nchw(x)              # (N, 512)
        x_q = qdq_activation(x, float(self.fc_act_scale))
        w = self.params["fc.weight"]; b = self.params.get("fc.bias", None)
        w_q = qdq_weight_pc_linear(w, self.fc_w_scales)  # per-out-channel
        y = linear(x_q, w_q, b)                  # (N, num_classes)
        return y
import numpy as np
import tvm

# 1) 设备与实例
device = tvm.cpu(0)
Module = NumpyPTQResNet18
inst = Module(device)

# 2) 构造权重(示例:随机占位)
def make_shape_map():
    return {
        "conv1": (64, 3, 7, 7),
        "layer1.0.conv1": (64, 64, 3, 3),
        "layer1.0.conv2": (64, 64, 3, 3),
        "layer1.1.conv1": (64, 64, 3, 3),
        "layer1.1.conv2": (64, 64, 3, 3),
        "layer2.0.conv1": (128, 64, 3, 3),
        "layer2.0.conv2": (128, 128, 3, 3),
        "layer2.0.downsample.0": (128, 64, 1, 1),
        "layer2.1.conv1": (128, 128, 3, 3),
        "layer2.1.conv2": (128, 128, 3, 3),
        "layer3.0.conv1": (256, 128, 3, 3),
        "layer3.0.conv2": (256, 256, 3, 3),
        "layer3.0.downsample.0": (256, 128, 1, 1),
        "layer3.1.conv1": (256, 256, 3, 3),
        "layer3.1.conv2": (256, 256, 3, 3),
        "layer4.0.conv1": (512, 256, 3, 3),
        "layer4.0.conv2": (512, 512, 3, 3),
        "layer4.0.downsample.0": (512, 256, 1, 1),
        "layer4.1.conv1": (512, 512, 3, 3),
        "layer4.1.conv2": (512, 512, 3, 3),
    }

np.random.seed(0)
shapes = make_shape_map()
params = {}
for name, shape in shapes.items():
    params[f"{name}.weight"] = np.random.randn(*shape).astype(np.float32)
    params[f"{name}.bias"] = np.random.randn(shape[0]).astype(np.float32)

num_classes = 1000
params["fc.weight"] = np.random.randn(num_classes, 512).astype(np.float32)
params["fc.bias"] = np.random.randn(num_classes).astype(np.float32)

inst.set_params(params)

# 3) 量化参数(示例:随机/常数占位;实际应来自校准)
act_scales = {}
w_scales = {}
for name, shape in shapes.items():
    C_out = shape[0]
    act_scales[name] = float(0.05)  # per-tensor
    # per-channel scale,避免为 0
    w_scales[name] = (np.random.rand(C_out).astype(np.float32) * 0.1 + 1e-3)

fc_act_scale = float(0.05)
fc_w_scales = (np.random.rand(num_classes).astype(np.float32) * 0.1 + 1e-3)

inst.set_quant(w_scales, act_scales, fc_w_scales, fc_act_scale)

# 4) 端到端推理
x = np.random.randn(1, 3, 224, 224).astype(np.float32)
y = inst.main(x)
print(y.shape, y.dtype)  # (1, num_classes) float32