Source code for read_yaml_skymodel

"""Functions to read in the mwa_hyperdrive yaml sky model format and
convert it to the LoBES-style FITS sky model format."""

import numpy as np
import sys
import os
from typing import Union, Tuple

from wodenpy.skymodel.woden_skymodel import Component_Type_Counter, Component_Info, CompTypes, calc_pl_norm_at_200MHz, calc_cpl_norm_at_200MHz
from wodenpy.skymodel.chunk_sky_model import Skymodel_Chunk_Map
from wodenpy.use_libwoden.skymodel_structs import setup_chunked_source, setup_source_catalogue, _Ctype_Source_Into_Python
from wodenpy.skymodel.read_fits_skymodel import add_fits_info_to_source_catalogue

from wodenpy.use_libwoden.beam_settings import BeamTypes

from astropy.io import fits
from astropy.table import Table, Column

import erfa

D2R = np.pi/180.0

[docs] def read_yaml_radec_count_components(yaml_path : str): """ Reads a yaml file sky model. Reads just the ra, dec, and counts how many POINT/GAUSS/SHAPE and POWER/CURVE/LIST, consolidating the information into a Component_Type_Counter object. Parameters ----------- yaml_path : str The path to the yaml file containing the sky model information. Returns -------- comp_counter : Component_Type_Counter A Component_Type_Counter object that counts the number of components of each type and their properties. """ if not os.path.isfile(yaml_path): sys.exit(f"Cannot read sky model from {yaml_path}. Please check your paths, exiting now.") comp_counter = Component_Type_Counter() with open(yaml_path) as file: current_source = -1 first_component = True ##These are all things we have to count to be able to malloc correct ##amount of memory down the line for line in file: comp_counter.new_file_line() if line != '---\n' and '#' not in line and line != '' and line != ' ' and line != '\n': if line[0] != ' ' and line[0] != '-': current_source += 1 source_name = line[:-1] comp_counter.new_source() elif 'ra:' in line: ##If this is the first component, no need to reset ##and count things up if first_component: first_component = False else: ##ra should be the first thing in a component, so we need ##to append all the previously found values and reset the ##counters comp_counter.add_component_reset_counters() ##Start a new component comp_counter.initiate_component() ra = float(line.split()[-1])*D2R comp_counter.ra = ra elif 'dec:' in line: dec = float(line.split()[-1])*D2R comp_counter.dec = dec ##component type related things elif 'point' in line: comp_counter.point = 1 elif 'gaussian:' in line: comp_counter.gaussian = 1 elif 'shapelet:' in line: comp_counter.shapelet = 1 elif 'n1:' in line: comp_counter.num_shape_basis += 1 ##flux behaviou related things elif 'power_law:' in line and 'curved' not in line: comp_counter.flux_power = 1 elif 'curved_power_law:' in line: comp_counter.flux_curve = 1 elif 'list:' in line: comp_counter.flux_list = 1 elif 'freq:' in line: comp_counter.count_list_flux() ##The final component needs to be counted as we used a trigger of the ##next to count the pervious components comp_counter.add_component_reset_counters() ##shrink the array length of all info to the amount actually in the model comp_counter.remove_array_padding() ##total up some component counts comp_counter.total_components() return comp_counter
[docs] def read_full_yaml_into_fitstable(yaml_path : str) -> Tuple[Table, Table]: """Read in a hyperdrive style yaml sky model and convert it into the LoBES-style FITS sky model format. Can then be read in by `read_fits_skymodel.read_fits_skymodel_chunks`. Parameters ---------- yaml_path : str Path to the yaml sky model Returns ------- Tuple(Table, Table) The `main_table` and `shape_table` to be used by during lazy loading """ main_table = False shape_table = False all_freqs = [] all_names = [] with open(yaml_path) as file: components = [] sources = [] component = False source_name = False current_source = 0 comp_count = 0 source_indexes = [] freq_count = 0 freq_indent = 0 for line in file: if line != '---\n' and '#' not in line and line != '' and line != ' ' and line != '\n': if line[0] != ' ' and line[0] != '-': # print(current_source) source_name = line.split('\n')[0].strip(':') all_names.append(source_name) current_source += 1 comp_count = -1 elif 'ra:' in line: ##ra should be the first thing in a component, so we need ##to append all the previously found values and reset the ##counters ##If a previous component exists, append in to the list ##of all components, and then make a new one if component: ##Make some things into np.arrays so we can maths them component.n1s = np.array(component.n1s) component.n2s = np.array(component.n2s) component.coeffs = np.array(component.coeffs) component.fluxes = np.array(component.fluxes) component.freqs = np.array(component.freqs) component.comp_name = f"{component.source_name}_C{component.comp_count:02d}" components.append(component) component = Component_Info() freq_count = 0 component.source_name = source_name comp_count += 1 component.comp_count = comp_count component.ra = float(line.split()[-1]) source_indexes.append(current_source) elif 'dec:' in line: component.dec = float(line.split()[-1]) elif 'comp_type: point' in line: component.comp_type = 'P' elif 'gaussian:' in line: component.comp_type = 'G' elif 'shapelet:' in line: component.comp_type = 'S' elif "maj:" in line: component.major = float(line.split()[-1])*(1 / 3600.0) elif "min:" in line: component.minor = float(line.split()[-1])*(1 / 3600.0) elif "pa:" in line: component.pa = float(line.split()[-1]) elif 'n1:' in line: component.n1s.append(float(line.split()[-1])) elif 'n2:' in line: component.n2s.append(float(line.split()[-1])) elif 'value:' in line: component.coeffs.append(float(line.split()[-1])) elif 'power_law:' in line and 'curved' not in line: component.flux_type = 'pl' elif 'curved_power_law:' in line: component.flux_type = 'cpl' elif 'si:' in line: component.si = float(line.split()[-1]) elif 'list:' in line: component.flux_type = 'nan' elif 'freq:' in line: freq_count += 1 component.freqs.append(float(line.split()[-1])) ##OK, only want to write out flux column info if this ##is a 'list' type source. So just append to all freqs if ##correct type if component.flux_type == 'nan': all_freqs.append(float(line.split()[-1])) ##Stick in an empty np.array for Stokes I,Q,U,V component.fluxes.append(np.array([np.nan, np.nan, np.nan,np.nan])) ##See what indent this freq entry starts at - used to ##line up following freq entries, as `q` can either mean ##stokes Q or q curvature param freq_indent = line.index('f') elif ' i:' in line: component.fluxes[freq_count - 1][0] = float(line.split()[-1]) ##Gotta be fancy here to work out if this is a Stokes Q or a ##curved power law 'q' param elif ' q:' in line: q = float(line.split()[-1]) if line.index('q') == freq_indent: component.fluxes[freq_count - 1][1] = q else: if component.flux_type == 'cpl': component.curve_q = q elif ' u:' in line: component.fluxes[freq_count - 1][2] = float(line.split()[-1]) elif ' v:' in line: component.fluxes[freq_count - 1][3] = float(line.split()[-1]) ##last one doesn't get added to list, so do that component.n1s = np.array(component.n1s) component.n2s = np.array(component.n2s) component.coeffs = np.array(component.coeffs) component.fluxes = np.array(component.fluxes) component.freqs = np.array(component.freqs) component.comp_name = f"{component.source_name}_C{component.comp_count:02d}" components.append(component) # all_freqs = np.unique(all_freqs).tolist() all_freqs = np.sort(np.unique(all_freqs)) # print("HERE MAN", all_freqs) components = np.array(components) num_components = len(components) flux_types = np.array([component.flux_type for component in components]) power_laws = np.where(flux_types == 'pl')[0] curve_laws = np.where(flux_types == 'cpl')[0] list_laws = np.where(flux_types == 'nan')[0] # print("Before fitting: num power, curved, list", len(power_laws), len(curve_laws), len(list_laws)) for comp_ind, component in enumerate(components[power_laws]): component = calc_pl_norm_at_200MHz(component) for comp_ind, component in enumerate(components[curve_laws]): component = calc_cpl_norm_at_200MHz(component) ##for all components, what SOURCE do they belong to? comp_source_names = np.array([component.source_name for component in components]) comp_names = np.array([component.comp_name for component in components]) unq_source_ID = Column(data=comp_source_names, name='UNQ_SOURCE_ID') name = Column(data=comp_names, name='NAME') ras = Column(data=np.array([component.ra for component in components]), name='RA', unit='deg') decs = Column(data=np.array([component.dec for component in components]), name='DEC', unit='deg') majors = Column(data=np.array([component.major for component in components]), name='MAJOR_DC', unit='deg') minors = Column(data=np.array([component.minor for component in components]), name='MINOR_DC', unit='deg') pas = Column(data=np.array([component.pa for component in components]), name='PA_DC', unit='deg') # main_table.add_columns([unq_source_ID, name]) mod_type = Column(data=np.array([component.flux_type for component in components]), name='MOD_TYPE', unit='deg') norm_comp_pl = Column(data=np.array([component.norm_comp_pl for component in components]), name="NORM_COMP_PL") alpha_pl = Column(data=np.array([component.si for component in components]), name="ALPHA_PL") norm_comp_cpl = Column(data=np.array([component.norm_comp_cpl for component in components]), name="NORM_COMP_CPL") alpha_cpl = Column(data=np.array([component.si for component in components]), name="ALPHA_CPL") curve_cpl = Column(data=np.array([component.curve_q for component in components]), name="CURVE_CPL") comp_type = Column(data=np.array([component.comp_type for component in components]), name="COMP_TYPE") out_columns = [unq_source_ID, name, ras, decs, majors, minors, pas, mod_type, comp_type, norm_comp_pl, alpha_pl, norm_comp_cpl, alpha_cpl, curve_cpl] for freq in all_freqs: flux_data = np.full(num_components, np.nan) for comp_ind, component in enumerate(components): # print(component.fluxes.shape) fluxes = component.fluxes[np.where(component.freqs == freq)[0]] if len(fluxes) == 1: stokesI = fluxes[0][0] flux_data[comp_ind] = stokesI flux_col = Column(data=flux_data, name=f"INT_FLX{freq/1e+6:.3f}", unit='Jy') out_columns.append(flux_col) main_table = Table() main_table.add_columns(out_columns) # main_table.write(, overwrite=True) ##gather the shapelet specific information shape_names = [] shape_n1s = [] shape_n2s = [] shape_coeffs = [] for component in components: if component.comp_type == 'S': for n1, n2, coeff in zip(component.n1s, component.n2s, component.coeffs): shape_names.append(component.comp_name) shape_n1s.append(n1) shape_n2s.append(n2) shape_coeffs.append(coeff) s_names = Column(data=np.array(shape_names), name="NAME") s_n1s = Column(data=np.array(shape_n1s, dtype=int), name="N1") s_n2s = Column(data=np.array(shape_n2s, dtype=int), name="N2") s_coeffs = Column(data=np.array(shape_coeffs), name="COEFF") shape_table = Table() shape_table.add_columns([s_names, s_n1s, s_n2s, s_coeffs]) # hdu_list = fits.HDUList([ # fits.PrimaryHDU(), # fits.table_to_hdu(main_table), # fits.table_to_hdu(shape_table), # ]) ##TODO - option to write out FITS version of input model? # hdu_list.writeto('converted_input.fits', overwrite=True) return main_table, shape_table