#-*- coding: utf-8 -*- """ Contains the functions building observations """ import os, os.path import numpy as np import h5py import math import logging import glob from pygeodyn.inout import reads from pygeodyn import common, utilities class Observation: """ Class storing the observation data, operator and errors in members: * Observation.data * Observation.operator * Observation.errors """ def __init__(self, obs_data, H, R, pos_go_vo=None): """ The observation is expected to be used as Y = HX with Y the observation data, X the observed data under spectral form and H the observation operator. :param obs_data: 2D numpy array (nb_real x nb_obs) storing the observation data Y :type obs_data: numpy.ndarray :param H: 2D numpy array (nb_obs x nb_coeffs) storing the observation operator :type H: numpy.ndarray :param R: 2D numpy array (nb_obs x nb_obs) storing the observation errors :type R: numpy.ndarray """ self.data = obs_data self.nb_obs = self.data.shape[1] # Check that H has first dim nb_obs if H.shape[0] != self.nb_obs: raise ValueError('Observation operator\'s first dimension must be equal to number of observations ({}).Got {} dims instead.'.format(self.nb_obs, H.shape)) self.operator = H # Check that R has shape (nb_obs x nb_obs) if R.shape != (self.nb_obs, self.nb_obs): raise ValueError('Observation errors\' dimensions must be equal to number of observations ({}).Got {} dims instead.'.format(self.nb_obs, R.shape)) self.errors = R # for GVOs, it is useful for a few applications to save positions self.positions = pos_go_vo def build_go_vo_observations(cfg, nb_realisations, measure_type): """ Builds the observations (including direct observation operators H) from direct sources (Ground observatories (GO), virtual observatories of CHAMP (VO_CHAMP) and SWARM (VO_SWARM). The dates where no analysis takes place are discarded. :param cfg: configuration of the computation :type cfg: inout.config.ComputationConfig :param nb_realisations: Number of realisations of the computation :type nb_realisations: int :param measure_type: Type of the measure to extract ('MF' or 'SV') :type measure_type: str :return: dictionary of Observation objects with dates (np.datetime64) as keys. :rtype: dict[np.datetime64, Observation] """ datadir = cfg.obs_dir # if adding a new satellite, the first two letters of the cdf file should # match the first two letters after '_' in the variable obs_types_to_use. # If not possible, update code lines below that find cdf filename. possible_ids = np.array(['GROUND', 'CHAMP', 'SWARM', 'OERSTED', 'CRYOSAT', 'COMPOSITE']) cdf_name_shortcuts = np.array(['GO', 'CH', 'SW', 'OR', 'CR', 'CO']) if cfg.obs_select == all: obs_types_to_use = {obs: ('MF', 'SV') for obs in possible_ids} obs_types_to_use.pop('COMPOSITE') # remove composite data else: obs_types_to_use = cfg.obs_select.copy() complete_obs = ({}, {}, {}) # dicts will resp. contain positions, data and variance # Extract data for all valid observatories for obs in obs_types_to_use.keys(): # these conditions filters which observations are taken into account according to config.obs_select # e.g. if obs_read['GROUND'] = (SV,), then MF of GO are not taken into account. if not np.any(obs == possible_ids): continue elif measure_type not in obs_types_to_use[obs]: continue # variance data is already extracted because everything is located # in a single file in cdf files file = None for files in os.walk(datadir): for f in files[2]: # find the filename of the cdf file that correspond to GO, CHAMP, SWARM, OERSTED, CRYO or COMPOSITE # files that are not starting with GO, CH, SW, OR, CO or CR will be ignored. if np.any(f[:2] == cdf_name_shortcuts[obs == possible_ids]): obs = cdf_name_shortcuts[obs == possible_ids] file = os.path.join(files[0], f) if file is None: logging.warning('cdf file corresponding to {} has not been found, continuing without'.format(obs)) continue # all_data consist of (obs_pos, obs_data, var_data) all_data = reads.read_observatories_cdf_data(file, measure_type, obs, discard=cfg.discard_high_lat) complete_obs = noise_GVO_data(obs, all_data, cfg, nb_realisations, measure_type, complete_obs) # End of loop on obs_type return compute_H_R_GVOs(complete_obs, cfg, measure_type) def noise_GVO_data(obs, data, cfg, nb_realisations, measure_type, complete_obs): """ Noise data contained in obs_data with a gaussian distribution, variance taken from var_data, in the basis of obs_pos. """ complete_obs_positions, complete_obs_data, complete_var_date = complete_obs obs_pos, obs_data, var_data = data # check that the length is compatible with a format like [Br1, Bth1, Bph1, Br2, Bth2, Bph2, ..., BrN, BthN, BphN] assert len(var_data[list(var_data.keys())[0]]) % 3 == 0 numberofVOs = len(var_data[list(var_data.keys())[0]]) // 3 # Draw a seed for normal draw seed = np.random.randint(0, 50000) logging.debug("Noising {} observations of {} with seed {}".format(obs, measure_type, seed)) random_draw = np.random.RandomState(seed).normal for date in obs_data: # Skip any date that will not be used for analyses if date not in cfg.t_analyses: continue nb_observations_at_date = len(obs_data[date]) assert nb_observations_at_date % 3 == 0 noised_data_at_date = np.zeros(shape=(nb_realisations, nb_observations_at_date)) var_at_date = [] for i_obs in range(0, nb_observations_at_date // 3, 1): # For each component (j=0: r, j=1: th, j=2: ph) for j in range(0, 3): n = 3 * i_obs + j # var_data is now ordered as the magnetic field # Format: [Br1, Bth1, Bph1, Br2, Bth2, Bph2, ..., BrN, BthN, BphN] # The value stored is assumed to be sigma**2 sigma2 = var_data[date][n] var_at_date.append(sigma2) # Noise data with normal noise N(data, sigma) noised_data_at_date[:, n] = random_draw(obs_data[date][n], math.sqrt(sigma2), size=nb_realisations) # If the date does not exist yet, create the entry in the dict if date not in complete_obs_data: complete_obs_positions[date] = obs_pos[date] complete_obs_data[date] = noised_data_at_date complete_var_date[date] = var_at_date # Else concatenate the data with the existing one else: complete_obs_positions[date].extend(obs_pos[date]) complete_obs_data[date] = np.concatenate((complete_obs_data[date], noised_data_at_date), axis=1) complete_var_date[date].extend(var_at_date) return (complete_obs_positions, complete_obs_data, complete_var_date) def compute_H_R_GVOs(complete_obs, cfg, measure_type): """ Final part of build_govo_observations(). Compute the observations and error matrices at all relevant dates. :param complete_var_date: variance of the measurements :type complete_var_date: dict :param complete_obs_data: observations data :type complete_obs_data: str :param complete_obs_positions: positions (r, theta, phi) :type complete_obs_positions: dict :param measure_type: Type of the measure to extract ('MF' or 'SV') :type measure_type: str """ complete_obs_positions, complete_obs_data, complete_var_date = complete_obs observations = {} logging.info('Computing observation operator H and error matrix R for {}'.format(measure_type)) # Build Observation objects max_degree = cfg.Lsv if measure_type == 'SV' else cfg.Lb for date, obs_data_at_date in complete_obs_data.items(): H = common.compute_direct_obs_operator(complete_obs_positions[date], max_degree) # Convert the list of sigmas as diagonal matrix R (obs errors) R = np.diagflat(complete_var_date[date]) observations[date] = Observation(obs_data_at_date, H, R, pos_go_vo=complete_obs_positions[date]) return observations def build_covobs_observations(cfg, nb_realisations, measure_type): """ Builds the observations from COVOBS files (without a var file). The dates where no analysis takes place are discarded. :param cfg: configuration of the computation :type cfg: inout.config.ComputationConfig :param nb_realisations: Number of realisations of the computation :type nb_realisations: int :param measure_type: Type of the measure to extract ('MF' or 'SV') :type measure_type: str :return: dictionary of Observation objects with dates (np.datetime64) as keys. :rtype: dict[np.datetime64, Observation] """ datadir = cfg.obs_dir if measure_type == 'SV': nb_coefs = cfg.Nsv else: nb_coefs = cfg.Nb # Find files (sort to have reproducible order of realisations_files) realisations_files = sorted(glob.glob(os.path.join(datadir,'real*_{}_int_coefs.dat'.format(measure_type.lower())))) nb_real_files = len(realisations_files) if nb_real_files == 0: raise IOError("No COVOBS files were found in {} ! Check that it is the valid directory path.".format(datadir)) if nb_real_files < nb_realisations: raise ValueError("There are not enough COVOBS files to handle the {} realisations asked ! Please retry with a lower number (max: {}).".format(nb_realisations, nb_real_files)) # Create a list to store the unformatted data read from the files dates = None unformatted_data = [] # Get all realisations (unformatted) for file_name in realisations_files: unformatted_data.append(np.loadtxt(file_name)) if dates is None: # Get dates array (first element of the lines in file is the date) dates = unformatted_data[-1][:, 0] # Convert the list in a numpy array and remove the date (first member of last axis) unformatted_data = np.array(unformatted_data)[:, :, 1:] # Compute the var_data from ALL realisations var_data = np.var(unformatted_data, axis=0) # Format the data as a dict of Observations with dates as keys observations = {} for i_d, decimal_date in enumerate(dates): # Shift the date by one month (different convention) date = utilities.decimal_to_date(decimal_date, month_shift=1) if date not in cfg.t_analyses: continue # Get nb_realisations and nb_coefs obs_data = unformatted_data[:nb_realisations, i_d, :nb_coefs] # Get the number of observed coefficients (here equal to nb_coefs) current_No = obs_data.shape[-1] # H is identity H = np.eye(current_No, nb_coefs) # R is a diagonal matrix with diagonal from 1D var_data (skip first item that is the date) R = np.diagflat(var_data[i_d, :current_No]) observations[date] = Observation(obs_data, H, R) return observations def build_covobs_hdf5_observations(cfg, nb_realisations, measure_type): """ Builds the observations from a hdf5 COVOBS file. The dates where no analysis takes place are discarded. :param cfg: configuration of the computation :type cfg: inout.config.ComputationConfig :param nb_realisations: Number of realisations of the computation :type nb_realisations: int :param measure_type: Type of the measure to extract ('MF' or 'SV') :type measure_type: str :return: dictionary of Observation objects with dates (np.datetime64) as keys. :rtype: dict[np.datetime64, Observation] """ datadir = cfg.obs_dir if measure_type == 'SV': max_degree = cfg.Lsv nb_coefs = cfg.Nsv dataset_name = 'dgnm' else: max_degree = cfg.Lb nb_coefs = cfg.Nb dataset_name = 'gnm' # Find the hdf5 files (sort to have reproducible order of realisations_files) hdf5_files = sorted(glob.glob(os.path.join(datadir, '*.hdf5'))) if len(hdf5_files) == 0: raise IOError("No hdf5 files were found in {} ! Check that it is the valid directory path.".format(datadir)) logging.debug('Observations are read from {}'.format(hdf5_files[0])) with h5py.File(hdf5_files[0], 'r') as f: dates = f['times'][:] real_data = f[dataset_name][:] nb_reals_data, nb_dates_data, nb_coefs_data = real_data.shape # N = L(L+2) => L = sqrt(N+1) - 1 max_degree_data = np.sqrt(nb_coefs_data + 1) - 1 variance_data = np.var(real_data, axis=0) if nb_reals_data < nb_realisations: raise ValueError("There are not enough realisations in the COVOBS file to handle the {} realisations asked !" " Please retry with a lower number (max: {}).".format(nb_realisations, nb_reals_data)) if max_degree_data < max_degree: raise ValueError("There are not enough coefficients in the COVOBS file to handle the max degree {} asked for {} !" " Please retry with a lower degree (max: {}).".format(max_degree, measure_type, max_degree_data)) # Format the data as a dict of Observations with dates as keys observations = {} for i_d, decimal_date in enumerate(dates): # Shift the date by one month (different convention) date = utilities.decimal_to_date(decimal_date, month_shift=1) if date not in cfg.t_analyses: continue # Get asked nb_realisations and nb_coefs obs_data = real_data[:nb_realisations, i_d, :nb_coefs] # Get the number of observed coefficients (here equal to nb_coefs) current_No = obs_data.shape[-1] # H is identity (spectral data) H = np.eye(current_No, nb_coefs) # R is a diagonal matrix with variances as diagonal R = np.diagflat(variance_data[i_d, :current_No]) observations[date] = Observation(obs_data, H, R) return observations def build_chaos_hdf5_observations(cfg, nb_realisations, measure_type): """ Builds the observations from a hdf5 file storing CHAOS' spectral coefficients. The dates where no analysis takes place are discarded. :param cfg: configuration of the computation :type cfg: inout.config.ComputationConfig :param nb_realisations: Number of realisations of the computation :type nb_realisations: int :param measure_type: Type of the measure to extract ('MF' or 'SV') :type measure_type: str :return: dictionary of Observation objects with dates (np.datetime64) as keys. :rtype: dict[np.datetime64, Observation] """ datadir = cfg.obs_dir if measure_type == 'SV': max_degree = cfg.Lsv nb_coefs = cfg.Nsv dataset_name = 'dgnm' else: max_degree = cfg.Lb nb_coefs = cfg.Nb dataset_name = 'gnm' # Find the hdf5 files (sort to have reproducible order of realisations_files) hdf5_files = sorted(glob.glob(os.path.join(datadir, '*.hdf5'))) if len(hdf5_files) == 0: raise IOError("No hdf5 files were found in {} ! Check that it is the valid directory path.".format(datadir)) logging.debug('Observations are read from {}'.format(hdf5_files[0])) with h5py.File(hdf5_files[0], 'r') as f: dates = f['times'][:] chaos_data = f[dataset_name][:] variance_data = f['var_{}'.format(dataset_name)][:] nb_dates_data, nb_coefs_data = chaos_data.shape # N = L(L+2) => L = sqrt(N+1) - 1 max_degree_data = np.sqrt(nb_coefs_data + 1) - 1 if max_degree_data < max_degree: raise ValueError("There are not enough coefficients in the CHAOS file to handle the max degree {} asked for {} !" " Please retry with a lower degree (max: {}).".format(max_degree, measure_type, max_degree_data)) # Format the data as a dict of Observations with dates as keys observations = {} for i_d, decimal_date in enumerate(dates): # Shift the date by one month (different convention) date = utilities.decimal_to_date(decimal_date, month_shift=1) if date not in cfg.t_analyses: continue # Get asked nb_realisations and nb_coefs obs_data = np.zeros((nb_realisations, nb_coefs)) obs_data[:] = chaos_data[i_d, :nb_coefs] # Get the number of observed coefficients (here equal to nb_coefs) current_No = obs_data.shape[-1] # H is identity (spectral data) H = np.eye(current_No, nb_coefs) # R is a diagonal matrix with variances as diagonal R = np.diagflat(variance_data[i_d, :current_No]) observations[date] = Observation(obs_data, H, R) return observations