import numpy as np

import tvm
from tvm import te
from tvm.contrib.utils import tempdir

import vta
import vta.testing
from vta.testing import simulator


def _run(env, remote):
    m = 2
    n = 8
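    # Random immediate operands: imm_shift is added to every element,
    # imm_scale is the right-shift amount applied afterwards.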
    imm_shift = np.random.randint(0, 8)
    imm_scale = np.random.randint(1, 5)
    # compute
    a = te.placeholder((m, n, env.BATCH, env.BLOCK_OUT), name="a", dtype=env.acc_dtype)
    a_buf = te.compute(
        (m, n, env.BATCH, env.BLOCK_OUT), lambda *i: a(*i), "a_buf"
    )  # DRAM->SRAM
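    # Two ALU stages over the staged buffer: add-immediate, then arithmetic
    # right-shift-immediate (the shift-and-scale narrowing pattern).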
    res_shift = te.compute(
        (m, n, env.BATCH, env.BLOCK_OUT), lambda *i: a_buf(*i) + imm_shift, "res_shift"
    )  # compute
    res_scale = te.compute(
        (m, n, env.BATCH, env.BLOCK_OUT), lambda *i: res_shift(*i) >> imm_scale, "res_scale"
    )  # compute
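    # Final stage narrows the wide accumulator dtype to the input dtype as
    # results stream back out to DRAM.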
    res = te.compute(
        (m, n, env.BATCH, env.BLOCK_OUT), lambda *i: res_scale(*i).astype(env.inp_dtype), "res"
    )  # SRAM->DRAM
    # schedule
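    # Pin the intermediate buffers to accumulator SRAM, then tag each loop
    # nest: DMA copies for the load/store stages, ALU micro-ops for compute.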
    s = te.create_schedule(res.op)
    s[a_buf].set_scope(env.acc_scope)  # SRAM
    s[res_shift].set_scope(env.acc_scope)  # SRAM
    s[res_scale].set_scope(env.acc_scope)  # SRAM
    s[a_buf].pragma(a_buf.op.axis[0], env.dma_copy)  # DRAM->SRAM
    s[res_shift].pragma(res_shift.op.axis[0], env.alu)  # compute
    s[res_scale].pragma(res_scale.op.axis[0], env.alu)  # compute
    s[res].pragma(res.op.axis[0], env.dma_copy)  # SRAM->DRAM
    # build
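    # Lower and codegen for the VTA "ext_dev" target; host-side code goes to
    # env.target_host. Without a remote, stop after checking the build.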
    mod = vta.build(s, [a, res], tvm.target.Target("ext_dev", host=env.target_host))
    if not remote:
        return
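    # Save the compiled artifact locally, upload it to the RPC remote, and
    # load it back as a callable module.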
    temp = tempdir()
    mod.save(temp.relpath("load_act.o"))
    remote.upload(temp.relpath("load_act.o"))
    f = remote.load_module("load_act.o")
    # verify
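    # NumPy reference mirrors the TE pipeline: add imm_shift, arithmetic
    # right-shift by imm_scale, then cast to the output dtype.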
    dev = remote.ext_dev(0)
    a_np = np.random.randint(-10, 10, size=(m, n, env.BATCH, env.BLOCK_OUT)).astype(a.dtype)
    res_np = np.right_shift((a_np + imm_shift), imm_scale)
    res_np = res_np.astype(res.dtype)
    a_nd = tvm.nd.array(a_np, dev)
    res_nd = tvm.nd.array(np.zeros((m, n, env.BATCH, env.BLOCK_OUT)).astype(res.dtype), dev)
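    # On the simulators, reset the execution counters first so the stats
    # printed below cover only this kernel invocation.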
    if env.TARGET in ["sim", "tsim"]:
        simulator.clear_stats()
    f(a_nd, res_nd)
    np.testing.assert_equal(res_np, res_nd.numpy())
    if env.TARGET in ["sim", "tsim"]:
        sim_stats = simulator.stats()
        print("Shift and scale execution statistics:")
        for k, v in sim_stats.items():
            print("\t{:<16}: {:>16}".format(k, v))


# Run the test body under the active VTA environment (simulator or remote
# hardware over RPC).
vta.testing.run(_run)