Source code for pyrtl.importexport

"""
Helper functions for reading and writing hardware files.

Each of the functions in inputoutput take a block and a file descriptor. The functions
provided either read the file and update the Block accordingly, or write information
from the Block out to the file.
"""

from __future__ import annotations

import collections
import functools
import operator
import os
import re
import subprocess
import sys
import tempfile
from typing import IO, TYPE_CHECKING

from pyrtl.core import Block, _NameSanitizer, working_block
from pyrtl.corecircuits import concat_list, rtl_all, rtl_any, select
from pyrtl.gate_graph import Gate, GateGraph
from pyrtl.memory import MemBlock, RomBlock
from pyrtl.passes import one_bit_selects, two_way_concat
from pyrtl.pyrtlexceptions import PyrtlError, PyrtlInternalError
from pyrtl.wire import Const, Input, Output, Register, WireVector, next_tempvar_name

if TYPE_CHECKING:
    from pyrtl.core import LogicNet
    from pyrtl.simulation import SimulationTrace


def _natural_sort_key(key: str) -> list:
    """Convert the key into a form such that it will be sorted naturally, e.g. such that
    "tmp4" appears before "tmp18".

    For example, given "a1b2" as input, this will return ['a', 1, 'b', 2, ''].
    """

    def convert(text):
        return int(text) if text.isdigit() else text

    return [convert(c) for c in re.split(r"(\d+)", key)]


def _net_sorted(logic: set[LogicNet], name_mapper=lambda w: w.name) -> list[LogicNet]:
    # Sort nets based on the name of the destination wire, unless it's a memory write
    # net.
    def natural_keys(n):
        if n.op == "@":
            # Sort based on the name of the wr_en wire, since this particular net is
            # used within 'always begin ... end' blocks for memory update logic.
            key = str(n.args[2])
        else:
            key = name_mapper(n.dests[0])
        return _natural_sort_key(key)

    return sorted(logic, key=natural_keys)


def _name_sorted(
    wires: set[WireVector], name_mapper=lambda w: w.name
) -> list[WireVector]:
    return sorted(wires, key=lambda w: _natural_sort_key(name_mapper(w)))


# -----------------------------------------------------------------
#     __          ___
#    |__) |    | |___
#    |__) |___ | |


class Subcircuit:
    """
    This is a way to create and track per-module-instance wire names, so there are not
    name clashes when we instantiate a module more than once.
    """

    def __init__(self, model, is_top=False, clk_set=None, block=None):
        if clk_set is None:
            clk_set = {"clk"}
        self.model = model
        self.is_top = is_top
        self.clk_set = clk_set
        self.block = working_block(block)
        self.inputs = {}
        self.outputs = {}
        self.wirevector_by_name = {}

    def add_input(self, original_name, wire):
        self.inputs[original_name] = wire
        self.wirevector_by_name[original_name] = wire

    def add_output(self, original_name, wire):
        self.outputs[original_name] = wire
        self.wirevector_by_name[original_name] = wire

    def add_reg(self, original_name, wire):
        self.wirevector_by_name[original_name] = wire

    def add_clock(self, clock_name):
        self.clk_set.add(clock_name)

    def twire(self, x):
        """Find or make wire named x and return it."""
        s = self.wirevector_by_name.get(x)
        if s is None:
            # Purposefully *not* setting its name to 'x', so we don't have name clashes
            s = WireVector(bitwidth=1)
            self.wirevector_by_name[x] = s
        return s


[docs] def input_from_blif( blif, block: Block = None, merge_io_vectors: bool = True, clock_name: str = "clk", top_model: str | None = None, ): """Read an open BLIF file or string as input, updating the block appropriately. If ``merge_io_vectors`` is ``True``, then given 1-bit :class:`Input` wires ``a[0]`` and ``a[1]``, these wires will be combined into a single 2-bit :class:`Input` wire ``a`` that can be accessed by name ``a`` in the block. Otherwise if ``merge_io_vectors`` is ``False``, the original 1-bit wires will be :class:`Input` wires of the block. This holds similarly for :class:`Output`. ``input_from_blif`` assumes the following: - There is only one single shared clock and reset - Output is generated by Yosys with formals in a particular order ``input_from_blif`` currently supports multi-module (unflattened) BLIF, though we recommend importing a flattened BLIF with a single module when possible. It currently ignores the reset signal (which it assumes is input only to the flip flops). :param blif: An open BLIF file to read. :param block: The block where the logic will be added. Defaults to the :ref:`working_block`. :param merge_io_vectors: If ``True``, :class:`Input`/:class:`Output` wires whose names differ only by a indexing subscript (e.g. 1-bit wires ``a[0]`` and ``a[1]``) will be combined into a single :class:`Input`/:class:`Output` (e.g. a 2-bit wire ``a``). :param clock_name: The name of the clock (defaults to ``clk``). :param top_model: name of top-level model to instantiate; if ``None``, defaults to first model listed in the BLIF. """ import pyparsing from pyparsing import ( Group, Keyword, Literal, OneOrMore, Opt, Suppress, Word, ZeroOrMore, one_of, ) block = working_block(block) try: blif_string = blif.read() except AttributeError as exc: if isinstance(blif, str): blif_string = blif else: msg = "input_from_blif expecting either open file or string" raise PyrtlError(msg) from exc def SKeyword(x): return Suppress(Keyword(x)) def SLiteral(x): return Suppress(Literal(x)) # Begin BLIF language definition signal_start = pyparsing.alphas + "$:[]_<>\\/?" signal_middle = pyparsing.alphas + pyparsing.nums + "$:[]_<>\\/.?-" signal_id = Word(signal_start, signal_middle) header = SKeyword(".model") + signal_id("model_name") input_list = Group(SKeyword(".inputs") + ZeroOrMore(signal_id))("input_list") output_list = Group(SKeyword(".outputs") + ZeroOrMore(signal_id))("output_list") cover_atom = Word("01-") cover_list = Group(ZeroOrMore(cover_atom))("cover_list") namesignal_list = Group(OneOrMore(signal_id))("namesignal_list") name_def = Group(SKeyword(".names") + namesignal_list + cover_list)("name_def") def make_dff_parsers(formals, names): return functools.reduce( operator.__or__, ( Group(SKeyword(".subckt") + SKeyword(name) + formals)(name) for name in names ), ) # This is purposefully not supporting any DFFs that use negedges in the sensitivity # list. Currently don't have the '$_ALDFF*' dffs (with load signal). Also, we may # want to look into doing something similar for the $_DLATCH* types. NOTE: # Unfortunately, this list needs to be kept consistent with the flop_next function # in extract_flops(). dff_names = [ "$_DFF_P_", "$_DFFE_PN_", "$_DFFE_PP_", "$_DFF_PP0_", "$_DFF_PP1_", "$_DFFE_PP0N_", "$_DFFE_PP0P_", "$_DFFE_PP1N_", "$_DFFE_PP1P_", "$_DFFSR_PPP", "$_DFFSRE_PPPN_", "$_DFFSRE_PPPP_", "$_SDFF_PN0_", "$_SDFF_PN1_", "$_SDFF_PP0_", "$_SDFF_PP1_", "$_SDFFE_PN0N_", "$_SDFFE_PN0P_", "$_SDFFE_PN1N_", "$_SDFFE_PN1P_", "$_SDFFE_PP0N_", "$_SDFFE_PP0P_", "$_SDFFE_PP1N_", "$_SDFFE_PP1P_", "$_SDFFCE_PN0N_", "$_SDFFCE_PN0P_", "$_SDFFCE_PN1N_", "$_SDFFCE_PN1P_", "$_SDFFCE_PP0N_", "$_SDFFCE_PP0P_", "$_SDFFCE_PP1N_", "$_SDFFCE_PP1P_", ] # (a)synchronous Flip-flop (positive/negative polarity reset/set/enable) dffs_formal = ( SLiteral("C=") + signal_id("C") + SLiteral("D=") + signal_id("D") + Opt(SLiteral("E=") + signal_id("E")) + SLiteral("Q=") + signal_id("Q") + Opt(SLiteral("S=") + signal_id("S")) + Opt(SLiteral("R=") + signal_id("R")) ) dffs_def = make_dff_parsers(dffs_formal, dff_names) # synchronous Flip-flop (using .latch format) latches_init_val = Opt(one_of("0 1 2 3"), default="0") # TODO I think <type> and <control> ('re' and 'C') below are technically optional # too latches_def = Group( SKeyword(".latch") + signal_id("D") + signal_id("Q") + SLiteral("re") + signal_id("C") + latches_init_val("I") )("latches_def") # model reference formal_actual = Group(signal_id("formal") + SLiteral("=") + signal_id("actual"))( "formal_actual" ) formal_actual_list = Group(OneOrMore(formal_actual))("formal_actual_list") model_name = signal_id("model_name") model_ref = Group(SKeyword(".subckt") + model_name + formal_actual_list)( "model_ref" ) command_def = name_def | dffs_def | latches_def | model_ref command_list = Group(OneOrMore(command_def))("command_list") footer = SKeyword(".end") model_def = Group(header + input_list + output_list + command_list + footer) model_list = OneOrMore(model_def) parser = model_list.ignore(pyparsing.pythonStyleComment) # Begin actually reading and parsing the BLIF file result = parser.parse_string(blif_string, parseAll=True) ff_clk_set = set() models = {} # model name -> model, for subckt instantiation def extract_inputs(subckt): if subckt.is_top: # NOTE: Assumes that: # - Top-level inputs starting with the same prefix are part of the same wire # - Indices start at 0 start_names = [ re.sub(r"\[([0-9]+)\]$", "", x) for x in subckt.model["input_list"] ] name_counts = collections.Counter(start_names) for input_name in name_counts: bitwidth = name_counts[input_name] if input_name in subckt.clk_set: continue if bitwidth == 1: wire_in = Input(bitwidth=1, name=input_name, block=block) subckt.add_input(input_name, wire_in) block.add_wirevector(wire_in) elif merge_io_vectors: wire_in = Input(bitwidth=bitwidth, name=input_name, block=block) for i in range(bitwidth): bit_name = input_name + "[" + str(i) + "]" bit_wire = WireVector(bitwidth=1, block=block) bit_wire <<= wire_in[i] subckt.add_input(bit_name, bit_wire) else: for i in range(bitwidth): bit_name = input_name + "[" + str(i) + "]" wire_in = Input(bitwidth=1, name=bit_name, block=block) subckt.add_input(bit_name, wire_in) block.add_wirevector(wire_in) else: # For subckts: # - Never merge input vectors # - All inputs are 1-bit for input_name in subckt.model["input_list"]: if input_name in subckt.clk_set: continue wire_in = WireVector( bitwidth=1, block=block ) # Internal name prevents name clash subckt.add_input(input_name, wire_in) block.add_wirevector(wire_in) def extract_outputs(subckt): if subckt.is_top: # NOTE: Assumes that: # - Top-level outputs starting with the same prefix are part of the same # wire # - Indices start at 0 start_names = [ re.sub(r"\[([0-9]+)\]$", "", x) for x in subckt.model["output_list"] ] name_counts = collections.Counter(start_names) for output_name in name_counts: bitwidth = name_counts[output_name] # To allow an output wire to be used as an argument (legal in BLIF), we # need to create an intermediate wire, which will be used in twire() # whenever the original wire is referenced. For example, given 2-bit # Output 'a', every access to 'a[1]' will really be a reference to # 'a[1]_i', a normal WireVector connected to 'a[1]'. A key property is # that the name by which all other parts of the code refer to this wire # doesn't change; the only thing that changes is what underlying wire is # used. if bitwidth == 1: bit_internal = WireVector(bitwidth=1, block=block) bit_out = Output(bitwidth=1, name=output_name, block=block) bit_out <<= bit_internal # NOTE this is important: redirecting user-visible name to internal # wire subckt.add_output(output_name, bit_internal) elif merge_io_vectors: wire_out = Output(bitwidth=bitwidth, name=output_name, block=block) bit_list = [] for i in range(bitwidth): bit_name = output_name + "[" + str(i) + "]" bit_wire = WireVector(bitwidth=1, block=block) bit_list.append(bit_wire) subckt.add_output(bit_name, bit_wire) wire_out <<= concat_list(bit_list) else: for i in range(bitwidth): bit_name = output_name + "[" + str(i) + "]" bit_internal = WireVector(bitwidth=1, block=block) bit_out = Output(bitwidth=1, name=bit_name, block=block) bit_out <<= bit_internal # NOTE this is important: redirecting user-visible name to # internal wire subckt.add_output(bit_name, bit_internal) else: # For subckts: # - Never merge outputs vectors # - All outputs are 1-bit for output_name in subckt.model["output_list"]: bit_out = WireVector(bitwidth=1, block=block) block.add_wirevector(bit_out) subckt.add_output(output_name, bit_out) def extract_commands(subckt): # for each "command" (dff or net) in the model for command in subckt.model["command_list"]: # if it is a net (specified as a cover) if command.getName() == "name_def": extract_cover(subckt, command) # else if the command is a d flop flop elif command.getName() in dff_names: extract_flop(subckt, command) # same as dff, but using different .latch format elif command.getName() == "latches_def": extract_latch(subckt, command) # a subckt elif command.getName() == "model_ref": extract_model_reference(subckt, command) else: msg = "unknown command type" raise PyrtlError(msg) def extract_cover(subckt, command): def twire(w): return subckt.twire(w) netio = command["namesignal_list"] if len(command["cover_list"]) == 0: output_wire = twire(netio[0]) output_wire <<= Const(0, bitwidth=1, block=block) # const "FALSE" elif command["cover_list"].asList() == ["1"]: output_wire = twire(netio[0]) output_wire <<= Const(1, bitwidth=1, block=block) # const "TRUE" elif command["cover_list"].asList() == ["1", "1"]: # Populate clock list if one input is already a clock if netio[1] in subckt.clk_set: subckt.add_clock(netio[0]) elif netio[0] in subckt.clk_set: subckt.add_clock(netio[1]) else: output_wire = twire(netio[1]) output_wire <<= twire(netio[0]) # simple wire elif command["cover_list"].asList() == ["0", "1"]: output_wire = twire(netio[1]) output_wire <<= ~twire(netio[0]) # not gate elif command["cover_list"].asList() == ["11", "1"]: output_wire = twire(netio[2]) output_wire <<= twire(netio[0]) & twire(netio[1]) # and gate elif command["cover_list"].asList() == ["1-", "1", "-1", "1"]: output_wire = twire(netio[2]) output_wire <<= twire(netio[0]) | twire(netio[1]) # or gate elif command["cover_list"].asList() == ["0-", "1", "-0", "1"]: # nand is not really a PyRTL primitive and so should only be added to a # netlist via a call to nand_synth(). We instead convert it to ~(a & b) # rather than (~a | ~b) as would be generated if handled by the else case # below. output_wire = twire(netio[2]) output_wire <<= ~( twire(netio[0]) & twire(netio[1]) ) # nand gate -> not+and gates elif command["cover_list"].asList() == ["10", "1", "01", "1"]: output_wire = twire(netio[2]) output_wire <<= twire(netio[0]) ^ twire(netio[1]) # xor gate else: # Although the following is fully generic and thus encompasses all of the # special cases after the simple wire case above, we leave the above in # because they are commonly found and lead to a slightly cleaner (though # equivalent) netlist, because we can use the xor primitive/save a gate when # converting the nand, or avoid the extra fluff of concat/select wires that # might be created implicitly as part of rtl_all/rtl_any. def convert_val(ix, val): wire = twire(netio[ix]) if val == "0": wire = ~wire return wire cover = command["cover_list"].asList() output_wire = twire(netio[-1]) conjunctions = [] while cover: if len(cover) < 2: msg = ( f'BLIF file with malformed cover set "{command["cover_list"]}"' ) raise PyrtlError(msg) input_plane, output_plane, cover = cover[0], cover[1], cover[2:] if output_plane != "1": msg = ( "Off-set found in the output plane of BLIF cover set " f'"{command["cover_list"]}" (only on-sets are supported)' ) raise PyrtlError(msg) conj = rtl_all( *[ convert_val(ix, val) for ix, val in enumerate(input_plane) if val != "-" ] ) conjunctions.append(conj) output_wire <<= rtl_any(*conjunctions) def extract_latch(subckt, command): def twire(w): return subckt.twire(w) if command["C"] not in ff_clk_set: ff_clk_set.add(command["C"]) # Create register and assign next state to D and output to Q. We ignore # initialization values of 2 (don't care) and 3 (unknown). regname = command["Q"] + "_reg" init_val = command["I"] rval = {"0": 0, "1": 1, "2": None, "3": None}[init_val] flop = Register(bitwidth=1, reset_value=rval) subckt.add_reg(regname, flop) flop.next <<= twire(command["D"]) flop_output = twire(command["Q"]) flop_output <<= flop def extract_flop(subckt, command): # Generate a register like extract_latch, only we're doing so because of a # .subckt rather than a .latch def twire(w): return subckt.twire(w) def opt_twire(w): return twire(w) if w is not None else None if command["C"] not in ff_clk_set: ff_clk_set.add(command["C"]) regname = command["Q"] + "_reg" def flop_next(data, enable, set, reset, prev): return { "$_DFF_P_": lambda: data, "$_DFFE_PN_": lambda: select(~enable, data, prev), "$_DFFE_PP_": lambda: select(enable, data, prev), "$_DFF_PP0_": lambda: select(reset, 0, data), "$_DFF_PP1_": lambda: select(reset, 1, data), "$_DFFE_PP0N_": lambda: select(reset, 0, select(~enable, data, prev)), "$_DFFE_PP0P_": lambda: select(reset, 0, select(enable, data, prev)), "$_DFFE_PP1N_": lambda: select(reset, 1, select(~enable, data, prev)), "$_DFFE_PP1P_": lambda: select(reset, 1, select(enable, data, prev)), "$_DFFSR_PPP": lambda: select(reset, 0, select(set, 1, data)), "$_DFFSRE_PPPN_": lambda: select( reset, 0, select(set, 1, select(~enable, data, prev)) ), "$_DFFSRE_PPPP_": lambda: select( reset, 0, select(set, 1, select(enable, data, prev)) ), "$_SDFF_PN0_": lambda: select(~reset, 0, data), "$_SDFF_PN1_": lambda: select(~reset, 1, data), "$_SDFF_PP0_": lambda: select(reset, 0, data), "$_SDFF_PP1_": lambda: select(reset, 1, data), "$_SDFFE_PN0N_": lambda: select(~reset, 0, select(~enable, data, prev)), "$_SDFFE_PN0P_": lambda: select(~reset, 0, select(enable, data, prev)), "$_SDFFE_PN1N_": lambda: select(~reset, 1, select(~enable, data, prev)), "$_SDFFE_PN1P_": lambda: select(~reset, 1, select(enable, data, prev)), "$_SDFFE_PP0N_": lambda: select(reset, 0, select(~enable, data, prev)), "$_SDFFE_PP0P_": lambda: select(reset, 0, select(enable, data, prev)), "$_SDFFE_PP1N_": lambda: select(reset, 1, select(~enable, data, prev)), "$_SDFFE_PP1P_": lambda: select(reset, 1, select(enable, data, prev)), "$_SDFFCE_PN0N_": lambda: select( ~enable, select(~reset, 0, data), prev ), "$_SDFFCE_PN0P_": lambda: select(enable, select(~reset, 0, data), prev), "$_SDFFCE_PN1N_": lambda: select( ~enable, select(~reset, 1, data), prev ), "$_SDFFCE_PN1P_": lambda: select(enable, select(~reset, 1, data), prev), "$_SDFFCE_PP0N_": lambda: select(~enable, select(reset, 0, data), prev), "$_SDFFCE_PP0P_": lambda: select(enable, select(reset, 0, data), prev), "$_SDFFCE_PP1N_": lambda: select(~enable, select(reset, 1, data), prev), "$_SDFFCE_PP1P_": lambda: select(enable, select(reset, 1, data), prev), }[command.getName()]() # TODO May want to consider setting reset_value in the Register. Right now it's # mainly used to signal how we want to *output* Verilog from PyRTL, and not how # we want to interpret *input* to PyRTL. flop = Register(bitwidth=1) subckt.add_reg(regname, flop) flop.next <<= flop_next( twire(command["D"]), opt_twire(command.get("E")), opt_twire(command.get("S")), opt_twire(command.get("R")), flop, ) flop_output = twire(command["Q"]) flop_output <<= flop def extract_model_reference(parent, command): def twire(w): return parent.twire(w) def get_formal_connected_to_parent_clocks(): clks = set() for fa in command["formal_actual_list"]: if fa["actual"] in parent.clk_set: clks.add(fa["formal"]) return clks formal_clks = get_formal_connected_to_parent_clocks() subckt = Subcircuit( models[command["model_name"]], clk_set=formal_clks, block=block ) instantiate(subckt) for fa in command["formal_actual_list"]: formal = fa["formal"] actual = fa["actual"] if actual in parent.clk_set: assert formal in subckt.clk_set # We didn't create an input wire corresponding to this. continue if formal in subckt.inputs: wf = subckt.inputs[formal] wa = twire(actual) wf <<= wa elif formal in subckt.outputs: wf = subckt.outputs[formal] wa = twire(actual) wa <<= wf else: msg = ( f"{formal} formal parameter is neither an input nor output of " f"subckt {command['model_name']}" ) raise PyrtlError(msg) def instantiate(subckt): extract_inputs(subckt) extract_outputs(subckt) extract_commands(subckt) # Get all model definitions for model in result: if not top_model: top_model = model["model_name"] models[model["model_name"]] = model top = Subcircuit(models[top_model], is_top=True, clk_set={clock_name}, block=block) instantiate(top)
# ---------------------------------------------------------------- # ___ __ __ __ # \ / |__ |__) | | / \ / _` # \/ |___ | \ | |___ \__/ \__> #
[docs] def input_from_verilog( verilog, clock_name: str = "clk", toplevel: str | None = None, leave_in_dir: bool | None = None, block: Block = None, ): """Read an open Verilog file or string as input via `Yosys <https://github.com/YosysHQ/yosys>`_ conversion, updating the block. This function is essentially a wrapper for :func:`input_from_blif`, with the added convenience of turning the Verilog into BLIF for import for you. This function passes a set of commands to Yosys as a script that normally produces BLIF files that can be successfully imported into PyRTL via :func:`input_from_blif`. If the Yosys conversion fails, we recommend you create your own custom Yosys script to try and produce BLIF yourself. Then you can import BLIF directly via :func:`input_from_blif`. :param verilog: An open Verilog file to read. :param clock_name: The name of the clock (defaults to ``"clk"``). :param toplevel: Name of top-level module to instantiate; if ``None``, defaults to first model defined in the Verilog file. :param leave_in_dir: If ``True``, save the intermediate BLIF file created in the given directory. :param block: The block where the logic will be added. Defaults to the :ref:`working_block`. """ # Dev Notes: # 1) We pass in an open file or string to keep the same API as input_from_blif (even # though we are essentially putting that Verilog code back in a file for Yosys to # operate on). # 2) The Yosys script does not have a call to `flatten`, since we now have support # for multi-module BLIFs. `flatten` replaces the .subckt (i.e. the module # inclusions) with their actual contents, so if things aren't working for some # reason, add this command for help. # 3) The Yosys script *does* have a call to `setundef -zero -undriven`, which we use # to set undriven nets to 0; we do this so PyRTL doesn't complain about used but # undriven wires. try: verilog_string = verilog.read() except AttributeError as exc: if isinstance(verilog, str): verilog_string = verilog else: msg = "input_from_verilog expecting either open file or string" raise PyrtlError(msg) from exc block = working_block(block) # Create a temporary Verilog file temp_vd, tmp_verilog_path = tempfile.mkstemp( prefix="pyrtl_verilog", suffix=".v", dir=leave_in_dir, text=True ) with open(tmp_verilog_path, "w") as f: f.write(verilog_string) # Create a temporary BLIF file temp_bd, tmp_blif_path = tempfile.mkstemp( prefix="pyrtl_blif", suffix=".blif", dir=leave_in_dir, text=True ) yosys_arg_template = ( "-p read_verilog %s; synth %s; setundef -zero -undriven; opt; write_blif %s; " ) yosys_arg = yosys_arg_template % ( tmp_verilog_path, ("-top " + toplevel) if toplevel is not None else "-auto-top", tmp_blif_path, ) try: os.close(temp_vd) os.close(temp_bd) # call yosys on the script, and grab the output _yosys_output = subprocess.check_output(["yosys", yosys_arg]) with open(tmp_blif_path) as blif: input_from_blif( blif, block=block, clock_name=clock_name, top_model=toplevel ) except (subprocess.CalledProcessError, ValueError) as exc: print("Error with call to yosys...", file=sys.stderr) print("---------------------------------------------", file=sys.stderr) print(str(exc.output).replace("\\n", "\n"), file=sys.stderr) print("---------------------------------------------", file=sys.stderr) msg = "Yosys call failed" raise PyrtlError(msg) from exc except OSError as exc: print("Error with call to yosys...", file=sys.stderr) msg = "Call to yosys failed (not installed or on path?)" raise PyrtlError(msg) from exc finally: os.remove(tmp_verilog_path) if leave_in_dir is None: os.remove(tmp_blif_path)
class _VerilogSanitizer(_NameSanitizer): _ver_regex = r"[_A-Za-z][_a-zA-Z0-9\$]*$" _verilog_reserved = """always and assign automatic begin buf bufif0 bufif1 case casex casez cell cmos config deassign default defparam design disable edge else end endcase endconfig endfunction endgenerate endmodule endprimitive endspecify endtable endtask event for force forever fork function generate genvar highz0 highz1 if ifnone incdir include initial inout input instance integer join large liblist library localparam macromodule medium module nand negedge nmos nor noshowcancelledno not notif0 notif1 or output parameter pmos posedge primitive pull0 pull1 pulldown pullup pulsestyle_oneventglitch pulsestyle_ondetectglitch remos real realtime reg release repeat rnmos rpmos rtran rtranif0 rtranif1 scalared showcancelled signed small specify specparam strong0 strong1 supply0 supply1 table task time tran tranif0 tranif1 tri tri0 tri1 triand trior trireg unsigned use vectored wait wand weak0 weak1 while wire wor xnor xor """ def __init__(self, internal_prefix): self._verilog_reserved_set = frozenset(self._verilog_reserved.split()) super().__init__(self._ver_regex, internal_prefix, self._extra_checks) def _extra_checks(self, str): return ( str not in self._verilog_reserved_set # is not a Verilog reserved keyword and str != "clk" # not the clock signal and len(str) <= 1024 # not too long to be a Verilog id ) class _VerilogOutput: def __init__(self, block: Block, add_reset: bool | str, module_name="toplevel"): block = working_block(block) self.gate_graph = GateGraph(block) self.add_reset = add_reset self.module_name = module_name if not isinstance(self.add_reset, bool) and self.add_reset != "asynchronous": msg = ( f"Invalid add_reset option {self.add_reset}. Acceptable options are " "False, True, and 'asynchronous'" ) raise PyrtlError(msg) if self.add_reset and self.gate_graph.get_gate("rst") is not None: msg = ( "Found a user-defined wire named 'rst'. Pass in 'add_reset=False' to " "use your existing reset logic." ) raise PyrtlError(msg) self.internal_names = _VerilogSanitizer("_ver_out_tmp_") def gate_key(gate: Gate) -> str: """Sort Gates by name. MemBlocks have no name, so use their ``wr_en`` name instead. """ if gate.name: return gate.name return gate.args[2].name # Sanitize all Gate names. for gate in sorted(self.gate_graph.gates, key=gate_key): if gate.name: self.internal_names.make_valid_string(gate.name) self.inputs = self._name_sorted(self.gate_graph.inputs) self.outputs = self._name_sorted(self.gate_graph.outputs) self.io_list = [ "clk", *[self._verilog_name(input.name) for input in self.inputs], *[self._verilog_name(output.name) for output in self.outputs], ] if self.add_reset: self.io_list.insert(1, "rst") if any(io_name.startswith("tmp") for io_name in self.io_list): msg = 'input or output with name starting with "tmp" indicates unnamed IO' raise PyrtlError(msg) self.registers = self._name_sorted(self.gate_graph.registers) # List of all unique MemBlocks and RomBlocks, sorted by memid. self.all_memblocks = sorted( # Build a set of unique MemBlocks first, to avoid duplicates. { mem_gate.mem for mem_gate in self.gate_graph.mem_reads | self.gate_graph.mem_writes }, key=lambda memblock: memblock.id, ) # Sanitize all MemBlock names. for memblock in self.all_memblocks: self.internal_names.make_valid_string(memblock.name) # List of unique MemBlocks (not RomBlocks!), sorted by memid. self.memblocks = [ memblock for memblock in self.all_memblocks if type(memblock) is MemBlock ] # List of unique RomBlocks (not MemBlocks!), sorted by memid. self.romblocks = [ romblock for romblock in self.all_memblocks if isinstance(romblock, RomBlock) ] def _verilog_name(self, name: str) -> str: """Return the sanitized Verilog identifier name for ``name``.""" return self.internal_names[name] def _name_sorted(self, gates: set[Gate]) -> list[Gate]: def name_mapper(gate: Gate) -> str: if gate.name is None: # MemBlock writes have no name, so instead sort by the name of # ``wr_en``, since this particular net is used within 'always begin ... # end' blocks for memory update logic. return gate.args[2].name return self._verilog_name(gate.name) return _name_sorted(gates, name_mapper=name_mapper) def _verilog_size(self, bitwidth: int) -> str: """Return a Verilog size declaration for ``bitwidth`` bits.""" return "" if bitwidth == 1 else f"[{bitwidth - 1}:0]" def _is_sliced(self, gate: Gate) -> bool: """Return True iff gate has bitwidth 2 or more, and is an arg to a bit-slice. Such gates must be declared, because Verilog's bit-selection operator ``[]`` only works on wires and registers, not arbitrary expressions. """ return ( gate.bitwidth and gate.bitwidth > 1 and any(dest.op == "s" for dest in gate.dests) ) def _should_declare_const(self, gate: Gate) -> bool: """Determine if we should declare a constant Verilog wire for ``gate``. This function determines which constant Gates will be declared in Verilog. Any constant gate without a corresponding Verilog declaration will be inlined. Constant Verilog gates are declared for: 1. Named constant ``Gates``. 2. ``Gates`` that are ``args`` for a ``s`` bit-selection ``Gate``, because Verilog's bit-selection operator ``[]`` only works on wires and registers, not arbitrary expressions. """ is_const = gate.op == "C" is_named = not gate.name.startswith("const_") is_sliced = self._is_sliced(gate) return is_const and (is_named or is_sliced) def _should_declare_wire(self, gate: Gate) -> bool: """Determine if we should declare a temporary Verilog wire for ``gate``. This function determines which temporary Gates will be declared in Verilog. Any temporary gate without a corresponding Verilog declaration will be inlined. Temporary Verilog wires are never declared for Outputs, Inputs, Consts, or Registers, because those declarations are handled separately by ``_to_verilog_header``. Temporary Verilog wires are never declared for memory writes, because writes generate no output. Otherwise, temporary Verilog wires are declared for: 1. Named ``Gates``. 2. ``Gates`` with multiple users. 3. ``MemBlock`` reads, because ``_to_verilog_memories`` expects a declaration. 4. ``Gates`` that are ``args`` for a ``s`` bit-selection ``Gate``, because Verilog's bit-selection operator ``[]`` only works on wires and registers, not arbitrary expressions. """ excluded = gate.is_output or gate.op in "ICr@" is_named = gate.name and not gate.name.startswith("tmp") multiple_users = len(gate.dests) > 1 is_read = gate.op == "m" is_sliced = self._is_sliced(gate) return not excluded and (is_named or multiple_users or is_read or is_sliced) def _name_and_comment(self, name: str, kind="") -> tuple[str, str]: """Return the sanitized version of ``name`` and a Verilog comment with the un-sanitized name. If ``kind`` is provided, it will always be included in the comment. """ sanitized_name = self._verilog_name(name) if kind: if sanitized_name != name: comment = f" // {kind} {name}" else: comment = f" // {kind}" else: if sanitized_name != name: comment = f" // {name}" else: comment = "" return sanitized_name, comment def _to_verilog_header(self, file: IO, initialize_registers: bool): """Print the header of the verilog implementation.""" print("// Generated automatically via PyRTL", file=file) print("// As one initial test of synthesis, map to FPGA with:", file=file) print( f'// yosys -p "synth_xilinx -top {self.module_name}" thisfile.v\n', file=file, ) # ``declared_gates`` is the set of Gates with corresponding Verilog reg/wire # declarations. Generated Verilog code can refer to these Gates by name. self.declared_gates = self.gate_graph.inputs | self.gate_graph.outputs # Module name. print(f"module {self.module_name}({', '.join(self.io_list)});", file=file) # Declare Inputs and Outputs. print(" input clk;", file=file) if self.add_reset: print(" input rst;", file=file) for input_gate in self.inputs: sanitized_name, comment = self._name_and_comment(input_gate.name) print( f" input{self._verilog_size(input_gate.bitwidth)} {sanitized_name};" f"{comment}", file=file, ) for output_gate in self.outputs: sanitized_name, comment = self._name_and_comment(output_gate.name) print( f" output{self._verilog_size(output_gate.bitwidth)} " f"{sanitized_name};{comment}", file=file, ) # Declare MemBlocks and RomBlocks. if self.all_memblocks: print("\n // Memories", file=file) for memblock in self.all_memblocks: kind = "MemBlock" if isinstance(memblock, RomBlock): kind = "RomBlock" sanitized_name, comment = self._name_and_comment(memblock.name, kind) print( f" reg{self._verilog_size(memblock.bitwidth)} " f"{sanitized_name}{self._verilog_size(1 << memblock.addrwidth)};" f"{comment}", file=file, ) # Declare Registers. self.declared_gates |= self.gate_graph.registers if self.registers: print("\n // Registers", file=file) for reg_gate in self.registers: register_initialization = "" if initialize_registers: reset_value = 0 if reg_gate.reset_value is not None: reset_value = reg_gate.reset_value register_initialization = f" = {reg_gate.bitwidth}'d{reset_value}" sanitized_name, comment = self._name_and_comment(reg_gate.name) print( f" reg{self._verilog_size(reg_gate.bitwidth)} " f"{sanitized_name}{register_initialization};{comment}", file=file, ) # Declare constants. const_gates = [] for const_gate in self._name_sorted(self.gate_graph.consts): if self._should_declare_const(const_gate): const_gates.append(const_gate) self.declared_gates |= set(const_gates) if const_gates: print("\n // Constants", file=file) for const_gate in const_gates: sanitized_name, comment = self._name_and_comment(const_gate.name) print( f" wire{self._verilog_size(const_gate.bitwidth)} " f"{sanitized_name} = {const_gate.bitwidth}'d{const_gate.const_value};" f"{comment}", file=file, ) # Declare any needed temporary wires. temp_gates = [] for gate in self.gate_graph: if self._should_declare_wire(gate): temp_gates.append(gate) temp_gates = self._name_sorted(temp_gates) self.declared_gates |= set(temp_gates) if temp_gates: print("\n // Temporaries", file=file) for temp_gate in temp_gates: sanitized_name, comment = self._name_and_comment(temp_gate.name) print( f" wire{self._verilog_size(temp_gate.bitwidth)} {sanitized_name};" f"{comment}", file=file, ) # Write the initial values for read-only memories. If we ever add support # outside of simulation for initial values for MemBlocks, that would also go # here. if self.romblocks: print("\n // Read-only memory data", file=file) for romblock in self.romblocks: print(" initial begin", file=file) for addr in range(1 << romblock.addrwidth): print( f" {self._verilog_name(romblock.name)}[{addr}] = " f"{romblock.bitwidth}'h{romblock._get_read_data(addr):x};", file=file, ) print(" end", file=file) # combinational_gates is the set of Gates that must be assigned by # ``_to_verilog_combinational``. self.combinational_gates = ( self.gate_graph.outputs | set(temp_gates) - self.gate_graph.mem_reads ) def _verilog_expr(self, gate: Gate, lhs: Gate | None = None) -> str: """Returns a Verilog expression for ``gate`` and its arguments, recursively. The returned expression will be used as the right hand side ("rhs") of assignment statements, and inputs for registers and memories. :param lhs: Left-hand side of the assignment statement. This determines when we return the name of a declared gate, and when we return the expression that specifies the declared gate's value. For example, suppose we have a ``declared_gate`` that says ``x = a + b``. If we are currently processing another Gate that says ``y = x - 2``, we could emit ``y = x - 2`` or ``y = (a + b) - 2``. ``x`` is a ``declared_gate``, so we must emit ``y = x - 2``. So when we generate the ``_verilog_expr`` for ``x`` in this scenario, we know to just emit ``x`` because ``x`` is a ``declared_gate``, and we are not currently defining ``x``, because the assignment's ``lhs`` is not ``x``. On the other hand, if we are currently processing the Gate ``x = a + b``, we must emit ``x = a + b`` instead of the unhelpful ``x = x``. We emit the former, rather than the latter, for the assignment's right-hand side because the assignment's ``lhs`` is ``x`` """ if gate in self.declared_gates and lhs is not gate: # If a Verilog wire/reg has been declared for the gate, and we are not # currently defining the Gate's value, just return the wire's Verilog name. return self._verilog_name(gate.name) if gate.op == "C": # Return the constant's Verilog value. return f"{gate.bitwidth}'d{gate.const_value}" # Convert each of the Gate's args to a Verilog expression. verilog_args = [self._verilog_expr(arg, lhs) for arg in gate.args] # Return an expression that combines ``verilog_args`` with the appropriate # Verilog operator for ``gate``. if gate.op == "w": return verilog_args[0] if gate.op == "~": return f"~({verilog_args[0]})" if gate.op in "&|^+-*<>": return f"({verilog_args[0]} {gate.op} {verilog_args[1]})" if gate.op == "=": return f"({verilog_args[0]} == {verilog_args[1]})" if gate.op == "x": return f"({verilog_args[0]} ? {verilog_args[2]} : {verilog_args[1]})" if gate.op == "c": if len(verilog_args) == 1: return verilog_args[0] return f"{{{', '.join(verilog_args)}}}" if gate.op == "s": selections = [] for sel in reversed(gate.sel): if gate.args[0].bitwidth == 1: selections.append(verilog_args[0]) else: selections.append(f"{verilog_args[0]}[{sel}]") if len(gate.sel) == 1: return f"({selections[0]})" # Special case: slicing multiple copies of the same gate. if all(sel == gate.sel[0] for sel in gate.sel): return f"{{{len(selections)} {{{selections[0]}}}}}" # Special case: slicing a consecutive subset. if tuple(range(gate.sel[0], gate.sel[-1] + 1)) == gate.sel: return f"({verilog_args[0]}[{gate.sel[-1]}:{gate.sel[0]}])" return f"{{{', '.join(selections)}}}" msg = f"Unimplemented op {gate.op} in Gate {gate}" raise PyrtlError(msg) def _to_verilog_combinational(self, file: IO): """Generate Verilog combinational logic. Emits combinational Verilog logic for ``combinational_gates``. This function only generates assignments for Outputs and temporaries. The other wires and regs declared by ``_to_verilog_header`` are handled elsewhere: - Constant assignments are handled by ``_to_verilog_header``. - Register assignments are handled by ``_to_verilog_sequential``. - MemBlock reads are handled by ``_to_verilog_memories``. :param declared_gates: Set of Gates with corresponding Verilog wire/reg declarations. Generated Verilog code can refer to these Gates by name. :param combinational_gates: Set of Gates that must be assigned by ``_to_verilog_combinational``. """ if self.combinational_gates: print("\n // Combinational logic", file=file) for assignment_gate in self._name_sorted(self.combinational_gates): print( f" assign {self._verilog_name(assignment_gate.name)} = " f"{self._verilog_expr(assignment_gate, lhs=assignment_gate)};", file=file, ) def _to_verilog_sequential(self, file: IO): """Print the sequential logic of the verilog implementation.""" if not self.gate_graph.registers: return print("\n // Register logic", file=file) if self.add_reset == "asynchronous": print(" always @(posedge clk or posedge rst) begin", file=file) else: print(" always @(posedge clk) begin", file=file) if self.add_reset: print(" if (rst) begin", file=file) for register in self._name_sorted(self.gate_graph.registers): reset_value = register.reset_value print( f" {self._verilog_name(register.name)} <= " f"{register.bitwidth}'d{reset_value};", file=file, ) print(" end else begin", file=file) indent = " " else: indent = "" for register in self._name_sorted(self.gate_graph.registers): print( f" {indent}{self._verilog_name(register.name)} <= " f"{self._verilog_expr(register.args[0])};", file=file, ) if self.add_reset: print(" end", file=file) print(" end", file=file) def _to_verilog_memories(self, file: IO): """Generate Verilog logic for MemBlock and RomBlock reads and writes.""" for memblock in self.all_memblocks: kind = "MemBlock" if isinstance(memblock, RomBlock): kind = "RomBlock" print(f"\n // {kind} {memblock.name} logic", file=file) # Find writes to ``memblock``. write_gates = [] for write_gate in self._name_sorted(self.gate_graph.mem_writes): if write_gate.mem is memblock: write_gates.append(write_gate) if write_gates: print(" always @(posedge clk) begin", file=file) for write_gate in write_gates: enable = write_gate.args[2] verilog_enable = self._verilog_expr(write_gate.args[2]) verilog_addr = self._verilog_expr(write_gate.args[0]) verilog_rhs = self._verilog_expr(write_gate.args[1]) # Simplify the assignment if the enable bit is a constant ``1``. if enable.op == "C" and enable.const_value == 1: print( f" {self._verilog_name(memblock.name)}" f"[{verilog_addr}] <= {verilog_rhs};", file=file, ) else: print( f" if ({verilog_enable}) begin\n" f" {self._verilog_name(memblock.name)}" f"[{verilog_addr}] <= {verilog_rhs};\n" " end", file=file, ) print(" end", file=file) # Find reads from ``memblock``. The ``read_gate`` should have been declared # by ``_to_verilog_header``. read_gates = [] for read_gate in self._name_sorted(self.gate_graph.mem_reads): if read_gate.mem is memblock: read_gates.append(read_gate) for read_gate in self._name_sorted(read_gates): print( f" assign {self._verilog_name(read_gate.name)} = " f"{self._verilog_name(memblock.name)}" f"[{self._verilog_expr(read_gate.args[0])}];", file=file, ) def _to_verilog_footer(self, file: IO): print("endmodule", file=file) def output_to_verilog(self, dest_file: IO, initialize_registers: bool): self._to_verilog_header(dest_file, initialize_registers) self._to_verilog_combinational(dest_file) self._to_verilog_sequential(dest_file) self._to_verilog_memories(dest_file) self._to_verilog_footer(dest_file) def output_verilog_testbench( self, dest_file: IO, simulation_trace: SimulationTrace = None, toplevel_include: str | None = None, vcd: str = "waveform.vcd", cmd: str | None = None, ): # Output an include, if given. if toplevel_include: print(f'`include "{toplevel_include}"', file=dest_file) print(file=dest_file) # Output header. print("module tb();", file=dest_file) # Declare all block inputs as reg. print(" reg clk;", file=dest_file) if self.add_reset: print(" reg rst;", file=dest_file) if self.inputs: print("\n // block Inputs", file=dest_file) for input_gate in self.inputs: sanitized_name, comment = self._name_and_comment(input_gate.name) print( f" reg{self._verilog_size(input_gate.bitwidth)} {sanitized_name};" f"{comment}", file=dest_file, ) # Declare all block outputs as wires. if self.outputs: print("\n // block Outputs", file=dest_file) for output_gate in self.outputs: sanitized_name, comment = self._name_and_comment(output_gate.name) print( f" wire{self._verilog_size(output_gate.bitwidth)} {sanitized_name};" f"{comment}", file=dest_file, ) print(file=dest_file) # Declare an integer for MemBlock initialization. if len(self.memblocks) > 0: print(" integer tb_addr;", file=dest_file) io_list_str = [f".{io}({io})" for io in self.io_list] print( f" {self.module_name} block({', '.join(io_list_str)});\n", file=dest_file ) # Generate the clock signal. print(" always", file=dest_file) print(" #5 clk = ~clk;\n", file=dest_file) # Generate Input assignments for each cycle in the trace. print(" initial begin", file=dest_file) # If a VCD output is requested, set that up. if vcd: print(f' $dumpfile ("{vcd}");', file=dest_file) print(" $dumpvars;\n", file=dest_file) # Initialize clk, and all the registers and memories. print(" clk = 1'd0;", file=dest_file) if self.add_reset: print(" rst = 1'd0;", file=dest_file) def default_value() -> int: """Returns the Simulation's default value for Registers and MemBlocks.""" if not simulation_trace: return 0 return simulation_trace.default_value # simulation_trace.register_value_map maps from Register to initial value. Make # a copy that maps from Register name to initial value. register_value_map = {} if simulation_trace: register_value_map = { register.name: value for register, value in simulation_trace.register_value_map.items() } if self.registers: print("\n // Initialize Registers", file=dest_file) for reg_gate in self.registers: # Try using register_value_map first. initial_value = register_value_map.get(reg_gate.name) # If that didn't work, use the Register's reset_value. if not initial_value: initial_value = reg_gate.reset_value # If there is no reset_value, use the default_value(). if not initial_value: initial_value = default_value() print( f" block.{self._verilog_name(reg_gate.name)} = " f"{reg_gate.bitwidth}'d{initial_value};", file=dest_file, ) # Initialize MemBlocks. if self.memblocks: print("\n // Initialize MemBlocks", file=dest_file) for memblock in self.memblocks: max_addr = 1 << memblock.addrwidth print( f" for (tb_addr = 0; tb_addr < {max_addr}; tb_addr++) " f"begin block.{self._verilog_name(memblock.name)}[tb_addr] = " f"{memblock.bitwidth}'d{default_value()}; end", file=dest_file, ) if not simulation_trace: continue memory_value_map = simulation_trace.memory_value_map.get(memblock) if not memory_value_map: continue for addr, initial_data in memory_value_map.items(): # The generated Verilog ``for`` loop above just initialized every # address in the ``MemBlock`` to ``default_value()``, so skip redundant # initializations. if initial_data == default_value(): continue print( f" block.{self._verilog_name(memblock.name)}[{addr}] = " f"{memblock.bitwidth}'d{initial_data};", file=dest_file, ) # Set Input values for each cycle. if simulation_trace: tracelen = max(len(t) for t in simulation_trace.trace.values()) for i in range(tracelen): for input_gate in self.inputs: input_value = simulation_trace.trace[input_gate.name][i] print( f" {self._verilog_name(input_gate.name)} = " f"{input_gate.bitwidth}'d{input_value};", file=dest_file, ) print("\n #10", file=dest_file) if cmd: print(f" {cmd}", file=dest_file) # Footer. print(" $finish;", file=dest_file) print(" end", file=dest_file) print("endmodule", file=dest_file)
[docs] def output_to_verilog( dest_file: IO, add_reset: bool | str = True, block: Block = None, initialize_registers: bool = False, module_name: str = "toplevel", ): """A function to walk the ``block`` and output it in Verilog format to the open file. The Verilog module will be named ``toplevel``, with a clock input named ``clk``. When possible, wires keep their names in the Verilog output. Wire names that do not satisfy Verilog's naming requirements, and wires that conflict with Verilog keywords are given new temporary names in the Verilog output. :param dest_file: Open file where the Verilog output will be written. :param add_reset: If reset logic should be added. Allowable options are: ``False`` (meaning no reset logic is added), ``True`` (default, for adding synchronous reset logic), and ``'asynchronous'`` (for adding asynchronous reset logic). The reset input will be named ``rst``, and when ``rst`` is high, registers will be reset to their ``reset_value``. :param initialize_registers: Initialize Verilog registers to their ``reset_value``. When this argument is ``True``, a register like ``Register(name='foo', bitwidth=8, reset_value=4)`` generates Verilog like ``reg[7:0] foo = 8'd4;``. :param block: Block to be walked and exported. Defaults to the :ref:`working_block`. :param module_name: name of the module. Defaults to toplevel if the user puts nothing. """ _VerilogOutput(block, add_reset, module_name=module_name).output_to_verilog( dest_file, initialize_registers )
[docs] def output_verilog_testbench( dest_file: IO, simulation_trace: SimulationTrace = None, toplevel_include: str | None = None, vcd: str = "waveform.vcd", cmd: str | None = None, add_reset: bool | str = True, block: Block = None, module_name: str = "toplevel", ): """Output a Verilog testbench for the block/inputs used in the simulation trace. If ``add_reset`` is ``True``, a ``rst`` input wire is added to the instantiated ``toplevel`` module. The ``rst`` wire will be held low in the testbench, because initialization here occurs via the ``initial`` block. ``add_reset`` is provided for consistency with :func:`output_to_verilog`. This function *only* generates the Verilog testbench. The Verilog module must be generated separately by calling :func:`output_to_verilog`, see the ``toplevel_include`` parameter and Example 2 below. The test bench does not return any values. Example 1, writing testbench to a string:: with io.StringIO() as tbfile: pyrtl.output_verilog_testbench(dest_file=tbfile, simulation_trace=sim_trace) Example 2, testbench in same file as Verilog:: with open('hardware.v', 'w') as fp: output_to_verilog(fp) output_verilog_testbench( fp, sim.tracer, vcd=None, cmd='$display("%d", out);') :param dest_file: An open file to which the test bench will be printed. :param simulation_trace: A trace from which the inputs will be extracted for inclusion in the test bench. This is typically :attr:`Simulation.tracer`. The generated test bench will replay the :class:`Simulation`'s :class:`Inputs<Input>` cycle by cycle. The default values for all registers and memories will be based on the trace, otherwise they will be initialized to 0. :param toplevel_include: Name of the file containing the ``toplevel`` module this testbench is testing. If not ``None``, an ``include`` directive will be added for this file. :param vcd: By default, the testbench generator will generate a command to write the output of the testbench execution to a ``.vcd`` file, via ``$dumpfile``, and ``vcd`` is the name of the file to write. If ``None``, then no ``dumpfile`` will be used. :param cmd: The string passed as ``cmd`` will be copied verbatim into the testbench just before the end of each cycle. This is useful for doing things like printing specific values during testbench evaluation. For example:: cmd='$display("%d", out);' will instruct the testbench to print the value of ``out`` every cycle. :param add_reset: If reset logic should be added. Allowable options are: ``False`` (meaning no reset logic is added), ``True`` (default, for adding synchronous reset logic), and ``'asynchronous'`` (for adding asynchronous reset logic). The value passed in here should match the argument passed to :func:`output_to_verilog`. :param block: Block containing design to test. Defaults to the :ref:`working_block`. :param module_name: Name of the module the user chooses. Defaults to toplevel if nothing is inputted. """ _VerilogOutput(block, add_reset, module_name=module_name).output_verilog_testbench( dest_file, simulation_trace, toplevel_include, vcd, cmd )
# ---------------------------------------------------------------- # ___ __ __ ___ # |___ | |__) |__) | | # | | | \ | \ | |___ #
[docs] def output_to_firrtl( open_file, rom_blocks: list[RomBlock] | None = None, block: Block = None ): """Output the block as FIRRTL code to the output file. If ROM is initialized in PyRTL code, you can pass in the :class:`RomBlocks` as a list ``[rom1, rom2, ...]``. :param open_file: File to write to. :param rom_blocks: List of :class:`RomBlocks<RomBlock>` to be initialized. :param block: Block to use (defaults to :ref:`working_block`). """ block = working_block(block) # FIRRTL only allows 'bits' operations to have two parameters: a high and low index # representing the inclusive bounds of a contiguous range. PyRTL uses slice syntax, # which aren't always contiguous, so we need to convert them. one_bit_selects(block=block) # FIRRTL only allows 'concatenate' operations to have two arguments, but PyRTL's 'c' # op allows an arbitrary number of wires. We need to convert these n-way concats to # series of two-way concats accordingly. two_way_concat(block=block) f = open_file # write out all the implicit stuff f.write("circuit Example :\n") f.write(" module Example :\n") f.write(" input clock : Clock\n input reset : UInt<1>\n") # write out IO signals, wires and registers for wire in _name_sorted(block.wirevector_subset(Input)): f.write(f" input {wire.name} : UInt<{wire.bitwidth}>\n") for wire in _name_sorted(block.wirevector_subset(Output)): f.write(f" output {wire.name} : UInt<{wire.bitwidth}>\n") for wire in _name_sorted( block.wirevector_subset(exclude=(Input, Output, Register, Const)) ): f.write(f" wire {wire.name} : UInt<{wire.bitwidth}>\n") for wire in _name_sorted(block.wirevector_subset(Register)): f.write(f" reg {wire.name} : UInt<{wire.bitwidth}>, clock\n") for wire in _name_sorted(block.wirevector_subset(Const)): # some const is in the form like const_0_1'b1, is this legal operation? wire.name = wire.name.split("'").pop(0) f.write(f" node {wire.name} = UInt<{wire.bitwidth}>({wire.val})\n") f.write("\n") # write "Main" node_cntr = 0 initializedMem = [] for log_net in _net_sorted(block.logic_subset()): if log_net.op == "&": f.write( f" {log_net.dests[0].name} <= " f"and({log_net.args[0].name}, {log_net.args[1].name})\n" ) elif log_net.op == "|": f.write( f" {log_net.dests[0].name} <= " f"or({log_net.args[0].name}, {log_net.args[1].name})\n" ) elif log_net.op == "^": f.write( f" {log_net.dests[0].name} <= " f"xor({log_net.args[0].name}, {log_net.args[1].name})\n" ) elif log_net.op == "n": f.write( f" node T_{node_cntr} = " f"and({log_net.args[0].name}, {log_net.args[1].name})\n" ) f.write(f" {log_net.dests[0].name} <= not(T_{node_cntr})\n") node_cntr += 1 elif log_net.op == "~": f.write(f" {log_net.dests[0].name} <= not({log_net.args[0].name})\n") elif log_net.op == "+": f.write( f" {log_net.dests[0].name} <= " f"add({log_net.args[0].name}, {log_net.args[1].name})\n" ) elif log_net.op == "-": f.write( f" {log_net.dests[0].name} <= " f"sub({log_net.args[0].name}, {log_net.args[1].name})\n" ) elif log_net.op == "*": f.write( f" {log_net.dests[0].name} <= " f"mul({log_net.args[0].name}, {log_net.args[1].name})\n" ) elif log_net.op == "=": f.write( f" {log_net.dests[0].name} <= " f"eq({log_net.args[0].name}, {log_net.args[1].name})\n" ) elif log_net.op == "<": f.write( f" {log_net.dests[0].name} <= " f"lt({log_net.args[0].name}, {log_net.args[1].name})\n" ) elif log_net.op == ">": f.write( f" {log_net.dests[0].name} <= " f"gt({log_net.args[0].name}, {log_net.args[1].name})\n" ) elif log_net.op == "w": f.write(f" {log_net.dests[0].name} <= {log_net.args[0].name}\n") elif log_net.op == "x": f.write( f" {log_net.dests[0].name} <= mux({log_net.args[0].name}, " f"{log_net.args[2].name}, {log_net.args[1].name})\n" ) elif log_net.op == "c": if len(log_net.args) != 2: msg = ( "Expected concat net to have only two argument wires; has " f"{len(log_net.args)}" ) raise PyrtlInternalError(msg) f.write( f" {log_net.dests[0].name} <= " f"cat({log_net.args[0].name}, {log_net.args[1].name})\n" ) elif log_net.op == "s": if len(log_net.op_param) != 1: msg = ( "Expected select net to have single select bit; has " f"{len(log_net.op_param)}" ) raise PyrtlInternalError(msg) f.write( f" {log_net.dests[0].name} <= bits({log_net.args[0].name}, " f"{log_net.op_param[0]}, {log_net.op_param[0]})\n" ) elif log_net.op == "r": f.write( f" {log_net.dests[0].name} <= mux(reset, " f"UInt<{log_net.dests[0].bitwidth}>(0), {log_net.args[0].name})\n" ) elif log_net.op == "m": # if there are rom blocks, need to be initialized if rom_blocks is not None: if log_net.op_param[0] not in initializedMem: initializedMem.append(log_net.op_param[0]) # find corresponding rom block according to memid curr_rom = next( (x for x in rom_blocks if x.id == log_net.op_param[0]), None ) f.write( f" wire {log_net.op_param[1].name} : " f"UInt<{log_net.op_param[1].bitwidth}>" f"[{2 ** log_net.op_param[1].addrwidth}]\n" ) # if rom data is a function, calculate the data first if callable(curr_rom.data): romdata = [ curr_rom.data(i) for i in range(2**curr_rom.addrwidth) ] curr_rom.data = romdata # write rom block initialization data for i in range(len(curr_rom.data)): f.write( f" {log_net.op_param[1].name}[{i}] <= " f"UInt<{log_net.op_param[1].bitwidth}>" f"({curr_rom.data[i]})\n" ) # write the connection f.write( f" {log_net.dests[0].name} <= " f"{log_net.op_param[1].name}[{log_net.args[0].name}]\n" ) else: if log_net.op_param[0] not in initializedMem: initializedMem.append(log_net.op_param[0]) f.write( f" cmem {log_net.op_param[1].name}_{log_net.op_param[0]} : " f"UInt<{log_net.op_param[1].bitwidth}>" f"[{2 ** log_net.op_param[1].addrwidth}]\n" ) f.write( f" infer mport T_{node_cntr} = {log_net.op_param[1].name}_" f"{log_net.op_param[0]}[{log_net.args[0].name}], clock\n" ) f.write(f" {log_net.dests[0].name} <= T_{node_cntr}\n") node_cntr += 1 elif log_net.op == "@": if log_net.op_param[0] not in initializedMem: initializedMem.append(log_net.op_param[0]) f.write( f" cmem {log_net.op_param[1].name}_{log_net.op_param[0]} : " f"UInt<{log_net.op_param[1].bitwidth}>" f"[{2 ** log_net.op_param[1].addrwidth}]\n" ) f.write(f" when {log_net.args[2].name} :\n") f.write( f" infer mport T_{node_cntr} = {log_net.op_param[1].name}_" f"{log_net.op_param[0]}[{log_net.args[0].name}], clock\n" ) f.write(f" T_{node_cntr} <= {log_net.args[1].name}\n") f.write(" skip\n") node_cntr += 1 else: pass return 0
# ----------------------------------------------------------------- # __ __ __ __ ___ __ # | /__` / /\ /__` |__) |__ |\ | / |__| # | .__/ \__ /~~\ .__/ |__) |___ | \| \__ | | def input_from_iscas_bench(bench, block: Block = None): """Import an ISCAS .bench file :param bench: an open ISCAS .bench file to read :param block: block to add the imported logic (defaults to current :ref:`working_block`) """ import pyparsing from pyparsing import ( Group, Keyword, Literal, OneOrMore, Suppress, Word, ZeroOrMore, one_of, ) block = working_block(block) try: bench_string = bench.read() except AttributeError as exc: if isinstance(bench, str): bench_string = bench else: msg = "input_from_bench expecting either open file or string" raise PyrtlError(msg) from exc def SKeyword(x): return Suppress(Keyword(x)) def SLiteral(x): return Suppress(Literal(x)) # NOTE: The acceptable signal characters are based on viewing the I/O names in the # available ISCAS benchmark files, and may not be complete. signal_start = pyparsing.alphas + pyparsing.nums + "$:[]_<>\\/?" signal_middle = pyparsing.alphas + pyparsing.nums + "$:[]_<>\\/.?-" signal_id = Word(signal_start, signal_middle) gate_names = "AND OR NAND NOR XOR NOT BUFF DFF" src_list = Group(signal_id + ZeroOrMore(SLiteral(",") + signal_id))("src_list") net_def = Group( signal_id("dst") + SLiteral("=") + one_of(gate_names)("gate") + SLiteral("(") + src_list + SLiteral(")") )("net_def") input_def = Group(SKeyword("INPUT") + SLiteral("(") + signal_id + SLiteral(")"))( "input_def" ) output_def = Group(SKeyword("OUTPUT") + SLiteral("(") + signal_id + SLiteral(")"))( "output_def" ) command_def = input_def | output_def | net_def commands = OneOrMore(command_def)("command_list") parser = commands.ignore(pyparsing.pythonStyleComment) # Begin actually reading and parsing the BENCH file result = parser.parseString(bench_string, parseAll=True) output_to_internal = {} # dict: name -> wire def twire(name): """Find or make wire named 'name' and return it.""" w = output_to_internal.get(name) if w is None: w = block.wirevector_by_name.get(name) if w is None: w = WireVector(bitwidth=1, name=name) return w for cmd in result["command_list"]: if cmd.getName() == "input_def": _wire_in = Input(bitwidth=1, name=str(cmd[0]), block=block) elif cmd.getName() == "output_def": # Create internal wire for indirection, since Outputs can't be inputs to # nets in PyRTL wire_internal = WireVector(bitwidth=1, block=block) wire_out = Output(bitwidth=1, name=str(cmd[0]), block=block) wire_out <<= wire_internal output_to_internal[cmd[0]] = wire_internal elif cmd.getName() == "net_def": srcs = cmd["src_list"] if cmd["gate"] == "AND": dst_wire = twire(cmd["dst"]) dst_wire <<= twire(srcs[0]) & twire(srcs[1]) elif cmd["gate"] == "OR": dst_wire = twire(cmd["dst"]) dst_wire <<= twire(srcs[0]) | twire(srcs[1]) elif cmd["gate"] == "NAND": dst_wire = twire(cmd["dst"]) dst_wire <<= twire(srcs[0]).nand(twire(srcs[1])) elif cmd["gate"] == "NOR": dst_wire = twire(cmd["dst"]) dst_wire <<= ~(twire(srcs[0]) | twire(srcs[1])) elif cmd["gate"] == "XOR": dst_wire = twire(cmd["dst"]) dst_wire <<= twire(srcs[0]) ^ twire(srcs[1]) elif cmd["gate"] == "NOT": dst_wire = twire(cmd["dst"]) dst_wire <<= ~twire(srcs[0]) elif cmd["gate"] == "BUFF": dst_wire = twire(cmd["dst"]) dst_wire <<= twire(srcs[0]) elif cmd["gate"] == "DFF": dst_wire = twire(cmd["dst"]) reg = Register(bitwidth=1) reg.next <<= twire(srcs[0]) dst_wire <<= reg else: msg = f"Unexpected gate {{{cmd['gate']}}}" raise PyrtlError(msg) # Benchmarks like c1196, b18, etc. have inputs and outputs by the same name, that # are therefore directly connected. This pass will rename the outputs so that this # is still okay. for o in block.wirevector_subset(Output): inputs = [i for i in block.wirevector_subset(Input) if i.name == o.name] if inputs: if len(inputs) > 1: msg = f"More than one input found with the name {inputs[0].name}" raise PyrtlError(msg) i = inputs[0] o_internal = twire(o.name) o_internal <<= i o.name = next_tempvar_name() # Ensure the input is the one mapped by the original name block.wirevector_by_name[i.name] = i print( "Found input and output wires with the same name. " f"Output '{i.name}' has now been renamed to '{o.name}'." )