Source code for pyrtl.analysis

"""
Contains functions to estimate aspects of blocks (like area and delay) by either using
internal models or by making calls out to external tool chains.
"""

from __future__ import annotations

import collections
import math
import os
import re
import subprocess
import sys
import tempfile
from collections.abc import Callable, Iterable

from pyrtl.core import Block, LogicNet, working_block
from pyrtl.helperfuncs import _currently_in_jupyter_notebook, _print_netlist_latex
from pyrtl.importexport import output_to_verilog
from pyrtl.memory import RomBlock
from pyrtl.pyrtlexceptions import PyrtlError, PyrtlInternalError
from pyrtl.wire import Const, Input, Output, Register, WireVector

# --------------------------------------------------------------------
#         __   ___          ___  __  ___              ___    __
#    /\  |__) |__   /\     |__  /__`  |  |  |\/|  /\   |  | /  \ |\ |
#   /~~\ |  \ |___ /~~\    |___ .__/  |  |  |  | /~~\  |  | \__/ | \|
#


[docs] def area_estimation(tech_in_nm: float = 130, block=None) -> tuple[float, float]: """Estimates the total area of the block. The estimations are based off of 130nm standard cell designs for the logic, and custom memory blocks from the literature. The results are not fully validated and we do not recommend that this function be used in carrying out science for publication. :param tech_in_nm: the size of the circuit technology to be estimated (for example, 65 is 65nm and 250 is 0.25um) :return: tuple of estimated areas (logic, mem) in terms of mm^2 """ def mem_area_estimate(tech_in_nm, bits, ports, is_rom): # http://www.cs.ucsb.edu/~sherwood/pubs/ICCD-srammodel.pdf # ROM is assumed to be 1/10th of area of SRAM tech_in_um = tech_in_nm / 1000.0 area_estimate = 0.001 * tech_in_um**2.07 * bits**0.9 * ports**0.7 + 0.0048 return area_estimate if not is_rom else area_estimate / 10.0 # Subset of the raw data gathered from yosys, mapping to vsclib 130nm library # Width Adder_Area Mult_Area (area in "tracks" as discussed below) # 8 211 2684 # 16 495 12742 # 32 1110 49319 # 64 2397 199175 # 128 4966 749828 def adder_stdcell_estimate(width): return width * 34.4 - 25.8 def multiplier_stdcell_estimate(width): if width == 1: return 5 if width == 2: return 39 if width == 3: return 219 return -958 + (150 * width) + (45 * width**2) def stdcell_estimate(net): if net.op in "w~sc": return 0 if net.op in "&|n": return 40 / 8.0 * len(net.args[0]) # 40 lambda if net.op in "^=<>x": return 80 / 8.0 * len(net.args[0]) # 80 lambda if net.op == "r": return 144 / 8.0 * len(net.args[0]) # 144 lambda if net.op in "+-": return adder_stdcell_estimate(len(net.args[0])) if net.op == "*": return multiplier_stdcell_estimate(len(net.args[0])) if net.op in "m@": return 0 # memories handled elsewhere msg = ( "Unable to estimate the following net due to unimplemented op :\n{str(net)}" ) raise PyrtlInternalError(msg) block = working_block(block) # The functions above were gathered and calibrated by mapping reference designs to # an openly available 130nm stdcell library. # http://www.vlsitechnology.org/html/vsc_description.html # http://www.vlsitechnology.org/html/cells/vsclib013/lib_gif_index.html # In a standard cell design, each gate takes up a length of standard "track" in the # chip. The functions above return that length for each of the different types of # functions in the units of "tracks". In the 130nm process used, 1 lambda is 55nm, # and 1 track is 8 lambda. # first, sum up the area of all of the logic elements (including registers) total_tracks = sum(stdcell_estimate(a_net) for a_net in block.logic) total_length_in_nm = total_tracks * 8 * 55 # each track is then 72 lambda tall, and converted from nm2 to mm2 area_in_mm2_for_130nm = (total_length_in_nm * (72 * 55)) / 1e12 # scaling from 130nm to the target tech logic_area = area_in_mm2_for_130nm / (130.0 / tech_in_nm) ** 2 # now sum up the area of the memories mem_area = 0 for mem in {net.op_param[1] for net in block.logic_subset("@m")}: bits, ports, is_rom = _bits_ports_and_isrom_from_memory(mem) mem_area += mem_area_estimate(tech_in_nm, bits, ports, is_rom) return logic_area, mem_area
def _bits_ports_and_isrom_from_memory(mem): """Helper to extract mem bits and ports for estimation.""" is_rom = False bits = 2**mem.addrwidth * mem.bitwidth read_ports = len(mem.readport_nets) write_ports = len(mem.writeport_nets) if isinstance(mem, RomBlock): is_rom = True ports = max(read_ports, write_ports) return bits, ports, is_rom # -------------------------------------------------------------------- # ___ __ /\ __ __ # | | |\/| | |\ | / ` /~~\ |\ | /\ | \_/ /__` | /__` # | | | | | | \| \__> / \| \| /~~\ |_ | .__/ | .__/ #
[docs] class TimingAnalysis: """Timing analysis estimates the timing delays in the block TimingAnalysis has an :attr:`timing_map` object that maps wires to the 'time' after a clock edge at which the signal in the wire settles """
[docs] def __init__(self, block: Block = None, gate_delay_funcs=None): """Calculates timing delays in the block. Calculates the timing analysis while allowing for different timing delays of different gates of each type. Supports all valid presynthesis blocks. Currently doesn't support memory post synthesis. :param block: PyRTL block to analyze. Defaults to the :ref:`working_block`. :param gate_delay_funcs: a map with keys corresponding to the gate op and a function returning the delay as the value. It takes the gate as an argument. If the delay is negative (``-1``), the gate will be treated as the end of the block. """ self.block = working_block(block) self.timing_map = None self.block.sanity_check() self._generate_timing_map(gate_delay_funcs)
def _generate_timing_map(self, gate_delay_funcs): # The functions above were gathered and calibrated by mapping reference designs # to an openly available 130nm stdcell library. Note that this is will compute # the critical logic delay, but does not include setup/hold time. if gate_delay_funcs is None: gate_delay_funcs = { "~": lambda _width: 48.5, "&": lambda _width: 98.5, "|": lambda _width: 105.3, "^": lambda _width: 135.07, "n": lambda _width: 66.0, "w": lambda _width: 0, "+": self._logconst_func(184.0, 18.9), "-": self._logconst_func(184.0, 18.9), "*": self._multiplier_stdcell_estimate, "<": self._logconst_func(101.9, 105.4), ">": self._logconst_func(101.9, 105.4), "=": self._logconst_func(60.1, 147), "x": lambda _width: 138.0, "c": lambda _width: 0, "s": lambda _width: 0, "r": lambda _width: -1, "m": self._memory_read_estimate, "@": lambda _width: -1, } cleared = self.block.wirevector_subset((Input, Const, Register)) self.timing_map = dict.fromkeys(cleared, 0) for _gate in self.block: # ordered iteration if _gate.op == "m": gate_delay = gate_delay_funcs["m"]( _gate.op_param[1] ) # reads require a memid else: gate_delay = gate_delay_funcs[_gate.op](len(_gate.args[0])) if gate_delay < 0: continue time = max(self.timing_map[a_wire] for a_wire in _gate.args) + gate_delay for dest_wire in _gate.dests: self.timing_map[dest_wire] = time @staticmethod def _logconst_func(a, b): return lambda x: a * math.log2(float(x)) + b @staticmethod def _multiplier_stdcell_estimate(width): if width == 1: return 98.57 if width == 2: return 200.17 return 549.1 * math.log2(width) - 391.7 @staticmethod def _memory_read_estimate(mem): # http://www.cs.ucsb.edu/~sherwood/pubs/ICCD-srammodel.pdf # ROM is assumed to be same delay as SRAM (perhaps optimistic?) bits, ports, _is_rom = _bits_ports_and_isrom_from_memory(mem) tech_in_um = 0.130 return 270 * tech_in_um**1.38 * bits**0.25 * ports**1.30 + 1.05
[docs] def max_freq( self, tech_in_nm: float = 130, ffoverhead: float | None = None ) -> float: """Estimates the max frequency of a block in MHz. All params are optional and have reasonable default values. Estimation is based on Dennard Scaling assumption and does not include wiring effect -- as a result the estimates may be optimistic (especially below 65nm). :param tech_in_nm: the size of the circuit technology to be estimated (for example, 65 is 65nm and 250 is 0.25um) :param ffoverhead: setup and ff propagation delay in picoseconds :return: a number representing an estimate of the max frequency in Mhz """ cp_length = self.max_length() scale_factor = 130.0 / tech_in_nm if ffoverhead is None: clock_period_in_ps = scale_factor * (cp_length + 189 + 194) else: clock_period_in_ps = (scale_factor * cp_length) + ffoverhead return 1e6 * 1.0 / clock_period_in_ps
[docs] def max_length(self): """Returns the max timing delay of the circuit in ps. The result assumes that the circuit is implemented in a 130nm process, and that there is no setup or hold time associated with the circuit. The resulting value is in picoseconds. If an proper estimation of timing is required it is recommended to use :meth:`max_freq` to determine the clock period as it more accurately considers scaling and setup/hold. """ return max(self.timing_map.values())
[docs] def print_max_length(self): """Prints the max timing delay of the circuit""" print("The total block timing delay is ", self.max_length())
class _TooManyCPsError(Exception): pass
[docs] def critical_path( self, print_cp: bool = True, cp_limit=100 ) -> list[WireVector, list[LogicNet]]: """Takes a timing map and returns the critical paths of the system. :param print_cp: Whether to print the critical path to the terminal after calculation :return: a list containing tuples with the 'first' wire as the first value and the critical paths (which themselves are lists of nets) as the second """ critical_paths = [] # storage of all completed critical paths wire_src_dict, _wire_dst_dict = self.block.net_connections() def critical_path_pass(old_critical_path, first_wire): if isinstance(first_wire, (Input, Const, Register)): critical_paths.append((first_wire, old_critical_path)) return if len(critical_paths) >= cp_limit: raise self._TooManyCPsError() source = wire_src_dict[first_wire] critical_path = [source] critical_path.extend(old_critical_path) arg_max_time = max(self.timing_map[arg_wire] for arg_wire in source.args) for arg_wire in source.args: # If the time for both items are the max, both will be on a critical # path. if self.timing_map[arg_wire] == arg_max_time: critical_path_pass(critical_path, arg_wire) max_time = self.max_length() try: for wire_pair in self.timing_map.items(): if wire_pair[1] == max_time: critical_path_pass([], wire_pair[0]) except self._TooManyCPsError: print("Critical path count limit reached") if print_cp: self.print_critical_paths(critical_paths) return critical_paths
[docs] @staticmethod def print_critical_paths(critical_paths): """Prints the results of the critical path length analysis. Done by default by the :meth:`critical_path` function. """ line_indent = " " * 2 # print the critical path for cp_with_num in enumerate(critical_paths): print("Critical path", cp_with_num[0], ":") print(line_indent, "The first wire is:", cp_with_num[1][0]) if _currently_in_jupyter_notebook(): _print_netlist_latex(cp_with_num[1][1]) else: for net in cp_with_num[1][1]: print(line_indent, (net)) print()
# -------------------------------------------------------------------- # __ __ __ # \ / / \ /__` \ / /__` # | \__/ .__/ | .__/ #
[docs] def yosys_area_delay( library: str, abc_cmd: str | None = None, leave_in_dir: str | None = None, block: Block = None, ) -> tuple[float, float]: """Synthesize with `Yosys <https://yosyshq.net/yosys/>`_ and return estimate of area and delay. If ``leave_in_dir`` is specified, that directory will be used to create any temporary files, and the resulting files will be left behind there (which can be useful for manual exploration or debugging) The area and delay are returned in units as defined by the stdcell library. In the standard vsc 130nm library, the area is in a number of "tracks", each of which is about 1.74 square um (see area estimation for more details) and the delay is in ps. http://www.vlsitechnology.org/html/vsc_description.html :param library: stdcell library file to target in liberty format :param abc_cmd: string of commands for :program:`yosys` to pass to :program:`abc` for synthesis :param leave_in_dir: the directory where temporary files should be left :param block: PyRTL block to analyze. Defaults to the :ref:`working_block`. :raise PyrtlError: If :program:`yosys` is not configured correctly. :raise PyrtlInternalError: If the call to :program:`yosys` was not successful :return: a tuple of numbers: area, delay """ if abc_cmd is None: abc_cmd = "strash;scorr;ifraig;retime;dch,-f;map;print_stats;" else: # first, replace whitespace with commas as per yosys requirements re.sub(r"\s+", ",", abc_cmd) # then append with "print_stats" to generate the area and delay info abc_cmd = f"{abc_cmd};print_stats;" def extract_area_delay_from_yosys_output(yosys_output): report_lines = [ line for line in yosys_output.decode().split("\n") if "ABC: netlist" in line ] area = re.match(r".*area\s*=\s*([0-9\.]*)", report_lines[0]).group(1) delay = re.match(r".*delay\s*=\s*([0-9\.]*)", report_lines[0]).group(1) return float(area), float(delay) yosys_arg_template = """-p read_verilog %s; synth -top toplevel; dfflibmap -liberty %s; abc -liberty %s -script +%s """.replace("\n", " ") temp_d, temp_path = tempfile.mkstemp( prefix="pyrtl_verilog", suffix=".v", dir=leave_in_dir, text=True ) try: # write the verilog to a temp yosys_arg = yosys_arg_template % (temp_path, library, library, abc_cmd) with open(temp_path, "w") as f: print("// generated via pyrtl yosys_area_delay", file=f) print(f"// yosys {yosys_arg}", file=f) output_to_verilog(f, block=block) os.close(temp_d) # call yosys on the temp, and grab the output yosys_output = subprocess.check_output(["yosys", yosys_arg]) area, delay = extract_area_delay_from_yosys_output(yosys_output) except (subprocess.CalledProcessError, ValueError) as exc: print("Error with call to yosys...", file=sys.stderr) print("---------------------------------------------", file=sys.stderr) print(str(exc.output).replace("\\n", "\n"), file=sys.stderr) print("---------------------------------------------", file=sys.stderr) msg = "Yosys call failed" raise PyrtlError(msg) from exc except OSError as exc: print("Error with call to yosys...", file=sys.stderr) msg = "Call to yosys failed (not installed or on path?)" raise PyrtlError(msg) from exc finally: if leave_in_dir is None: os.remove(temp_path) return area, delay
[docs] class PathsResult(dict):
[docs] def print(self, file=sys.stdout): """Pretty print the result of calling :func:`paths` :param f: the open file to print to (defaults to stdout) :return: None """ # All this work, to make sure it's determinstic def path_sort_key(path): dst_names = [net.dests[0].name if net.dests else "" for net in path] return (len(path), dst_names) for start in sorted(self.keys(), key=lambda w: w.name): print(f"From {start.name}", file=file) for end in sorted(self[start].keys(), key=lambda w: w.name): print(f" To {end.name}", file=file) all_paths = self[start][end] if len(all_paths) > 0: for i, paths in enumerate(sorted(all_paths, key=path_sort_key)): print(f" Path {i}", file=file) for path in paths: print(f" {path}", file=file) else: print(" (No paths)", file=file)
[docs] def paths( src: WireVector | Iterable[WireVector] = None, dst: WireVector | Iterable[WireVector] = None, dst_nets: dict[WireVector, LogicNet] | None = None, block: Block = None, ) -> PathsResult: """Get the list of all paths from ``src`` to ``dst``. You can provide ``dst_nets`` (the result of calling :meth:`Block.net_connections`, if you plan on calling this function repeatedly on a block that hasn't changed, to speed things up. This function can accept one or more ``src`` wires, and one or more ``dst`` wires, such that it returns a map that can be accessed like so:: paths[src][dst] = [<path>, <path>, ...] where ``path`` is a list of nets. Thus there can be multiple paths from a given ``src`` wire to a given ``dst`` wire. If ``src`` and ``dst`` are both single wires, you still need to access the result via ``paths[src][dst]``. This also finds and returns the loop paths in the case of registers or memories that feed into themselves, i.e. ``paths[src][src]`` is not necessarily empty. It does not distinguish between loops that include synchronous vs asynchronous memories. :param src: Source wire(s) from which to trace your paths; if ``None``, will get paths from all :class:`Inputs<Input>`. :param dst: Destination wire(s) to which to trace your paths; if ``None``, will get paths to all :class:`Outputs<Output>`. :param dst_nets: map from wire to set of nets where the wire is an argument; will compute it internally if not given via a call to :meth:`Block.net_connections()`. :param block: Block to use (defaults to :ref:`working_block`) :return: a map of the form ``{src_wire: {dst_wire: [path]}}`` for each ``src_wire`` in ``src`` (or all inputs if ``src`` is None), ``dst_wire`` in ``dst`` (or all outputs if ``dst`` is None), where ``path`` is a list of nets. This map is also an instance of :class:`.PathsResult`, so you can call :meth:`.PathsResult.print` on it to pretty print it. """ block = working_block(block) if dst_nets is None: # Note: if you set `include_virtual_nodes=True`, Output wires will actually be # present as the destination "net" of Output wires in the dst_nets map. That # would overly complicate this algorithm: we will assume all values() in the # dst_nets map are logic nets only. We set this to False for explicitness... _wire_src_dict, dst_nets = block.net_connections(include_virtual_nodes=False) else: # ... or make sure it's not present otherwise. for output in block.wirevector_subset(cls=Output): dst_nets.pop(output, None) if src is None: src = block.wirevector_subset(cls=Input) elif isinstance(src, WireVector): src = {src} else: src = set(src) if dst is None: dst = block.wirevector_subset(cls=Output) elif isinstance(dst, WireVector): dst = {dst} else: dst = set(dst) def paths_src_dst(src, dst): paths = [] # Use DFS to get the paths [each a list of nets] from src wire to dst wire def dfs(w, curr_path): if w is dst: # Found valid path paths.append(curr_path) for dst_net in dst_nets.get(w, []): # Avoid loops and the mem net (has no output wire) if dst_net not in curr_path: if dst_net.op == "@": # dests will be the read ports for read_net in dst_net.op_param[1].readport_nets: dfs(read_net.dests[0], [*curr_path, dst_net, read_net]) else: dfs(dst_net.dests[0], [*curr_path, dst_net]) dfs(src, []) return paths all_paths = collections.defaultdict(dict) for src_wire in src: for dst_wire in dst: paths = paths_src_dst(src_wire, dst_wire) # Remove empty paths... paths = list(filter(lambda x: len(x) > 0, paths)) # ...and those that are supersets of others (resulting from an inner loop). if src_wire is not dst_wire: paths = sorted(paths, key=lambda p: len(p), reverse=True) keep = [] for i in range(len(paths)): # Check if there is a path in paths[i+1:] that is the suffix of # paths[i] (paths[i] is at least as large as each path in # paths[i+1:]). If so, paths[i] contains a loop since both start at # src_wire, so don't keep it. if not any(paths[i][-len(p) :] == p for p in paths[i + 1 :]): keep.append(paths[i]) paths = keep all_paths[src_wire][dst_wire] = paths return PathsResult(all_paths)
[docs] def distance( src: WireVector, dst: WireVector, f: Callable[list[LogicNet], int], block: Block = None, ) -> dict[tuple[LogicNet], int]: """Calculate the distance along each path from ``src`` to ``dst`` according to ``f`` This calls the given function ``f`` on each net in a path, summing the result. :param src: wire to start from :param dst: wire to end on :param f: function from a net to number, representing the 'value' of a net that you want to sum across all nets in the path :param block: block to use (defaults to :ref:`working_block`) :return: a map from each path (a tuple) to its calculated distance """ ps = paths(src, dst, block=block) ps = ps[src][dst] # Turning path into tuple so it can be the key return {tuple(path): sum(map(f, path)) for path in ps}
[docs] def fanout(w: WireVector) -> int: """Get the number of places a wire is used as an argument. :param w: :class:`WireVector` to check fanout for. :return: Integer fanout count. """ _wire_src_dict, wire_dst_dict = w._block.net_connections() if w not in wire_dst_dict: return 0 all_args = [arg for net in wire_dst_dict[w] for arg in net.args] return len(list(filter(lambda arg: arg is w, all_args)))