"""
Helper functions for reading and writing hardware files.
Each of the functions in inputoutput take a block and a file descriptor. The functions
provided either read the file and update the Block accordingly, or write information
from the Block out to the file.
"""
from __future__ import annotations
import collections
import functools
import operator
import os
import re
import subprocess
import sys
import tempfile
from typing import IO, TYPE_CHECKING
from pyrtl.core import Block, _NameSanitizer, working_block
from pyrtl.corecircuits import concat_list, rtl_all, rtl_any, select
from pyrtl.gate_graph import Gate, GateGraph
from pyrtl.memory import MemBlock, RomBlock
from pyrtl.passes import one_bit_selects, two_way_concat
from pyrtl.pyrtlexceptions import PyrtlError, PyrtlInternalError
from pyrtl.wire import Const, Input, Output, Register, WireVector, next_tempvar_name
if TYPE_CHECKING:
from pyrtl.core import LogicNet
from pyrtl.simulation import SimulationTrace
def _natural_sort_key(key: str) -> list:
"""Convert the key into a form such that it will be sorted naturally, e.g. such that
"tmp4" appears before "tmp18".
For example, given "a1b2" as input, this will return ['a', 1, 'b', 2, ''].
"""
def convert(text):
return int(text) if text.isdigit() else text
return [convert(c) for c in re.split(r"(\d+)", key)]
def _net_sorted(logic: set[LogicNet], name_mapper=lambda w: w.name) -> list[LogicNet]:
# Sort nets based on the name of the destination wire, unless it's a memory write
# net.
def natural_keys(n):
if n.op == "@":
# Sort based on the name of the wr_en wire, since this particular net is
# used within 'always begin ... end' blocks for memory update logic.
key = str(n.args[2])
else:
key = name_mapper(n.dests[0])
return _natural_sort_key(key)
return sorted(logic, key=natural_keys)
def _name_sorted(
wires: set[WireVector], name_mapper=lambda w: w.name
) -> list[WireVector]:
return sorted(wires, key=lambda w: _natural_sort_key(name_mapper(w)))
# -----------------------------------------------------------------
# __ ___
# |__) | | |___
# |__) |___ | |
class Subcircuit:
"""
This is a way to create and track per-module-instance wire names, so there are not
name clashes when we instantiate a module more than once.
"""
def __init__(self, model, is_top=False, clk_set=None, block=None):
if clk_set is None:
clk_set = {"clk"}
self.model = model
self.is_top = is_top
self.clk_set = clk_set
self.block = working_block(block)
self.inputs = {}
self.outputs = {}
self.wirevector_by_name = {}
def add_input(self, original_name, wire):
self.inputs[original_name] = wire
self.wirevector_by_name[original_name] = wire
def add_output(self, original_name, wire):
self.outputs[original_name] = wire
self.wirevector_by_name[original_name] = wire
def add_reg(self, original_name, wire):
self.wirevector_by_name[original_name] = wire
def add_clock(self, clock_name):
self.clk_set.add(clock_name)
def twire(self, x):
"""Find or make wire named x and return it."""
s = self.wirevector_by_name.get(x)
if s is None:
# Purposefully *not* setting its name to 'x', so we don't have name clashes
s = WireVector(bitwidth=1)
self.wirevector_by_name[x] = s
return s
# ----------------------------------------------------------------
# ___ __ __ __
# \ / |__ |__) | | / \ / _`
# \/ |___ | \ | |___ \__/ \__>
#
class _VerilogSanitizer(_NameSanitizer):
_ver_regex = r"[_A-Za-z][_a-zA-Z0-9\$]*$"
_verilog_reserved = """always and assign automatic begin buf bufif0 bufif1 case
casex casez cell cmos config deassign default defparam design disable edge else
end endcase endconfig endfunction endgenerate endmodule endprimitive endspecify
endtable endtask event for force forever fork function generate genvar highz0
highz1 if ifnone incdir include initial inout input instance integer join large
liblist library localparam macromodule medium module nand negedge nmos nor
noshowcancelledno not notif0 notif1 or output parameter pmos posedge primitive
pull0 pull1 pulldown pullup pulsestyle_oneventglitch pulsestyle_ondetectglitch
remos real realtime reg release repeat rnmos rpmos rtran rtranif0 rtranif1
scalared showcancelled signed small specify specparam strong0 strong1 supply0
supply1 table task time tran tranif0 tranif1 tri tri0 tri1 triand trior trireg
unsigned use vectored wait wand weak0 weak1 while wire wor xnor xor
"""
def __init__(self, internal_prefix):
self._verilog_reserved_set = frozenset(self._verilog_reserved.split())
super().__init__(self._ver_regex, internal_prefix, self._extra_checks)
def _extra_checks(self, str):
return (
str not in self._verilog_reserved_set # is not a Verilog reserved keyword
and str != "clk" # not the clock signal
and len(str) <= 1024 # not too long to be a Verilog id
)
class _VerilogOutput:
def __init__(self, block: Block, add_reset: bool | str, module_name="toplevel"):
block = working_block(block)
self.gate_graph = GateGraph(block)
self.add_reset = add_reset
self.module_name = module_name
if not isinstance(self.add_reset, bool) and self.add_reset != "asynchronous":
msg = (
f"Invalid add_reset option {self.add_reset}. Acceptable options are "
"False, True, and 'asynchronous'"
)
raise PyrtlError(msg)
if self.add_reset and self.gate_graph.get_gate("rst") is not None:
msg = (
"Found a user-defined wire named 'rst'. Pass in 'add_reset=False' to "
"use your existing reset logic."
)
raise PyrtlError(msg)
self.internal_names = _VerilogSanitizer("_ver_out_tmp_")
def gate_key(gate: Gate) -> str:
"""Sort Gates by name.
MemBlocks have no name, so use their ``wr_en`` name instead.
"""
if gate.name:
return gate.name
return gate.args[2].name
# Sanitize all Gate names.
for gate in sorted(self.gate_graph.gates, key=gate_key):
if gate.name:
self.internal_names.make_valid_string(gate.name)
self.inputs = self._name_sorted(self.gate_graph.inputs)
self.outputs = self._name_sorted(self.gate_graph.outputs)
self.io_list = [
"clk",
*[self._verilog_name(input.name) for input in self.inputs],
*[self._verilog_name(output.name) for output in self.outputs],
]
if self.add_reset:
self.io_list.insert(1, "rst")
if any(io_name.startswith("tmp") for io_name in self.io_list):
msg = 'input or output with name starting with "tmp" indicates unnamed IO'
raise PyrtlError(msg)
self.registers = self._name_sorted(self.gate_graph.registers)
# List of all unique MemBlocks and RomBlocks, sorted by memid.
self.all_memblocks = sorted(
# Build a set of unique MemBlocks first, to avoid duplicates.
{
mem_gate.mem
for mem_gate in self.gate_graph.mem_reads | self.gate_graph.mem_writes
},
key=lambda memblock: memblock.id,
)
# Sanitize all MemBlock names.
for memblock in self.all_memblocks:
self.internal_names.make_valid_string(memblock.name)
# List of unique MemBlocks (not RomBlocks!), sorted by memid.
self.memblocks = [
memblock for memblock in self.all_memblocks if type(memblock) is MemBlock
]
# List of unique RomBlocks (not MemBlocks!), sorted by memid.
self.romblocks = [
romblock
for romblock in self.all_memblocks
if isinstance(romblock, RomBlock)
]
def _verilog_name(self, name: str) -> str:
"""Return the sanitized Verilog identifier name for ``name``."""
return self.internal_names[name]
def _name_sorted(self, gates: set[Gate]) -> list[Gate]:
def name_mapper(gate: Gate) -> str:
if gate.name is None:
# MemBlock writes have no name, so instead sort by the name of
# ``wr_en``, since this particular net is used within 'always begin ...
# end' blocks for memory update logic.
return gate.args[2].name
return self._verilog_name(gate.name)
return _name_sorted(gates, name_mapper=name_mapper)
def _verilog_size(self, bitwidth: int) -> str:
"""Return a Verilog size declaration for ``bitwidth`` bits."""
return "" if bitwidth == 1 else f"[{bitwidth - 1}:0]"
def _is_sliced(self, gate: Gate) -> bool:
"""Return True iff gate has bitwidth 2 or more, and is an arg to a bit-slice.
Such gates must be declared, because Verilog's bit-selection operator ``[]``
only works on wires and registers, not arbitrary expressions.
"""
return (
gate.bitwidth
and gate.bitwidth > 1
and any(dest.op == "s" for dest in gate.dests)
)
def _should_declare_const(self, gate: Gate) -> bool:
"""Determine if we should declare a constant Verilog wire for ``gate``.
This function determines which constant Gates will be declared in Verilog. Any
constant gate without a corresponding Verilog declaration will be inlined.
Constant Verilog gates are declared for:
1. Named constant ``Gates``.
2. ``Gates`` that are ``args`` for a ``s`` bit-selection ``Gate``, because
Verilog's bit-selection operator ``[]`` only works on wires and registers,
not arbitrary expressions.
"""
is_const = gate.op == "C"
is_named = not gate.name.startswith("const_")
is_sliced = self._is_sliced(gate)
return is_const and (is_named or is_sliced)
def _should_declare_wire(self, gate: Gate) -> bool:
"""Determine if we should declare a temporary Verilog wire for ``gate``.
This function determines which temporary Gates will be declared in Verilog. Any
temporary gate without a corresponding Verilog declaration will be inlined.
Temporary Verilog wires are never declared for Outputs, Inputs, Consts, or
Registers, because those declarations are handled separately by
``_to_verilog_header``.
Temporary Verilog wires are never declared for memory writes, because writes
generate no output.
Otherwise, temporary Verilog wires are declared for:
1. Named ``Gates``.
2. ``Gates`` with multiple users.
3. ``MemBlock`` reads, because ``_to_verilog_memories`` expects a declaration.
4. ``Gates`` that are ``args`` for a ``s`` bit-selection ``Gate``, because
Verilog's bit-selection operator ``[]`` only works on wires and registers,
not arbitrary expressions.
"""
excluded = gate.is_output or gate.op in "ICr@"
is_named = gate.name and not gate.name.startswith("tmp")
multiple_users = len(gate.dests) > 1
is_read = gate.op == "m"
is_sliced = self._is_sliced(gate)
return not excluded and (is_named or multiple_users or is_read or is_sliced)
def _name_and_comment(self, name: str, kind="") -> tuple[str, str]:
"""Return the sanitized version of ``name`` and a Verilog comment with the
un-sanitized name. If ``kind`` is provided, it will always be included in the
comment.
"""
sanitized_name = self._verilog_name(name)
if kind:
if sanitized_name != name:
comment = f" // {kind} {name}"
else:
comment = f" // {kind}"
else:
if sanitized_name != name:
comment = f" // {name}"
else:
comment = ""
return sanitized_name, comment
def _to_verilog_header(self, file: IO, initialize_registers: bool):
"""Print the header of the verilog implementation."""
print("// Generated automatically via PyRTL", file=file)
print("// As one initial test of synthesis, map to FPGA with:", file=file)
print(
f'// yosys -p "synth_xilinx -top {self.module_name}" thisfile.v\n',
file=file,
)
# ``declared_gates`` is the set of Gates with corresponding Verilog reg/wire
# declarations. Generated Verilog code can refer to these Gates by name.
self.declared_gates = self.gate_graph.inputs | self.gate_graph.outputs
# Module name.
print(f"module {self.module_name}({', '.join(self.io_list)});", file=file)
# Declare Inputs and Outputs.
print(" input clk;", file=file)
if self.add_reset:
print(" input rst;", file=file)
for input_gate in self.inputs:
sanitized_name, comment = self._name_and_comment(input_gate.name)
print(
f" input{self._verilog_size(input_gate.bitwidth)} {sanitized_name};"
f"{comment}",
file=file,
)
for output_gate in self.outputs:
sanitized_name, comment = self._name_and_comment(output_gate.name)
print(
f" output{self._verilog_size(output_gate.bitwidth)} "
f"{sanitized_name};{comment}",
file=file,
)
# Declare MemBlocks and RomBlocks.
if self.all_memblocks:
print("\n // Memories", file=file)
for memblock in self.all_memblocks:
kind = "MemBlock"
if isinstance(memblock, RomBlock):
kind = "RomBlock"
sanitized_name, comment = self._name_and_comment(memblock.name, kind)
print(
f" reg{self._verilog_size(memblock.bitwidth)} "
f"{sanitized_name}{self._verilog_size(1 << memblock.addrwidth)};"
f"{comment}",
file=file,
)
# Declare Registers.
self.declared_gates |= self.gate_graph.registers
if self.registers:
print("\n // Registers", file=file)
for reg_gate in self.registers:
register_initialization = ""
if initialize_registers:
reset_value = 0
if reg_gate.reset_value is not None:
reset_value = reg_gate.reset_value
register_initialization = f" = {reg_gate.bitwidth}'d{reset_value}"
sanitized_name, comment = self._name_and_comment(reg_gate.name)
print(
f" reg{self._verilog_size(reg_gate.bitwidth)} "
f"{sanitized_name}{register_initialization};{comment}",
file=file,
)
# Declare constants.
const_gates = []
for const_gate in self._name_sorted(self.gate_graph.consts):
if self._should_declare_const(const_gate):
const_gates.append(const_gate)
self.declared_gates |= set(const_gates)
if const_gates:
print("\n // Constants", file=file)
for const_gate in const_gates:
sanitized_name, comment = self._name_and_comment(const_gate.name)
print(
f" wire{self._verilog_size(const_gate.bitwidth)} "
f"{sanitized_name} = {const_gate.bitwidth}'d{const_gate.const_value};"
f"{comment}",
file=file,
)
# Declare any needed temporary wires.
temp_gates = []
for gate in self.gate_graph:
if self._should_declare_wire(gate):
temp_gates.append(gate)
temp_gates = self._name_sorted(temp_gates)
self.declared_gates |= set(temp_gates)
if temp_gates:
print("\n // Temporaries", file=file)
for temp_gate in temp_gates:
sanitized_name, comment = self._name_and_comment(temp_gate.name)
print(
f" wire{self._verilog_size(temp_gate.bitwidth)} {sanitized_name};"
f"{comment}",
file=file,
)
# Write the initial values for read-only memories. If we ever add support
# outside of simulation for initial values for MemBlocks, that would also go
# here.
if self.romblocks:
print("\n // Read-only memory data", file=file)
for romblock in self.romblocks:
print(" initial begin", file=file)
for addr in range(1 << romblock.addrwidth):
print(
f" {self._verilog_name(romblock.name)}[{addr}] = "
f"{romblock.bitwidth}'h{romblock._get_read_data(addr):x};",
file=file,
)
print(" end", file=file)
# combinational_gates is the set of Gates that must be assigned by
# ``_to_verilog_combinational``.
self.combinational_gates = (
self.gate_graph.outputs | set(temp_gates) - self.gate_graph.mem_reads
)
def _verilog_expr(self, gate: Gate, lhs: Gate | None = None) -> str:
"""Returns a Verilog expression for ``gate`` and its arguments, recursively.
The returned expression will be used as the right hand side ("rhs") of
assignment statements, and inputs for registers and memories.
:param lhs: Left-hand side of the assignment statement. This determines when we
return the name of a declared gate, and when we return the expression that
specifies the declared gate's value.
For example, suppose we have a ``declared_gate`` that says ``x = a + b``. If
we are currently processing another Gate that says ``y = x - 2``, we could
emit ``y = x - 2`` or ``y = (a + b) - 2``. ``x`` is a ``declared_gate``, so
we must emit ``y = x - 2``. So when we generate the ``_verilog_expr`` for
``x`` in this scenario, we know to just emit ``x`` because ``x`` is a
``declared_gate``, and we are not currently defining ``x``, because the
assignment's ``lhs`` is not ``x``.
On the other hand, if we are currently processing the Gate ``x = a + b``, we
must emit ``x = a + b`` instead of the unhelpful ``x = x``. We emit the
former, rather than the latter, for the assignment's right-hand side because
the assignment's ``lhs`` is ``x``
"""
if gate in self.declared_gates and lhs is not gate:
# If a Verilog wire/reg has been declared for the gate, and we are not
# currently defining the Gate's value, just return the wire's Verilog name.
return self._verilog_name(gate.name)
if gate.op == "C":
# Return the constant's Verilog value.
return f"{gate.bitwidth}'d{gate.const_value}"
# Convert each of the Gate's args to a Verilog expression.
verilog_args = [self._verilog_expr(arg, lhs) for arg in gate.args]
# Return an expression that combines ``verilog_args`` with the appropriate
# Verilog operator for ``gate``.
if gate.op == "w":
return verilog_args[0]
if gate.op == "~":
return f"~({verilog_args[0]})"
if gate.op in "&|^+-*<>":
return f"({verilog_args[0]} {gate.op} {verilog_args[1]})"
if gate.op == "=":
return f"({verilog_args[0]} == {verilog_args[1]})"
if gate.op == "x":
return f"({verilog_args[0]} ? {verilog_args[2]} : {verilog_args[1]})"
if gate.op == "c":
if len(verilog_args) == 1:
return verilog_args[0]
return f"{{{', '.join(verilog_args)}}}"
if gate.op == "s":
selections = []
for sel in reversed(gate.sel):
if gate.args[0].bitwidth == 1:
selections.append(verilog_args[0])
else:
selections.append(f"{verilog_args[0]}[{sel}]")
if len(gate.sel) == 1:
return f"({selections[0]})"
# Special case: slicing multiple copies of the same gate.
if all(sel == gate.sel[0] for sel in gate.sel):
return f"{{{len(selections)} {{{selections[0]}}}}}"
# Special case: slicing a consecutive subset.
if tuple(range(gate.sel[0], gate.sel[-1] + 1)) == gate.sel:
return f"({verilog_args[0]}[{gate.sel[-1]}:{gate.sel[0]}])"
return f"{{{', '.join(selections)}}}"
msg = f"Unimplemented op {gate.op} in Gate {gate}"
raise PyrtlError(msg)
def _to_verilog_combinational(self, file: IO):
"""Generate Verilog combinational logic.
Emits combinational Verilog logic for ``combinational_gates``. This function
only generates assignments for Outputs and temporaries. The other wires and regs
declared by ``_to_verilog_header`` are handled elsewhere:
- Constant assignments are handled by ``_to_verilog_header``.
- Register assignments are handled by ``_to_verilog_sequential``.
- MemBlock reads are handled by ``_to_verilog_memories``.
:param declared_gates: Set of Gates with corresponding Verilog wire/reg
declarations. Generated Verilog code can refer to these Gates by name.
:param combinational_gates: Set of Gates that must be assigned by
``_to_verilog_combinational``.
"""
if self.combinational_gates:
print("\n // Combinational logic", file=file)
for assignment_gate in self._name_sorted(self.combinational_gates):
print(
f" assign {self._verilog_name(assignment_gate.name)} = "
f"{self._verilog_expr(assignment_gate, lhs=assignment_gate)};",
file=file,
)
def _to_verilog_sequential(self, file: IO):
"""Print the sequential logic of the verilog implementation."""
if not self.gate_graph.registers:
return
print("\n // Register logic", file=file)
if self.add_reset == "asynchronous":
print(" always @(posedge clk or posedge rst) begin", file=file)
else:
print(" always @(posedge clk) begin", file=file)
if self.add_reset:
print(" if (rst) begin", file=file)
for register in self._name_sorted(self.gate_graph.registers):
reset_value = register.reset_value
print(
f" {self._verilog_name(register.name)} <= "
f"{register.bitwidth}'d{reset_value};",
file=file,
)
print(" end else begin", file=file)
indent = " "
else:
indent = ""
for register in self._name_sorted(self.gate_graph.registers):
print(
f" {indent}{self._verilog_name(register.name)} <= "
f"{self._verilog_expr(register.args[0])};",
file=file,
)
if self.add_reset:
print(" end", file=file)
print(" end", file=file)
def _to_verilog_memories(self, file: IO):
"""Generate Verilog logic for MemBlock and RomBlock reads and writes."""
for memblock in self.all_memblocks:
kind = "MemBlock"
if isinstance(memblock, RomBlock):
kind = "RomBlock"
print(f"\n // {kind} {memblock.name} logic", file=file)
# Find writes to ``memblock``.
write_gates = []
for write_gate in self._name_sorted(self.gate_graph.mem_writes):
if write_gate.mem is memblock:
write_gates.append(write_gate)
if write_gates:
print(" always @(posedge clk) begin", file=file)
for write_gate in write_gates:
enable = write_gate.args[2]
verilog_enable = self._verilog_expr(write_gate.args[2])
verilog_addr = self._verilog_expr(write_gate.args[0])
verilog_rhs = self._verilog_expr(write_gate.args[1])
# Simplify the assignment if the enable bit is a constant ``1``.
if enable.op == "C" and enable.const_value == 1:
print(
f" {self._verilog_name(memblock.name)}"
f"[{verilog_addr}] <= {verilog_rhs};",
file=file,
)
else:
print(
f" if ({verilog_enable}) begin\n"
f" {self._verilog_name(memblock.name)}"
f"[{verilog_addr}] <= {verilog_rhs};\n"
" end",
file=file,
)
print(" end", file=file)
# Find reads from ``memblock``. The ``read_gate`` should have been declared
# by ``_to_verilog_header``.
read_gates = []
for read_gate in self._name_sorted(self.gate_graph.mem_reads):
if read_gate.mem is memblock:
read_gates.append(read_gate)
for read_gate in self._name_sorted(read_gates):
print(
f" assign {self._verilog_name(read_gate.name)} = "
f"{self._verilog_name(memblock.name)}"
f"[{self._verilog_expr(read_gate.args[0])}];",
file=file,
)
def _to_verilog_footer(self, file: IO):
print("endmodule", file=file)
def output_to_verilog(self, dest_file: IO, initialize_registers: bool):
self._to_verilog_header(dest_file, initialize_registers)
self._to_verilog_combinational(dest_file)
self._to_verilog_sequential(dest_file)
self._to_verilog_memories(dest_file)
self._to_verilog_footer(dest_file)
def output_verilog_testbench(
self,
dest_file: IO,
simulation_trace: SimulationTrace = None,
toplevel_include: str | None = None,
vcd: str = "waveform.vcd",
cmd: str | None = None,
):
# Output an include, if given.
if toplevel_include:
print(f'`include "{toplevel_include}"', file=dest_file)
print(file=dest_file)
# Output header.
print("module tb();", file=dest_file)
# Declare all block inputs as reg.
print(" reg clk;", file=dest_file)
if self.add_reset:
print(" reg rst;", file=dest_file)
if self.inputs:
print("\n // block Inputs", file=dest_file)
for input_gate in self.inputs:
sanitized_name, comment = self._name_and_comment(input_gate.name)
print(
f" reg{self._verilog_size(input_gate.bitwidth)} {sanitized_name};"
f"{comment}",
file=dest_file,
)
# Declare all block outputs as wires.
if self.outputs:
print("\n // block Outputs", file=dest_file)
for output_gate in self.outputs:
sanitized_name, comment = self._name_and_comment(output_gate.name)
print(
f" wire{self._verilog_size(output_gate.bitwidth)} {sanitized_name};"
f"{comment}",
file=dest_file,
)
print(file=dest_file)
# Declare an integer for MemBlock initialization.
if len(self.memblocks) > 0:
print(" integer tb_addr;", file=dest_file)
io_list_str = [f".{io}({io})" for io in self.io_list]
print(
f" {self.module_name} block({', '.join(io_list_str)});\n", file=dest_file
)
# Generate the clock signal.
print(" always", file=dest_file)
print(" #5 clk = ~clk;\n", file=dest_file)
# Generate Input assignments for each cycle in the trace.
print(" initial begin", file=dest_file)
# If a VCD output is requested, set that up.
if vcd:
print(f' $dumpfile ("{vcd}");', file=dest_file)
print(" $dumpvars;\n", file=dest_file)
# Initialize clk, and all the registers and memories.
print(" clk = 1'd0;", file=dest_file)
if self.add_reset:
print(" rst = 1'd0;", file=dest_file)
def default_value() -> int:
"""Returns the Simulation's default value for Registers and MemBlocks."""
if not simulation_trace:
return 0
return simulation_trace.default_value
# simulation_trace.register_value_map maps from Register to initial value. Make
# a copy that maps from Register name to initial value.
register_value_map = {}
if simulation_trace:
register_value_map = {
register.name: value
for register, value in simulation_trace.register_value_map.items()
}
if self.registers:
print("\n // Initialize Registers", file=dest_file)
for reg_gate in self.registers:
# Try using register_value_map first.
initial_value = register_value_map.get(reg_gate.name)
# If that didn't work, use the Register's reset_value.
if not initial_value:
initial_value = reg_gate.reset_value
# If there is no reset_value, use the default_value().
if not initial_value:
initial_value = default_value()
print(
f" block.{self._verilog_name(reg_gate.name)} = "
f"{reg_gate.bitwidth}'d{initial_value};",
file=dest_file,
)
# Initialize MemBlocks.
if self.memblocks:
print("\n // Initialize MemBlocks", file=dest_file)
for memblock in self.memblocks:
max_addr = 1 << memblock.addrwidth
print(
f" for (tb_addr = 0; tb_addr < {max_addr}; tb_addr++) "
f"begin block.{self._verilog_name(memblock.name)}[tb_addr] = "
f"{memblock.bitwidth}'d{default_value()}; end",
file=dest_file,
)
if not simulation_trace:
continue
memory_value_map = simulation_trace.memory_value_map.get(memblock)
if not memory_value_map:
continue
for addr, initial_data in memory_value_map.items():
# The generated Verilog ``for`` loop above just initialized every
# address in the ``MemBlock`` to ``default_value()``, so skip redundant
# initializations.
if initial_data == default_value():
continue
print(
f" block.{self._verilog_name(memblock.name)}[{addr}] = "
f"{memblock.bitwidth}'d{initial_data};",
file=dest_file,
)
# Set Input values for each cycle.
if simulation_trace:
tracelen = max(len(t) for t in simulation_trace.trace.values())
for i in range(tracelen):
for input_gate in self.inputs:
input_value = simulation_trace.trace[input_gate.name][i]
print(
f" {self._verilog_name(input_gate.name)} = "
f"{input_gate.bitwidth}'d{input_value};",
file=dest_file,
)
print("\n #10", file=dest_file)
if cmd:
print(f" {cmd}", file=dest_file)
# Footer.
print(" $finish;", file=dest_file)
print(" end", file=dest_file)
print("endmodule", file=dest_file)
[docs]
def output_to_verilog(
dest_file: IO,
add_reset: bool | str = True,
block: Block = None,
initialize_registers: bool = False,
module_name: str = "toplevel",
):
"""A function to walk the ``block`` and output it in Verilog format to the open
file.
The Verilog module will be named ``toplevel``, with a clock input named ``clk``.
When possible, wires keep their names in the Verilog output. Wire names that do not
satisfy Verilog's naming requirements, and wires that conflict with Verilog keywords
are given new temporary names in the Verilog output.
:param dest_file: Open file where the Verilog output will be written.
:param add_reset: If reset logic should be added. Allowable options are: ``False``
(meaning no reset logic is added), ``True`` (default, for adding synchronous
reset logic), and ``'asynchronous'`` (for adding asynchronous reset logic). The
reset input will be named ``rst``, and when ``rst`` is high, registers will be
reset to their ``reset_value``.
:param initialize_registers: Initialize Verilog registers to their ``reset_value``.
When this argument is ``True``, a register like ``Register(name='foo',
bitwidth=8, reset_value=4)`` generates Verilog like ``reg[7:0] foo = 8'd4;``.
:param block: Block to be walked and exported. Defaults to the :ref:`working_block`.
:param module_name: name of the module. Defaults to toplevel
if the user puts nothing.
"""
_VerilogOutput(block, add_reset, module_name=module_name).output_to_verilog(
dest_file, initialize_registers
)
[docs]
def output_verilog_testbench(
dest_file: IO,
simulation_trace: SimulationTrace = None,
toplevel_include: str | None = None,
vcd: str = "waveform.vcd",
cmd: str | None = None,
add_reset: bool | str = True,
block: Block = None,
module_name: str = "toplevel",
):
"""Output a Verilog testbench for the block/inputs used in the simulation trace.
If ``add_reset`` is ``True``, a ``rst`` input wire is added to the instantiated
``toplevel`` module. The ``rst`` wire will be held low in the testbench, because
initialization here occurs via the ``initial`` block. ``add_reset`` is provided for
consistency with :func:`output_to_verilog`.
This function *only* generates the Verilog testbench. The Verilog module must be
generated separately by calling :func:`output_to_verilog`, see the
``toplevel_include`` parameter and Example 2 below.
The test bench does not return any values.
Example 1, writing testbench to a string::
with io.StringIO() as tbfile:
pyrtl.output_verilog_testbench(dest_file=tbfile, simulation_trace=sim_trace)
Example 2, testbench in same file as Verilog::
with open('hardware.v', 'w') as fp:
output_to_verilog(fp)
output_verilog_testbench(
fp, sim.tracer, vcd=None, cmd='$display("%d", out);')
:param dest_file: An open file to which the test bench will be printed.
:param simulation_trace: A trace from which the inputs will be extracted for
inclusion in the test bench. This is typically :attr:`Simulation.tracer`. The
generated test bench will replay the :class:`Simulation`'s
:class:`Inputs<Input>` cycle by cycle. The default values for all registers and
memories will be based on the trace, otherwise they will be initialized to 0.
:param toplevel_include: Name of the file containing the ``toplevel`` module this
testbench is testing. If not ``None``, an ``include`` directive will be added
for this file.
:param vcd: By default, the testbench generator will generate a command to write the
output of the testbench execution to a ``.vcd`` file, via ``$dumpfile``, and
``vcd`` is the name of the file to write. If ``None``, then no ``dumpfile`` will
be used.
:param cmd: The string passed as ``cmd`` will be copied verbatim into the testbench
just before the end of each cycle. This is useful for doing things like printing
specific values during testbench evaluation. For example::
cmd='$display("%d", out);'
will instruct the testbench to print the value of ``out`` every cycle.
:param add_reset: If reset logic should be added. Allowable options are: ``False``
(meaning no reset logic is added), ``True`` (default, for adding synchronous
reset logic), and ``'asynchronous'`` (for adding asynchronous reset logic). The
value passed in here should match the argument passed to
:func:`output_to_verilog`.
:param block: Block containing design to test. Defaults to the :ref:`working_block`.
:param module_name: Name of the module the user chooses. Defaults to toplevel
if nothing is inputted.
"""
_VerilogOutput(block, add_reset, module_name=module_name).output_verilog_testbench(
dest_file, simulation_trace, toplevel_include, vcd, cmd
)
# ----------------------------------------------------------------
# ___ __ __ ___
# |___ | |__) |__) | |
# | | | \ | \ | |___
#
[docs]
def output_to_firrtl(
open_file, rom_blocks: list[RomBlock] | None = None, block: Block = None
):
"""Output the block as FIRRTL code to the output file.
If ROM is initialized in PyRTL code, you can pass in the :class:`RomBlocks` as a
list ``[rom1, rom2, ...]``.
:param open_file: File to write to.
:param rom_blocks: List of :class:`RomBlocks<RomBlock>` to be initialized.
:param block: Block to use (defaults to :ref:`working_block`).
"""
block = working_block(block)
# FIRRTL only allows 'bits' operations to have two parameters: a high and low index
# representing the inclusive bounds of a contiguous range. PyRTL uses slice syntax,
# which aren't always contiguous, so we need to convert them.
one_bit_selects(block=block)
# FIRRTL only allows 'concatenate' operations to have two arguments, but PyRTL's 'c'
# op allows an arbitrary number of wires. We need to convert these n-way concats to
# series of two-way concats accordingly.
two_way_concat(block=block)
f = open_file
# write out all the implicit stuff
f.write("circuit Example :\n")
f.write(" module Example :\n")
f.write(" input clock : Clock\n input reset : UInt<1>\n")
# write out IO signals, wires and registers
for wire in _name_sorted(block.wirevector_subset(Input)):
f.write(f" input {wire.name} : UInt<{wire.bitwidth}>\n")
for wire in _name_sorted(block.wirevector_subset(Output)):
f.write(f" output {wire.name} : UInt<{wire.bitwidth}>\n")
for wire in _name_sorted(
block.wirevector_subset(exclude=(Input, Output, Register, Const))
):
f.write(f" wire {wire.name} : UInt<{wire.bitwidth}>\n")
for wire in _name_sorted(block.wirevector_subset(Register)):
f.write(f" reg {wire.name} : UInt<{wire.bitwidth}>, clock\n")
for wire in _name_sorted(block.wirevector_subset(Const)):
# some const is in the form like const_0_1'b1, is this legal operation?
wire.name = wire.name.split("'").pop(0)
f.write(f" node {wire.name} = UInt<{wire.bitwidth}>({wire.val})\n")
f.write("\n")
# write "Main"
node_cntr = 0
initializedMem = []
for log_net in _net_sorted(block.logic_subset()):
if log_net.op == "&":
f.write(
f" {log_net.dests[0].name} <= "
f"and({log_net.args[0].name}, {log_net.args[1].name})\n"
)
elif log_net.op == "|":
f.write(
f" {log_net.dests[0].name} <= "
f"or({log_net.args[0].name}, {log_net.args[1].name})\n"
)
elif log_net.op == "^":
f.write(
f" {log_net.dests[0].name} <= "
f"xor({log_net.args[0].name}, {log_net.args[1].name})\n"
)
elif log_net.op == "n":
f.write(
f" node T_{node_cntr} = "
f"and({log_net.args[0].name}, {log_net.args[1].name})\n"
)
f.write(f" {log_net.dests[0].name} <= not(T_{node_cntr})\n")
node_cntr += 1
elif log_net.op == "~":
f.write(f" {log_net.dests[0].name} <= not({log_net.args[0].name})\n")
elif log_net.op == "+":
f.write(
f" {log_net.dests[0].name} <= "
f"add({log_net.args[0].name}, {log_net.args[1].name})\n"
)
elif log_net.op == "-":
f.write(
f" {log_net.dests[0].name} <= "
f"sub({log_net.args[0].name}, {log_net.args[1].name})\n"
)
elif log_net.op == "*":
f.write(
f" {log_net.dests[0].name} <= "
f"mul({log_net.args[0].name}, {log_net.args[1].name})\n"
)
elif log_net.op == "=":
f.write(
f" {log_net.dests[0].name} <= "
f"eq({log_net.args[0].name}, {log_net.args[1].name})\n"
)
elif log_net.op == "<":
f.write(
f" {log_net.dests[0].name} <= "
f"lt({log_net.args[0].name}, {log_net.args[1].name})\n"
)
elif log_net.op == ">":
f.write(
f" {log_net.dests[0].name} <= "
f"gt({log_net.args[0].name}, {log_net.args[1].name})\n"
)
elif log_net.op == "w":
f.write(f" {log_net.dests[0].name} <= {log_net.args[0].name}\n")
elif log_net.op == "x":
f.write(
f" {log_net.dests[0].name} <= mux({log_net.args[0].name}, "
f"{log_net.args[2].name}, {log_net.args[1].name})\n"
)
elif log_net.op == "c":
if len(log_net.args) != 2:
msg = (
"Expected concat net to have only two argument wires; has "
f"{len(log_net.args)}"
)
raise PyrtlInternalError(msg)
f.write(
f" {log_net.dests[0].name} <= "
f"cat({log_net.args[0].name}, {log_net.args[1].name})\n"
)
elif log_net.op == "s":
if len(log_net.op_param) != 1:
msg = (
"Expected select net to have single select bit; has "
f"{len(log_net.op_param)}"
)
raise PyrtlInternalError(msg)
f.write(
f" {log_net.dests[0].name} <= bits({log_net.args[0].name}, "
f"{log_net.op_param[0]}, {log_net.op_param[0]})\n"
)
elif log_net.op == "r":
f.write(
f" {log_net.dests[0].name} <= mux(reset, "
f"UInt<{log_net.dests[0].bitwidth}>(0), {log_net.args[0].name})\n"
)
elif log_net.op == "m":
# if there are rom blocks, need to be initialized
if rom_blocks is not None:
if log_net.op_param[0] not in initializedMem:
initializedMem.append(log_net.op_param[0])
# find corresponding rom block according to memid
curr_rom = next(
(x for x in rom_blocks if x.id == log_net.op_param[0]), None
)
f.write(
f" wire {log_net.op_param[1].name} : "
f"UInt<{log_net.op_param[1].bitwidth}>"
f"[{2 ** log_net.op_param[1].addrwidth}]\n"
)
# if rom data is a function, calculate the data first
if callable(curr_rom.data):
romdata = [
curr_rom.data(i) for i in range(2**curr_rom.addrwidth)
]
curr_rom.data = romdata
# write rom block initialization data
for i in range(len(curr_rom.data)):
f.write(
f" {log_net.op_param[1].name}[{i}] <= "
f"UInt<{log_net.op_param[1].bitwidth}>"
f"({curr_rom.data[i]})\n"
)
# write the connection
f.write(
f" {log_net.dests[0].name} <= "
f"{log_net.op_param[1].name}[{log_net.args[0].name}]\n"
)
else:
if log_net.op_param[0] not in initializedMem:
initializedMem.append(log_net.op_param[0])
f.write(
f" cmem {log_net.op_param[1].name}_{log_net.op_param[0]} : "
f"UInt<{log_net.op_param[1].bitwidth}>"
f"[{2 ** log_net.op_param[1].addrwidth}]\n"
)
f.write(
f" infer mport T_{node_cntr} = {log_net.op_param[1].name}_"
f"{log_net.op_param[0]}[{log_net.args[0].name}], clock\n"
)
f.write(f" {log_net.dests[0].name} <= T_{node_cntr}\n")
node_cntr += 1
elif log_net.op == "@":
if log_net.op_param[0] not in initializedMem:
initializedMem.append(log_net.op_param[0])
f.write(
f" cmem {log_net.op_param[1].name}_{log_net.op_param[0]} : "
f"UInt<{log_net.op_param[1].bitwidth}>"
f"[{2 ** log_net.op_param[1].addrwidth}]\n"
)
f.write(f" when {log_net.args[2].name} :\n")
f.write(
f" infer mport T_{node_cntr} = {log_net.op_param[1].name}_"
f"{log_net.op_param[0]}[{log_net.args[0].name}], clock\n"
)
f.write(f" T_{node_cntr} <= {log_net.args[1].name}\n")
f.write(" skip\n")
node_cntr += 1
else:
pass
return 0
# -----------------------------------------------------------------
# __ __ __ __ ___ __
# | /__` / /\ /__` |__) |__ |\ | / |__|
# | .__/ \__ /~~\ .__/ |__) |___ | \| \__ | |
def input_from_iscas_bench(bench, block: Block = None):
"""Import an ISCAS .bench file
:param bench: an open ISCAS .bench file to read
:param block: block to add the imported logic (defaults to current
:ref:`working_block`)
"""
import pyparsing
from pyparsing import (
Group,
Keyword,
Literal,
OneOrMore,
Suppress,
Word,
ZeroOrMore,
one_of,
)
block = working_block(block)
try:
bench_string = bench.read()
except AttributeError as exc:
if isinstance(bench, str):
bench_string = bench
else:
msg = "input_from_bench expecting either open file or string"
raise PyrtlError(msg) from exc
def SKeyword(x):
return Suppress(Keyword(x))
def SLiteral(x):
return Suppress(Literal(x))
# NOTE: The acceptable signal characters are based on viewing the I/O names in the
# available ISCAS benchmark files, and may not be complete.
signal_start = pyparsing.alphas + pyparsing.nums + "$:[]_<>\\/?"
signal_middle = pyparsing.alphas + pyparsing.nums + "$:[]_<>\\/.?-"
signal_id = Word(signal_start, signal_middle)
gate_names = "AND OR NAND NOR XOR NOT BUFF DFF"
src_list = Group(signal_id + ZeroOrMore(SLiteral(",") + signal_id))("src_list")
net_def = Group(
signal_id("dst")
+ SLiteral("=")
+ one_of(gate_names)("gate")
+ SLiteral("(")
+ src_list
+ SLiteral(")")
)("net_def")
input_def = Group(SKeyword("INPUT") + SLiteral("(") + signal_id + SLiteral(")"))(
"input_def"
)
output_def = Group(SKeyword("OUTPUT") + SLiteral("(") + signal_id + SLiteral(")"))(
"output_def"
)
command_def = input_def | output_def | net_def
commands = OneOrMore(command_def)("command_list")
parser = commands.ignore(pyparsing.pythonStyleComment)
# Begin actually reading and parsing the BENCH file
result = parser.parseString(bench_string, parseAll=True)
output_to_internal = {} # dict: name -> wire
def twire(name):
"""Find or make wire named 'name' and return it."""
w = output_to_internal.get(name)
if w is None:
w = block.wirevector_by_name.get(name)
if w is None:
w = WireVector(bitwidth=1, name=name)
return w
for cmd in result["command_list"]:
if cmd.getName() == "input_def":
_wire_in = Input(bitwidth=1, name=str(cmd[0]), block=block)
elif cmd.getName() == "output_def":
# Create internal wire for indirection, since Outputs can't be inputs to
# nets in PyRTL
wire_internal = WireVector(bitwidth=1, block=block)
wire_out = Output(bitwidth=1, name=str(cmd[0]), block=block)
wire_out <<= wire_internal
output_to_internal[cmd[0]] = wire_internal
elif cmd.getName() == "net_def":
srcs = cmd["src_list"]
if cmd["gate"] == "AND":
dst_wire = twire(cmd["dst"])
dst_wire <<= twire(srcs[0]) & twire(srcs[1])
elif cmd["gate"] == "OR":
dst_wire = twire(cmd["dst"])
dst_wire <<= twire(srcs[0]) | twire(srcs[1])
elif cmd["gate"] == "NAND":
dst_wire = twire(cmd["dst"])
dst_wire <<= twire(srcs[0]).nand(twire(srcs[1]))
elif cmd["gate"] == "NOR":
dst_wire = twire(cmd["dst"])
dst_wire <<= ~(twire(srcs[0]) | twire(srcs[1]))
elif cmd["gate"] == "XOR":
dst_wire = twire(cmd["dst"])
dst_wire <<= twire(srcs[0]) ^ twire(srcs[1])
elif cmd["gate"] == "NOT":
dst_wire = twire(cmd["dst"])
dst_wire <<= ~twire(srcs[0])
elif cmd["gate"] == "BUFF":
dst_wire = twire(cmd["dst"])
dst_wire <<= twire(srcs[0])
elif cmd["gate"] == "DFF":
dst_wire = twire(cmd["dst"])
reg = Register(bitwidth=1)
reg.next <<= twire(srcs[0])
dst_wire <<= reg
else:
msg = f"Unexpected gate {{{cmd['gate']}}}"
raise PyrtlError(msg)
# Benchmarks like c1196, b18, etc. have inputs and outputs by the same name, that
# are therefore directly connected. This pass will rename the outputs so that this
# is still okay.
for o in block.wirevector_subset(Output):
inputs = [i for i in block.wirevector_subset(Input) if i.name == o.name]
if inputs:
if len(inputs) > 1:
msg = f"More than one input found with the name {inputs[0].name}"
raise PyrtlError(msg)
i = inputs[0]
o_internal = twire(o.name)
o_internal <<= i
o.name = next_tempvar_name()
# Ensure the input is the one mapped by the original name
block.wirevector_by_name[i.name] = i
print(
"Found input and output wires with the same name. "
f"Output '{i.name}' has now been renamed to '{o.name}'."
)