Skip to content
Snippets Groups Projects
Commit 75d7f4f8 authored by Jean-Baptiste Keck's avatar Jean-Baptiste Keck
Browse files

fft interfaces

parent 17379d1a
No related branches found
No related tags found
No related merge requests found
"""
Base interface for fast Fourier Transforms.
:class:`~hysop.numerics.fft.FFTPlanI`
:class:`~hysop.numerics.fft.FFTI`
"""
from abc import ABCMeta, abstractmethod
import numpy as np
from hysop.tools.numerics import float_to_complex_dtype, complex_to_float_dtype
class FFTPlanI(object):
"""
Common inteface for FFT plans.
Basically just a functor that holds relevant data
to execute a preconfigurarted FFT-like tranform.
"""
__metaclass__ = ABCMeta
@abstractmethod
def execute(self):
"""
Execute the FFT plan on current input and output array.
"""
pass
@abstractmethod
def __call__(self, a=None, out=None):
"""
Apply the FFT plan to possibly new input or output arrays.
"""
pass
class FFTI(object):
"""
Interface to compute local to process FFT-like transforms.
Common inteface for all array backends, based on the numpy.fft interface.
Standard FFTs: complex to complex (C2C)
fft() Compute the 1-dimensional discrete Fourier Transform.
ifft() Compute the 1-dimensional inverse discrete Fourier Transform.
fftn() Compute the N-dimensional discrete Fourier Transform.
ifftn() Compute the N-dimensional inverse discrete Fourier Transform.
Real data FFTS: real to complex (R2C) and complex to real (C2R)
rfft() Compute the 1-dimensional discrete Fourier Transform for real input.
irfft() Compute the inverse of the 1-dimensional FFT of real input.
rfftn() Compute the N-dimensional discrete Fourier Transform for real input.
irfftn() Compute the inverse of the N-dimensional FFT of real input.
Real FFTS: real to real (R2R)
Not implemented yet.
The default normalization has the direct transforms unscaled and the inverse transforms are scaled by 1/N.
By default, both simple and double precision are supported.
"""
__metaclass__ = ABCMeta
def __init__(self):
"""Initializes the interface and supported real and complex types."""
self.supported_ftypes = (np.float32, np.float64)
self.supported_ctypes = (np.complex64, np.complex128)
@abstractmethod
def fft(self, a, out, axis=-1, build_plan=False, **kwds):
"""
Compute the unscaled one-dimensional complex to complex discrete Fourier Transform.
Parameters
----------
a: array_like of np.complex64 or np.complex128
Complex input array.
out: array_like of np.complex64 or np.complex128
Complex output array of the same shape and dtype as the input.
axis: int, optional
Axis over witch to compute the FFT.
Defaults to last axis.
build_plan: bool, optional
Build and return a FFT plan instead of applying the transform.
Returns
-------
(shape, dtype) of the output array determined from the input array.
Notes
-----
N = a.shape[axis]
out[0] will contain the sum of the signal (zero-frequency term, always real for real inputs).
If N is even:
out[1:N/2] contains the positive frequency terms.
out[N/2] contains the Nyquist frequency (always real for real inputs).
out[N/2+1:] contains the negative frequency terms.
Else if N is odd:
out[1:(N-1)/2] contains the positive frequency terms.
out[(N-1)/2:] contains the negative frequency terms.
"""
assert a.dtype in self.supported_ctypes, a.dtype
if (out is not None):
assert a.dtype == out.dtype
assert np.arrayequal(a.shape, out.shape)
return (a.shape, a.dtype)
@abstractmethod
def ifft(self, a, out, axis=-1, build_plan=False, **kwds):
"""
Compute the one-dimensional complex to complex discrete Fourier Transform scaled by 1/N.
Parameters
----------
a: array_like of np.complex64 or np.complex128
Complex input array.
out: array_like of np.complex64 or np.complex128
Complex output array of the same shape and dtype as the input.
axis: int, optional
Axis over witch to compute the FFT.
Defaults to last axis.
build_plan: bool, optional
Build and return a IFFT plan instead of applying the transform.
Returns
-------
(shape, dtype) of the output array determined from the input array.
"""
assert a.dtype in self.supported_ctypes, a.dtype
if (out is not None):
assert a.dtype == out.dtype
assert np.arrayequal(a.shape, out.shape)
return (a.shape, a.dtype)
@abstractmethod
def rfft(self, a, out, axis=-1, build_plan=False, **kwds):
"""
Compute the unscaled one-dimensional real to hermitian complex discrete Fourier Transform.
Parameters
----------
a: array_like of np.float32 or np.float64
Real input array.
out: array_like of np.complex64 or np.complex128
Complex output array of matching complex dtype.
out.shape[...] = a.shape[...]
out.shape[axis] = a.shape[axis]//2 + 1
axis: int, optional
Axis over witch to compute the transform.
Defaults to last axis.
build_plan: bool, optional
Build and return a RFFT plan instead of applying the transform.
Returns
-------
(shape, dtype) of the output array determined from the input array.
Notes
-----
For real inputs there is no information in the negative frequency components that is not already
available from the positive frequency component because of the Hermitian symmetry.
N = out.shape[axis] = a.shape[axis]//2 + 1
out[0] will contain the sum of the signal (zero-frequency term, always real).
If N is even:
out[1:N/2] contains the positive frequency terms.
out[N/2] contains the Nyquist frequency (always real).
Else if N is odd:
out[1:(N+1)/2] contains the positive frequency terms.
"""
assert a.dtype in self.supported_ftypes
ctype = float_to_complex_dtype(a.dtype)
cshape = list(a.shape)
cshape[axis] = cshape[axis]//2 + 1
if (out is not None):
assert out.dtype in self.supported_ctypes
assert ctype == out.dtype
assert np.arrayequal(out.shape, cshape)
return (cshape, ctype)
@abstractmethod
def irfft(self, a, out, axis=-1, build_plan=False, **kwds):
"""
Compute the one-dimensional hermitian complex to real discrete Fourier Transform scaled by 1/N.
Parameters
----------
a: array_like of np.complex64 or np.complex128
Complex input array.
out: array_like of np.float32 or np.float64
Real output array of matching real type.
out.shape[...] = a.shape[...]
Last axis should match forward transform size used:
1) out.shape[axis] = 2*(a.shape[axis]-1)
2) out.shape[axis] = 2*(a.shape[axis]-1) + 1
axis: int, optional
Axis over witch to compute the transform.
Defaults to last axis.
build_plan: bool, optional
Build and return a IRFFT plan instead of applying the transform.
Returns
-------
(shape0, shape1, dtype) of the output array determined from the input array.
"""
assert a.dtype in self.supported_ctypes
cshape = a.shape
rtype = complex_to_float_dtype(a.dtype)
rshape = list(a.shape)
rshape[axis] = 2*(cshape[axis]-1)
if (out is not None):
assert out.dtype in self.supported_ftypes
assert rtype == out.dtype
assert np.arrayequal(out.shape, rshape)
return (rshape, rtype)
"""
FFT iterface for fast Fourier Transforms using FFTW backend.
:class:`~hysop.numerics.FftwFFT`
:class:`~hysop.numerics.FftwFFTlan`
"""
import warnings
import pyfftw
import numpy as np
from hysop.tools.types import first_not_None
from hysop.tools.units import bytes2str
from hysop.tools.misc import prod
from hysop.tools.warning import HysopWarning
from hysop.numerics.fft.fft import FFTPlanI, FFTI
class HysopFFTWWarning(HysopWarning):
pass
class FftwFFTPlan(FFTPlanI):
"""
Build and wraps a FFTW plan.
Emit warnings when SIMD alignment is not used.
Emit warnings when changing input and output alignment.
"""
def __init__(self, **plan_kwds):
super(FftwFFTPlan, self).__init__()
plan = pyfftw.FFTW(**plan_kwds)
if (not plan.simd_aligned):
msg='Resulting plan is not SIMD aligned ({} bytes boundary).'
msg=msg.format(pyfftw.simd_alignment)
warnings.warn(msg, HysopFFTWWarning)
self.plan = plan
def check_new_inputs(self, a, out):
plan = self.plan
if (a is not None) and (not pyfftw.is_byte_aligned(a, n=plan.input_alignment)):
msg='New input array is not aligned on original planned input alignment of {} bytes.'
msg+='\nA copy will be made.'
msg=msg.format(plan.input_alignment)
warnings.warn(msg, HysopFFTWWarning)
if (out is not None) and (not pyfftw.is_byte_aligned(out, n=plan.output_alignment)):
msg='New output array is not aligned on original planned output alignment of {} bytes.'
msg+='\nA copy will be made.'
msg=msg.format(plan.output_alignment)
warnings.warn(msg, HysopFFTWWarning)
def execute(self):
"""
Execute plan on current input and output array.
"""
return self.plan.__call__()
def __call__(self, a=None, out=None):
"""
Execute the plan on possibly different input and output arrays.
Input array updates with arrays that are not aligned on original byte boundary
will result in a copy being made.
Return output array for convenience.
"""
output = self.check_new_inputs(a, out)
self.plan(input_array=a, output_array=out, normalize_idft=True)
return self.plan.output_array
class FftwFFT(FFTI):
"""
Interface to compute local to process FFT-like transforms using the FFTW backend.
Fftw fft backend has many advantages:
- single, double and long double precision supported
- no intermediate temporary buffers created at each call.
- planning capability with caching
- multithreading capability
Planning destroys initial arrays content.
"""
def __init__(self, threads=1,
planner_effort='FFTW_MEASURE',
planning_timelimit=None,
destroy_input=False,
warn_on_allocation=True,
warn_on_misalignment=True):
super(FftwFFT, self).__init__()
self.supported_ftypes = (np.float32, np.float64, np.longdouble)
self.supported_ctypes = (np.complex64, np.complex128, np.clongdouble)
self.threads = threads
self.planner_effort = planner_effort
self.planning_timelimit = planning_timelimit
self.destroy_input = destroy_input
self.warn_on_allocation = warn_on_allocation
self.warn_on_misalignment = warn_on_misalignment
@classmethod
def check_alignment(cls, a, out):
"""Check SIMD alignment of input and output arrays."""
msg0='{} array is not aligned on SIMD aligment ({} bytes).'
msg0=msg0.format('{}', pyfftw.simd_alignment)
if (a is not None) and not pyfftw.is_byte_aligned(array=a):
msg=msg0.format('Input')
warnings.warn(msg, HysopFFTWWarning)
elif (out is not None) and not pyfftw.is_byte_aligned(out):
msg=msg0.format('Output')
warnings.warn(msg, HysopFFTWWarning)
def allocate_output(self, out, shape, dtype):
"""Alocate output if required and check shape and dtype."""
if (out is None):
if self.warn_on_allocation:
nbytes = prod(shape)*dtype.itemsize
msg='FftwFFT: allocating aligned output array of size {}.'
msg=msg.format(bytes2str(nbytes))
warnings.warn(msg, HysopFFTWWarning)
out = pyfftw.empty_aligned(shape=shape, dtype=dtype)
else:
assert out.dtype == dtype
assert out.shape == shape
return out
def fmt_kwds(self, **kwds):
plan_kwds = {}
plan_kwds['input_array'] = kwds.pop('a')
plan_kwds['output_array'] = kwds.pop('out')
plan_kwds['direction'] = kwds.pop('direction')
plan_kwds['axes'] = kwds.pop('axes', (kwds.pop('axis'),))
plan_kwds['threads'] = kwds.pop('threads', self.threads)
plan_kwds['planning_timelimit'] = kwds.pop('planning_timelimit', self.planning_timelimit)
flags = ()
flags += (kwds.pop('planner_effort', self.planner_effort),)
if kwds.pop('destroy_input', self.destroy_input) is True:
flags += ('FFTW_DESTROY_INPUT',)
if kwds.pop('wisdom_only', False) is True:
flags += ('FFTW_WISDOM_ONLY',)
plan_kwds['flags'] = flags
if kwds:
msg='Unknown keyword arguments: {}'
msg=msg.format(', '.join('\'{}\''.format(kwd) for kwd in kwds.keys()))
raise RuntimeError(msg)
return plan_kwds
def fft(self, a, out=None, axis=-1, **kwds):
"""Planning destroys initial arrays content."""
(shape, dtype) = super(FftwFFT, self).fft(a=a, out=out, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
if self.warn_on_misalignment:
self.check_alignment(a, out)
kwds = self.fmt_kwds(a=a, out=out, axis=axis, direction='FFTW_FORWARD', **kwds)
plan = FftwFFTPlan(**kwds)
return plan
def ifft(self, a, out=None, axis=-1, **kwds):
"""Planning destroys initial arrays content."""
(shape, dtype) = super(FftwFFT, self).ifft(a=a, out=out, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
if self.warn_on_misalignment:
self.check_alignment(a, out)
kwds = self.fmt_kwds(a=a, out=out, axis=axis, direction='FFTW_BACKWARD', **kwds)
plan = FftwFFTPlan(**kwds)
return plan
def rfft(self, a, out=None, axis=-1, **kwds):
"""Planning destroys initial arrays content."""
(shape, dtype) = super(FftwFFT, self).rfft(a=a, out=out, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
if self.warn_on_misalignment:
self.check_alignment(a, out)
kwds = self.fmt_kwds(a=a, out=out, axis=axis, direction='FFTW_FORWARD', **kwds)
plan = FftwFFTPlan(**kwds)
return plan
def irfft(self, a, out=None, axis=-1, **kwds):
"""Planning destroys initial arrays content."""
(shape, dtype) = super(FftwFFT, self).irfft(a=a, out=out, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
if self.warn_on_misalignment:
self.check_alignment(a, out)
kwds = self.fmt_kwds(a=a, out=out, axis=axis, direction='FFTW_BACKWARD', **kwds)
plan = FftwFFTPlan(**kwds)
return plan
"""
FFT iterface for fast Fourier Transforms using numpy fft backend.
:class:`~hysop.numerics.NumpyFFT`
:class:`~hysop.numerics.NumpyFFTPlan`
"""
import numpy as np
from numpy import fft as _FFT
from hysop.tools.types import first_not_None
from hysop.numerics.fft.fft import FFTPlanI, FFTI, complex_to_float_dtype, float_to_complex_dtype
class NumpyFFTPlan(FFTPlanI):
"""
Wrap a numpy fft call (numpy.fft does not offer real planning capabilities).
"""
def __init__(self, fn, out,
output_dtype=lambda dtype: dtype,
**kwds):
super(NumpyFFTPlan, self).__init__()
assert callable(output_dtype)
self.fn = fn
self.out = out
self.kwds = kwds.copy()
self.output_dtype = output_dtype
def execute(self):
out = self.out
res = self.fn(**self.kwds)
if (out is None):
# explicit cast
cast = self.output_dtype(self.kwds['a'].dtype)
return res.astype(cast)
else:
# implicit cast (during copy)
out[...] = res
return out
def __call__(self, out=None, **kwds):
fn_kwds = self.kwds.copy()
fn_kwds.update(kwds)
if (self.fn is _FFT.irfft) and (out is not None):
fn_kwds.setdefault('n', out.shape[fn_kwds['axis']])
out = first_not_None(out, self.out)
res = self.fn(**fn_kwds)
if (out is None):
# explicit cast
cast = self.output_dtype(fn_kwds['a'].dtype)
return res.astype(cast)
else:
# implicit cast (during copy)
out[...] = res
return out
class NumpyFFT(FFTI):
"""
Interface to compute local to process FFT-like transforms using the numpy fft backend.
Numpy fft backend has many disadvantages:
- only double precision is really supported
- single precision is supported by casting to double precision
- creates intermediate temporary buffers at each call
- no planning capabilities (numpy.fft methods are just wrapped into fake plans)
The only advantage is that planning won't destroy original inputs.
"""
def __init__(self):
super(NumpyFFT, self).__init__()
self.supported_ftypes = (np.float32, np.float64,)
self.supported_ctypes = (np.complex64, np.complex128,)
def fft(self, a, out=None, axis=-1, **kwds):
super(NumpyFFT, self).fft(a=a, out=out, axis=axis, **kwds)
plan = NumpyFFTPlan(fn=_FFT.fft, a=a, out=out, axis=axis, **kwds)
return plan
def ifft(self, a, out=None, axis=-1, **kwds):
super(NumpyFFT, self).ifft(a=a, out=out, axis=axis, **kwds)
plan = NumpyFFTPlan(fn=_FFT.ifft, a=a, out=out, axis=axis, **kwds)
return plan
def rfft(self, a, out=None, axis=-1, **kwds):
super(NumpyFFT, self).rfft(a=a, out=out, axis=axis, **kwds)
plan = NumpyFFTPlan(fn=_FFT.rfft, a=a, out=out, axis=axis,
output_dtype=lambda x: float_to_complex_dtype(x),
**kwds)
return plan
def irfft(self, a, out=None, axis=-1, **kwds):
super(NumpyFFT, self).irfft(a=a, out=out, axis=axis, **kwds)
plan = NumpyFFTPlan(fn=_FFT.irfft, a=a, out=out, axis=axis,
output_dtype=lambda x: complex_to_float_dtype(x),
**kwds)
return plan
"""
Test of fields defined with an analytic formula.
"""
import random
import pyfftw
import numpy as np
import itertools as it
from hysop.deps import it, sm, random
from hysop.constants import HYSOP_REAL
from hysop.testsenv import __ENABLE_LONG_TESTS__, __HAS_OPENCL_BACKEND__
from hysop.testsenv import opencl_failed, iter_clenv
from hysop.tools.contexts import printoptions
from hysop.tools.numerics import float_to_complex_dtype
from hysop.tools.types import check_instance, first_not_None
from hysop.numerics.fft.numpy_fft import NumpyFFT
from hysop.numerics.fft.fftw_fft import FftwFFT
class TestFFT(object):
implementations = {
'numpy': NumpyFFT(),
'fftw': FftwFFT(warn_on_allocation=False)
}
msg_shape = 'Expected output array shape to be {} but got {} for implementation {}.'
msg_dtype = 'Expected output array dtype to be {} but got {} for implementation {}.'
msg_input_modified = 'Input array has been modified for implementation {}.'
msg_output_modified = 'Output array results are not consistent for implementation {}.'
max_eps = 2
def _test_1d(self, dtype):
print
print '::Testing 1D transform, precision {}::'.format(dtype.__name__)
eps = np.finfo(dtype).eps
ctype = float_to_complex_dtype(dtype)
def iter_shapes():
msg = ('EVEN', 'ODD')
maxj=(15,10)
minj=(10,5)
for i in xrange(2):
base = 2+i
print ' '+msg[i]
for j1 in xrange(minj[i],maxj[i]):
shape = (base**j1,)
cshape = list(shape)
cshape[-1] = cshape[-1]//2 + 1
cshape = tuple(cshape)
N = np.prod(shape, dtype=np.int64)
Nc = np.prod(cshape, dtype=np.int64)
yield (shape, cshape, N, Nc)
print ' FORWARD C2C: complex to complex forward transform'
for (shape, cshape, N, Nc) in iter_shapes():
print ' FFT shape={:9s} '.format(str(shape)+':'),
Href = np.random.rand(2*N).astype(dtype).view(dtype=ctype).reshape(shape)
H = np.empty_like(Href)
H, Href = self.simd_align(H, Href)
results = {}
for (name, impl) in self.implementations.iteritems():
if dtype in impl.supported_ftypes:
plan = impl.fft(a=H)
H[...] = Href
out = plan.execute()
assert out.shape == shape, self.msg_shape.format(shape, out.shape, name)
assert out.dtype == ctype, self.msg_dtype.format(ctype, out.dtype, name)
assert np.array_equal(Href, H), self.msg_input_modified.format(name)
refout = out.copy()
out[...] = 0
out = plan.execute()
assert np.array_equal(out, refout), self.msg_output_modified.format(name)
results[name] = refout
self.check_distances(results, eps, N*self.max_eps)
print ' BACKWARD C2C: complex to complex backward transform'
for (shape, cshape, N, Nc) in iter_shapes():
print ' IFFT shape={:9s} '.format(str(shape)+':'),
Href = np.random.rand(2*N).astype(dtype).view(dtype=ctype).reshape(shape)
H = np.empty_like(Href)
H, Href = self.simd_align(H, Href)
results = {}
for (name, impl) in self.implementations.iteritems():
if dtype in impl.supported_ftypes:
plan = impl.ifft(a=H)
H[...] = Href
out = plan.execute()
assert out.shape == shape, self.msg_shape.format(shape, out.shape, name)
assert out.dtype == ctype, self.msg_dtype.format(ctype, out.dtype, name)
assert np.array_equal(Href, H), self.msg_input_modified.format(name)
refout = out.copy()
out[...] = 0
out = plan.execute()
assert np.array_equal(out, refout), self.msg_output_modified.format(name)
results[name] = refout
self.check_distances(results, eps, self.max_eps)
print ' FORWARD R2C: real to hermitian complex transform'
for (shape, cshape, N, Nc) in iter_shapes():
print ' RFFT shape={:9s} '.format(str(shape)+':'),
Href = np.random.rand(N).astype(dtype).reshape(shape)
H = np.empty_like(Href)
H, Href = self.simd_align(H, Href)
results = {}
for (name, impl) in self.implementations.iteritems():
if dtype in impl.supported_ftypes:
plan = impl.rfft(a=H)
H[...] = Href
out = plan.execute()
assert out.shape == cshape, self.msg_shape.format(cshape, out.shape, name)
assert out.dtype == ctype, self.msg_dtype.format(ctype, out.dtype, name)
assert np.array_equal(Href, H), self.msg_input_modified.format(name)
refout = out.copy()
out[...] = 0
out = plan.execute()
assert np.array_equal(out, refout), self.msg_output_modified.format(name)
results[name] = refout
self.check_distances(results, eps, N*self.max_eps)
print ' BACKWARD C2R: real to hermitian complex transform'
for (shape, cshape, N, Nc) in iter_shapes():
print ' RFFT shape={:9s} '.format(str(shape)+':'),
Href = np.random.rand(2*Nc).astype(dtype).view(dtype=ctype).reshape(cshape)
H = np.empty_like(Href)
H, Href = self.simd_align(H, Href)
results = {}
for (name, impl) in self.implementations.iteritems():
if dtype in impl.supported_ftypes:
plan = impl.irfft(a=H)
H[...] = Href
out = plan.execute()
#assert out.shape == shape, self.msg_shape.format(shape, out.shape, name)
assert out.dtype == dtype, self.msg_dtype.format(dtype, out.dtype, name)
assert np.array_equal(Href, H), self.msg_input_modified.format(name)
refout = out.copy()
out[...] = 0
out = plan.execute()
assert np.array_equal(out, refout), self.msg_output_modified.format(name)
results[name] = refout
self.check_distances(results, eps, self.max_eps)
def check_distances(self, results, eps, max_eps):
if len(results.keys())<2:
impl = results.keys()[0]
print 'cannot compare'
return
ss=()
failed = False
for (r0,r1) in it.combinations(results.keys(), 2):
E = results[r1] - results[r0]
Einf = np.max(np.abs(E))
Eeps = int(np.round(Einf/eps))
failed |= (Eeps>max_eps)
s='|{}-{}|={}eps'.format(r0,r1,Eeps)
ss += (s,)
print ', '.join(ss)
if failed:
print
msg='Some implementations did not agree on the result !'
raise RuntimeError(msg)
def simd_align(self, *arrays):
aligned_arrays = ()
for array in arrays:
aligned_arrays += (pyfftw.byte_align(array),)
return aligned_arrays
def test_1d_transforms(self):
self._test_1d(dtype=np.float32)
self._test_1d(dtype=np.float64)
#self._test_1d(dtype=np.longdouble)
def perform_tests(self):
self.test_1d_transforms()
if __name__ == '__main__':
test = TestFFT()
with printoptions(threshold=10000, linewidth=240,
nanstr='nan', infstr='inf',
formatter={'float': lambda x: '{:>6.2f}'.format(x)}):
test.perform_tests()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment