Relay 后端解释器

Relay 后端解释器#

参考:tvm/tests/python/relay/test_backend_interpreter.py

%cd ..
import testing
/media/pc/data/lxw/ai/tvm-book/doc/read/relay
import numpy as np
import tvm
from tvm.testing import device_enabled
from tvm import nd
from tvm import relay
from tvm.runtime import container
from tvm.relay.backend.interpreter import RefValue, ConstructorValue
from tvm.relay.scope_builder import ScopeBuilder
def check_eval(expr, args, expected_result, mod=None, rtol=1e-07):
    # TODO(tqchen) add more types once the schedule register is fixed.
    for target in ["llvm"]:
        dev = tvm.device(target, 0)
        if not device_enabled(target):
            return
        func = relay.create_executor(mod=mod, device=dev, target=target).evaluate(expr)
        result = func if args is None else func(*args)
        # use testing which also set atol
        np.testing.assert_allclose(result.numpy(), expected_result, rtol=rtol)
def test_tuple_value():
    tv = container.tuple_object([relay.const(1), relay.const(2), relay.const(3)])
    np.testing.assert_allclose(tv[0].data.numpy(), 1)
    np.testing.assert_allclose(tv[1].data.numpy(), 2)
    np.testing.assert_allclose(tv[2].data.numpy(), 3)


def test_tuple_getitem():
    two = relay.add(relay.const(1), relay.const(1))
    func = relay.Function([], relay.TupleGetItem(relay.Tuple([relay.const(1), relay.const(2)]), 0))
    check_eval(func, [], 1)


def test_id():
    x = relay.var("x", "float32")
    ident = relay.Function([x], x)
    one = np.array(1.0, "float32")
    check_eval(ident, [one], one)


def test_add_const():
    two = relay.add(relay.const(1), relay.const(1))
    func = relay.Function([], two)
    check_eval(func, [], 2)


def test_mul_param():
    x = relay.var("x", shape=(10, 10))
    y = relay.var("y", shape=(1, 10))
    func = relay.Function([x, y], relay.multiply(x, y))
    x_data = np.random.rand(10, 10).astype("float32")
    y_data = np.random.rand(1, 10).astype("float32")
    check_eval(func, [x_data, y_data], x_data * y_data)


def test_equal():
    i = relay.var("i", shape=[], dtype="int32")
    j = relay.var("i", shape=[], dtype="int32")
    z = relay.equal(i, j)
    func = relay.Function([i, j], z, ret_type=relay.TensorType([], "bool"))
    i_data = relay.const(0, "int32")
    j_data = relay.const(0, "int32")
    check_eval(func, [i_data, j_data], True)


def test_subtract():
    i = relay.var("i", shape=[], dtype="int32")
    sub = relay.subtract(i, relay.const(1, dtype="int32"))
    func = relay.Function([i], sub, ret_type=relay.TensorType([], "int32"))
    i_data = np.array(1, dtype="int32")
    check_eval(func, [i_data], 0)


def test_simple_loop():
    mod = tvm.IRModule({})
    sum_up = relay.GlobalVar("sum_up")
    i = relay.var("i", shape=[], dtype="int32")
    sb = ScopeBuilder()
    with sb.if_scope(relay.equal(i, relay.const(0, dtype="int32"))):
        sb.ret(i)
    with sb.else_scope():
        one_less = relay.subtract(i, relay.const(1, dtype="int32"))
        rec_call = relay.Call(sum_up, [one_less])
        sb.ret(relay.add(rec_call, i))
    func = relay.Function([i], sb.get(), ret_type=relay.TensorType([], "int32"))
    mod[sum_up] = func
    i_data = np.array(10, dtype="int32")
    check_eval(sum_up, [i_data], sum(range(1, 11)), mod=mod)


def test_loop():
    mod = tvm.IRModule({})
    sum_up = relay.GlobalVar("sum_up")
    i = relay.var("i", shape=[], dtype="int32")
    accum = relay.var("accum", shape=[], dtype="int32")
    sb = ScopeBuilder()
    with sb.if_scope(relay.equal(i, relay.const(0, "int32"))):
        sb.ret(accum)
    with sb.else_scope():
        one_less = relay.subtract(i, relay.const(1, "int32"))
        new_accum = relay.add(accum, i)
        sb.ret(relay.Call(sum_up, [one_less, new_accum]))
    func = relay.Function([i, accum], sb.get())
    mod[sum_up] = func
    i_data = np.array(10, dtype="int32")
    accum_data = np.array(0, dtype="int32")
    check_eval(sum_up, [i_data, accum_data], sum(range(1, 11)), mod=mod)


def test_ref():
    mod = tvm.IRModule()
    three_with_ref = relay.GlobalVar("three_with_ref")
    i = relay.Var("i")
    iv = relay.Var("iv")
    u = relay.Var("u")
    uv = relay.Var("uv")
    body = relay.add(iv, uv)
    body = relay.Let(uv, relay.RefRead(i), body)
    body = relay.Let(u, relay.RefWrite(i, relay.const(2)), body)
    body = relay.Let(iv, relay.RefRead(i), body)
    body = relay.Let(i, relay.RefCreate(relay.const(1)), body)
    mod[three_with_ref] = relay.Function([], body)
    check_eval(three_with_ref, [], 3, mod=mod)


def test_binds():
    x = relay.var("x")
    y = relay.add(x, x)
    xx = np.ones((10, 20))
    res = relay.create_executor().evaluate(y, binds={x: xx}).numpy()
    np.testing.assert_allclose(xx + xx, res)


def test_kwargs_params():
    x = relay.var("x", shape=(1, 10))
    y = relay.var("y", shape=(1, 10))
    z = relay.var("z", shape=(1, 10))
    f = relay.Function([x, y, z], x + y + z)
    x_data = np.random.rand(1, 10).astype("float32")
    y_data = np.random.rand(1, 10).astype("float32")
    z_data = np.random.rand(1, 10).astype("float32")
    params = {"y": y_data, "z": z_data}
    res = relay.create_executor().evaluate(f)(x_data, **params)
    np.testing.assert_allclose(res.numpy(), x_data + y_data + z_data)


def test_function_taking_adt_ref_tuple():
    mod = tvm.IRModule()
    prelude = relay.prelude.Prelude(mod)
    _, cons, nil = prelude.mod.get_type("List")

    nil_value = ConstructorValue(nil.tag, [], nil)
    cons_value = ConstructorValue(
        cons.tag,
        [nd.array(np.random.rand(1, 10).astype("float32")), nil_value],
        cons,
    )

    ref_value = RefValue(nd.array(np.random.rand(1, 10).astype("float32")))
    tuple_value = container.tuple_object(
        [nd.array(np.random.rand(1, 10).astype("float32")) for _ in range(10)]
    )

    id_func = relay.create_executor(mod=mod).evaluate(prelude.id)

    res_nil = id_func(nil_value)
    assert res_nil.tag == nil_value.tag
    assert len(res_nil.fields) == 0

    res_cons = id_func(cons_value)
    assert res_cons.tag == cons_value.tag
    assert len(res_cons.fields) == len(cons_value.fields)
    np.testing.assert_allclose(res_cons.fields[0].numpy(), cons_value.fields[0].numpy())
    assert isinstance(res_cons.fields[1], ConstructorValue)
    assert res_cons.fields[1].tag == nil.tag
    assert len(res_cons.fields[1].fields) == 0

    res_ref = id_func(ref_value)
    np.testing.assert_allclose(res_ref.value.numpy(), ref_value.value.numpy())

    res_tuple = id_func(tuple_value)
    for i in range(10):
        np.testing.assert_allclose(res_tuple[i].numpy(), tuple_value[i].numpy())


def test_tuple_passing():
    x = relay.var(
        "x",
        type_annotation=relay.ty.TupleType(
            [relay.ty.TensorType((), "int64"), relay.ty.TensorType((), "int64")]
        ),
    )

    fn = relay.Function([x], relay.expr.TupleGetItem(x, 0))
    mod = tvm.IRModule({})
    gv = relay.GlobalVar("main")
    mod[gv] = fn
    mod = relay.transform.InferType()(mod)

    dev = tvm.cpu()
    target = tvm.target.Target("llvm")
    f = relay.create_executor(mod=mod, device=dev, target=target).evaluate(gv)
    # First use a Python tuple.
    out = f((10, 8))
    np.testing.assert_allclose(out.numpy(), np.array(10))
    # Second use a tuple value.
    value_tuple = container.tuple_object([nd.array(np.array(11)), nd.array(np.array(12))])
    out = f(value_tuple)
    np.testing.assert_allclose(out.numpy(), np.array(11))


def test_dynamic():
    n = 3
    m = 2
    x = relay.Var("x", relay.TensorType([relay.Any(), m], "float32"))
    y = relay.Var("y", relay.TensorType([relay.Any(), m], "float32"))
    xx = x - relay.expr.const(3.0)
    yy = y * relay.expr.const(5.0)
    z = relay.op.concatenate([xx, yy], axis=0)
    mod = tvm.IRModule()
    mod["main"] = relay.Function([x, y], z)
    x_np = np.random.uniform(size=(n, m)).astype("float32")
    y_np = np.random.uniform(size=(n, m)).astype("float32")
    expected = np.concatenate([x_np - 3.0, y_np * 5.0], axis=0)
    check_eval(None, [x_np, y_np], expected, mod)


def test_ref_global_from_expr():
    n = 3
    x = relay.Var("x", relay.TensorType([n], "float32"))
    y = relay.Var("y", relay.TensorType([n], "float32"))
    mod = tvm.IRModule()
    mod["add"] = relay.Function([x, y], relay.add(x, y))
    x_np = np.random.uniform(size=(n,)).astype("float32")
    y_np = np.random.uniform(size=(n,)).astype("float32")
    expected = np.add(x_np, y_np)
    expr = relay.Call(mod.get_global_var("add"), [relay.const(x_np), relay.const(y_np)])
    check_eval(expr, None, expected, mod)


def test_keyword_args():
    n = 3
    x = relay.Var("x", relay.TensorType([n], "float32"))
    y = relay.Var("y", relay.TensorType([n], "float32"))
    z = relay.add(x, y)
    mod = tvm.IRModule()
    mod["main"] = relay.Function([x, y], z)
    x_np = np.random.uniform(size=(n,)).astype("float32")
    y_np = np.random.uniform(size=(n,)).astype("float32")
    expected = np.add(x_np, y_np)
    actual = relay.create_executor(mod=mod).evaluate()(y=y_np, x=x_np)
    np.testing.assert_allclose(actual.numpy(), expected)
# TODO(mbs): Support? Would help reduce wasted work when we need to prepare
# multiple functions w.r.t. the same module.
# "closures are currently not directly Python callable"
def test_functional_returns():
    n = 3
    x = relay.Var("x", relay.TensorType([n], "float32"))
    f = relay.Function([x], x)
    t = relay.Tuple([f, f])
    c = np.random.rand(n).astype("float32")
    result1, result2 = relay.create_executor().evaluate(t)
    testing.assert_allclose(result1(c).numpy(), c)
    testing.assert_allclose(result2(c).numpy(), c)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[5], line 7
      5 c = np.random.rand(n).astype("float32")
      6 result1, result2 = relay.create_executor().evaluate(t)
----> 7 np.testing.assert_allclose(result1(c).numpy(), c)
      8 np.testing.assert_allclose(result2(c).numpy(), c)

TypeError: 'Object' object is not callable