verify_on_device.py
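"""On-device verification of a FINN-generated dataflow accelerator.

Loads the dataflow parent model produced by the FINN build, programs the
accelerator overlay from the deployment package, runs a stored verification
input through the stitched software/accelerator model and compares the result
against the stored expected output.
"""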
# System-specific parameters and functions: used to insert the path to the
# deployed driver into the module search path
import sys
# YAML for loading experiment configurations
import yaml
# Numpy for handling arrays (inputs/outputs to/from the model)
import numpy as np
# For some reason, pynq must be imported here - exactly here - otherwise the
# script fails when freeing some allocated buffers at the end (after everything
# else passed successfully)
import pynq  # noqa: This is not really used except for fixing this weird bug
# QONNX wrapper around ONNX models
from qonnx.core.modelwrapper import ModelWrapper
# Convert ONNX nodes to QONNX CustomOp instances
from qonnx.custom_op.registry import getCustomOp


# Extracts the model and sets up the accelerator from the dataflow parent model
def extract_and_setup_model(parent, accelerator):  # noqa: Shadows outer scope
    # Assumption: The whole graph has three nodes: The MultiThreshold operation
    # quantizing the input, the StreamingDataflowPartition corresponding to the
    # FPGA accelerator and a Mul node de-quantizing the output
    assert len(parent.graph.node) == 3, \
        "Unexpected number of nodes in the dataflow parent graph"

    # Function wrapping the input quantization as it is described by the model
    def quantize(x):
        # The multi thresholds must be the first node of the graph
        multithreshold = parent.graph.node[0]
        # Check whether this is indeed the thresholding quantization
        assert multithreshold.op_type == "MultiThreshold", \
            f"First node must be MultiThreshold: {multithreshold.name}"
        # Get the quantization thresholds which should be stored as an
        # initializer tensor within the model graph
        thresholds = parent.get_initializer(multithreshold.input[1])
        # Prepare the input execution context
        context = {
            multithreshold.input[0]: x, multithreshold.input[1]: thresholds
        }
        # Execute the node on the input context, writing the result back into
        # the context
        getCustomOp(multithreshold).execute_node(context, parent.graph)
        # Extract the output from the execution context
        return context[multithreshold.output[0]]

    # Function wrapping the output de-quantization as it is described by the
    # model
    def dequantize(x):
        # The de-quantization multiplication node of the graph
        mul = parent.graph.node[2]
        # Check whether this is indeed the mul de-quantization
        assert mul.op_type == "Mul", f"Last node must be Mul: {mul.name}"
        # Get the de-quantization scale which should be stored as an
        # initializer tensor within the model graph
        scale = parent.get_initializer(mul.input[1])
        # Apply the de-quantization scale to the tensor
        return scale * x

    # Wrap the whole model as a function
    def model(x):  # noqa: Shadows model from outer scope
        # Chain calls to the quantization/de-quantization and accelerator parts
        return dequantize(accelerator.execute(quantize(x)))

    # Return the model stitching software and accelerator parts in a simple
    # python function interface
    return model


# Adds the batch dimension at the front of a shape
def add_batch(shapes):
    return [(1, *shape) for shape in shapes]
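# Example with illustrative shapes: add_batch([(32, 32, 3), (10,)]) yields
# [(1, 32, 32, 3), (1, 10)], i.e. each shape gains a leading batch dimension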


# Script entrypoint
if __name__ == "__main__":
    # Open the configuration file
    with open("params.yaml") as file:
        # Load the configuration from yaml format
        params = yaml.safe_load(file)

    # Path to the build output directory
    build = f"{params['build']['output_dir']}"
    # Load the parent model of the built dataflow accelerator
    parent = ModelWrapper(f"{build}/intermediate_models/dataflow_parent.onnx")
    # Path to the deployment package generated by FINN
    deploy = f"{build}/deploy/"
    # Add the path to the deployed driver to the search path
    sys.path.append(f"{deploy}/driver")

    # Import the accelerator overlay and configuration from the deployed driver
    from driver import FINNExampleOverlay, io_shape_dict, Device  # noqa

    # Patch the I/O shapes to reintroduce the batch size
    io_shape_dict["ishape_normal"] = add_batch(io_shape_dict["ishape_normal"])
    io_shape_dict["oshape_normal"] = add_batch(io_shape_dict["oshape_normal"])
    io_shape_dict["ishape_folded"] = add_batch(io_shape_dict["ishape_folded"])
    io_shape_dict["oshape_folded"] = add_batch(io_shape_dict["oshape_folded"])
    io_shape_dict["ishape_packed"] = add_batch(io_shape_dict["ishape_packed"])
    io_shape_dict["oshape_packed"] = add_batch(io_shape_dict["oshape_packed"])

    # Load the verification input/output pair
    inp = np.load("inp.npy")
    out = np.load("out.npy")
    # Extract the batch size from the verification input/output pair
    batch_size = inp.shape[0]

    # Load the accelerator overlay
    accelerator = FINNExampleOverlay(
        # Path to the accelerator bitfile built by FINN
        bitfile_name=f"{deploy}/bitfile/finn-accel.bit",
        # Dictionary describing the I/O of the FINN-generated accelerator
        io_shape_dict=io_shape_dict,
        # Path to folder containing runtime-writable .dat weights
        runtime_weight_dir=f"{deploy}/driver/runtime_weights/",
        # Default to the device at index 0 for now...
        device=Device.devices[0],
        # Target platform: zynq-iodma or alveo
        platform="zynq-iodma",
        # Size of the verification batch input/output pair
        batch_size=batch_size
    )

    # Extract the software parts of the model and stitch them together with
    # the accelerator part
    model = extract_and_setup_model(parent, accelerator)
    # Run the verification input through the model
    y = model(inp)
    # Compare the output produced by the model to the expected output
    assert np.allclose(y, out), "Produced and expected output do not match"
    # Print a success message for this simple verification script
    print("Verification on device: SUCCESS")