import tvm
from tvm import te
import numpy as np
from tvm import relay
from tvm.contrib import graph_executor
from tvm.relay.testing import run_infer_type
def quantize_test_driver(in_dtype, quant_args, axis, out_dtype, in_data, verify_output_data):
"""
QNN量化测试驱动函数
该函数用于执行量化操作并验证结果正确性。它创建一个包含量化操作的
Relay计算图,编译并执行,然后将结果与预期输出进行比较。
参数:
in_dtype (str): 输入数据类型
quant_args (dict): 量化参数,包含'out_zero_point'和'out_scale'
axis (int): 量化轴
out_dtype (str): 输出数据类型
in_data (numpy.ndarray): 输入数据
verify_output_data (numpy.ndarray): 预期输出数据
返回:
None
"""
# 获取输入数据形状
shape = in_data.shape
# 创建Relay输入变量
input_data = relay.var("input_data", shape=shape, dtype=in_dtype)
# 创建量化参数常量
output_zero_point = relay.const(quant_args["out_zero_point"])
output_scale = relay.const(quant_args["out_scale"])
# 创建量化操作
quantized_output = relay.qnn.quantize(
input_data,
output_scale=output_scale,
output_zero_point=output_zero_point,
axis=axis,
out_dtype=out_dtype,
)
# 构建Relay函数和IR模块
mod = relay.Function(relay.analysis.free_vars(quantized_output), quantized_output)
mod = tvm.IRModule.from_expr(mod)
# 编译模块
with tvm.transform.PassContext(opt_level=3):
graph, lib, params = relay.build(mod, "llvm", params=None)
# 创建运行时模块
rt_mod = graph_executor.create(graph, lib, device=tvm.cpu(0))
# 设置输入数据
rt_mod.set_input(input_data=in_data)
rt_mod.set_input(**params)
# 执行计算
rt_mod.run()
# 获取输出结果
res = rt_mod.get_output(0).numpy()
# 验证结果
np.testing.assert_equal(res, verify_output_data)
assert res.dtype == out_dtype
def test_float32_to_uint8():
"""
测试float32到uint8的量化
该函数测试将float32类型数据量化为uint8类型数据的功能,
使用特定的零点和缩放因子参数。
"""
# 创建测试输入数据
data = (
np.array([-63.5, -63, -62.5, -62, -61.5, 62, 62.5, 63, 63.5, 64])
.astype("float32")
.reshape((2, 5))
)
# 预期输出数据
output = np.array([0, 1, 2, 3, 4, 251, 252, 253, 254, 255]).astype("uint8").reshape((2, 5))
# 量化参数:零点=127,缩放因子=0.5
quant_args = {"out_zero_point": np.int32(127), "out_scale": np.float32(0.5)}
# 调用测试驱动函数
quantize_test_driver(
in_dtype="float32",
quant_args=quant_args,
axis=-1, # 最后一个维度作为量化轴
out_dtype="uint8",
in_data=data,
verify_output_data=output,
)
def test_float32_to_int8():
"""
测试float32到int8的量化
该函数测试将float32类型数据量化为int8类型数据的功能,
使用特定的零点和缩放因子参数。
"""
# 创建测试输入数据
data = (
np.array([-63.5, -63, -62.5, -62, -61.5, 62, 62.5, 63, 63.5, 64])
.astype("float32")
.reshape((2, 5))
)
# 预期输出数据
output = (
np.array([-128, -127, -126, -125, -124, 123, 124, 125, 126, 127])
.astype("int8")
.reshape((2, 5))
)
# 量化参数:零点=-1,缩放因子=0.5
quant_args = {"out_zero_point": np.int32(-1), "out_scale": np.float32(0.5)}
# 调用测试驱动函数
quantize_test_driver(
in_dtype="float32",
quant_args=quant_args,
axis=-1, # 最后一个维度作为量化轴
out_dtype="int8",
in_data=data,
verify_output_data=output,
)
def test_float32_to_uint16():
"""
测试float32到uint16的量化
该函数测试将float32类型数据量化为uint16类型数据的功能,
使用特定的零点和缩放因子参数。
"""
# 创建测试输入数据
data = (
np.array([-6553, -6552.8, -6552.6, -6552.4, -6552.2, 6553.2, 6553.4, 6553.6, 6553.8, 6554])
.astype("float32")
.reshape((2, 5))
)
# 预期输出数据
output = (
np.array([0, 1, 2, 3, 4, 65531, 65532, 65533, 65534, 65535])
.astype("uint16")
.reshape((2, 5))
)
# 量化参数:零点=32765,缩放因子=0.2
quant_args = {"out_zero_point": np.int32(32765), "out_scale": np.float32(0.2)}
# 调用测试驱动函数
quantize_test_driver(
in_dtype="float32",
quant_args=quant_args,
axis=-1, # 最后一个维度作为量化轴
out_dtype="uint16",
in_data=data,
verify_output_data=output,
)
def test_scalar_float32_to_int8():
"""
测试标量float32到int8的量化
该函数测试将标量float32类型数据量化为int8类型数据的功能,
验证对标量输入的处理正确性。
"""
# 创建标量测试输入数据
data = np.array(-63.5).astype("float32")
# 预期输出数据
output = np.array(-128).astype("int8")
# 量化参数:零点=-1,缩放因子=0.5
quant_args = {"out_zero_point": np.int32(-1), "out_scale": np.float32(0.5)}
# 调用测试驱动函数
quantize_test_driver(
in_dtype="float32",
quant_args=quant_args,
axis=-1, # 标量的唯一维度作为量化轴
out_dtype="int8",
in_data=data,
verify_output_data=output,
)
def test_channelwise_axis_0():
"""
测试通道维度为0的通道量化
该函数测试针对多维数据,以第0维作为通道维度进行通道量化的功能,
每个通道使用不同的量化参数。
"""
# 创建测试输入数据
data = (
np.array([-63.5, -63, -62.5, -62, -61.5, 30, 31, 31.5, 31.75, 32])
.astype("float32")
.reshape((2, 5))
)
# 预期输出数据
output = np.array([0, 1, 2, 3, 4, 243, 247, 249, 250, 251]).astype("uint8").reshape((2, 5))
# 通道量化参数:每个通道有自己的零点和缩放因子
quant_args = {
"out_zero_point": np.array([127, 123]).astype("int32"), # 两个通道的零点
"out_scale": np.array([0.5, 0.25]).astype("float32"), # 两个通道的缩放因子
}
# 调用测试驱动函数,axis=0表示按第0维(通道维)进行量化
quantize_test_driver(
in_dtype="float32",
quant_args=quant_args,
axis=0,
out_dtype="uint8",
in_data=data,
verify_output_data=output,
)
def test_channelwise_axis_1():
"""
测试通道维度为1的通道量化
该函数测试针对多维数据,以第1维作为通道维度进行通道量化的功能,
每个通道使用不同的量化参数。
"""
# 创建测试输入数据并转置,使通道维变为第1维
data = np.transpose(
np.array([-63.5, -63, -62.5, -62, -61.5, 30, 31, 31.5, 31.75, 32])
.astype("float32")
.reshape((2, 5))
)
# 预期输出数据并转置
output = np.transpose(
np.array([0, 1, 2, 3, 4, 243, 247, 249, 250, 251]).astype("uint8").reshape((2, 5))
)
# 通道量化参数:每个通道有自己的零点和缩放因子
quant_args = {
"out_zero_point": np.array([127, 123]).astype("int32"), # 两个通道的零点
"out_scale": np.array([0.5, 0.25]).astype("float32"), # 两个通道的缩放因子
}
# 调用测试驱动函数,axis=-1表示按最后一维(通道维)进行量化
quantize_test_driver(
in_dtype="float32",
quant_args=quant_args,
axis=-1,
out_dtype="uint8",
in_data=data,
verify_output_data=output,
)
def test_dynamic_quantize():
"""
测试动态量化功能
该函数测试动态量化功能,其中量化参数(缩放因子和零点)是动态变量,
而不是固定常量。这验证了量化操作在运行时接受动态输入的能力。
"""
# 创建动态变量
x = relay.var("x", shape=(1, 2, 3, 4), dtype="float32") # 输入数据变量
scale_var = relay.var("scale", shape=(), dtype="float32") # 缩放因子变量
zp_var = relay.var("zp", shape=(), dtype="int32") # 零点变量
# 创建量化操作,使用动态计算的缩放因子和零点
q_x = relay.qnn.quantize(x, scale_var * scale_var, zp_var + zp_var)
# 推断类型
tt = run_infer_type(q_x)
# 验证输出类型
assert tt.checked_type == relay.TensorType((1, 2, 3, 4), "int8")
# 创建Relay函数
func = relay.Function([x, scale_var, zp_var], q_x)
# 生成测试数据
data = np.random.uniform(size=(1, 2, 3, 4)).astype("float32")
scale = np.array(1).astype("float32")
zp = np.array(0).astype("int32")
# 构建IR模块
mod = tvm.ir.IRModule.from_expr(func)
# 在可用目标上编译
for target, dev in tvm.testing.enabled_targets():
# 禁用AlterOpLayout优化(待修复)
with relay.build_config(opt_level=3, disabled_pass=["AlterOpLayout"]):
lib = relay.build(mod, target=target)
# 创建运行时模块并执行
module = graph_executor.GraphModule(lib["default"](dev))
module.set_input(**{"x": data, "scale": scale, "zp": zp})
module.run()
if __name__ == "__main__":
# 执行所有测试函数
test_float32_to_uint8() # 测试float32到uint8的量化
test_float32_to_int8() # 测试float32到int8的量化
test_float32_to_uint16() # 测试float32到uint16的量化
test_scalar_float32_to_int8() # 测试标量float32到int8的量化
test_channelwise_axis_0() # 测试通道维度为0的通道量化
test_channelwise_axis_1() # 测试通道维度为1的通道量化
test_dynamic_quantize() # 测试动态量化功能