relay.qnn.quantize

relay.qnn.quantize#


import tvm
from tvm import te
import numpy as np
from tvm import relay
from tvm.contrib import graph_executor
from tvm.relay.testing import run_infer_type


def quantize_test_driver(in_dtype, quant_args, axis, out_dtype, in_data, verify_output_data):
    """
    QNN量化测试驱动函数

    该函数用于执行量化操作并验证结果正确性。它创建一个包含量化操作的
    Relay计算图,编译并执行,然后将结果与预期输出进行比较。

    参数:
    in_dtype (str): 输入数据类型
    quant_args (dict): 量化参数,包含'out_zero_point'和'out_scale'
    axis (int): 量化轴
    out_dtype (str): 输出数据类型
    in_data (numpy.ndarray): 输入数据
    verify_output_data (numpy.ndarray): 预期输出数据

    返回:
    None
    """
    # 获取输入数据形状
    shape = in_data.shape
    # 创建Relay输入变量
    input_data = relay.var("input_data", shape=shape, dtype=in_dtype)
    # 创建量化参数常量
    output_zero_point = relay.const(quant_args["out_zero_point"])
    output_scale = relay.const(quant_args["out_scale"])
    # 创建量化操作
    quantized_output = relay.qnn.quantize(
        input_data,
        output_scale=output_scale,
        output_zero_point=output_zero_point,
        axis=axis,
        out_dtype=out_dtype,
    )
    # 构建Relay函数和IR模块
    mod = relay.Function(relay.analysis.free_vars(quantized_output), quantized_output)
    mod = tvm.IRModule.from_expr(mod)
    # 编译模块
    with tvm.transform.PassContext(opt_level=3):
        graph, lib, params = relay.build(mod, "llvm", params=None)
        # 创建运行时模块
        rt_mod = graph_executor.create(graph, lib, device=tvm.cpu(0))
        # 设置输入数据
        rt_mod.set_input(input_data=in_data)
        rt_mod.set_input(**params)
        # 执行计算
        rt_mod.run()
        # 获取输出结果
        res = rt_mod.get_output(0).numpy()
        # 验证结果
        np.testing.assert_equal(res, verify_output_data)
        assert res.dtype == out_dtype


def test_float32_to_uint8():
    """
    测试float32到uint8的量化

    该函数测试将float32类型数据量化为uint8类型数据的功能,
    使用特定的零点和缩放因子参数。
    """
    # 创建测试输入数据
    data = (
        np.array([-63.5, -63, -62.5, -62, -61.5, 62, 62.5, 63, 63.5, 64])
        .astype("float32")
        .reshape((2, 5))
    )
    # 预期输出数据
    output = np.array([0, 1, 2, 3, 4, 251, 252, 253, 254, 255]).astype("uint8").reshape((2, 5))
    # 量化参数:零点=127,缩放因子=0.5
    quant_args = {"out_zero_point": np.int32(127), "out_scale": np.float32(0.5)}
    # 调用测试驱动函数
    quantize_test_driver(
        in_dtype="float32",
        quant_args=quant_args,
        axis=-1,  # 最后一个维度作为量化轴
        out_dtype="uint8",
        in_data=data,
        verify_output_data=output,
    )


def test_float32_to_int8():
    """
    测试float32到int8的量化

    该函数测试将float32类型数据量化为int8类型数据的功能,
    使用特定的零点和缩放因子参数。
    """
    # 创建测试输入数据
    data = (
        np.array([-63.5, -63, -62.5, -62, -61.5, 62, 62.5, 63, 63.5, 64])
        .astype("float32")
        .reshape((2, 5))
    )
    # 预期输出数据
    output = (
        np.array([-128, -127, -126, -125, -124, 123, 124, 125, 126, 127])
        .astype("int8")
        .reshape((2, 5))
    )
    # 量化参数:零点=-1,缩放因子=0.5
    quant_args = {"out_zero_point": np.int32(-1), "out_scale": np.float32(0.5)}
    # 调用测试驱动函数
    quantize_test_driver(
        in_dtype="float32",
        quant_args=quant_args,
        axis=-1,  # 最后一个维度作为量化轴
        out_dtype="int8",
        in_data=data,
        verify_output_data=output,
    )


def test_float32_to_uint16():
    """
    测试float32到uint16的量化

    该函数测试将float32类型数据量化为uint16类型数据的功能,
    使用特定的零点和缩放因子参数。
    """
    # 创建测试输入数据
    data = (
        np.array([-6553, -6552.8, -6552.6, -6552.4, -6552.2, 6553.2, 6553.4, 6553.6, 6553.8, 6554])
        .astype("float32")
        .reshape((2, 5))
    )
    # 预期输出数据
    output = (
        np.array([0, 1, 2, 3, 4, 65531, 65532, 65533, 65534, 65535])
        .astype("uint16")
        .reshape((2, 5))
    )
    # 量化参数:零点=32765,缩放因子=0.2
    quant_args = {"out_zero_point": np.int32(32765), "out_scale": np.float32(0.2)}
    # 调用测试驱动函数
    quantize_test_driver(
        in_dtype="float32",
        quant_args=quant_args,
        axis=-1,  # 最后一个维度作为量化轴
        out_dtype="uint16",
        in_data=data,
        verify_output_data=output,
    )


def test_scalar_float32_to_int8():
    """
    测试标量float32到int8的量化

    该函数测试将标量float32类型数据量化为int8类型数据的功能,
    验证对标量输入的处理正确性。
    """
    # 创建标量测试输入数据
    data = np.array(-63.5).astype("float32")
    # 预期输出数据
    output = np.array(-128).astype("int8")
    # 量化参数:零点=-1,缩放因子=0.5
    quant_args = {"out_zero_point": np.int32(-1), "out_scale": np.float32(0.5)}
    # 调用测试驱动函数
    quantize_test_driver(
        in_dtype="float32",
        quant_args=quant_args,
        axis=-1,  # 标量的唯一维度作为量化轴
        out_dtype="int8",
        in_data=data,
        verify_output_data=output,
    )


def test_channelwise_axis_0():
    """
    测试通道维度为0的通道量化

    该函数测试针对多维数据,以第0维作为通道维度进行通道量化的功能,
    每个通道使用不同的量化参数。
    """
    # 创建测试输入数据
    data = (
        np.array([-63.5, -63, -62.5, -62, -61.5, 30, 31, 31.5, 31.75, 32])
        .astype("float32")
        .reshape((2, 5))
    )
    # 预期输出数据
    output = np.array([0, 1, 2, 3, 4, 243, 247, 249, 250, 251]).astype("uint8").reshape((2, 5))
    # 通道量化参数:每个通道有自己的零点和缩放因子
    quant_args = {
        "out_zero_point": np.array([127, 123]).astype("int32"),  # 两个通道的零点
        "out_scale": np.array([0.5, 0.25]).astype("float32"),   # 两个通道的缩放因子
    }

    # 调用测试驱动函数,axis=0表示按第0维(通道维)进行量化
    quantize_test_driver(
        in_dtype="float32",
        quant_args=quant_args,
        axis=0,
        out_dtype="uint8",
        in_data=data,
        verify_output_data=output,
    )


def test_channelwise_axis_1():
    """
    测试通道维度为1的通道量化

    该函数测试针对多维数据,以第1维作为通道维度进行通道量化的功能,
    每个通道使用不同的量化参数。
    """
    # 创建测试输入数据并转置,使通道维变为第1维
    data = np.transpose(
        np.array([-63.5, -63, -62.5, -62, -61.5, 30, 31, 31.5, 31.75, 32])
        .astype("float32")
        .reshape((2, 5))
    )
    # 预期输出数据并转置
    output = np.transpose(
        np.array([0, 1, 2, 3, 4, 243, 247, 249, 250, 251]).astype("uint8").reshape((2, 5))
    )
    # 通道量化参数:每个通道有自己的零点和缩放因子
    quant_args = {
        "out_zero_point": np.array([127, 123]).astype("int32"),  # 两个通道的零点
        "out_scale": np.array([0.5, 0.25]).astype("float32"),   # 两个通道的缩放因子
    }

    # 调用测试驱动函数,axis=-1表示按最后一维(通道维)进行量化
    quantize_test_driver(
        in_dtype="float32",
        quant_args=quant_args,
        axis=-1,
        out_dtype="uint8",
        in_data=data,
        verify_output_data=output,
    )


def test_dynamic_quantize():
    """
    测试动态量化功能

    该函数测试动态量化功能,其中量化参数(缩放因子和零点)是动态变量,
    而不是固定常量。这验证了量化操作在运行时接受动态输入的能力。
    """
    # 创建动态变量
    x = relay.var("x", shape=(1, 2, 3, 4), dtype="float32")  # 输入数据变量
    scale_var = relay.var("scale", shape=(), dtype="float32")  # 缩放因子变量
    zp_var = relay.var("zp", shape=(), dtype="int32")  # 零点变量

    # 创建量化操作,使用动态计算的缩放因子和零点
    q_x = relay.qnn.quantize(x, scale_var * scale_var, zp_var + zp_var)
    # 推断类型
    tt = run_infer_type(q_x)

    # 验证输出类型
    assert tt.checked_type == relay.TensorType((1, 2, 3, 4), "int8")
    # 创建Relay函数
    func = relay.Function([x, scale_var, zp_var], q_x)
    # 生成测试数据
    data = np.random.uniform(size=(1, 2, 3, 4)).astype("float32")
    scale = np.array(1).astype("float32")
    zp = np.array(0).astype("int32")

    # 构建IR模块
    mod = tvm.ir.IRModule.from_expr(func)

    # 在可用目标上编译
    for target, dev in tvm.testing.enabled_targets():
        # 禁用AlterOpLayout优化(待修复)
        with relay.build_config(opt_level=3, disabled_pass=["AlterOpLayout"]):
            lib = relay.build(mod, target=target)

    # 创建运行时模块并执行
    module = graph_executor.GraphModule(lib["default"](dev))
    module.set_input(**{"x": data, "scale": scale, "zp": zp})
    module.run()


if __name__ == "__main__":
    # 执行所有测试函数
    test_float32_to_uint8()  # 测试float32到uint8的量化
    test_float32_to_int8()   # 测试float32到int8的量化
    test_float32_to_uint16() # 测试float32到uint16的量化
    test_scalar_float32_to_int8()  # 测试标量float32到int8的量化
    test_channelwise_axis_0()  # 测试通道维度为0的通道量化
    test_channelwise_axis_1()  # 测试通道维度为1的通道量化
    test_dynamic_quantize()  # 测试动态量化功能