from sqlalchemy import Column, Float, Integer, String, Boolean from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.inspection import inspect from sqlalchemy.orm import relationship from sqlalchemy import or_ from .meta import Base from .star_data import Star, StarName, Band # --------------------------------------------------------------------------- # -- II/345 JMDC : JMMC Measured Stellar Diameters Catalogue (Duvert, 2016) # -------------------------------------------------------------------------- # ---Table: II/345/jmdc.dat JMMC Measured stellar Diameters Catalog (as part of Chelli et al., 2016A&A...589A.112C) (1554 records) # ------------------------------------------------------------------------------- # Label Format Unit Explanations # ------------------------------------------------------------------------------- # ID1 A23 --- Normalised star name (preferably HD) (ID1) # ID2 A20 --- Name used in the original publication (ID2) # UDdiam E13.6 mas ?=-1 Uniform Disk Diameter (UD_MEAS) # LDdiam E13.6 mas ?=-1 Limb-Darkened Disk diameter (LD_MEAS) # e_LDdiam E13.6 mas ?=-1 Error on UDdiam and LDdiam (E_LD_MEAS) (1) # Band A10 --- Text describing the wavelength or band # of the measurement (BAND) (2) # mu-lambda E13.6 --- ?=-1 When possible, value of the conversion # UDD-to-LDD user by the author (ORIG_MU_LAMBDA) # Method I1 --- Integer code of the method used (METHOD) (3) # BandCode I3 --- ?=- Integer code of the band used # (BANDCODE) (4) # Notes A224 --- Note about the star and/or measurement (NOTES) # BibCode A19 --- BibCode # Com A59 --- Author's name and comments (REFERENCE) # ------------------------------------------------------------------------------- # Note (1): In general, quotes the published error on LDD or UDD, which are # essentially equivalent. 
# Note (1) (continued):
#     The blanking value of -1 can be due to an absence of
#     published error, or has been fixed thus when the measurement is
#     (retrospectively) in doubt, in which case an explanation lies in the Notes.
# Note (2): A loosely defined string representing the band (UBVRIJHKLMNQ) or the
#     central wavelength, in microns if not otherwise precised, of the observation.
#     A stands for Angstroem, nm for nanometer.
# Note (3): Code for the observational method used as follows:
#     1 = optical interferometry
#     2 = Lunar occultation
#     3 = intensity interferometry
# Note (4): Index from 1 (band U) and up through bands B,V,R,I,J,H,K,L,M,N,Q.
#
# format notes :
#     the formats above are fortran format codes
#     Axx   : Ascii length xx
#     Exx.y : Floating point in exponential form
#             xx : total length
#             y  : digits to right of decimal point
#     Ixx   : Integer length xx

DUPLICATED_ERROR_MSG = 'star entry already present in the catalog (same id1, ud_diam, ld_diam and bibcode)'


class StarDoesNotExist(Exception):
    """Raised when a star name cannot be resolved to a known identifier."""
    pass


class StarInfo(Base):
    """One measured stellar diameter record (one row of the JMDC catalogue).

    An instance is built from a single CSV row (``header`` + ``data``).
    ``parse()`` validates every field through per-field check/parse methods and
    accumulates problems in ``self.errors`` / ``self.warnings`` instead of
    raising, so a whole submission can be reported at once.
    """

    __tablename__ = 'star_info'

    # parsing modes: full catalogue dump vs. end-user submission
    F_DUMP = 0
    F_USER = 1

    # column order used for CSV export (see csv_headers / csv_values)
    MAINFIELDS = ['ID1', 'ID2', 'UD_DIAM', 'LD_DIAM', 'E_LD_DIAM', 'BAND',
                  'MU_LAMBDA', 'METHOD', 'BANDCODE', 'NOTES', 'BIBCODE', 'SINPE']

    # Field description for the catalogue-dump format.  Per-field keys:
    #   field     : name of the object attribute receiving the value
    #   must      : mandatory in the header (possibly via 'alternate')
    #   check     : name of the validation method (sets the field itself)
    #   parse     : name of the parsing method (takes precedence over 'check')
    #   alternate : alternate header name accepted for a mandatory field
    #   doc/note/unit : human-readable documentation of the column
    FIELDS_DUMP = {
        'mode_id': F_DUMP,
        'fields': {
            'ID1': {
                'field': 'id1', 'must': True, 'check': 'check_star',
                "doc": "normalised star name (preferably HD)"},
            'ID2': {
                'field': 'id2', 'must': True, 'check': 'check_star2',
                'doc': "Name used in the original publication"},
            'UD_DIAM': {
                'field': 'ud_diam', 'must': True, 'check': 'check_float',
                'alternate': 'UD_MEAS',
                'doc': "Uniform Disk Diameter", 'unit': "mas"},
            'UD_MEAS': {
                'field': 'ud_diam', 'check': 'check_float'},
            'LD_DIAM': {
                'field': 'ld_diam', 'must': True, 'check': 'check_float',
                'alternate': 'LD_MEAS',
                'doc': 'Limb-Darkened Disk diameter', 'unit': 'mas'},
            'LD_MEAS': {
                'field': 'ld_diam', 'check': 'check_float'},
            'E_LD_DIAM': {
                'field': 'e_ld_diam', 'must': True, 'check': 'check_float',
                'alternate': 'E_LD_MEAS',
                'doc': 'Error on UDdiam and LDdiam', 'unit': 'mas',
                'note': 'In general, quotes the published error on LDD or UDD, which are essentially equivalent. The blanking value of -1 can be due to an absence of published error, or has been fixed thus when the measurement is (retrospectively) in doubt, in which case an explanation lies in the Notes'},
            'E_LD_MEAS': {
                'field': 'e_ld_diam', 'check': 'check_float'},
            'BAND': {
                'field': 'band', 'must': True, 'check': 'check_band',
                'doc': 'Text describing the wavelength or band of the measurement',
                'note': 'A loosely defined string representing the band (UBVRIJHKLMNQ) or the central wavelength, in microns if not otherwise precised, of the observation. A stands for Angstroem, nm for nanometer.',
            },
            'MU_LAMBDA': {
                'field': 'mu_lambda', 'must': False, 'check': 'check_float',
                'doc': 'When possible, value of the conversion UDD-to-LDD user by the author'},
            'METHOD': {
                'field': 'method', 'must': False, 'check': 'check_method',
                'doc': "Integer code of the method used",
                'note': "Code for the observational method used as follows: 1 = optical interferometry, 2 = Lunar occultation, 3 = intensity interferometry"},
            'BANDCODE': {
                'field': 'band_code', 'must': False, 'check': 'check_band_code',
                'doc': 'Integer code of the band used'},
            'NOTES': {
                'field': 'notes', 'must': False,
                'doc': 'Note about the star and/or measurement'},
            'BIBCODE': {
                'field': 'bibcode', 'must': True, 'check': 'check_bibcode',
                'doc': 'BibCode', 'alternate': 'REFERENCE'},
            'REFERENCE': {
                'must': False, 'parse': 'parse_reference'},
            'SINPE': {
                'field': 'sinpe', 'must': False, 'check': 'check_bool',
                'parse': 'parse_sinpe',
                'doc': "Flag that SIMBAD is not precise enough for the measured star ID"}
        },
        'checks': [
            # here we should have names of methods to call that check object coherency
            'check_star_ids',
            'check_diameter_data',
            'set_band_code'
        ]
    }

    # Field description for the user-submission format (same per-field keys).
    FIELDS_USER = {
        'mode_id': F_USER,
        'fields': {
            'ID': {'field': 'starname', 'must': True, 'check': 'check_star',
                   'alternate': 'STARNAME'},
            'STARNAME': {'field': 'starname', 'check': 'check_star'},
            'UD_MEAS': {'field': 'ud_diam', 'must': True, 'check': 'check_float'},
            'LD_MEAS': {'field': 'ld_diam', 'must': True, 'check': 'check_float'},
            'E_LD_MEAS': {'field': 'e_ld_diam', 'must': True, 'check': 'check_float'},
            'BAND': {'field': 'band', 'must': True, 'check': 'check_band'},
            'MU_LAMBDA': {'field': 'mu_lambda', 'must': False},
            'METHOD': {'field': 'method', 'must': False, 'check': 'check_method'},
            'NOTES': {'field': 'notes', 'must': False, 'check': 'check_notes'},
            'REFERENCE': {'must': True, 'parse': 'parse_reference'}
        },
        'checks': [
            'set_star_ids',
            'check_diameter_data',
            'set_band_code'
        ]
    }

    # indexed by mode_id (F_DUMP == 0, F_USER == 1)
    FIELDS = [FIELDS_DUMP, FIELDS_USER]

    # catalogues searched, in order of preference, when guessing ID1
    # 123456789012
    PREFERED_CATALOGS = ['HD', 'HIP', '2MASS', 'BD']

    id = Column(Integer, primary_key=True)
    id1 = Column(String(32))
    id2 = Column(String(32))
    ud_diam = Column(Float)
    ld_diam = Column(Float)
    e_ld_diam = Column(Float)
    band = Column(String(10))
    mu_lambda = Column(Float)
    method = Column(Integer)
    band_code = Column(Integer)
    notes = Column(String(255))
    bibcode = Column(String(19))
    sinpe = Column(Boolean)

    star_info_entry = relationship('StarInfoEntry', back_populates='star_info')
    submission = association_proxy('star_info_entry', 'submission')

    # temp to check uniqueness across lines of the same submission
    submission_id = -1

    def __init__(self, session, header, data, submission_id=-1):
        """Initialize from a row of data, presumably from parsing a CSV.

        :param session: SQLAlchemy session used for all lookups
        :param header: list of column names for this row
        :param data: list of raw string values, same order as header
        :param submission_id: id of the submission this row belongs to
        """
        self.session = session
        self.srcdata = data
        self.problem_fields = None
        self.warnings = None
        self.errors = None
        self.star_id_1 = None
        self.star_id_2 = None
        self.submission_id = submission_id
        self.parse(header, data)

    #
    # error handling
    #
    def append_problem_field(self, field):
        """Remember that *field* had at least one warning or error."""
        if not self.problem_fields:
            self.problem_fields = []
        if field not in self.problem_fields:
            self.problem_fields.append(field)

    def append_warning(self, field, msg):
        """Attach warning *msg* to *field* (lazily creating the dict)."""
        self.append_problem_field(field)
        if not self.warnings:
            self.warnings = {}
        f_warn = self.warnings.get(field, None)
        if not f_warn:
            f_warn = []
            self.warnings[field] = f_warn
        f_warn.append(msg)

    def append_entry_warning(self, msg):
        """Attach a warning to the whole entry (pseudo-field '_')."""
        self.append_warning('_', msg)

    def append_error(self, field, msg):
        """Attach error *msg* to *field* (lazily creating the dict)."""
        self.append_problem_field(field)
        if not self.errors:
            self.errors = {}
        f_err = self.errors.get(field, None)
        if not f_err:
            f_err = []
            self.errors[field] = f_err
        f_err.append(msg)

    def has_problem(self, field):
        """Return truthy if *field* has any recorded warning or error."""
        return self.problem_fields and field in self.problem_fields

    def append_entry_error(self, msg):
        """Attach an error to the whole entry (pseudo-field '_')."""
        self.append_error('_', msg)

    def all_messages(self):
        """Return {'errors': ..., 'warnings': ...} or None if clean."""
        msg = {}
        if self.errors:
            msg['errors'] = self.errors
        if self.warnings:
            msg['warnings'] = self.warnings
        if msg:
            return msg
        return None

    def check_unique(self):
        """Flag an entry-level error when an equivalent record already exists.

        A duplicate has the same id1, measurements and bibcode, and is either
        already validated or part of the same submission.
        """
        print("sub_id : %s " % self.submission_id)
        q = self.session.query(StarInfo).filter_by(
            id1=self.id1, ud_diam=self.ud_diam, ld_diam=self.ld_diam,
            e_ld_diam=self.e_ld_diam, bibcode=self.bibcode).filter(
            or_(
                StarInfo.star_info_entry.any(validated=True),
                StarInfo.star_info_entry.any(submission_id=self.submission_id),
            )
        ).count()
        print("is_unique query returns : %s" % q)
        if q > 0:
            self.append_entry_error(DUPLICATED_ERROR_MSG)

    #
    # field parsing functions
    #
    def query_starname(self, name):
        """Resolve *name* to a Star object, creating it if needed.

        Looks in the local name cache first; otherwise normalises the name
        (which may query the CDS) and inserts/updates the Star row.

        :raises StarDoesNotExist: when the name cannot be normalised
        """
        # lookup the star in the cache
        identifier = StarName.lookup_name(self.session, name)
        if identifier:
            print("query_starname found star in cache, exiting early", identifier)
            obj_id, canon_name = identifier
            return self.session.query(Star).filter_by(id=obj_id).first()
        print("name '%s' not found in cache" % (name))
        # time to query the CDS
        identifier = StarName.cleanup_name(name)
        if not identifier:
            raise StarDoesNotExist()
        obj_id, canon_name = identifier
        print("'%d' - '%s'" % (obj_id, canon_name))
        # search for star with object id; if it does not exist, add it
        star = self.session.query(Star).filter_by(id=obj_id).first()
        if not star:
            star = Star(self.session, obj_id, canon_name)
            self.session.add(star)
        else:
            # there's one star with that id (as it should)
            star.session = self.session
            print("WARNING: updating star names")
            star.update_names()
        return star

    #
    # field handling methods
    #
    def set_field_value(self, field_name, value):
        """Assign *value* to the object attribute named *field_name*."""
        setattr(self, field_name, value)

    def check_star(self, field_name, field_info, value,
                   accept_empty_value=False, accept_invalid_star_id=False):
        """Validate a star-name field and store both name and star id.

        On success, sets ``<field>`` to the raw name and ``<field>_id`` to the
        resolved Star.id.  Unknown names raise an error unless
        *accept_invalid_star_id* downgrades them to a warning.
        """
        if not value:
            if not accept_empty_value:
                self.append_error(field_name, 'star identifier not set')
            return
        try:
            star = self.query_starname(value)
        except StarDoesNotExist:
            if accept_invalid_star_id:
                self.append_warning(field_name, "invalid star identifier '%s'" % (value))
            else:
                self.append_error(field_name, "invalid star identifier '%s'" % (value))
        else:
            print("found star for field '%s' :" % (field_name), star, star.id)
            field_var = field_info.get('field', None)
            if field_var:
                self.set_field_value(field_var + '_id', star.id)
                self.set_field_value(field_var, value)

    def check_star2(self, field_name, field_info, value):
        """Validate ID2: empty allowed; invalid id tolerated when sinpe is set.

        Relies on self.sinpe being parsed first — parse() orders 'parse'
        fields (SINPE) before 'check' fields for this reason.
        """
        self.check_star(field_name, field_info, value, True, self.sinpe)

    def check_float(self, field_name, field_info, value):
        """Convert a string field to float ('' becomes None) and store it."""
        if isinstance(value, str):
            if not value:
                value = None
            else:
                try:
                    value = float(value)
                except ValueError:
                    self.append_error(field_name, "invalid value '%s'" % (value))
                    return
        field_var = field_info.get('field', None)
        if field_var:
            self.set_field_value(field_var, value)

    def check_bool(self, field_name, field_info, value):
        """Convert a string field to bool ('' becomes None) and store it."""
        if isinstance(value, str):
            if not value:
                value = None
            else:
                # membership test cannot fail; the original dead
                # try/except ValueError around it was removed
                value = value.lower() in ("true", "t", "1")
        field_var = field_info.get('field', None)
        if field_var:
            self.set_field_value(field_var, value)

    def check_band(self, field_name, field_info, value):
        """Store the band string, stripped of surrounding whitespace."""
        field_var = field_info.get('field', None)
        if field_var:
            self.set_field_value(field_var, value.strip())

    def check_method(self, field_name, field_info, value):
        """Validate the method code: must be 1, 2 or 3 when present."""
        if value:
            if value.isdecimal():
                value = int(value)
            # NOTE(review): also rejects non-decimal strings here; the
            # mangled original's nesting was ambiguous on this point
            if value not in (1, 2, 3):
                self.append_error(field_name, 'Invalid value, should be 1, 2 or 3')
                return
        field_var = field_info.get('field', None)
        if field_var:
            self.set_field_value(field_var, value)

    def check_band_code(self, field_name, field_info, value):
        """Validate the band code, given either as a number or a band letter.

        Letters are translated to their 1-based index in the band list; an
        empty value stores None.
        """
        band_values = Band.gen_numeric_band_code_list()
        band_codes = Band.gen_letter_band_code_list()
        # we assume a string; check if we have a number
        if value.isdecimal():
            code = int(value)
            if code not in band_values:
                # NOTE(review): out-of-range code is reported but still
                # stored — set_band_code() relies on seeing e.g. -1 here
                self.append_error(field_name,
                                  "numerical value should be between %d and %d" % (
                                      band_values[0], band_values[-1]))
        else:
            if not value:
                # value is empty; should try to guess from band ?
                code = None
            elif (len(value) != 1) or (value not in band_codes):
                self.append_error(field_name,
                                  "character value should be one character long and in the set '%s'" % (band_codes))
                return
            else:
                code = band_codes.index(value) + 1
        if code is None:
            # check if we have band_code set
            pass
        field_var = field_info.get('field', None)
        if field_var:
            self.set_field_value(field_var, code)

    def check_notes(self, field_name, field_info, value):
        """Validate the notes string against the mapped column's max length."""
        field_var = field_info.get('field', None)
        field_type = inspect(self).mapper.columns[field_var].type
        if field_type.python_type is not str:
            # error, this must be str / can't happen
            raise TypeError
        if value:
            if len(value) > field_type.length:
                self.append_error(field_name,
                                  "string too long, maximum length %d" % (field_type.length))
                return
        if field_var:
            self.set_field_value(field_var, value)

    def is_bibcode_valid(self, bibcode):
        """Tri-state structural check of a 19-character ADS bibcode.

        :returns: True when all required parts are present, None when the
            bibcode is incompletely specified, False when it is invalid.
        """
        if len(bibcode) != 19:
            return False
        bibcode_year = bibcode[0:4].strip('.')
        bibcode_journal = bibcode[4:9].strip('.')
        bibcode_volume = bibcode[9:13].strip('.')
        bibcode_section = bibcode[13].strip('.')
        bibcode_page = bibcode[14:18].strip('.')
        bibcode_author = bibcode[18].strip('.')
        print("|%s|%s|%s|%s|%s|%s|" % (
            bibcode_year, bibcode_journal, bibcode_volume,
            bibcode_section, bibcode_page, bibcode_author), end=' ')
        # section is optional; everything else must be non-empty
        if bibcode_year and bibcode_journal and bibcode_volume and bibcode_page and bibcode_author:
            print("bc check")
            return True
        print("bc invalid ?")
        return None

    def check_bibcode(self, field_name, field_info, value):
        """Validate and store a bibcode (warning when incomplete)."""
        bc_valid = self.is_bibcode_valid(value)
        if bc_valid is False:
            self.append_error(field_name, "invalid value")
        else:
            if bc_valid is None:
                self.append_warning(field_name, "incompletely specified")
            field_var = field_info.get('field', None)
            if field_var:
                self.set_field_value('bibcode', value)

    def parse_reference(self, field_name, field_info, value):
        """Extract the bibcode from a reference field.

        The reference is composed of a bibcode (19 non spaces) and,
        optionally, a space and free-text comments; the comments are ignored
        with a warning.
        """
        print("reference: '%s'" % (value))
        first_space = value.find(' ')
        if first_space == -1:
            first_space = len(value)
        if first_space != 19:
            self.append_error(field_name, "must start with a bibcode (19 non spaces)")
            return
        bibcode = value[0:first_space]
        comment = value[first_space:].strip()
        # we have something that may be a bibcode
        bc_valid = self.is_bibcode_valid(bibcode)
        if bc_valid is False:
            self.append_error(field_name, "bibcode portion is invalid")
        else:
            if bc_valid is None:
                self.append_warning(field_name, "bibcode portion is incompletely specified")
            self.set_field_value('bibcode', bibcode)
        if comment:
            self.append_warning('bibcode', 'bibcode extract ignore next part : %s' % comment)

    # force parsing before checking since starname check depends on this field
    def parse_sinpe(self, field_name, field_info, value):
        """Parse the SINPE flag early; check_star2 reads self.sinpe."""
        self.check_bool(field_name, field_info, value)

    #
    # object level checks
    #
    def check_star_ids(self):
        """Verify ID1 and ID2 resolved to the same star object id."""
        if hasattr(self, 'id1_id') and hasattr(self, 'id2_id'):
            if self.id1_id != self.id2_id:
                self.append_entry_error(
                    "id1 and id2 don't match, '%s' (oid %d) - '%s' (oid %d)" % (
                        self.id1, self.id1_id, self.id2, self.id2_id))

    def set_id1_from_starname(self, starname):
        """Pick id1 among the star's known names, preferring HD/HIP/2MASS/BD."""
        names_q = self.session.query(StarName).filter_by(star_id=self.starname_id).all()
        names = [row.name for row in names_q]
        prefered_name = None
        for cat in self.PREFERED_CATALOGS:
            print('looking for cat %s - ' % (cat), end=' ')
            for name in names:
                print(name, end=' ')
                if name.startswith(cat):
                    print('found')
                    prefered_name = name
                    break
            if prefered_name:
                break
        if not prefered_name:
            # search for the main name
            pass
        if not prefered_name:
            # NOTE(review): prefered_name is always None here, so the message
            # prints 'None' — kept as-is to preserve behaviour
            self.append_error('id1', 'not set (star_id %d, \'%s\')' % (
                self.starname_id, prefered_name))
        else:
            self.id1 = prefered_name

    def set_star_ids(self):
        """User-mode: derive id1 from the submitted starname, id2 is the name."""
        if hasattr(self, 'starname'):
            print("checking starname, identify id1 for star_id %d" % (self.starname_id))
            # attempt to guess ID1
            self.set_id1_from_starname(self.starname)
            self.id2 = self.starname

    def check_diameter_data(self):
        """Placeholder for coherency checks on the diameter measurements."""
        pass

    def set_band_code(self):
        """Derive band_code from the band string and reconcile with any given code."""
        bc = Band.parse_band_code(self.band)
        if not bc:
            self.append_error('band', 'unable to parse band')
        elif isinstance(bc, dict):
            warning = bc.get('warning', None)
            error = bc.get('error', None)
            bc = bc.get('band_code', None)
            if warning:
                self.append_warning('band', warning)
            if error:
                # Report error only if associated band_code is not valid
                if self.has_problem('band_code'):
                    self.append_error('band', error)
                else:
                    self.append_warning('band', error)
        if isinstance(bc, float) or isinstance(bc, int):
            # some lines have band_code -1, treated as "not given"
            if self.band_code is not None and self.band_code != -1:
                if self.band_code != bc:
                    self.append_error('band',
                                      'calculated band_code %s differs from given band_code %s' % (
                                          str(bc), str(self.band_code)))
            else:
                # there was no band_code given but bc seems ok.
                # GM: do not throw an error since next affection seems nice.
                self.band_code = bc
        elif bc:
            self.append_error('band', 'calculated band_code invalid \'%s\'' % (repr(bc)))

    #
    # header parsing function
    #
    @staticmethod
    def parse_header(header):
        """Identify which parse mode (dump/user) matches *header*.

        :returns: a (mode_id, fields_present) tuple, or None when no mode or
            more than one mode matches.
        """
        valid_modes = []
        for fields_list in StarInfo.FIELDS:
            mode_id = fields_list['mode_id']
            print("mode %d" % (mode_id), end=" ")
            fields = fields_list['fields']
            fields_present = []
            all_mandatory = True
            for f in fields.keys():
                field = fields[f]
                if field.get('must', None):
                    if f not in header:
                        # mandatory field missing: try its alternate name
                        alternate = field.get('alternate', None)
                        if not alternate:
                            all_mandatory = False
                            continue
                        if alternate not in header:
                            all_mandatory = False
                            continue
                        f = alternate
                    fields_present.append(f)
                else:
                    if f not in header:
                        # not mandatory - skipping
                        continue
                    if f not in fields_present:
                        fields_present.append(f)
            if not fields_present:
                all_mandatory = False
            if all_mandatory:
                valid_modes.append((fields_list.get('mode_id'), fields_present,))
            if all_mandatory:
                print("we have all mandatory fields :", end=' ')
            else:
                print("we are missing some mandatory fields :", end=' ')
            print(fields_present)
        if len(valid_modes) == 0:
            print("FATAL: no parse mode found")
            return None
        if len(valid_modes) > 1:
            print("FATAL: too many parse modes valid")
            return None
        # return only one mode
        return valid_modes[0]

    #
    # record parsing functions
    #
    def _dispatch_field_func(self, field_name, field_info, value, func_name):
        """Call the named parse/check method; the callee sets the field value."""
        func = getattr(self, func_name, None)
        if func:
            if callable(func):
                func(field_name, field_info, value)
            else:
                self.append_error(field_name,
                                  "FATAL: '%s' function is not callable" % (func_name))
        else:
            self.append_error(field_name,
                              "FATAL: '%s' function is not present in '%s'" % (
                                  func_name, self.__class__.__name__))

    def parse(self, header, data):
        """Parse the data according to the header, checking the header for
        mandatory fields.

        Fields with a 'parse' function are processed first (stable order
        otherwise) so that checks depending on them — check_star2 reads
        self.sinpe — see the parsed value.  Then the mode's object-level
        checks run, and finally uniqueness is retested.
        """
        print(header, data)
        parse_mode = StarInfo.parse_header(header)
        print("parse mode", parse_mode)
        # BUGFIX: parse_header() returns None when no (or several) modes
        # match; unpacking it blindly raised TypeError
        if parse_mode is None:
            self.append_entry_error("unable to identify a valid parse mode from the header")
            return
        mode, field_list = parse_mode
        field_engine = self.FIELDS[mode]['fields']
        ordered_fields = sorted(
            field_list,
            key=lambda fn: 0 if field_engine[fn].get('parse') else 1)
        for field_name in ordered_fields:
            field_info = field_engine[field_name]
            value = data[header.index(field_name)].strip()
            # a parse function takes precedence over a check function
            parse_func_name = field_info.get('parse', None)
            if parse_func_name:
                self._dispatch_field_func(field_name, field_info, value, parse_func_name)
                # the parse func is responsible for setting the field value
                continue
            check_func_name = field_info.get('check', None)
            if check_func_name:
                self._dispatch_field_func(field_name, field_info, value, check_func_name)
                # the check func is responsible for setting the field value
                continue
            # nothing specified, just set the field value
            field_var = field_info.get('field', None)
            if field_var:
                self.set_field_value(field_var, value)
        # object level checks
        for check_func_name in self.FIELDS[mode]['checks']:
            check_func = getattr(self, check_func_name, None)
            if check_func and callable(check_func):
                check_func()
        # and finally retest for uniqueness
        self.check_unique()

    @staticmethod
    def csv_headers():
        """Return the quoted CSV header row.

        BUGFIX: was defined without self and without @staticmethod, so
        calling it on an instance raised TypeError.
        """
        return ['"%s"' % field for field in StarInfo.MAINFIELDS]

    def csv_values(self):
        """Return this record's values as CSV-ready strings (MAINFIELDS order)."""
        ret_cols = []
        fields = StarInfo.FIELDS_DUMP['fields']
        for field in StarInfo.MAINFIELDS:
            colname = fields[field]['field']
            col = getattr(self, colname)
            if isinstance(col, str):
                ret_cols.append('"%s"' % col.strip())
            elif col is None:
                ret_cols.append("")
            else:
                ret_cols.append("%s" % col)
        return ret_cols