Source code for read_text_skymodel

"""Functions to read in the deprecated WODEN text sky model format and
convert it to the LoBES-style FITS sky model format."""

import numpy as np
import sys
import os
from typing import Union, Tuple

from wodenpy.skymodel.woden_skymodel import Component_Type_Counter, Component_Info, CompTypes, calc_pl_norm_at_200MHz, calc_cpl_norm_at_200MHz
from wodenpy.skymodel.chunk_sky_model import Skymodel_Chunk_Map
from wodenpy.use_libwoden.skymodel_structs import setup_chunked_source, setup_source_catalogue, _Ctype_Source_Into_Python
from wodenpy.skymodel.read_fits_skymodel import add_fits_info_to_source_catalogue

from wodenpy.use_libwoden.beam_settings import BeamTypes

from astropy.io import fits
from astropy.table import Table, Column

import erfa

D2R = np.pi/180.0

[docs] def read_text_radec_count_components(text_path : str): """ Reads a text file sky model. Reads just the ra, dec, and counts how many POINT/GAUSS/SHAPE and POWER/CURVE/LIST, consolidating the information into a Component_Type_Counter object. Parameters ----------- text_path : str The path to the text file containing the sky model information. Returns -------- comp_counter : Component_Type_Counter A Component_Type_Counter object that counts the number of components of each type and their properties. """ if not os.path.isfile(text_path): sys.exit(f"Cannot read sky model from {text_path}. Please check your paths, exiting now.") comp_counter = Component_Type_Counter() with open(text_path) as file: current_source = -1 first_component = True ##These are all things we have to count to be able to malloc correct ##amount of memory down the line for line in file: comp_counter.new_file_line() if line != '\n\n' and '#' not in line and line != '' and line != ' ' and line != '\n': if 'SOURCE' in line and 'END' not in line: current_source += 1 source_name = line.split() comp_counter.new_source() elif 'COMPONENT' in line and 'END' not in line: if first_component: first_component = False else: ##ra should be the first thing in a component, so we need ##to append all the previously found values and reset the ##counters comp_counter.add_component_reset_counters() ##Start a new component comp_counter.initiate_component() if len(line.split()) != 4: sys.exit(f'error reading line {comp_counter.file_line_num} of skymodel {text_path}') _, comp_type, ra, dec = line.split() comp_counter.ra = float(ra)*D2R*15.0 comp_counter.dec = float(dec)*D2R ##everything is a power law in the WODEN text sky model comp_counter.flux_power = 1 if comp_type == 'POINT': comp_counter.point = 1 elif comp_type == 'GAUSSIAN': comp_counter.gaussian = 1 elif comp_type == 'SHAPELET': comp_counter.shapelet = 1 elif 'SCOEFF' in line: comp_counter.num_shape_basis += 1 ##The final component needs to be counted as we used a trigger of the ##next to count the pervious components comp_counter.add_component_reset_counters() ##shrink the array length of all info to the amount actually in the model comp_counter.remove_array_padding() ##total up some component counts comp_counter.total_components() return comp_counter
[docs] def read_full_text_into_fitstable(text_path : str) -> Tuple[Table, Table]: """Read in an old-school WODEN style text sky model and convert it into the LoBES-style FITS sky model format. Can then be read in by `read_fits_skymodel.read_fits_skymodel_chunks`. Parameters ---------- text_path : str Path to the text sky model Returns ------- Tuple(Table, Table) The `main_table` and `shape_table` to be used by during lazy loading """ main_table = False shape_table = False all_freqs = [] all_names = [] with open(text_path) as file: components = [] sources = [] component = False source_name = False current_source = 0 comp_count = 0 source_indexes = [] freq_count = 0 freq_indent = 0 for line in file: if line != '\n\n' and '#' not in line and line != '' and line != ' ' and line != '\n': if 'SOURCE' in line and 'END' not in line: current_source += 1 source_name = line.split() all_names.append(source_name) current_source += 1 comp_count = -1 elif 'COMPONENT' in line and 'END' not in line: ##COMPONENT should be the first thing in a component, so we need ##to append all the previously found values and reset the ##counters ##If a previous component exists, append in to the list ##of all components, and then make a new one if component: ##Make some things into np.arrays so we can maths them component.n1s = np.array(component.n1s) component.n2s = np.array(component.n2s) component.coeffs = np.array(component.coeffs) component.fluxes = np.array(component.fluxes) component.freqs = np.array(component.freqs) component.comp_name = f"{component.source_name}_C{component.comp_count:02d}" components.append(component) component = Component_Info() freq_count = 0 component.source_name = source_name comp_count += 1 component.comp_count = comp_count component.ra = float(line.split()[-1]) source_indexes.append(current_source) _, comp_type, ra, dec = line.split() component.ra = float(ra)*15.0 component.dec = float(dec) if comp_type == 'POINT': component.comp_type = 'P' elif comp_type == 'GAUSSIAN': component.comp_type = 'G' elif comp_type == 'SHAPELET': component.comp_type = 'S' ##everything is a power law in the WODEN text sky model component.flux_type = 'pl' elif 'SPARAMS' in line or 'GPARAMS' in line: _, pa, major, minor = line.split() component.major = float(major) / 60.0 component.minor = float(minor) / 60.0 component.pa = float(pa) elif 'SCOEFF' in line: _, n1, n2, coeff = line.split() component.n1s.append(float(n1)) component.n2s.append(float(n2)) component.coeffs.append(float(coeff)) ##Assume a powerlaw with SI = -0.8 for lines with FREQ ##Otherwise read in the SI elif 'FREQ' in line or 'LINEAR' in line: freq_count += 1 if 'FREQ' in line: _, freq, stokesI, stokesQ, stokesU, stokesV = line.split() si = -0.8 else: _, freq, stokesI, stokesQ, stokesU, stokesV, si = line.split() si = float(si) component.freqs.append(float(freq)) ##Stick in a Stokes I,Q,U,V component.fluxes.append(np.array([float(stokesI), float(stokesQ), float(stokesU), float(stokesV)])) component.si = si ##last one doesn't get added to list, so do that component.n1s = np.array(component.n1s) component.n2s = np.array(component.n2s) component.coeffs = np.array(component.coeffs) component.fluxes = np.array(component.fluxes) component.freqs = np.array(component.freqs) component.comp_name = f"{component.source_name}_C{component.comp_count:02d}" components.append(component) all_freqs = np.sort(np.unique(all_freqs)) components = np.array(components) num_components = len(components) flux_types = np.array([component.flux_type for component in components]) power_laws = np.where(flux_types == 'pl')[0] for comp_ind, component in enumerate(components[power_laws]): component = calc_pl_norm_at_200MHz(component) ##for all components, what SOURCE do they belong to? comp_source_names = np.array([component.source_name for component in components]) comp_names = np.array([component.comp_name for component in components]) unq_source_ID = Column(data=comp_source_names, name='UNQ_SOURCE_ID') name = Column(data=comp_names, name='NAME') ras = Column(data=np.array([component.ra for component in components]), name='RA', unit='deg') decs = Column(data=np.array([component.dec for component in components]), name='DEC', unit='deg') majors = Column(data=np.array([component.major for component in components]), name='MAJOR_DC', unit='deg') minors = Column(data=np.array([component.minor for component in components]), name='MINOR_DC', unit='deg') pas = Column(data=np.array([component.pa for component in components]), name='PA_DC', unit='deg') # main_table.add_columns([unq_source_ID, name]) mod_type = Column(data=np.array([component.flux_type for component in components]), name='MOD_TYPE', unit='deg') norm_comp_pl = Column(data=np.array([component.norm_comp_pl for component in components]), name="NORM_COMP_PL") alpha_pl = Column(data=np.array([component.si for component in components]), name="ALPHA_PL") norm_comp_cpl = Column(data=np.array([component.norm_comp_cpl for component in components]), name="NORM_COMP_CPL") alpha_cpl = Column(data=np.array([component.si for component in components]), name="ALPHA_CPL") curve_cpl = Column(data=np.array([component.curve_q for component in components]), name="CURVE_CPL") comp_type = Column(data=np.array([component.comp_type for component in components]), name="COMP_TYPE") out_columns = [unq_source_ID, name, ras, decs, majors, minors, pas, mod_type, comp_type, norm_comp_pl, alpha_pl, norm_comp_cpl, alpha_cpl, curve_cpl] for freq in all_freqs: flux_data = np.full(num_components, np.nan) for comp_ind, component in enumerate(components): # print(component.fluxes.shape) fluxes = component.fluxes[np.where(component.freqs == freq)[0]] if len(fluxes) == 1: stokesI = fluxes[0][0] flux_data[comp_ind] = stokesI flux_col = Column(data=flux_data, name=f"INT_FLX{freq/1e+6:.3f}", unit='Jy') out_columns.append(flux_col) main_table = Table() main_table.add_columns(out_columns) # main_table.write(, overwrite=True) ##gather the shapelet specific information shape_names = [] shape_n1s = [] shape_n2s = [] shape_coeffs = [] for component in components: if component.comp_type == 'S': for n1, n2, coeff in zip(component.n1s, component.n2s, component.coeffs): shape_names.append(component.comp_name) shape_n1s.append(n1) shape_n2s.append(n2) shape_coeffs.append(coeff) s_names = Column(data=np.array(shape_names), name="NAME") s_n1s = Column(data=np.array(shape_n1s, dtype=int), name="N1") s_n2s = Column(data=np.array(shape_n2s, dtype=int), name="N2") s_coeffs = Column(data=np.array(shape_coeffs), name="COEFF") shape_table = Table() shape_table.add_columns([s_names, s_n1s, s_n2s, s_coeffs]) # hdu_list = fits.HDUList([ # fits.PrimaryHDU(), # fits.table_to_hdu(main_table), # fits.table_to_hdu(shape_table), # ]) ##TODO - option to write out FITS version of input model? # hdu_list.writeto('converted_input.fits', overwrite=True) return main_table, shape_table