From 4b3b250571679a991c1582cb7c8d38345ac22ca1 Mon Sep 17 00:00:00 2001 From: JM Etancelin <jean-matthieu.etancelin@univ-pau.fr> Date: Mon, 23 Nov 2020 14:41:13 +0100 Subject: [PATCH] Improve tasks operators and graph handle. Improve inter-tasks automatic inserting of RedistributeInter. Better profiling and task profiler report (statistics over ranks). Add nested problems better graph creation and interaction across levels. Fix overlapping bridges and redistributes. Fix io_utils FS type process safety (deadlock issue with tasks) --- hysop/core/graph/computational_graph.py | 336 +++++++++++------------ hysop/core/graph/graph_builder.py | 314 ++++++++++++--------- hysop/core/mpi/bridge.py | 73 +++-- hysop/core/mpi/redistribute.py | 182 +++++++----- hysop/core/mpi/topo_tools.py | 4 +- hysop/domain/domain.py | 3 + hysop/fields/cartesian_discrete_field.py | 11 +- hysop/iterative_method.py | 3 +- hysop/operator/inter_task_param_comm.py | 37 +-- hysop/problem.py | 112 ++++---- hysop/tools/io_utils.py | 13 +- hysop/tools/profiler.py | 101 +++---- 12 files changed, 639 insertions(+), 550 deletions(-) diff --git a/hysop/core/graph/computational_graph.py b/hysop/core/graph/computational_graph.py index e8c678a9e..f48f3febf 100644 --- a/hysop/core/graph/computational_graph.py +++ b/hysop/core/graph/computational_graph.py @@ -1,7 +1,7 @@ # coding: utf-8 from hysop import __DEBUG__, __VERBOSE__, __PROFILE__, vprint, dprint -from hysop.tools.decorators import debug +from hysop.tools.decorators import debug, profile from hysop.tools.types import to_list, to_set, to_tuple, first_not_None, check_instance from hysop.tools.string_utils import framed_str, strlen, multiline_split from hysop.tools.numpywrappers import npw @@ -418,64 +418,71 @@ class ComputationalGraph(ComputationalGraphNode): return ss.encode('utf-8') def operator_report(self): + from hysop.problem import Problem maxlen = (None, None, None, 40, None, 40, None) split_sep = (None, None, None, ',', None, ',', None) newline_prefix = (None, None, None, ' ', '', ' ', None) replace = ('--', '', '', '', '-', '', '') - reduced_graph = self.reduced_graph - ops, tasks = [], [] - for (i, node) in enumerate(self.nodes): - handled_inputs, handled_outputs = (), () - finputs, foutputs = [], [] - for f in node.input_tensor_fields: - f0 = f.fields[0] - t0 = node.input_fields[f0] - if all((node.input_fields[fi] is t0) for fi in f.fields): - finputs.append(u'{}.{}'.format(f.pretty_name.decode('utf-8'), - t0.pretty_tag.decode('utf-8'))) - handled_inputs += f.fields - for f in node.output_tensor_fields: - f0 = f.fields[0] - t0 = node.output_fields[f0] - if all((node.output_fields[fi] is t0) for fi in f.fields): - foutputs.append(u'{}.{}'.format(f.pretty_name.decode('utf-8'), - t0.pretty_tag.decode('utf-8'))) - handled_outputs += f.fields - finputs += [u'{}.{}'.format(f.pretty_name.decode('utf-8'), - t.pretty_tag.decode('utf-8')) - for (f, t) in node.input_fields.iteritems() - if f not in handled_inputs and not t is None] - foutputs += [u'{}.{}'.format(f.pretty_name.decode('utf-8'), - t.pretty_tag.decode('utf-8')) - for (f, t) in node.output_fields.iteritems() - if f not in handled_outputs and not t is None] - finputs = u','.join(sorted(finputs)) - foutputs = u','.join(sorted(foutputs)) - - pinputs = u','.join(sorted([p.pretty_name.decode('utf-8') - for p in node.input_params.values()])) - poutputs = u','.join(sorted([p.pretty_name.decode('utf-8') - for p in node.output_params.values()])) - - infields = u'[{}]'.format(finputs) if finputs else u'' - outfields = u'[{}]'.format(foutputs) if foutputs else u'' - inparams = u'[{}]'.format(pinputs) if pinputs else u'' - outparams = u'[{}]'.format(poutputs) if poutputs else u'' - - inputs = u'{}{}{}'.format(infields, u'x' if infields and inparams else u'', inparams) - outputs = u'{}{}{}'.format( - outfields, u'x' if outfields and outparams else u'', outparams) - if inputs == '': - inputs = u'no inputs' - if outputs == '': - outputs = u'no outputs' + def __iterate_operators(n, idprefix=''): + for (i, node) in enumerate(n): + handled_inputs, handled_outputs = (), () + finputs, foutputs = [], [] + for f in node.input_tensor_fields: + f0 = f.fields[0] + t0 = node.input_fields[f0] + if all((node.input_fields[fi] is t0) for fi in f.fields): + finputs.append(u'{}.{}'.format(f.pretty_name.decode('utf-8'), + t0.pretty_tag.decode('utf-8'))) + handled_inputs += f.fields + for f in node.output_tensor_fields: + f0 = f.fields[0] + t0 = node.output_fields[f0] + if all((node.output_fields[fi] is t0) for fi in f.fields): + foutputs.append(u'{}.{}'.format(f.pretty_name.decode('utf-8'), + t0.pretty_tag.decode('utf-8'))) + handled_outputs += f.fields + finputs += [u'{}.{}'.format(f.pretty_name.decode('utf-8'), + t.pretty_tag.decode('utf-8')) + for (f, t) in node.input_fields.iteritems() + if f not in handled_inputs and not t is None] + foutputs += [u'{}.{}'.format(f.pretty_name.decode('utf-8'), + t.pretty_tag.decode('utf-8')) + for (f, t) in node.output_fields.iteritems() + if f not in handled_outputs and not t is None] + finputs = u','.join(sorted(finputs)) + foutputs = u','.join(sorted(foutputs)) - opname = node.pretty_name.decode('utf-8') - optype = type(node).__name__ - tasks.append(node.mpi_params.task_id) - strdata = (str(i), str(node.mpi_params.task_id), opname, inputs, '->', outputs, optype) + pinputs = u','.join(sorted([p.pretty_name.decode('utf-8') + for p in node.input_params.values()])) + poutputs = u','.join(sorted([p.pretty_name.decode('utf-8') + for p in node.output_params.values()])) + + infields = u'[{}]'.format(finputs) if finputs else u'' + outfields = u'[{}]'.format(foutputs) if foutputs else u'' + inparams = u'[{}]'.format(pinputs) if pinputs else u'' + outparams = u'[{}]'.format(poutputs) if poutputs else u'' + inputs = u'{}{}{}'.format(infields, u'x' if infields and inparams else u'', inparams) + outputs = u'{}{}{}'.format( + outfields, u'x' if outfields and outparams else u'', outparams) + if inputs == '': + inputs = u'no inputs' + if outputs == '': + outputs = u'no outputs' + + opname = node.pretty_name.decode('utf-8') + optype = type(node).__name__ + taskdata = node.mpi_params.task_id + strdata = (idprefix+str(i), str(node.mpi_params.task_id), opname, inputs, '->', outputs, optype) + yield (strdata, taskdata) + if isinstance(node, Problem): + for _ in __iterate_operators(node.nodes, idprefix=strdata[0]+':'): + yield _ + + ops, tasks = [], [] + for strdata, taskdata in __iterate_operators(self.nodes): + tasks.append(taskdata) ops += multiline_split(strdata, maxlen, split_sep, replace, newline_prefix) hide_task_id = len(set(tasks)) == 1 title = u'ComputationalGraph {} discrete operator report (tasks:{})'.format( @@ -509,69 +516,51 @@ class ComputationalGraph(ComputationalGraphNode): return u'\n{}\n'.format(framed_str(title=title, msg=ss[1:])).encode('utf-8') def task_profiler_report(self): + from hysop.problem import Problem if not __PROFILE__: return '' - # isize = tasksize = name_size = type_size = 8 rk = self._profiler.get_comm().Get_rank() - maxlen = (None, None, None, None, None, None) - split_sep = (None, None, None, None, None, None) - newline_prefix = (None, None, None, None, None, None) - replace = ('--', '', '', '', '', '') - reduced_graph = self.reduced_graph - profnames, proftimes, profcalls = \ - self._profiler.all_names[rk], self._profiler.all_times[rk], self._profiler.all_calls[rk] - ops = [] - for (i, op) in enumerate(self.nodes): - opname = op.pretty_name.decode('utf-8') - optype = type(op).__name__ - procdata, taskdata = [], [] - for _ in self._profiler.summary.values(): - if _.get_name().find(op.name) >= 0: - for __ in _.summary.values(): - try: - procdata.append("{0} {1} {2:.5g} {3:.5g}".format( - __.fname, __.nb_calls, __.total_time, __.total_time/__.nb_calls)) - nn = _.get_name() + '.' + __.fname - idx = [k for k in enumerate(profnames) if k[1].find(nn) >= 0][0][0] - taskvals = proftimes[:, idx][proftimes[:, idx] != 0.] - taskdata.append("{0:.5g} {1:.5g} {2:.5g} {3}".format( - taskvals[-1]/(len(taskvals)-1), - npw.min(taskvals[:-1]), npw.max(taskvals[:-1]), len(taskvals)-1)) - except AttributeError: - pass - if procdata: - strdata = (str(i), str(op.mpi_params.task_id), opname, optype, procdata[0], taskdata[0]) - ops += multiline_split(strdata, maxlen, split_sep, replace, newline_prefix) - for p, t in zip(procdata[1:], taskdata[1:]): - ops += multiline_split(('', '', '', '', p, t), maxlen, - split_sep, replace, newline_prefix) + maxlen = (None,) * 11 + split_sep = (None, ) * 11 + newline_prefix = (None, ) * 11 + replace = ('--',) + ('',) * 10 + + def __recurse_nodes(n, iprefix='', pnprefix='', nprefix=''): + for (i, op) in enumerate(n): + yield (iprefix+str(i), pnprefix+op.pretty_name.decode('utf-8'), nprefix+op.name, op) + if isinstance(op, Problem): + for _ in __recurse_nodes(op.nodes, iprefix+str(i)+":", pnprefix+op.pretty_name.decode('utf-8')+".", nprefix+op.name+"."): + yield _ + + ops, already_printed = [], [] + for (i, pn, n, op) in __recurse_nodes(self.nodes, nprefix=self.name+"."): + for k in self._profiler.all_data.keys(): + if n == '.'.join(k.split('.')[:-1]) and not n in already_printed: + strdata = (i, str(op.mpi_params.task_id), pn, type(op).__name__, k.split('.')[-1]) + values = tuple(ff.format(self._profiler.all_data[k][_]) + for ff, _ in zip(("{:.5g}", "{}", "{:.5g}", "{:.5g}", "{:.5g}", "{}"), (2, 1, 3, 4, 5, 0))) + ops += multiline_split(strdata + values, maxlen, split_sep, replace, newline_prefix) + already_printed.append(n) isize = max(strlen(s[0]) for s in ops) tasksize = max(max(strlen(s[1]) for s in ops), 6) name_size = max(strlen(s[2]) for s in ops) type_size = max(strlen(s[3]) for s in ops) - procdata_sizes = tuple([max(max(strlen(s[4].split(' ')[k]) for s in ops), l) - for k, l in enumerate((2, 2, 3, 4))]) - procdata_size = 3+npw.sum(procdata_sizes) - taskdata_sizes = tuple([max(max(strlen(s[5].split(' ')[k]) for s in ops), l) - for k, l in enumerate((4, 3, 3, 4))]) - taskdata_size = 3+npw.sum(taskdata_sizes) + fn_size = max(strlen(s[4]) for s in ops) + v_size = (max(strlen(s[5]) for s in ops) for _ in range(5, 11)) notasks = len(set(s[1] for s in ops)) == 1 if notasks: tasksize = 0 - ss = u' {:<{isize}} {:<{tasksize}} {:<{name_size}} {:<{type_size}} {:<{procdata_size}} {:<{taskdata_size}}'.format( - 'ID', '' if notasks else 'TASKID', 'OPERATOR', 'OPERATOR TYPE', 'PROC DATA', 'TASK DATA', - isize=isize, tasksize=tasksize, taskdata_size=taskdata_size, - name_size=name_size, type_size=type_size, procdata_size=procdata_size) - ss += u'\n {12:<{0}} {13:<{1}} {14:<{2}} {15:<{3}} {16:<{4}} {17:<{5}} {18:<{6}} {19:<{7}} {20:<{8}} {21:<{9}} {22:<{10}} {23:<{11}}'.format( - *((isize, tasksize, name_size, type_size)+procdata_sizes+taskdata_sizes + - ('', '', '', '', 'fn', 'nc', 'tot', 'mean', 'mean', 'min', 'max', 'size'))) - for (i, task_id, opname, optype, procdata, taskdata) in ops: - ss += u'\n {12:<{0}} {13:<{1}} {14:<{2}} {15:<{3}} {16:<{4}} {17:<{5}} {18:<{6}} {19:<{7}} {20:<{8}} {21:<{9}} {22:<{10}} {23:<{11}}'.format( - *((isize, tasksize, name_size, type_size)+procdata_sizes+taskdata_sizes + - (i, '' if notasks else task_id, opname, optype) + - tuple(procdata.split(' ') + taskdata.split(' ')))) + ln = {'isize': isize, 'tasksize': tasksize, 'name_size': name_size, 'type_size': type_size, 'fn_size': fn_size} + ln.update(dict([('vsize'+str(i), s) for i, s in enumerate(v_size)])) + ss_fmt = u'\n {:<{isize}} {:<{tasksize}} {:<{name_size}} {:<{type_size}} {:<{fn_size}}' + ss_fmt += u' | {:<{vsize0}} {:<{vsize1}} | {:<{vsize2}} {:<{vsize3}} {:<{vsize4}} {:<{vsize5}}' + ss = ss_fmt.format('ID', '' if notasks else 'TASKID', 'OPERATOR', 'OPERATOR TYPE', 'FN', + 'SELF', '', 'STATS', '', '', '', **ln) + ss += ss_fmt.format('', '', '', '', '', 'total', 'nc', 'mean', 'min', 'max', 'nproc', **ln) + for _ in ops: + ss += ss_fmt.format(_[0], '' if notasks else _[1], *tuple(_[2:]), **ln) title = u'ComputationalGraph {} task profiling report '.format( self.pretty_name.decode('utf-8')) @@ -599,12 +588,24 @@ class ComputationalGraph(ComputationalGraphNode): @debug @not_initialized def push_nodes(self, *args): + from hysop.operators import InterTaskParamComm + from hysop.problem import Problem nodes = () for arg in args: if (arg is None): continue nodes += to_tuple(arg) + def _get_mpi_params(n): + try: + mpi_params = n.impl_kwds['mpi_params'] + except (AttributeError, KeyError) as e: + try: + mpi_params = n.mpi_params + except AttributeError: + mpi_params = self.mpi_params + return mpi_params + def _hidden_node(node, mpi_params): """Add a utility node for handling inter-tasks communications.""" from hysop.core.mpi.redistribute import RedistributeInter @@ -616,21 +617,20 @@ class ComputationalGraph(ComputationalGraphNode): else: kw.update({'mpi_params': self._last_pushed_node_mpi_params, }) return RedistributeInter(**kw) - for node in nodes: + # nodes iterator with neighbors + for pcn_node in zip((None, )+nodes[:-1], nodes, nodes[1:]+(None, )): + prev_node, node, next_node = pcn_node # Skip not on-task operators very early - try: - mpi_params = node.impl_kwds['mpi_params'] - except (AttributeError, KeyError) as e: - try: - mpi_params = node.mpi_params - except AttributeError: - mpi_params = self.mpi_params + mpi_params = _get_mpi_params(node) + if self._last_pushed_node_mpi_params is None: self._last_pushed_node_mpi_params = mpi_params # Check if there is a task change if self._last_pushed_node_mpi_params and self._last_pushed_node_mpi_params.task_id != mpi_params.task_id: - self.nodes.append(_hidden_node(node, mpi_params)) - self._last_pushed_node_mpi_params = mpi_params + if (not isinstance(node, InterTaskParamComm) and + not (isinstance(node, Problem) and node.search_intertasks_ops)): + self.nodes.append(_hidden_node(node, mpi_params)) + self._last_pushed_node_mpi_params = _get_mpi_params(node) if mpi_params and not mpi_params.on_task: continue if isinstance(node, ComputationalGraph): @@ -655,10 +655,10 @@ class ComputationalGraph(ComputationalGraphNode): def available_methods(self): avail_methods = {} if not self.nodes: - from hysop.iterative_method import IterativeMethod + from hysop.problem import Problem msg = u'No nodes present in ComputationalGraph {}.'.format( self.pretty_name.decode('utf-8')) - if not isinstance(self, IterativeMethod): + if not isinstance(self, Problem): raise RuntimeError(msg.encode('utf-8')) for node in self.nodes: for (k, v) in node.available_methods().iteritems(): @@ -687,7 +687,8 @@ class ComputationalGraph(ComputationalGraphNode): self.pre_initialize(**kwds) msg = u'ComputationalGraph {} is empty.' - assert len(self.nodes) > 0, msg.format(self.pretty_name.decode('utf-8')).encode('utf-8') + if len(self.nodes) == 0: + vprint(msg.format(self.pretty_name.decode('utf-8')).encode('utf-8')) for node in self.nodes: node.pre_initialize(**kwds) @@ -714,10 +715,7 @@ class ComputationalGraph(ComputationalGraphNode): search_intertasks_ops=search_intertasks_ops) # fix for auto generated nodes - for node in self.nodes: - if (node._field_requirements is None): - node.get_and_set_field_requirements() - assert (node._field_requirements is not None) + self._fix_auto_generated_nodes() # from now on, all nodes contained in self.nodes are ComputationalGraphOperator # (ordered for sequential execution) @@ -728,6 +726,14 @@ class ComputationalGraph(ComputationalGraphNode): if self.is_root: self.post_initialize(**kwds) + def _fix_auto_generated_nodes(self): + for node in self.nodes: + if isinstance(node, ComputationalGraph): + node._fix_auto_generated_nodes() + if (node._field_requirements is None): + node.get_and_set_field_requirements() + assert (node._field_requirements is not None) + @debug def check(self): super(ComputationalGraph, self).check() @@ -759,6 +765,8 @@ class ComputationalGraph(ComputationalGraphNode): assert (node._field_requirements is not None) if not isinstance(node, Problem): node.handle_topologies(input_topology_states[node], output_topology_states[node]) + else: + node.handle_topologies(node.input_topology_states, node.output_topology_states) node.input_topology_states = input_topology_states[node] node.output_topology_states = output_topology_states[node] self.topology_handled = True @@ -905,54 +913,45 @@ class ComputationalGraph(ComputationalGraphNode): for node in sorted(deffered_ops, key=lambda _: _.variable.name): node.discretize() - if self.is_root: - input_discrete_fields = {} - for (field, topo) in self.input_fields.iteritems(): - istate = self.initial_input_topology_states[field][1] - # problem inputs are writeable for initialization - istate = istate.copy(is_read_only=False) - dfield = field.discretize(topo, istate) - input_discrete_fields[field] = dfield - - output_discrete_fields = {} - for field, topo in self.output_fields.iteritems(): - ostate = self.final_output_topology_states[field][1] - dfield = field.discretize(topo, ostate) - output_discrete_fields[field] = dfield - - # build back DiscreteTensorFields from DiscreteScalarFields - from hysop.fields.discrete_field import DiscreteTensorField - input_discrete_tensor_fields = {} - for tfield in self.input_tensor_fields: - dfields = tfield.new_empty_array() - for (idx, field) in tfield.nd_iter(): - dfield = input_discrete_fields[field] - dfields[idx] = dfield - tdfield = DiscreteTensorField(field=tfield, dfields=dfields) - input_discrete_tensor_fields[tfield] = tdfield - - output_discrete_tensor_fields = {} - for tfield in self.output_tensor_fields: - dfields = tfield.new_empty_array() - for (idx, field) in tfield.nd_iter(): - dfield = output_discrete_fields[field] - dfields[idx] = dfield - tdfield = DiscreteTensorField(field=tfield, dfields=dfields) - output_discrete_tensor_fields[tfield] = tdfield - - discrete_fields = tuple(set(input_discrete_fields.values() + - output_discrete_fields.values())) - - discrete_tensor_fields = tuple(set(input_discrete_tensor_fields.values() + - output_discrete_tensor_fields.values())) - - else: - input_discrete_fields = None - output_discrete_fields = None - input_discrete_tensor_fields = None - output_discrete_tensor_fields = None - discrete_fields = None - discrete_tensor_fields = None + input_discrete_fields = {} + for (field, topo) in self.input_fields.iteritems(): + istate = self.initial_input_topology_states[field][1] + # problem inputs are writeable for initialization + istate = istate.copy(is_read_only=False) + dfield = field.discretize(topo, istate) + input_discrete_fields[field] = dfield + + output_discrete_fields = {} + for field, topo in self.output_fields.iteritems(): + ostate = self.final_output_topology_states[field][1] + dfield = field.discretize(topo, ostate) + output_discrete_fields[field] = dfield + + # build back DiscreteTensorFields from DiscreteScalarFields + from hysop.fields.discrete_field import DiscreteTensorField + input_discrete_tensor_fields = {} + for tfield in self.input_tensor_fields: + dfields = tfield.new_empty_array() + for (idx, field) in tfield.nd_iter(): + dfield = input_discrete_fields[field] + dfields[idx] = dfield + tdfield = DiscreteTensorField(field=tfield, dfields=dfields) + input_discrete_tensor_fields[tfield] = tdfield + + output_discrete_tensor_fields = {} + for tfield in self.output_tensor_fields: + dfields = tfield.new_empty_array() + for (idx, field) in tfield.nd_iter(): + dfield = output_discrete_fields[field] + dfields[idx] = dfield + tdfield = DiscreteTensorField(field=tfield, dfields=dfields) + output_discrete_tensor_fields[tfield] = tdfield + + discrete_fields = tuple(set(input_discrete_fields.values() + + output_discrete_fields.values())) + + discrete_tensor_fields = tuple(set(input_discrete_tensor_fields.values() + + output_discrete_tensor_fields.values())) self.input_discrete_fields = input_discrete_fields self.output_discrete_fields = output_discrete_fields @@ -1008,6 +1007,7 @@ class ComputationalGraph(ComputationalGraphNode): return self @debug + @profile @ready def apply(self, **kwds): for node in self.nodes: diff --git a/hysop/core/graph/graph_builder.py b/hysop/core/graph/graph_builder.py index e5767ed37..9fe384fdf 100644 --- a/hysop/core/graph/graph_builder.py +++ b/hysop/core/graph/graph_builder.py @@ -67,7 +67,7 @@ class GraphBuilder(object): """ check_instance(node, ComputationalGraph) self.target_node = node - gprint('\n::Graph builder::') + gprint('::Graph builder::') gprint('>Initialized graph builder for ComputationalGraph {}'.format(node.name)) def configure(self, current_level, outputs_are_inputs, search_intertasks_ops, **kwds): @@ -108,11 +108,17 @@ class GraphBuilder(object): self.target_node.method) def new_node(self, op, subgraph, - current_level, node, node_id, opvertex): - + current_level, node, node_id, opvertex, insert_at=-1): + # Adds new nodes to graph. Optional position of new nodes in nodelist (adjust node_id for nodes sorting) graph = self.graph opnode = new_vertex(graph, op).copy_attributes(opvertex) gprint(' *Created node is {}.'.format(int(opnode))) + if insert_at >= 0: + for _ in graph.nodes: + if _.node_id > insert_at: + _.node_id += 1 + opnode.node_id = insert_at + 1 + gprint(' *Created moved to {}.'.format(int(opnode))) return opnode def build_graph(self): @@ -324,65 +330,6 @@ class GraphBuilder(object): op_input_topology_states[op] = input_states op_output_topology_states[op] = output_states - # iterate over ComputationalNodes - for (node_id, node) in enumerate(target_node.nodes): - gprint(' >Handling node {}: {} {}'.format( - node_id, node.name, node.__class__)) - - # Recursively build graph. - # If current node is a ComputationalGraph, we have to first - # build its own local graph and we extract all its operators (graph nodes). - # Else if node is a ComputationalGraphOperator, we just take the - # current node operator. - subgraph, node_ops, node_vertices, from_subgraph = \ - self.build_subgraph(node, current_level) - # iterate over subgraph operators - for (opvertex, op) in zip(node_vertices, node_ops): - # add operator node and fill vertex properties - opnode = self.new_node(op, subgraph, current_level, node, node_id, opvertex) - if isinstance(node, RedistributeInter): - assert self.search_intertasks_ops - # Save graph building state for filling next nodes topologies after graph completion - self._redistribute_inter.append((node_id, node, subgraph, node_ops, node_vertices, from_subgraph, opvertex, op, opnode)) - else: - __handle_node(node_id, node, subgraph, node_ops, node_vertices, from_subgraph, opvertex, op, opnode) - - # On level=0 we print a summary (if asked) for input and output fields and - # their topology. - def _print_io_fields_params_summary(comment=''): - msg = '\nComputationalGraph {} inputs {}:\n'.format(target_node.name, comment) - if not self.input_fields and not self.input_params: - msg += ' no inputs\n' - else: - if self.input_fields: - for ifield in sorted(self.input_fields, key=lambda x: x.name): - itopo = self.input_fields[ifield] - _, ireqs = self.input_topology_states[ifield] - msg += ' *Field {} on topo {}{}\n'.format( - ifield.name, itopo.id, ": {}".format(ireqs) if GRAPH_BUILDER_DEBUG_LEVEL == 2 else '') - if len(self.input_params) > 0: - for iparam in sorted(self.input_params): - msg += ' *Parameter {}\n'.format(iparam) - msg += 'ComputationalGraph {} outputs {}:\n'.format(target_node.name, comment) - if not self.output_fields and not self.output_params: - msg += ' no outputs\n' - else: - if self.output_fields: - for ofield in sorted(self.output_fields, key=lambda x: x.name): - otopo = self.output_fields[ofield] - _, oreqs = self.output_topology_states[ofield] - msg += ' *Field {} on topo {}{}\n'.format( - ofield.name, otopo.id, ": {}".format(oreqs) if GRAPH_BUILDER_DEBUG_LEVEL == 2 else '') - if len(self.output_params) > 0: - for oparam in sorted(self.output_params): - msg += ' *Parameter {}\n'.format(oparam) - - msg += '\n' - gprint(msg) - if current_level == 0: - _print_io_fields_params_summary() - is_graph_updated = False - def __find_elements_to_redistribute(available_elems, needed_elems): # The algorithm is to extract level0 input fields and topologies as needs # and meet with output fields and topologies as provided. The key feature is that @@ -393,6 +340,14 @@ class GraphBuilder(object): comm = domain.parent_comm tcomm = domain.task_comm() current_task = domain.current_task() + gprint(current_task, available_elems, needed_elems) + + def _name_to_key(n, d): + var = [_ for _ in d.keys() if isinstance(_, str) and _ == n] + var += [_ for _ in d.keys() if not isinstance(_, str) and _.name == n] + if len(var) == 1: + return var[0] + return None # Find redistribute candidates available_names = set([_ if not hasattr(_, 'name') else _.name @@ -410,51 +365,65 @@ class GraphBuilder(object): msg = '' # loop over other tasks for ot in (_ for _ in domain.all_tasks if _ != current_task): - comm.isend(needed_names.keys(), - dest=domain.task_root_in_parent(ot), tag=ot) - ot_needs = comm.recv(source=domain.task_root_in_parent(ot), tag=current_task) - can_provide = [_ for _ in ot_needs if _ in available_names] - for prov in can_provide: - available_names[prov] = ot + if(domain.task_root_in_parent(ot) == domain.parent_rank): + ot_needs = needed_names.keys() + ot_provide = can_provide = [_ for _ in ot_needs if _ in available_names] + for prov in can_provide: + available_names[prov] = needed_elems[_name_to_key(prov, needed_elems)].task_id + for _op in ot_provide: + needed_names[_op] = available_elems[_name_to_key(_op, available_elems)].task_id + else: + comm.isend(needed_names.keys(), dest=domain.task_root_in_parent(ot), tag=4321) + ot_needs = comm.recv(source=domain.task_root_in_parent(ot), tag=4321) + can_provide = [_ for _ in ot_needs if _ in available_names] + for prov in can_provide: + available_names[prov] = ot + assert ot != available_elems[_name_to_key(prov, available_elems)].task_id + comm.isend(can_provide, dest=domain.task_root_in_parent(ot), tag=1234) + ot_provide = comm.recv(source=domain.task_root_in_parent(ot), tag=1234) + for _op in ot_provide: + needed_names[_op] = ot + assert ot != needed_elems[_name_to_key(_op, needed_elems)].task_id if len(ot_needs) > 0: msg += "\n *Other task {} needs init for {}, we provide {}".format( ot, ot_needs, "nothing" if len(can_provide) == 0 else can_provide) - comm.isend(can_provide, dest=domain.task_root_in_parent(ot), tag=1234) - ot_provide = comm.recv(source=domain.task_root_in_parent(ot), tag=1234) - for _op in ot_provide: - needed_names[_op] = ot if msg != '': gprint(" >[IT] Inter-tasks matching:" + msg) needed_names = {p: t for (p, t) in needed_names.items() if t is not None} available_names = {p: t for (p, t) in available_names.items() if t is not None} needed_names = tcomm.bcast(needed_names, root=0) available_names = tcomm.bcast(available_names, root=0) - gprint(" >[IT] Inter-tasks will send {} and recieve {}".format(available_names, needed_names)) - # assert len(available_names.items() + needed_names.items()) <= 1, \ - # "Redistributes work only for single variables for the moment" + gprint(" >[IT] Inter-tasks will send:to {} and recieve:from {}".format(available_names, needed_names)) # Get back the actual field or parameter - for p, t in sorted(available_names.items() + needed_names.items(), key=lambda _: _[0]): - kwargs = {'other_task_id': t} + for p in sorted(set(available_names.keys() + needed_names.keys())): + kwargs = {} s_topo, r_topo, comm_dir = (None, )*3 - if p in available_names.keys(): - var = [_ for _ in available_elems.keys() if isinstance(_, str) and _ == p] - var += [_ for _ in available_elems.keys() if not isinstance(_, str) and _.name == p] - var = var[0] - topo = available_elems[var] + ae, ne = [_name_to_key(p, _) for _ in available_elems, needed_elems] + if not ae is None: + var = ae + t = available_names[p] + topo = available_elems[ae] comm_dir = 'src' s_topo = topo - if p in needed_names.keys(): - var = [_ for _ in needed_elems.keys() if isinstance(_, str) and _ == p] - var += [_ for _ in needed_elems.keys() if not isinstance(_, str) and _.name == p] - var = var[0] - topo = needed_elems[var] + if not ne is None: + var = ne + t = needed_names[p] + topo = needed_elems[ne] comm_dir = 'dest' r_topo = topo + if not (ae is None or ne is None): + comm_dir = 'src&dest' + t = None assert not comm_dir is None # Finalize init call kwargs.update({'variable': var, 'mpi_params': topo.mpi_params, - 'name': 'RI{}_{}_{}'.format(comm_dir, topo.id, var.name), + 'name': 'RI{}_{}{}{}_{}'.format( + comm_dir, + '' if s_topo is None else s_topo.id, + 'to' if not s_topo is None and not r_topo is None else '', + '' if r_topo is None else r_topo.id, + var.name), 'pretty_name': u'RI{}_{}{}{}_{}'.format( comm_dir, '' if s_topo is None else subscript(s_topo.id), @@ -463,51 +432,111 @@ class GraphBuilder(object): var.pretty_name.decode('utf-8')), 'source_topo': s_topo, 'target_topo': r_topo, + 'other_task_id': t, }) + yield kwargs - # Iterate redistribute inter-tasks (Resume for these nodes) - if self.search_intertasks_ops: - gprint(" >[IT] Resume postponed RedistributeInter") - for _handle_node_args in self._redistribute_inter: - node_id, node, subgraph, node_ops, node_vertices, from_subgraph, opvertex, op, opnode = _handle_node_args - # Fix the redistribute base initialization - gprint(" >[IT] Handling node {}".format(node_id)) - available_elems, needed_elems = {}, {} - for _node in target_node.nodes: - if _node is node: - break - available_elems.update(_node.output_fields) - available_elems.update(_node.output_params) - for _node in target_node.nodes[::-1]: - if _node is node: - break - needed_elems.update(_node.input_fields) - needed_elems.update(_node.input_params) - for it_redistribute_kwargs in __find_elements_to_redistribute( - available_elems, needed_elems): - assert RedistributeInter.can_redistribute( - *tuple([it_redistribute_kwargs[_] - for _ in ('source_topo', 'target_topo', 'other_task_id')])) + # iterate over ComputationalNodes + node_id = 0 + for node in [_ for _ in target_node.nodes]: + gprint('\n >Handling node {}::{}: {} {}'.format( + self.target_node.name, node_id, node.name, node.__class__)) + + # Recursively build graph. + # If current node is a ComputationalGraph, we have to first + # build its own local graph and we extract all its operators (graph nodes). + # Else if node is a ComputationalGraphOperator, we just take the + # current node operator. + subgraph, node_ops, node_vertices, from_subgraph = \ + self.build_subgraph(node, current_level) + # iterate over subgraph operators + for (opvertex, op) in zip(node_vertices, node_ops): + # add operator node and fill vertex properties + opnode = self.new_node(op, subgraph, current_level, node, node_id, opvertex) + if isinstance(node, RedistributeInter): + assert self.search_intertasks_ops + gprint(" >[IT] Handling node {}".format(node_id)) + available_elems, needed_elems = {}, {} + for _node in target_node.nodes: + if _node is node: + break + available_elems.update(_node.output_fields) + available_elems.update(_node.output_params) + for _node in target_node.nodes[::-1]: + if _node is node: + break + needed_elems.update(_node.input_fields) + needed_elems.update(_node.input_params) + for it_redistribute_kwargs in __find_elements_to_redistribute( + available_elems, needed_elems): + assert RedistributeInter.can_redistribute( + *tuple([it_redistribute_kwargs[_] + for _ in ('source_topo', 'target_topo', 'other_task_id')])) + if op.fake_init: + op.__init__(**it_redistribute_kwargs) + # Recompute fields requirements since no fields were given in first fake operator creation + op.get_and_set_field_requirements() + first_op, first_opnode = op, opnode + else: + op = RedistributeInter(**it_redistribute_kwargs) + target_node.nodes.insert(target_node.nodes.index(first_op), op) + gprint('\n >Handling node {}::{}: {} {}'.format( + self.target_node.name, node_id, op.name, op.__class__)) + subgraph, node_ops, node_vertices, from_subgraph = \ + self.build_subgraph(op, current_level) + opvertex = node_vertices[0] + opnode = self.new_node(op, subgraph, current_level, op, node_id, opvertex, insert_at=node_id) + cstate = self.topology_states.setdefault( + op.variable, self.new_topology_state(op.variable)) + node = op + op.initialize(topgraph_method=self.target_node.method) + __handle_node(node_id, node, subgraph, node_ops, node_vertices, + from_subgraph, opvertex, op, opnode) + node_id += 1 if op.fake_init: - op.__init__(**it_redistribute_kwargs) - first_op, first_opnode = op, opnode - else: - op = RedistributeInter(**it_redistribute_kwargs) - target_node.nodes.insert(target_node.nodes.index(first_op), op) - subgraph, node_ops, node_vertices, from_subgraph = \ - self.build_subgraph(node, current_level) - opvertex = node_vertices[0] - opnode = self.new_node(op, subgraph, current_level, op, node_id, opvertex) - cstate = self.topology_states.setdefault( - op.variable, self.new_topology_state(op.variable)) - op.initialize(topgraph_method=self.target_node.method) - __handle_node(node_id, node, subgraph, node_ops, node_vertices, - from_subgraph, opvertex, op, opnode) - if op.fake_init: - # Delete node because nothing has to be exchanged - target_node.nodes.remove(op) - graph.remove_node(opnode) + # Delete node because nothing has to be exchanged + target_node.nodes.remove(op) + graph.remove_node(opnode) + else: + __handle_node(node_id, node, subgraph, node_ops, node_vertices, from_subgraph, opvertex, op, opnode) + node_id += 1 + + # On level=0 we print a summary (if asked) for input and output fields and + # their topology. + def _print_io_fields_params_summary(comment=''): + msg = '\nComputationalGraph {} inputs {}:\n'.format(target_node.name, comment) + if not self.input_fields and not self.input_params: + msg += ' no inputs\n' + else: + if self.input_fields: + for ifield in sorted(self.input_fields, key=lambda x: x.name): + itopo = self.input_fields[ifield] + _, ireqs = self.input_topology_states[ifield] + msg += ' *Field {} on topo {}{}\n'.format( + ifield.name, itopo.id, ": {}".format(ireqs) if GRAPH_BUILDER_DEBUG_LEVEL == 2 else '') + if len(self.input_params) > 0: + for iparam in sorted(self.input_params): + msg += ' *Parameter {}\n'.format(iparam) + msg += 'ComputationalGraph {} outputs {}:\n'.format(target_node.name, comment) + if not self.output_fields and not self.output_params: + msg += ' no outputs\n' + else: + if self.output_fields: + for ofield in sorted(self.output_fields, key=lambda x: x.name): + otopo = self.output_fields[ofield] + _, oreqs = self.output_topology_states[ofield] + msg += ' *Field {} on topo {}{}\n'.format( + ofield.name, otopo.id, ": {}".format(oreqs) if GRAPH_BUILDER_DEBUG_LEVEL == 2 else '') + if len(self.output_params) > 0: + for oparam in sorted(self.output_params): + msg += ' *Parameter {}\n'.format(oparam) + + msg += '\n' + gprint(msg) + if current_level == 0: + _print_io_fields_params_summary() + is_graph_updated = False # iterate deferred nodes for (op, opnode) in self._deferred_operators: @@ -589,7 +618,8 @@ class GraphBuilder(object): final_topo, final_state = target_topo, cstate.discrete_topology_states[target_topo] kept_topo_and_state = orig_topo == final_topo and orig_state == final_state if not kept_topo_and_state: - msg = " > Update graph outputs from topology {}{} to {}{}".format( + msg = " > Update graph outputs {} from topology {}{} to {}{}".format( + field.name, orig_topo.tag, ":{}".format(orig_state) if GRAPH_BUILDER_DEBUG_LEVEL == 2 else '', final_topo.tag, ":{}".format(final_state) if GRAPH_BUILDER_DEBUG_LEVEL == 2 else '') gprint(msg) @@ -611,8 +641,11 @@ class GraphBuilder(object): diff = read_topos-written_topos if len(written_topos) > 0 and len(diff) >= 0: for t in diff: - assert f in input_topology_states - is_graph_updated = _closure(f, t, self._double_check_inputs[f][t], self.topology_states[f]) + if f in self.output_fields and self.output_fields[f].mpi_params.task_id != t.mpi_params.task_id: + print('WARNING FOR MIXED-TASK DOUBLE CHECK for', f.name, (self.output_fields[f], t)) + else: + assert f in input_topology_states + is_graph_updated = _closure(f, t, self._double_check_inputs[f][t], self.topology_states[f]) # Final intertask redistributes as closure if self.search_intertasks_ops and (current_level == 0) and outputs_are_inputs: @@ -622,6 +655,17 @@ class GraphBuilder(object): available_elems.update(self.output_fields) needed_elems.update(self.input_params) available_elems.update(self.output_params) + + # Find redistribute candidates + mgs = " >[IT] Current task {} parameters and fields : {}" + gprint(mgs.format("can communicate", ", ".join(set([_ if not hasattr(_, 'name') else _.name + for _ in available_elems.keys()]) - self._intertasks_exchanged))) + gprint(mgs.format("needs", ", ".join(set([_ if not hasattr(_, 'name') else _.name + for _ in needed_elems.keys()]) - self._intertasks_exchanged))) + + for _k in available_elems: + if _k in needed_elems.keys(): + needed_elems.pop(_k) for it_redistribute_kwargs in __find_elements_to_redistribute( available_elems, needed_elems): if it_redistribute_kwargs: @@ -712,6 +756,8 @@ class GraphBuilder(object): node_vertices += [None]*len(node_operators) elif node.mpi_params is None or node.mpi_params.on_task: if isinstance(node, Problem): + node._build_graph(current_level=current_level, outputs_are_inputs=True, + search_intertasks_ops=node.search_intertasks_ops, **kwds) assert node.graph_built, "Sub-problem should be already built" assert node.initialized, "Sub-problem should be already initialized" node_ops.append(node) @@ -1092,8 +1138,10 @@ class GraphBuilder(object): def topology_affinity(candidate_topo): candidate_state = self.discrete_topology_states[candidate_topo] + # discard out-of taks topos + score = (candidate_topo.task_id != target_topo.task_id) * -10000000 # skip redistribute - score = (candidate_topo is target_topo) * 1000000 + score += (candidate_topo is target_topo) * 1000000 # skip multiresolution filter (not automatically handled yet) score += (candidate_topo.grid_resolution == target_topo.grid_resolution).all()*100000 diff --git a/hysop/core/mpi/bridge.py b/hysop/core/mpi/bridge.py index c0b97326e..054df2157 100644 --- a/hysop/core/mpi/bridge.py +++ b/hysop/core/mpi/bridge.py @@ -15,7 +15,7 @@ from hysop.core.mpi.topo_tools import TopoTools from hysop.tools.misc import Utils from hysop.core.mpi import MPI from hysop.tools.numpywrappers import npw -from hysop.tools.types import check_instance +from hysop.tools.types import check_instance, first_not_None class Bridge(object): @@ -212,6 +212,7 @@ class BridgeInter(object): domain = current.domain parent = domain.parent_comm assert isinstance(parent, MPI.Intracomm) + self._rank = parent.Get_rank() self._topology = current # current task id current_task = self._topology.domain.current_task() @@ -279,7 +280,7 @@ class BridgeInter(object): return current_indices, remote_indices - def transfer_types(self): + def transfer_types(self, task_id=None): """Return the dictionnary of MPI derived types used for send (if on source) or receive (if on target) """ @@ -289,6 +290,12 @@ class BridgeInter(object): self._transfer_indices, data_shape, dtype=self._dtype, order=self._order) return self._transfer_types + def transfer_indices(self, task_id=None): + """Return the dictionnary of transfer local indices + used for send (if on source) or receive (if on target) + """ + return self._transfer_indices + class BridgeOverlap(Bridge): """ @@ -297,15 +304,14 @@ class BridgeOverlap(Bridge): - have common mpi processes i.e. something in between a standard bridge with intra-comm and - a bridge dealing with intercommunication. This is probably - a very pathologic case ... + a bridge dealing with intercommunication. The main difference with a standard bridge is that this one may be call on processes where either source or target does not exist. """ - def __init__(self, comm_ref=None, **kwds): + def __init__(self, source_id, target_id, comm_ref=None, **kwds): """Bridge between two topologies that: * have a different number of mpi processes * have common mpi processes @@ -331,6 +337,7 @@ class BridgeOverlap(Bridge): check_instance(comm_ref, MPI.Intracomm, allow_none=True) self._comm_ref = comm_ref self.domain = None + self._source_task_id, self._target_task_id = source_id, target_id super(BridgeOverlap, self).__init__(**kwds) def _check_topologies(self): @@ -342,35 +349,31 @@ class BridgeOverlap(Bridge): if self._comm_ref: self.comm = self._comm_ref elif self._source is not None: - self.comm = self._source.parent + self.comm = self._source.domain.parent_comm else: - self.comm = self._target.parent + self.comm = self._target.domain.parent_comm # To build a bridge, all process in source/target must be in self.comm # and there must be an overlap between source # and target processes group. If not, turn to intercommunicator. + intersec_size = TopoTools.intersection_size(self.comm if self._source is None else self._source.comm, + self.comm if self._target is None else self._target.comm) if self._source is not None and self._target is not None: msg = 'BridgeOverlap error: mpi group from ' msg += 'source and topo must overlap. If not ' msg += 'BridgeInter will probably suits better.' - assert TopoTools.intersection_size(self._source.comm, - self._target.comm) > 0, msg - # assert self._source.domain == self._target.domain, "given {} != {}".format( - # self._source.domain, self._target.domain) - - if self._source is not None: + assert intersec_size > 0, msg + elif self._source is not None: assert isinstance(self._source, CartesianTopologyView) s_size = self._source.comm.Get_size() - assert TopoTools.intersection_size(self._source.comm, self.comm) == s_size - self.domain = self._source.domain - - if self._target is not None: + assert intersec_size == s_size + elif self._target is not None: assert isinstance(self._target, CartesianTopologyView) + self._target_task_id = self._target.mpi_params.task_id t_size = self._target.comm.Get_size() - assert TopoTools.intersection_size(self._target.comm, self.comm) == t_size, \ - "{} != {}".format(TopoTools.intersection_size(self._target.comm, self.comm), t_size) - self.domain = self._target.domain + assert intersec_size == t_size + self.domain = first_not_None(self._source, self._target).domain self._rank = self.comm.Get_rank() def _build_send_recv_dict(self): @@ -406,9 +409,9 @@ class BridgeOverlap(Bridge): convert = self._source.mesh.global_to_local self._send_indices = {rk: convert(self._send_indices[rk]) for rk in self._send_indices} - else: - # Shortcut for send_types() - self._send_types = {} + data_shape = self._source.mesh.local_resolution + self._send_types = TopoTools.create_subarray( + self._send_indices, data_shape, dtype=self._dtype, order=self._order) # 2. Target indices : if self._rank in indices_target: @@ -423,6 +426,24 @@ class BridgeOverlap(Bridge): convert = self._target.mesh.global_to_local self._recv_indices = {rk: convert(self._recv_indices[rk]) for rk in self._recv_indices} - else: - # Shortcut for recv types - self._recv_types = {} + data_shape = self._target.mesh.local_resolution + self._recv_types = TopoTools.create_subarray( + self._recv_indices, data_shape, dtype=self._dtype, order=self._order) + + def transfer_types(self, task_id=None): + """Return the dictionnary of MPI derived types + used for send (if task_id is source) or receive (if task_id is target) + """ + if task_id == self._source_task_id: + return self._send_types + if task_id == self._target_task_id: + return self._recv_types + + def transfer_indices(self, task_id=None): + """Return the dictionnary of local indices + used for send (if task_id is source) or receive (if task_id is target) + """ + if task_id == self._source_task_id: + return self._send_indices + if task_id == self._target_task_id: + return self._recv_indices diff --git a/hysop/core/mpi/redistribute.py b/hysop/core/mpi/redistribute.py index 09f3c46c6..b5e82a817 100644 --- a/hysop/core/mpi/redistribute.py +++ b/hysop/core/mpi/redistribute.py @@ -277,36 +277,46 @@ class RedistributeInter(RedistributeOperatorBase): def can_redistribute(cls, source_topo, target_topo, other_task_id=None, **kwds): tin = source_topo tout = target_topo - # source and target are defined on different tasks (one topology is None) + # source and target are defined on different tasks + # (one topology is None) or (there is two topologies on different tasks) if not ((isinstance(tin, CartesianTopology) and not isinstance(tout, CartesianTopology)) or (isinstance(tout, CartesianTopology) and not isinstance(tin, CartesianTopology))): - return False + if (tout is None and tin is None) or (tout.mpi_params.task_id == tin.mpi_params.task_id): + return False # source and target must have the same global resolution - if isinstance(tout, CartesianTopology): + if isinstance(tout, CartesianTopology) and not isinstance(tin, CartesianTopology): tout_id = tout.mpi_params.task_id _is_source, _is_dest = False, True _other_task = other_task_id domain = tout.domain other_resol = npw.zeros_like(tout.mesh.grid_resolution) my_resol = tout.mesh.grid_resolution - elif isinstance(tin, CartesianTopology): + elif isinstance(tin, CartesianTopology) and not isinstance(tout, CartesianTopology): tin_id = tin.mpi_params.task_id _is_source, _is_dest = True, False _other_task = other_task_id domain = tin.domain other_resol = npw.zeros_like(tin.mesh.grid_resolution) my_resol = tin.mesh.grid_resolution + elif isinstance(tout, CartesianTopology) and isinstance(tin, CartesianTopology): + tout_id = tout.mpi_params.task_id + tin_id = tin.mpi_params.task_id + _is_source, _is_dest = True, True + _other_task = other_task_id + domain = tin.domain + other_resol = tin.mesh.grid_resolution + my_resol = tout.mesh.grid_resolution else: return False if domain.task_rank() == 0: - if _is_source: + if _is_source and not _is_dest: domain.parent_comm.send(tin.mesh.grid_resolution, dest=domain.task_root_in_parent(_other_task)) other_resol = domain.parent_comm.recv( source=domain.task_root_in_parent(_other_task)) - if _is_dest: + if _is_dest and not _is_source: other_resol = domain.parent_comm.recv( source=domain.task_root_in_parent(_other_task)) domain.parent_comm.send(tout.mesh.grid_resolution, @@ -346,29 +356,27 @@ class RedistributeInter(RedistributeOperatorBase): def _synchronize(self, tin, tout): """Ensure that the two redistributes are operating on the same variable""" v = self.variable - other_name = "" - domain = None + in_name, out_name = "" if tin is None else v.name, "" if tout is None else v.name + domain = first_not_None((tin, tout)).domain + # Exchange names on root ranks first + if domain.task_rank() == 0 and in_name != out_name: + rcv_name = domain.parent_comm.sendrecv( + v.name, sendtag=self._other_task_id, recvtag=first_not_None((tin, tout)).mpi_params.task_id, + dest=domain.task_root_in_parent(self._other_task_id), + source=domain.task_root_in_parent(self._other_task_id)) + in_name, out_name = [rcv_name if _ == "" else _ for _ in (in_name, out_name)] + # then broadcast other's names on local ranks if not tout is None: - domain = tout.domain - if domain.task_rank() == 0: - other_name = domain.parent_comm.sendrecv( - v.name, sendtag=1357, recvtag=2468, - dest=domain.task_root_in_parent(self._other_task_id), - source=domain.task_root_in_parent(self._other_task_id)) + in_name = tout.mpi_params.comm.bcast(in_name, root=0) if not tin is None: - domain = tin.domain - if domain.task_rank() == 0: - other_name = domain.parent_comm.sendrecv( - v.name, sendtag=2468, recvtag=1357, - dest=domain.task_root_in_parent(self._other_task_id), - source=domain.task_root_in_parent(self._other_task_id)) - other_name = domain.task_comm().bcast(other_name, root=0) - assert other_name == v.name + out_name = tin.mpi_params.comm.bcast(out_name, root=0) + assert in_name == out_name and in_name == v.name def get_preserved_input_fields(self): - """This Inter-communicator redistribute is not preserving the output fields. - output fields are invalidated on other topologies""" - return set() + """This Inter-communicator redistribute is preserving the output fields. + output fields are invalidated on other topologies only if field is not also an input""" + o_f, i_f = self.output_fields, self.input_fields + return set([f for f in o_f.keys() if (not o_f[f] is None) and (f in i_f.keys() and not i_f[f] is None)]) def output_topology_state(self, output_field, input_topology_states): """ @@ -385,7 +393,6 @@ class RedistributeInter(RedistributeOperatorBase): The state may include transposition state, memory order and more. see hysop.topology.transposition_state.TranspositionState for the complete list. """ - from hysop.fields.continuous_field import Field from hysop.topology.topology import TopologyState @@ -448,11 +455,10 @@ class RedistributeInter(RedistributeOperatorBase): @debug def _check_inout_topology_states(self, ifields, itopology_states, ofields, otopology_states): - if not (((ifields != itopology_states) and (ofields == otopology_states)) - or ((ofields != otopology_states) and (ifields == itopology_states))): + if (ifields != itopology_states) and (ofields != otopology_states): + msg = '\nFATAL ERROR: {}::{}.handle_topologies()\n\n' + msg = msg.format(type(self).__name__, self.name) if not ((ifields != itopology_states) and (ofields == otopology_states)): - msg = '\nFATAL ERROR: {}::{}.handle_topologies()\n\n' - msg = msg.format(type(self).__name__, self.name) msg += 'input_topology_states fields did not match operator\'s input Fields.\n' if ifields - itopology_states: msg += 'input_topology_states are missing the following Fields: {}\n' @@ -461,8 +467,6 @@ class RedistributeInter(RedistributeOperatorBase): msg += 'input_topology_states is providing useless extra Fields: {}\n' msg = msg.format(itopology_states - ifields) if not ((ofields != otopology_states) and (ifields == itopology_states)): - msg = '\nFATAL ERROR: {}::{}.handle_topologies()\n\n' - msg = msg.format(type(self).__name__, self.name) msg += 'output_topology_states fields did not match operator\'s output Fields.\n' if ofields - otopology_states: msg += 'output_topology_states are missing the following Fields: {}\n' @@ -499,7 +503,6 @@ class RedistributeInter(RedistributeOperatorBase): def discretize(self): super(RedistributeInter, self).discretize() - # we can create the bridge ifield, ofield = None, None if self.variable in self.input_discrete_fields: @@ -507,13 +510,16 @@ class RedistributeInter(RedistributeOperatorBase): if self.variable in self.output_discrete_fields: ofield = self.output_discrete_fields[self.variable] _is_source, _is_target = False, False - source_topo, target_topo = None, None + source_topo, target_topo, source_id = None, None, None + source_tstate, target_tstate = None, None if ifield is not None: _is_source = True source_topo = ifield.topology source_id = source_topo.mpi_params.task_id target_id = self._other_task_id - current_topo = source_topo + source_tstate = (source_topo.topology_state.dim, + source_topo.topology_state.axes, + source_topo.topology_state.memory_order) if DEBUG_REDISTRIBUTE != 0: print "This is a redistribute of {} from source topology {}".format( self.variable.name, source_topo.tag) @@ -521,14 +527,16 @@ class RedistributeInter(RedistributeOperatorBase): _is_target = True target_topo = ofield.topology target_id = target_topo.mpi_params.task_id - source_id = self._other_task_id - current_topo = target_topo + source_id = self._other_task_id if source_id is None else source_id + target_tstate = (target_topo.topology_state.dim, + target_topo.topology_state.axes, + target_topo.topology_state.memory_order) if DEBUG_REDISTRIBUTE != 0: print "This is a redistribute of {} to target topology {}".format( self.variable.name, target_topo.tag) self._synchronize(source_topo, target_topo) - current_state = current_topo.topology_state - domain = current_topo.domain + domain = first_not_None((source_topo, target_topo)).domain + self._source_id, self._target_id = source_id, target_id # compute a tag from algebraic relation : # x,y \in [0;ss-1] and Tag = y+ss*(x+ss*(HASH/ss^2) @@ -537,50 +545,75 @@ class RedistributeInter(RedistributeOperatorBase): basetag = ss*ss*(npw.uint32(hash(self.variable.name+"RedistributeInter"))/(100*ss*ss)) self._tag = lambda x, y: npw.uint32(basetag+x*ss+y) - other_state_dim, other_state_axes, other_state_memory_order = None, None, None - if domain.task_rank() == 0: - domain.parent_comm.isend(current_state.dim, - dest=domain.task_root_in_parent(self._other_task_id), - tag=123) - domain.parent_comm.isend(current_state.axes, - dest=domain.task_root_in_parent(self._other_task_id), - tag=456) - domain.parent_comm.isend(current_state.memory_order, - dest=domain.task_root_in_parent(self._other_task_id), - tag=789) - other_state_dim = domain.parent_comm.recv( - source=domain.task_root_in_parent(self._other_task_id), tag=123) - other_state_axes = domain.parent_comm.recv( - source=domain.task_root_in_parent(self._other_task_id), tag=456) - other_state_memory_order = domain.parent_comm.recv( - source=domain.task_root_in_parent(self._other_task_id), tag=789) - other_state_dim = domain.task_comm().bcast(other_state_dim, root=0) - other_state_axes = domain.task_comm().bcast(other_state_axes, root=0) - other_state_memory_order = domain.task_comm().bcast(other_state_memory_order, root=0) - - if (current_state.dim != other_state_dim) or (current_state.axes != other_state_axes) \ - or (current_state.memory_order != other_state_memory_order): + # Exchange on root ranks first ... + if domain.task_rank() == 0 and source_tstate != target_tstate: + rcv_tstate = domain.parent_comm.sendrecv( + first_not_None((source_tstate, target_tstate)), + sendtag=self._other_task_id, recvtag=first_not_None((source_topo, target_topo)).mpi_params.task_id, + dest=domain.task_root_in_parent(self._other_task_id), + source=domain.task_root_in_parent(self._other_task_id)) + source_tstate, target_tstate = [rcv_tstate if _ == None else _ for _ in (source_tstate, target_tstate)] + # ... then broadcast + if _is_source: + target_tstate = source_topo.mpi_params.comm.bcast(target_tstate, root=0) + if _is_target: + source_tstate = target_topo.mpi_params.comm.bcast(source_tstate, root=0) + + if not (source_tstate == target_tstate): msg = 'Topology state mismatch between source and target.' msg += '\nSource topology state:' - msg += str(current_state) + msg += str(source_tstate) msg += '\nTarget topology state:' - msg += str((other_state_dim, other_state_axes, other_state_memory_order)) + msg += str(target_tstate) raise RuntimeError(msg) if _is_source: - assert all(current_topo.mesh.local_resolution == ifield.resolution) + assert all(source_topo.mesh.local_resolution == ifield.resolution) if _is_target: - assert all(current_topo.mesh.local_resolution == ofield.resolution) + assert all(target_topo.mesh.local_resolution == ofield.resolution) - self.bridge = BridgeInter(current=current_topo, - source_id=source_id, target_id=target_id, - dtype=self.dtype, order=get_mpi_order(first_not_None((ifield, ofield)).sdata)) + # Compute if there is an overlap + src_overlap, dst_overlap = -1, -1 + if _is_source: + src_overlap = source_topo.mpi_params.comm.allreduce(1 if _is_source and _is_target else 0) + if _is_target: + dst_overlap = target_topo.mpi_params.comm.allreduce(1 if _is_source and _is_target else 0) + if domain.task_rank() == 0 and (src_overlap == -1 or dst_overlap == -1): + rcv_overlap = domain.parent_comm.sendrecv( + src_overlap if dst_overlap == -1 else dst_overlap, + sendtag=self._other_task_id, recvtag=first_not_None((source_topo, target_topo)).mpi_params.task_id, + dest=domain.task_root_in_parent(self._other_task_id), + source=domain.task_root_in_parent(self._other_task_id)) + src_overlap, dst_overlap = [rcv_overlap if _ == -1 else _ for _ in (src_overlap, dst_overlap)] + # ... then broadcast + if _is_source: + dst_overlap = source_topo.mpi_params.comm.bcast(dst_overlap, root=0) + if _is_target: + src_overlap = target_topo.mpi_params.comm.bcast(src_overlap, root=0) + assert (src_overlap+dst_overlap) % 2 == 0 + self._has_overlap = src_overlap+dst_overlap > 0 + + # Create bridges and store comm types and indices + if self._has_overlap: + self.bridge = BridgeOverlap(source=source_topo, target=target_topo, + source_id=source_id, target_id=target_id, + dtype=self.dtype, order=get_mpi_order(first_not_None((ifield, ofield)).sdata)) + else: + self.bridge = BridgeInter(current=first_not_None((source_topo, target_topo)), + source_id=source_id, target_id=target_id, + dtype=self.dtype, order=get_mpi_order(first_not_None((ifield, ofield)).sdata)) # dictionary that maps the rank to the derived type needed (send if on source or recieve on target) - self._comm_types = self.bridge.transfer_types() - self._comm_indices = self.bridge._transfer_indices + self._comm_types, self._comm_indices = {}, {} + if _is_source: + self._comm_types[source_id] = self.bridge.transfer_types(task_id=source_id) + self._comm_indices[source_id] = self.bridge.transfer_indices(task_id=source_id) + if _is_target: + self._comm_types[target_id] = self.bridge.transfer_types(task_id=target_id) + self._comm_indices[target_id] = self.bridge.transfer_indices(task_id=target_id) self._has_requests = False if DEBUG_REDISTRIBUTE != 0: print "RedistributeInter communication indices", self._comm_indices + print "RedistributeInter communication types", self._comm_types self.dFin = ifield self.dFout = ofield @@ -612,15 +645,16 @@ class RedistributeInter(RedistributeOperatorBase): # TODO : Using GPU-aware MPI would simplify the usage of _memcpy if self._is_source: - for rk, t in types.iteritems(): + for rk, t in types[self._source_id].iteritems(): if self._need_copy_before: _memcpy(self._dFin_data, self.dFin.sdata, - target_indices=indices[rk], source_indices=indices[rk], + target_indices=indices[self._source_id][rk], + source_indices=indices[self._source_id][rk], skind=Backend.OPENCL, tkind=Backend.HOST) sendtag = self._tag(rk+1, rank+1) comm.Isend([self._dFin_data, 1, t], dest=rk, tag=sendtag) if self._is_target: - for rk, t in types.iteritems(): + for rk, t in types[self._target_id].iteritems(): recvtag = self._tag(rank+1, rk+1) comm.Recv([self._dFout_data, 1, t], source=rk, tag=recvtag) if self._need_copy_after: diff --git a/hysop/core/mpi/topo_tools.py b/hysop/core/mpi/topo_tools.py index 6867528c0..fe296d1e8 100644 --- a/hysop/core/mpi/topo_tools.py +++ b/hysop/core/mpi/topo_tools.py @@ -37,7 +37,7 @@ class TopoTools(object): true to return the result a dict of slices, else return a numpy array. See notes below. root : int, optional - rank of the root mpi process. If None, reduce operation + rank of the root mpi process. If None, reduce operation\ is done on all processes. comm : mpi communicator, optional Communicator used to reduce indices. Default = topo.parent @@ -87,7 +87,7 @@ class TopoTools(object): may also work when topo is None. The function is usefull if you need to collect global indices - on a topo define only on a subset of comm, + on a topo defined only on a subset of comm, when for the procs not in this subset, topo will be equal to None. In such a case, comm and dom are required. This may happen when you want to build a bridge between two topologies diff --git a/hysop/domain/domain.py b/hysop/domain/domain.py index e8eabd801..0fb99eba5 100644 --- a/hysop/domain/domain.py +++ b/hysop/domain/domain.py @@ -138,6 +138,9 @@ class DomainView(TaggedObjectView): return self._domain._task_rank[task_id] return None + def _is_task_matters(self, tid, proctasks): + return self._domain._is_task_matters(tid, proctasks) + def is_on_task(self, params): """Test if the current process corresponds to param task.""" if isinstance(params, MPIParams): diff --git a/hysop/fields/cartesian_discrete_field.py b/hysop/fields/cartesian_discrete_field.py index f388b73eb..0dacc4105 100644 --- a/hysop/fields/cartesian_discrete_field.py +++ b/hysop/fields/cartesian_discrete_field.py @@ -888,17 +888,19 @@ class CartesianDiscreteScalarFieldView(CartesianDiscreteScalarFieldViewContainer Boundaries on the interior of the global domain have value BoundaryCondition.NONE. """ return self.mesh.local_boundaries - + def _get_global_boundaries_config(self): """ Return global boundaries configuration (boundary kind + attached data). """ return tuple(self.global_lboundaries_config, self.global_rboundaries_config) + def _get_global_lboundaries_config(self): """ Return global left boundaries configuration (boundary kind + attached data). """ return self.topology_state.transposed(self.field.lboundaries) + def _get_global_rboundaries_config(self): """ Return global right boundaries configuration (boundary kind + attached data). @@ -1326,7 +1328,8 @@ class CartesianDiscreteScalarFieldView(CartesianDiscreteScalarFieldViewContainer exchange_method = first_not_None(exchange_method, ExchangeMethod.ISEND_IRECV) check_instance(ghost_op, GhostOperation) check_instance(exchange_method, ExchangeMethod) - name = first_not_None(name, '{}_{}_{}'.format(self.name, ghosts, ghost_op)) + name = first_not_None(name, '{}_{}_{}_{}'.format( + '_'.join(self.name.split('_')[:-1]), ':'.join([str(_) for _ in self.topology.proc_shape]), ghosts, ghost_op)) data = to_tuple(first_not_None(data, self.data)) directions = to_tuple(first_not_None(directions, range(self.dim))) @@ -1351,7 +1354,7 @@ class CartesianDiscreteScalarFieldView(CartesianDiscreteScalarFieldViewContainer kind = None from hysop.fields.ghost_exchangers import CartesianDiscreteFieldGhostExchanger - return CartesianDiscreteFieldGhostExchanger(name=name, + return CartesianDiscreteFieldGhostExchanger(name=name, global_lboundaries_config=self.global_lboundaries_config, global_rboundaries_config=self.global_rboundaries_config, topology=self.topology, data=data, kind=kind, @@ -1400,7 +1403,7 @@ class CartesianDiscreteScalarFieldView(CartesianDiscreteScalarFieldViewContainer global_boundaries = property(_get_global_boundaries) global_lboundaries = property(_get_global_lboundaries) global_rboundaries = property(_get_global_rboundaries) - + global_boundaries_config = property(_get_global_boundaries_config) global_lboundaries_config = property(_get_global_lboundaries_config) global_rboundaries_config = property(_get_global_rboundaries_config) diff --git a/hysop/iterative_method.py b/hysop/iterative_method.py index 8d2d286ab..9adfb5340 100644 --- a/hysop/iterative_method.py +++ b/hysop/iterative_method.py @@ -4,7 +4,7 @@ from hysop import Simulation, Problem from hysop.parameters.scalar_parameter import ScalarParameter from hysop.tools.contexts import Timer from hysop import dprint, vprint -from hysop.tools.decorators import debug +from hysop.tools.decorators import debug, profile from hysop.core.graph.graph import ready from hysop.constants import HYSOP_REAL, HYSOP_INTEGER from hysop.tools.numpywrappers import npw @@ -115,6 +115,7 @@ class IterativeMethod(Problem): pass @debug + @profile @ready def apply(self, simulation, report_freq=0, dbg=None, **kwds): if self.to_be_skipped(self, simulation=simulation, **kwds): diff --git a/hysop/operator/inter_task_param_comm.py b/hysop/operator/inter_task_param_comm.py index 66e5f91eb..53eae778e 100644 --- a/hysop/operator/inter_task_param_comm.py +++ b/hysop/operator/inter_task_param_comm.py @@ -39,15 +39,16 @@ class PythonInterTaskParamComm(HostOperator): input_fields=input_fields, output_fields=output_fields, **kwds) self.domain = domain self.source_task = source_task + self.dest_task = dest_task self.task_is_source = domain.is_on_task(source_task) self.task_is_dest = domain.is_on_task(dest_task) self.inter_comm = domain.task_intercomm(dest_task if self.task_is_source else source_task) - # if self.inter_comm.is_inter: - # # Disjoint tasks with real inter-communicator - self._the_apply = self._apply_intercomm - # elif self.inter_comm.is_intra: - # # Overlapping tasks using an intra-communicator fron union of tasks procs - # self._the_apply = self._apply_intracomm + if self.inter_comm.is_inter: + # Disjoint tasks with real inter-communicator + self._the_apply = self._apply_intercomm + elif self.inter_comm.is_intra: + # Overlapping tasks using an intra-communicator fron union of tasks procs + self._the_apply = self._apply_intracomm self._all_params_by_type = {} for p in sorted(self.parameters, key=lambda _: _.name): @@ -64,6 +65,7 @@ class PythonInterTaskParamComm(HostOperator): self._the_apply(**kwds) def _apply_intercomm(self, **kwds): + """Disjoint tasks so inter-comm bcast is needed.""" for t in self._all_params_by_type.keys(): if self.task_is_source: self._send_temp_by_type[t][...] = [p() for p in self._all_params_by_type[t]] @@ -75,16 +77,19 @@ class PythonInterTaskParamComm(HostOperator): for p, v in zip(self._all_params_by_type[t], self._recv_temp_by_type[t]): p.value = v - # def _apply_intracomm(self, **kwds): - # for t in self._all_params_by_type.keys(): - # if self.task_is_source: - # self._send_temp_by_type[t][...] = [p() for p in self._all_params_by_type[t]] - # self._recv_temp_by_type[t] = self.inter_comm.bcast( - # self._send_temp_by_type[t], - # self.domain.task_root_in_parent(self.source_task)) - # if self.task_is_dest: - # for p, v in zip(self._all_params_by_type[t], self._recv_temp_by_type[t]): - # p.value = v + def _apply_intracomm(self, **kwds): + """Communicator is an intra-communicator defined as tasks' comm union. + Single broadcast is enough. + """ + for t in self._all_params_by_type.keys(): + if self.task_is_source and self.domain.task_rank() == 0: + self._send_temp_by_type[t][...] = [p() for p in self._all_params_by_type[t]] + self._recv_temp_by_type[t] = self.inter_comm.bcast( + self._send_temp_by_type[t], + self.domain.task_root_in_parent(self.source_task)) + if self.task_is_dest: + for p, v in zip(self._all_params_by_type[t], self._recv_temp_by_type[t]): + p.value = v @classmethod def supports_mpi(cls): diff --git a/hysop/problem.py b/hysop/problem.py index 69e40b76e..877ede4d7 100644 --- a/hysop/problem.py +++ b/hysop/problem.py @@ -2,11 +2,12 @@ from __future__ import absolute_import import sys import datetime +from hysop import __DEBUG__ from hysop.constants import Backend, MemoryOrdering, HYSOP_DEFAULT_TASK_ID from hysop.tools.types import check_instance, first_not_None, to_tuple, to_list from hysop.tools.string_utils import vprint_banner from hysop.tools.contexts import Timer -from hysop.tools.decorators import debug +from hysop.tools.decorators import debug, profile from hysop.tools.parameters import MPIParams from hysop.core.checkpoints import CheckpointHandler from hysop.core.graph.computational_graph import ComputationalGraph @@ -21,38 +22,33 @@ class Problem(ComputationalGraph): super(Problem, self).__init__(name=name, method=method, mpi_params=mpi_params, **kwds) self._do_check_unique_clenv = check_unique_clenv self.search_intertasks_ops = None + self.ops_tasks = [] @debug def insert(self, *ops): - ops_tasks = [] for node in ops: if hasattr(node, 'mpi_params'): - ops_tasks.append(node.mpi_params.task_id) + self.ops_tasks.append(node.mpi_params.task_id) if hasattr(node, 'impl_kwds') and 'mpi_params' in node.impl_kwds: - ops_tasks.append(node.impl_kwds['mpi_params'].task_id) - - given_ops_have_tasks = all([f.domain.has_tasks - for node in ops if hasattr(node, 'impl_kwds') and 'variables' in node.impl_kwds - for f in node.impl_kwds['variables'].keys()]) - if given_ops_have_tasks: - pb_task_id = HYSOP_DEFAULT_TASK_ID if self.mpi_params is None else self.mpi_params.task_id - if len(set(ops_tasks)) == 1 and ops_tasks[0] == pb_task_id: - # Intertask is not needed this is a single task-problem - given_ops_have_tasks = False - - if self.search_intertasks_ops is None: - self.search_intertasks_ops = given_ops_have_tasks - else: - self.search_intertasks_ops = self.search_intertasks_ops and given_ops_have_tasks + self.ops_tasks.append(node.impl_kwds['mpi_params'].task_id) + + given_ops_have_tasks = True + pb_task_id = HYSOP_DEFAULT_TASK_ID if self.mpi_params is None else self.mpi_params.task_id + if len(set(self.ops_tasks)) == 1 and self.ops_tasks[0] == pb_task_id: + # Intertask is not needed this is a single task-problem + given_ops_have_tasks = False + + self.search_intertasks_ops = given_ops_have_tasks self.push_nodes(*ops) return self @debug - def build(self, args=None, allow_subbuffers=False, outputs_are_inputs=True): + def build(self, args=None, allow_subbuffers=False, outputs_are_inputs=True, search_intertasks_ops=None): with Timer() as tm: msg = self.build_problem(args=args, allow_subbuffers=allow_subbuffers, - outputs_are_inputs=outputs_are_inputs) + outputs_are_inputs=outputs_are_inputs, + search_intertasks_ops=search_intertasks_ops) if msg: msg = ' Problem {} achieved, exiting ! '.format(msg) vprint_banner(msg, at_border=2) @@ -71,23 +67,26 @@ class Problem(ComputationalGraph): vprint_banner(msg, at_border=2) sys.exit(0) - def build_problem(self, args, allow_subbuffers, outputs_are_inputs=True): + def get_preserved_input_fields(self): + return set() + + def build_problem(self, args, allow_subbuffers, outputs_are_inputs=True, search_intertasks_ops=None): if (args is not None) and args.stop_at_initialization: return 'initialization' vprint('\nInitializing problem... '+str(self.name)) - # Initialize and Discretize first the other Problems - for node in [_ for _ in self.nodes if isinstance(_, Problem)]: - node.initialize(outputs_are_inputs=True, topgraph_method=None, is_root=True, - search_intertasks_ops=node.search_intertasks_ops) - node.discretize() + search_intertasks = search_intertasks_ops + if search_intertasks is None: + search_intertasks = self.search_intertasks_ops self.initialize(outputs_are_inputs=outputs_are_inputs, topgraph_method=None, is_root=True, - search_intertasks_ops=self.search_intertasks_ops) + search_intertasks_ops=search_intertasks) if (args is not None) and args.stop_at_discretization: return 'discretization' vprint('\nDiscretizing problem...'+str(self.name)) + for node in [_ for _ in self.nodes if isinstance(_, Problem)]: + node.discretize() self.discretize() if (args is not None) and args.stop_at_work_properties: @@ -131,31 +130,40 @@ class Problem(ComputationalGraph): def initialize_field(self, field, mpi_params=None, **kwds): """Initialize a field on all its input and output topologies.""" initialized = set() - for op in self.nodes: - # give priority to tensor field initialization - for op_fields in (self.input_discrete_tensor_fields, self.output_discrete_tensor_fields, - op.input_discrete_tensor_fields, op.output_discrete_tensor_fields, - op.input_discrete_fields, op.output_discrete_fields): - if (field in op_fields): - dfield = op_fields[field] - if all((df in initialized) for df in dfield.discrete_fields()): - # all contained scalar fields were already initialized - continue - elif mpi_params and mpi_params.task_id != dfield.topology.task_id: - # Topology task does not matches given mpi_params task - continue - else: - components = () - for (component, scalar_dfield) in dfield.nd_iter(): - if (scalar_dfield._dfield not in initialized): - components += (component,) - dfield.initialize(components=components, **kwds) - initialized.update(dfield.discrete_fields()) + + def __iterate_nodes(l): + for e in l: + if isinstance(e, Problem): + for _ in __iterate_nodes(e.nodes): + yield _ + yield e + # give priority to tensor field initialization + for op_fields in (self.input_discrete_tensor_fields, + self.output_discrete_tensor_fields) \ + + tuple(_ for op in __iterate_nodes(self.nodes) + for _ in (op.input_discrete_tensor_fields, op.output_discrete_tensor_fields, + op.input_discrete_fields, op.output_discrete_fields)): + if (field in op_fields): + dfield = op_fields[field] + if all((df in initialized) for df in dfield.discrete_fields()): + # all contained scalar fields were already initialized + continue + elif mpi_params and mpi_params.task_id != dfield.topology.task_id: + # Topology task does not matches given mpi_params task + continue + else: + components = () + for (component, scalar_dfield) in dfield.nd_iter(): + if (scalar_dfield._dfield not in initialized): + components += (component,) + dfield.initialize(components=components, **kwds) + initialized.update(dfield.discrete_fields()) if not initialized: msg = 'FATAL ERROR: Could not initialize field {}.'.format(field.name) raise RuntimeError(msg) @debug + @profile def solve(self, simu, dry_run=False, dbg=None, report_freq=10, plot_freq=10, checkpoint_handler=None, **kwds): @@ -168,8 +176,9 @@ class Problem(ComputationalGraph): simu.initialize() check_instance(checkpoint_handler, CheckpointHandler, allow_none=True) - checkpoint_handler.create_checkpoint_template(self, simu) - checkpoint_handler.load_checkpoint(self, simu) + if not checkpoint_handler is None: + checkpoint_handler.create_checkpoint_template(self, simu) + checkpoint_handler.load_checkpoint(self, simu) vprint('\nSolving problem...') with Timer() as tm: @@ -177,7 +186,7 @@ class Problem(ComputationalGraph): vprint() simu.print_state() self.apply(simulation=simu, dbg=dbg, **kwds) - should_dump_checkpoint = checkpoint_handler.should_dump(simu) # determined before simu advance + should_dump_checkpoint = (not checkpoint_handler is None) and checkpoint_handler.should_dump(simu) # determined before simu advance simu.advance(dbg=dbg, plot_freq=plot_freq) if should_dump_checkpoint: checkpoint_handler.save_checkpoint(self, simu) @@ -196,7 +205,8 @@ class Problem(ComputationalGraph): vprint_banner(msg, spacing=True, at_border=2) simu.finalize() - checkpoint_handler.finalize(self.mpi_params) + if not checkpoint_handler is None: + checkpoint_handler.finalize(self.mpi_params) self.final_report() if (dbg is not None): diff --git a/hysop/tools/io_utils.py b/hysop/tools/io_utils.py index ff36a4fe3..05f26f5f8 100755 --- a/hysop/tools/io_utils.py +++ b/hysop/tools/io_utils.py @@ -49,10 +49,7 @@ class IO(object): @requires_cmd('stat') def get_fs_type(path): cmd = ['stat', '-f', '-c', '%T', path] - fs_type = '' - if mpi.main_rank == 0: - fs_type = subprocess.check_output(cmd) - fs_type = mpi.main_comm.bcast(fs_type, root=0) + fs_type = subprocess.check_output(cmd) return fs_type.replace('\n', '') @classmethod @@ -236,7 +233,7 @@ class IOParams(namedtuple("IOParams", ['filename', 'filepath', 'io_leader', 'visu_leader', 'with_last', 'enable_ram_fs', 'force_ram_fs', 'dump_is_temporary', 'postprocess_dump', 'append', - 'hdf5_disable_compression', 'hdf5_disable_slicing', + 'hdf5_disable_compression', 'hdf5_disable_slicing', 'disk_filepath', 'kwds'])): """ A struct to handle I/O files parameters @@ -417,13 +414,13 @@ class IOParams(namedtuple("IOParams", ['filename', 'filepath', return dump def clone(self, **kwds): - keys = ('filename', + keys = ('filename', 'frequency', 'fileformat', 'dump_times', 'dump_tstart', 'dump_tend', 'dump_func', 'io_leader', 'visu_leader', 'with_last', 'enable_ram_fs', 'force_ram_fs', - 'dump_is_temporary', 'postprocess_dump', - 'hdf5_disable_compression', 'hdf5_disable_slicing', + 'dump_is_temporary', 'postprocess_dump', + 'hdf5_disable_compression', 'hdf5_disable_slicing', 'append', 'kwds') diff = set(kwds.keys()).difference(keys) diff --git a/hysop/tools/profiler.py b/hysop/tools/profiler.py index f688b998f..729fc3e9c 100644 --- a/hysop/tools/profiler.py +++ b/hysop/tools/profiler.py @@ -99,9 +99,8 @@ class Profiler(object): self._elems = {} self._l = 1 - self.all_times = None - self.all_call_nb = None - self.all_names = [None] + self.all_data = None + self.node_id = None def down(self, l): self._l = l + 1 @@ -120,7 +119,8 @@ class Profiler(object): def __iadd__(self, other): """+= operator. Append a new profiled function to the collection""" - self._elems[other.get_name()] = other + if not other.get_name() in self._elems.keys(): + self._elems[other.get_name()] = other return self def __setitem__(self, key, value): @@ -147,44 +147,11 @@ class Profiler(object): ' profiler report' if (self._l == 1) else '') for v in summary.values(): # sorted(summary.values(), key=lambda x: x.total_time): if len(str(v)) > 0: - s += '\n{}'.format(' '*self._l + str(v)) + s += '\n{}'.format(' '*self._l + str(v)) else: s = '' return s - def write(self, prefix='', hprefix='', with_head=True): - """ - - Parameters - ---------- - prefix : string, optional - hprefix : string, optional - with_head : bool, optional - - """ - comm = self.get_comm() - rk = comm.Get_rank() - comm_size = comm.Get_size() - if prefix != '' and prefix[-1] != ' ': - prefix += ' ' - if hprefix != '' and hprefix[-1] != ' ': - hprefix += ' ' - s = "" - h = hprefix + "Rank" - for r in xrange(comm_size): - s += prefix + "{0}".format(r) - for i in xrange(len(self.all_names[rk])): - s += " {0}".format(self.all_times[rk][r, i]) - s += "\n" - s += prefix + "-1" - for i in xrange(len(self.all_names[rk])): - h += ' ' + self.all_names[rk][i] - s += " {0}".format(self.all_times[rk][comm_size, i]) - h += "\n" - if with_head: - s = h + s - print s - def summarize(self): """ Update profiling values and prepare data for a report @@ -198,6 +165,7 @@ class Profiler(object): from hysop.fields.continuous_field import Field i = 0 + # Recursive summarize for k in self._elems.keys(): try: # Either elem[k] is a FProfiler ... @@ -211,41 +179,40 @@ class Profiler(object): self.summary[1e10 * i] = self._elems[k] else: self.summary[1e8 * i] = self._elems[k] + # Flatten elements for k in sorted(self._elems.keys()): if isinstance(self._elems[k], FProfiler): - self.table.append( - (self.get_name() + '.' + k, - self._elems[k].total_time, self._elems[k].nb_calls)) + tt = (self.get_name() + '.' + k, + self._elems[k].total_time, self._elems[k].nb_calls) + self.table.append(tt) for k in sorted(self._elems.keys()): if isinstance(self._elems[k], Profiler): for e in self._elems[k].table: - self.table.append( - (self.get_name() + '.' + e[0], e[1], e[2])) - - if self._l == 1 and self.all_times is None: + tt = (self.get_name() + '.' + e[0], e[1], e[2]) + self.table.append(tt) + # Table content is (name, time, ncalls) + if self._l == 1 and self.all_data is None: comm = self.get_comm() rk = comm.Get_rank() comm_size = comm.Get_size() - nb = {} - nb[rk] = len(self.table) - self.all_names = dict((_, None) for _ in range(comm_size)) - self.all_times, self.all_calls = {}, {} - self.all_names[rk] = [_[0] for _ in self.table] - for r in range(comm_size): - self.all_names[r] = comm.bcast(self.all_names[r], root=r) - nb[r] = len(self.all_names[r]) - self.all_times[r] = npw.zeros((comm_size+1, nb[r])) - self.all_calls[r] = npw.integer_zeros((comm_size+1, nb[r])) - for r in range(comm_size): - for i, n in enumerate(self.all_names[r]): - tn = [_ for _ in self.table if _[0] == n] - if len(tn) == 1: - self.all_times[r][rk, i] = tn[0][1] - self.all_calls[r][rk, i] = tn[0][2] - for r in range(comm_size): - for rr in range(comm_size): - comm.Bcast(self.all_times[r][rr, :], root=rr) - comm.Bcast(self.all_calls[r][rr, :], root=rr) + nb = comm.allgather(len(self.table)) + all_names = comm.allgather([_[0] for _ in self.table]) + all_times = comm.allgather([_[1] for _ in self.table]) + all_calls = comm.allgather([_[2] for _ in self.table]) + all_data = {} + # all_data structure : task_size, calls nb, self total, task mean, task min, task max for r in range(comm_size): - self.all_times[r][comm_size, :] = npw.sum(self.all_times[r][:comm_size, :], axis=0) - self.all_calls[r][comm_size, :] = npw.sum(self.all_calls[r][:comm_size, :], axis=0) + for n, t, c in zip(all_names[r], all_times[r], all_calls[r]): + if n not in all_data: + all_data[n] = [0, ]*6+[[], ] + all_data[n][0] += 1 + all_data[n][1] += c + if r == rk: + all_data[n][2] = t + all_data[n][6].append(t) + for n in all_data.keys(): + all_data[n][1] /= all_data[n][0] + all_data[n][3] = npw.average(all_data[n][6]) + all_data[n][4] = npw.min(all_data[n][6]) + all_data[n][5] = npw.max(all_data[n][6]) + self.all_data = all_data -- GitLab