Source code for woden_settings

import ctypes 
import sys
import os
import subprocess
from typing import Union
from wodenpy.array_layout.precession import RTS_Precess_LST_Lat_to_J2000

import numpy as np
import argparse

from ctypes import c_double, c_float, c_int, POINTER, c_ulong, c_char, create_string_buffer, c_uint, c_long, c_uint64

D2R = np.pi/180.0
SOLAR2SIDEREAL = 1.00274
DS2R  = 7.2722052166430399038487115353692196393452995355905e-5
DD2R = 0.017453292519943295769236907684886127134428718885417

[docs] def command(cmd): """ Runs the command string `cmd` using `subprocess.call` Parameters ---------- cmd : string The command to run on the command line """ subprocess.call(cmd,shell=True)
[docs] class Woden_Settings_Double(ctypes.Structure): """A class structured equivalently to a `visi_set` struct, used by the C and CUDA code in libwoden_double.so :cvar c_double lst_base: Local sidereal time for first time step (radians) :cvar c_double lst_obs_epoch_base: Local sidereal time for first time step (radians) for the observation epoch (e.g. in 2020 for a 2020 obs) :cvar c_double ra0: Right ascension of phase centre (radians) :cvar c_double dec0: Declination of phase centre (radians) :cvar c_double sdec0: Sine of Declination of phase centre (radians) :cvar c_double cdec0: Cosine of Declination of phase centre (radians) :cvar c_int num_baselines: Number of baselines this array layout has :cvar c_int num_ants: Number of antennas this array layout has (MWA calls this number of tiles) :cvar c_int num_freqs: Number of frequencies per coarse band :cvar c_double frequency_resolution: Frequency resolution of a fine channel (Hz) :cvar c_double base_low_freq: The lowest fine channel frequency of band 1 :cvar c_int num_time_steps: Number of time steps to simulate :cvar c_double time_res: Time resolution of simulation (seconds) :cvar POINTER(c_char) cat_filename: Path to WODEN-style sky model :cvar c_int num_bands: Number of coarse frequency bands to simulate :cvar POINTER(c_int) band_nums: Which number coarse bands to simulate (e.g 1,4,6) :cvar c_int sky_crop_type: Whether to crop sky models by SOURCE or COMPONENT :cvar c_int beamtype: What type of primary beam to simulate with :cvar c_double gauss_beam_FWHM: FWHM of Gaussian primary beam (degrees) :cvar c_double gauss_beam_ref_freq: Reference frequency for given Gaussian primary beam FWHM :cvar c_uint64 chunking_size: Maximum number of COMPONENTs to include in a single chunk :cvar POINTER(c_char) hdf5_beam_path: Path to *.hf file containing MWA FEE beam spherical harmonic information :cvar c_double jd_date: Julian date at beginning of simulation :cvar c_int array_layout_file: Do we have a path to the array layout or not :cvar POINTER(c_char) array_layout_file_path: ath to file containing E,N,H coords of array layout :cvar c_double latitude: Latitude of the array to simulate (radians) :cvar c_double latitude_obs_epoch_base: Latitude of the array at the observation epoch (radians) :cvar c_double longitude: Longitude of the array to simulate (radians) :cvar (c_double*16) FEE_ideal_delays: Delay values specifying the pointing for the MWA FEE beam model :cvar c_double coarse_band_width: Frequency bandwidth of a single coarse band (Hz) :cvar c_double gauss_ra_point: The initial Right Ascension to point the Gaussian beam at (radians) :cvar c_double gauss_dec_point: The initial Declination to point the Gaussian beam at (radians) :cvar c_int num_cross: Total number of cross-correlations to simulate, so freqs*times*baselines :cvar c_int num_autos: Total number of auto-correlations to simulate, so freqs*times*baselines :cvar c_int num_visis: Total number of visibilities to simulate, so num_cross + num_autos :cvar c_double base_band_freq: The lowest fine channel frequency in the current band being simulated :cvar c_int do_precession: Boolean of whether to apply precession to the array layout or not :cvar POINTER(c_double) lsts: Array to hold LSTs for all time centroids (these are different when precession is happening) :cvar POINTER(c_double) latitudes: Array to hold latitudes for all time centroids (these are different when precession is happening) :cvar POINTER(c_double) mjds: Array to hold modified julian dates for all time centroids :cvar c_int do_autos: Boolean of whether to simulate autos or not (0 False, 1 True) :cvar c_int do_QUV: Boolean of whether to use Stokes Q,U,V (0 False, 1 True) """ _fields_ = [("lst_base", c_double), ("lst_obs_epoch_base", c_double), ("ra0", c_double), ("dec0", c_double), ("sdec0", c_double), ("cdec0", c_double), ("num_baselines", c_int), ("num_ants", c_int), ("num_freqs", c_int), ("frequency_resolution", c_double), ("base_low_freq", c_double), ("num_time_steps", c_int), ("time_res", c_double), ("cat_filename", POINTER(c_char)), ("num_bands", c_int), ("band_nums", POINTER(c_int)), ("sky_crop_type", c_int), ("beamtype", c_int), ("gauss_beam_FWHM", c_double), ("gauss_beam_ref_freq", c_double), ("chunking_size", c_uint64), ("hdf5_beam_path", POINTER(c_char)), ("jd_date", c_double), ("array_layout_file", c_int), ("array_layout_file_path", POINTER(c_char)), ("latitude", c_double), ("latitude_obs_epoch_base", c_double), ("longitude", c_double), ("FEE_ideal_delays", (c_double*16)), ("coarse_band_width", c_double), ("gauss_ra_point", c_double), ("gauss_dec_point", c_double), ("num_cross", c_int), ("num_autos", c_int), ("num_visis", c_int), ("base_band_freq", c_double), ("do_precession", c_int), ("lsts", POINTER(c_double)), ("latitudes", POINTER(c_double)), ("mjds", POINTER(c_double)), ("do_autos", c_int), ("do_QUV", c_int)]
##TODO gotta be a way to set the float or double fields via some kind of ##variable instead of making two different classes
[docs] class Woden_Settings_Float(ctypes.Structure): """A class structured equivalently to a `visi_set` struct, used by the C and CUDA code in libwoden_float.so :cvar c_double lst_base: Local sidereal time for first time step (radians) :cvar c_double lst_obs_epoch_base: Local sidereal time for first time step (radians) for the observation epoch (e.g. in 2020 for a 2020 obs) :cvar c_double ra0: Right ascension of phase centre (radians) :cvar c_double dec0: Declination of phase centre (radians) :cvar c_double sdec0: Sine of Declination of phase centre (radians) :cvar c_double cdec0: Cosine of Declination of phase centre (radians) :cvar c_int num_baselines: Number of baselines this array layout has :cvar c_int num_ants: Number of antennas this array layout has (MWA calls this number of tiles) :cvar c_int num_freqs: Number of frequencies per coarse band :cvar c_double frequency_resolution: Frequency resolution of a fine channel (Hz) :cvar c_double base_low_freq: The lowest fine channel frequency of band 1 :cvar c_int num_time_steps: Number of time steps to simulate :cvar c_double time_res: Time resolution of simulation (seconds) :cvar POINTER(c_char) cat_filename: Path to WODEN-style sky model :cvar c_int num_bands: Number of coarse frequency bands to simulate :cvar POINTER(c_int) band_nums: Which number coarse bands to simulate (e.g 1,4,6) :cvar c_int sky_crop_type: Whether to crop sky models by SOURCE or COMPONENT :cvar c_int beamtype: What type of primary beam to simulate with :cvar c_float gauss_beam_FWHM: FWHM of Gaussian primary beam (degrees) :cvar c_double gauss_beam_ref_freq: Reference frequency for given Gaussian primary beam FWHM :cvar c_uint64 chunking_size: Maximum number of COMPONENTs to include in a single chunk :cvar POINTER(c_char) hdf5_beam_path: Path to *.hf file containing MWA FEE beam spherical harmonic information :cvar c_double jd_date: Julian date at beginning of simulation :cvar c_int array_layout_file: Do we have a path to the array layout or not :cvar POINTER(c_char) array_layout_file_path: ath to file containing E,N,H coords of array layout :cvar c_double latitude: Latitude of the array to simulate (radians) :cvar c_double latitude_obs_epoch_base: Latitude of the array at the observation epoch (radians) :cvar c_float longitude: Longitude of the array to simulate (radians) :cvar (c_float*16) FEE_ideal_delays: Delay values specifying the pointing for the MWA FEE beam model :cvar c_double coarse_band_width: Frequency bandwidth of a single coarse band (Hz) :cvar c_double gauss_ra_point: The initial Right Ascension to point the Gaussian beam at (radians) :cvar c_double gauss_dec_point: The initial Declination to point the Gaussian beam at (radians) :cvar c_int num_cross: Total number of cross-correlations to simulate, so freqs*times*baselines :cvar c_int num_autos: Total number of auto-correlations to simulate, so freqs*times*baselines :cvar c_int num_visis: Total number of visibilities to simulate, so num_cross + num_autos :cvar c_double base_band_freq: The lowest fine channel frequency in the current band being simulated :cvar c_int do_precession: Boolean of whether to apply precession to the array layout or not :cvar POINTER(c_double) lsts: Array to hold LSTs for all time centroids (these are different when precession is happening) :cvar POINTER(c_double) latitudes: Array to hold latitudes for all time centroids (these are different when precession is happening) :cvar POINTER(c_double) mjds: Array to hold modified julian dates for all time centroids :cvar c_int do_autos: Boolean of whether to simulate autos or not (0 False, 1 True) :cvar c_int do_QUV: Boolean of whether to use Stokes Q,U,V (0 False, 1 True) """ _fields_ = [("lst_base", c_double), ("lst_obs_epoch_base", c_double), ("ra0", c_double), ("dec0", c_double), ("sdec0", c_double), ("cdec0", c_double), ("num_baselines", c_int), ("num_ants", c_int), ("num_freqs", c_int), ("frequency_resolution", c_double), ("base_low_freq", c_double), ("num_time_steps", c_int), ("time_res", c_double), ("cat_filename", POINTER(c_char)), ("num_bands", c_int), ("band_nums", POINTER(c_int)), ("sky_crop_type", c_int), ("beamtype", c_int), ("gauss_beam_FWHM", c_float), ("gauss_beam_ref_freq", c_double), ("chunking_size", c_ulong), ("hdf5_beam_path", POINTER(c_char)), ("jd_date", c_double), ("array_layout_file", c_int), ("array_layout_file_path", POINTER(c_char)), ("latitude", c_double), ("latitude_obs_epoch_base", c_double), ("longitude", c_float), ("FEE_ideal_delays", (c_float*16)), ("coarse_band_width", c_double), ("gauss_ra_point", c_double), ("gauss_dec_point", c_double), ("num_cross", c_int), ("num_autos", c_int), ("num_visis", c_int), ("base_band_freq", c_double), ("do_precession", c_int), ("lsts", POINTER(c_double)), ("latitudes", POINTER(c_double)), ("mjds", POINTER(c_double)), ("do_autos", c_int), ("do_QUV", c_int)]
[docs] def create_woden_settings(args : argparse.Namespace, jd_date : float, lst : float) -> Union[Woden_Settings_Float, Woden_Settings_Double]: """Given the parsed and checked arguments in `args`, populate a `woden_settings` ctypes.Structure that can be passed into libwoden_float.so or libwoden_double.so, depending on the desired `precision`. Parameters ---------- args : argparse.Namespace The populated arguments `args = parser.parse_args()`` as returned from the parser given by :func:`~wodenpy.run_setup.check_args` jd_date : float The julian date of the first time step of the observation lst : float The LST of the first time step of the observation (degrees) Returns ------- woden_settings : Woden_Settings_Float or Woden_Settings_Double Populated ctype struct that can be passed into the C/CUDA code """ if args.precision == 'float': woden_settings = Woden_Settings_Float() else: woden_settings = Woden_Settings_Double() woden_settings.ra0 = args.ra0 * D2R woden_settings.dec0 = args.dec0 * D2R woden_settings.latitude = args.latitude * D2R woden_settings.latitude_obs_epoch_base = args.latitude * D2R woden_settings.lst_base = lst * D2R woden_settings.lst_obs_epoch_base = lst * D2R woden_settings.num_freqs = args.num_freq_channels woden_settings.frequency_resolution = args.freq_res woden_settings.base_low_freq = args.lowest_channel_freq woden_settings.coarse_band_width = float(args.coarse_band_width) woden_settings.num_time_steps = args.num_time_steps woden_settings.time_res = args.time_res woden_settings.cat_filename = create_string_buffer(args.cat_filename.encode('utf-8')) woden_settings.jd_date = jd_date woden_settings.sky_crop_type = int(args.sky_crop_components) ##If MWA_FEE_delays is set, convert into a array and populate the ##woden_settings equivalent if args.MWA_FEE_delays: delays = np.array(args.MWA_FEE_delays.strip('[]').split(',')) for ind, delay in enumerate(delays): woden_settings.FEE_ideal_delays[ind] = float(delay) if args.primary_beam == 'none': woden_settings.beamtype = 0 elif args.primary_beam == 'Gaussian': woden_settings.beamtype = 1 woden_settings.gauss_beam_FWHM = float(args.gauss_beam_FWHM) woden_settings.gauss_beam_ref_freq = float(args.gauss_beam_ref_freq) woden_settings.gauss_ra_point = float(args.gauss_ra_point)*D2R woden_settings.gauss_dec_point = float(args.gauss_dec_point)*D2R elif args.primary_beam == 'MWA_FEE': woden_settings.beamtype = 2 woden_settings.hdf5_beam_path = create_string_buffer(args.hdf5_beam_path.encode('utf-8')) elif args.primary_beam == 'EDA2': woden_settings.beamtype = 3 elif args.primary_beam == 'MWA_FEE_interp': woden_settings.beamtype = 4 woden_settings.hdf5_beam_path = create_string_buffer(args.hdf5_beam_path.encode('utf-8')) elif args.primary_beam == 'MWA_analy': woden_settings.beamtype = 5 if args.no_precession: woden_settings.do_precession = 0 else: woden_settings.do_precession = 1 woden_settings.do_autos = args.do_autos woden_settings.chunking_size = int(args.chunking_size) woden_settings.num_bands = len(args.band_nums) woden_settings.band_nums = (ctypes.c_int*woden_settings.num_bands)() for ind, band in enumerate(args.band_nums): woden_settings.band_nums[ind] = int(band) return woden_settings
[docs] def setup_lsts_and_phase_centre(woden_settings : Union[Woden_Settings_Float, Woden_Settings_Double]) -> np.ndarray: """ Calculate the Local Sidereal Time (LST) for each time step of an observation, and set the phase centre coordinates. If `woden_settings.do_precession == True`, the returned LSTs are precessed back to J2000. Parameters ---------- woden_settings : (Union[Woden_Settings_Float, Woden_Settings_Double]) A populated Woden_Settings object containing the observation parameters. Returns ------- lsts : np.ndarray: An array of LST values, one for each time step of the observation. """ ##Used for calculating l,m,n for components woden_settings.sdec0 = np.sin(woden_settings.dec0) woden_settings.cdec0 = np.cos(woden_settings.dec0) print("Setting phase centre RA,DEC {:.5f}deg {:.5f}deg".format(woden_settings.ra0/DD2R, woden_settings.dec0/DD2R)) ##Calculate all lsts for this observation ##Used in some python calcs later, and by the C code, so store a ctypes ##array as well as a numpy one lsts = np.empty(woden_settings.num_time_steps) num_time_array = woden_settings.num_time_steps*c_double woden_settings.lsts = num_time_array() woden_settings.latitudes = num_time_array() woden_settings.mjds = num_time_array() mjd = woden_settings.jd_date - 2400000.5 for time_step in range(woden_settings.num_time_steps): ##Add on the angle accrued by current time step to the base LST lst_current = woden_settings.lst_obs_epoch_base + time_step*woden_settings.time_res*SOLAR2SIDEREAL*DS2R ##Add half a time_res so we are sampling centre of each time step lst_current += 0.5*woden_settings.time_res*SOLAR2SIDEREAL*DS2R if (woden_settings.do_precession): # ##Move the mjd to the time of the current step ##Want the LST at centre of time step so 0.5 adds half a time step extra mjd_current = mjd + ((time_step + 0.5)*woden_settings.time_res)/(24.0*60.0*60.0) woden_settings.mjds[time_step] = mjd_current lst_J2000, latitude_J2000 = RTS_Precess_LST_Lat_to_J2000( lst_current, woden_settings.latitude_obs_epoch_base, mjd_current) lsts[time_step] = lst_J2000 woden_settings.lsts[time_step] = lst_J2000 woden_settings.latitudes[time_step] = latitude_J2000 if (time_step == 0): print("Obs epoch initial LST was {:.10f} deg".format(lst_current/D2R) ) print("Setting initial J2000 LST to {:.10f} deg".format(lst_J2000/D2R) ) print("Setting initial mjd to {:.10f}".format(woden_settings.mjds[time_step]) ) print("After precession initial latitude of the array is {:.10f} deg".format(latitude_J2000/D2R) ) woden_settings.lst_base = lst_J2000 woden_settings.latitude = latitude_J2000 else: lsts[time_step] = lst_current woden_settings.lsts[time_step] = lst_current woden_settings.latitudes[time_step] = woden_settings.latitude_obs_epoch_base if time_step == 0: print("Obs epoch initial LST was {:.10f} deg\n".format(lst_current/D2R) ) return lsts