VM Code Generation#
# Import the required packages
import numpy as np  # random test data and array operations
import pytest  # test framework
import tvm  # TVM core library
import tvm.testing  # TVM testing utilities
from tvm import relax  # the Relax framework
from tvm.relax.testing.runtime_builtin import MakeShapeCode, MatchShapeCode  # runtime built-in shape codes
from tvm.relax.testing.vm import check_saved_func  # VM testing helper
from tvm.script import ir as I  # IR script support
from tvm.script import relax as R  # Relax script support
from tvm.script import tir as T  # TIR script support
# Code-generation helper
def codegen(mod, target, exec_mode="bytecode"):
    """
    Lower a Relax IRModule into an executable VM module.
    Parameters:
        mod: the input IRModule
        target: the target platform to execute on
        exec_mode: execution mode, either "bytecode" (bytecode interpreter) or "compiled" (ahead-of-time compiled)
    Returns:
        The compiled executable module.
    """
    builder = relax.ExecBuilder()
    # Run VM code generation for the module (returns a TIR module with the remaining lowered functions)
    tir_mod = relax.vm_build._vmcodegen(builder, mod, exec_mode=exec_mode)
    # Link the generated code into an executable module
    return relax.vm_build._vmlink(builder, target, tir_mod)
# Execution modes to parametrize over: each test runs in both bytecode mode and compiled mode
EXEC_MODE = ["bytecode", "compiled"]
Testing the VM Built-in Copy Operator#
@tvm.script.ir_module
class TestVMMove:
@R.function(pure=False)
def foo(x: R.Tensor((3, 4), "float32")):
        R.func_attr({"global_symbol": "foo"})  # set the function's global symbol name
        # Call the VM built-in copy function
z = R.call_packed("vm.builtin.copy", x, sinfo_args=(R.Tensor((3, 4), dtype="float32")))
return z
for exec_mode in EXEC_MODE:
mod = TestVMMove
    target = tvm.target.Target("llvm", host="llvm")  # target the LLVM CPU backend
    ex = codegen(mod, target, exec_mode)  # build the executable module
    inp = tvm.nd.array(np.random.rand(3, 4).astype(np.float32))  # random test data
    vm = relax.VirtualMachine(ex, tvm.cpu())  # create a virtual machine instance
    res = check_saved_func(vm, "foo", inp)  # run and check the saved function's result
    # The copied result must match the original data
tvm.testing.assert_allclose(res.numpy(), inp.numpy(), rtol=1e-7, atol=1e-7)
Testing VM Data Transfer Between Devices#
@tvm.script.ir_module
class TestVMToDevice:
@R.function(pure=False)
def foo(x: R.Tensor((3, 4), "float32")):
R.func_attr({"global_symbol": "foo"})
        # Copy x to the first CPU: device_type=1 and device_id=0
z = R.call_packed(
"vm.builtin.to_device", x, 1, 0, sinfo_args=(R.Tensor((3, 4), dtype="float32"))
)
return z
for exec_mode in EXEC_MODE:
mod = TestVMToDevice
target = tvm.target.Target("llvm", host="llvm")
ex = codegen(mod, target, exec_mode)
inp = tvm.nd.array(np.random.rand(3, 4).astype(np.float32))
vm = relax.VirtualMachine(ex, tvm.cpu())
res = check_saved_func(vm, "foo", inp)
tvm.testing.assert_allclose(res.numpy(), inp.numpy(), rtol=1e-7, atol=1e-7)
    # Check that the result tensor lives on cpu:0
assert res.device == tvm.cpu(0)
assert res.device.device_type == 1
assert res.device.device_id == 0
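As background for the arguments 1 and 0 passed to vm.builtin.to_device above: they are the device type and device id, where the type codes follow the DLPack DLDeviceType enum. A small illustrative check of that mapping (not part of the original test):

# Illustration only: device_type codes follow the DLPack DLDeviceType enum.
assert tvm.cpu(0).device_type == 1   # kDLCPU
assert tvm.cuda(0).device_type == 2  # kDLCUDA; constructing the Device object does not require a GPU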
Testing if Statements with Constant Conditions#
@tvm.script.ir_module
class TestVMIfCondConst:
@R.function
def main(x: R.Tensor(ndim=2, dtype="float32")) -> R.Tensor(ndim=2, dtype="float32"):
R.func_attr({"global_symbol": "main"})
        # Use a constant boolean value as the branch condition
if relax.const(True, dtype="bool"):
ret = x
else:
ret = x
return ret
for exec_mode in EXEC_MODE:
mod = TestVMIfCondConst
target = tvm.target.Target("llvm", host="llvm")
ex = codegen(mod, target, exec_mode)
vm = relax.VirtualMachine(ex, tvm.cpu())
inp = tvm.nd.array(np.random.rand(3, 4))
res = vm["main"](inp)
tvm.testing.assert_allclose(res.numpy(), inp.numpy())
Testing Serialization and Export of VM Executables#
@tvm.script.ir_module
class TestVMMove:
@R.function(pure=False)
def foo(x: R.Tensor((3, 4), "float32")):
R.func_attr({"global_symbol": "foo"})
z = R.call_packed("vm.builtin.copy", x, sinfo_args=(R.Tensor((3, 4), dtype="float32")))
return z
for exec_mode in EXEC_MODE:
mod = TestVMMove
target = tvm.target.Target("llvm", host="llvm")
    ex = codegen(mod, target, exec_mode)
    from tvm.contrib import utils
    temp_dir = utils.tempdir()  # create a temporary directory
    path_exec = temp_dir.relpath("exec.so")  # path to export to
    ex.export_library(path_exec)  # export the shared library
    loaded_exec = tvm.runtime.load_module(path_exec)  # load the exported library
    # The loaded module must print the same text as the original executable
    assert ex.as_text() == loaded_exec["as_text"]()
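A library loaded from disk can normally be used to construct a VM directly, without keeping the original Executable object around. The sketch below is illustrative: it reuses path_exec from the last loop iteration and assumes relax.VirtualMachine accepts a tvm.runtime.Module loaded from disk (the usual deployment flow); vm_from_disk and data are names introduced here.

# Illustrative deployment sketch: build the VM from the library loaded from disk.
loaded = tvm.runtime.load_module(path_exec)  # path_exec from the last loop iteration above
vm_from_disk = relax.VirtualMachine(loaded, tvm.cpu())
data = tvm.nd.array(np.random.rand(3, 4).astype(np.float32))
tvm.testing.assert_allclose(vm_from_disk["foo"](data).numpy(), data.numpy(), rtol=1e-7, atol=1e-7)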
Testing Conditional Branch Execution#
@tvm.script.ir_module
class TestVMCompileIf:
@R.function(pure=False)
def ife(cond: R.Tensor((), "bool"), x: R.Tensor((3, 4), "float32")) -> R.Tensor:
R.func_attr({"global_symbol": "ife"})
        # Execute different packed functions depending on the condition
if cond:
w = R.call_packed("test.vm.add", x, x, sinfo_args=(R.Tensor))
else:
w = R.call_packed("test.vm.mul", x, x, sinfo_args=(R.Tensor))
return w
for exec_mode in EXEC_MODE:
mod = TestVMCompileIf
target = tvm.target.Target("llvm", host="llvm")
ex = codegen(mod, target, exec_mode)
vm = relax.VirtualMachine(ex, tvm.cpu())
inp = tvm.nd.array(np.random.rand(3, 4))
    # Condition given as the integer 1 (true)
    res = vm["ife"](tvm.nd.array(1), inp)
    tvm.testing.assert_allclose(res.numpy(), inp.numpy() + inp.numpy(), rtol=1e-7, atol=1e-7)
    # Condition given as the boolean True
    res = vm["ife"](tvm.nd.array(True), inp)
    tvm.testing.assert_allclose(res.numpy(), inp.numpy() + inp.numpy(), rtol=1e-7, atol=1e-7)
    # Condition given as the integer 0 (false)
    res = vm["ife"](tvm.nd.array(0), inp)
    tvm.testing.assert_allclose(res.numpy(), inp.numpy() * inp.numpy(), rtol=1e-7, atol=1e-7)
    # Condition given as the boolean False
    res = vm["ife"](tvm.nd.array(False), inp)
    tvm.testing.assert_allclose(res.numpy(), inp.numpy() * inp.numpy(), rtol=1e-7, atol=1e-7)
Testing VM Returning a Constant Tuple#
@tvm.script.ir_module
class ReturnConstTuple:
@R.function
def main(x: R.Tensor(ndim=2, dtype="float32")):
R.func_attr({"global_symbol": "main"})
        y = R.const([1, 2])  # a constant array
        z = (y, R.const([3, 4]), x)  # a tuple mixing constants and an argument
return z
for exec_mode in EXEC_MODE:
mod = ReturnConstTuple
target = tvm.target.Target("llvm", host="llvm")
ex = codegen(mod, target, exec_mode)
vm = relax.VirtualMachine(ex, tvm.cpu())
inp = tvm.nd.array(np.random.rand(2, 3))
    res0, res1, res2 = vm["main"](inp)  # unpack the returned tuple
    # Verify each returned element
tvm.testing.assert_allclose(res0.numpy(), np.array([1, 2]))
tvm.testing.assert_allclose(res1.numpy(), np.array([3, 4]))
tvm.testing.assert_allclose(res2.numpy(), inp.numpy())
Testing Constants as Call Arguments#
@tvm.script.ir_module
class TestVMConstAsCallArg:
@R.function(pure=False)
def main(x: R.Tensor(ndim=2, dtype="float32")):
R.func_attr({"global_symbol": "main"})
        # Call a packed function with constants passed as arguments
a = R.call_packed(
"test.vm.add",
relax.const([1, 2]),
relax.const([3, 4]),
sinfo_args=(R.Tensor(ndim=2, dtype="float32")),
)
b = R.call_packed(
"test.vm.add",
a,
x,
sinfo_args=(R.Tensor(ndim=2, dtype="float32")),
)
return b
for exec_mode in EXEC_MODE:
mod = TestVMConstAsCallArg
target = tvm.target.Target("llvm", host="llvm")
ex = codegen(mod, target, exec_mode)
vm = relax.VirtualMachine(ex, tvm.cpu())
inp = tvm.nd.array(np.random.rand(1, 2))
res = vm["main"](inp)
    # Check the result: constant addition [1,2]+[3,4]=[4,6], then add the input tensor
tvm.testing.assert_allclose(res.numpy(), np.array([4, 6]) + inp.numpy())
Testing the Shape-Check Built-in Functions#
MS = MatchShapeCode  # codes for matching an existing shape
MK = MakeShapeCode  # codes for constructing a new shape
# Slot assignment of the symbolic shape variables in the shape heap
# 0: n, 1: m
sindex = {"n": 0, "m": 1}
@tvm.script.ir_module
class TestVMShapeCheck:
@R.function(pure=False)
def main(x: R.Tensor(["n", "m"], "float32")) -> R.Shape(ndim=3):
R.func_attr({"global_symbol": "main"})
n = T.int64()
k = T.int64()
        # Allocate the shape heap
shape_heap = R.call_builtin_with_ctx(
"vm.builtin.alloc_shape_heap",
[R.prim_value(3)],
sinfo_args=[R.Tensor(ndim=1, dtype="int64")],
)
        # Check the tensor's ndim and dtype
_ = R.call_packed(
"vm.builtin.check_tensor_info", x, 2, R.dtype("float32"), "", sinfo_args=[R.Tuple()]
)
        # Match the tensor's shape and store its dimensions into the shape heap
_ = R.call_packed(
"vm.builtin.match_shape",
x,
shape_heap,
2,
MS.STORE_TO_HEAP,
sindex["n"],
MS.STORE_TO_HEAP,
sindex["m"],
"",
sinfo_args=[R.Tuple()],
)
        # Construct the shape value to return
s = R.call_packed(
"vm.builtin.make_shape",
shape_heap,
3,
MK.LOAD_SHAPE,
sindex["m"],
MK.LOAD_SHAPE,
sindex["n"],
MK.USE_IMM,
2,
sinfo_args=[R.Shape(ndim=3)],
)
return s
for exec_mode in EXEC_MODE:
mod = TestVMShapeCheck
target = tvm.target.Target("llvm", host="llvm")
ex = codegen(mod, target, exec_mode)
vm = relax.VirtualMachine(ex, tvm.cpu())
    # Normal case: an input tensor of shape (1, 2)
    x = tvm.nd.array(np.zeros((1, 2)).astype("float32"))
    res = vm["main"](x)
    # Expected result shape is (m, n, 2), i.e. (2, 1, 2)
    assert res == tvm.runtime.container.ShapeTuple([2, 1, 2])
    # Error case: wrong input type
    with pytest.raises(TypeError):
        vm["main"]([])
    # Error case: wrong number of dimensions
    with pytest.raises(ValueError, match=r".*ndim.*"):
        vm["main"](tvm.nd.array(np.zeros(1).astype("float32")))
    # Error case: wrong data type
    with pytest.raises(ValueError, match=r".*dtype.*"):
        vm["main"](tvm.nd.array(np.zeros((1, 2)).astype("int32")))
Testing PrimValue Handling#
@tvm.script.ir_module
class TestVMPrimValue:
@R.function
def main():
R.func_attr({"global_symbol": "main"})
        ret = R.prim_value(T.int64(1))  # a primitive integer value
return ret
for exec_mode in EXEC_MODE:
mod = TestVMPrimValue
target = tvm.target.Target("llvm", host="llvm")
ex = codegen(mod, target, exec_mode)
vm = relax.VirtualMachine(ex, tvm.cpu())
res = vm["main"]()
assert res == 1
Testing String Constants#
@tvm.script.ir_module
class TestVMStringImm:
@R.function
def main():
R.func_attr({"global_symbol": "main"})
        ret = R.str("hello")  # a string constant
return ret
for exec_mode in EXEC_MODE:
mod = TestVMStringImm
target = tvm.target.Target("llvm", host="llvm")
ex = codegen(mod, target, exec_mode)
vm = relax.VirtualMachine(ex, tvm.cpu())
res = vm["main"]()
assert res == "hello"
Testing Data Type Constants#
@tvm.script.ir_module
class TestDataTypeImm:
@R.function
def main():
R.func_attr({"global_symbol": "main"})
        ret = R.dtype("float32")  # a data type constant
return ret
for exec_mode in EXEC_MODE:
mod = TestDataTypeImm
target = tvm.target.Target("llvm", host="llvm")
ex = codegen(mod, target, exec_mode)
vm = relax.VirtualMachine(ex, tvm.cpu())
res = vm["main"]()
assert res == "float32"
Testing the VM Built-in reshape Operation#
@tvm.script.ir_module
class TestVMBuiltinReshape:
@R.function(pure=False)
def main(x: R.Tensor((3, 4), "float32")):
R.func_attr({"global_symbol": "main"})
        # Call the VM built-in reshape to turn the (3, 4) tensor into (6, 2)
y = R.call_packed(
"vm.builtin.reshape", x, R.shape([6, 2]), sinfo_args=R.Tensor((6, 2), "float32")
)
return y
for exec_mode in EXEC_MODE:
mod = TestVMBuiltinReshape
target = tvm.target.Target("llvm", host="llvm")
ex = codegen(mod, target, exec_mode)
dev = tvm.cpu()
vm = relax.VirtualMachine(ex, dev)
input_np = np.random.rand(3, 4).astype("float32")
input = tvm.nd.array(input_np, dev)
res = vm["main"](input)
    expected = input_np.reshape(6, 2)  # the expected reshaped result
tvm.testing.assert_allclose(res.numpy(), expected, rtol=1e-7, atol=1e-7)
Testing VM Object Destruction (Memory Release)#
@I.ir_module
class TestKillObject:
    # TIR prim func: fill the input buffer with zeros
@T.prim_func
def full(T_full: T.Buffer((T.int64(4),), "float32")):
T.func_attr({"global_symbol": "full", "tir.noalias": True})
for ax0 in range(T.int64(4)):
with T.block("T_full"):
v_ax0 = T.axis.spatial(T.int64(4), ax0)
T.reads()
T.writes(T_full[v_ax0])
T_full[v_ax0] = T.float32(0)
    # TIR prim func: fill the input buffer with ones
@T.prim_func
def full1(T_full: T.Buffer((T.int64(4),), "float32")):
T.func_attr({"global_symbol": "full1", "tir.noalias": True})
for ax0 in range(T.int64(4)):
with T.block("T_full"):
v_ax0 = T.axis.spatial(T.int64(4), ax0)
T.reads()
T.writes(T_full[v_ax0])
T_full[v_ax0] = T.float32(1)
    # PrimFuncs that are called directly are treated as impure
@R.function(pure=False)
def main() -> R.Tensor((4,), dtype="float32"):
R.func_attr({"global_symbol": "main"})
cls = TestKillObject
        # Allocate storage
storage: R.Object = R.vm.alloc_storage(R.shape([16]), R.prim_value(0), R.dtype("uint8"))
        # Allocate a tensor backed by that storage
alloc: R.Tensor((4,), dtype="float32") = R.vm.alloc_tensor(
storage, R.prim_value(0), R.shape([4]), R.dtype("float32")
)
        _: R.Tuple = cls.full(alloc)  # fill the tensor with zeros
        __1: R.Tuple = R.vm.kill_object(alloc)  # kill the tensor object
        x: R.Tensor((4,), dtype="float32") = alloc  # a binding that still refers to the killed object
        # Allocate a new tensor reusing the same storage
alloc1: R.Tensor((4,), dtype="float32") = R.vm.alloc_tensor(
storage, R.prim_value(0), R.shape([4]), R.dtype("float32")
)
        _1: R.Tuple = cls.full(alloc1)  # fill the new tensor with zeros
        _1_1: R.Tuple = R.vm.kill_object(alloc1)  # kill the new tensor
        y: R.Tensor((4,), dtype="float32") = alloc1  # a binding that still refers to the killed object
        # Allocate new storage
storage_1: R.Object = R.vm.alloc_storage(
R.shape([16]), R.prim_value(0), R.dtype("uint8")
)
        # Allocate a tensor backed by the new storage
alloc2: R.Tensor((4,), dtype="float32") = R.vm.alloc_tensor(
storage_1, R.prim_value(0), R.shape([4]), R.dtype("float32")
)
        _2: R.Tuple = cls.full1(alloc2)  # fill the tensor with ones
z: R.Tensor((4,), dtype="float32") = alloc2
        _2_1: R.Tuple = R.vm.kill_object(storage)  # kill the first storage object
return z
for exec_mode in EXEC_MODE:
mod = TestKillObject
target = tvm.target.Target("llvm", host="llvm")
ex = codegen(mod, target, exec_mode)
dev = tvm.cpu()
vm = relax.VirtualMachine(ex, dev)
res = vm["main"]()
    # The result must be an all-ones array, which shows memory management behaved correctly
tvm.testing.assert_allclose(res.numpy(), np.ones((4,), "float32"))
Testing That the VM Preserves Trivial Bindings#
@I.ir_module
class mod:
@R.function(pure=False)
def main():
        # External function that checks whether an object is defined
        callback = R.ExternFunc("test.vm.check_if_defined")
        # Allocate storage and a tensor
        storage = R.vm.alloc_storage(R.shape([16]), R.prim_value(0), R.dtype("uint8"))
        alloc = R.vm.alloc_tensor(storage, R.prim_value(0), R.shape([4]), R.dtype("float32"))
        storage_alias = storage  # alias of the storage
        alloc_alias = alloc  # alias of the tensor
        # Query each object's state before the kills
storage_before = callback(storage)
alloc_before = callback(alloc)
storage_alias_before = callback(storage_alias)
alloc_alias_before = callback(alloc_alias)
        # Kill the original objects
_ = R.vm.kill_object(storage)
_ = R.vm.kill_object(alloc)
        # Query each object's state after the kills
storage_after = callback(storage)
alloc_after = callback(alloc)
storage_alias_after = callback(storage_alias)
alloc_alias_after = callback(alloc_alias)
        # Return all the observations
return (
storage_before,
alloc_before,
storage_alias_before,
alloc_alias_before,
storage_after,
alloc_after,
storage_alias_after,
alloc_alias_after,
)
for exec_mode in EXEC_MODE:
target = tvm.target.Target("llvm", host="llvm")
ex = codegen(mod, target, exec_mode)
dev = tvm.cpu()
vm = relax.VirtualMachine(ex, dev)
result_list = vm["main"]()
    # Build a dictionary of expected results, for readability on test failure.
    # This is equivalent to asserting on each element of the result array, but
    # lets pytest show a dictionary diff when the assertion fails.
    expected_results = {
        "storage_before": True,  # before the kill, the original storage is defined
        "alloc_before": True,  # before the kill, the original tensor is defined
        "storage_alias_before": True,  # before the kill, the storage alias is defined
        "alloc_alias_before": True,  # before the kill, the tensor alias is defined
        "storage_after": False,  # after the kill, the original storage is undefined
        "alloc_after": False,  # after the kill, the original tensor is undefined
        "storage_alias_after": True,  # after the kill, the storage alias is still defined
        "alloc_alias_after": True,  # after the kill, the tensor alias is still defined
    }
    # Convert the observed results into a dictionary
    observed_results = {
        name: bool(tir_bool) for name, tir_bool in zip(expected_results.keys(), result_list)
    }
    # The observations must match the expectations: kill_object clears only the binding it
    # is given, so the trivial-binding aliases keep their objects alive.
    assert observed_results == expected_results
/tmp/ipykernel_2018648/809391981.py:48: UserWarning: Returning type `vm.Storage` which is not registered via register_object, fallback to Object
result_list = vm["main"]()