diff --git a/Examples/NS_planeJet.py b/Examples/Plane_jet/NS_planeJet_CPU.py similarity index 59% rename from Examples/NS_planeJet.py rename to Examples/Plane_jet/NS_planeJet_CPU.py index a050b048cf593fa2d4ceafd2ba49ebb6c7a26fff..636b0b104d1dafdc589cdb1b3eb999205fced425 100644 --- a/Examples/NS_planeJet.py +++ b/Examples/Plane_jet/NS_planeJet_CPU.py @@ -8,26 +8,20 @@ All parameters are set and defined in python module dataTG. """ import parmepy as pp -from parmepy.constants import ORDER, np, PARMES_REAL +from parmepy.constants import ORDER, np, PARMES_REAL, HDF5 from parmepy.f2py import fftw2py -import numpy as np from parmepy.fields.continuous import Field -from parmepy.variables.variables import Variables from parmepy.mpi.topology import Cartesian from parmepy.operator.advection import Advection from parmepy.operator.stretching import Stretching from parmepy.operator.poisson import Poisson from parmepy.operator.diffusion import Diffusion -from parmepy.operator.adapt_timestep import AdaptTimeStep from parmepy.operator.redistribute import Redistribute from parmepy.problem.navier_stokes import NSProblem from parmepy.operator.monitors.printer import Printer from parmepy.operator.monitors.energy_enstrophy import Energy_enstrophy from parmepy.problem.simulation import Simulation -from parmepy.operator.adapt_timestep import AdaptTimeStep -from parmepy.methods_keys import Scales from parmepy.operator.differential import Curl -from parmepy.numerics.updateGhosts import UpdateGhosts from parmepy.mpi.main_var import main_size from parmepy.methods_keys import Scales, TimeIntegrator, Interpolation,\ Remesh, Support, Splitting, dtAdvecCrit, SpaceDiscretisation, GhostUpdate @@ -36,7 +30,7 @@ from parmepy.numerics.integrators.runge_kutta3 import RK3 as RK3 from parmepy.numerics.integrators.runge_kutta4 import RK4 as RK4 from parmepy.numerics.finite_differences import FD_C_4, FD_C_2 from parmepy.numerics.interpolation import Linear -from parmepy.numerics.remeshing import L4_2 as rmsh +from parmepy.numerics.remeshing import L6_4 from parmepy.f2py import fftw2py # problem dimension @@ -45,17 +39,16 @@ dim = 3 nb = 129 # number of ghosts in usual cartesian topo NBGHOSTS = 2 -ADVECTION_METHOD = {Scales: 'p_66'} +ADVECTION_METHOD = {Scales: 'p_64'} # ADVECTION_METHOD = {TimeIntegrator: RK2, # Interpolation: Linear, -# Remesh: rmsh, +# Remesh: L6_4, # Support: '', # Splitting: 'o2_FullHalf'} -VISCOSITY = 5e-6 +VISCOSITY = 1e-4 ## ----------- A 3d problem ----------- -print " ========= Start Navier-Stokes 3D (Taylor Green benchmark) =========" ## pi constant pi = np.pi cos = np.cos @@ -75,40 +68,33 @@ randY = np.asarray(np.random.random([nb - 1, nb - 1,(nb - 1)/main_size]), randZ = np.asarray(np.random.random([nb - 1, nb - 1,(nb - 1)/main_size]), dtype=PARMES_REAL, order=ORDER) - 0.5 -# ##initjet.f -# width = 0.01 -# ampl3 = 0.3 -# ampl2 = 0. -# ampl = 0.05 -# def computeVel(res, x, y, z, t): -# yy = abs(y - 0.5) -# aux=(0.1-2.*yy)/(4.*width) -# res[0][...] =0.5*(1.+tanh(aux))*(1.+ampl3*sin(8.*pi*(x)))*(1.+ampl*exp(-abs((0.1-2.*yy)/(4.*width)**2))*randX) -# res[1][...] = ampl*exp(-abs((0.1-2.*yy)/(4.*width)**2))*randY -# res[2][...] = ampl*exp(-abs((0.1-2.*yy)/(4.*width)**2))*randZ -# return res - - -# def initScal(res, x, y, z, t): -# yy = abs(y - 0.5) -# aux=(0.1-2.*yy)/(4.*width) -# res[0][...] 
= 0.5*(1.+tanh(aux))*(1.+ampl2*exp(-abs((0.1-2.*yy)/(4.*width)**2))*randZ)
-# return res
-
-
-# ## JCP initial condition
+##initjet.f
+width = 0.01
+ampl3 = 0.3
+ampl = 0.05
+ampl2 = 0.05
 def computeVel(res, x, y, z, t):
-    res[0][...] = 0.5*(1.-tanh((abs(y-0.5)-0.1/2.)/0.02))*(1.+0.3*sin(8*pi*x))
-    res[1][...] = 0.
-    res[2][...] = 0.
+    yy = abs(y - 0.5)
+    aux = (0.1 - 2. * yy) / (4. * width)
+    strg1 = exp(-abs(aux ** 2)) * randX
+    strg2 = exp(-abs(aux ** 2)) * randY
+    strg3 = exp(-abs(aux ** 2)) * randZ
+    res[0][...] = 0.5 * (1. + tanh(aux))
+    res[0][...] *= (1. + ampl3 * sin(8. * pi * x))
+    res[0][...] *= (1. + ampl * strg1)
+    res[1][...] = ampl * strg2
+    res[2][...] = ampl * strg3
     return res
 
 
 def initScal(res, x, y, z, t):
-    res[0][...] = 0.5*(1.-tanh((abs(y-0.5)-0.1/2.)/0.02))*(1.+0.3*sin(8*pi*x))
+    yy = abs(y - 0.5)
+    aux = (0.1 - 2. * yy) / (4. * width)
+    res[0][...] = 0.5 * (1. + tanh(aux))
+    #strg3 = exp(-abs(aux ** 2)) * randZ
+    #res[0][...] *= (1. + ampl2 * strg3)
     return res
 
-
 ## Fields
 velo = Field(domain=box, formula=computeVel,
              name='Velocity', isVector=True)
@@ -117,10 +103,6 @@ vorti = Field(domain=box,
 scal = Field(domain=box, formula=initScal,
              name='PassiveScalar', isVector=False)
 
-## Variables
-dt_adapt = Variables(domain=box, name='adaptative time step',
-                     data=[0.01])
-
 ## Variables
 ## Usual Cartesian topology definition
 # At the moment we use two (or three?) topologies :
@@ -166,27 +148,13 @@ poisson = Poisson(velo, vorti,
 c = Curl(velo, vorti,
          resolutions={velo: nbElem,
                       vorti: nbElem},
-         method={SpaceDiscretisation: fftw2py, GhostUpdate: False},
+         method={SpaceDiscretisation: fftw2py, GhostUpdate: True},
         )
-## Tools Operators
-dtAdapt = AdaptTimeStep(velo, vorti,
-                        resolutions={velo: nbElem,
-                                     vorti: nbElem},
-                        dt_adapt=dt_adapt,
-                        method={TimeIntegrator: RK3,
-                                SpaceDiscretisation: FD_C_4,
-                                dtAdvecCrit: 'deform'},
-                        topo=topo,
-                        lcfl=0.125,
-                        cfl=None,  # 0.5,
-                        prefix='./res_PS/dt_2p')
-
 ## Simulation
 simu = Simulation(tinit=0.0,
-                  tend=4.5, timeStep=dt_adapt,
-                  iterMax=10000)
-
+                  tend=4.5, timeStep=0.001,
+                  iterMax=1000000)
 
 
 # Bridges between the different topologies in order to
@@ -196,56 +164,49 @@ toGhost_vorti = Redistribute([vorti], advec, stretch)
 toGhost_velo = Redistribute([velo], poisson, stretch)
 # 2 - Stretching to Poisson/Diffusion
 fromGhost_vorti = Redistribute([vorti], stretch, diffusion)
-# 3 - Poisson to TimeStep
-distrPoissTimeStep = Redistribute([velo, vorti], poisson, dtAdapt)
-
 # Define the problem to solve
-pb = NSProblem(operators=[toGhost_velo, advecScal, advec,
-                          toGhost_vorti,
-                          stretch,
-                          fromGhost_vorti,
-                          diffusion,
-                          poisson,
-                          distrPoissTimeStep, dtAdapt],
-               simulation=simu, dumpFreq=-1)
-
-## Setting solver to Problem (only operators for computational tasks)
+pb = NSProblem(operators=[
+    toGhost_velo,
+    advecScal,
+    advec,
+    toGhost_vorti,
+    stretch,
+    fromGhost_vorti,
+    diffusion,
+    poisson,
+    ], simulation=simu, dumpFreq=-1)
+
+## Setting solver to Problem (only operators for computational tasks)
 pb.pre_setUp()
 ## Diagnostics related to the problem
 topofft = poisson.discreteFields[poisson.vorticity].topology
-# energy = Energy_enstrophy(velo, vorti, isNormalized=True,
-#                           topo=topofft,
-#                           viscosity=VISCOSITY,
-#                           frequency=1,
-#                           prefix='./res_PS/energy_S128_2p.dat')
+energy = Energy_enstrophy(velo, vorti, isNormalized=True,
+                          topo=topofft,
+                          viscosity=VISCOSITY,
+                          frequency=1,
+                          prefix='./res_NS_planeJet/energy_ref.dat')
 
 # initialisation
 vorti.setTopoInit(topofft)
 velo.setTopoInit(topofft)
 scal.setTopoInit(topofft)
-#pb.addMonitors([energy])
+pb.addMonitors([energy])
 pb.setUp()
 
+
 c.discretize()
 c.setUp()
-c.apply(simu)
 
-# p = Printer(variables=[scal,],
-#             topo=topofft,
-#             frequency=1,
-#             prefix='./res_PS/scal_S128_2p',
-#             ext=".vtk")
-# pb.addMonitors([p])
-# p._printStep()
+c.apply(simu)
 
-# pf = Printer(variables=[velo, vorti],
-#              topo=topofft,
-#              frequency=1,
-#              prefix='./res_PS/fields_S128_2p',
-#              ext=".vtk")
-# pb.addMonitors([pf])
-# pf._printStep()
+p = Printer(variables=[scal,],
+            topo=topofft,
+            frequency=100,
+            prefix='./res_NS_planeJet/scal_ref',
+            formattype=HDF5)
+pb.addMonitors([p])
+p._printStep()
 
 
 def run():
@@ -261,4 +222,3 @@ print 'total time (rank):', MPI.Wtime() - time, '(', topo.rank, ')'
 pb.finalize()
 
 ## Clean memory buffers
-#fftw2py.clean_fftw_solver(box.dimension)
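The multi-scale variant added next is the same plane-jet setup, except that the passive scalar is advected on a grid twice as fine as the velocity (nb_s = 257 versus nb = 129), so its advection method carries an extra MultiScale interpolation key. A minimal sketch of the difference, reusing only names that appear in these two scripts (the option strings are forwarded to the Scales backend, so treat their exact spelling as an assumption of that library):

    from parmepy.methods_keys import Scales, MultiScale

    # Single-scale: velocity and scalar share one grid resolution.
    ADVECTION_METHOD = {Scales: 'p_64'}

    # Multi-scale: the scalar grid is finer, so Scales also needs an
    # interpolation formula ('L4_4') to evaluate the coarse velocity
    # at the scalar grid points.
    ADVECTION_METHOD_MS = {Scales: 'p_64', MultiScale: 'L4_4'}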
diff --git a/Examples/Plane_jet/NS_planeJet_CPU_MS.py b/Examples/Plane_jet/NS_planeJet_CPU_MS.py
new file mode 100644
index 0000000000000000000000000000000000000000..e4b06947c925b83826a468c497d535b50126b6a8
--- /dev/null
+++ b/Examples/Plane_jet/NS_planeJet_CPU_MS.py
@@ -0,0 +1,226 @@
+#!/usr/bin/python

+"""
+Navier-Stokes 3D plane jet, multi-scale version :
+the passive scalar is advected on a finer grid than the velocity.
+
+All parameters are set and defined below in this script.
+
+"""
+
+import parmepy as pp
+from parmepy.constants import ORDER, np, PARMES_REAL, HDF5
+from parmepy.f2py import fftw2py
+from parmepy.fields.continuous import Field
+from parmepy.mpi.topology import Cartesian
+from parmepy.operator.advection import Advection
+from parmepy.operator.stretching import Stretching
+from parmepy.operator.poisson import Poisson
+from parmepy.operator.diffusion import Diffusion
+from parmepy.operator.redistribute import Redistribute
+from parmepy.problem.navier_stokes import NSProblem
+from parmepy.operator.monitors.printer import Printer
+from parmepy.operator.monitors.energy_enstrophy import Energy_enstrophy
+from parmepy.problem.simulation import Simulation
+from parmepy.operator.differential import Curl
+from parmepy.mpi.main_var import main_size
+from parmepy.methods_keys import Scales, TimeIntegrator, Interpolation,\
+    Remesh, Support, Splitting, dtAdvecCrit, SpaceDiscretisation, GhostUpdate, MultiScale
+from parmepy.numerics.integrators.runge_kutta2 import RK2 as RK2
+from parmepy.numerics.integrators.runge_kutta3 import RK3 as RK3
+from parmepy.numerics.integrators.runge_kutta4 import RK4 as RK4
+from parmepy.numerics.finite_differences import FD_C_4, FD_C_2
+from parmepy.numerics.interpolation import Linear
+from parmepy.numerics.remeshing import L6_4
+from parmepy.f2py import fftw2py
+
+# problem dimension
+dim = 3
+# resolution
+nb = 129
+nb_s = 257
+# number of ghosts in usual cartesian topo
+NBGHOSTS = 2
+ADVECTION_METHOD = {Scales: 'p_64', MultiScale: 'L4_4'}
+# ADVECTION_METHOD = {TimeIntegrator: RK2,
+#                     Interpolation: Linear,
+#                     Remesh: L6_4,
+#                     Support: '',
+#                     Splitting: 'o2_FullHalf'}
+VISCOSITY = 1e-4
+
+
+## ----------- A 3d problem -----------
+## pi constant
+pi = np.pi
+cos = np.cos
+sin = np.sin
+exp = np.exp
+abs = np.abs
+tanh = np.tanh
+## Domain
+box = pp.Box(dim, length=[1., ]*3)
+
+## Global resolution
+nbElem = [nb] * dim
+nbElem_s = [nb_s] * dim
+randX = np.asarray(np.random.random([nb - 1, nb - 1,(nb - 1)/main_size]),
+                   dtype=PARMES_REAL, order=ORDER) - 0.5
+randY = np.asarray(np.random.random([nb - 1, nb - 1,(nb - 1)/main_size]),
+                   dtype=PARMES_REAL, order=ORDER) - 0.5
+randZ = np.asarray(np.random.random([nb - 1, nb - 1,(nb - 1)/main_size]),
+                   dtype=PARMES_REAL, order=ORDER) - 0.5
+
+##initjet.f
+width = 0.01
+ampl3 = 0.3
+ampl = 0.05
+ampl2 = 0.05
+def computeVel(res, x, y, z, t):
+    yy = abs(y - 0.5)
+    aux = (0.1 - 2. * yy) / (4. * width)
+    strg1 = exp(-abs(aux ** 2)) * randX
+    strg2 = exp(-abs(aux ** 2)) * randY
+    strg3 = exp(-abs(aux ** 2)) * randZ
+    res[0][...] = 0.5 * (1. + tanh(aux))
+    res[0][...] *= (1. + ampl3 * sin(8. * pi * x))
+    res[0][...] *= (1. + ampl * strg1)
+    res[1][...] = ampl * strg2
+    res[2][...] = ampl * strg3
+    return res
+
+
+def initScal(res, x, y, z, t):
+    yy = abs(y - 0.5)
+    aux = (0.1 - 2. * yy) / (4. * width)
+    res[0][...] = 0.5 * (1. + tanh(aux))
+    #strg3 = exp(-abs(aux ** 2)) * randZ
+    #res[0][...] *= (1. + ampl2 * strg3)
+    return res
+
+## Fields
+velo = Field(domain=box, formula=computeVel,
+             name='Velocity', isVector=True)
+vorti = Field(domain=box,
+              name='Vorticity', isVector=True)
+scal = Field(domain=box, formula=initScal,
+             name='PassiveScalar', isVector=False)
+
+## Variables
+## Usual Cartesian topology definition
+# At the moment we use two (or three?) topologies :
+# - "topo" for Stretching and all operators based on finite differences.
+# --> ghost layer = 2
+# - topo from Advection operator for all the other operators.
+# --> no ghost layer
+# - topo from fftw for Poisson and Diffusion.
+# Todo : check compat between scales and fft operators topologies.
+ghosts = [NBGHOSTS] * box.dimension
+topo = Cartesian(box, box.dimension, nbElem,
+                 ghosts=ghosts)
+
+## Navier Stokes Operators
+advec = Advection(velo, vorti,
+                  resolutions={velo: nbElem,
+                               vorti: nbElem},
+                  method=ADVECTION_METHOD
+                  )
+advecScal = Advection(velo, scal,
+                      resolutions={velo: nbElem,
+                                   scal: nbElem_s},
+                      method=ADVECTION_METHOD
+                      )
+
+stretch = Stretching(velo, vorti,
+                     resolutions={velo: nbElem,
+                                  vorti: nbElem},
+                     topo=topo
+                     )
+
+diffusion = Diffusion(vorti,
+                      resolution=nbElem,
+                      viscosity=VISCOSITY
+                      )
+
+poisson = Poisson(velo, vorti,
+                  resolutions={velo: nbElem,
+                               vorti: nbElem},
+                  projection=[True, 1, False],
+                  )
+
+c = Curl(velo, vorti,
+         resolutions={velo: nbElem,
+                      vorti: nbElem},
+         method={SpaceDiscretisation: fftw2py, GhostUpdate: True},
+         )
+
+## Simulation
+simu = Simulation(tinit=0.0,
+                  tend=4.5, timeStep=0.001,
+                  iterMax=1000000)
+
+
+# Bridges between the different topologies in order to
+# redistribute data.
+# 1 -Advection to stretching
+toGhost_vorti = Redistribute([vorti], advec, stretch)
+toGhost_velo = Redistribute([velo], poisson, stretch)
+# 2 - Stretching to Poisson/Diffusion
+fromGhost_vorti = Redistribute([vorti], stretch, diffusion)
+
+# Define the problem to solve
+pb = NSProblem(operators=[
+    toGhost_velo,
+    advecScal,
+    advec,
+    toGhost_vorti,
+    stretch,
+    fromGhost_vorti,
+    diffusion,
+    poisson,
+    ], simulation=simu, dumpFreq=-1)
+
+## Setting solver to Problem (only operators for computational tasks)
+pb.pre_setUp()
+## Diagnostics related to the problem
+topofft = poisson.discreteFields[poisson.vorticity].topology
+energy = Energy_enstrophy(velo, vorti, isNormalized=True,
+                          topo=topofft,
+                          viscosity=VISCOSITY,
+                          frequency=1,
+                          prefix='./res_NS_planeJet/energy_ref_128-256.dat')
+
+# initialisation
+vorti.setTopoInit(topofft)
+velo.setTopoInit(topofft)
+#scal.setTopoInit(topofft)
+pb.addMonitors([energy])
+pb.setUp()
+
+
+c.discretize()
+c.setUp()
+
+c.apply(simu)
+
+p = Printer(variables=[scal,],
+            topo=scal.discreteFields.keys()[0],
+            frequency=100,
+            prefix='./res_NS_planeJet/scal_ref_128-256',
+            formattype=HDF5)
+pb.addMonitors([p])
+p._printStep()
+
+
+def run():
+    pb.solve()
+
+## Solve problem
+from parmepy.mpi import MPI
+print "Start computation ..."
+time = MPI.Wtime()
+run()
+print 'total time (rank):', MPI.Wtime() - time, '(', topo.rank, ')'
+
+
+pb.finalize()
+## Clean memory buffers
diff --git a/Examples/demo_hybrid.py b/Examples/demo_hybrid.py
new file mode 100644
index 0000000000000000000000000000000000000000..5f788d4cf09f44dd0e5691df71c52f04a5fb5ed4
--- /dev/null
+++ b/Examples/demo_hybrid.py
@@ -0,0 +1,73 @@
+"""
+Task parallelism example.
+
+One process is dedicated to file outputs and the others compute the data.
+"""
+import parmepy as pp
+from parmepy.constants import np, VTK, HDF5
+from parmepy.mpi.main_var import main_size, main_rank, main_comm
+from parmepy.mpi.topology import Cartesian
+from parmepy.fields.continuous import Field
+from parmepy.operator.analytic import Analytic
+from parmepy.operator.redistribute_intercomm import RedistributeIntercomm
+from parmepy.operator.monitors.printer import Printer
+from parmepy.problem.problem_tasks import ProblemTasks
+from parmepy.problem.simulation import Simulation
+from parmepy.tools.problem2dot import toDot
+pi = np.pi
+cos = np.cos
+sin = np.sin
+
+
+def func(res, x, y, z, t=0.):
+    res[0][...] = 2. * sin(pi*x)**2*sin(2.*pi*y)*sin(2.*pi*z)*cos(t*pi/3.)
+    res[1][...] = -sin(2.*pi*x)*sin(pi*y)**2*sin(2.*pi*z)*cos(t*pi/3.)
+    res[2][...] = -sin(2.*pi*x)*sin(2.*pi*y)*sin(pi*z)**2*cos(t*pi/3.)
+    return res
+
+assert main_size > 1
+
+nb_elem = [65, ] * 3
+b = pp.Box()
+proc_tasks = [0,] * main_size
+proc_tasks[0] = 1
+comm_s = main_comm.Split(color=proc_tasks[main_rank], key=main_rank)
+
+
+if proc_tasks[main_rank] == 1:
+    topo = Cartesian(b, dim=1, globalMeshResolution=nb_elem,
+                     cutdir=[True, False, False],
+                     comm=comm_s
+                     )
+elif proc_tasks[main_rank] == 0:
+    topo = Cartesian(b, dim=2, globalMeshResolution=nb_elem,
+                     cutdir=[False, True, True],
+                     comm=comm_s
+                     )
+
+field = Field(domain=b, name="Test_Vec",
+              isVector=True, formula=func)
+
+op = Analytic([field], {field: nb_elem}, topo=topo, task_id=0)
+p = Printer(variables=[field],
+            topo=topo,
+            frequency=1,
+            prefix='./demo_task/',
+            formattype=HDF5, task_id=1)
+
+red = RedistributeIntercomm(field, topo, op, p, proc_tasks, main_comm,
+                            component=None)
+
+simu = Simulation(tinit=0.0, tend=3., timeStep=0.15, iterMax=200)
+
+
+pb = ProblemTasks([op, red, p], simu, proc_tasks)
+pb.pre_setUp()
+
+# As one process is only a printer, we need to explicitly discretize the
+# field and set its initialization topology.
+field.discretize(topo)
+field.setTopoInit(topo)
+pb.setUp()
+pb.solve()
+pb.finalize()
diff --git a/Examples/demo_mpi.py b/Examples/demo_mpi.py
index 6f5602d15ee1418791cb60e946c242f8533de7ae..8d22303e872c0a1821528e81158b8d79953648b9 100644
--- a/Examples/demo_mpi.py
+++ b/Examples/demo_mpi.py
@@ -10,6 +10,10 @@ from parmepy.operator.analytic import Analytic
 from parmepy.operator.redistribute import Redistribute
 from parmepy.operator.redistribute_intercomm import RedistributeIntercomm
 
+if PARMES_REAL is np.float32:
+    atol = 1.e-4
+else:
+    atol = 1e-10
 
 def func(res, x, y, z, t=0.):
     res[0][...] 
= x @@ -48,7 +52,8 @@ def test_topologies(domain, topo_from, topo_to, init_formula): ope.wait() assert pp.constants.np.allclose(field.norm(topo_from), - field.norm(topo_to)), \ + field.norm(topo_to), + atol=atol), \ "[{0}] Data differs: norm, topo_from - topo_to = {1}".format( main_rank, field.norm(topo_from) - field.norm(topo_to)) @@ -142,7 +147,8 @@ if main_size > 2: for i in xrange(main_size): norms[i,:] = main_comm.bcast(norms[i,:], root=i) for i in xrange(main_size): - assert pp.constants.np.allclose(norms[i,:], norms[main_rank,:]) + assert pp.constants.np.allclose(norms[i,:], norms[main_rank,:], + atol=atol) if main_rank == 0: print " Ok" else: @@ -183,9 +189,14 @@ if main_size > 2: if proc_tasks[main_rank] == 0: field_r.initialize(topo=topo) - red = RedistributeIntercomm(field, topo, 1, 0, proc_tasks, main_comm, + op_from = Analytic([field], {field: [17, 17, 17]}, topo=topo, task_id=1) + op_to = Analytic([field], {field: [17, 17, 17]}, topo=topo, task_id=0) + + red = RedistributeIntercomm(field, topo, op_from, op_to, + proc_tasks, main_comm, component=None) - red_r = RedistributeIntercomm(field_r, topo, 0, 1, proc_tasks, main_comm, + red_r = RedistributeIntercomm(field_r, topo, op_to, op_from, + proc_tasks, main_comm, component=None) red.discretize() red_r.discretize() @@ -194,23 +205,112 @@ if main_size > 2: red.apply() red_r.apply() - red.wait() - red_r.wait() norms = npw.zeros((main_size, 3)) norms[main_rank, :] = field.norm(topo) for i in xrange(main_size): norms[i, :] = main_comm.bcast(norms[i, :], root=i) for i in xrange(main_size): - assert pp.constants.np.allclose(norms[i, :], norms[main_rank, :]) + assert pp.constants.np.allclose(norms[i, :], norms[main_rank, :], + atol=atol) norms[main_rank, :] = field_r.norm(topo) for i in xrange(main_size): norms[i, :] = main_comm.bcast(norms[i, :], root=i) for i in xrange(main_size): - assert pp.constants.np.allclose(norms[i, :], norms[main_rank, :]) + assert pp.constants.np.allclose(norms[i, :], norms[main_rank, :], + atol=atol) if main_rank == 0: print " Ok" else: if main_rank == 0: print "CASE 4 is not tested (require process number > 2)" + +## CASE 5: +## Extending CASE 4: Using topology with ghosts +if main_size > 2: + if main_rank == 0: + print "Testing CASE 5 ...", + b.reset() + proc_tasks = [0,] * main_size + proc_tasks[0:2] = [1, 1] + comm_s = main_comm.Split(color=proc_tasks[main_rank], key=main_rank) + if proc_tasks[main_rank] == 1: + topo = Cartesian(b, dim=1, globalMeshResolution=[17, 17, 17], + cutdir=[True, False, False], ghosts=[2, 0, 1], + comm=comm_s + ) + elif proc_tasks[main_rank] == 0: + topo = Cartesian(b, dim=2, globalMeshResolution=[17, 17, 17], + cutdir=[False, True, True], ghosts=[2, 0, 1], + comm=comm_s + ) + field = Field(domain=b, name="Test_Vec", + isVector=True, formula=func) + field_r = Field(domain=b, name="Test_Vec_R", + isVector=True, formula=func) + + field.discretize(topo) + field_r.discretize(topo) + if proc_tasks[main_rank] == 1: + field.initialize(topo=topo) + if proc_tasks[main_rank] == 0: + field_r.initialize(topo=topo) + + op_from = Analytic([field], {field: [17, 17, 17]}, topo=topo, task_id=1) + op_to = Analytic([field], {field: [17, 17, 17]}, topo=topo, task_id=0) + + red = RedistributeIntercomm(field, topo, op_from, op_to, + proc_tasks, main_comm, + component=None) + red_r = RedistributeIntercomm(field_r, topo, op_to, op_from, + proc_tasks, main_comm, + component=None) + red.discretize() + red_r.discretize() + red.setUp() + red_r.setUp() + + red.apply() + red_r.apply() + + norms = 
npw.zeros((main_size, 3))
+    norms[main_rank, :] = field.norm(topo)
+    for i in xrange(main_size):
+        norms[i, :] = main_comm.bcast(norms[i, :], root=i)
+    for i in xrange(main_size):
+        assert pp.constants.np.allclose(norms[i, :], norms[main_rank, :],
+                                        atol=atol)
+    norms[main_rank, :] = field_r.norm(topo)
+    for i in xrange(main_size):
+        norms[i, :] = main_comm.bcast(norms[i, :], root=i)
+    for i in xrange(main_size):
+        assert pp.constants.np.allclose(norms[i, :], norms[main_rank, :],
+                                        atol=atol)
+
+    # Assert that ghosts have not been exchanged
+    if proc_tasks[main_rank] == 0:
+        assert np.allclose(field.discreteFields[topo].data[0][0:2, :, :], 0.,
+                           atol=atol)
+        assert np.allclose(field.discreteFields[topo].data[0][-2:, :, :], 0.,
+                           atol=atol)
+        assert np.allclose(field.discreteFields[topo].data[0][:, :, 0:1], 0.,
+                           atol=atol)
+        assert np.allclose(field.discreteFields[topo].data[0][:, :, -1:], 0.,
+                           atol=atol)
+    if proc_tasks[main_rank] == 1:
+        assert np.allclose(field_r.discreteFields[topo].data[0][0:2, :, :], 0.,
+                           atol=atol)
+        assert np.allclose(field_r.discreteFields[topo].data[0][-2:, :, :], 0.,
+                           atol=atol)
+        assert np.allclose(field_r.discreteFields[topo].data[0][:, :, 0:1], 0.,
+                           atol=atol)
+        assert np.allclose(field_r.discreteFields[topo].data[0][:, :, -1:], 0.,
+                           atol=atol)
+
+    if main_rank == 0:
+        print " Ok"
+
+else:
+    if main_rank == 0:
+        print "CASE 5 is not tested (requires process number > 2)"
diff --git a/Examples/levelSet3D.py b/Examples/levelSet3D.py
index 326a6865793f35cf072c6942098d23f800c8f001..5eea9e816dc8867c872acdde17d38d34b12f6906 100755
--- a/Examples/levelSet3D.py
+++ b/Examples/levelSet3D.py
@@ -11,71 +11,74 @@ from parmepy.operator.analytic import Analytic
 from parmepy.problem.simulation import Simulation
 from parmepy.operator.redistribute import Redistribute
 from parmepy.methods_keys import TimeIntegrator, Interpolation, Remesh,\
-    Support, Splitting, MultiScale
+    Support, Splitting, MultiScale, Scales
 from parmepy.numerics.integrators.runge_kutta2 import RK2 as RK
 #from parmepy.numerics.integrators.runge_kutta4 import RK4 as RK
 from parmepy.numerics.interpolation import Linear
-from parmepy.numerics.remeshing import L2_1 as rmsh
-from parmepy.constants import np
+from parmepy.numerics.remeshing import L2_1, L4_2, L4_4, L6_4, L8_4
+from parmepy.constants import np, HDF5
 from parmepy.mpi.main_var import main_size
 
 sin, cos, pi, sqrt = np.sin, np.cos, np.pi, np.sqrt
 
-# def scalaire(res, x, y, z, t=0.):
-#     r = sqrt((x - 0.35) ** 2 + (y - 0.35) ** 2 + (z - 0.35) ** 2)
-#     res[0][...] = 0.
-#     res[0][r < 0.15] = 1.
-#     return res
+def scalaire(res, x, y, z, t=0.):
+    r = sqrt((x - 0.35) ** 2 + (y - 0.35) ** 2 + (z - 0.35) ** 2)
+    res[0][...] = 0.
+    res[0][r < 0.15] = 1.
+    return res
 
 
-# def vitesse(res, x, y, z, t=0.):
-#     res[0][...] = 2. * sin(pi*x)**2*sin(2.*pi*y)*sin(2.*pi*z)*cos(t*pi/3.)
-#     res[1][...] = -sin(2.*pi*x)*sin(pi*y)**2*sin(2.*pi*z)*cos(t*pi/3.)
-#     res[2][...] = -sin(2.*pi*x)*sin(2.*pi*y)*sin(pi*z)**2*cos(t*pi/3.)
-#     return res
+def vitesse(res, x, y, z, t=0.):
+    res[0][...] = 2. * sin(pi*x)**2*sin(2.*pi*y)*sin(2.*pi*z)*cos(t*pi/3.)
+    res[1][...] = -sin(2.*pi*x)*sin(pi*y)**2*sin(2.*pi*z)*cos(t*pi/3.)
+    res[2][...] = -sin(2.*pi*x)*sin(2.*pi*y)*sin(pi*z)**2*cos(t*pi/3.)
+    return res
 
 dim = 3
 boxLength = [1., 1., 1.]
 boxMin = [0., 0., 0.]
-v_nbElem = [257] * 3
-nbElem = [257] * 3
+nbElem_v = [129] * 3
+nbElem_s = [129] * 3
 
 timeStep = 0.025
 finalTime = 3.
 outputFilePrefix = './res3D/levelSet_3D'
-outputModulo = 1
-simu = Simulation(tinit=0.0, tend=finalTime, timeStep=timeStep, iterMax=120)
+outputModulo = 10
+simu = Simulation(tinit=0.0, tend=finalTime, timeStep=timeStep, iterMax=2000)
 
 ## Domain
 box = Box(dim, length=boxLength, origin=boxMin)
 
 ## Fields
-scal = Field(domain=box, name='Scalar', #formula=scalaire
+scal = Field(domain=box, name='Scalar', formula=scalaire
             )
-velo = Field(domain=box, name='Velocity', isVector=True, #formula=vitesse
+velo = Field(domain=box, name='Velocity', isVector=True, formula=vitesse
             )
 
 ## Operators
 advec = Advection(velo, scal,
-                  resolutions={velo: v_nbElem,
-                               scal: nbElem},
+                  resolutions={velo: nbElem_v,
+                               scal: nbElem_s},
                   method={TimeIntegrator: RK,
                           Interpolation: Linear,
-                          Remesh: rmsh,
-                          Support: 'gpu_2k',
-                          Splitting: 'o2_FullHalf',
-                          MultiScale: Linear},
-                  src=['./levelSet3D.cl'],
-#batch_nb=2
+                          Remesh: L6_4,
+                          Support: 'gpu_1k',
+                          Splitting: 'o2',
+                          MultiScale: L2_1},
+                  #src=['./levelSet3D.cl'],
+                  #device_type='cpu',platform_id=1,
+                  #batch_nb=2
                   )
 advec.discretize()
 velocity = Analytic(velo,
-                    resolutions={velo: v_nbElem},
-                    method={Support: 'gpu'},
+                    resolutions={velo: nbElem_v},
+                    #method={Support: 'gpu'},
                     topo=advec.advecDir[0].discreteFields[velo].topology
+                    #topo=advec.discreteFields[velo].topology
                     )
-
-distrForAdvecZ = Redistribute([velo], velocity, advec.advecDir[2], component=2)
+if main_size > 1:
+    distrForAdvecZ = Redistribute([velo], velocity, advec.advecDir[2],
+                                  component=2, name_suffix='Zin')
 
 ##Problem
 # Sequential : no need of redistribute
@@ -90,7 +93,7 @@ p = Printer(variables=[scal],
             topo=scal.topoInit,
             frequency=outputModulo,
             prefix=outputFilePrefix,
-            ext=".vtk")
+            formattype=HDF5)
 pb.addMonitors([p])
 p._printStep()
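The hybrid level-set example added next relies on the same task-splitting pattern as Examples/demo_hybrid.py above: each rank is assigned a task id, the world communicator is split by that id, and topologies are then built on the resulting sub-communicator. The core of the pattern, extracted from these scripts (a sketch, not a standalone program: it assumes parmepy is initialized and runs on at least two MPI processes):

    from parmepy.mpi.main_var import main_comm, main_rank, main_size

    proc_tasks = [0, ] * main_size   # task 0 : computation
    proc_tasks[0] = 1                # rank 0 -> task 1 : file output
    # Ranks sharing a color land in the same sub-communicator, which is
    # then handed to the topology constructors via comm=comm_s.
    comm_s = main_comm.Split(color=proc_tasks[main_rank], key=main_rank)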
diff --git a/Examples/levelSet3D_hybrid.py b/Examples/levelSet3D_hybrid.py
new file mode 100644
index 0000000000000000000000000000000000000000..ce0790645e0cf31ac82ee9073d30232fb4bdd3a4
--- /dev/null
+++ b/Examples/levelSet3D_hybrid.py
@@ -0,0 +1,116 @@
+"""
+Task parallelism example.
+
+One process is dedicated to file outputs and the others compute the data.
+"""
+import parmepy as pp
+from parmepy.constants import np, VTK, HDF5
+from parmepy.mpi.main_var import main_size, main_rank, main_comm
+from parmepy.mpi.topology import Cartesian
+from parmepy.fields.continuous import Field
+from parmepy.operator.advection import Advection
+from parmepy.operator.analytic import Analytic
+from parmepy.operator.redistribute_intercomm import RedistributeIntercomm
+from parmepy.operator.redistribute import Redistribute
+from parmepy.operator.monitors.printer import Printer
+from parmepy.problem.problem_tasks import ProblemTasks
+from parmepy.problem.simulation import Simulation
+from parmepy.tools.problem2dot import toDot
+from parmepy.methods_keys import TimeIntegrator, Interpolation, Remesh,\
+    Support, Splitting, MultiScale, Scales
+from parmepy.numerics.integrators.runge_kutta2 import RK2 as RK
+#from parmepy.numerics.integrators.runge_kutta4 import RK4 as RK
+from parmepy.numerics.interpolation import Linear
+from parmepy.numerics.remeshing import L2_1, L4_2, L4_4, L8_4
+pi = np.pi
+cos = np.cos
+sin = np.sin
+sqrt = np.sqrt
+
+def scalaire(res, x, y, z, t=0.):
+    r = sqrt((x - 0.35) ** 2 + (y - 0.35) ** 2 + (z - 0.35) ** 2)
+    res[0][...] = 0.
+    res[0][r < 0.15] = 1.
+    return res
+
+
+def vitesse(res, x, y, z, t=0.):
+    res[0][...] = 2. * sin(pi*x)**2*sin(2.*pi*y)*sin(2.*pi*z)*cos(t*pi/3.)
+    res[1][...] 
= -sin(2.*pi*x)*sin(pi*y)**2*sin(2.*pi*z)*cos(t*pi/3.) + res[2][...] = -sin(2.*pi*x)*sin(2.*pi*y)*sin(pi*z)**2*cos(t*pi/3.) + return res + +dim = 3 +boxLength = [1., 1., 1.] +boxMin = [0., 0., 0.] +nbElem = [129] * 3 + +assert main_size > 1 +proc_tasks = [0,] * main_size +proc_tasks[0] = 1 +comm_s = main_comm.Split(color=proc_tasks[main_rank], key=main_rank) + + +box = pp.Box(dim, length=boxLength, origin=boxMin) + +if proc_tasks[main_rank] == 1: + topo = box.getOrCreateTopology(1, gridResolution=nbElem, + comm=comm_s + ) +elif proc_tasks[main_rank] == 0: + topo = box.getOrCreateTopology(1, gridResolution=nbElem, + comm=comm_s + ) +## Fields +scal = Field(domain=box, name='Scalar', formula=scalaire + ) +velo = Field(domain=box, name='Velocity', isVector=True, formula=vitesse + ) + +op_velo = Analytic([velo], {velo: nbElem}, topo=topo, task_id=1) +advec = Advection(velo, scal, + resolutions={velo: nbElem, + scal: nbElem}, + method={TimeIntegrator: RK, + Interpolation: Linear, + Remesh: L2_1, + Support: 'gpu_1k', + Splitting: 'o2', + MultiScale: L2_1}, + topo=topo, + task_id=0, + ) +p = Printer(variables=[scal], + topo=topo, + frequency=5, + prefix='./levelset3D_hybrid/', + formattype=HDF5, + task_id=0) + +distrForAdvecZ = Redistribute([velo], None, advec.advecDir[2], component=2, + task_id=0) + +redX = RedistributeIntercomm(velo, topo, op_velo, advec.advecDir[0], + proc_tasks, main_comm, + component=0) +redY = RedistributeIntercomm(velo, topo, op_velo, advec.advecDir[1], + proc_tasks, main_comm, + component=1) +redZ = RedistributeIntercomm(velo, topo, op_velo, distrForAdvecZ, + proc_tasks, main_comm, + component=2) +distrForAdvecZ.opFrom = redZ + +simu = Simulation(tinit=0.0, tend=3., timeStep=0.025, iterMax=200) + + +pb = ProblemTasks([op_velo, redX, redY, redZ, distrForAdvecZ, advec, p], + simu, proc_tasks, dumpFreq=1000) + +from parmepy.tools.problem2dot import toDot +toDot(pb) + +pb.setUp() +pb.solve() +pb.finalize() + diff --git a/HySoP/hysop/domain/domain.py b/HySoP/hysop/domain/domain.py index c280526a732af17439b99ba64c03410401b84131..3182ba6575d67da4145ed19d9a5581af11b7d61f 100644 --- a/HySoP/hysop/domain/domain.py +++ b/HySoP/hysop/domain/domain.py @@ -34,7 +34,8 @@ class Domain(object): def getOrCreateTopology(self, topoDim, gridResolution, topoResolution=None, ghosts=None, precomputed=False, localres=None, - offset=None, fixedResolution=False): + offset=None, fixedResolution=False, + comm=None): """ This routines checks if a topology is present in the list of topologies of the domain. @@ -49,13 +50,15 @@ class Domain(object): @return the required topology. 
""" if topoResolution is None: - newTopo = Cartesian(self, topoDim, gridResolution, ghosts=ghosts) + newTopo = Cartesian(self, topoDim, gridResolution, ghosts=ghosts, + comm=comm) elif precomputed: newTopo = Cartesian.withPrecomputedResolution(self, topoResolution, gridResolution, localres=localres, offset=offset, - ghosts=ghosts) + ghosts=ghosts, + comm=comm) else: if fixedResolution: @@ -63,7 +66,7 @@ class Domain(object): else: topoCreation = Cartesian.withResolution newTopo = topoCreation(self, topoResolution, - gridResolution, ghosts=ghosts) + gridResolution, ghosts=ghosts, comm=comm) otherid = self.register(newTopo) if otherid < 0: otherid = newTopo.getId() diff --git a/HySoP/hysop/domain/obstacle/sphere.py b/HySoP/hysop/domain/obstacle/sphere.py index 132c22ca439f8d10f1381757c72b8df818b33d0b..5d14340a780cbb3185963afb9fcbedeb72138777 100644 --- a/HySoP/hysop/domain/obstacle/sphere.py +++ b/HySoP/hysop/domain/obstacle/sphere.py @@ -13,7 +13,7 @@ class Sphere(Obstacle): """ def __init__(self, domain, position, radius=1.0, - vd=0.0, porousLayers=None): + vd=0.0, porousLayers=[]): """ Description of a sphere in a domain. @param domain : the physical domain that contains the sphere. @@ -41,8 +41,6 @@ class Sphere(Obstacle): self.chi = [dist] ## List of thicknesses for porous layers - if porousLayers is None: - porousLayers = [] self.layers = porousLayers def discretize(self, topo): @@ -83,7 +81,7 @@ class HemiSphere(Sphere): x < xs for xs == x position of the center of the sphere. """ def __init__(self, domain, position, radius=1.0, - vd=0.0, porousLayers=None): + vd=0.0, porousLayers=[]): """ Description of a sphere in a domain. @param domain : the physical domain that contains the sphere. diff --git a/HySoP/hysop/gpu/gpu_particle_advection.py b/HySoP/hysop/gpu/gpu_particle_advection.py index d1c756ddb070527dacbfa5867ea54340d2001634..7f7cf9631ddb317c840cdf9dd1a24d94a1a8954c 100644 --- a/HySoP/hysop/gpu/gpu_particle_advection.py +++ b/HySoP/hysop/gpu/gpu_particle_advection.py @@ -17,10 +17,8 @@ from parmepy.gpu import cl from parmepy.gpu.tools import get_opencl_environment from parmepy.gpu.gpu_kernel import KernelLauncher from parmepy.tools.timers import Timer -from parmepy.mpi.main_var import main_size from parmepy.operator.discrete.discrete import DiscreteOperator from parmepy.mpi import MPI -from parmepy.mpi.main_var import main_rank from parmepy.numerics.updateGhosts import UpdateGhosts @@ -37,11 +35,7 @@ class GPUParticleAdvection(ParticleAdvection): part_position=None, part_advectedFields=None, platform_id=0, device_id=0, device_type='gpu', - method={TimeIntegrator: RK2, - Interpolation: Linear, - Remesh: L2_1, - Support: 'gpu_2k', - Splitting: 'o2'}, + method=None, src=None, precision=PARMES_REAL, batch_nb=None, isMultiScale=False): """ @@ -65,6 +59,12 @@ class GPUParticleAdvection(ParticleAdvection): @param splittingConfig : Directional splitting configuration (parmepy.numerics.splitting.Splitting.__init__) """ + if method is None: + method = {TimeIntegrator: RK2, + Interpolation: Linear, + Remesh: L2_1, + Support: 'gpu_2k', + Splitting: 'o2'} ParticleAdvection.__init__(self, velocity, advectedFields, d, part_position, part_advectedFields, method, isMultiScale=isMultiScale) @@ -73,8 +73,12 @@ class GPUParticleAdvection(ParticleAdvection): self.user_gpu_src = src self.num_method = None self.dim = self.advectedFields[0].dimension - self.cl_env = get_opencl_environment(platform_id, device_id, - device_type, precision) + self.cl_env = get_opencl_environment( + platform_id, device_id, 
+ device_type, precision, + comm=self.advectedFields[0].topology.comm) + self._main_rank = self.advectedFields[0].topology.comm.Get_rank() + self._main_size = self.advectedFields[0].topology.comm.Get_size() resol = self.advectedFields[0].topology.mesh.resolution v_resol = self.velocity.topology.mesh.resolution @@ -109,7 +113,7 @@ class GPUParticleAdvection(ParticleAdvection): v_shape[batch_dir] /= batch_nb v_shape[batch_dir] += gh_velo[batch_dir] self.batch_nb[batch_dir] = batch_nb - if np.prod(self.batch_nb) > 1 and main_rank == 0: + if np.prod(self.batch_nb) > 1 and self._main_rank == 0: print "Using batch for variables:", self.batch_nb # Functions to get the appropriate vectors for the current direction @@ -119,7 +123,7 @@ class GPUParticleAdvection(ParticleAdvection): if self.dim == 3 and self.dir == 1: self._reorderVect = lambda v: (v[1], v[0], v[2]) if self.dim == 3 and self.dir == 2: - if main_size == 1 and self.method[Splitting].find('o2') >= 0: + if self._main_size == 1 and self.method[Splitting].find('o2') >= 0: self._reorderVect = lambda v: (v[2], v[0], v[1]) else: self._reorderVect = lambda v: (v[2], v[1], v[0]) @@ -157,7 +161,7 @@ class GPUParticleAdvection(ParticleAdvection): "-D V_NB_I=V_NB_Y -D V_NB_II=V_NB_X -D V_NB_III=V_NB_Z", " -D NB_I=NB_Z -D NB_II=NB_Y -D NB_III=NB_X " + "-D V_NB_I=V_NB_Z -D V_NB_II=V_NB_Y -D V_NB_III=V_NB_X"] - if main_size == 1 and self.method[Splitting].find('o2') >= 0: + if self._main_size == 1 and self.method[Splitting].find('o2') >= 0: self._constants[2] = \ " -D NB_I=NB_Z -D NB_II=NB_X -D NB_III=NB_Y " + \ "-D V_NB_I=V_NB_Z -D V_NB_II=V_NB_X -D V_NB_III=V_NB_Y" @@ -236,7 +240,7 @@ class GPUParticleAdvection(ParticleAdvection): # 3D: X(dt/2), Y(dt/2), Z(dt), Y(dt/2), X(dt/2) # 2D: X(dt/2), Y(dt), X(dt/2) if np.prod(self.batch_nb) == 1 and self.method[Splitting] == 'o2': - if main_size > 1: + if self._main_size > 1: if self.dim == 2: self.exec_list = [ [self._init_copy, self._compute], # X(dt/2) @@ -275,7 +279,7 @@ class GPUParticleAdvection(ParticleAdvection): # X(dt/2), Y(dt/2), Z(dt/2), Z(dt/2), Y(dt/2), X(dt/2) elif np.prod(self.batch_nb) == 1 and \ self.method[Splitting] == 'o2_FullHalf': - if main_size > 1: + if self._main_size > 1: if self.dim == 2: self.exec_list = [ [self._init_copy, self._compute], # X(dt/2) @@ -432,7 +436,7 @@ class GPUParticleAdvection(ParticleAdvection): resol_tmp = np.empty_like(resol) # XY transposition settings - is_XY_needed = self.dir == 1 or (self.dir == 0 and main_size == 1) + is_XY_needed = self.dir == 1 or (self.dir == 0 and self._main_size == 1) if is_XY_needed: resol_tmp[...] = resol[...] if self.dir == 1: # (XY -> YX) @@ -463,7 +467,7 @@ class GPUParticleAdvection(ParticleAdvection): self.transpose_xy = KernelLauncher( prg.transpose_xy, self.cl_env.queue, gwi, lwi) - is_XY_r_needed = self.dir == 1 and main_size > 1 + is_XY_r_needed = self.dir == 1 and self._main_size > 1 if is_XY_r_needed: # Reversed XY transposition settings (YX -> XY), only in parallel resol_tmp[...] = resol[...] @@ -495,7 +499,7 @@ class GPUParticleAdvection(ParticleAdvection): resol = self.advectedFields[0].topology.mesh.resolution resol_tmp = np.empty_like(resol) - is_XZ_needed = self.dir == 2 or (self.dir == 1 and main_size == 1) + is_XZ_needed = self.dir == 2 or (self.dir == 1 and self._main_size == 1) # XZ transposition settings if is_XZ_needed: resol_tmp[...] = resol[...] 
@@ -505,7 +509,7 @@ class GPUParticleAdvection(ParticleAdvection): resol_tmp[2] = resol[2] ocl_cte = " -D NB_I=NB_Y -D NB_II=NB_X -D NB_III=NB_Z" elif self.dir == 2: - if main_size == 1: # YXZ -> ZXY + if self._main_size == 1: # YXZ -> ZXY resol_tmp[0] = resol[2] resol_tmp[1] = resol[0] resol_tmp[2] = resol[1] @@ -539,7 +543,7 @@ class GPUParticleAdvection(ParticleAdvection): self.transpose_xz = KernelLauncher( prg.transpose_xz, self.cl_env.queue, gwi, lwi) - is_XZ_r_needed = self.dir == 2 and main_size > 1 + is_XZ_r_needed = self.dir == 2 and self._main_size > 1 if is_XZ_r_needed: # Reversed XZ transposition settings (ZYX -> XYZ) resol_tmp[...] = resol[...] @@ -655,7 +659,6 @@ class GPUParticleAdvection(ParticleAdvection): if split_id == 0: for v in self._work + self.advectedFields + [self.velocity]: v.clean_events() - self._the_apply(simulation, dtCoeff, split_id, old_dir) self._apply_timer.append_time(MPI.Wtime() - ctime) diff --git a/HySoP/hysop/gpu/gpu_particle_advection_1k.py b/HySoP/hysop/gpu/gpu_particle_advection_1k.py index 4ef0ac522a963caccd4d6fb7516e1393d1570243..5291647aaf4f0b15c5a15045fe41cf3ecdf4f2a2 100644 --- a/HySoP/hysop/gpu/gpu_particle_advection_1k.py +++ b/HySoP/hysop/gpu/gpu_particle_advection_1k.py @@ -26,11 +26,7 @@ class GPUParticleAdvection1k(GPUParticleAdvection): part_position=None, part_advectedFields=None, platform_id=0, device_id=0, device_type='gpu', - method={TimeIntegrator: RK2, - Interpolation: Linear, - Remesh: L2_1, - Support: 'gpu_1k', - Splitting: 'o2'}, + method=None, src=None, precision=PARMES_REAL, batch_nb=None, isMultiScale=False): """ @@ -56,6 +52,12 @@ class GPUParticleAdvection1k(GPUParticleAdvection): @param isMultiScale : Flag to specify if the velocity and advected fields are on different grids. """ + if method is None: + method = {TimeIntegrator: RK2, + Interpolation: Linear, + Remesh: L2_1, + Support: 'gpu_1k', + Splitting: 'o2'} GPUParticleAdvection.__init__(self, velocity, advectedFields, d, part_position, part_advectedFields, platform_id, device_id, diff --git a/HySoP/hysop/gpu/gpu_particle_advection_2k.py b/HySoP/hysop/gpu/gpu_particle_advection_2k.py index c004f741f218b5ae0e85813e79803820c5db4762..b66a79bec3c948a317f41bc2673a2119d37e0e49 100644 --- a/HySoP/hysop/gpu/gpu_particle_advection_2k.py +++ b/HySoP/hysop/gpu/gpu_particle_advection_2k.py @@ -25,11 +25,7 @@ class GPUParticleAdvection2k(GPUParticleAdvection): part_position=None, part_advectedFields=None, platform_id=0, device_id=0, device_type='gpu', - method={TimeIntegrator: RK2, - Interpolation: Linear, - Remesh: L2_1, - Support: 'gpu_2k', - Splitting: 'o2'}, + method=None, src=None, precision=PARMES_REAL, batch_nb=None, isMultiScale=False): """ @@ -55,6 +51,12 @@ class GPUParticleAdvection2k(GPUParticleAdvection): @param isMultiScale : Flag to specify if the velocity and advected fields are on different grids. """ + if method is None: + method = {TimeIntegrator: RK2, + Interpolation: Linear, + Remesh: L2_1, + Support: 'gpu_2k', + Splitting: 'o2'} GPUParticleAdvection.__init__(self, velocity, advectedFields, d, part_position, part_advectedFields, platform_id, device_id, diff --git a/HySoP/hysop/gpu/tools.py b/HySoP/hysop/gpu/tools.py index fe7767684aaa7a43abb2e386c78151d246757a83..b33a41a7385c9203c333183cd1f007890e23db17 100644 --- a/HySoP/hysop/gpu/tools.py +++ b/HySoP/hysop/gpu/tools.py @@ -6,7 +6,6 @@ Tools for gpu management. 
from parmepy import __VERBOSE__ from parmepy.constants import np from parmepy.gpu import cl, clTools, GPU_SRC, CL_PROFILE -from parmepy.mpi.main_var import main_comm, main_rank import re FLOAT_GPU, DOUBLE_GPU = np.float32, np.float64 @@ -20,7 +19,7 @@ class OpenCLEnvironment(object): """ def __init__(self, platform_id, device_id, device_type, - precision, gl_sharing=False): + precision, gl_sharing=False, comm=None): """Create environment. @param platform_id : OpenCL platform id @param device_id : OpenCL device id @@ -44,7 +43,12 @@ class OpenCLEnvironment(object): ## OpenCL queue self.queue = self._get_queue(self.ctx) ## MPI sub-communicator for all processes attached to the same device - self.gpu_comm = main_comm.Split(color=device_id, key=main_rank) + if comm is None: + from parmepy.mpi.main_var import main_comm + else: + main_comm = comm + self.gpu_comm = main_comm.Split(color=device_id, + key=main_comm.Get_rank()) ## Memory Pool allocator (immediate allocator) self.memPool = clTools.MemoryPool( @@ -320,7 +324,8 @@ class OpenCLEnvironment(object): Parse the sources to handle single and double precision. """ gpu_src = "" - if self.precision is DOUBLE_GPU: + if cl.device_type.to_string(self.device.type) == 'GPU' and \ + self.precision is DOUBLE_GPU: gpu_src += "#pragma OPENCL EXTENSION cl_khr_fp64: enable \n" if isinstance(files, list): file_list = files @@ -505,7 +510,7 @@ class OpenCLEnvironment(object): def get_opengl_shared_environment(platform_id, device_id, device_type, - precision): + precision, comm=None): """ Get an OpenCL environment with OpenGL shared enable. @@ -520,14 +525,15 @@ def get_opengl_shared_environment(platform_id, device_id, device_type, global __cl_env if __cl_env is None: __cl_env = OpenCLEnvironment(platform_id, device_id, device_type, - precision, gl_sharing=True) + precision, gl_sharing=True, comm=comm) else: __cl_env.modify(platform_id, device_id, device_type, precision, gl_sharing=True) return __cl_env -def get_opencl_environment(platform_id, device_id, device_type, precision): +def get_opencl_environment(platform_id, device_id, device_type, precision, + comm=None): """ Get an OpenCL environment. 
@@ -541,7 +547,7 @@ def get_opencl_environment(platform_id, device_id, device_type, precision): global __cl_env if __cl_env is None: __cl_env = OpenCLEnvironment(platform_id, device_id, device_type, - precision) + precision, comm=comm) else: __cl_env.modify(platform_id, device_id, device_type, precision) diff --git a/HySoP/hysop/mpi/topology.py b/HySoP/hysop/mpi/topology.py index 286a08c6d25b9ec334d4693f277c99e8aa11add8..cafa0549888db8cc41ce9dc62b9d322f6e31a1e0 100644 --- a/HySoP/hysop/mpi/topology.py +++ b/HySoP/hysop/mpi/topology.py @@ -25,7 +25,7 @@ To get more details try : from parmepy.constants import ORDER, np, PARMES_INDEX, debug, PARMES_INTEGER from parmepy.mpi.mesh import SubMesh from itertools import count -from parmepy.mpi.main_var import main_comm, MPI +from parmepy.mpi.main_var import MPI import parmepy.tools.numpywrappers as npw @@ -58,7 +58,7 @@ class Cartesian(object): @debug def __init__(self, domain, dim, globalMeshResolution, - comm=main_comm, periods=None, ghosts=None, cutdir=None, + comm=None, periods=None, ghosts=None, cutdir=None, shape=None, precomputed=False, localres=None, localoffset=None): @@ -74,6 +74,8 @@ class Cartesian(object): # Mind the sequential case : dim from withConstr may be equal # to 0 if shape[:] = 1 ## (Source) Communicator used to build the topology + if comm is None: + from parmepy.mpi.main_var import main_comm as comm self._comm_origin = comm ## number of process in comm_origin self._origin_size = self._comm_origin.Get_size() @@ -244,7 +246,8 @@ class Cartesian(object): @classmethod def withResolution(cls, domain, shape, - globalMeshResolution, ghosts=None, periods=None): + globalMeshResolution, ghosts=None, periods=None, + comm=None): """ Compute a topology for a given shape of the processus grid. If the dimension of the topology is smaller than the domain dimension, @@ -273,11 +276,13 @@ class Cartesian(object): cutdir = shape != 1 return cls(domain, dim, globalMeshResolution, cutdir=cutdir, - periods=periods, ghosts=ghosts, shape=shape) + periods=periods, ghosts=ghosts, shape=shape, + comm=comm) @classmethod def withResolutionFixed(cls, domain, shape, - globalMeshResolution, ghosts=None, periods=None): + globalMeshResolution, ghosts=None, periods=None, + comm=None): """ Compute a topology for a given shape of the processus grid. @param domain in which the topology is defined @@ -295,11 +300,13 @@ class Cartesian(object): create a topology with 0 resolution in one direction." cutdir = shape != 1 return cls(domain, dim, globalMeshResolution, - cutdir=cutdir, periods=periods, ghosts=ghosts, shape=shape) + cutdir=cutdir, periods=periods, ghosts=ghosts, shape=shape, + comm=comm) @classmethod def withCutdir(cls, domain, cutdir, - globalMeshResolution, ghosts=None, periods=None): + globalMeshResolution, ghosts=None, periods=None, + comm=None): """ Compute a topology from a list of directions to be cut. 
@param domain in which the topology is defined @@ -312,12 +319,14 @@ class Cartesian(object): cutdir = np.asarray(cutdir, dtype=np.bool) dim = cutdir[cutdir].size return cls(domain, dim, globalMeshResolution, - cutdir=cutdir, periods=periods, ghosts=ghosts) + cutdir=cutdir, periods=periods, ghosts=ghosts, + comm=comm) @classmethod def withPrecomputedResolution(cls, domain, shape, globalMeshResolution, localres, - offset, ghosts=None, periods=None): + offset, ghosts=None, periods=None, + comm=None): """ Compute a topology for a given shape of the processus grid and a given local mesh resolution/offset (example : output from @@ -342,7 +351,8 @@ class Cartesian(object): return cls(domain, dim, globalMeshResolution, precomputed=True, localres=localres, localoffset=offset, - periods=periods, ghosts=ghosts, cutdir=cutdir, shape=shape) + periods=periods, ghosts=ghosts, cutdir=cutdir, shape=shape, + comm=comm) def __eq__(self, other): """ diff --git a/HySoP/hysop/operator/adapt_timestep.py b/HySoP/hysop/operator/adapt_timestep.py index b3c545633d10e02d4c9b924f98da0d79aa62f085..0abbd87fa63d28052af1159d2d5b289bc580407f 100755 --- a/HySoP/hysop/operator/adapt_timestep.py +++ b/HySoP/hysop/operator/adapt_timestep.py @@ -23,9 +23,9 @@ class AdaptTimeStep(Operator): @debug def __init__(self, velocity, vorticity, resolutions, dt_adapt=None, - method={TimeIntegrator: RK3, SpaceDiscretisation: FD_C_4, - dtAdvecCrit: 'vort'}, - topo=None, ghosts=None, prefix='./dt.dat', **other_config): + method=None, + topo=None, ghosts=None, prefix='./dt.dat', + task_id=None, **other_config): """ Create a timeStep-evaluation operator from given velocity and vorticity variables. @@ -47,8 +47,12 @@ class AdaptTimeStep(Operator): @param ghosts : number of ghosts points. Default depends on the method. Automatically computed if not set. """ + if method is None: + method = {TimeIntegrator: RK3, + SpaceDiscretisation: FD_C_4, + dtAdvecCrit: 'vort'} Operator.__init__(self, [velocity, vorticity], method, topo=topo, - ghosts=ghosts) + ghosts=ghosts, task_id=task_id) self.config = other_config ## velocity variable (vector) self.velocity = velocity @@ -106,8 +110,3 @@ class AdaptTimeStep(Operator): self.discreteOperator.setUp() self._isUpToDate = True - - def apply(self, simulation=None): - # computation ... - self.discreteOperator.apply(simulation) - diff --git a/HySoP/hysop/operator/advection.py b/HySoP/hysop/operator/advection.py index af1d91033db0fdbe59e35bdbc7de1e2153c496d7..0b8cf6ae7382182f622d16647cba96473a425b33 100644 --- a/HySoP/hysop/operator/advection.py +++ b/HySoP/hysop/operator/advection.py @@ -11,7 +11,6 @@ from parmepy.numerics.integrators.runge_kutta2 import RK2 from parmepy.numerics.interpolation import Linear from parmepy.numerics.remeshing import L2_1 from parmepy.fields.continuous import Field -from parmepy.mpi import main_size, main_rank from parmepy.operator.redistribute import Redistribute from parmepy.operator.advectionDir import AdvectionDir @@ -50,12 +49,8 @@ class Advection(Operator): @debug def __init__(self, velocity, advectedFields, resolutions, - method={TimeIntegrator: RK2, - Interpolation: Linear, - Remesh: L2_1, - Support: '', - Splitting: 'o2'}, - topo=None, ghosts=None, **other_config): + method=None, + topo=None, ghosts=None, task_id=None, **other_config): """ Create a Transport operator from given variables velocity and scalar. 
@@ -68,6 +63,13 @@ class Advection(Operator): @param other_config : Method specific parameters @param topo : a predefined topology to discretize variables """ + if method is None: + method = {TimeIntegrator: RK2, + Interpolation: Linear, + Remesh: L2_1, + Support: '', + Splitting: 'o2', + MultiScale: L2_1} v = [velocity] if isinstance(advectedFields, list): self.advectedFields = advectedFields @@ -79,7 +81,8 @@ class Advection(Operator): for vv in self.advectedFields: vars_str += vv.name + "," Operator.__init__(self, v, method, topo=topo, - ghosts=ghosts, name_suffix=vars_str[0:-1]+')') + ghosts=ghosts, name_suffix=vars_str[0:-1] + ')', + task_id=task_id) ## Transport velocity self.velocity = velocity ## Transported fields @@ -114,7 +117,31 @@ class Advection(Operator): self._old_dir = 0 self.splitting = [] self._dim = self.velocity.dimension - self.advecDir = [None] * self._dim + self.advecDir = None + if not self._isSCALES: + particles_advectedFields = [ + Field(adF.domain, name="Particle_AdvectedFields", + isVector=adF.isVector) + for adF in self.advectedFields] + if self.method[Support].find('gpu_1k') >= 0: + particles_positions = None + else: + particles_positions = \ + Field(self.advectedFields[0].domain, + name="Particle_Position", isVector=False + ) + + # Directional continuous Advection operators + self.advecDir = [None] * self._dim + for i in xrange(self._dim): + self.advecDir[i] = AdvectionDir( + self.velocity, self.advectedFields, i, + particles_advectedFields, particles_positions, + self.resolutions, + method=self.method, topo=self._predefinedTopo, + ghosts=self.ghosts, isMultiScale=self._isMultiScale, + task_id=task_id, name_suffix=vars_str[0:-1] + ')', + **self.config) @debug def discretize(self): @@ -206,6 +233,10 @@ class Advection(Operator): # Scales nbcells equals resolutions - 1 nbcells = np.asarray(self.resolutions[self.advectedFields[0]], dtype=PARMES_INDEX) - 1 + if self._predefinedTopo is not None: + main_size = self._predefinedTopo.size + else: + from parmepy.mpi.main_var import main_size topodims = [1, 1, main_size] scalesres, scalesoffset, stab_coeff = \ scales.init_advection_solver(nbcells, @@ -242,31 +273,9 @@ class Advection(Operator): # --- GPU or pure-python advection --- else: - if self.advecDir[0] is None: - particles_advectedFields = [ - Field(adF.domain, name="Particle_AdvectedFields", - isVector=adF.isVector) - for adF in self.advectedFields] - if self.method[Support].find('gpu_1k') >= 0: - particles_positions = None - else: - particles_positions = \ - Field(self.advectedFields[0].domain, - name="Particle_Position", isVector=False - ) - - # Directional continuous Advection operators - for i in xrange(self._dim): - self.advecDir[i] = AdvectionDir( - self.velocity, self.advectedFields, i, - particles_advectedFields, particles_positions, - self.resolutions, - method=self.method, topo=self._predefinedTopo, - ghosts=self.ghosts, isMultiScale=self._isMultiScale, - **self.config) - for i in xrange(self._dim): - self.advecDir[i].discretize() - self.discreteFields = self.advecDir[0].discreteFields + for i in xrange(self._dim): + self.advecDir[i].discretize() + self.discreteFields = self.advecDir[0].discreteFields self._my_setUp = self.setUp_Python @staticmethod @@ -306,7 +315,6 @@ class Advection(Operator): # -- Final set up -- self.discreteOperator.setUp() - self.apply = self.discreteOperator.apply self._isUpToDate = True def setUp_Python(self): @@ -315,6 +323,8 @@ class Advection(Operator): # If topologies differs between directions, # one need Redistribute # 
operators + main_size = self.advecDir[0].discreteFields[ + self.velocity].topology.size if main_size > 1: # Build bridges self.bridges = self._dim * [None] @@ -328,7 +338,8 @@ class Advection(Operator): self.bridges[dfrom][dto] = Redistribute( self.advectedFields, self.advecDir[dfrom], - self.advecDir[dto]) + self.advecDir[dto], + name_suffix=str(dfrom)+'_'+str(dto)) self.bridges[dfrom][dto].setUp() # Splitting configuration @@ -410,23 +421,6 @@ class Advection(Operator): s += " Total : {0:9d}".format(total_lmem) + "Bytes" print s - def addRedistributeRequirement(self, red): - if self._isSCALES: - self.discreteOperator.addRedistributeRequirement(red) - else: - dop = self.advecDir[0].discreteOperator - dop.addRedistributeRequirement(red) - - def getRedistributeRequirement(self): - if self._isSCALES: - return self.discreteOperator.requirements - else: - req = [] - for i in xrange(self._dim): - for r in self.advecDir[i].discreteOperator.requirements: - req.append(r) - return req - @debug def _apply_noComm(self, simulation=None): """ @@ -437,6 +431,8 @@ class Advection(Operator): Redefinition for advection. Applying a dimensional splitting. """ + for req in self.requirements: + req.wait() for split_id, split in enumerate(self.splitting): self.advecDir[split[0]].apply( simulation, split[1], split_id, self._old_dir) @@ -452,6 +448,8 @@ class Advection(Operator): Redefinition for advection. Applying a dimensional splitting. """ + for req in self.requirements: + req.wait() for split_id, split in enumerate(self.splitting): # Calling the redistribute operators between directions if not self._old_dir == split[0]: diff --git a/HySoP/hysop/operator/advection_dir.py b/HySoP/hysop/operator/advection_dir.py index 79581f7998eeac243d5c1f40ee43d870dabbe646..44042b140f8b847f23cd3d4f8329be5cb61d27f8 100644 --- a/HySoP/hysop/operator/advection_dir.py +++ b/HySoP/hysop/operator/advection_dir.py @@ -3,14 +3,13 @@ Advection of a field in a single direction. """ -from parmepy.constants import debug, np, S_DIR +from parmepy.constants import debug, np, S_DIR, PARMES_INDEX from parmepy.methods_keys import TimeIntegrator, Interpolation, Remesh, \ Support, Splitting, MultiScale from parmepy.numerics.integrators.runge_kutta2 import RK2 from parmepy.numerics.interpolation import Linear from parmepy.numerics.remeshing import L2_1 from parmepy.operator.continuous import Operator -from parmepy.mpi import main_size from parmepy.tools.timers import Timer @@ -31,17 +30,22 @@ class AdvectionDir(Operator): @debug def __init__(self, velocity, advectedFields, d, particle_fields, particle_positions, resolutions, - method={TimeIntegrator: RK2, - Interpolation: Linear, - Remesh: L2_1, - Support: '', - Splitting: 'o2'}, + method=None, topo=None, ghosts=None, isMultiScale=False, + task_id=None, name_suffix='', **other_config): + if method is None: + method = {TimeIntegrator: RK2, + Interpolation: Linear, + Remesh: L2_1, + Support: '', + Splitting: 'o2', + MultiScale: L2_1} v = [velocity] [v.append(f) for f in advectedFields] Operator.__init__(self, v, method, topo=topo, - ghosts=ghosts) + ghosts=ghosts, task_id=task_id, + name_suffix=name_suffix + S_DIR[d]) self.output = advectedFields self.input = [var for var in self.variables] ## Transport velocity @@ -54,12 +58,15 @@ class AdvectionDir(Operator): ## resolutions[velocity] may return [32,32,32]. 
self.resolutions = resolutions self._isMultiScale = isMultiScale - self._v_ghosts = [0, ]*self.domain.dimension + self._v_ghosts = np.array([0, ] * self.domain.dimension, + dtype=PARMES_INDEX) if self._isMultiScale: - self._v_ghosts = [2, ]*self.domain.dimension + self._v_ghosts = np.array([2, ] * self.domain.dimension, + dtype=PARMES_INDEX) if MultiScale in method.keys(): if method[MultiScale] == Linear: - self._v_ghosts = [1, ]*self.domain.dimension + self._v_ghosts = np.array([1, ] * self.domain.dimension, + dtype=PARMES_INDEX) else: assert method[MultiScale] == L2_1 else: @@ -76,6 +83,11 @@ class AdvectionDir(Operator): Discretisation according to the chosen method. Available methods : See Advection.setUp """ + if self._predefinedTopo is not None: + main_size = self._predefinedTopo.size + else: + from parmepy.mpi import main_size + comm = None if main_size == 1: # - topologies creation and variables discretization - if self._predefinedTopo is not None: @@ -93,29 +105,43 @@ class AdvectionDir(Operator): self.domain.dimension, self.resolutions[v]) self.discreteFields[v] = v.discretize(topo) else: + topodims = np.ones((self.domain.dimension)) if self._predefinedTopo is not None: - raise ValueError("User-defined topology is not\ + if self._predefinedTopo.dim != 1: + raise ValueError("User-defined topology is not\ (yet) allowed for advection.") - - topodims = np.ones((self.domain.dimension)) - # MPI topology depends on direction - if self.dir == self.domain.dimension - 1: - # Cut in first dir for last dir computations - topodims[0] = main_size + else: + comm = self._predefinedTopo._comm_origin + shape = self._predefinedTopo.shape + size = self._predefinedTopo.size + if self.dir == self.domain.dimension - 1 and \ + shape[-1] > 1: + topodims[0] = size + elif self.dir != self.domain.dimension - 1 and \ + shape[self.dir] > 1: + topodims[-1] = size + else: + topodims[...] = shape else: - # Cut in last dir - topodims[-1] = main_size + # MPI topology depends on direction + if self.dir == self.domain.dimension - 1: + # Cut in first dir for last dir computations + topodims[0] = main_size + else: + # Cut in last dir + topodims[-1] = main_size for v in self.variables: if v == self.velocity: topo = self.domain.getOrCreateTopology( self.domain.dimension, self.resolutions[v], topoResolution=topodims, fixedResolution=True, - ghosts=self._v_ghosts) + ghosts=self._v_ghosts, comm=comm) else: topo = self.domain.getOrCreateTopology( self.domain.dimension, self.resolutions[v], - topoResolution=topodims, fixedResolution=True) + topoResolution=topodims, fixedResolution=True, + comm=comm) self.discreteFields[v] = v.discretize(topo) @staticmethod @@ -171,6 +197,8 @@ class AdvectionDir(Operator): @debug def apply(self, simulation, dtCoeff, split_id, old_dir=None): + for req in self.requirements: + req.wait() self.discreteOperator.apply( simulation, dtCoeff, split_id, old_dir) diff --git a/HySoP/hysop/operator/analytic.py b/HySoP/hysop/operator/analytic.py index 0fbeba9ea656efb13bf6bfed7e364e1c10984a64..2f19be81c2f11243715e0ceb216cfd7b271483b3 100644 --- a/HySoP/hysop/operator/analytic.py +++ b/HySoP/hysop/operator/analytic.py @@ -14,7 +14,8 @@ class Analytic(Operator): @debug def __init__(self, variables, resolutions, formula=None, - ghosts=None, topo=None, doVectorize=False, method={}): + ghosts=None, topo=None, doVectorize=False, method=None, + task_id=None): """ Create an operator using an analytic formula to compute field(s). @param variables : list of fields on which this operator will apply. 
@@ -27,8 +28,7 @@ class Analytic(Operator): @remark : method seems useless but it is useful for GPU. """ Operator.__init__(self, variables, ghosts=ghosts, topo=topo, - method=method) - + method=method, task_id=task_id) if formula is not None: ## A formula applied to all variables of this operator self.formula = formula @@ -45,7 +45,6 @@ class Analytic(Operator): ## Dict of resolutions (one per variable) self.resolutions = resolutions self.output = self.variables - self.requirements = [] def discretize(self): if self._predefinedTopo is not None: @@ -67,22 +66,13 @@ class Analytic(Operator): def setUp(self): self._isUpToDate = True - def addRedistributeRequirement(self, red): - self.requirements.append(red) - - def getRedistributeRequirement(self): - return self.requirements - - @debug def apply(self, simulation=None): assert simulation is not None, \ "Missing simulation value for computation." - # Calling for requirements completion for red in self.requirements: red.wait() - for v in self.variables: v.initialize(simulation.time) diff --git a/HySoP/hysop/operator/continuous.py b/HySoP/hysop/operator/continuous.py index 4f56a473488a6fe38ced93755707f38e854af13b..1d8d99a8a7f0b484fe884ec491858107560aef47 100644 --- a/HySoP/hysop/operator/continuous.py +++ b/HySoP/hysop/operator/continuous.py @@ -32,7 +32,7 @@ class Operator(object): @debug @abstractmethod def __init__(self, variables, method=None, topo=None, ghosts=None, - name_suffix=''): + name_suffix='', task_id=None): """ Build the operator. The only required parameter is a list of variables. @@ -86,12 +86,15 @@ class Operator(object): if topo is not None: self._predefinedTopo = topo if self.ghosts is not None: - assert (self.ghosts == topo.ghosts), 'topo.ghosts and\ + assert (self.ghosts == topo.ghosts).all(), 'topo.ghosts and\ input for ghosts must not be different' else: self.ghosts = topo.ghosts self.name = self.__class__.__name__ + name_suffix + self.task_id = task_id self.timer = Timer(self) + ## Redistribute operator list that we must wait for. + self.requirements = [] @staticmethod def getWorkLengths(): @@ -118,10 +121,10 @@ class Operator(object): """ def addRedistributeRequirement(self, red): - self.discreteOperator.addRedistributeRequirement(red) + self.requirements.append(red) def getRedistributeRequirement(self): - return self.discreteOperator.requirements + return self.requirements @abstractmethod def setUp(self): @@ -148,6 +151,8 @@ class Operator(object): parameters (time, time step, iteration number ...), see parmepy.problem.simulation.Simulation for details. """ + for req in self.requirements: + req.wait() self.discreteOperator.apply(simulation) def printComputeTime(self): @@ -156,6 +161,7 @@ class Operator(object): self.discreteOperator.printComputeTime() self.time_info = self.discreteOperator.time_info else: + from parmepy.mpi.main_var import main_rank shortName = str(self.__class__).rpartition('.')[-1][0:-2] s = '[' + str(main_rank) + '] ' + shortName s += " : operator not discretized --> no computation, time = 0." diff --git a/HySoP/hysop/operator/curlAndDiffusion.py b/HySoP/hysop/operator/curlAndDiffusion.py index 46ebdbf394ea10658b21baf064e4e09ecec8e91d..a2db23ba354cff3bffc376cad2aef1c0ceb71c68 100644 --- a/HySoP/hysop/operator/curlAndDiffusion.py +++ b/HySoP/hysop/operator/curlAndDiffusion.py @@ -24,7 +24,7 @@ class Diffusion(Operator): @debug def __init__(self, velocity=None, vorticity=None, - resolutions=None, method='', + resolutions=None, method=None, task_id=None, **other_config): """ Constructor. 
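
A note on the requirement mechanism that continuous.py now hosts: a Redistribute registers itself with its target operator at build time, and every Operator.apply() first drains self.requirements before computing. The following minimal, self-contained sketch of that handshake uses hypothetical stand-in classes (Op, Redist) rather than the parmepy API:

    class Op(object):
        """Stand-in for parmepy's continuous Operator."""
        def __init__(self):
            self.requirements = []

        def addRedistributeRequirement(self, red):
            self.requirements.append(red)

        def apply(self):
            # Drain pending redistributions before any computation.
            for req in self.requirements:
                req.wait()
            # ... actual computation would go here ...

    class Redist(object):
        """Stand-in for Redistribute; registers itself at build time."""
        def __init__(self, op_to):
            self.waited = False
            op_to.addRedistributeRequirement(self)

        def wait(self):
            # A real Redistribute would Wait() on MPI Irecv/Issend here.
            self.waited = True

    op = Op()
    red = Redist(op)
    op.apply()
    assert red.waited
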
@@ -35,7 +35,8 @@ class Diffusion(Operator): @param viscosity : viscosity of the considered medium. """ Operator.__init__(self, [velocity, vorticity], method, - name="Curl and Diffusion") + name="Curl and Diffusion", + task_id=task_id) self.velocity = velocity self.vorticity = vorticity self.resolutions = resolutions @@ -69,8 +70,3 @@ class Diffusion(Operator): self.discreteOperator.setUp() self._isUpToDate = True - -if __name__ == "__main__": - print __doc__ - print "- Provided class : Diffusion" - print Diffusion.__doc__ diff --git a/HySoP/hysop/operator/density.py b/HySoP/hysop/operator/density.py index 9baecde84bb3dfea294a91c66484c1758a171254..4ab66b4630d44cf04abc2a08a2e3f9fe1f11fa7f 100644 --- a/HySoP/hysop/operator/density.py +++ b/HySoP/hysop/operator/density.py @@ -15,7 +15,8 @@ class DensityVisco(Operator): @debug def __init__(self, density, viscosity, - resolutions=None, method='', + resolutions=None, method=None, + task_id=None, **other_config): """ Constructor. @@ -24,7 +25,7 @@ class DensityVisco(Operator): @param velocity ContinuousVectorField : velocity variable. """ Operator.__init__(self, [density, viscosity], method, - name="DensityVisco") + name="DensityVisco", task_id=task_id) self.density = density self.viscosity = viscosity @@ -56,8 +57,3 @@ class DensityVisco(Operator): self.discreteOperator.setUp() self._isUpToDate = True - -if __name__ == "__main__": - print __doc__ - print "- Provided class : DensityVisco" - print DensityVisco.__doc__ diff --git a/HySoP/hysop/operator/differential.py b/HySoP/hysop/operator/differential.py index 012be67917b9fd842a74039dbfdeab631c280074..56adbf6150a50ba77ed160e4ca570771161b5097 100644 --- a/HySoP/hysop/operator/differential.py +++ b/HySoP/hysop/operator/differential.py @@ -9,8 +9,7 @@ from parmepy.operator.discrete.differential import Curl_d, GradFD from parmepy.methods_keys import SpaceDiscretisation, GhostUpdate from parmepy.numerics.finite_differences import FD_C_4 from parmepy.f2py import fftw2py -from parmepy.mpi import main_size -from abc import ABCMeta, abstractmethod +from abc import ABCMeta class Differential(Operator): @@ -22,12 +21,12 @@ class Differential(Operator): @debug def __init__(self, invar, outvar, resolutions, - method=None, topo=None, ghosts=None): + method=None, topo=None, ghosts=None, task_id=None): if method is None: method = {SpaceDiscretisation: FD_C_4, GhostUpdate: True} Operator.__init__(self, [invar, outvar], method, topo=topo, - ghosts=ghosts) + ghosts=ghosts, task_id=task_id) ## input variable self.invar = invar ## Curl of input @@ -65,7 +64,7 @@ class Differential(Operator): # according to fftw requirements. localres, localoffset = fftw2py.init_fftw_solver( self.resolution, self.domain.length) - + from parmepy.mpi.main_var import main_size topodims = np.ones((self.domain.dimension)) topodims[-1] = main_size #variables discretization @@ -92,7 +91,7 @@ class Differential(Operator): self.discreteFields[self.outvar].topology, \ 'Operator not yet implemented for multiple resolutions.' 
- + class Curl(Differential): """ Computes \f$ outVar = \nabla inVar \f$ diff --git a/HySoP/hysop/operator/diffusion.py b/HySoP/hysop/operator/diffusion.py index 6680c28895a713f65d05fcbcab1e18553720f563..127a65eb2e9df563a8b17ba2b1155347128cec1c 100644 --- a/HySoP/hysop/operator/diffusion.py +++ b/HySoP/hysop/operator/diffusion.py @@ -9,7 +9,6 @@ from parmepy.operator.continuous import Operator from parmepy.f2py import fftw2py from parmepy.operator.discrete.diffusion_fft import DiffusionFFT from parmepy.constants import debug -from parmepy.mpi import main_size import numpy as np @@ -26,24 +25,25 @@ class Diffusion(Operator): @debug def __init__(self, vorticity, resolution, viscosity, method=None, - ghosts=None, **other_config): + topo=None, ghosts=None, task_id=None, **other_config): """ Constructor for the diffusion operator. @param[in,out] vorticity : field \f$ \omega \f$ @param[in] resolution : \f$ \omega \f$ global resolution. @param[in] viscosity : \f$\nu\f$, viscosity of the considered medium. """ + # The only available method at the time is fftw + if method is None: + method = 'fftw' ## input/output field, solution of the problem self.vorticity = vorticity ## Global resolution for vorticity self.resolution = resolution ## viscosity self.viscosity = viscosity - Operator.__init__(self, [vorticity], method, ghosts=ghosts) + Operator.__init__(self, [vorticity], method=method, ghosts=ghosts, + topo=topo, task_id=task_id) self.config = other_config - # The only available method at the time is fftw - if method is None: - self.method = 'fftw' self.input = [self.vorticity] self.output = [self.vorticity] @@ -58,21 +58,32 @@ class Diffusion(Operator): localres, localoffset = fftw2py.init_fftw_solver( self.resolution, self.domain.length) + if self._predefinedTopo is not None: + main_size = self._predefinedTopo.size + else: + from parmepy.mpi.main_var import main_size topodims = np.ones((self.domain.dimension)) topodims[-1] = main_size #variables discretization if self.ghosts is not None: raise AttributeError("Ghosts points not yet\ implemented for diffusion operator.") - for v in self.variables: - topo = self.domain.getOrCreateTopology(self.domain.dimension, - self.resolution, topodims, - precomputed=True, - offset=localoffset, - localres=localres, - ghosts=self.ghosts) - - self.discreteFields[v] = v.discretize(topo) + if self._predefinedTopo is not None: + topo = self._predefinedTopo + assert (topo.shape == topodims).all(), 'input topology is\ + not compliant with fftw.' + for v in self.variables: + self.discreteFields[v] = v.discretize(topo) + else: + for v in self.variables: + topo = self.domain.getOrCreateTopology( + self.domain.dimension, + self.resolution, topodims, + precomputed=True, + offset=localoffset, + localres=localres, + ghosts=self.ghosts) + self.discreteFields[v] = v.discretize(topo) @debug def setUp(self): diff --git a/HySoP/hysop/operator/discrete/discrete.py b/HySoP/hysop/operator/discrete/discrete.py index 4d34a475d5620b1ca03b7c3769e01c805c882b88..02d54ba6f81f7f6098385ce2e374c150110707b2 100644 --- a/HySoP/hysop/operator/discrete/discrete.py +++ b/HySoP/hysop/operator/discrete/discrete.py @@ -48,8 +48,6 @@ class DiscreteOperator(object): self._isUpToDate = False ## True if work arrays are provided by external call (self.setWorks) self.hasExternalWork = False - ## Redistribute operator list that we must wait for. - self.requirements = [] self._apply_timer = ManualFunctionTimer('apply_function') self.timer.addFunctionTimer(self._apply_timer) # Local (optional) work arrays. 
Set with setWorks function @@ -110,8 +108,7 @@ class DiscreteOperator(object): parameters (time, time step, iteration number ...), see parmepy.problem.simulation.Simulation for details. """ - for red in self.requirements: - red.wait() + pass @debug def finalize(self): @@ -120,9 +117,6 @@ class DiscreteOperator(object): """ pass - def addRedistributeRequirement(self, redistribute): - self.requirements.append(redistribute) - def __str__(self): """Common printings for discrete operators.""" shortName = str(self.__class__).rpartition('.')[-1][0:-2] diff --git a/HySoP/hysop/operator/monitors/monitoring.py b/HySoP/hysop/operator/monitors/monitoring.py index ab6a943c5c6b1c596e4f1788c58b82aa1f454bd2..c8703fbb466a5a3635e5e3f45a7e8bd8af29b12f 100644 --- a/HySoP/hysop/operator/monitors/monitoring.py +++ b/HySoP/hysop/operator/monitors/monitoring.py @@ -13,14 +13,14 @@ class Monitoring(Operator): __metaclass__ = ABCMeta @abstractmethod - def __init__(self, variables, topo, frequency): + def __init__(self, variables, topo, frequency, task_id=None): """ Constructor @param variables : list of fields to monitor. @param topo : topo on which fields are to be monitored @param frequency : output rate. @param name : Monitor name. """ - Operator.__init__(self, variables, topo=topo) + Operator.__init__(self, variables, topo=topo, task_id=task_id) ## Monitor frequency self.freq = frequency ## Object to store computational times of lower level functions diff --git a/HySoP/hysop/operator/monitors/printer.py b/HySoP/hysop/operator/monitors/printer.py index 0549e6811abf68995acf42d1418af7d97c5ed98d..cf2ade524589d9e3ce5c15ac12f5a2cac387c577 100644 --- a/HySoP/hysop/operator/monitors/printer.py +++ b/HySoP/hysop/operator/monitors/printer.py @@ -25,7 +25,7 @@ class Printer(Monitoring): Performs outputs in VTK images. """ def __init__(self, variables, topo, frequency=0, - prefix=None, formattype=None): + prefix=None, formattype=None, task_id=None): """ Create a results printer for given fields, filename prefix (relative path) and an output frequency. @@ -35,7 +35,7 @@ class Printer(Monitoring): @param prefix : output file name prefix. @param ext : output file extension, default=.vtk. """ - Monitoring.__init__(self, variables, topo, frequency) + Monitoring.__init__(self, variables, topo, frequency, task_id=task_id) ## output file name prefix if prefix is None: self.prefix = './out_' @@ -71,7 +71,7 @@ class Printer(Monitoring): self._call_number = 0 ## shortcut to topo name self.topo = self._predefinedTopo - self._xmf = "" + self._xmf_data_files = [] # Create output dir if required if self.topo.rank == 0: @@ -81,6 +81,15 @@ class Printer(Monitoring): # Force synchro to be sure that all output dirs # have been created. self.topo.comm.barrier() + if self.formattype == VTK: + self._get_filename = lambda i: self.prefix + \ + "{0}_iter_{1:03d}".format(self.topo.rank, i) + elif self.formattype == HDF5: + self._get_filename = lambda i: self.prefix + \ + "{0}procs_iter_{1:03d}".format(self.topo.size, i) + '.h5' + elif self.formattype == DATA: + self._get_filename = lambda i: self.prefix + \ + "{0}_iter_{1:03d}.dat".format(self.topo.rank, i) def setUp(self): """ @@ -101,6 +110,27 @@ class Printer(Monitoring): def finalize(self): if self.freq == -1: self._printStep(self._call_number) + if self.formattype == HDF5 and self.topo.rank == 0: + # Write the xmf file driving all h5 files. 
+ # Writing only one file + # We have a temporal list of Grid => a single xmf file + # Manual writing of the xmf file because "XDMF library very + # difficult to compile and to use" + # [Advanced HDF5 & XDMF - Groupe Calcul] + f = open(self.prefix + str(self.topo.size) + 'procs.xmf', 'w') + f.write("<?xml version=\"1.0\" ?>\n") + f.write("<!DOCTYPE Xdmf SYSTEM \"Xdmf.dtd\">\n") + f.write("<Xdmf Version=\"2.0\">\n") + f.write(" <Domain>\n") + f.write(" <Grid Name=\"CellTime\" GridType=\"Collection\" ") + f.write("CollectionType=\"Temporal\">\n") + for ds_names, i, t in self._xmf_data_files: + f.write(_TemporalGridXMF( + self.topo, ds_names, i, t, self._get_filename(i))) + f.write(" </Grid>\n") + f.write(" </Domain>\n") + f.write("</Xdmf>\n") + f.close() Monitoring.finalize(self) def _build_vtk_dict(self): @@ -154,9 +184,6 @@ class Printer(Monitoring): df.wait() except AttributeError: pass - # Set output file name - filename = self.prefix + str(self.topo.rank) - filename += '_' + "iter_{0:03d}".format(ite) ## VTK output \todo: Need fix in 2D, getting an IOError. if self.formattype == VTK: @@ -170,12 +197,12 @@ class Printer(Monitoring): ## for i in xrange(self.topo.mesh.dim)]) spacing = [0] * 3 spacing[:dim] = [self.topo.mesh.space_step[i] for i in xrange(dim)] - evtk.imageToVTK(filename, origin=orig, spacing=spacing, + evtk.imageToVTK(self._get_filename(ite), + origin=orig, spacing=spacing, pointData=self._build_vtk_dict()) elif self.formattype == HDF5: - filename = self.prefix + str(self.topo.size) - filename += "procs_iter_{0:03d}".format(ite) + '.h5' + filename = self._get_filename(ite) # Write the h5 file # (force np.float64, ParaView seems to not be able to read float32) # Writing compressed hdf5 files (gzip compression seems the best) @@ -214,33 +241,11 @@ class Printer(Monitoring): # Of the dataset (of site global resolution) ds[sl] = \ npw.realarray(df.data[d][self.topo.mesh.iCompute].T) + self._xmf_data_files.append((datasetNames, ite, t)) f.close() - if self.topo.rank == 0: - # Write the xmf file driving all h5 files. 
- # Writing only one file - # We have a temporal list of Grid => a single xmf file - # Manual writing of the xmf file because "XDMF library very - # difficult to compile and to use" - # [Advanced HDF5 & XDMF - Groupe Calcul] - self._xmf += _TemporalGridXMF( - self.topo, datasetNames, ite, t, filename) - f = open(self.prefix + str(self.topo.size) + 'procs.xmf', 'w') - f.write("<?xml version=\"1.0\" ?>\n") - f.write("<!DOCTYPE Xdmf SYSTEM \"Xdmf.dtd\">\n") - f.write("<Xdmf Version=\"2.0\">\n") - f.write(" <Domain>\n") - f.write(" <Grid Name=\"CellTime\" GridType=\"Collection\" ") - f.write("CollectionType=\"Temporal\">\n") - f.write(self._xmf) - f.write(" </Grid>\n") - f.write(" </Domain>\n") - f.write("</Xdmf>\n") - f.close() - elif self.formattype == DATA: - filename += '.dat' - f = open(filename, 'w') + f = open(self._get_filename(ite), 'w') shape = self.topo.mesh.resolution coords = self.topo.mesh.coords pbDimension = self.domain.dimension diff --git a/HySoP/hysop/operator/multiphase.py b/HySoP/hysop/operator/multiphase.py index d527e2c9c169ef99d7bf5d8910dac702cb5d9943..72fc91854ee19c718b3daf94e3e21bf268cd1e3c 100644 --- a/HySoP/hysop/operator/multiphase.py +++ b/HySoP/hysop/operator/multiphase.py @@ -16,8 +16,8 @@ class Baroclinic(Operator): @debug def __init__(self, velocity, vorticity, viscosity, density, - resolutions=None, topo=None, ghosts=None, method='', - **other_config): + resolutions=None, topo=None, ghosts=None, method=None, + task_id=None, **other_config): """ Constructor. Create a Pressure operator from given velocity variables. @@ -25,7 +25,8 @@ class Baroclinic(Operator): @param velocity ContinuousVectorField : velocity variable. """ Operator.__init__(self, [velocity, vorticity, density], method, - topo=topo, ghosts=ghosts, name="Pressure") + topo=topo, ghosts=ghosts, name="Pressure", + task_id=task_id) self.velocity = velocity self.vorticity = vorticity diff --git a/HySoP/hysop/operator/penalization.py b/HySoP/hysop/operator/penalization.py index d48d02d5d6d7d5013c6daca482d8ed95a519550f..6af9deeac58b8389c0dcdea039432d785e674061 100644 --- a/HySoP/hysop/operator/penalization.py +++ b/HySoP/hysop/operator/penalization.py @@ -24,7 +24,7 @@ class Penalization(Operator): @debug def __init__(self, variables, obstacles, coeff, resolutions=None, method=None, topo=None, - ghosts=None, **other_config): + ghosts=None, task_id=None, **other_config): """ Constructor. @param[in,out] variables : list of fields to be penalized @@ -33,7 +33,11 @@ class Penalization(Operator): @param[in] resolutions : list of resolutions (one per variable) """ - Operator.__init__(self, variables, method={}, topo=topo, ghosts=ghosts) + if method is None: + method = {} + Operator.__init__(self, variables, method=method, + topo=topo, ghosts=ghosts, + task_id=task_id) ## domain where penalization must be applied. ## A parmepy.domain.obstacle . 
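
The printer.py change earlier in this patch defers XDMF output: each HDF5 dump now only records a (dataset names, iteration, time) tuple in _xmf_data_files, and the single .xmf index over all dumps is written once, in finalize(). A minimal sketch of that strategy, with illustrative names and a placeholder comment instead of the real per-dump <Grid> entries:

    xmf_data_files = []

    def dump(dataset_names, ite, t):
        # ... the .h5 file for this iteration would be written here ...
        xmf_data_files.append((dataset_names, ite, t))

    def write_xmf_index(prefix, size):
        # One temporal <Grid> collection referencing every dump.
        f = open(prefix + str(size) + 'procs.xmf', 'w')
        f.write("<?xml version=\"1.0\" ?>\n")
        f.write("<Xdmf Version=\"2.0\">\n <Domain>\n")
        f.write("  <Grid Name=\"CellTime\" GridType=\"Collection\" ")
        f.write("CollectionType=\"Temporal\">\n")
        for names, i, t in xmf_data_files:
            # a real writer emits one <Grid> per (names, i, t) here
            f.write("   <!-- iter %d, t=%g, datasets: %s -->\n"
                    % (i, t, ','.join(names)))
        f.write("  </Grid>\n </Domain>\n</Xdmf>\n")
        f.close()

    dump(['vorticity'], 0, 0.0)
    dump(['vorticity'], 1, 0.001)
    write_xmf_index('./res_', 8)
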
diff --git a/HySoP/hysop/operator/poisson.py b/HySoP/hysop/operator/poisson.py index 4dfd696896e6a10bc1137b0b80e5d3bb9543e2d1..6d266d724885056052434e9fa9882b3245ecc804 100644 --- a/HySoP/hysop/operator/poisson.py +++ b/HySoP/hysop/operator/poisson.py @@ -10,7 +10,6 @@ from parmepy.operator.discrete.poisson_fft import PoissonFFT from parmepy.variables.variables import Variables from parmepy.constants import debug import numpy as np -from parmepy.mpi.main_var import main_size class Poisson(Operator): @@ -27,7 +26,7 @@ class Poisson(Operator): @debug def __init__(self, velocity, vorticity, resolutions, ghosts=None, - method=None, + method=None, topo=None, task_id=None, **other_config): """ Constructor for the Poisson problem. @@ -44,7 +43,8 @@ class Poisson(Operator): ## the same resolution. self.resolutions = resolutions - Operator.__init__(self, [velocity, vorticity], method, ghosts=ghosts) + Operator.__init__(self, [velocity, vorticity], method, ghosts=ghosts, + topo=topo, task_id=task_id) self.config = other_config if method is None: self.method = 'fftw' @@ -81,18 +81,29 @@ class Poisson(Operator): localres, localoffset = fftw2py.init_fftw_solver( self.resolution, self.domain.length) + if self._predefinedTopo is not None: + main_size = self._predefinedTopo.size + else: + from parmepy.mpi.main_var import main_size topodims = np.ones((self.domain.dimension)) topodims[-1] = main_size #variables discretization - for v in self.variables: - topo = self.domain.getOrCreateTopology(self.domain.dimension, - self.resolution, topodims, - precomputed=True, - offset=localoffset, - localres=localres, - ghosts=self.ghosts) - - self.discreteFields[v] = v.discretize(topo) + if self._predefinedTopo is not None: + topo = self._predefinedTopo + assert (topo.shape == topodims).all(), 'input topology is\ + not compliant with fftw.' + for v in self.variables: + self.discreteFields[v] = v.discretize(topo) + else: + for v in self.variables: + topo = self.domain.getOrCreateTopology( + self.domain.dimension, + self.resolution, topodims, + precomputed=True, + offset=localoffset, + localres=localres, + ghosts=self.ghosts) + self.discreteFields[v] = v.discretize(topo) @debug def setUp(self): diff --git a/HySoP/hysop/operator/redistribute.py b/HySoP/hysop/operator/redistribute.py index 52ed863cd7a32017fe235db216637e1b4ecfc968..ce306e8a9fc3b27052f9942dc16dd8d568bb4394 100644 --- a/HySoP/hysop/operator/redistribute.py +++ b/HySoP/hysop/operator/redistribute.py @@ -23,7 +23,6 @@ from parmepy import __VERBOSE__ from parmepy.constants import debug, PARMES_MPI_REAL, ORDERMPI, np, S_DIR from parmepy.operator.continuous import Operator from parmepy.mpi.topology import Bridge -from parmepy.mpi import main_rank, main_comm from parmepy.methods_keys import Support @@ -36,7 +35,8 @@ class Redistribute(Operator): """ @debug - def __init__(self, variables, opFrom, opTo, method=None, component=None): + def __init__(self, variables, opFrom, opTo, method=None, component=None, + task_id=None, name_suffix=''): """ Create an operator to distribute data between two mpi topologies for a @@ -49,25 +49,22 @@ class Redistribute(Operator): @param component: components of vector fields to consider (default: None, all components are taken). 
""" - - Operator.__init__(self, variables, method) + if not isinstance(variables, list): + variables = [variables] + vars_str = "_(" + for vv in variables: + vars_str += vv.name + "," + vars_str = vars_str[:-1] + ')' + if not component is None: + vars_str += S_DIR[component] + Operator.__init__(self, variables, method, task_id=task_id, + name_suffix=vars_str + name_suffix) ## Source Operator self.opFrom = opFrom ## Targeted operator self.opTo = opTo - # Then check if variables belong to both operators - # And check if variables have enought components. - for v in self.variables: - assert v in opFrom.variables and v in opTo.variables, \ - 'Redistribute error : one of the variable is not present\ - in both source and target operator.' - if component is not None: - assert component >= 0, 'component needs to be positive' - assert v.nbComponents > component, \ - 'Redistribute error : variable ' + str(v.name) + ' do not \ - have enough components (' + str(component) + ')' self.input = self.output = self.variables self.evts = [] self._toHost_fields = [] @@ -88,6 +85,11 @@ class Redistribute(Operator): self._r_types[v] = {} self._s_types[v] = {} + # Enable desynchronization: the opTo operator must call the wait + # function of this redistribute. This operator have to know self. + self.opTo.addRedistributeRequirement(self) + + def discretize(self): pass @@ -97,6 +99,17 @@ class Redistribute(Operator): Computes intersection of two topologies. """ + # Then check if variables belong to both operators + # And check if variables have enought components. + for v in self.variables: + assert v in self.opFrom.variables and v in self.opTo.variables, \ + 'Redistribute error : one of the variable is not present\ + in both source and target operator.' + if self.component is not None: + assert self.component >= 0, 'component needs to be positive' + assert v.nbComponents > self.component, \ + 'Redistribute error : variable ' + str(v.name) + ' do not \ + have enough components (' + str(self.component) + ')' assert self.opFrom.isUp() and self.opTo.isUp(), \ """You should setup both opFrom and opTo operators before any attempt to setup a redistribute operator.""" @@ -122,28 +135,28 @@ class Redistribute(Operator): opTo_is_device = False if not opFrom_is_device and not opTo_is_device: - # case: opFrom(host) --bridge--> opTo(host) - self.apply = self._bridges + # case: opFrom(host) --host--> opTo(host) + self.apply = self._host self.wait = self._wait_host else: # Have on device operators self.wait = self._wait_all if opFrom_is_device and not opTo_is_device: - # case: opFrom(GPU) --toHost--bridge--> opTo(host) - self.apply = self._apply_toHost_bridges + # case: opFrom(GPU) --toHost--host--> opTo(host) + self.apply = self._apply_toHost_host elif not opFrom_is_device and opTo_is_device: - # case: opFrom(host) --bridge--toDevice--> opTo(GPU) - self.apply = self._apply_bridges_toDevice + # case: opFrom(host) --host--toDevice--> opTo(GPU) + self.apply = self._apply_host_toDevice else: - # case: opFrom(GPU) --toHost--bridge--toDevice--> opTo(host) + # case: opFrom(GPU) --toHost--host--toDevice--> opTo(host) # Transfers are removed if variables are batched if np.any([self.opFrom.discreteFields[v].isBatch for v in self.variables] + [self.opTo.discreteFields[v].isBatch for v in self.variables]): - self.apply = self._bridges + self.apply = self._host else: - self.apply = self._apply_toHost_bridges_toDevice + self.apply = self._apply_toHost_host_toDevice # Build bridges and toTransfer lists self.bridges = {} @@ -168,9 +181,8 @@ 
class Redistribute(Operator): if opTo_is_device: self._toDevice_fields.append(self.opTo.discreteFields[v]) - # Enable desynchronization: the opTo operator must call the wait - # function of this redistribute. This operator have to know self. - self.opTo.addRedistributeRequirement(self) + self._main_comm = self.opFrom.discreteFields[v].topology.getParent() + self._main_rank = self._main_comm.Get_rank() # Flag telling if there will be some mpi data transfers. self._useless_transfer = {} @@ -213,27 +225,26 @@ class Redistribute(Operator): self._isUpToDate = True - def _apply_toHost_bridges_toDevice(self, simulation=None): - + def _apply_toHost_host_toDevice(self, simulation=None): if __VERBOSE__: - print "{"+str(main_rank)+"}", "_apply_toHost_bridges_toDevice" + print "{0} APPLY toHOST+HOST+toDEVICE".format(self._main_rank) self._toHost() self._wait_device() - self._bridges() + self._host() self._wait_host() self._toDevice() - def _apply_toHost_bridges(self, simulation=None): + def _apply_toHost_host(self, simulation=None): if __VERBOSE__: - print "{"+str(main_rank)+"}", "_apply_toHost_bridges" + print "{0} APPLY toHOST+HOST".format(self._main_rank) self._toHost() self._wait_device() - self._bridges() + self._host() - def _apply_bridges_toDevice(self, simulation=None): + def _apply_host_toDevice(self, simulation=None): if __VERBOSE__: - print "{"+str(main_rank)+"}", "_apply_bridges_toDevice" - self._bridges() + print "{0} APPLY HOST+toDEVICE".format(self._main_rank) + self._host() self._wait_host() self._toDevice() @@ -257,7 +268,7 @@ class Redistribute(Operator): if not self._useless_transfer[v]: dv.toDevice(self.component) - def _bridges(self, simulation=None): + def _host(self, simulation=None): """ Proceed with data redistribution from opFrom to opTo """ @@ -267,7 +278,7 @@ class Redistribute(Operator): # (use buffers for mpi send/recv? ) # - move MPI datatypes into the bridge? --> and free MPI type properly if __VERBOSE__: - print "{" + str(main_rank) + "}", "_apply_bridges" + print "{0} APPLY HOST".format(self._main_rank) self.r_request = {} self.s_request = {} for v in self.variables: @@ -275,7 +286,7 @@ class Redistribute(Operator): # Apply for each component considered for d in self._range_components(v): if __VERBOSE__: - print "{" + str(main_rank) + "}", "_apply_bridges", \ + print "{0} APPLY HOST".format(self._main_rank), \ self.opFrom.discreteFields[v].name, '->', \ self.opTo.discreteFields[v].name vTo = self.opTo.discreteFields[v].data[d] @@ -286,13 +297,13 @@ class Redistribute(Operator): for rk in br.recvFrom.keys(): self.r_request[v_name + str(rk)] = \ - main_comm.Irecv([vTo, 1, self._r_types[v][rk]], + self._main_comm.Irecv([vTo, 1, self._r_types[v][rk]], source=rk, tag=rk) self._hasRequests = True for rk in br.sendTo.keys(): self.s_request[v_name + str(rk)] = \ - main_comm.Issend([vFrom, 1, self._s_types[v][rk]], - dest=rk, tag=main_rank) + self._main_comm.Issend([vFrom, 1, self._s_types[v][rk]], + dest=rk, tag=self._main_rank) self._hasRequests = True def _wait_host(self): @@ -301,7 +312,7 @@ class Redistribute(Operator): of all communication requests. 
""" if __VERBOSE__: - print "{" + str(main_rank) + "}", "WAITING MPI", self._hasRequests + print "{0}", "WAIT MPI".format(self._main_rank), self._hasRequests if self._hasRequests: for rk in self.r_request.keys(): self.r_request[rk].Wait() @@ -311,7 +322,7 @@ class Redistribute(Operator): def _wait_device(self): if __VERBOSE__: - print "{" + str(main_rank) + "}", "WAITING OPENCL" + print "{0}".format(self._main_rank), "WAITING OPENCL" for dv in self._toDevice_fields + self._toHost_fields: dv.wait() diff --git a/HySoP/hysop/operator/redistribute_intercomm.py b/HySoP/hysop/operator/redistribute_intercomm.py index a4119ccd8bdc1d201b502a2db11890655feb0226..ebe3927f5d181900c1c3b94055fa9641830b9797 100644 --- a/HySoP/hysop/operator/redistribute_intercomm.py +++ b/HySoP/hysop/operator/redistribute_intercomm.py @@ -7,9 +7,11 @@ the destination. It relies on a Bridge_intercomm. """ -from parmepy.constants import debug, PARMES_MPI_REAL, ORDERMPI, S_DIR +from parmepy.constants import debug, PARMES_MPI_REAL, ORDERMPI, S_DIR, np +from parmepy import __VERBOSE__ from parmepy.operator.continuous import Operator from parmepy.mpi.topology import Bridge_intercomm +from parmepy.methods_keys import Support class RedistributeIntercomm(Operator): @@ -19,7 +21,7 @@ class RedistributeIntercomm(Operator): Transfers data from topology of id_from to the id_to. """ @debug - def __init__(self, variables, topo, id_from, id_to, + def __init__(self, variables, topo, op_from, op_to, proc_tasks, parent_comm, method=None, component=None): """ @@ -39,18 +41,29 @@ class RedistributeIntercomm(Operator): @remark : proc_tasks size and number of processus in parent_comm must be equal. """ - Operator.__init__(self, variables, method) + if not isinstance(variables, list): + variables = [variables] + vars_str = "_(" + for vv in variables: + vars_str += vv.name + "," + vars_str = vars_str[:-1] + ')' + if not component is None: + vars_str += S_DIR[component] + Operator.__init__(self, variables, method, name_suffix=vars_str) assert parent_comm.Get_size() == len(proc_tasks), \ "Parent communicator ({0})".format(parent_comm.Get_size()) + \ " and size of the task id array " + \ "({0}) are not equal".format(len(proc_tasks)) self.topo = topo - self.id_from = id_from - self.id_to = id_to + self.opFrom = op_from + self.opTo = op_to + self.id_from = self.opFrom.task_id + self.id_to = self.opTo.task_id self.parent_comm = parent_comm self._dim = topo.domain.dimension self.proc_tasks = proc_tasks self.input = self.output = self.variables + self.component = component if component is None: # All components are considered self._range_components = lambda v: range(v.nbComponents) @@ -70,6 +83,10 @@ class RedistributeIntercomm(Operator): for v in self.variables: self._r_types[v] = {} self._s_types[v] = {} + self._toHost_fields = [] + self._toDevice_fields = [] + for v in self.variables: + self.discreteFields[v] = v.discretize(self.topo) def discretize(self): pass @@ -82,11 +99,63 @@ class RedistributeIntercomm(Operator): assert self.topo.isUpToDate, \ """You should setup topology before any attempt to setup a redistribute operator.""" + + # Look for an operator opertating on device. 
+        try:
+            opFrom_is_device = \
+                self.opFrom.method[Support].find('gpu') >= 0
+        except KeyError:  # op.method is a dict not containing Support in keys
+            opFrom_is_device = False
+        except IndexError:  # op.method is a string
+            opFrom_is_device = False
+        except TypeError:  # op.method is None
+            opFrom_is_device = False
+        try:
+            opTo_is_device = \
+                self.opTo.method[Support].find('gpu') >= 0
+        except KeyError:  # op.method is a dict not containing Support in keys
+            opTo_is_device = False
+        except IndexError:  # op.method is a string
+            opTo_is_device = False
+        except TypeError:  # op.method is None
+            opTo_is_device = False
+
+        if not opFrom_is_device and not opTo_is_device:
+            # case: opFrom(host) --bridge--> opTo(host)
+            self._the_apply = self._apply_host
+        else:
+            # Have on device operators
+            if opFrom_is_device and not opTo_is_device:
+                # case: opFrom(GPU) --toHost--bridge--> opTo(host)
+                self._the_apply = self._apply_toHost_host
+            elif not opFrom_is_device and opTo_is_device:
+                # case: opFrom(host) --bridge--toDevice--> opTo(GPU)
+                self._the_apply = self._apply_host_toDevice
+            else:
+                # case: opFrom(GPU) --toHost--bridge--toDevice--> opTo(GPU)
+                # Transfers are removed if variables are batched
+                if np.any([self.opFrom.discreteFields[v].isBatch
+                           for v in self.variables]
+                          + [self.opTo.discreteFields[v].isBatch
+                             for v in self.variables]):
+                    self._the_apply = self._host
+                else:
+                    self._the_apply = self._apply_toHost_host_toDevice
+
         # Build bridges and toTransfer lists
         self.bridge = Bridge_intercomm(self.topo, self.parent_comm,
                                        self.id_from, self.id_to,
                                        self.proc_tasks)
 
+        for v in self.variables:
+            # toTransfer list completion
+            if self.proc_tasks[self._parent_rank] == self.id_from:
+                if opFrom_is_device:
+                    self._toHost_fields.append(self.opFrom.discreteFields[v])
+            if self.proc_tasks[self._parent_rank] == self.id_to:
+                if opTo_is_device:
+                    self._toDevice_fields.append(self.opTo.discreteFields[v])
+
         for v in self.variables:
             dv = v.discreteFields[self.topo]
             transfers = self.bridge.transfers
@@ -99,7 +168,7 @@ class RedistributeIntercomm(Operator):
                 substart = tuple(
                     [transfers[from_rk][i][0] for i in range(self._dim)])
                 self._r_types[v][from_rk] = \
-                    PARMES_MPI_REAL.Create_subarray(dv[0].shape,
+                    PARMES_MPI_REAL.Create_subarray(dv.data[0].shape,
                                                     subshape,
                                                     substart,
                                                     order=ORDERMPI)
@@ -113,18 +182,69 @@ class RedistributeIntercomm(Operator):
                 substart = tuple(
                     [transfers[to_rk][i][0] for i in range(self._dim)])
                 self._r_types[v][to_rk] = \
-                    PARMES_MPI_REAL.Create_subarray(dv[0].shape,
+                    PARMES_MPI_REAL.Create_subarray(dv.data[0].shape,
                                                     subshape,
                                                     substart,
                                                     order=ORDERMPI)
                 self._r_types[v][to_rk].Commit()
 
+        self._isUpToDate = True
+
     @debug
     def apply(self, simulation=None):
         """
-        Proceed with data redistribution from opFrom to opTo
+        Apply this operator to its variables.
+        @param simulation : object that describes the simulation
+        parameters (time, time step, iteration number ...), see
+        parmepy.problem.simulation.Simulation for details.
""" + for req in self.requirements: + req.wait() + self._the_apply(simulation) + + def _apply_toHost_host_toDevice(self, simulation=None): + if __VERBOSE__: + print "{0} APPLY toHOST+HOST+toDEVICE".format(self._parent_rank) + if self.proc_tasks[self._parent_rank] == self.id_from: + self._toHost() + self._wait_device() + self._host() + self._wait_host() + if self.proc_tasks[self._parent_rank] == self.id_to: + self._toDevice() + self._wait_device() + + def _apply_toHost_host(self, simulation=None): + + if __VERBOSE__: + print "{0} APPLY toHOST+HOST".format(self._parent_rank) + if self.proc_tasks[self._parent_rank] == self.id_from: + self._toHost() + self._wait_device() + self._host() + self._wait_host() + + def _apply_host_toDevice(self, simulation=None): + if __VERBOSE__: + print "{0} APPLY HOST+toDEVICE".format(self._parent_rank) + self._host() + self._wait_host() + self.parent_comm.Barrier() + if self.proc_tasks[self._parent_rank] == self.id_to: + self._toDevice() + self._wait_device() self.parent_comm.Barrier() + def _apply_host(self, simulation=None): + if __VERBOSE__: + print "{0} APPLY HOST".format(self._parent_rank) + self._host() + self._wait_host() + + def _host(self, simulation=None): + """ + Proceed with data redistribution from opFrom to opTo + """ + self.parent_comm.Barrier() self.r_request = {} self.s_request = {} for v in self.variables: @@ -137,18 +257,48 @@ class RedistributeIntercomm(Operator): for from_rk in transfers.keys(): self.r_request[v_name + str(from_rk)] = \ self.bridge.inter_comm.Irecv( - [dv[d], 1, self._r_types[v][from_rk]], + [dv.data[d], 1, self._r_types[v][from_rk]], source=from_rk, tag=from_rk) # Set Sending if self.proc_tasks[self._parent_rank] == self.id_from: for to_rk in transfers.keys(): self.s_request[v_name + str(to_rk)] = \ self.bridge.inter_comm.Issend( - [dv[d], 1, self._r_types[v][to_rk]], + [dv.data[d], 1, self._r_types[v][to_rk]], dest=to_rk, tag=self._my_rank) - def wait(self, simulation=None): + def _toHost(self): + """ + Proceed with data transfer of variables from device to host + """ + if __VERBOSE__: + print "{0} APPLY toHOST".format(self._parent_rank) + for v in self.variables: + dv = self.opFrom.discreteFields[v] + if dv in self._toHost_fields: + dv.toHost(self.component) + + def _toDevice(self): + """ + Proceed with data transfer of variables from device to host + """ + if __VERBOSE__: + print "{0} APPLY toDEVICE".format(self._parent_rank) + for v in self.variables: + dv = self.opTo.discreteFields[v] + if dv in self._toDevice_fields: + dv.toDevice(self.component) + + def _wait_device(self): + if __VERBOSE__: + print "{0} WAIT OPENCL".format(self._parent_rank) + for dv in self._toDevice_fields + self._toHost_fields: + dv.wait() + + def _wait_host(self, simulation=None): """Wait for requests completion.""" + if __VERBOSE__: + print "{0} WAIT MPI".format(self._parent_rank) for rk in self.r_request: self.r_request[rk].Wait() for rk in self.s_request: @@ -186,3 +336,6 @@ class RedistributeIntercomm(Operator): if not res: return res return res + + def finalize(self): + pass diff --git a/HySoP/hysop/operator/stretching.py b/HySoP/hysop/operator/stretching.py index cffb0527f98762e7078ad4e6e531d50d794c97d2..374ac299cf3cd01cf687ce02eb5747343eac024b 100755 --- a/HySoP/hysop/operator/stretching.py +++ b/HySoP/hysop/operator/stretching.py @@ -22,7 +22,7 @@ class Stretching(Operator): @debug def __init__(self, velocity, vorticity, resolutions, - method=None, topo=None, ghosts=None): + method=None, topo=None, ghosts=None, task_id=None): """ Create a 
Stretching operator from given velocity and vorticity variables. @@ -39,7 +39,7 @@ class Stretching(Operator): Autom. computed if not set. """ Operator.__init__(self, [velocity, vorticity], method, topo=topo, - ghosts=ghosts) + ghosts=ghosts, task_id=task_id) ## velocity variable (vector) self.velocity = velocity ## vorticity variable (vector) @@ -116,7 +116,3 @@ class Stretching(Operator): self.discreteOperator.setUp() self._isUpToDate = True - - def apply(self, simulation=None): - # computation ... - self.discreteOperator.apply(simulation) diff --git a/HySoP/hysop/operator/velocity_correction.py b/HySoP/hysop/operator/velocity_correction.py index 0eb9c56119dbc6d2316d1f35d76cba1012b58a19..45bbada897349c258756c79811b6fa553d6a4117 100755 --- a/HySoP/hysop/operator/velocity_correction.py +++ b/HySoP/hysop/operator/velocity_correction.py @@ -13,25 +13,26 @@ import numpy as np class VelocityCorrection(Operator): """ - The velocity field is corrected after solving the - Poisson equation. For more details about calculations, - see the "velocity_correction.pdf" explanations document + The velocity field is corrected after solving the + Poisson equation. For more details about calculations, + see the "velocity_correction.pdf" explanations document in ParmeDoc directory. """ @debug def __init__(self, velocity, vorticity, resolutions, - topo=None, **other_config): + topo=None, task_id=None, **other_config): """ - Corrects the values of the velocity field after - solving Poisson equation in order to prescribe proper + Corrects the values of the velocity field after + solving Poisson equation in order to prescribe proper mean flow and ensure the desired inlet flowrate. @param velocity field @param resolutions : grid resolution of velocity @param topo : a predefined topology to discretize velocity """ - Operator.__init__(self, [velocity, vorticity], topo=topo) + Operator.__init__(self, [velocity, vorticity], topo=topo, + task_id=task_id) self.config = other_config ## velocity variable (vector) self.velocity = velocity @@ -64,8 +65,3 @@ class VelocityCorrection(Operator): self.discreteOperator.setUp() self._isUpToDate = True - - def apply(self, simulation=None): - # computation ... - self.discreteOperator.apply(simulation) - diff --git a/HySoP/hysop/problem/navier_stokes.py b/HySoP/hysop/problem/navier_stokes.py index f40c5284e0db709e8a9b6ddd61a56c01c730cdfe..4aa5ca521b6b5ac2990ec031df8e533025690885 100644 --- a/HySoP/hysop/problem/navier_stokes.py +++ b/HySoP/hysop/problem/navier_stokes.py @@ -14,7 +14,7 @@ class NSProblem(Problem): """ Navier Stokes problem description. """ - def __init__(self, operators, simulation, monitors=[], + def __init__(self, operators, simulation, monitors=None, dumpFreq=100, name=None): Problem.__init__(self, operators, simulation, diff --git a/HySoP/hysop/problem/problem.py b/HySoP/hysop/problem/problem.py index 680c912c3f3ab29f6803aafbdefb05c2defe698c..46c05d89d3ff7f86968b6dec331de8b15111be5e 100644 --- a/HySoP/hysop/problem/problem.py +++ b/HySoP/hysop/problem/problem.py @@ -28,7 +28,7 @@ class Problem(object): return object.__new__(cls, *args, **kw) @debug - def __init__(self, operators, simulation, monitors=[], + def __init__(self, operators, simulation, monitors=None, dumpFreq=100, name=None): """ Create a transport problem instance. 
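
The monitors=[] to monitors=None signature changes (navier_stokes.py above, problem.py here, transport.py below) remove a shared mutable default: a default list is evaluated once, at function definition, so monitors appended for one problem instance would leak into the next. A minimal demonstration, independent of parmepy:

    def bad(item, acc=[]):
        acc.append(item)        # the same list object on every call
        return acc

    def good(item, acc=None):
        if acc is None:         # the idiom adopted by this patch
            acc = []
        acc.append(item)
        return acc

    assert bad(1) == [1]
    assert bad(2) == [1, 2]     # surprise: state leaked between calls
    assert good(1) == [1]
    assert good(2) == [2]       # fresh list on each call
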
@@ -45,6 +45,8 @@ class Problem(object):
         self.name = name
         ## Problem operators
         self.operators = operators
+        if monitors is None:
+            monitors = []
         for m in monitors:
             self.operators.append(m)
         ## Computes time step and manage iterations
@@ -182,7 +184,8 @@ class Problem(object):
             for op in self.operators:
                 if __VERBOSE__:
                     print main_rank, op.__class__.__name__
-                op.apply(self.simulation)
+
+                op.apply(self.simulation)
             testdump = \
                 self.simulation.currentIteration % self.dumpFreq is 0
diff --git a/HySoP/hysop/problem/problem_tasks.py b/HySoP/hysop/problem/problem_tasks.py
new file mode 100644
index 0000000000000000000000000000000000000000..ba67f14845c00d0a6175e1d8459d91b05b7841d7
--- /dev/null
+++ b/HySoP/hysop/problem/problem_tasks.py
@@ -0,0 +1,197 @@
+"""
+@file problem_tasks.py
+
+Extending the problem description to handle task parallelism.
+Each operator owns a task id that defines the group of processes
+sharing the same task.
+"""
+from parmepy.constants import debug
+from parmepy import __VERBOSE__
+from parmepy.problem.problem import Problem
+from parmepy.operator.monitors.monitoring import Monitoring
+from parmepy.operator.redistribute import Redistribute
+from parmepy.operator.redistribute_intercomm import RedistributeIntercomm
+from parmepy.tools.timers import timed_function
+
+
+class ProblemTasks(Problem):
+    """
+    As in Problem, it contains several operators and monitors that apply
+    on variables. Each operator is labeled with a task_id that identifies
+    the task it belongs to.
+    Tasks are subsets of operators and are assigned to a subset of the MPI
+    processes by means of the tasks_list parameter.
+    """
+    @debug
+    def __new__(cls, *args, **kw):
+        return object.__new__(cls, *args, **kw)
+
+    @debug
+    def __init__(self, operators, simulation, tasks_list, monitors=None,
+                 dumpFreq=100, name=None, main_comm=None):
+        """
+        Creates the problem.
+        @param operators : list of operators.
+        @param simulation : a parmepy.simulation.Simulation object
+        to describe simulation parameters.
+        @param tasks_list : list of task identifiers, one per process rank
+        @param monitors : list of monitors.
+        @param name : an id for the problem
+        @param dumpFreq : frequency of dump (i.e. saving to a file)
+        for the problem; set dumpFreq = -1 for no dumps. Default = 100.
+        @param main_comm : MPI communicator that contains all processes
+        involved in this problem.
+
+        @remark : the number of processes in main_comm must equal the
+        length of tasks_list.
+        """
+        Problem.__init__(self, operators, simulation, monitors=monitors,
+                         dumpFreq=dumpFreq, name=name)
+        self.tasks_list = tasks_list
+        if main_comm is None:
+            from parmepy.mpi.main_var import main_comm
+        self.main_comm = main_comm
+        self._main_rank = self.main_comm.Get_rank()
+        assert self.main_comm.Get_size() == len(self.tasks_list), \
+            "The given task list length (" + str(len(self.tasks_list)) + \
+            ") does not match the communicator size" \
+            " ({0})".format(self.main_comm.Get_size())
+
+    def pre_setUp(self):
+        """
+        - Removes operators and monitors that do not have the same task
+        identifier as the current process
+        - Keeps the Redistribute_intercomm in both 'from' and 'to' task_id
+        - Partial setup : only for 'computational' operators
+        (i.e. excluding monitors, rendering, data distribution ...)
+        - Initializes variables.
+        """
+        if self._isReady:
+            return
+
+        self.operators_backup = []
+        ## Remove operators with a task not handled by this process.
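+        ## (self.operators[::-1] builds a reversed copy, so removing
+        ##  entries from self.operators inside the loop is safe.)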
+        for op in self.operators[::-1]:
+            self.operators_backup.append(op)
+            if not isinstance(op, RedistributeIntercomm):
+                if op.task_id != self.tasks_list[self._main_rank]:
+                    self.operators.remove(op)
+            else:
+                if op.id_from != self.tasks_list[self._main_rank] and \
+                        op.id_to != self.tasks_list[self._main_rank]:
+                    self.operators.remove(op)
+
+        for op in self.operators:
+            if not isinstance(op, Monitoring):
+                if not isinstance(op, Redistribute):
+                    if not isinstance(op, RedistributeIntercomm):
+                        op.discretize()
+
+        for op in self.operators:
+            if not isinstance(op, Monitoring):
+                if not isinstance(op, Redistribute):
+                    if not isinstance(op, RedistributeIntercomm):
+                        op.setUp()
+
+        # Build variables list to initialize
+        # These are operators input variables that are not output of
+        # previous operators in the operator stack.
+        # Set the variables input topology as the topology of the first
+        # operator that uses this variable as input.
+        self.input = []
+        for op in self.operators[::-1]:
+            for v in op.output:
+                if v in self.input:
+                    self.input.remove(v)
+            for v in op.input:
+                if not v in self.input:
+                    self.input.append(v)
+                    if not isinstance(op, Monitoring):
+                        if not isinstance(op, Redistribute):
+                            if not isinstance(op, RedistributeIntercomm):
+                                v.setTopoInit(op.discreteFields[v].topology)
+
+        self._isReady = True
+
+    @debug
+    @timed_function
+    def setUp(self):
+        """
+        Prepare operators (create topologies, allocate memories ...)
+        """
+        # Set up for 'computational' operators
+        if not self._isReady:
+            self.pre_setUp()
+
+        for v in self.input:
+            v.initialize()
+
+        # other operators
+        for op in self.operators:
+            if isinstance(op, Monitoring):
+                op.setUp()
+
+        for op in self.operators:
+            if isinstance(op, RedistributeIntercomm):
+                op.setUp()
+
+        for op in self.operators:
+            if isinstance(op, Redistribute):
+                op.setUp()
+
+        if __VERBOSE__ and self._main_rank == 0:
+            print "===="
+
+    @debug
+    @timed_function
+    def solve(self):
+        """
+        Solve problem.
+
+        Performs simulation iterations by calling each
+        operator of the list until the timer ends.\n
+        At the end of each time step, calls an io step.\n
+        Displays timings at simulation end.
+        """
+        self.main_comm.Barrier()
+        if self._main_rank == 0:
+            print "\n\n Start solving ..."
+        while not self.simulation.isOver:
+            self.simulation.printState()
+            for op in self.operators:
+                op.apply(self.simulation)
+            testdump = \
+                self.simulation.currentIteration % self.dumpFreq == 0
+            self.simulation.advance()
+            if self._doDump and testdump:
+                self.dump()
+
+    @debug
+    def finalize(self):
+        """
+        Finalize method
+        """
+        if self._main_rank == 0:
+            print "\n\n==== End ===="
+        ## We terminate monitors before operators.
+        for op in self.operators:
+            if isinstance(op, Monitoring):
+                op.finalize()
+                self.timer = self.timer + op.timer
+        for op in self.operators:
+            if not isinstance(op, Monitoring):
+                op.finalize()
+                self.timer = self.timer + op.timer
+        var = []
+        for op in self.operators:
+            for v in op.variables:
+                if not v in var:
+                    var.append(v)
+        for v in var:
+            v.finalize()
+            try:
+                self.timer = self.timer + v.timer
+            except AttributeError:
+                pass
+        if self._main_rank == 0:
+            print "===\n"
diff --git a/HySoP/hysop/problem/transport.py b/HySoP/hysop/problem/transport.py
index 621d9a16baae5a33d9c3fcb00dea9a571755050c..b8ae53beb2c0a62dc0aab59cf32b8f3dabfd8925 100644
--- a/HySoP/hysop/problem/transport.py
+++ b/HySoP/hysop/problem/transport.py
@@ -10,7 +10,7 @@ class TransportProblem(Problem):
     """
     Transport problem description.
""" - def __init__(self, operators, simulation, monitors=[], + def __init__(self, operators, simulation, monitors=None, dumpFreq=100, name=None): Problem.__init__(self, operators, simulation, monitors, dumpFreq, name="Transport Problem") diff --git a/HySoP/hysop/tools/problem2dot.py b/HySoP/hysop/tools/problem2dot.py index ac966fe1573defc87652552d20206f95407a1ed4..93dab83e5cf2b1629cb40167b44deffccaf786c3 100644 --- a/HySoP/hysop/tools/problem2dot.py +++ b/HySoP/hysop/tools/problem2dot.py @@ -3,10 +3,11 @@ Converts a problem instance to a graph throw dot syntax. """ +from parmepy.operator.advection import Advection from parmepy.operator.monitors.monitoring import Monitoring from parmepy.operator.redistribute import Redistribute -from parmepy.methods_keys import Support -from parmepy.constants import S_DIR +from parmepy.operator.redistribute_intercomm import RedistributeIntercomm +from parmepy.mpi.main_var import main_rank import pydot colors = [ "#dc322f", @@ -20,144 +21,110 @@ colors = [ "#ffffff"] -def _operators2node(op_list, ltopos): - """Create nodes from operators list""" - nodes = {} - - for op_id, op in enumerate(op_list): - label = "op"+str(op_id) - if not isinstance(op, Redistribute): - label += "_" + op.__class__.__name__ - else: - if op.component is None: - for v in op.variables: - label += "_R_" + v.name - else: - for v in op.variables: - label += "_R_" + v.name + S_DIR[op.component] - if isinstance(op, Redistribute): - shape = 'octagon' - elif isinstance(op, Monitoring): - shape = 'ellipse' - else: - shape = 'box' - - if not isinstance(op, Redistribute): - c_id = ltopos.index(op.discreteFields[op.variables[0]].topology) - try: - if op.method[Support].find('gpu') >= 0: - c_id = len(ltopos) - c_id += ltopos.index( - op.discreteFields[op.variables[0]].topology) - except: - pass - else: - c_id = -1 - nodes[op] = pydot.Node(op_id, label=label, fillcolor=colors[c_id], - shape=shape, - fontname='times-bold', style='filled') - return nodes - - -def defaultEdges(op, op_id, op_list, op_nodes, black_list=[]): - """Create edges to given operator from the operator list""" - edges = [] - for v in op.input: - if not v in black_list: - v_from = None - i_from = op_id - 1 - while(v_from is None): - if not isinstance(op_list[i_from], Redistribute): - if v in op_list[i_from].output: - v_from = op_list[i_from] - i_from -= 1 - color = 'black' - if i_from < 0: - color = colors[0] - if not isinstance(op, Redistribute): - edges.append(pydot.Edge(op_nodes[v_from], op_nodes[op], - label=v.name, color=color)) - else: - edges.append(pydot.Edge(op_nodes[v_from], op_nodes[op], - color=color)) - return edges +def get_shape(op): + if isinstance(op, Redistribute) or isinstance(op, RedistributeIntercomm): + return 'octagon' + elif isinstance(op, Monitoring): + return 'ellipse' + else: + return 'box' def toDot(pb, filename='graph.pdf'): - """Create a dot grapg from problem operator list""" - graph = pydot.Dot(pb.__class__.__name__, graph_type='digraph') - - # Build a topology set (for colors) - topos = [] - topos_gpu = [] - #nt = 0 - for op in pb.operators: - if not isinstance(op, Redistribute): + if main_rank == 0: + all_ops = [] + all_vars = [] + tasks = [] + for op in pb.operators: + if isinstance(op, Advection) and not op.advecDir is None: + for ad_op in op.advecDir: + all_ops.append(ad_op) + tasks.append(ad_op.task_id) + else: + all_ops.append(op) + tasks.append(op.task_id) for v in op.variables: - t = op.discreteFields[v].topology - try: - if op.method[Support].find('gpu') >= 0: - if not t in topos_gpu: - 
topos_gpu.append(op.discreteFields[v].topology) - else: - if not t in topos: - topos.append(op.discreteFields[v].topology) - except: - if not t in topos: - topos.append(op.discreteFields[v].topology) - - # Build nodes from operators - op_nodes = _operators2node(pb.operators, topos) - for n in op_nodes.values(): - graph.add_node(n) + all_vars.append(v) + all_vars = list(set(all_vars)) + tasks = list(set(tasks)) - # Build edges - edges = [] - for i in xrange(len(pb.operators) - 1, -1, -1): - op = pb.operators[i] - if not isinstance(op, Monitoring): - req_vars = [] - # Create edges for redistribute requirements - for req in op.getRedistributeRequirement(): - for v in req.output: - color = 'black' - if pb.operators.index(req) > pb.operators.index(op): - color = 'red' - edges.append(pydot.Edge(op_nodes[req], op_nodes[op], - color=color)) - req_vars.append(v) - # Create edges for other variables - ed = defaultEdges(op, i, pb.operators, op_nodes, - black_list=req_vars) - edges += ed - for i in xrange(len(pb.operators) - 1, -1, -1): - op = pb.operators[i] - if isinstance(op, Monitoring): - ed = defaultEdges(op, i, pb.operators, op_nodes, - black_list=req_vars) - edges += ed - for e in edges: - graph.add_edge(e) + all_edges = {} + for op_id, op in enumerate(all_ops): + for v in op.input: + out = None + for req in op.requirements: + if v in req.output: + out = req + i = op_id-1 + while(out is None): + if v in all_ops[i].output: + if isinstance(all_ops[i], RedistributeIntercomm): + if op == all_ops[i].opTo: + out = all_ops[i] + else: + if not isinstance(all_ops[i], Redistribute): + out = all_ops[i] + i = i-1 + if (out, op) in all_edges.keys(): + all_edges[(out, op)].append(v.name) + else: + all_edges[(out, op)] = [v.name] - # Add key - subg = pydot.Subgraph('Key', rank='same') - subg.add_node(pydot.Node('Computational', shape='box')) - subg.add_node(pydot.Node('Monitoring', shape='ellipse')) - subg.add_node(pydot.Node('Redistribute', shape='octagon')) - a = pydot.Node('a', shape='point') - b = pydot.Node('b', shape='point') - e = pydot.Edge(a, b, color=colors[0], label='New iteration') - subg.add_node(a) - subg.add_node(b) - subg.add_edge(e) - for t_id, t in enumerate(topos): - subg.add_node(pydot.Node('Topo'+str(t_id), shape='plaintext', - fontcolor=colors[t_id])) - for t_id, t in enumerate(topos_gpu): - subg.add_node(pydot.Node('Topo'+str(topos.index(t))+'_GPU', - shape='plaintext', - fontcolor=colors[len(topos)+topos.index(t)])) - graph.add_subgraph(subg) - - # Write graph in the format specified by filename extension - graph.write(filename, format=filename.split('.')[-1]) + graph = pydot.Dot(pb.__class__.__name__, graph_type='digraph') + sub_graphs = {} + nodes = {} + edges = {} + from_start = {} + to_end = {} + # Start iteration node + G_start = pydot.Node(-1, label="START", shape='none') + graph.add_node(G_start) + # End iteration node + G_end = pydot.Node(-2, label="END", shape='none') + graph.add_node(G_end) + if len(tasks) > 1: + for t in tasks: + if t is None: + c = 'white' + else: + c = colors[tasks.index(t)] + sub_graphs[t] = pydot.Subgraph('cluster_' + str(t), + label='', + color=c) + else: + sub_graphs[tasks[0]] = graph + for op_id, op in enumerate(all_ops): + label = 'Op' + str(op_id) + '_' + op.name + nodes[op] = pydot.Node(op_id, label=label, + shape=get_shape(op)) + sub_graphs[op.task_id].add_node(nodes[op]) + for e in all_edges.keys(): + if all_ops.index(e[0]) < all_ops.index(e[1]): + edges[e] = pydot.Edge(nodes[e[0]], nodes[e[1]], + label='_'.join(list(set(all_edges[e]))), + 
color='black') + graph.add_edge(edges[e]) + else: + if (e[0], G_end) in to_end.keys(): + to_end[(e[0], G_end)] += all_edges[e] + else: + to_end[(e[0], G_end)] = all_edges[e] + if (G_start, e[1]) in from_start.keys(): + from_start[(G_start, e[1])] += all_edges[e] + else: + from_start[(G_start, e[1])] = all_edges[e] + for e in to_end.keys(): + edges[e] = pydot.Edge(nodes[e[0]], e[1], + label='_'.join(list(set(to_end[e]))), + color='black') + graph.add_edge(edges[e]) + for e in from_start.keys(): + edges[e] = pydot.Edge(e[0], nodes[e[1]], + label='_'.join(list(set(from_start[e]))), + color='black') + graph.add_edge(edges[e]) + if len(tasks) > 1: + for t in tasks: + graph.add_subgraph(sub_graphs[t]) + graph.write(filename + '.dot', format='dot') + graph.write(filename, format=filename.split('.')[-1])
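
For reference, the cluster-per-task layout produced by the new toDot() boils down to a few pydot calls: nodes that share a task_id are grouped into one 'cluster_*' subgraph so graphviz draws a box per task. A minimal sketch with illustrative operator names (requires the pydot package):

    import pydot

    graph = pydot.Dot('tasks', graph_type='digraph')
    clusters = {}
    ops = [('advection', 0), ('poisson', 0), ('printer', 1)]
    for op_id, (name, task) in enumerate(ops):
        if task not in clusters:
            # the 'cluster_' prefix makes graphviz box the subgraph
            clusters[task] = pydot.Subgraph('cluster_' + str(task))
        clusters[task].add_node(pydot.Node(str(op_id), label=name,
                                           shape='box'))
    for sub in clusters.values():
        graph.add_subgraph(sub)
    graph.write('tasks.dot', format='raw')

Writing with format='raw' emits the dot source without invoking graphviz, which is enough to inspect the clustering.
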