计算图分区#

import numpy as np

import tvm
from tvm.relay.backend import te_compiler
from tvm.relay.backend.runtime import Runtime
import tvm.relay.testing
from tvm import relay
from tvm import runtime as tvm_runtime
from tvm.relay import transform
from tvm.contrib import utils
from tvm.relay.expr_functor import ExprMutator
from tvm.relay.op.annotation import compiler_begin, compiler_end
from tvm.relay.op.contrib.register import get_pattern_table
from tvm.relay.build_module import bind_params_by_name
Hide code cell content
# Leverage the pass manager to write a simple allowed list based annotator
@transform.function_pass(opt_level=0)
class AllowedListAnnotator:
    def __init__(self, op_list, compiler):
        assert isinstance(op_list, (list, tuple, set))
        self.op_list = op_list
        self.compiler = compiler

    def transform_function(self, func, mod, dev):

        annotator = self

        class Annotator(tvm.relay.ExprMutator):
            def visit_call(self, call):
                op_name = call.op.name
                if op_name in annotator.op_list:
                    new_args = []
                    for arg in call.args:
                        ann = compiler_begin(super().visit(arg), annotator.compiler)
                        new_args.append(ann)
                    new_call = relay.Call(call.op, new_args, call.attrs, call.type_args)
                    return compiler_end(new_call, annotator.compiler)
                else:
                    return super().visit_call(call)

        return Annotator().visit(func)


class WholeGraphAnnotator(ExprMutator):
    """
    An annotator that creates a compiler for an entire graph.
    """

    def __init__(self, compiler):
        super(WholeGraphAnnotator, self).__init__()
        self.compiler = compiler
        self.last_call = True

    def visit_call(self, call):
        curr_last = self.last_call
        self.last_call = False

        params = []
        for arg in call.args:
            param = super().visit(arg)
            if isinstance(param, relay.expr.Var):
                param = compiler_begin(param, self.compiler)
            params.append(param)

        new_call = relay.Call(call.op, params, call.attrs)
        if curr_last:
            new_call = compiler_end(new_call, self.compiler)
        return new_call


class MobileNetAnnotator(ExprMutator):
    """
    Annotate mobilenet until global_avg_pool.
    """

    def __init__(self, compiler):
        super(MobileNetAnnotator, self).__init__()
        self.compiler = compiler
        self.compiler_open = False

    def visit_call(self, call):

        if call.op.name == "nn.global_avg_pool2d":
            self.compiler_open = True
        compiler_open = self.compiler_open

        params = []
        for arg in call.args:
            param = super().visit(arg)
            if call.op.name == "nn.global_avg_pool2d":
                param = compiler_end(param, self.compiler)
            if compiler_open and isinstance(param, relay.expr.Var):
                param = compiler_begin(param, self.compiler)
            params.append(param)

        new_call = relay.Call(call.op, params, call.attrs)
        return new_call
Hide code cell content
import os
def check_result(
    mod,
    map_inputs,
    out_shape,
    result,
    tol=1e-5,
    target="llvm",
    device=tvm.cpu(),
    params=None,
    runtime=Runtime("cpp"),
):
    def update_lib(lib):
        test_dir = os.path.dirname(os.path.realpath(os.path.expanduser("__file__")))
        source_dir = os.path.join(test_dir, "..", "..", "..")
        contrib_path = os.path.join(source_dir, "src", "runtime", "contrib")

        kwargs = {}
        kwargs["options"] = ["-O2", "-std=c++17", "-I" + contrib_path]
        tmp_path = utils.tempdir()
        lib_name = "lib.so"
        lib_path = tmp_path.relpath(lib_name)
        lib.export_library(lib_path, fcompile=False, **kwargs)
        lib = tvm_runtime.load_module(lib_path)

        return lib

    def check_vm_result():
        te_compiler.get().clear()
        with tvm.transform.PassContext(opt_level=3):
            exe = relay.vm.compile(mod, target=target, params=params)
        code, lib = exe.save()
        lib = update_lib(lib)
        exe = tvm_runtime.vm.Executable.load_exec(code, lib)
        vm = tvm_runtime.vm.VirtualMachine(exe, device)
        outs = vm.run(**map_inputs)
        outs = outs if isinstance(outs, tvm_runtime.container.ADT) else [outs]
        results = result if isinstance(result, list) else [result]
        for out, ref in zip(outs, results):
            tvm.testing.assert_allclose(out.numpy(), ref, rtol=tol, atol=tol)

    def check_graph_executor_result():
        te_compiler.get().clear()
        with tvm.transform.PassContext(opt_level=3):
            json, lib, param = relay.build(mod, target=target, params=params, runtime=runtime)
        lib = update_lib(lib)
        rt_mod = tvm.contrib.graph_executor.create(json, lib, device)

        for name, data in map_inputs.items():
            rt_mod.set_input(name, data)
        rt_mod.set_input(**param)
        rt_mod.run()

        out_shapes = out_shape if isinstance(out_shape, list) else [out_shape]
        results = result if isinstance(result, list) else [result]

        for idx, shape in enumerate(out_shapes):
            out = tvm.nd.empty(shape, device=device)
            out = rt_mod.get_output(idx, out)
            tvm.testing.assert_allclose(out.numpy(), results[idx], rtol=tol, atol=tol)

    check_vm_result()
    check_graph_executor_result()

测试外部 C 编译器处理单一算子#

@transform.function_pass(opt_level=0)
class MyAnnotator:
    def transform_function(self, func, mod, dev):
        class Annotator(tvm.relay.ExprMutator):
            def visit_call(self, call):
                new_args = []
                for arg in call.args:
                    ann = compiler_begin(self.visit(arg), "ccompiler")
                    new_args.append(ann)
                new_call = relay.Call(call.op, new_args)
                return compiler_end(new_call, "ccompiler")

        return Annotator().visit(func)

x = relay.var("x", shape=(8, 8))
y = relay.var("y", shape=(8, 8))
z = x + y
f = relay.Function([x, y], z)
x_data = np.random.rand(8, 8).astype("float32")
y_data = np.random.rand(8, 8).astype("float32")
mod = tvm.IRModule()
mod["main"] = f
mod = MyAnnotator()(mod)
mod = transform.PartitionGraph()(mod)

check_result(mod, {"x": x_data, "y": y_data}, (8, 8), x_data + y_data)
[17:36:37] /media/pc/data/lxw/ai/tvm/src/relay/backend/vm/compiler.cc:1199: All lowered functions have been build by BYOC -- generating an empty TVM module
/tmp/ipykernel_2738903/3034817802.py:45: DeprecationWarning: legacy graph executor behavior of producing json / lib / params will be removed in the next release. Please see documents of tvm.contrib.graph_executor.GraphModule for the  new recommended usage.
  json, lib, param = relay.build(mod, target=target, params=params, runtime=runtime)

测试外部 C 编译器处理默认算子#

def set_func_attr(func, compile_name, symbol_name):
    func = func.with_attr("Primitive", tvm.tir.IntImm("int32", 1))
    func = func.with_attr("Inline", tvm.tir.IntImm("int32", 1))
    func = func.with_attr("Compiler", compile_name)
    func = func.with_attr("global_symbol", symbol_name)
    return func
x = relay.var("x", shape=(8, 8))
y = relay.var("y", shape=(8, 8))
add = x + y
log = relay.log(add)
exp = relay.exp(add)
concat = relay.concatenate([log, exp], axis=0)
f = relay.Function([x, y], concat)
mod = tvm.IRModule()
mod["main"] = f
mod = AllowedListAnnotator(["add", "subtract", "multiply"], "ccompiler")(mod)
mod = transform.PartitionGraph()(mod)
fused_mod = transform.FuseOps(2)(mod)

x_data = np.random.rand(8, 8).astype("float32")
y_data = np.random.rand(8, 8).astype("float32")
np_add = x_data + y_data
res = np.concatenate([np.log(np_add), np.exp(np_add)])
check_result(mod, {"x": x_data, "y": y_data}, (16, 8), res)
/tmp/ipykernel_2738903/3034817802.py:45: DeprecationWarning: legacy graph executor behavior of producing json / lib / params will be removed in the next release. Please see documents of tvm.contrib.graph_executor.GraphModule for the  new recommended usage.
  json, lib, param = relay.build(mod, target=target, params=params, runtime=runtime)

测试外部编译器处理经过清理的算子#

x = relay.var("x", shape=(8, 8))
y = relay.var("y", shape=(8, 8))
add = x + y
log = relay.log(add)
exp = relay.exp(add)
concat = relay.concatenate([log, exp], axis=0)
f = relay.Function([x, y], concat)
mod = tvm.IRModule()
mod["main"] = f
mod = AllowedListAnnotator(["add", "subtract", "multiply"], "unsanitary-name++")(mod)
mod = transform.PartitionGraph()(mod)
fused_mod = transform.FuseOps(2)(mod)
fused_mod.show()
def @main(%x: Tensor[(8, 8), float32] /* ty=Tensor[(8, 8), float32] */, %y: Tensor[(8, 8), float32] /* ty=Tensor[(8, 8), float32] */) -> Tensor[(16, 8), float32] {
  %3 = @tvmgen_default_unsanitary_name___main_0(%x, %y) /* ty=Tensor[(8, 8), float32] */;
  %4 = fn (%p0: Tensor[(8, 8), float32] /* ty=Tensor[(8, 8), float32] */, Primitive=1) -> Tensor[(16, 8), float32] {
    %0 = log(%p0) /* ty=Tensor[(8, 8), float32] */;
    %1 = exp(%p0) /* ty=Tensor[(8, 8), float32] */;
    %2 = (%0, %1) /* ty=(Tensor[(8, 8), float32], Tensor[(8, 8), float32]) */;
    concatenate(%2) /* ty=Tensor[(16, 8), float32] */
  } /* ty=fn (Tensor[(8, 8), float32]) -> Tensor[(16, 8), float32] */;
  %4(%3) /* ty=Tensor[(16, 8), float32] */
}

def @tvmgen_default_unsanitary_name___main_0(%unsanitary-name++_0_i0: Tensor[(8, 8), float32] /* ty=Tensor[(8, 8), float32] */, %unsanitary-name++_0_i1: Tensor[(8, 8), float32] /* ty=Tensor[(8, 8), float32] */, Compiler="unsanitary-name++", Primitive=1, Inline=1, global_symbol="tvmgen_default_unsanitary_name___main_0") -> Tensor[(8, 8), float32] {
  add(%unsanitary-name++_0_i0, %unsanitary-name++_0_i1) /* ty=Tensor[(8, 8), float32] */
}

测试外部 C 编译器处理多个函数#

x = relay.var("x", shape=(8, 8))
y = relay.var("y", shape=(8, 8))
add = x + y
log = relay.log(add)
exp = relay.exp(add)
concat = relay.concatenate([log, exp], axis=0)
f = relay.Function([x, y], concat)
mod = tvm.IRModule()
mod["main"] = f
# define second function
a = relay.var("a", shape=(16, 16))
b = relay.var("b", shape=(16, 16))
add = a + b
log = relay.log(add)
exp = relay.exp(add)
concat = relay.concatenate([log, exp], axis=0)
f2 = relay.Function([a, b], concat)
mod["subfunction"] = f2
mod = AllowedListAnnotator(["add", "subtract", "multiply"], "ccompiler")(mod)
mod = transform.PartitionGraph()(mod)

fused_mod = transform.FuseOps(2)(mod)

x_data = np.random.rand(8, 8).astype("float32")
y_data = np.random.rand(8, 8).astype("float32")
np_add = x_data + y_data
res = np.concatenate([np.log(np_add), np.exp(np_add)])
check_result(mod, {"x": x_data, "y": y_data}, (16, 8), res)
/tmp/ipykernel_2738903/3034817802.py:45: DeprecationWarning: legacy graph executor behavior of producing json / lib / params will be removed in the next release. Please see documents of tvm.contrib.graph_executor.GraphModule for the  new recommended usage.
  json, lib, param = relay.build(mod, target=target, params=params, runtime=runtime)

测试外部 C 编译器#

x = relay.var("x", shape=(2, 2))
y = relay.var("y", shape=(2, 2))
z = x + x
p = y * y
f = relay.Function([x, y], p - z)
x_data = np.random.rand(2, 2).astype("float32")
y_data = np.random.rand(2, 2).astype("float32")
mod = tvm.IRModule()
mod["main"] = f
mod = AllowedListAnnotator(["add", "subtract", "multiply"], "ccompiler")(mod)
mod = transform.PartitionGraph()(mod)

check_result(mod, {"x": x_data, "y": y_data}, (2, 2), (y_data * y_data) - (x_data + x_data))
[17:36:40] /media/pc/data/lxw/ai/tvm/src/relay/backend/vm/compiler.cc:1199: All lowered functions have been build by BYOC -- generating an empty TVM module
/tmp/ipykernel_2738903/3034817802.py:45: DeprecationWarning: legacy graph executor behavior of producing json / lib / params will be removed in the next release. Please see documents of tvm.contrib.graph_executor.GraphModule for the  new recommended usage.
  json, lib, param = relay.build(mod, target=target, params=params, runtime=runtime)

测试函数提升#

Hide code cell content
def partition():
    data = relay.var("data", relay.TensorType((1, 3, 224, 224), "float32"))
    weight = relay.var("weight", relay.TensorType((16, 3, 3, 3), "float32"))
    bn_gamma = relay.var("bn_gamma", relay.TensorType((16,), "float32"))
    bn_beta = relay.var("bn_beta", relay.TensorType((16,), "float32"))
    bn_mmean = relay.var("bn_mean", relay.TensorType((16,), "float32"))
    bn_mvar = relay.var("bn_var", relay.TensorType((16,), "float32"))

    conv = relay.nn.conv2d(
        data=data, weight=weight, kernel_size=(3, 3), channels=16, padding=(1, 1)
    )
    bn_output = relay.nn.batch_norm(conv, bn_gamma, bn_beta, bn_mmean, bn_mvar)

    func = relay.Function(
        [data, weight, bn_gamma, bn_beta, bn_mmean, bn_mvar], bn_output.astuple()
    )
    mod = tvm.IRModule()
    mod["main"] = func
    mod = relay.transform.InferType()(mod)
    op_list = ["nn.batch_norm", "nn.conv2d"]
    mod = AllowedListAnnotator(op_list, "test_compiler")(mod)

    opt_pass = tvm.transform.Sequential(
        [
            transform.InferType(),
            transform.PartitionGraph(),
            transform.SimplifyInference(),
            transform.FoldConstant(),
            transform.AlterOpLayout(),
        ]
    )

    with tvm.transform.PassContext(opt_level=3):
        mod = opt_pass(mod)

    return mod
partitioned = partition()
partitioned.show()
def @main(%data: Tensor[(1, 3, 224, 224), float32] /* ty=Tensor[(1, 3, 224, 224), float32] */, %weight: Tensor[(16, 3, 3, 3), float32] /* ty=Tensor[(16, 3, 3, 3), float32] */, %bn_gamma: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %bn_beta: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %bn_mean: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %bn_var: Tensor[(16), float32] /* ty=Tensor[(16), float32] */) -> (Tensor[(1, 16, 224, 224), float32], Tensor[(16), float32], Tensor[(16), float32]) {
  %0 = @tvmgen_default_test_compiler_main_0(%data, %weight) /* ty=Tensor[(1, 16, 224, 224), float32] */;
  @tvmgen_default_test_compiler_main_2(%0, %bn_gamma, %bn_beta, %bn_mean, %bn_var) /* ty=(Tensor[(1, 16, 224, 224), float32], Tensor[(16), float32], Tensor[(16), float32]) */
}

def @tvmgen_default_test_compiler_main_0(%test_compiler_0_i0: Tensor[(1, 3, 224, 224), float32] /* ty=Tensor[(1, 3, 224, 224), float32] */, %test_compiler_0_i1: Tensor[(16, 3, 3, 3), float32] /* ty=Tensor[(16, 3, 3, 3), float32] */, Compiler="test_compiler", Primitive=1, Inline=1, global_symbol="tvmgen_default_test_compiler_main_0") -> Tensor[(1, 16, 224, 224), float32] {
  nn.conv2d(%test_compiler_0_i0, %test_compiler_0_i1, padding=[1, 1, 1, 1], channels=16, kernel_size=[3, 3]) /* ty=Tensor[(1, 16, 224, 224), float32] */
}

def @tvmgen_default_test_compiler_main_2(%test_compiler_2_i0: Tensor[(1, 16, 224, 224), float32] /* ty=Tensor[(1, 16, 224, 224), float32] */, %test_compiler_2_i1: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %test_compiler_2_i2: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %test_compiler_2_i3: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %test_compiler_2_i4: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, Compiler="test_compiler", Primitive=1, Inline=1, global_symbol="tvmgen_default_test_compiler_main_2") -> (Tensor[(1, 16, 224, 224), float32], Tensor[(16), float32], Tensor[(16), float32]) {
  nn.batch_norm(%test_compiler_2_i0, %test_compiler_2_i1, %test_compiler_2_i2, %test_compiler_2_i3, %test_compiler_2_i4) /* ty=(Tensor[(1, 16, 224, 224), float32], Tensor[(16), float32], Tensor[(16), float32]) */
}

测试函数提升内联#

def partition():
    data = relay.var("data", relay.TensorType((1, 16, 224, 224), "float32"))
    bn_gamma = relay.var("bn_gamma", relay.TensorType((16,), "float32"))
    bn_beta = relay.var("bn_beta", relay.TensorType((16,), "float32"))
    bn_mmean = relay.var("bn_mean", relay.TensorType((16,), "float32"))
    bn_mvar = relay.var("bn_var", relay.TensorType((16,), "float32"))

    bn_output = relay.nn.batch_norm(data, bn_gamma, bn_beta, bn_mmean, bn_mvar)

    func = relay.Function([data, bn_gamma, bn_beta, bn_mmean, bn_mvar], bn_output.astuple())
    mod = tvm.IRModule()
    mod["main"] = func
    op_list = ["nn.batch_norm", "nn.conv2d"]
    mod = AllowedListAnnotator(op_list, "test_compiler")(mod)

    opt_pass = tvm.transform.Sequential(
        [
            transform.InferType(),
            transform.PartitionGraph(),
            transform.SimplifyInference(),
            transform.FoldConstant(),
            transform.AlterOpLayout(),
            transform.Inline(),
        ]
    )

    with tvm.transform.PassContext(opt_level=3):
        mod = opt_pass(mod)

    return mod

partitioned = partition()
partitioned.show()
def @main(%data: Tensor[(1, 16, 224, 224), float32] /* ty=Tensor[(1, 16, 224, 224), float32] */, %bn_gamma: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %bn_beta: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %bn_mean: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %bn_var: Tensor[(16), float32] /* ty=Tensor[(16), float32] */) -> (Tensor[(1, 16, 224, 224), float32], Tensor[(16), float32], Tensor[(16), float32]) {
  %0 = fn (%test_compiler_0_i0: Tensor[(1, 16, 224, 224), float32] /* ty=Tensor[(1, 16, 224, 224), float32] */, %test_compiler_0_i1: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %test_compiler_0_i2: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %test_compiler_0_i3: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %test_compiler_0_i4: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, Compiler="test_compiler", Primitive=1, Inline=1, global_symbol="tvmgen_default_test_compiler_main_0") -> (Tensor[(1, 16, 224, 224), float32], Tensor[(16), float32], Tensor[(16), float32]) {
    nn.batch_norm(%test_compiler_0_i0, %test_compiler_0_i1, %test_compiler_0_i2, %test_compiler_0_i3, %test_compiler_0_i4) /* ty=(Tensor[(1, 16, 224, 224), float32], Tensor[(16), float32], Tensor[(16), float32]) */
  };
  %0(%data, %bn_gamma, %bn_beta, %bn_mean, %bn_var)
}

测试常量传播#

ones = np.ones(shape=(8, 8), dtype="float32")
x = relay.var("x", shape=(8, 8))
y = relay.var("y", shape=(8, 8))
add = x + y
log = relay.log(add)
f = relay.Function([x, y], log)
f = bind_params_by_name(f, {"x": tvm.nd.array(ones)})
mod = tvm.IRModule()
mod["main"] = f
mod = AllowedListAnnotator(["add"], "ccompiler")(mod)
mod = transform.PartitionGraph()(mod)
mod = relay.transform.InferType()(mod)

y_data = np.random.rand(8, 8).astype("float32")
np_add = ones + y_data
check_result(mod, {"y": y_data}, (8, 8), np.log(np_add))
/tmp/ipykernel_2738903/3034817802.py:45: DeprecationWarning: legacy graph executor behavior of producing json / lib / params will be removed in the next release. Please see documents of tvm.contrib.graph_executor.GraphModule for the  new recommended usage.
  json, lib, param = relay.build(mod, target=target, params=params, runtime=runtime)

测试多输出#

Hide code cell content
def create_graph():
    data = relay.var("data", relay.TensorType((1, 3, 224, 224), "float32"))
    weight = relay.var("weight", relay.TensorType((16, 3, 3, 3), "float32"))
    bn_gamma = relay.var("bn_gamma", relay.TensorType((16,), "float32"))
    bn_beta = relay.var("bn_beta", relay.TensorType((16,), "float32"))
    bn_mean = relay.var("bn_mean", relay.TensorType((16,), "float32"))
    bn_var = relay.var("bn_var", relay.TensorType((16,), "float32"))

    data_cb = compiler_begin(data, "test_target")
    weight_cb = compiler_begin(weight, "test_target")
    bn_gamma_cb = compiler_begin(bn_gamma, "test_target")
    bn_beta_cb = compiler_begin(bn_beta, "test_target")
    bn_mean_cb = compiler_begin(bn_mean, "test_target")
    bn_var_cb = compiler_begin(bn_var, "test_target")

    conv_o = relay.nn.conv2d(
        data=data_cb, weight=weight_cb, kernel_size=(3, 3), channels=16, padding=(1, 1)
    )

    bn_o = relay.nn.batch_norm(conv_o, bn_gamma_cb, bn_beta_cb, bn_mean_cb, bn_var_cb)

    relu_o = relay.nn.relu(bn_o[0])
    relu_o_ce = compiler_end(relu_o, "test_target")

    bn_omean = bn_o[1]
    rebn_omean_ce = compiler_end(bn_omean, "test_target")
    bn_ovar = bn_o[2]
    bn_ovar_ce = compiler_end(bn_ovar, "test_target")

    dummy_mean_abs = relay.abs(rebn_omean_ce)
    dummy_ovar_abs = relay.abs(bn_ovar_ce)
    dummy_tuple = relay.Tuple((relu_o_ce, dummy_mean_abs, dummy_ovar_abs))

    func = relay.Function([data, weight, bn_gamma, bn_beta, bn_mean, bn_var], dummy_tuple)
    return func
mod = tvm.IRModule()
mod["main"] = create_graph()
partitioned = transform.PartitionGraph()(mod)
partitioned.show()
def @main(%data: Tensor[(1, 3, 224, 224), float32] /* ty=Tensor[(1, 3, 224, 224), float32] */, %weight: Tensor[(16, 3, 3, 3), float32] /* ty=Tensor[(16, 3, 3, 3), float32] */, %bn_gamma: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %bn_beta: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %bn_mean: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %bn_var: Tensor[(16), float32] /* ty=Tensor[(16), float32] */) -> (Tensor[(1, 16, 224, 224), float32], Tensor[(16), float32], Tensor[(16), float32]) {
  %0 = @tvmgen_default_test_target_main_0(%data, %weight, %bn_gamma, %bn_beta, %bn_mean, %bn_var) /* ty=(Tensor[(1, 16, 224, 224), float32], Tensor[(16), float32], Tensor[(16), float32]) */;
  %1 = %0.1 /* ty=Tensor[(16), float32] */;
  %2 = %0.2 /* ty=Tensor[(16), float32] */;
  %3 = %0.0 /* ty=Tensor[(1, 16, 224, 224), float32] */;
  %4 = abs(%1) /* ty=Tensor[(16), float32] */;
  %5 = abs(%2) /* ty=Tensor[(16), float32] */;
  (%3, %4, %5) /* ty=(Tensor[(1, 16, 224, 224), float32], Tensor[(16), float32], Tensor[(16), float32]) */
}

def @tvmgen_default_test_target_main_0(%test_target_0_i0: Tensor[(1, 3, 224, 224), float32] /* ty=Tensor[(1, 3, 224, 224), float32] */, %test_target_0_i1: Tensor[(16, 3, 3, 3), float32] /* ty=Tensor[(16, 3, 3, 3), float32] */, %test_target_0_i2: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %test_target_0_i3: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %test_target_0_i4: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, %test_target_0_i5: Tensor[(16), float32] /* ty=Tensor[(16), float32] */, Compiler="test_target", Primitive=1, Inline=1, global_symbol="tvmgen_default_test_target_main_0") -> (Tensor[(1, 16, 224, 224), float32], Tensor[(16), float32], Tensor[(16), float32]) {
  %6 = nn.conv2d(%test_target_0_i0, %test_target_0_i1, padding=[1, 1, 1, 1], channels=16, kernel_size=[3, 3]) /* ty=Tensor[(1, 16, 224, 224), float32] */;
  %7 = nn.batch_norm(%6, %test_target_0_i2, %test_target_0_i3, %test_target_0_i4, %test_target_0_i5) /* ty=(Tensor[(1, 16, 224, 224), float32], Tensor[(16), float32], Tensor[(16), float32]) */;
  %8 = %7.0 /* ty=Tensor[(1, 16, 224, 224), float32] */;
  %9 = nn.relu(%8) /* ty=Tensor[(1, 16, 224, 224), float32] */;
  %10 = %7.1 /* ty=Tensor[(16), float32] */;
  %11 = %7.2 /* ty=Tensor[(16), float32] */;
  (%9, %10, %11) /* ty=(Tensor[(1, 16, 224, 224), float32], Tensor[(16), float32], Tensor[(16), float32]) */
}

测试混合单输出和多输出#

def create_graph():
    data = relay.var("data", shape=(10, 10))

    cb_1 = compiler_begin(data, "test_target")
    O_1 = relay.abs(cb_1)
    ce_2 = compiler_end(O_1, "test_target")
    O_2 = relay.nn.relu(O_1)
    ce_3 = compiler_end(O_2, "test_target")

    X = relay.tanh(ce_2)

    cb_3 = compiler_begin(ce_3, "test_target")
    cb_4 = compiler_begin(X, "test_target")
    O_3 = relay.add(cb_3, cb_4)
    ce_4 = compiler_end(O_3, "test_target")

    func = relay.Function([data], ce_4)
    return func

mod = tvm.IRModule()
mod["main"] = create_graph()
mod = transform.InferType()(mod)

partitioned = transform.PartitionGraph()(mod)
partitioned.show()
def @main(%data: Tensor[(10, 10), float32] /* ty=Tensor[(10, 10), float32] */) -> Tensor[(10, 10), float32] {
  %0 = @tvmgen_default_test_target_main_0(%data) /* ty=(Tensor[(10, 10), float32], Tensor[(10, 10), float32]) */;
  %1 = %0.1 /* ty=Tensor[(10, 10), float32] */;
  %2 = %0.0 /* ty=Tensor[(10, 10), float32] */;
  %3 = tanh(%1) /* ty=Tensor[(10, 10), float32] */;
  @tvmgen_default_test_target_main_1(%2, %3) /* ty=Tensor[(10, 10), float32] */
}

def @tvmgen_default_test_target_main_0(%test_target_0_i0: Tensor[(10, 10), float32] /* ty=Tensor[(10, 10), float32] */, Compiler="test_target", Primitive=1, Inline=1, global_symbol="tvmgen_default_test_target_main_0") -> (Tensor[(10, 10), float32], Tensor[(10, 10), float32]) {
  %4 = abs(%test_target_0_i0) /* ty=Tensor[(10, 10), float32] */;
  %5 = nn.relu(%4) /* ty=Tensor[(10, 10), float32] */;
  (%5, %4) /* ty=(Tensor[(10, 10), float32], Tensor[(10, 10), float32]) */
}

def @tvmgen_default_test_target_main_1(%test_target_1_i0: Tensor[(10, 10), float32] /* ty=Tensor[(10, 10), float32] */, %test_target_1_i1: Tensor[(10, 10), float32] /* ty=Tensor[(10, 10), float32] */, Compiler="test_target", Primitive=1, Inline=1, global_symbol="tvmgen_default_test_target_main_1") -> Tensor[(10, 10), float32] {
  add(%test_target_1_i0, %test_target_1_i1) /* ty=Tensor[(10, 10), float32] */
}

测试一个输出被多次使用的情况#

def expected_same_output_region():
    mod = tvm.IRModule()
    x = relay.var("x", shape=(8, 8))
    y = relay.var("y", shape=(8, 8))
    z = relay.var("z", shape=(8, 8))
    x0 = relay.var("x0", shape=(8, 8))
    y0 = relay.var("y0", shape=(8, 8))
    log = relay.log(x0)
    sub = x0 - y0
    mul = log * sub
    # The partitioned graph contains log, subtract, and multiply
    func = relay.Function([x0, y0], mul)
    func = set_func_attr(func, "ccompiler", "tvmgen_default_ccompiler_main_0")
    glb_0 = relay.GlobalVar("tvmgen_default_ccompiler_main_0")
    mod[glb_0] = func
    mod = transform.InferType()(mod)

    add = x + y
    call = relay.Call(glb_0, [add, z])
    main = relay.Function([x, y, z], call)
    mod["main"] = main
    mod = transform.InferType()(mod)
    return mod

def expected_different_output_region():
    mod = tvm.IRModule()
    x = relay.var("x", shape=(8, 8))
    y = relay.var("y", shape=(8, 8))
    z = relay.var("z", shape=(8, 8))

    # The partitioned graph contains log
    i0 = relay.var("i0", shape=(8, 8))
    log = relay.log(i0)
    func = relay.Function([i0], log)
    func = set_func_attr(func, "ccompiler", "tvmgen_default_ccompiler_main_0")
    glb_0 = relay.GlobalVar("tvmgen_default_ccompiler_main_0")
    mod[glb_0] = func
    mod = transform.InferType()(mod)

    # The partitioned graph contains subtract
    x0 = relay.var("x0", shape=(8, 8))
    y0 = relay.var("y0", shape=(8, 8))
    sub = x0 - y0
    func = relay.Function([x0, y0], sub)
    func = set_func_attr(func, "ccompiler", "tvmgen_default_ccompiler_main_1")
    glb_1 = relay.GlobalVar("tvmgen_default_ccompiler_main_1")
    mod[glb_1] = func
    mod = transform.InferType()(mod)

    add = x + y
    call_log = relay.Call(glb_0, [add])
    call_sub = relay.Call(glb_1, [add, z])
    main = relay.Function([x, y, z], call_log * call_sub)
    mod["main"] = main
    mod = transform.InferType()(mod)
    return mod

def get_mod():
    x = relay.var("x", shape=(8, 8))
    y = relay.var("y", shape=(8, 8))
    z = relay.var("z", shape=(8, 8))
    add = x + y
    sub = add - z
    log = relay.log(add)
    sub1 = log * sub
    f = relay.Function([x, y, z], sub1)
    mod = tvm.IRModule()
    mod["main"] = f
    return mod

def test_same_output_region():
    mod = get_mod()
    mod = AllowedListAnnotator(["subtract", "log", "multiply"], "ccompiler")(mod)
    mod = transform.MergeCompilerRegions()(mod)
    mod = transform.PartitionGraph()(mod)

    expected_mod = expected_same_output_region()
    tvm.ir.assert_structural_equal(mod, expected_mod, map_free_vars=True)

def test_different_output_region():
    mod = get_mod()
    mod = AllowedListAnnotator(["subtract", "log"], "ccompiler")(mod)
    mod = transform.MergeCompilerRegions()(mod)
    mod = transform.PartitionGraph()(mod)

    expected_mod = expected_different_output_region()
    tvm.ir.assert_structural_equal(mod, expected_mod, map_free_vars=True)

test_same_output_region()
test_different_output_region()

测试重复输出的情况#

target = "test_duplicate_outputs"

@tvm.ir.register_op_attr("abs", "target." + target)
def abs(expr):  # pylint: disable=unused-variable
    return True

def create_graph():
    data = relay.var("data", shape=(10, 10))
    x = relay.abs(data)
    out_1 = relay.nn.relu(x)
    out_2 = relay.tanh(x)
    out_3 = relay.log(x)
    out = relay.Tuple([out_1, out_2, out_3])
    func = relay.Function([data], out)
    return func

mod = tvm.IRModule()
mod["main"] = create_graph()

seq = tvm.transform.Sequential(
    [
        transform.AnnotateTarget(target),
        transform.MergeCompilerRegions(),
        transform.PartitionGraph(),
    ]
)
partitioned = seq(mod)
partitioned.show()
def @main(%data: Tensor[(10, 10), float32] /* ty=Tensor[(10, 10), float32] */) -> (Tensor[(10, 10), float32], Tensor[(10, 10), float32], Tensor[(10, 10), float32]) {
  %0 = @tvmgen_default_test_duplicate_outputs_main_0(%data) /* ty=Tensor[(10, 10), float32] */;
  %1 = nn.relu(%0) /* ty=Tensor[(10, 10), float32] */;
  %2 = tanh(%0) /* ty=Tensor[(10, 10), float32] */;
  %3 = log(%0) /* ty=Tensor[(10, 10), float32] */;
  (%1, %2, %3) /* ty=(Tensor[(10, 10), float32], Tensor[(10, 10), float32], Tensor[(10, 10), float32]) */
}

def @tvmgen_default_test_duplicate_outputs_main_0(%test_duplicate_outputs_0_i0: Tensor[(10, 10), float32] /* ty=Tensor[(10, 10), float32] */, Compiler="test_duplicate_outputs", Primitive=1, Inline=1, global_symbol="tvmgen_default_test_duplicate_outputs_main_0") -> Tensor[(10, 10), float32] {
  abs(%test_duplicate_outputs_0_i0) /* ty=Tensor[(10, 10), float32] */
}

测试重复合并和 TupleGetItem 的情况#

target = "test_duplicate_merge_and_tuplegetitem"

@tvm.ir.register_op_attr("nn.batch_norm", "target." + target)
def batch_norm(expr):  # pylint: disable=unused-variable
    return True

@tvm.ir.register_op_attr("nn.relu", "target." + target)
def relu(expr):  # pylint: disable=unused-variable
    return True

def create_graph():
    data = relay.var("data", shape=(10, 10))
    bn_gamma = relay.var("bn_gamma")
    bn_beta = relay.var("bn_beta")
    bn_mmean = relay.var("bn_mean")
    bn_mvar = relay.var("bn_var")
    x = relay.nn.batch_norm(data, bn_gamma, bn_beta, bn_mmean, bn_mvar)
    out_1 = relay.nn.relu(x[0])
    bn_out_1 = x[1]
    out_2 = relay.tanh(bn_out_1)
    out_3 = relay.log(bn_out_1)
    out = relay.Tuple([out_1, out_2, out_3])
    func = relay.Function([data, bn_gamma, bn_beta, bn_mmean, bn_mvar], out)
    return func

mod = tvm.IRModule()
mod["main"] = create_graph()
mod = transform.InferType()(mod)

seq = tvm.transform.Sequential(
    [
        transform.AnnotateTarget(target),
        transform.MergeCompilerRegions(),
        transform.PartitionGraph(),
    ]
)

partitioned = seq(mod)
partitioned.show()
def @main(%data: Tensor[(10, 10), float32] /* ty=Tensor[(10, 10), float32] */, %bn_gamma: Tensor[(10), float32] /* ty=Tensor[(10), float32] */, %bn_beta: Tensor[(10), float32] /* ty=Tensor[(10), float32] */, %bn_mean: Tensor[(10), float32] /* ty=Tensor[(10), float32] */, %bn_var: Tensor[(10), float32] /* ty=Tensor[(10), float32] */) -> (Tensor[(10, 10), float32], Tensor[(10), float32], Tensor[(10), float32]) {
  %0 = @tvmgen_default_test_duplicate_merge_and_tuplegetitem_main_0(%data, %bn_gamma, %bn_beta, %bn_mean, %bn_var) /* ty=(Tensor[(10, 10), float32], Tensor[(10), float32]) */;
  %1 = %0.1 /* ty=Tensor[(10), float32] */;
  %2 = %0.0 /* ty=Tensor[(10, 10), float32] */;
  %3 = tanh(%1) /* ty=Tensor[(10), float32] */;
  %4 = log(%1) /* ty=Tensor[(10), float32] */;
  (%2, %3, %4) /* ty=(Tensor[(10, 10), float32], Tensor[(10), float32], Tensor[(10), float32]) */
}

def @tvmgen_default_test_duplicate_merge_and_tuplegetitem_main_0(%test_duplicate_merge_and_tuplegetitem_0_i0: Tensor[(10, 10), float32] /* ty=Tensor[(10, 10), float32] */, %test_duplicate_merge_and_tuplegetitem_0_i1: Tensor[(10), float32] /* ty=Tensor[(10), float32] */, %test_duplicate_merge_and_tuplegetitem_0_i2: Tensor[(10), float32] /* ty=Tensor[(10), float32] */, %test_duplicate_merge_and_tuplegetitem_0_i3: Tensor[(10), float32] /* ty=Tensor[(10), float32] */, %test_duplicate_merge_and_tuplegetitem_0_i4: Tensor[(10), float32] /* ty=Tensor[(10), float32] */, Compiler="test_duplicate_merge_and_tuplegetitem", Primitive=1, Inline=1, global_symbol="tvmgen_default_test_duplicate_merge_and_tuplegetitem_main_0") -> (Tensor[(10, 10), float32], Tensor[(10), float32]) {
  %5 = nn.batch_norm(%test_duplicate_merge_and_tuplegetitem_0_i0, %test_duplicate_merge_and_tuplegetitem_0_i1, %test_duplicate_merge_and_tuplegetitem_0_i2, %test_duplicate_merge_and_tuplegetitem_0_i3, %test_duplicate_merge_and_tuplegetitem_0_i4) /* ty=(Tensor[(10, 10), float32], Tensor[(10), float32], Tensor[(10), float32]) */;
  %6 = %5.0 /* ty=Tensor[(10, 10), float32] */;
  %7 = nn.relu(%6) /* ty=Tensor[(10, 10), float32] */;
  %8 = %5.1 /* ty=Tensor[(10), float32] */;
  (%7, %8) /* ty=(Tensor[(10, 10), float32], Tensor[(10), float32]) */
}

测试常量元组#

@tvm.ir.register_op_attr("qnn.concatenate", "target.const_tuples")
def add(expr):  # pylint: disable=unused-variable
    return True

def create_graph():
    a = relay.var("a", shape=(10, 10), dtype="uint8")
    b = relay.var("b", shape=(10, 10), dtype="uint8")
    a1 = relay.abs(a)

    zeroi = relay.const(1, "int32")
    zerof = relay.const(0, "float32")
    con = relay.qnn.op.concatenate(
        (a1, b),
        input_scales=(zerof, zerof),
        input_zero_points=(zeroi, zeroi),
        output_scale=zerof,
        output_zero_point=zeroi,
        axis=1,
    )

    f = relay.Function([a, b], con)
    mod = tvm.IRModule.from_expr(f)
    mod = transform.InferType()(mod)
    return mod

seq = tvm.transform.Sequential(
    [
        transform.AnnotateTarget("const_tuples"),
        transform.InferType(),
        transform.MergeCompilerRegions(),
        transform.PartitionGraph(),
    ]
)

partitioned = seq(create_graph())

concat = partitioned["tvmgen_default_const_tuples_main_0"].body
assert type(concat.args[1]) == relay.Tuple
assert type(concat.args[2]) == relay.Tuple
assert type(concat.args[3]) == relay.Constant
assert type(concat.args[4]) == relay.Constant

测试扁平化元组输出#

target = "test_flatten_tuple_output"

@tvm.ir.register_op_attr("split", "target." + target)
def split(expr):  # pylint: disable=unused-variable
    return True

@tvm.ir.register_op_attr("abs", "target." + target)
def abs(expr):  # pylint: disable=unused-variable
    return True

def create_graph():
    a = relay.var("a", shape=(10, 10), dtype="uint8")

    a_split = relay.split(a, 2)
    a_split_0 = relay.TupleGetItem(a_split.astuple(), 0)
    a_split_0_abs = relay.abs(a_split_0)

    a_con = relay.concatenate(a_split, 0)
    a_split_0_relu = relay.nn.relu(a_split_0_abs)

    out = relay.Tuple((a_con, a_split_0_relu))
    f = relay.Function([a], out)
    mod = tvm.IRModule.from_expr(f)
    mod = transform.InferType()(mod)
    return mod
seq = tvm.transform.Sequential(
    [
        transform.AnnotateTarget(target),
        transform.MergeCompilerRegions(),
        transform.PartitionGraph(),
    ]
)

partitioned = seq(create_graph())
partitioned = transform.InferType()(partitioned)
partitioned.show()
def @main(%a: Tensor[(10, 10), uint8] /* ty=Tensor[(10, 10), uint8] */) -> (Tensor[(10, 10), uint8], Tensor[(5, 10), uint8]) {
  %0 = @tvmgen_default_test_flatten_tuple_output_main_0(%a) /* ty=(Tensor[(5, 10), uint8], Tensor[(5, 10), uint8], Tensor[(5, 10), uint8]) */;
  %1 = %0.0 /* ty=Tensor[(5, 10), uint8] */;
  %2 = %0.1 /* ty=Tensor[(5, 10), uint8] */;
  %3 = (%1, %2) /* ty=(Tensor[(5, 10), uint8], Tensor[(5, 10), uint8]) */;
  %4 = %0.2 /* ty=Tensor[(5, 10), uint8] */;
  %5 = concatenate(%3) /* ty=Tensor[(10, 10), uint8] */;
  %6 = nn.relu(%4) /* ty=Tensor[(5, 10), uint8] */;
  (%5, %6) /* ty=(Tensor[(10, 10), uint8], Tensor[(5, 10), uint8]) */
}

def @tvmgen_default_test_flatten_tuple_output_main_0(%test_flatten_tuple_output_0_i0: Tensor[(10, 10), uint8] /* ty=Tensor[(10, 10), uint8] */, Compiler="test_flatten_tuple_output", Primitive=1, Inline=1, global_symbol="tvmgen_default_test_flatten_tuple_output_main_0") -> (Tensor[(5, 10), uint8], Tensor[(5, 10), uint8], Tensor[(5, 10), uint8]) {
  %7 = split(%test_flatten_tuple_output_0_i0, indices_or_sections=meta[runtime.BoxInt][0]) /* ty=(Tensor[(5, 10), uint8], Tensor[(5, 10), uint8]) */;
  %8 = %7.0 /* ty=Tensor[(5, 10), uint8] */;
  %9 = %7.0 /* ty=Tensor[(5, 10), uint8] */;
  %10 = %7.1 /* ty=Tensor[(5, 10), uint8] */;
  %11 = abs(%8) /* ty=Tensor[(5, 10), uint8] */;
  (%9, %10, %11) /* ty=(Tensor[(5, 10), uint8], Tensor[(5, 10), uint8], Tensor[(5, 10), uint8]) */
}

测试元组输出的执行#

测试具有元组输出的子图的 C 代码生成和运行时。

a = relay.var("a", shape=(10, 10), dtype="float32")
b = relay.var("b", shape=(10, 10), dtype="float32")
ba = relay.annotation.compiler_begin(a, "ccompiler")
bb = relay.annotation.compiler_begin(b, "ccompiler")
add = relay.add(ba, bb)
sub = relay.subtract(ba, bb)
out = relay.Tuple((add, sub))
eout = relay.annotation.compiler_end(out, "ccompiler")
func = relay.Function([a, b], eout)

mod = tvm.IRModule()
mod["main"] = func
mod = transform.InferType()(mod)
mod = transform.PartitionGraph()(mod)

a_data = np.random.rand(10, 10).astype("float32")
b_data = np.random.rand(10, 10).astype("float32")

check_result(
    mod,
    {"a": a_data, "b": b_data},
    [(10, 10), (10, 10)],
    [(a_data + b_data), (a_data - b_data)],
)
[17:40:17] /media/pc/data/lxw/ai/tvm/src/relay/backend/vm/compiler.cc:1199: All lowered functions have been build by BYOC -- generating an empty TVM module
/tmp/ipykernel_2738903/3034817802.py:45: DeprecationWarning: legacy graph executor behavior of producing json / lib / params will be removed in the next release. Please see documents of tvm.contrib.graph_executor.GraphModule for the  new recommended usage.
  json, lib, param = relay.build(mod, target=target, params=params, runtime=runtime)

测试外部优化#

def Optimize(mod):
    return relay.transform.FoldConstant()(mod)

tvm.register_func("relay.ext.test_target.optimize", Optimize)

x = relay.var("x", shape=(2, 2))
y0 = relay.var("y0", shape=(2, 2))
y1 = relay.var("y1", shape=(2, 2))
yy0 = relay.annotation.compiler_begin(y0, "test_target")
yy1 = relay.annotation.compiler_begin(y1, "test_target")
z = yy0 + yy1
end = relay.annotation.compiler_end(z, "test_target")
f = relay.Function([x, y0, y1], end * x)
c = np.ones(shape=(2, 2), dtype="float32")
f = bind_params_by_name(f, {"y0": tvm.nd.array(c), "y1": tvm.nd.array(c)})
mod = tvm.IRModule()
mod["main"] = f
mod = transform.InferType()(mod)
mod = transform.PartitionGraph()(mod)

try:
    t0 = mod["tvmgen_default_test_target_main_0"]
except:
    raise KeyError("test_target_main_0 not found")

assert isinstance(t0.body, relay.Constant)
expected = np.empty([2, 2])
expected.fill(2)
tvm.testing.assert_allclose(t0.body.data.numpy(), expected, rtol=1e-5, atol=1e-5)

测试保留类型导入#

测试确保在 BYOC 管道中保留类型定义和导入。

from tvm.relay.prelude import Prelude, StaticTensorArrayOps

def run(dtype, shape):
    mod = tvm.IRModule()
    p = Prelude(mod)
    static_tensor_array_ops = StaticTensorArrayOps(p, dtype, shape)
    static_tensor_array_ops.register()

    tensor_array = p.get_global_var_static("tensor_array", dtype, shape)
    tensor = p.get_tensor_ctor_static("tensor_constructor", dtype, shape)
    write = p.get_global_var_static("tensor_array_write", dtype, shape)
    gather = p.get_global_var_static("tensor_array_gather", dtype, shape)
    v = relay.var("v")
    indice = relay.var("indice")
    init_tensor_array = tensor_array(relay.const(3))
    tensor_array1 = write(init_tensor_array, relay.const(0), tensor(v))
    tensor_array2 = write(tensor_array1, relay.const(1), tensor(v))
    tensor_array3 = write(tensor_array2, relay.const(2), tensor(v))
    out = gather(tensor_array3, indice)
    mod["main"] = relay.Function([v, indice], out)
    mod = transform.RemoveUnusedFunctions()(mod)
    mod = transform.PartitionGraph()(mod)

run("float32", [2, 3])

测试不绑定常量#

def get_net(prefix, data, out_channel):
    weight = relay.var(prefix + "weight")
    bn_gamma = relay.var(prefix + "bn_gamma")
    bn_beta = relay.var(prefix + "bn_beta")
    bn_mmean = relay.var(prefix + "bn_mean")
    bn_mvar = relay.var(prefix + "bn_var")

    layer = relay.nn.conv2d(
        data=data, weight=weight, kernel_size=(3, 3), channels=out_channel, padding=(1, 1)
    )
    bn_output = relay.nn.batch_norm(layer, bn_gamma, bn_beta, bn_mmean, bn_mvar)
    out = relay.nn.relu(bn_output[0])
    return relay.Function(relay.analysis.free_vars(out), out)

def get_partitoned_mod(mod, params, pattern_table, bind_constants):
    mod["main"] = bind_params_by_name(mod["main"], params)
    remove_bn_pass = tvm.transform.Sequential(
        [
            transform.InferType(),
            transform.SimplifyInference(),
            transform.FoldConstant(),
            transform.FoldScaleAxis(),
        ]
    )
    composite_partition = tvm.transform.Sequential(
        [
            remove_bn_pass,
            transform.MergeComposite(pattern_table),
            transform.AnnotateTarget("dnnl"),
            transform.PartitionGraph(bind_constants=bind_constants),
        ]
    )

    with tvm.transform.PassContext(opt_level=3, disabled_pass=["AlterOpLayout"]):
        return composite_partition(mod)

data = relay.var("data", relay.TensorType((1, 3, 224, 224), "float32"))
net = get_net("block_", data, 8)
mod, params = tvm.relay.testing.create_workload(net)

mod = get_partitoned_mod(mod, params, get_pattern_table("dnnl"), bind_constants=True)
len(mod["main"].body.args) == 1

mod = get_partitoned_mod(mod, params, get_pattern_table("dnnl"), bind_constants=False)
len(mod["main"].body.args) == 3
False