relay.qnn.dequantize

relay.qnn.dequantize#

import tvm
from tvm import te
import numpy as np
from tvm import relay
from tvm.contrib import graph_executor
from tvm.relay.testing import run_infer_type


def dequantize_test_driver(
    in_dtype, quant_args, in_data, verify_output_data, axis, out_dtype="float32"
):
    """
    QNN反量化测试驱动函数

    该函数用于执行反量化操作并验证结果正确性。它创建一个包含反量化操作的
    Relay计算图,编译并执行,然后将结果与预期输出进行比较。

    参数:
    in_dtype (str): 输入数据类型
    quant_args (dict): 量化参数,包含'in_zero_point'和'in_scale'
    in_data (numpy.ndarray): 输入数据
    verify_output_data (numpy.ndarray): 预期输出数据
    axis (int): 反量化轴
    out_dtype (str, optional): 输出数据类型,默认为'float32'

    返回:
    None
    """
    # 获取输入数据形状
    shape = in_data.shape
    # 创建Relay输入变量
    input_data = relay.var("input_data", shape=shape, dtype=in_dtype)
    # 创建反量化参数常量
    input_zero_point = relay.const(quant_args["in_zero_point"], "int32")
    input_scale = relay.const(quant_args["in_scale"], "float32")
    # 创建反量化操作
    quantized_output = relay.qnn.dequantize(
        input_data,
        input_scale=input_scale,
        input_zero_point=input_zero_point,
        axis=axis,
        out_dtype=out_dtype,
    )
    # 构建Relay函数和IR模块
    mod = relay.Function(relay.analysis.free_vars(quantized_output), quantized_output)
    mod = tvm.IRModule.from_expr(mod)
    # 编译模块
    with tvm.transform.PassContext(opt_level=3):
        graph, lib, params = relay.build(mod, "llvm", params=None)
        # 创建运行时模块
        rt_mod = graph_executor.create(graph, lib, device=tvm.cpu(0))
        # 设置输入数据
        rt_mod.set_input(input_data=in_data)
        rt_mod.set_input(**params)
        # 执行计算
        rt_mod.run()
        # 获取输出结果
        res = rt_mod.get_output(0).numpy()
        # 验证结果
        np.testing.assert_equal(res, verify_output_data)
        assert res.dtype == out_dtype


def test_uint8_to_float32():
    """
    测试uint8到float32的反量化

    该函数测试将uint8类型数据反量化为float32类型数据的功能,
    使用特定的零点和缩放因子参数。
    """
    # 创建测试输入数据
    data = np.array([0, 1, 2, 3, 4, 251, 252, 253, 254, 255]).astype("uint8").reshape((2, 5))
    # 预期输出数据
    output = (
        np.array([-63.5, -63, -62.5, -62, -61.5, 62, 62.5, 63, 63.5, 64])
        .astype("float32")
        .reshape((2, 5))
    )
    # 反量化参数:零点=127,缩放因子=0.5
    quant_args = {"in_zero_point": 127, "in_scale": 0.5}
    # 调用测试驱动函数
    dequantize_test_driver(
        in_dtype="uint8", quant_args=quant_args, in_data=data, verify_output_data=output, axis=-1
    )


def test_int8_to_float32():
    """
    测试int8到float32的反量化

    该函数测试将int8类型数据反量化为float32类型数据的功能,
    使用特定的零点和缩放因子参数。
    """
    # 创建测试输入数据
    data = (
        np.array([-128, -127, -126, -125, -124, 123, 124, 125, 126, 127])
        .astype("int8")
        .reshape((2, 5))
    )
    # 预期输出数据
    output = (
        np.array([-63.5, -63, -62.5, -62, -61.5, 62, 62.5, 63, 63.5, 64])
        .astype("float32")
        .reshape((2, 5))
    )
    # 反量化参数:零点=-1,缩放因子=0.5
    quant_args = {"in_zero_point": -1, "in_scale": 0.5}
    # 调用测试驱动函数
    dequantize_test_driver(
        in_dtype="int8", quant_args=quant_args, in_data=data, verify_output_data=output, axis=-1
    )


def test_int8_to_float16():
    """
    测试int8到float16的反量化

    该函数测试将int8类型数据反量化为float16类型数据的功能,
    验证不同输出精度的支持。
    """
    # 创建测试输入数据
    data = (
        np.array([-128, -127, -126, -125, -124, 123, 124, 125, 126, 127])
        .astype("int8")
        .reshape((2, 5))
    )
    # 预期输出数据(float16类型)
    output = (
        np.array([-63.5, -63, -62.5, -62, -61.5, 62, 62.5, 63, 63.5, 64])
        .astype("float16")
        .reshape((2, 5))
    )
    # 反量化参数:零点=-1,缩放因子=0.5
    quant_args = {"in_zero_point": -1, "in_scale": 0.5}
    # 调用测试驱动函数,指定输出类型为float16
    dequantize_test_driver(
        in_dtype="int8",
        quant_args=quant_args,
        in_data=data,
        verify_output_data=output,
        axis=-1,
        out_dtype="float16",
    )


def test_scalar_int8_to_float32():
    """
    测试标量int8到float32的反量化

    该函数测试将标量int8类型数据反量化为float32类型数据的功能,
    验证对标量输入的处理正确性。
    """
    # 创建标量测试输入数据
    data = np.array(-128).astype("int8")
    # 预期输出数据
    output = np.array(-63.5).astype("float32")
    # 反量化参数:零点=-1,缩放因子=0.5
    quant_args = {"in_zero_point": -1, "in_scale": 0.5}
    # 调用测试驱动函数
    dequantize_test_driver(
        in_dtype="int8", quant_args=quant_args, in_data=data, verify_output_data=output, axis=-1
    )


def test_int32_to_float32():
    """
    测试int32到float32的反量化

    该函数测试将int32类型数据反量化为float32类型数据的功能,
    验证对更高精度整数输入的支持。
    """
    # 创建测试输入数据
    data = np.array([113, 29, -1052]).astype("int32")
    # 预期输出数据
    output = np.array([0.6550452, 0.16810896, -6.098297]).astype("float32")
    # 反量化参数:零点=0,缩放因子=0.0057968604
    quant_args = {"in_zero_point": 0, "in_scale": 0.0057968604}
    # 调用测试驱动函数
    dequantize_test_driver(
        in_dtype="int32", quant_args=quant_args, in_data=data, verify_output_data=output, axis=-1
    )


def test_channelwise_axis_1():
    """
    测试通道维度为1的通道反量化

    该函数测试针对多维数据,以第1维作为通道维度进行通道反量化的功能,
    每个通道使用不同的反量化参数。
    """
    # 创建测试输入数据并转置,使通道维变为第1维
    data = np.transpose(
        np.array([0, 1, 2, 3, 4, 243, 247, 249, 250, 251]).astype("uint8").reshape((2, 5))
    )
    # 预期输出数据并转置
    output = np.transpose(
        np.array([-63.5, -63, -62.5, -62, -61.5, 30, 31, 31.5, 31.75, 32])
        .astype("float32")
        .reshape((2, 5))
    )
    # 通道反量化参数:每个通道有自己的零点和缩放因子
    quant_args = {
        "in_zero_point": np.array([127, 123]).astype("int32"),  # 两个通道的零点
        "in_scale": np.array([0.5, 0.25]).astype("float32"),   # 两个通道的缩放因子
    }

    # 调用测试驱动函数,axis=-1表示按最后一维(通道维)进行反量化
    dequantize_test_driver(
        in_dtype="uint8", quant_args=quant_args, in_data=data, verify_output_data=output, axis=-1
    )


def test_channelwise_axis_0():
    """
    测试通道维度为0的通道反量化

    该函数测试针对多维数据,以第0维作为通道维度进行通道反量化的功能,
    每个通道使用不同的反量化参数。
    """
    # 创建测试输入数据
    data = np.array([0, 1, 2, 3, 4, 243, 247, 249, 250, 251]).astype("uint8").reshape((2, 5))
    # 预期输出数据
    output = (
        np.array([-63.5, -63, -62.5, -62, -61.5, 30, 31, 31.5, 31.75, 32])
        .astype("float32")
        .reshape((2, 5))
    )
    # 通道反量化参数:每个通道有自己的零点和缩放因子
    quant_args = {
        "in_zero_point": np.array([127, 123]).astype("int32"),  # 两个通道的零点
        "in_scale": np.array([0.5, 0.25]).astype("float32"),   # 两个通道的缩放因子
    }

    # 调用测试驱动函数,axis=0表示按第0维(通道维)进行反量化
    dequantize_test_driver(
        in_dtype="uint8", quant_args=quant_args, in_data=data, verify_output_data=output, axis=0
    )


def test_per_tensor_vector_args():
    """
    测试向量参数的张量反量化

    该函数测试使用向量形式的反量化参数进行张量反量化的功能,
    验证对向量参数的支持。
    """
    # 创建测试输入数据
    data = np.array([0, 1, 2, 3, 4, 251, 252, 253, 254, 255]).astype("uint8")
    # 预期输出数据
    output = np.array([-63.5, -63, -62.5, -62, -61.5, 62, 62.5, 63, 63.5, 64]).astype("float32")

    # 向量形式的反量化参数
    quant_args = {
        "in_zero_point": np.array([127]).astype("int32"),  # 向量形式的零点
        "in_scale": np.array([0.5]).astype("float32"),   # 向量形式的缩放因子
    }

    # 调用测试驱动函数
    dequantize_test_driver(
        in_dtype="uint8", quant_args=quant_args, in_data=data, verify_output_data=output, axis=-1
    )


def test_dynamic_dequantize():
    """
    测试动态反量化功能

    该函数测试动态反量化功能,其中反量化参数(缩放因子和零点)是动态变量,
    而不是固定常量。这验证了反量化操作在运行时接受动态输入的能力。
    """
    # 创建动态变量
    x = relay.var("x", shape=(1, 2, 3, 4), dtype="int8")  # 输入数据变量
    scale_var = relay.var("scale", shape=(), dtype="float32")  # 缩放因子变量
    zp_var = relay.var("zp", shape=(), dtype="int32")  # 零点变量

    # 创建反量化操作,使用动态计算的缩放因子和零点
    deq_x = relay.qnn.dequantize(x, scale_var * scale_var, zp_var + zp_var)
    # 推断类型
    tt = run_infer_type(deq_x)

    # 验证输出类型
    assert tt.checked_type == relay.TensorType((1, 2, 3, 4), "float32")
    # 创建Relay函数
    func = relay.Function([x, scale_var, zp_var], deq_x)
    # 生成测试数据
    data = np.random.uniform(size=(1, 2, 3, 4)).astype("int8")
    scale = np.array(1).astype("float32")
    zp = np.array(0).astype("int32")

    # 构建IR模块
    mod = tvm.ir.IRModule.from_expr(func)

    # 在可用目标上编译
    for target, dev in tvm.testing.enabled_targets():
        # 禁用AlterOpLayout优化(待修复)
        with relay.build_config(opt_level=3, disabled_pass=["AlterOpLayout"]):
            lib = relay.build(mod, target=target)

    # 创建运行时模块并执行
    module = graph_executor.GraphModule(lib["default"](dev))
    module.set_input(**{"x": data, "scale": scale, "zp": zp})
    module.run()


if __name__ == "__main__":
    # 执行所有测试函数
    test_uint8_to_float32()  # 测试uint8到float32的反量化
    test_int8_to_float32()   # 测试int8到float32的反量化
    test_int8_to_float16()   # 测试int8到float16的反量化
    test_scalar_int8_to_float32()  # 测试标量int8到float32的反量化
    test_int32_to_float32()  # 测试int32到float32的反量化
    test_channelwise_axis_1()  # 测试通道维度为1的通道反量化
    test_channelwise_axis_0()  # 测试通道维度为0的通道反量化
    test_dynamic_dequantize()  # 测试动态反量化功能