VM 代码生成#

# 导入必要的包
import numpy as np  # 用于生成随机数据和数组操作
import pytest       # 测试框架
import tvm          # TVM核心库
import tvm.testing  # TVM测试工具
from tvm import relax  # Relax框架
from tvm.relax.testing.runtime_builtin import MakeShapeCode, MatchShapeCode  # 运行时内置函数支持
from tvm.relax.testing.vm import check_saved_func  # VM测试辅助函数
from tvm.script import ir as I  # IR脚本支持
from tvm.script import relax as R  # Relax脚本支持
from tvm.script import tir as T  # TIR脚本支持
# 代码生成辅助函数
def codegen(mod, target, exec_mode="bytecode"):
    """
    将Relax IR模块转换为可执行的VM代码
    
    参数:
        mod: 输入的IR模块
        target: 目标执行平台
        exec_mode: 执行模式,可选值为"bytecode"(字节码模式)或"compiled"(编译模式)
    
    返回值:
        编译后的可执行模块
    """
    builder = relax.ExecBuilder()
    # 通过VM代码生成器将模块转换为TIR模块
    tir_mod = relax.vm_build._vmcodegen(builder, mod, exec_mode=exec_mode)
    # 将TIR模块链接成可执行模块
    return relax.vm_build._vmlink(builder, target, tir_mod)
# 定义执行模式参数化列表,测试将在字节码模式和编译模式下分别运行
EXEC_MODE = ["bytecode", "compiled"]

测试 VM 内置的复制算子功能#

@tvm.script.ir_module
class TestVMMove:
    @R.function(pure=False)
    def foo(x: R.Tensor((3, 4), "float32")):
        R.func_attr({"global_symbol": "foo"})  # 设置函数的全局符号名称
        # 调用VM内置的复制函数
        z = R.call_packed("vm.builtin.copy", x, sinfo_args=(R.Tensor((3, 4), dtype="float32")))
        return z
for exec_mode in EXEC_MODE:
    mod = TestVMMove
    target = tvm.target.Target("llvm", host="llvm")  # 设置目标平台为LLVM
    ex = codegen(mod, target, exec_mode)  # 生成可执行模块
    inp = tvm.nd.array(np.random.rand(3, 4).astype(np.float32))  # 生成随机测试数据
    vm = relax.VirtualMachine(ex, tvm.cpu())  # 创建虚拟机实例
    res = check_saved_func(vm, "foo", inp)  # 检查保存的函数执行结果
    # 验证复制结果与原始数据是否一致
    tvm.testing.assert_allclose(res.numpy(), inp.numpy(), rtol=1e-7, atol=1e-7)

测试 VM 设备间数据传输#

@tvm.script.ir_module
class TestVMToDevice:
    @R.function(pure=False)
    def foo(x: R.Tensor((3, 4), "float32")):
        R.func_attr({"global_symbol": "foo"})
        # 将x复制到第一个CPU:device_type=1和device_id=0
        z = R.call_packed(
            "vm.builtin.to_device", x, 1, 0, sinfo_args=(R.Tensor((3, 4), dtype="float32"))
        )
        return z
        
for exec_mode in EXEC_MODE:
    mod = TestVMToDevice
    target = tvm.target.Target("llvm", host="llvm")
    ex = codegen(mod, target, exec_mode)
    inp = tvm.nd.array(np.random.rand(3, 4).astype(np.float32))
    vm = relax.VirtualMachine(ex, tvm.cpu())
    res = check_saved_func(vm, "foo", inp)
    tvm.testing.assert_allclose(res.numpy(), inp.numpy(), rtol=1e-7, atol=1e-7)
    # 检查结果张量是否在cpu:0上
    assert res.device == tvm.cpu(0)
    assert res.device.device_type == 1
    assert res.device.device_id == 0

测试常量条件的if语句#

@tvm.script.ir_module
class TestVMIfCondConst:
    @R.function
    def main(x: R.Tensor(ndim=2, dtype="float32")) -> R.Tensor(ndim=2, dtype="float32"):
        R.func_attr({"global_symbol": "main"})
        # 使用常量布尔值作为条件
        if relax.const(True, dtype="bool"):
            ret = x
        else:
            ret = x
        return ret
for exec_mode in EXEC_MODE:
    mod = TestVMIfCondConst
    target = tvm.target.Target("llvm", host="llvm")
    ex = codegen(mod, target, exec_mode)
    vm = relax.VirtualMachine(ex, tvm.cpu())
    inp = tvm.nd.array(np.random.rand(3, 4))
    res = vm["main"](inp)
    tvm.testing.assert_allclose(res.numpy(), inp.numpy())

测试VM可执行文件的序列化和导出#

@tvm.script.ir_module
class TestVMMove:
    @R.function(pure=False)
    def foo(x: R.Tensor((3, 4), "float32")):
        R.func_attr({"global_symbol": "foo"})
        z = R.call_packed("vm.builtin.copy", x, sinfo_args=(R.Tensor((3, 4), dtype="float32")))
        return z
for exec_mode in EXEC_MODE:
    mod = TestVMMove
    target = tvm.target.Target("llvm", host="llvm")
    ex = codegen(mod, target)
    from tvm.contrib import utils

    temp_dir = utils.tempdir()  # 创建临时目录
    path_exec = temp_dir.relpath("exec.so")  # 设置导出路径
    ex.export_library(path_exec)  # 导出库文件

    loaded_exec = tvm.runtime.load_module(path_exec)  # 加载导出的库文件
    # 验证加载的库与原始库内容是否一致
    assert ex.as_text() == loaded_exec["as_text"]()

测试条件分支执行#

@tvm.script.ir_module
class TestVMCompileIf:
    @R.function(pure=False)
    def ife(cond: R.Tensor((), "bool"), x: R.Tensor((3, 4), "float32")) -> R.Tensor:
        R.func_attr({"global_symbol": "ife"})
        # 根据条件执行不同的操作
        if cond:
            w = R.call_packed("test.vm.add", x, x, sinfo_args=(R.Tensor))
        else:
            w = R.call_packed("test.vm.mul", x, x, sinfo_args=(R.Tensor))
        return w
for exec_mode in EXEC_MODE:
    mod = TestVMCompileIf
    target = tvm.target.Target("llvm", host="llvm")
    ex = codegen(mod, target, exec_mode)
    vm = relax.VirtualMachine(ex, tvm.cpu())
    inp = tvm.nd.array(np.random.rand(3, 4))
    # 测试条件为整数1(True)的情况
    res = vm["ife"](tvm.nd.array(1), inp)
    tvm.testing.assert_allclose(res.numpy(), inp.numpy() + inp.numpy(), rtol=1e-7, atol=1e-7)
    # 测试条件为布尔True的情况
    res = vm["ife"](tvm.nd.array(True), inp)
    tvm.testing.assert_allclose(res.numpy(), inp.numpy() + inp.numpy(), rtol=1e-7, atol=1e-7)
    # 测试条件为整数0(False)的情况
    res = vm["ife"](tvm.nd.array(0), inp)
    tvm.testing.assert_allclose(res.numpy(), inp.numpy() * inp.numpy(), rtol=1e-7, atol=1e-7)
    # 测试条件为布尔False的情况
    res = vm["ife"](tvm.nd.array(False), inp)
    tvm.testing.assert_allclose(res.numpy(), inp.numpy() * inp.numpy(), rtol=1e-7, atol=1e-7)

测试VM返回常量元组#

@tvm.script.ir_module
class ReturnConstTuple:
    @R.function
    def main(x: R.Tensor(ndim=2, dtype="float32")):
        R.func_attr({"global_symbol": "main"})
        y = R.const([1, 2])  # 创建常量数组
        z = (y, R.const([3, 4]), x)  # 创建包含常量和变量的元组
        return z
for exec_mode in EXEC_MODE:
    mod = ReturnConstTuple
    target = tvm.target.Target("llvm", host="llvm")
    ex = codegen(mod, target, exec_mode)
    vm = relax.VirtualMachine(ex, tvm.cpu())
    inp = tvm.nd.array(np.random.rand(2, 3))
    res0, res1, res2 = vm["main"](inp)  # 解包返回的元组
    # 验证各个返回值是否符合预期
    tvm.testing.assert_allclose(res0.numpy(), np.array([1, 2]))
    tvm.testing.assert_allclose(res1.numpy(), np.array([3, 4]))
    tvm.testing.assert_allclose(res2.numpy(), inp.numpy())

测试常量作为函数调用参数#

@tvm.script.ir_module
class TestVMConstAsCallArg:
    @R.function(pure=False)
    def main(x: R.Tensor(ndim=2, dtype="float32")):
        R.func_attr({"global_symbol": "main"})
        # 调用函数并传递常量作为参数
        a = R.call_packed(
            "test.vm.add",
            relax.const([1, 2]),
            relax.const([3, 4]),
            sinfo_args=(R.Tensor(ndim=2, dtype="float32")),
        )
        b = R.call_packed(
            "test.vm.add",
            a,
            x,
            sinfo_args=(R.Tensor(ndim=2, dtype="float32")),
        )
        return b

for exec_mode in EXEC_MODE:
    mod = TestVMConstAsCallArg
    target = tvm.target.Target("llvm", host="llvm")
    ex = codegen(mod, target, exec_mode)
    vm = relax.VirtualMachine(ex, tvm.cpu())
    inp = tvm.nd.array(np.random.rand(1, 2))
    res = vm["main"](inp)
    # 验证计算结果是否正确:常量加法[1,2]+[3,4]=[4,6],再加上输入张量
    tvm.testing.assert_allclose(res.numpy(), np.array([4, 6]) + inp.numpy())

测试形状检查内置函数#

MS = MatchShapeCode  # 形状匹配代码
MK = MakeShapeCode   # 形状创建代码
# 形状变量在形状堆中的槽位分配
# 0: n, 1: m
sindex = {"n": 0, "m": 1}

@tvm.script.ir_module
class TestVMShapeCheck:
    @R.function(pure=False)
    def main(x: R.Tensor(["n", "m"], "float32")) -> R.Shape(ndim=3):
        R.func_attr({"global_symbol": "main"})
        n = T.int64()
        k = T.int64()
        # 分配形状堆
        shape_heap = R.call_builtin_with_ctx(
            "vm.builtin.alloc_shape_heap",
            [R.prim_value(3)],
            sinfo_args=[R.Tensor(ndim=1, dtype="int64")],
        )
        # 检查张量信息
        _ = R.call_packed(
            "vm.builtin.check_tensor_info", x, 2, R.dtype("float32"), "", sinfo_args=[R.Tuple()]
        )
        # 匹配张量形状并存储到形状堆中
        _ = R.call_packed(
            "vm.builtin.match_shape",
            x,
            shape_heap,
            2,
            MS.STORE_TO_HEAP,
            sindex["n"],
            MS.STORE_TO_HEAP,
            sindex["m"],
            "",
            sinfo_args=[R.Tuple()],
        )
        # 构造返回的形状值
        s = R.call_packed(
            "vm.builtin.make_shape",
            shape_heap,
            3,
            MK.LOAD_SHAPE,
            sindex["m"],
            MK.LOAD_SHAPE,
            sindex["n"],
            MK.USE_IMM,
            2,
            sinfo_args=[R.Shape(ndim=3)],
        )
        return s

for exec_mode in EXEC_MODE:
    mod = TestVMShapeCheck
    target = tvm.target.Target("llvm", host="llvm")
    ex = codegen(mod, target, exec_mode)
    vm = relax.VirtualMachine(ex, tvm.cpu())
    # 测试正常情况:输入形状为(1, 2)的张量
    x = tvm.nd.array(np.zeros((1, 2)).astype("float32"))
    res = vm["main"](x)
    # 预期返回形状为(m, n, 2),即(2, 1, 2)
    assert res == tvm.runtime.container.ShapeTuple([2, 1, 2])

    # 测试错误情况:错误的输入类型
    with pytest.raises(TypeError):
        vm["main"]([])

    # 测试错误情况:错误的维度数
    with pytest.raises(ValueError, match=r".*ndim.*"):
        vm["main"](tvm.nd.array(np.zeros(1).astype("float32")))

    # 测试错误情况:错误的数据类型
    with pytest.raises(ValueError, match=r".*dtype.*"):
        vm["main"](tvm.nd.array(np.zeros((1, 2)).astype("int32")))

测试原语值处理#

@tvm.script.ir_module
class TestVMPrimValue:
    @R.function
    def main():
        R.func_attr({"global_symbol": "main"})
        ret = R.prim_value(T.int64(1))  # 创建原语整数值
        return ret
for exec_mode in EXEC_MODE:
    mod = TestVMPrimValue
    target = tvm.target.Target("llvm", host="llvm")
    ex = codegen(mod, target, exec_mode)
    vm = relax.VirtualMachine(ex, tvm.cpu())
    res = vm["main"]()
    assert res == 1

测试字符串常量#

@tvm.script.ir_module
class TestVMStringImm:
    @R.function
    def main():
        R.func_attr({"global_symbol": "main"})
        ret = R.str("hello")  # 创建一个字符串常量
        return ret
for exec_mode in EXEC_MODE:
    mod = TestVMStringImm
    target = tvm.target.Target("llvm", host="llvm")
    ex = codegen(mod, target, exec_mode)
    vm = relax.VirtualMachine(ex, tvm.cpu())
    res = vm["main"]()
    assert res == "hello"

测试数据类型常量#

@tvm.script.ir_module
class TestDataTypeImm:
    @R.function
    def main():
        R.func_attr({"global_symbol": "main"})
        ret = R.dtype("float32")  # 创建一个数据类型常量
        return ret
for exec_mode in EXEC_MODE:
    mod = TestDataTypeImm
    target = tvm.target.Target("llvm", host="llvm")
    ex = codegen(mod, target, exec_mode)
    vm = relax.VirtualMachine(ex, tvm.cpu())
    res = vm["main"]()
    assert res == "float32"

测试VM内置的reshape操作#

@tvm.script.ir_module
class TestVMBuiltinReshape:
    @R.function(pure=False)
    def main(x: R.Tensor((3, 4), "float32")):
        R.func_attr({"global_symbol": "main"})
        # 调用VM内置的reshape函数,将(3,4)张量变形为(6,2)
        y = R.call_packed(
            "vm.builtin.reshape", x, R.shape([6, 2]), sinfo_args=R.Tensor((6, 2), "float32")
        )
        return y
for exec_mode in EXEC_MODE:
    mod = TestVMBuiltinReshape
    target = tvm.target.Target("llvm", host="llvm")
    ex = codegen(mod, target, exec_mode)
    dev = tvm.cpu()
    vm = relax.VirtualMachine(ex, dev)

    input_np = np.random.rand(3, 4).astype("float32")
    input = tvm.nd.array(input_np, dev)
    res = vm["main"](input)
    expected = input_np.reshape(6, 2)  # 预期的变形结果
    tvm.testing.assert_allclose(res.numpy(), expected, rtol=1e-7, atol=1e-7)

测试VM对象销毁(内存释放)功能#

@I.ir_module
class TestKillObject:
    # TIR原始函数:将输入缓冲区填充为0
    @T.prim_func
    def full(T_full: T.Buffer((T.int64(4),), "float32")):
        T.func_attr({"global_symbol": "full", "tir.noalias": True})
        for ax0 in range(T.int64(4)):
            with T.block("T_full"):
                v_ax0 = T.axis.spatial(T.int64(4), ax0)
                T.reads()
                T.writes(T_full[v_ax0])
                T_full[v_ax0] = T.float32(0)

    # TIR原始函数:将输入缓冲区填充为1
    @T.prim_func
    def full1(T_full: T.Buffer((T.int64(4),), "float32")):
        T.func_attr({"global_symbol": "full1", "tir.noalias": True})
        for ax0 in range(T.int64(4)):
            with T.block("T_full"):
                v_ax0 = T.axis.spatial(T.int64(4), ax0)
                T.reads()
                T.writes(T_full[v_ax0])
                T_full[v_ax0] = T.float32(1)

    # 直接调用的PrimFuncs被视为不纯函数
    @R.function(pure=False)
    def main() -> R.Tensor((4,), dtype="float32"):
        R.func_attr({"global_symbol": "main"})
        cls = TestKillObject
        # 分配存储
        storage: R.Object = R.vm.alloc_storage(R.shape([16]), R.prim_value(0), R.dtype("uint8"))
        # 基于存储分配张量
        alloc: R.Tensor((4,), dtype="float32") = R.vm.alloc_tensor(
            storage, R.prim_value(0), R.shape([4]), R.dtype("float32")
        )
        _: R.Tuple = cls.full(alloc)  # 将张量填充为0
        __1: R.Tuple = R.vm.kill_object(alloc)  # 销毁张量对象
        x: R.Tensor((4,), dtype="float32") = alloc  # 尝试访问已销毁的对象
        # 重新使用同一块存储分配新张量
        alloc1: R.Tensor((4,), dtype="float32") = R.vm.alloc_tensor(
            storage, R.prim_value(0), R.shape([4]), R.dtype("float32")
        )
        _1: R.Tuple = cls.full(alloc1)  # 将新张量填充为0
        _1_1: R.Tuple = R.vm.kill_object(alloc1)  # 销毁新张量
        y: R.Tensor((4,), dtype="float32") = alloc1  # 尝试访问已销毁的对象
        # 分配新的存储
        storage_1: R.Object = R.vm.alloc_storage(
            R.shape([16]), R.prim_value(0), R.dtype("uint8")
        )
        # 基于新存储分配张量
        alloc2: R.Tensor((4,), dtype="float32") = R.vm.alloc_tensor(
            storage_1, R.prim_value(0), R.shape([4]), R.dtype("float32")
        )
        _2: R.Tuple = cls.full1(alloc2)  # 将张量填充为1
        z: R.Tensor((4,), dtype="float32") = alloc2
        _2_1: R.Tuple = R.vm.kill_object(storage)  # 销毁第一个存储对象
        return z
        
for exec_mode in EXEC_MODE:
    mod = TestKillObject
    target = tvm.target.Target("llvm", host="llvm")
    ex = codegen(mod, target, exec_mode)
    dev = tvm.cpu()
    vm = relax.VirtualMachine(ex, dev)

    res = vm["main"]()
    # 验证结果是否为全1数组,确保内存管理正常工作
    tvm.testing.assert_allclose(res.numpy(), np.ones((4,), "float32"))

测试VM保留简单绑定#

@I.ir_module
class mod:
    @R.function(pure=False)
    def main():
        # 外部函数用于检查对象是否已定义
        callback = R.ExternFunc("test.vm.check_if_defined")

        # 分配存储和张量
        storage = R.vm.alloc_storage(R.shape([16]), R.prim_value(0), R.dtype("uint8"))
        alloc = R.vm.alloc_tensor(storage, R.prim_value(0), R.shape([4]), R.dtype("float32"))
        storage_alias = storage  # 创建存储的别名
        alloc_alias = alloc      # 创建张量的别名

        # 检查对象销毁前各对象的状态
        storage_before = callback(storage)
        alloc_before = callback(alloc)
        storage_alias_before = callback(storage_alias)
        alloc_alias_before = callback(alloc_alias)

        # 销毁原始对象
        _ = R.vm.kill_object(storage)
        _ = R.vm.kill_object(alloc)

        # 检查对象销毁后各对象的状态
        storage_after = callback(storage)
        alloc_after = callback(alloc)
        storage_alias_after = callback(storage_alias)
        alloc_alias_after = callback(alloc_alias)

        # 返回所有检查结果
        return (
            storage_before,
            alloc_before,
            storage_alias_before,
            alloc_alias_before,
            storage_after,
            alloc_after,
            storage_alias_after,
            alloc_alias_after,
        )

for exec_mode in EXEC_MODE:
    target = tvm.target.Target("llvm", host="llvm")
    ex = codegen(mod, target, exec_mode)
    dev = tvm.cpu()
    vm = relax.VirtualMachine(ex, dev)

    result_list = vm["main"]()

    # 创建预期结果字典,以提高测试失败时的可读性
    # 这相当于对结果数组的每个元素进行断言,但允许pytest在失败时给出字典差异
    expected_results = {
        "storage_before": True,       # 销毁前,原始存储对象存在
        "alloc_before": True,         # 销毁前,原始张量对象存在
        "storage_alias_before": True, # 销毁前,存储别名存在
        "alloc_alias_before": True,   # 销毁前,张量别名存在
        "storage_after": False,       # 销毁后,原始存储对象不存在
        "alloc_after": False,         # 销毁后,原始张量对象不存在
        "storage_alias_after": True,  # 销毁后,存储别名仍然存在
        "alloc_alias_after": True,    # 销毁后,张量别名仍然存在
    }

    # 将观察结果转换为字典
    observed_results = {
        name: bool(tir_bool) for name, tir_bool in zip(expected_results.keys(), result_list)
    }

    # 验证观察结果是否符合预期
    assert observed_results == expected_results
/tmp/ipykernel_2018648/809391981.py:48: UserWarning: Returning type `vm.Storage` which is not registered via register_object, fallback to Object
  result_list = vm["main"]()