Skip to content
Snippets Groups Projects
Commit 194514b7 authored by Eric Dalissier's avatar Eric Dalissier
Browse files

dp/rho and drho/rho in multiphase

and the begin of the cpu data transfert test
parent 673c4ca0
No related branches found
No related tags found
No related merge requests found
# -*- coding: utf-8 -*-
"""
@package parmepy.operatorPressure
MultiPhase Rot Grad P
"""
from .continuous import ContinuousOperator
from .stretching_d import Stretching_d
class Pressure(ContinuousOperator):
"""
Pressure operator representation
"""
def __init__(self, velocity, vorticity, viscosity, density):
"""
Constructor.
Create a Pressure operator from given velocity variables.
@param velocity ContinuousVectorField : velocity variable.
"""
ContinuousOperator.__init__(self)
## velocity variable (vector)
self.velocity = velocity
self.vorticity = vorticity
self.density = density
self.viscosity = viscosity
self.addVariable([velocity, vorticity, density])
def discretize(self, topology=None, idVelocityD=0, idVorticityD=0, idDensityD=0):
"""
Pressure operator discretization method.
Create a discrete Pressure operator from given specifications.
@param idVelocityD : Index of velocity discretisation to use.
@param idVelocityD : Index of vorticity discretisation to update.
@param idDensityD : Index of density discretisation to use.
@param propertyOp : choice of the property garantied by the stretching Op ( divConservation or massConservation)
@param method : the method to use. (Euler, RK, ..) default : RK3
"""
if self.discreteOperator is None:
self.discreteOperator = Pressure_d(self, topology, idVelocityD, idVorticityD, idDensityD)
def __str__(self):
"""ToString method"""
s = " Pressure operator (ContinuousOperator)"
if self.discreteOperator is not None:
s += "with the following discretization:\n"
s += str(self.discreteOperator)
else:
s += ". Not discretised"
return s + "\n"
if __name__ == "__main__":
print __doc__
print "- Provided class : Pressure"
print Stretching.__doc__
# -*- coding: utf-8 -*-
"""
@package operator
Discrete MultiPhase Rot Grad P
"""
from ..constants import *
from .discrete import DiscreteOperator
from .differentialOperator import DifferentialOperator
from .differentialOperator_d import DifferentialOperator_d
import numpy as np
import numpy.testing as npt
import time
import sys
class Pressure_d(DiscreteOperator):
"""
Operator representation.
"""
def __init__(self,pressureOp , topology=None, IdVelocityD=0, IdVorticityD=0, IdDensityD=0 ):
"""
Constructor.
Create the old velocity field to compute the -Dp/rho
in N.S equation
@param operator.
"""
DiscreteOperator.__init__(self)
## Topology of the obstacle
if(topology is not None):
self.topology = topology
else:
raise NotImplementedError()
self.velocity_old = np.copy(pressureOp.velocity.discreteField[IdVelocityD].data)
self.velocity_new = pressureOp.velocity.discreteField[IdVelocityD]
self.vorticity = pressureOp.vorticity.discreteField[IdVorticityD]
self.density = pressureOp.density.discreteField[IdDensityD]
self.resolution = self.topology.mesh.resolution
self.meshSize = self.topology.mesh.size
self.compute_time = 0.
def apply(self, t, dt):
"""
Apply operator.
"""
self.compute_time = time.time()
self.velocity_new = pressureOp.velocity.discreteField[IdVelocity]
result = (self.velocity_new - self.velocity_old )/ dt
gradU = DifferentialOperator_d(self.velocity_new, self.velocity_new, choice='gradV', topology=self.topology).apply()
result = result + np.dot(self.velocity_new, gradU)
## result = - gradP/rho
result = result - self.viscosity * DifferentialOperator_d(self.velocity_new, self.velocity_new, choice='laplacianV', topology=self.topology).apply()
## result = -(gradP/rho) x (gradRho/rho)
result = np.cross(result, DifferentialOperator_d(self.density, self.density, choice='gradS', topology=self.topology).apply()/self.density)
## vorti(n+1) = vorti(n) + dt * result
self.vorticity.data = self.vorticity.data + dt * result
self.velocity_old = np.copy(self.velocity_new.data)
self.compute_time = time.time() - self.compute_time
return self.compute_time
# return result
def printComputeTime(self):
self.timings_info[0] = "\"Pressure calculation total\""
# self.timings_info[1] = str(self.total_time)
# print "Multiphase total time : ", self.total_time
print "Time of the last Dp/rho calculation loop :", self.compute_time
def __str__(self):
s = "Pressure_d (DiscreteOperator). " + DiscreteOperator.__str__(self)
return s
if __name__ == "__main__":
print __doc__
print "- Provided class : Pressure_d"
print Pressure_d.__doc__
# -*- coding: utf-8 -*-
import time
import parmepy as pp
import numpy as np
from parmepy.tools.cpu_data_transfer import Synchronize
import mpi4py.MPI as MPI
from math import *
import unittest
class test_Cpu_data_transfert(unittest.TestCase):
"""
DiscreteVariable test class
"""
def vitesse(self,x, y, z):
vx = x
vy = y
vz = z
return vx, vy, vz
"""
DiscreteVariable test class
"""
def setUp(self):
nb = 32.
# Parameters
self.finalTime = 0.09
self.t = 0.
self.e = 0.002 # Accepted error between result and analytical result
self.dim = 3
self.boxLength = [1., 1., 1.]
self.boxMin = [ 0., 0., 0.]
self.nbPts = [nb, nb, nb]
self.timeStep = 0.02
self.comm = MPI.COMM_WORLD
## Domain
self.box = pp.Box(dimension=self.dim,
length=self.boxLength,
origin=self.boxMin)
def testOperatorSynchronize(self):
t0 = time.time()
## Fields
velo = pp.AnalyticalField(domain=self.box, formula=self.vitesse, name='Velocity', vector=True)
## Solver creation (discretisation of objects is done in solver initialisation)
topo3D = pp.CartesianTopology(domain=self.box, resolution=self.nbPts, dim=self.dim, periods=[True,True,True] ,ghosts=[2.,2.,2.])
OpSynchronize = Synchronize(topo3D)
## Discretization
velo.discretize(topo3D)
t1 = time.time()
print 'velo :', velo.discreteField[0].data[0][:,0,0],velo.discreteField[0].data[0][:,1,1]
OpSynchronize.apply(velo.discreteField[0])
tf = time.time()
print 'velo :', velo.discreteField[0].data[0][:,0,0],velo.discreteField[0].data[0][:,1,1]
print "\n"
print "Total time : ", tf - t0, "sec (CPU)"
print "Init time : ", t1 - t0, "sec (CPU)"
print "Solving time : ", tf - t1, "sec (CPU)"
def runTest(self):
self.testOperatorSynchronize()
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(test_Cpu_data_transfert))
return suite
if __name__ == "__main__":
unittest.TextTestRunner(verbosity=2).run(suite())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment