Source code for create_array_layout

"""Functions to create the array layout for the simulation, precess it, 
and convert it to the correct format for the C functions."""
import numpy as np
import sys
import os
from typing import Union
import palpy
from ctypes import Structure
import argparse
from typing import Tuple
from erfa import gd2gc
from ctypes import POINTER, c_double, c_int
from wodenpy.wodenpy_setup.woden_logger import simple_logger
from logging import Logger

##Constants
R2D = 180.0 / np.pi
D2R = np.pi / 180.0
MWA_LAT = -26.703319405555554
MWA_LONG = 116.67081523611111
MWA_HEIGHT = 377.827
VELC = 299792458.0
SOLAR2SIDEREAL = 1.00274
DS2R = 7.2722052166430399038487115353692196393452995355905e-5

from wodenpy.use_libwoden.woden_settings import Woden_Settings_Python
from wodenpy.use_libwoden.array_layout_struct import Array_Layout_Ctypes

##This call is so we can use it as a type annotation
# woden_struct_classes = Woden_Struct_Classes()
# Woden_Settings = woden_struct_classes.Woden_Settings

[docs] class Array_Layout_Python(object): """ A class to represent the layout of an array in Python. Attributes ---------- ant_X : None Antenna X coordinates (metres). ant_Y : None Antenna Y coordinates (metres). ant_Z : None Antenna Z coordinates (metres). X_diff_metres : None The difference in X coordinates (metres). Y_diff_metres : None The difference in Y coordinates (metres). Z_diff_metres : None The difference in Z coordinates (metres). ant_east : None Antenna east coordinates (metres) ant_north : None Antenna north coordinates (metres) ant_height : None Antenna height coordinates (metres) latitude : None The latitude of the array (radians). num_baselines : None The number of baselines. num_tiles : None The number of tiles. lst_base : None The base of the local sidereal time. """ def __init__(self): self.ant_X = None self.ant_Y = None self.ant_Z = None self.X_diff_metres = None self.Y_diff_metres = None self.Z_diff_metres = None self.ant_east = None self.ant_north = None self.ant_height = None self.latitude = None self.num_baselines = None self.num_tiles = None self.lst_base = None
[docs] def convert_ecef_to_enh(ecef_X : np.ndarray, ecef_Y : np.ndarray, ecef_Z : np.ndarray, lon : float, lat : float) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ The array coords in a measurement set seem to be Earth Centred Earth Fixed coords I got the maths to go from ECEF to enh from this site: https://gssc.esa.int/navipedia/index.php/Transformations_between_ECEF_and_ENU_coordinates Parameters ---------- ecef_X : np.ndarray ECEF X coordinates (metres) ecef_Y : np.ndarray ECEF Y coordinates (metres) ecef_Z : np.ndarray ECEF Z coordinates (metres) lon : float Longitude of the array (radians) lat : float Latitude of the array (radians Returns ------- Tuple[np.ndarray, np.ndarray, np.ndarray] The east, north, and height coordinates """ ##Calculate the ecef xyz of the array position using the geodectic to ##geocentric function arrX, arrY, arrZ = gd2gc(1, lon, lat, 0) ##subtract it from the earth centred coords, as we want enh coords ##centred at the array X = ecef_X - arrX Y = ecef_Y - arrY Z = ecef_Z - arrZ ##convert xyz to enh east = -np.sin(lon)*X + np.cos(lon)*Y north = -np.cos(lon)*np.sin(lat)*X - np.sin(lon)*np.sin(lat)*Y + np.cos(lat)*Z height = np.cos(lon)*np.cos(lat)*X + np.sin(lon)*np.cos(lat)*Y + np.sin(lat)*Z return east, north, height
[docs] def enh2xyz(east : float, north : float, height : float, latitude : float) -> Tuple[float, float, float]: """ Takes local east, north, height coords for a given latitude (radians) and returns local X,Y,Z coords to put in the uvfits antenna table Parameters ---------- east : float Local east coorindate (metres) north : float Local north coorindate (metres) height : float Local height coorindate (metres) latitude : float Latitude of the array - defaults to MWA location (radians) Returns ------- X : float Local X antenna location Y : float Local Y antenna location Z : float Local Z antenna location """ sl = np.sin(latitude) cl = np.cos(latitude) X = -north*sl + height*cl Y = east Z = north*cl + height*sl return X,Y,Z
[docs] def RTS_precXYZ(rmat : np.ndarray, x : float, y : float, z : float, lmst : float, lmst2000 : float) -> Tuple[float, float, float]: """RTS magic for precessing local X,Y,Z from the current time frame back to J2000 Parameters ---------- rmat : np.ndarray 2D rotation matrix as output by `palpy.prenut` x : float Local X coord of antenna (tile) y : float Local Y coord of antenna (tile) z : float Local X coord of antenna (tile) lmst : float LST of the array in the current frame lmst2000 : float LST of the array in the J2000 frame Returns ------- Tuple[float, float, float] The precessed X,Y,Z coords """ sep = np.sin(lmst) cep = np.cos(lmst) s2000 = np.sin(lmst2000) c2000 = np.cos(lmst2000) ##/* rotate to frame with x axis at zero RA */ xpr = cep*x - sep*y ypr = sep*x + cep*y zpr = z ##/* apply the rotation matrix to account for precession/nutation */ xpr2 = (rmat[0][0])*xpr + (rmat[0][1])*ypr + (rmat[0][2])*zpr ypr2 = (rmat[1][0])*xpr + (rmat[1][1])*ypr + (rmat[1][2])*zpr zpr2 = (rmat[2][0])*xpr + (rmat[2][1])*ypr + (rmat[2][2])*zpr ##/* rotate back to frame with xp pointing out at lmst2000 */ xp = c2000*xpr2 + s2000*ypr2 yp = -s2000*xpr2 + c2000*ypr2 zp = zpr2 return xp, yp, zp
[docs] def RTS_PrecessXYZtoJ2000( array_layout : Array_Layout_Python, woden_settings : Woden_Settings_Python) -> Array_Layout_Python: """Given the populated `array_layout` and settings in `woden_settings`, use RTS functions to precess the array back to J2000, to account for the skymodel being in J2000. Parameters ---------- array_layout : Array_Layout_Python Populated array layout in the simulation epoch woden_settings : Woden_Settings_Python Populated settings, where the `woden_settings.lsts` should have been precessed back to J2000 already. Returns ------- Array_Layout_Ctypes The updated array layout with `array_layout.ant_X`, `array_layout.ant_Y`, and `array_layout.ant_Z` precessed back to J2000.""" ##Rotate the array positions for each time step - they have different ##mjd dates and current epoch lsts so yield different XYZ over time for time_step in range(woden_settings.num_time_steps): lst_J2000 = woden_settings.lsts[time_step] mjd_current = woden_settings.mjds[time_step] ##Add on the angle accrued by current time step to the base LST lst_current = woden_settings.lst_obs_epoch_base + time_step*woden_settings.time_res*SOLAR2SIDEREAL*DS2R ##Add half a time_res so we are sampling centre of each time step lst_current += 0.5*woden_settings.time_res*SOLAR2SIDEREAL*DS2R rmatpn = palpy.prenut(2000, mjd_current) J2000_transformation = np.transpose(rmatpn) for st in range(array_layout.num_tiles): ##Where do this XYZ for this time step start st_offset = time_step*array_layout.num_tiles ##Calculate antennas positions in the J2000 frame X_epoch = array_layout.ant_X[st_offset + st] Y_epoch = array_layout.ant_Y[st_offset + st] Z_epoch = array_layout.ant_Z[st_offset + st] X_prec, Y_prec, Z_prec = RTS_precXYZ(J2000_transformation, X_epoch, Y_epoch, Z_epoch, lst_current, lst_J2000 ) ##Update stored coordinates array_layout.ant_X[st_offset + st] = X_prec array_layout.ant_Y[st_offset + st] = Y_prec array_layout.ant_Z[st_offset + st] = Z_prec return array_layout
[docs] def setup_array_layout_python(woden_settings_python : Woden_Settings_Python, args : argparse.Namespace) -> Array_Layout_Python: #type: ignore """Given the populated Woden_Settings_Python class and the command line arguments, set up the `Array_Layout_Python` class, and fill it with the correct values, as well as creating the correct size arrays: - array_layout.ant_X - array_layout.ant_Y - array_layout.ant_Z - array_layout.X_diff_metres - array_layout.Y_diff_metres - array_layout.Z_diff_metres Parameters ---------- woden_settings_python : Woden_Settings_Python Populated Woden_Settings_Python class. args : argparse.Namespace Input arguments from the command line that have been checked using `wodenpy.use_libwoden.check_args.check_args`. Returns ------- array_layout : Array_Layout_Python Initialised Array_Layout_Python class. """ array_layout = Array_Layout_Python() array_layout.num_tiles = args.num_antennas woden_settings_python.num_ants = args.num_antennas array_layout.latitude = woden_settings_python.latitude array_layout.num_baselines = int((array_layout.num_tiles*(array_layout.num_tiles-1)) / 2) woden_settings_python.num_baselines = array_layout.num_baselines num_cross = woden_settings_python.num_baselines * woden_settings_python.num_time_steps * woden_settings_python.num_freqs woden_settings_python.num_cross = num_cross ##If user asked for auto-correlations, set this number to total autos to work out ##Otherwise stick it to zero if woden_settings_python.do_autos: num_autos = woden_settings_python.num_ants * woden_settings_python.num_time_steps * woden_settings_python.num_freqs else: num_autos = 0 woden_settings_python.num_autos = num_autos woden_settings_python.num_visis = num_cross + num_autos array_layout.ant_X = np.empty(woden_settings_python.num_time_steps*args.num_antennas, dtype=np.float64) array_layout.ant_Y = np.empty(woden_settings_python.num_time_steps*args.num_antennas, dtype=np.float64) array_layout.ant_Z = np.empty(woden_settings_python.num_time_steps*args.num_antennas, dtype=np.float64) array_layout.X_diff_metres = np.empty(woden_settings_python.num_time_steps*array_layout.num_baselines, dtype=np.float64) array_layout.Y_diff_metres = np.empty(woden_settings_python.num_time_steps*array_layout.num_baselines, dtype=np.float64) array_layout.Z_diff_metres = np.empty(woden_settings_python.num_time_steps*array_layout.num_baselines, dtype=np.float64) return array_layout
[docs] def calc_XYZ_diffs(woden_settings_python : Woden_Settings_Python, args : argparse.Namespace, logger : Logger = False) -> Array_Layout_Python: """ Populates an Array_Layout_Ctypes class with the instrument layout, given the command line arguments. Calculates the differences in X, Y, and Z coordinates between all pairs of antennas in the array. Parameters ---------- woden_settings_python: Woden_Settings_Python An populated Woden_Settings instance args: argparse.Namespace The command line arguments checked by `wodenpy.wodenpy_setup.check_args` logger: Logger A logger instance to log messages Returns ------- array_layout - An instance of the Structure class containing the X, Y, and Z differences between all pairs of antennas in the array at each time step. """ if logger == False: logger = simple_logger() array_layout = setup_array_layout_python(woden_settings_python, args) # ##When precessing the array back to J2000, we apply a different precession # ##based on mjd of each time step. So store enough copies of XYZ for # ##all time steps for time_step in range(woden_settings_python.num_time_steps): for i in range(array_layout.num_tiles): ##Where do this XYZ for this time step start st_offset = time_step*array_layout.num_tiles ##Convert to local X,Y,Z x, y, z = enh2xyz(args.east[i], args.north[i], args.height[i], woden_settings_python.latitude_obs_epoch_base) array_layout.ant_X[st_offset + i] = x array_layout.ant_Y[st_offset + i] = y array_layout.ant_Z[st_offset + i] = z if woden_settings_python.do_precession: logger.info("Precessing array layout to J2000") array_layout = RTS_PrecessXYZtoJ2000(array_layout, woden_settings_python) else: logger.info("Not precessing the array layout to J2000") for time_step in range(woden_settings_python.num_time_steps): ##Where do these XYZ for this time step start ant_off = array_layout.num_tiles*time_step ##Where do these baselines for this time step start base_off = array_layout.num_baselines*time_step baseline_ind = 0 for ant1 in range(array_layout.num_tiles - 1): for ant2 in range(ant1 + 1, array_layout.num_tiles): array_layout.X_diff_metres[base_off + baseline_ind] = array_layout.ant_X[ant_off + ant1] - array_layout.ant_X[ant_off + ant2] array_layout.Y_diff_metres[base_off + baseline_ind] = array_layout.ant_Y[ant_off + ant1] - array_layout.ant_Y[ant_off + ant2] array_layout.Z_diff_metres[base_off + baseline_ind] = array_layout.ant_Z[ant_off + ant1] - array_layout.ant_Z[ant_off + ant2] baseline_ind += 1 return array_layout
[docs] def convert_array_layout_to_ctypes(array_layout_python : Array_Layout_Python, array_layout_ctypes : Array_Layout_Ctypes) -> Array_Layout_Ctypes: """Just convert things we need for C/GPU functions, not every field""" array_layout_ctypes.X_diff_metres = array_layout_python.X_diff_metres.ctypes.data_as(POINTER(c_double)) array_layout_ctypes.Y_diff_metres = array_layout_python.Y_diff_metres.ctypes.data_as(POINTER(c_double)) array_layout_ctypes.Z_diff_metres = array_layout_python.Z_diff_metres.ctypes.data_as(POINTER(c_double)) array_layout_ctypes.latitude = array_layout_python.latitude array_layout_ctypes.num_baselines = array_layout_python.num_baselines array_layout_ctypes.num_tiles = array_layout_python.num_tiles # array_layout_ctypes.lst_base = array_layout_python.lst_base return array_layout_ctypes