Source code for pyrtl.visualization

"""
Helper functions for viewing the block visually.

Each of the functions in visualization take a block and a file descriptor. The functions
provided write the block as a given visual format to the file.
"""

from __future__ import annotations

import collections
from collections.abc import Callable
from typing import TYPE_CHECKING

from pyrtl.core import Block, LogicNet, working_block
from pyrtl.pyrtlexceptions import PyrtlError, PyrtlInternalError
from pyrtl.wire import Const, Input, Output, Register, WireVector

if TYPE_CHECKING:
    from pyrtl.simulation import SimulationTrace


[docs] def net_graph(block: Block = None, split_state: bool = False): """Return a graph representation of the given :class:`Block`. The graph has the following form:: { node1: { nodeA: [edge1A_1, edge1A_2], nodeB: [edge1B]}, node2: { nodeB: [edge2B], nodeC: [edge2C_1, edge2C_2]}, ... } aka: ``edges = graph[source][dest]`` Each node can be either a :class:`LogicNet` or a :class:`WireVector` (e.g. an :class:`Input`, an :class:`Output`, a :class:`Const` or even an undriven :class:`WireVector` (which acts as a source or sink in the network). Each edge is a :class:`WireVector` or derived type (:class:`Input`, :class:`Output`, :class:`Register`, etc.). Note that :class:`Inputs<Input>`, :class:`Consts<Const>`, and :class:`Outputs<Output>` will be both "node" and "edge". :class:`WireVectors<WireVector>` that are not connected to any nets are not returned as part of the graph. .. note:: Consider using :ref:`gate_graphs` instead. :param block: :class:`Block` to use (defaults to current :ref:`working_block`). :param split_state: If ``True``, split connections to/from a register update net; this means that registers will be appear as source nodes of the network, and ``r`` nets (i.e. the logic for setting :attr:`Register.next`) will be treated as sink nodes of the network. """ # FIXME: make it not try to add unused wires (issue #204) block = working_block(block) # self.sanity_check() graph = {} # add all of the nodes for net in block.logic: graph[net] = {} wire_src_dict, wire_dst_dict = block.net_connections() dest_set = set(wire_src_dict.keys()) arg_set = set(wire_dst_dict.keys()) dangle_set = dest_set.symmetric_difference(arg_set) for w in dangle_set: graph[w] = {} if split_state: for w in block.wirevector_subset(Register): graph[w] = {} # add all of the edges for w in dest_set | arg_set: try: _from = wire_src_dict[w] except Exception: _from = w # e.g. an Input/Const if split_state and isinstance(w, Register): _from = w try: _to_list = wire_dst_dict[w] except Exception: graph[_from][w] = [w] # e.g. an Output else: for _to in _to_list: graph[_from][_to] = list(filter(lambda arg: arg is w, _to.args)) return graph
# ----------------------------------------------------------------- # ___ __ ___ # | / _` |___ # | \__> | def _trivialgraph_default_namer(thing, is_edge=True): """Returns a "good" string for thing in printed graphs.""" if is_edge: if thing.name is None or thing.name.startswith("tmp"): return "" return "/".join([thing.name, str(len(thing))]) if isinstance(thing, Const): return str(thing.val) if isinstance(thing, WireVector): return thing.name or "??" try: return thing.op + str(thing.op_param or "") except AttributeError as exc: msg = f'no naming rule for "{thing}"' raise PyrtlError(msg) from exc
[docs] def output_to_trivialgraph( file, namer: Callable[[WireVector | LogicNet, bool], str] = _trivialgraph_default_namer, block: Block = None, split_state: bool = False, ): """Walk the block and output it in `trivial graph format <https://en.wikipedia.org/wiki/Trivial_Graph_Format>`_ to the open file. :param file: Open file to write to. :param namer: A function that takes in an object (a :class:`WireVector` or :class:`LogicNet`) as the first argument and a boolean ``is_edge`` as the second that is set ``True`` if the object is a :class:`WireVector`, and returns a string representing that object. :param block: :class:`Block` to use (defaults to current :ref:`working_block`). :param split_state: If ``True``, split connections to/from a register update net; this means that registers will be appear as source nodes of the network, and ``r`` :class:`LogicNets<LogicNet>` (i.e. the logic for setting :attr:`Register.next`) will be treated as sink nodes of the network. """ graph = net_graph(block, split_state) node_index_map = {} # map node -> index # print the list of nodes for index, node in enumerate(graph): print(index, namer(node, is_edge=False), file=file) node_index_map[node] = index print("#", file=file) # print the list of edges for _from in graph: for _to in graph[_from]: from_index = node_index_map[_from] to_index = node_index_map[_to] for edge in graph[_from][_to]: print(from_index, to_index, namer(edge), file=file)
# ----------------------------------------------------------------- # __ __ __ __ # / _` |__) /\ |__) |__| \ / | / # \__> | \ /~~\ | | | \/ | /__ def _default_edge_namer( edge: WireVector, is_to_splitmerge: bool = False, extra_edge_info: dict[WireVector, str] | None = None, ): """ A function for naming an edge for use in the ``graphviz`` graph. :param edge: the edge (i.e. :class:`WireVector` or deriving class) :param is_to_splitmerge: if the node to which the edge points is a ``select`` or ``concat`` operation :param extra_edge_info: a map from edge to any additional data you want to print associated with it (e.g. timing data) :return: a function that can be called by graph namer function you pass in to ``block_to_graphviz_string`` """ name = "" if edge.name is None else "/".join([edge.name, str(len(edge))]) if extra_edge_info and edge in extra_edge_info: # Always label an edge if present in the extra_edge_info map name = name + " (" + str(extra_edge_info[edge]) + ")" elif ( edge.name is None or edge.name.startswith("tmp") or isinstance(edge, (Input, Output, Const, Register)) ): name = "" penwidth = 2 if len(edge) == 1 else 6 arrowhead = "none" if is_to_splitmerge else "normal" return f'[label="{name}", penwidth="{penwidth}", arrowhead="{arrowhead}"]' def _default_node_namer( node: WireVector, split_state: bool = False, extra_node_info: dict[WireVector, str] | None = None, ): """ A function for naming a node for use in the ``graphviz`` graph. :param node: the node (i.e. :class:`WireVector` or deriving class, or a logic net) :param split_state: if ``True``, split connections to/from a register update net; this means that registers will be appear as source nodes of the network, and 'r' nets (i.e. the logic for setting a register's next value) will be treated as sink nodes of the network. :param extra_node_info: a map from node to any additional data you want to print associated with it (e.g. delay data) :return: a function that can be called by graph namer function you pass in to :func:`block_to_graphviz_string` """ def label(v): if extra_node_info and node in extra_node_info: v = v + " (" + str(extra_node_info[node]) + ")" return v if isinstance(node, Const): name = node.name + ": " if not node.name.startswith("const_") else "" return ( f'[label="{label(name + str(node.val))}", shape=circle, ' "fillcolor=lightgrey]" ) if isinstance(node, Input): return f'[label="{label(node.name)}", shape=invhouse, fillcolor=coral]' if isinstance(node, Output): return f'[label="{label(node.name)}", shape=house, fillcolor=lawngreen]' if isinstance(node, Register): return f'[label="{label(node.name)}", shape=square, fillcolor=gold]' if isinstance(node, WireVector): return f'[label="{label(node.name)}", shape=circle, fillcolor=none]' try: if node.op == "&": return '[label="{}"]'.format(label("and")) if node.op == "|": return '[label="{}"]'.format(label("or")) if node.op == "^": return '[label="{}"]'.format(label("xor")) if node.op == "~": return '[label="{}", shape=invtriangle]'.format(label("not")) if node.op == "x": return '[label="{}", shape=invtrapezium]'.format(label("mux")) if node.op == "s": # node.op_param is a tuple of the selected bits to pull from the argument # wire, so it could look something like (0,0,0,0,0,0,0), meaning dest wire # is going to be a concatenation of the zero-th bit of the argument wire, 7 # times. selLower = node.op_param[0] selUpper = node.op_param[-1] if len(node.op_param) == 1: bits = f"[{selLower}]" elif node.op_param == tuple(range(selLower, selUpper + 1)): # consecutive bits = f"[{selUpper}:{selLower}]" elif all( ix == node.op_param[0] for ix in node.op_param[1:] ): # all the same bits = f"[{node.op_param[0]}]*{len(node.op_param)}" else: bits = "bits" + str(tuple(reversed(node.op_param))) return f'[label="{label(bits)}", fillcolor=azure1, height=.25, width=.25]' if node.op in "c": return '[label="{}", height=.1, width=.1]'.format(label("concat")) if node.op == "r": name = node.dests[0].name or "" name = (f"{name}.next") if split_state else name return f'[label="{label(name)}", shape=square, fillcolor=gold]' if node.op == "w": return '[label="{}", height=.1, width=.1]'.format(label("")) if node.op in "m@": name = node.op_param[1].name if name.startswith("tmp"): name = "" else: name = "(" + name + ")" return f'[label="{label(node.op + name)}"]' return '[label="{}"]'.format(label(node.op + str(node.op_param or ""))) except AttributeError as exc: msg = f'no naming rule for "{node}"' raise PyrtlError(msg) from exc def _graphviz_default_namer( thing: WireVector | LogicNet, is_edge: bool, is_to_splitmerge: bool, split_state: bool, node_namer=_default_node_namer, edge_namer=_default_edge_namer, ): """Returns a "good" Graphviz label for thing. :param thing: The edge (:class:`WireVector`) or node (:class:`LogicNet` or :class:`Input`/:class:`Output`/:class:`Const`) to name :param is_edge: ``True`` if thing is an edge :param is_to_splitmerge: if the node to which the edge points is a ``select`` or concat operation :param split_state: If ``True``, visually split the connections to/from a register update net. :param node_namer: A function mapping a node to a label; one of its arguments is a dict mapping nodes to nodes to additional user-supplied information. :param edge_namer: A function mapping an edge to a label; one of its arguments is a dict mapping nodes to nodes to additional user-supplied information. :return: A function that knows how to label each element in the graph, which can be passed to :func:`output_to_graphviz` or :func:`block_to_graphviz_string` """ if is_edge: return edge_namer(thing, is_to_splitmerge=is_to_splitmerge) return node_namer(thing, split_state=split_state)
[docs] def graphviz_detailed_namer( extra_node_info: dict | None = None, extra_edge_info: dict | None = None ): """Returns a detailed Graphviz namer that prints extra information about nodes/edges in the given maps. If both :class:`dict` arguments are ``None``, the returned namer behaves identically to the default Graphviz namer. :param extra_node_info: A :class:`dict` from node to additional data about that node. The additional data will be converted to :class:`str` and printed next to the node's label. :param extra_edge_info: A :class:`dict` from edge to additional data about that edge. The additional data will be converted to :class:`str` and printed next to the edge's label. :return: A function to label each element in the graph, which can be used as :func:`output_to_graphviz` or :func:`block_to_graphviz_string`'s ``namer``. """ def node_namer(node, split_state): return _default_node_namer(node, split_state, extra_node_info) def edge_namer(edge, is_to_splitmerge): return _default_edge_namer(edge, is_to_splitmerge, extra_edge_info) def namer(thing, is_edge, is_to_splitmerge, split_state): return _graphviz_default_namer( thing, is_edge, is_to_splitmerge, split_state, node_namer=node_namer, edge_namer=edge_namer, ) return namer
[docs] def output_to_graphviz( file, block: Block = None, namer=_graphviz_default_namer, split_state: bool = True, maintain_arg_order: bool = False, ): """Walk the :class:`Block` and output it in `Graphviz <https://graphviz.org/>`_ format to the open file. ``output_to_graphviz`` writes a file containing a directed graph in the format expected by `Graphviz <https://graphviz.org/>`_, specifically in the :command:`dot` format. Once Graphviz is installed, the resulting graph file can be rendered to a ``.pdf`` file with:: dot -Tpdf output.dot > output.pdf :param file: Open file to write to. :param block: :class:`Block` to use (defaults to current :ref:`working_block`) :param namer: Function used to label each edge and node; see :func:`block_to_graphviz_string` for more information. :param split_state: If ``True``, visually split the connections to/from a :class:`Register` update net. :param maintain_arg_order: If ``True``, add ordering constraints so incoming edges are ordered left-to-right for nets where argument order matters (e.g. ``<``). Keeping this as ``False`` results in a cleaner, though less visually precise, graphical output. """ print( block_to_graphviz_string(block, namer, split_state, maintain_arg_order), file=file, )
[docs] def block_to_graphviz_string( block: Block = None, namer=_graphviz_default_namer, split_state: bool = True, maintain_arg_order: bool = False, ): """Return a Graphviz string for the ``block``. The normal namer function will label user-named wires with their names and label the nodes (:class:`LogicNets<LogicNet>` or :class:`Input`/:class:`Output`/:class:`Const` terminals) with their operator symbol or name/value, respectively. If custom information about each node in the graph is desired, you can pass in a custom namer function which must have the same signature as the default namer, :func:`_graphviz_default_namer`. However, we recommend you instead pass in a call to :func:`graphviz_detailed_namer`, supplying it with your own :class:`dicts<dict>` mapping wires and nodes to labels. For any wire/node found in these maps, that additional information will be printed in parentheses alongside the node in the ``graphviz`` graph. For example, if you wanted to print the delay of each wire and the fanout of each gate, you could pass in two maps to the :func:`graphviz_detailed_namer` call, which returns a namer function that can subsequently be passed to :func:`output_to_graphviz` or :func:`block_to_graphviz_string`:: node_fanout = {n: f"Fanout: {my_fanout_func(n)}" for n in working_block().logic} wire_delay = {w: f"Delay: {my_delay_func(w):.2f}" for w in working_block().wirevector_set} with open("out.gv", "w") as f: output_to_graphviz( f, namer=graphviz_detailed_namer(node_fanout, wire_delay)) :param namer: A function mapping graph objects (wires/logic nets) to labels. If you want a more detailed namer, pass in a call to :func:`graphviz_detailed_namer`. :param block: :class:`Block` to use (defaults to current :ref:`working_block`) :param bool split_state: If ``True``, split connections to/from a :class:`Register` update net; this means that registers will be appear as source nodes of the network, and ``r`` nets (i.e. the logic for setting :attr:`Register.next`) will be treated as sink nodes of the network. :param bool maintain_arg_order: If ``True``, will add ordering constraints so incoming edges are ordered left-to-right for nets where argument order matters (e.g. ``<``). Keeping this as ``False`` results in a cleaner, though less visually precise, graphical output. """ graph = net_graph(block, split_state) node_index_map = {} # map node -> index rstring = """\ digraph g { graph [splines="spline", outputorder="edgesfirst"]; node [shape=circle, style=filled, fillcolor=lightblue1, fontcolor=black, fontname=helvetica, penwidth=0, fixedsize=shape]; edge [labelfloat=false, penwidth=2, color=deepskyblue, arrowsize=.5]; """ from pyrtl.importexport import _natural_sort_key def _node_sort_key(node): # If a LogicNet and a wire share the same name, we want the LogicNet to sort # first, so we arbitrarily 'A' and 'B' suffixes to break ties. if isinstance(node, LogicNet): if node.op == "@": key = str(node.args[2]) + "A" else: key = node.dests[0].name + "A" else: key = node.name + "B" return _natural_sort_key(key) # print the list of nodes for index, node in enumerate(sorted(graph.keys(), key=_node_sort_key)): label = namer(node, False, False, split_state) rstring += f" n{index} {label};\n" node_index_map[node] = index # print the list of edges srcs = collections.defaultdict(list) for _from in sorted(graph.keys(), key=_node_sort_key): for _to in sorted(graph[_from].keys(), key=_node_sort_key): from_index = node_index_map[_from] to_index = node_index_map[_to] for edge in graph[_from][_to]: is_to_splitmerge = hasattr(_to, "op") and _to.op in "cs" label = namer(edge, True, is_to_splitmerge, False) rstring += f" n{from_index} -> n{to_index} {label};\n" srcs[_to].append((_from, edge)) # Maintain left-to-right order of incoming wires for nets where order matters. This # won't be visually perfect sometimes (especially for a wire used twice in a net's # argument list), but for the majority of cases this will improve the visualization. def index_of(w, args): # Special helper so we compare id rather than using builtin operators for i, arg in enumerate(args): if w is arg: return i msg = "Expected to find wire in set of args" raise PyrtlInternalError(msg) if maintain_arg_order: block = working_block(block) for net in sorted(block.logic_subset(op="c-<>x@"), key=_node_sort_key): args = [(node_index_map[n], wire) for (n, wire) in srcs[net]] args.sort(key=lambda t: index_of(t[1], net.args)) s = " -> ".join([f"n{n}" for n, _ in args]) rstring += " {\n" rstring += " rank=same;\n" rstring += " edge[style=invis];\n" rstring += " " + s + ";\n" rstring += " rankdir=LR;\n" rstring += " }\n" rstring += "}\n" return rstring
# ----------------------------------------------------------------- # __ __ # /__` \ / / _` # .__/ \/ \__>
[docs] def output_to_svg(file, block: Block = None, split_state: bool = True): """Output the block as an SVG to the open file. :param file: Open file to write to. :param block: :class:`Block` to use (defaults to current :ref:`working_block`). :param split_state: If ``True``, visually split the connections to/from a register update net. """ print(block_to_svg(block, split_state), file=file)
[docs] def block_to_svg( block: Block = None, split_state: bool = True, maintain_arg_order: bool = False ): """Return an SVG for the block. :param block: :class:`Block` to use (defaults to current :ref:`working_block`). :param split_state: If ``True``, visually split the connections to/from a register update net. :param maintain_arg_order: If ``True``, will add ordering constraints so incoming edges are ordered left-to-right for nets where argument order matters (e.g. ``<``). Keeping this as ``False`` results in a cleaner, though less visually precise, graphical output. :return: The SVG representation of the :class:`Block`. """ try: from graphviz import Source src = Source( block_to_graphviz_string( block, split_state=split_state, maintain_arg_order=maintain_arg_order ) ) try: svg = src._repr_image_svg_xml() except AttributeError: # py-graphviz 0.18.3 or earlier return src._repr_svg_() else: # py-graphviz 0.19 or later return svg except ImportError as exc: msg = 'need graphviz installed (try "pip install graphviz")' raise PyrtlError(msg) from exc
# ----------------------------------------------------------------- # ___ # |__| | |\/| | # | | | | | |___
[docs] def trace_to_html( simtrace: SimulationTrace, trace_list: list[str] | None = None, sortkey=None, repr_func: Callable[[int], str] = hex, repr_per_name: dict[str, Callable[[int], str]] | None = None, ) -> str: """Return a HTML block showing the trace. :param simtrace: A trace to render in HTML. :param trace_list: (optional) A list of wires to display. :param sortkey: (optional) The key with which to sort the ``trace_list``. :param repr_func: Function to use for representing each value in the trace. Examples include :func:`hex`, :func:`oct`, :func:`bin`, and :class:`str` (for decimal), :func:`val_to_signed_integer` (for signed decimal) or the function returned by :func:`enum_name` (for :class:`~enum.IntEnum`). Defaults to :func:`hex`. :param repr_per_name: Map from signal name to a function that takes in the signal's value and returns a user-defined representation. If a signal name is not found in the map, the argument ``repr_func`` will be used instead. :return: An HTML block showing the trace. """ from pyrtl.simulation import SimulationTrace, _trace_sort_key if repr_per_name is None: repr_per_name = {} if not isinstance(simtrace, SimulationTrace): msg = "first arguement must be of type SimulationTrace" raise PyrtlError(msg) trace = simtrace.trace if sortkey is None: sortkey = _trace_sort_key if trace_list is None: trace_list = sorted(trace, key=sortkey) wave_template = """\ <script type="WaveDrom"> { signal : [ %s ], config: { hscale: %d } } </script> """ vallens = [] # For determining longest value length def extract(w): wavelist = [] datalist = [] last = None for value in trace[w]: if last == value: wavelist.append(".") else: f = repr_per_name.get(w) if f is not None: wavelist.append("=") datalist.append(str(f(value))) elif len(simtrace._wires[w]) == 1: # int() to convert True/False to 0/1 wavelist.append(str(int(value))) else: wavelist.append("=") datalist.append(str(repr_func(value))) last = value wavestring = "".join(wavelist) datastring = ", ".join([f'"{data}"' for data in datalist]) if repr_per_name.get(w) is None and len(simtrace._wires[w]) == 1: vallens.append(1) # all are the same length return bool_signal_template % (w, wavestring) vallens.extend([len(data) for data in datalist]) return int_signal_template % (w, wavestring, datastring) bool_signal_template = ' { name: "%s", wave: "%s" },' int_signal_template = ' { name: "%s", wave: "%s", data: [%s] },' signals = [extract(w) for w in trace_list] all_signals = "\n".join(signals) maxvallen = max(vallens) scale = (maxvallen // 5) + 1 return wave_template % (all_signals, scale)
# print(wave)