import tvm
from tvm import te
import numpy as np
from tvm import relay
from tvm.contrib import graph_executor
from tvm.relay.testing import run_infer_type
def dequantize_test_driver(
in_dtype, quant_args, in_data, verify_output_data, axis, out_dtype="float32"
):
"""
QNN反量化测试驱动函数
该函数用于执行反量化操作并验证结果正确性。它创建一个包含反量化操作的
Relay计算图,编译并执行,然后将结果与预期输出进行比较。
参数:
in_dtype (str): 输入数据类型
quant_args (dict): 量化参数,包含'in_zero_point'和'in_scale'
in_data (numpy.ndarray): 输入数据
verify_output_data (numpy.ndarray): 预期输出数据
axis (int): 反量化轴
out_dtype (str, optional): 输出数据类型,默认为'float32'
返回:
None
"""
# 获取输入数据形状
shape = in_data.shape
# 创建Relay输入变量
input_data = relay.var("input_data", shape=shape, dtype=in_dtype)
# 创建反量化参数常量
input_zero_point = relay.const(quant_args["in_zero_point"], "int32")
input_scale = relay.const(quant_args["in_scale"], "float32")
# 创建反量化操作
quantized_output = relay.qnn.dequantize(
input_data,
input_scale=input_scale,
input_zero_point=input_zero_point,
axis=axis,
out_dtype=out_dtype,
)
# 构建Relay函数和IR模块
mod = relay.Function(relay.analysis.free_vars(quantized_output), quantized_output)
mod = tvm.IRModule.from_expr(mod)
# 编译模块
with tvm.transform.PassContext(opt_level=3):
graph, lib, params = relay.build(mod, "llvm", params=None)
# 创建运行时模块
rt_mod = graph_executor.create(graph, lib, device=tvm.cpu(0))
# 设置输入数据
rt_mod.set_input(input_data=in_data)
rt_mod.set_input(**params)
# 执行计算
rt_mod.run()
# 获取输出结果
res = rt_mod.get_output(0).numpy()
# 验证结果
np.testing.assert_equal(res, verify_output_data)
assert res.dtype == out_dtype
def test_uint8_to_float32():
"""
测试uint8到float32的反量化
该函数测试将uint8类型数据反量化为float32类型数据的功能,
使用特定的零点和缩放因子参数。
"""
# 创建测试输入数据
data = np.array([0, 1, 2, 3, 4, 251, 252, 253, 254, 255]).astype("uint8").reshape((2, 5))
# 预期输出数据
output = (
np.array([-63.5, -63, -62.5, -62, -61.5, 62, 62.5, 63, 63.5, 64])
.astype("float32")
.reshape((2, 5))
)
# 反量化参数:零点=127,缩放因子=0.5
quant_args = {"in_zero_point": 127, "in_scale": 0.5}
# 调用测试驱动函数
dequantize_test_driver(
in_dtype="uint8", quant_args=quant_args, in_data=data, verify_output_data=output, axis=-1
)
def test_int8_to_float32():
"""
测试int8到float32的反量化
该函数测试将int8类型数据反量化为float32类型数据的功能,
使用特定的零点和缩放因子参数。
"""
# 创建测试输入数据
data = (
np.array([-128, -127, -126, -125, -124, 123, 124, 125, 126, 127])
.astype("int8")
.reshape((2, 5))
)
# 预期输出数据
output = (
np.array([-63.5, -63, -62.5, -62, -61.5, 62, 62.5, 63, 63.5, 64])
.astype("float32")
.reshape((2, 5))
)
# 反量化参数:零点=-1,缩放因子=0.5
quant_args = {"in_zero_point": -1, "in_scale": 0.5}
# 调用测试驱动函数
dequantize_test_driver(
in_dtype="int8", quant_args=quant_args, in_data=data, verify_output_data=output, axis=-1
)
def test_int8_to_float16():
"""
测试int8到float16的反量化
该函数测试将int8类型数据反量化为float16类型数据的功能,
验证不同输出精度的支持。
"""
# 创建测试输入数据
data = (
np.array([-128, -127, -126, -125, -124, 123, 124, 125, 126, 127])
.astype("int8")
.reshape((2, 5))
)
# 预期输出数据(float16类型)
output = (
np.array([-63.5, -63, -62.5, -62, -61.5, 62, 62.5, 63, 63.5, 64])
.astype("float16")
.reshape((2, 5))
)
# 反量化参数:零点=-1,缩放因子=0.5
quant_args = {"in_zero_point": -1, "in_scale": 0.5}
# 调用测试驱动函数,指定输出类型为float16
dequantize_test_driver(
in_dtype="int8",
quant_args=quant_args,
in_data=data,
verify_output_data=output,
axis=-1,
out_dtype="float16",
)
def test_scalar_int8_to_float32():
"""
测试标量int8到float32的反量化
该函数测试将标量int8类型数据反量化为float32类型数据的功能,
验证对标量输入的处理正确性。
"""
# 创建标量测试输入数据
data = np.array(-128).astype("int8")
# 预期输出数据
output = np.array(-63.5).astype("float32")
# 反量化参数:零点=-1,缩放因子=0.5
quant_args = {"in_zero_point": -1, "in_scale": 0.5}
# 调用测试驱动函数
dequantize_test_driver(
in_dtype="int8", quant_args=quant_args, in_data=data, verify_output_data=output, axis=-1
)
def test_int32_to_float32():
"""
测试int32到float32的反量化
该函数测试将int32类型数据反量化为float32类型数据的功能,
验证对更高精度整数输入的支持。
"""
# 创建测试输入数据
data = np.array([113, 29, -1052]).astype("int32")
# 预期输出数据
output = np.array([0.6550452, 0.16810896, -6.098297]).astype("float32")
# 反量化参数:零点=0,缩放因子=0.0057968604
quant_args = {"in_zero_point": 0, "in_scale": 0.0057968604}
# 调用测试驱动函数
dequantize_test_driver(
in_dtype="int32", quant_args=quant_args, in_data=data, verify_output_data=output, axis=-1
)
def test_channelwise_axis_1():
"""
测试通道维度为1的通道反量化
该函数测试针对多维数据,以第1维作为通道维度进行通道反量化的功能,
每个通道使用不同的反量化参数。
"""
# 创建测试输入数据并转置,使通道维变为第1维
data = np.transpose(
np.array([0, 1, 2, 3, 4, 243, 247, 249, 250, 251]).astype("uint8").reshape((2, 5))
)
# 预期输出数据并转置
output = np.transpose(
np.array([-63.5, -63, -62.5, -62, -61.5, 30, 31, 31.5, 31.75, 32])
.astype("float32")
.reshape((2, 5))
)
# 通道反量化参数:每个通道有自己的零点和缩放因子
quant_args = {
"in_zero_point": np.array([127, 123]).astype("int32"), # 两个通道的零点
"in_scale": np.array([0.5, 0.25]).astype("float32"), # 两个通道的缩放因子
}
# 调用测试驱动函数,axis=-1表示按最后一维(通道维)进行反量化
dequantize_test_driver(
in_dtype="uint8", quant_args=quant_args, in_data=data, verify_output_data=output, axis=-1
)
def test_channelwise_axis_0():
"""
测试通道维度为0的通道反量化
该函数测试针对多维数据,以第0维作为通道维度进行通道反量化的功能,
每个通道使用不同的反量化参数。
"""
# 创建测试输入数据
data = np.array([0, 1, 2, 3, 4, 243, 247, 249, 250, 251]).astype("uint8").reshape((2, 5))
# 预期输出数据
output = (
np.array([-63.5, -63, -62.5, -62, -61.5, 30, 31, 31.5, 31.75, 32])
.astype("float32")
.reshape((2, 5))
)
# 通道反量化参数:每个通道有自己的零点和缩放因子
quant_args = {
"in_zero_point": np.array([127, 123]).astype("int32"), # 两个通道的零点
"in_scale": np.array([0.5, 0.25]).astype("float32"), # 两个通道的缩放因子
}
# 调用测试驱动函数,axis=0表示按第0维(通道维)进行反量化
dequantize_test_driver(
in_dtype="uint8", quant_args=quant_args, in_data=data, verify_output_data=output, axis=0
)
def test_per_tensor_vector_args():
"""
测试向量参数的张量反量化
该函数测试使用向量形式的反量化参数进行张量反量化的功能,
验证对向量参数的支持。
"""
# 创建测试输入数据
data = np.array([0, 1, 2, 3, 4, 251, 252, 253, 254, 255]).astype("uint8")
# 预期输出数据
output = np.array([-63.5, -63, -62.5, -62, -61.5, 62, 62.5, 63, 63.5, 64]).astype("float32")
# 向量形式的反量化参数
quant_args = {
"in_zero_point": np.array([127]).astype("int32"), # 向量形式的零点
"in_scale": np.array([0.5]).astype("float32"), # 向量形式的缩放因子
}
# 调用测试驱动函数
dequantize_test_driver(
in_dtype="uint8", quant_args=quant_args, in_data=data, verify_output_data=output, axis=-1
)
def test_dynamic_dequantize():
"""
测试动态反量化功能
该函数测试动态反量化功能,其中反量化参数(缩放因子和零点)是动态变量,
而不是固定常量。这验证了反量化操作在运行时接受动态输入的能力。
"""
# 创建动态变量
x = relay.var("x", shape=(1, 2, 3, 4), dtype="int8") # 输入数据变量
scale_var = relay.var("scale", shape=(), dtype="float32") # 缩放因子变量
zp_var = relay.var("zp", shape=(), dtype="int32") # 零点变量
# 创建反量化操作,使用动态计算的缩放因子和零点
deq_x = relay.qnn.dequantize(x, scale_var * scale_var, zp_var + zp_var)
# 推断类型
tt = run_infer_type(deq_x)
# 验证输出类型
assert tt.checked_type == relay.TensorType((1, 2, 3, 4), "float32")
# 创建Relay函数
func = relay.Function([x, scale_var, zp_var], deq_x)
# 生成测试数据
data = np.random.uniform(size=(1, 2, 3, 4)).astype("int8")
scale = np.array(1).astype("float32")
zp = np.array(0).astype("int32")
# 构建IR模块
mod = tvm.ir.IRModule.from_expr(func)
# 在可用目标上编译
for target, dev in tvm.testing.enabled_targets():
# 禁用AlterOpLayout优化(待修复)
with relay.build_config(opt_level=3, disabled_pass=["AlterOpLayout"]):
lib = relay.build(mod, target=target)
# 创建运行时模块并执行
module = graph_executor.GraphModule(lib["default"](dev))
module.set_input(**{"x": data, "scale": scale, "zp": zp})
module.run()
if __name__ == "__main__":
# 执行所有测试函数
test_uint8_to_float32() # 测试uint8到float32的反量化
test_int8_to_float32() # 测试int8到float32的反量化
test_int8_to_float16() # 测试int8到float16的反量化
test_scalar_int8_to_float32() # 测试标量int8到float32的反量化
test_int32_to_float32() # 测试int32到float32的反量化
test_channelwise_axis_1() # 测试通道维度为1的通道反量化
test_channelwise_axis_0() # 测试通道维度为0的通道反量化
test_dynamic_dequantize() # 测试动态反量化功能