From cf38f01f7c04385ec0e72fbd0e5846087d63a13b Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Keck <Jean-Baptiste.Keck@imag.fr> Date: Fri, 17 Apr 2020 15:23:36 +0200 Subject: [PATCH] reformat graph builder --- hysop/core/graph/computational_graph.py | 28 +++- hysop/core/graph/graph.py | 24 +++- hysop/core/graph/graph_builder.py | 164 +++++++++++++++--------- 3 files changed, 147 insertions(+), 69 deletions(-) diff --git a/hysop/core/graph/computational_graph.py b/hysop/core/graph/computational_graph.py index eda63ad23..302aa6cb2 100644 --- a/hysop/core/graph/computational_graph.py +++ b/hysop/core/graph/computational_graph.py @@ -716,7 +716,7 @@ class ComputationalGraph(ComputationalGraphNode): with tempfile.NamedTemporaryFile(suffix='.html') as f: net.show(f.name) - def to_file(self, path, io_rank=0, show_buttons=False): + def to_html(self, path, io_rank=0, show_buttons=False): """ Generate an interactive computational graph in an html file. """ @@ -740,11 +740,29 @@ class ComputationalGraph(ComputationalGraphNode): raise graph = self.reduced_graph - network = pyvis.network.Network() - for node in graph: + network = pyvis.network.Network(directed=True) + known_nodes = set() + + def add_node(node): node_id = int(node) - network.add_node(node_id, label=node.label, title=node.title, - color=node.color) + if node_id not in known_nodes: + network.add_node(node_id, label=node.label, + title=node.title, color=node.color) + known_nodes.add(node_id) + + def add_edge(from_node, to_node): + from_node_id = int(from_node) + to_node_id = int(to_node) + edge = graph[from_node][to_node] + title = edge.get('data', 'FAIL') + network.add_edge(from_node_id, to_node_id, title=title) + + for node in graph: + add_node(node) + for out_node in graph[node]: + add_node(out_node) + add_edge(node, out_node) + return network diff --git a/hysop/core/graph/graph.py b/hysop/core/graph/graph.py index 1d8dadb31..a04402ba7 100644 --- a/hysop/core/graph/graph.py +++ b/hysop/core/graph/graph.py @@ -24,7 +24,14 @@ def new_vertex(graph, *args, **kwds): def new_edge(graph, u, v, *args, **kwds): # /!\ We have to use networkx 2.2 which has a different interface for attributes - graph.add_edge(u, v, object=EdgeAttributes(*args, **kwds)) + assert u in graph + assert v in graph + if v not in graph[u]: + data = EdgeAttributes(*args, **kwds) + graph.add_edge(u, v, data=data) + else: + edge = graph[u][v] + edge['data'].update(*args, **kwds) return (u,v) def generate_vertex_colors(): @@ -151,7 +158,8 @@ class VertexAttributes(object): for param in iparams.values()), p=prefix, s=suffix+'  ') if iparams else '', '{p}Fin:{s}{}\n'.format(sep.join([ifinfo(f,topo) - for (f,topo) in ifields.iteritems()]), p=prefix, s=suffix+'  ') + for (f,topo) in ifields.iteritems()]), p=prefix, + s=suffix+'  ') if ifields else '', '{p}Pout:{s}{}\n'.format(sep.join([opinfo(param) for param in oparams.values()]), p=prefix, s=suffix) @@ -167,9 +175,15 @@ class VertexAttributes(object): class EdgeAttributes(object): """Simple class to hold edge data.""" - def __init__(self, variable=None, topology=None): - self.variable = variable - self.topology = topology + def __init__(self, *args, **kwds): + self.variables = [] + self.update(*args, **kwds) + + def update(self, variable=None, topology=None): + if variable is None: + assert topology is None + return + self.variables.append( (variable, topology) ) class ComputationalGraphNodeData(object): """ diff --git a/hysop/core/graph/graph_builder.py b/hysop/core/graph/graph_builder.py index 5cda50bcc..e2bc0ee4a 100644 --- a/hysop/core/graph/graph_builder.py +++ b/hysop/core/graph/graph_builder.py @@ -133,9 +133,10 @@ class GraphBuilder(object): # check that all target nodes are unique to prevent conflicts if len(set(target_node.nodes)) != len(target_node.nodes): - duplicates = set([x for x in target_node.nodes if target_node.nodes.count(x) > 1]) - msg='\n\nFATAL ERROR: ComputationalGraph {} contains mutiple references to the ' - msg+='same nodes.\n' + duplicates = set([x for x in target_node.nodes + if target_node.nodes.count(x) > 1]) + msg='\n\nFATAL ERROR: ComputationalGraph {} contains mutiple references to ' + msg+='the same nodes.\n' msg+='Concerned operators are:\n' for op in duplicates: msg0=' *Operator {:12s} (cls={:30s} | id={}): {} occurences\n' @@ -147,7 +148,8 @@ class GraphBuilder(object): # iterate over ComputationalNodes for (node_id, node) in enumerate(target_node.nodes): - gprint(' >Handling node {}: {} {}'.format(node_id, node.name, node.__class__) ) + gprint(' >Handling node {}: {} {}'.format( + node_id, node.name, node.__class__) ) # Recursively build graph. # If current node is a ComputationalGraph, we have to first @@ -175,23 +177,27 @@ class GraphBuilder(object): if not isinstance(op, Problem): # try to fill in undertermined topologies (experimental feature) backends = op.supported_backends() - for (ifield, itopo) in sorted(ifields.iteritems(), key=lambda x: x[0].name): + for (ifield, itopo) in sorted(ifields.iteritems(), + key=lambda x: x[0].name): if (itopo is not None): continue # look for ifield usage untill now - if (ifield in ofields) and (ofields[ifield] is not None) and (ofields[ifield].backend.kind in backends): + if ((ifield in ofields) and (ofields[ifield] is not None) + and (ofields[ifield].backend.kind in backends)): ifields[ifield] = ofields[ifield] elif (ifield not in self.topology_states): if outputs_are_inputs: # we can try to push this operator after we're done deferred_operators.append((op,opnode)) else: - msg = '\nGraphBuilder {} could not automatically determine the ' - msg += 'topology of input field {} in operator {}.' - msg += '\nTry to set a non empty TopologyDescriptor when passing ' - msg += 'the variable parameters, when creating the operator.' - msg += '\nAutomatic topology detection is an experimental feature.' - msg = msg.format(target_node.name, ifield.name, op.name) + msg = ('\nGraphBuilder {} could not automatically ' + 'determine the topology of input field {} in ' + 'operator {}.\nTry to set a non empty ' + 'TopologyDescriptor when passing the variable ' + 'parameters, when creating the operator.' + '\nAutomatic topology detection is an ' + 'experimental feature.') + msg = msg.format(target_node.name, ifield.name, op.name) raise RuntimeError(msg) else: cstate = self.topology_states[ifield] @@ -201,18 +207,20 @@ class GraphBuilder(object): backend = itopo.backend.any_backend_from_kind(*backends) itopo = itopo.topology_like(backend=backend) ifields[ifield] = itopo - for (ofield, otopo) in sorted(ofields.iteritems(), key=lambda x: x[0].name): + for (ofield, otopo) in sorted(ofields.iteritems(), + key=lambda x: x[0].name): if (otopo is not None): continue if (ofield in ifields) and (ifields[ofield] is not None): ofields[ofield] = ifields[ofield] elif (ofield not in self.topology_states): - msg = '\nGraphBuilder {} could not automatically determine the ' - msg += 'topology of input field {} in operator {}.' - msg += '\nTry to set a non empty TopologyDescriptor when passing ' - msg += 'the variable parameters, when creating the operator.' - msg += '\nAutomatic topology detection is an experimental feature.' - msg = msg.format(target_node.name, ofield.name, op.name) + msg = ('\nGraphBuilder {} could not automatically determine ' + 'the topology of input field {} in operator {}.' + '\nTry to set a non empty TopologyDescriptor when ' + 'passing the variable parameters, when creating the ' + 'operator.\nAutomatic topology detection is an ' + 'experimental feature.') + msg = msg.format(target_node.name, ofield.name, op.name) raise RuntimeError(msg) else: cstate = self.topology_states[ofield] @@ -242,10 +250,11 @@ class GraphBuilder(object): input_states = {} if ifields: gprint(' >Input fields') - for (ifield,itopo) in sorted(ifields.iteritems(), key=lambda x: x[0].name, reverse=True): + for (ifield,itopo) in sorted(ifields.iteritems(), + key=lambda x: x[0].name, reverse=True): gprint(' *{}{}'.format(ifield.name, - ' on an unknown topology (to be determined)' if (itopo is None) \ - else '.{}'.format(itopo.pretty_tag))) + ' on an unknown topology (to be determined)' + if (itopo is None) else '.{}'.format(itopo.pretty_tag))) if (itopo is None): continue if isinstance(op, Problem): @@ -254,8 +263,8 @@ class GraphBuilder(object): else: ifreqs = None else: - ifreqs = None if (current_level!=0 or isinstance(op, Problem)) \ - else field_requirements.get_input_requirement(ifield)[1] + ifreqs = None if current_level!=0 or isinstance(op, Problem) + else field_requirements.get_input_requirement(ifield)[1] if (ifield not in self.topology_states): cstate = self.new_topology_state(ifield) self.topology_states[ifield] = cstate @@ -276,7 +285,8 @@ class GraphBuilder(object): output_states = {} if ofields: gprint(' >Output fields') - for (ofield,otopo) in sorted(ofields.iteritems(), key=lambda x: x[0].name, reverse=True): + for (ofield,otopo) in sorted(ofields.iteritems(), + key=lambda x: x[0].name, reverse=True): assert (otopo is not None) gprint(' *{}.{}'.format(ofield.name, otopo.pretty_tag)) if isinstance(op, Problem): @@ -286,11 +296,12 @@ class GraphBuilder(object): ofreqs = None else: ofreqs = None if (current_level!=0) \ - else field_requirements.get_output_requirement(ofield)[1] + else field_requirements.get_output_requirement(ofield)[1] istates = None if (current_level!=0) else input_states cstate = self.topology_states.setdefault(ofield, self.new_topology_state(ofield)) - invalidate_field = (ofield not in op.get_preserved_input_fields()) + invalidate_field = (ofield not in + op.get_preserved_input_fields()) dstate = cstate.handle_output(opnode, otopo, ofreqs, op, istates, invalidate_field, graph) output_fields[ofield] = otopo @@ -338,7 +349,8 @@ class GraphBuilder(object): for ifield in input_fields: itopo = input_fields[ifield] _,ireqs = input_topology_states[ifield] - msg+=' *Field {} on topo {}: {}\n'.format(ifield.name, itopo.id, ireqs) + msg+=' *Field {} on topo {}: {}\n'.format( + ifield.name, itopo.id, ireqs) else: msg+= ' no inputs\n' msg+='ComputationalGraph {} outputs:\n'.format(target_node.name) @@ -346,7 +358,8 @@ class GraphBuilder(object): for ofield in output_fields: otopo = output_fields[ofield] _,oreqs = output_topology_states[ofield] - msg+=' *Field {} on topo {}: {}\n'.format(ofield.name, otopo.id, oreqs) + msg+=' *Field {} on topo {}: {}\n'.format( + ofield.name, otopo.id, oreqs) else: msg+= ' no outputs\n' msg+='\n' @@ -392,12 +405,13 @@ class GraphBuilder(object): # processing the top level (root) graph if (current_level==0) and outputs_are_inputs: # identify variables that needs a closure - redistribute_fields = set(input_fields.keys())#.intersection(output_fields.keys()) + redistribute_fields = set(input_fields.keys()) for field in sorted(redistribute_fields, key=lambda x: x.name): assert field in input_topology_states target_topo = input_fields[field] - input_dfield_requirements, input_topology_state = input_topology_states[field] + input_dfield_requirements, input_topology_state = \ + input_topology_states[field] requirements = input_dfield_requirements.copy() requirements.axes = (input_topology_state.axes,) @@ -557,7 +571,8 @@ class GraphBuilder(object): for (fields, io_params, op_kwds) in input_fields_to_dump: if (not fields) or (field in fields): io_params = IOParams( - filename='{}_{}_in'.format(io_params.filename,field.name), + filename='{}_{}_in'.format(io_params.filename, + field.name), frequency=io_params.frequency, fileformat=io_params.fileformat, io_leader=io_params.io_leader) @@ -565,11 +580,13 @@ class GraphBuilder(object): break # dictionnary (topology -> list of node) that are up to date (lastly written) - # multiple fields can be up to date at the same time after a redistribute operator - # or after an operator that implements the get_preserved_input_fields method. + # multiple fields can be up to date at the same time after a redistribute + # operator or after an operator that implements the + # get_preserved_input_fields method. self.write_nodes = {} - # dictionnary (topology -> list of nodes) that are currently reading field:topo + # dictionnary (topology -> list of nodes) that are currently reading + # field:topo self.read_nodes = {} # dictionnary (topology -> TopologyState) @@ -665,7 +682,8 @@ class GraphBuilder(object): # topology is already up to date with lastest write, nothing to do return - msg0='field {} from up to date topology:\n |-{}\n to topology\n |>{}' + msg0='field {} from up to date topology:' + msg0+='\n |-{}\n to topology\n |>{}' msg0=msg0.format(field.name, '\n |-'.join(t.short_description() for t in src_topos), target_topo.short_description()) @@ -708,7 +726,8 @@ class GraphBuilder(object): if src_state.axes in target_axes: return - msg=' >Transpose from state {} to any of those transposition states [{},] ' + msg=' >Transpose from state {} to any of those transposition states ' + msg+='[{},] ' msg=msg.format(src_state.tstate, ', '.join([str(TranspositionState.axes_to_tstate(axes)) for axes in target_axes])) @@ -733,8 +752,8 @@ class GraphBuilder(object): transpose_generator.generate() except TranspositionNotImplementedError: msg='FATAL ERROR: Graph builder could not find suitable operator on ' - msg+='backend {} to transpose from state {} to any of those transposition ' - msg+='states [{},] for field {} on topology id {}.' + msg+='backend {} to transpose from state {} to any of those ' + msg+='transposition states [{},] for field {} on topology id {}.' msg=msg.format(topo.backend.kind, src_state.tstate, ', '.join([TranspositionState.axes_to_tstate(axes) @@ -773,8 +792,8 @@ class GraphBuilder(object): target_memory_order=target_memory_order) reorder_generator.generate() except MemoryReorderingNotImplementedError: - msg='FATAL ERROR: Graph builder could not find suitable operator on backend {} ' - msg+='to reorder a field from order {} to order {} ' + msg='FATAL ERROR: Graph builder could not find suitable operator on ' + msg+='backend {} to reorder a field from order {} to order {} ' msg+='for field {} on topology id {}.' msg=msg.format(topo.backend.kind, src_state.memory_order, target_memory_order, @@ -836,8 +855,10 @@ class GraphBuilder(object): else: istate.axes = allowed_axes[0] - allowed_memory_order = target_dfield_requirements.memory_order - default_memory_order = self.discrete_topology_states[target_topo].memory_order + allowed_memory_order = \ + target_dfield_requirements.memory_order + default_memory_order = \ + self.discrete_topology_states[target_topo].memory_order assert (default_memory_order is not MemoryOrdering.ANY) if (allowed_memory_order is MemoryOrdering.ANY): istate.memory_order = default_memory_order @@ -853,16 +874,25 @@ class GraphBuilder(object): target_memory_order = target_dfield_requirements.memory_order def topology_affinity(candidate_topo): candidate_state = self.discrete_topology_states[candidate_topo] - score = (candidate_topo is target_topo) * 1000000 # skip redistribute + # skip redistribute + score = (candidate_topo is target_topo) * 1000000 + # skip multiresolution filter (not automatically handled yet) score += (candidate_topo.grid_resolution - == target_topo.grid_resolution).all()*100000 # skip multiresolution filter (not automatically handled yet) + == target_topo.grid_resolution).all()*100000 + # skip transpose score += ((target_axes is not None) and - (candidate_state.axes in target_axes))*10000 # skip transpose - score += (candidate_topo.backend is target_topo.backend)*1000 # better bandwidth - score += (candidate_topo.backend.kind is target_topo.backend.kind)*100 # better bandwidth + (candidate_state.axes in target_axes))*10000 + # better bandwidth + score += (candidate_topo.backend + is target_topo.backend)*1000 + # better bandwidth + score += (candidate_topo.backend.kind + is target_topo.backend.kind)*100 + # memory reordering is a noop score += ((target_memory_order is not MemoryOrdering.ANY) and - (candidate_state.memory_order is target_memory_order))*1 # memory reordering is a noop - score -= (np.prod(candidate_topo.ghosts)) # penalize number of ghosts + (candidate_state.memory_order is target_memory_order))*1 + # penalize number of ghosts + score -= (np.prod(candidate_topo.ghosts)) return score if (target_topo.backend.kind is Backend.HOST) and write_nodes: @@ -871,8 +901,11 @@ class GraphBuilder(object): src_topos = sorted(src_topos, key=topology_affinity, reverse=True) src_topo = src_topos[0] if (src_topo is not target_topo): - gprint(' >Redistributing field {} from up to date topologies {} to host topology {}.'.format( - ifield.name, ' ,'.join(t.pretty_tag for t in src_topos), target_topo.pretty_tag)) + msg=' >Redistributing field {} from up to date topologies {} ' + msg+='to host topology {}.' + msg=msg.format(ifield.name, ' ,'.join(t.pretty_tag + for t in src_topos), target_topo.pretty_tag) + gprint(msg) self.transpose(src_topo, target_axes, graph) self.redistribute(target_topo, graph, src_topo=src_topo) # we can always reorder target because this a host topology @@ -883,8 +916,11 @@ class GraphBuilder(object): src_topos = sorted(src_topos, key=topology_affinity, reverse=True) src_topo = src_topos[0] if (src_topo is not target_topo): - gprint(' >Redistributing field {} from up to date topologies {} to device topology {}.'.format( - ifield.name, ' ,'.join(t.pretty_tag for t in src_topos), target_topo.pretty_tag)) + msg=' >Redistributing field {} from up to date topologies {} ' + msg+='to device topology {}.' + msg=msg.format(ifield.name, ' ,'.join(t.pretty_tag + for t in src_topos), target_topo.pretty_tag) + gprint(msg) self.reorder(src_topo, target_memory_order, graph) self.redistribute(target_topo, graph, src_topo=src_topo) # target is always opencl so we transpose here @@ -938,7 +974,10 @@ class GraphBuilder(object): opnode, ofield, output_topo) if invalidate_field: - gprint(' >Invalidating output field {} on all topologies but {} because is has been freshly written.'.format(ofield.name, output_topo.pretty_tag)) + msg=' >Invalidating output field {} on all topologies but {} ' + msg+='because is has been freshly written.' + msg=msg.format(ofield.name, output_topo.pretty_tag) + gprint() # add dependency to all operators that reads this field # to prevent concurent read-writes. if output_topo in read_nodes: @@ -949,9 +988,15 @@ class GraphBuilder(object): write_nodes.clear() dtopology_states.clear() else: - gprint(' >Keeping output field {} up to date on all topologies because is has been marked as preserved by operator.'.format(ofield.name)) - gprint(' >Up to date topologies for field {} are now {}, {}.'.format(ofield.name, output_topo.pretty_tag, - ' ,'.join(t.pretty_tag for t in write_nodes))) + msg=' >Keeping output field {} up to date on all topologies because ' + msg+='is has been marked as preserved by operator.' + msg=msg.format(ofield.name) + gprint(msg) + + msg=' >Up to date topologies for field {} are now {}, {}.' + msg=msg.format(ofield.name, output_topo.pretty_tag, + ' ,'.join(t.pretty_tag for t in write_nodes)) + gprint(msg) # add the operator node as the one that lastly wrote this field. # no other operators can be reading as this topology just been written. @@ -961,7 +1006,8 @@ class GraphBuilder(object): if isinstance(operator, Problem): ostate = operator.final_output_topology_states[ofield][1] else: - ostate = operator.output_topology_state(ofield, input_topology_states) + ostate = operator.output_topology_state(ofield, + input_topology_states) dtopology_states[output_topo] = ostate gprint2(' >Output state is now {}'.format(ostate)) else: -- GitLab