"""Wrappers around the EveryBeam library to calculate primary beam values.
The core functionality is in the C++ library `libuse_everybeam.so`, which in
turn contains wrappers about the EveryBeam library. Functions here call the
C++ library to calculate the primary beam values, and return them to Python
usable data types, via `ctypes`.
See https://everybeam.readthedocs.io/en/latest/index.html for more information
on the EveryBeam library."""
from time import time
import numpy as np
from astropy.coordinates import ITRS, SkyCoord, AltAz, EarthLocation
from astropy.time import Time, TimeDelta
import astropy.units as u
import argparse
from wodenpy.use_libwoden.create_woden_struct_classes import Woden_Struct_Classes
import erfa
from typing import Union, Tuple
import concurrent.futures
from line_profiler import profile
import os
import astropy
import wodenpy
import importlib_resources
from wodenpy.use_libwoden.skymodel_structs import c_double_complex
from ctypes import c_char_p, c_int, c_double, POINTER, c_bool
import ctypes
from wodenpy.wodenpy_setup.woden_logger import simple_logger
from logging import Logger
from multiprocessing import Process, Queue
from wodenpy.use_libwoden.beam_settings import BeamTypes
import ctypes
from wodenpy.array_layout.create_array_layout import convert_ecef_to_enh, convert_enh_to_ecef
from erfa import gc2gd, gd2gc
##This call is so we can use it as a type annotation
woden_struct_classes = Woden_Struct_Classes()
Source_Catalogue = woden_struct_classes.Source_Catalogue
Woden_Settings = woden_struct_classes.Woden_Settings
[docs]
def worker_get_num_stations(ms_path : str, q : Queue) -> int:
"""
Worker function to get the number of stations in the measurement set
given by `ms_path`, and put the in the queue `q`.
This is done in a separate process because it uses `python-casacore`.
Running `import casacore` creates a `c++` state with a number of library paths
and casacore global variables. If `libuse_everybeam.so` has been built with
against a different casacore library, this causes epic intermittent errors
and segfaults. To void this, run the import in this separate thread process,
which isolates the state of the `c++` library.
TODO: All calls to `python-casacore` should be done via direct calls to
casacore in c++ in `libuse_everybeam.so`. This removes all conflicts;
however, this is a large task and will take time to implement. For now,
this is a workaround to avoid the segfaults and errors.
Parameters
----------
ms_path : str
Path to the measurement set.
q : Queue
Queue to put the number of stations in.
"""
from casacore.tables import table
with table(ms_path + '/ANTENNA') as t:
num_stations = len(t)
q.put(num_stations)
[docs]
def get_num_stations(ms_path : str) -> int:
"""
Get the number of stations in a measurement set given by `ms_path`.
Runs :func:`~worker_get_num_stations` in a separate process to
avoid `python-casacore` clashing with WODEN-built `libuse_everybeam.so`.
See :func:`~worker_get_num_stations` for more details.
Parameters
----------
ms_path : str
Path to the measurement set.
Returns
-------
int
Number of stations in the measurement set.
"""
q = Queue()
p = Process(target=worker_get_num_stations, args=(ms_path, q,))
p.start()
p.join()
num_stations = q.get()
return num_stations
[docs]
def enu_basis(lat : float, lon : float) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Calculate the ENU basis vectors in terms of earth-centred earth-fixed coords,
for a given latitude `lat` and longitude `lon`.
Parameters
----------
lat : float
Latitude in radians.
lon : float
Longitude in radians.
Returns
-------
Tuple[np.ndarray, np.ndarray, np.ndarray]
Tuple of the east, north, and up basis vectors.
"""
east = np.array([-np.sin(lon), np.cos(lon), 0.0])
north = np.array([-np.sin(lat) * np.cos(lon),
-np.sin(lat) * np.sin(lon),
np.cos(lat)])
up = np.array([np.cos(lat) * np.cos(lon),
np.cos(lat) * np.sin(lon),
np.sin(lat)])
return east, north, up
[docs]
def calc_coordinate_axes(positions : np.ndarray,
central_lat : float, central_lon : float):
"""
Internal to a LOFAR measurement set, inside the `LOFAR_ANTENNA_FIELD` table,
there is a column called `COORDINATE_AXES`. This has something to do with
defining what the normal direction of the incoming radation is relative
to a particular station. If moving a measurement set from one lat/long to
another, this must be recalculated. Below is a best guess by JLBLine at how
to do this. See the following doc string copied from EveryBeam which
describes the coordinate system, from the
`EveryBeam/cpp/antenna.h::Antenna::CoordinateSystem` function
.. seealso:: from the EveryBeam documentation:
Station coordinate system.
A right handed, cartesian, local coordinate system with coordinate axes
`p`, `q`, and `r` is associated with each antenna field.
The r-axis is orthogonal to the antenna field, and points towards the
local pseudo zenith.
The q-axis is the northern bisector of the X and Y dipoles, i.e.
it is the reference direction from which the orientation of the dual
dipole antennae is determined. The q-axis points towards the North at
the core. At remote sites it is defined as the intersection of the
antenna field plane and a plane parallel to the meridian plane at the
core. This ensures the reference directions at all sites are similar.
The p-axis is orthogonal to both other axes, and points towards the East
at the core.
The axes and origin of the anntena field coordinate system are expressed
as vectors in the geocentric, cartesian, ITRF coordinate system, in
meters.
"LOFAR Reference Plane and Reference Direction", M.A. Brentjens,
LOFAR-ASTRON-MEM-248.
.. todo::
find out how to do this properly. As described above, everything is
supposed to be calculated relative to the array centre, or for remote stations,
"the intersection of the antenna field plane and a plane parallel to the
meridian plane at the core". Wtf does that mean? Below, I calculate things
relative to the local ENU for each station. This means the calculations
for remote stations are way off, as the curvature of the earth changes
what local up is the further you get from the core. If people need accurate
remote stations, this needs to be fixed (or just don't move the telescope).
Parameters
----------
positions : np.ndarray
Array of station positions in ECEF coordinates. Shape (num_positions, 3).
central_lat : float
Latitude of the array centre in radians.
central_lon : float
Longitude of the array centre in radians.
Returns
-------
coordinate_axes : np.ndarray
Array of coordinate axes for each station. Shape (num_positions, 3, 3).
coordinate_axes[:, 0, :] are the `p` vectors, coordinate_axes[:, 1, :]
are the `q` vectors, and coordinate_axes[:, 2, :] are the `r` vectors,
as described in the docstring above.
"""
arrX, arrY, arrZ = gd2gc(1, central_lon, central_lat, 0)
arr_centre = np.array([arrX, arrY, arrZ])
num_positions = positions.shape[0]
coordinate_axes = np.zeros((num_positions, 3, 3), dtype=np.float64)
for pos in range(num_positions):
# delta = arr_centre - positions[pos, :]
delta = positions[pos, :] - arr_centre
lon, lat, height = gc2gd(1, positions[pos, :])
east, north, up = enu_basis(lat, lon)
##IF doing everything relative to the array centre, then
##use the array centre as the ENU basis
# east, north, up = enu_basis(central_lat, central_lon)
east_comp = np.dot(delta, east)
north_comp = np.dot(delta, north)
up_comp = np.dot(delta, up)
# Reconstruct component vectors
east_vec = east_comp * east
north_vec = north_comp * north
up_vec = up_comp * up
east_vec /= np.linalg.norm(east_vec)
north_vec /= np.linalg.norm(north_vec)
up_vec /= np.linalg.norm(up_vec)
if lon < central_lon:
east_vec *= -1
if lat < central_lat:
north_vec *= -1
coordinate_axes[pos, 0, :] = east_vec
coordinate_axes[pos, 1, :] = north_vec
coordinate_axes[pos, 2, :] = up_vec
return coordinate_axes
[docs]
def worker_create_filtered_ms(ms_path : str, new_ms_path : str,
ra0 : float, dec0 : float,
recentre_array : bool = False,
current_latitude : float = False,
current_longitude : float = False,
new_latitude : float = False,
new_longitude : float = False) -> None:
"""
Create a reduced measurement to path `new_ms_path` with only the first time
and frequency channel of the measurement set `ms_path`. Set the delay and
reference directions to the given RA/Dec `ra0,dec0`.
The `EveryBeam` functions that `WODEN` wraps read the beam centre value
directly from the measurement set. Rather than downloading a specific
MS which exactly the pointing we want, we just make the smallest possible
copy of the MS with the first time and frequency channel, and set
the delay and reference direction to what we want.
If `recentre_array` is True, also perform the necessary calculations and
updates to move the array centre from `current_latitude` and `current_longitude`
to `new_latitude` and `new_longitude`.
This function is run in a separate process to avoid `python-casacore`
clashing with WODEN-built `libuse_everybeam.so`. Running `import casacore`
creates a `c++` state with a number of library paths and casacore global variables.
If `libuse_everybeam.so` has been built with against a different casacore library,
this causes epic intermittent errors and segfaults. To void this, run the import
in this separate thread process, which isolates the state of the `c++` library.
Parameters
----------
ms_path : str
Path to the original measurement set.
new_ms_path : str
Path to the new measurement set.
ra0 : float
Right ascension of the beam centre in radians.
dec0 : float
Declination of the beam centre in radians.
recentre_array : bool, optional
Whether to recentre the array to the new latitude and longitude.
Defaults to False. If True, `current_latitude`, `current_longitude`,
`new_latitude`, and `new_longitude` must be set.
current_latitude : float, optional
Current latitude of the array in radians. Required if `recentre_array`
is True.
current_longitude : float, optional
Current longitude of the array in radians. Required if `recentre_array`
is True.
new_latitude : float, optional
New latitude of the array in radians. Required if `recentre_array`
is True.
new_longitude : float, optional
New longitude of the array in radians. Required if `recentre_array`
is True.
"""
from casacore.tables import table, taql
##Find out telescope type
with table(ms_path + "/OBSERVATION", ack=False) as tb:
telescope_name = tb.getcol("TELESCOPE_NAME")[0] # Get first entry
##First up, read in original MS, and filter it to only have
## the first time and frequency channel
with table(ms_path, readonly=True) as ms:
time_col = ms.getcol("TIME")
ddid_col = ms.getcol("DATA_DESC_ID")
first_time = time_col[0] # First timestamp
first_ddid = ddid_col[0] # First frequency channel
# Use TaQL (Table Query Language) to select the subset efficiently
query = f"SELECT * FROM {ms_path} WHERE TIME = {first_time} AND DATA_DESC_ID = {first_ddid}"
filtered_ms = taql(query)
filtered_ms.copy(new_ms_path, deep=True)
filtered_ms.close()
with table(new_ms_path+'::FIELD', readonly=False) as field_table:
field_table.putcol('PHASE_DIR', np.array([[[ra0, dec0]]]))
field_table.putcol('DELAY_DIR', np.array([[[ra0, dec0]]]))
field_table.putcol('REFERENCE_DIR', np.array([[[ra0, dec0]]]))
if telescope_name == "LOFAR":
field_table.putcol('LOFAR_TILE_BEAM_DIR', np.array([[[ra0, dec0]]]))
if recentre_array:
with table(ms_path + '/ANTENNA', readonly=True) as t:
num_ants = len(t)
num_antennas = num_ants
ant_locations = np.array([t.getcell('POSITION', ant) for ant in range(num_ants)])
##convert from ECEF to ENH, as WODEN starts with enh coords
east, north, height = convert_ecef_to_enh(ant_locations[:,0],
ant_locations[:,1], ant_locations[:,2],
current_longitude, current_latitude)
ecef_X, ecef_Y, ecef_Z = convert_enh_to_ecef(east, north, height,
new_longitude, new_latitude)
with table(new_ms_path+'::ANTENNA', readonly=False) as antenna:
new_locations = np.zeros((num_antennas, 3), dtype=np.float64)
new_locations[:, 0] = ecef_X
new_locations[:, 1] = ecef_Y
new_locations[:, 2] = ecef_Z
antenna.putcol('POSITION', new_locations)
if telescope_name == "LOFAR":
antenna.putcol('LOFAR_PHASE_REFERENCE', new_locations)
if telescope_name == "LOFAR":
with table(new_ms_path+'::LOFAR_ANTENNA_FIELD', readonly=False) as antenna:
lof_ant_position = antenna.getcol('POSITION')
east, north, height = convert_ecef_to_enh(lof_ant_position[:,0],
lof_ant_position[:,1], lof_ant_position[:,2],
current_longitude, current_latitude)
ecef_X, ecef_Y, ecef_Z = convert_enh_to_ecef(east, north, height,
new_longitude, new_latitude)
new_locations = np.zeros((num_antennas, 3), dtype=np.float64)
new_locations[:, 0] = ecef_X
new_locations[:, 1] = ecef_Y
new_locations[:, 2] = ecef_Z
antenna.putcol('POSITION', new_locations)
new_coord_axes = calc_coordinate_axes(new_locations,
new_latitude,
new_longitude)
with table(new_ms_path+'::LOFAR_ANTENNA_FIELD', readonly=False) as antenna:
coord_axes = antenna.getcol('COORDINATE_AXES')
antenna.putcol('COORDINATE_AXES', new_coord_axes)
arrX, arrY, arrZ = gd2gc(1, new_longitude, new_latitude, 0)
arr_centre = np.array([arrX, arrY, arrZ])
for station in range(num_antennas):
offsets = antenna.getcell('ELEMENT_OFFSET', station)
offsets += ant_locations[station, :]
cur_lon, cur_lat, height = gc2gd(1, ant_locations[station, :])
east, north, height = convert_ecef_to_enh(offsets[:,0],
offsets[:,1], offsets[:,2],
cur_lon, cur_lat)
new_lon, new_lat, height = gc2gd(1, new_locations[station, :])
ecef_X, ecef_Y, ecef_Z = convert_enh_to_ecef(east, north, height,
new_lon, new_lat)
new_offsets = np.zeros_like(offsets)
new_offsets[:, 0] = ecef_X - new_locations[station, 0]
new_offsets[:, 1] = ecef_Y - new_locations[station, 1]
new_offsets[:, 2] = ecef_Z - new_locations[station, 2]
antenna.putcell('ELEMENT_OFFSET', station, new_offsets)
for station in range(num_antennas):
offsets = antenna.getcell('TILE_ELEMENT_OFFSET', station)
offsets += ant_locations[station, :]
cur_lon, cur_lat, height = gc2gd(1, ant_locations[station, :])
east, north, height = convert_ecef_to_enh(offsets[:,0],
offsets[:,1], offsets[:,2],
cur_lon, cur_lat)
new_lon, new_lat, height = gc2gd(1, new_locations[station, :])
ecef_X, ecef_Y, ecef_Z = convert_enh_to_ecef(east, north, height,
new_lon, new_lat)
new_offsets = np.zeros_like(offsets)
new_offsets[:, 0] = ecef_X - new_locations[station, 0]
new_offsets[:, 1] = ecef_Y - new_locations[station, 1]
new_offsets[:, 2] = ecef_Z - new_locations[station, 2]
antenna.putcell('TILE_ELEMENT_OFFSET', station, new_offsets)
[docs]
def create_filtered_ms(ms_path : str, new_ms_path : str,
ra0 : float, dec0 : float,
recentre_array : bool = False,
current_latitude : float = False,
current_longitude : float = False,
new_latitude : float = False,
new_longitude : float = False):
"""
Create a reduced measurement to path `new_ms_path` with only the first time
and frequency channel of the measurement set `ms_path`. Set the delay and
reference directions to the given RA/Dec `ra0,dec0`. If `recentre_array`
is True, also perform the necessary calculations and updates to move the
array centre from `current_latitude` and `current_longitude` to
`new_latitude` and `new_longitude`.
Runs :func:`~worker_create_filtered_ms` in a separate process to
avoid `python-casacore` clashing with WODEN-built `libuse_everybeam.so`.
See :func:`~worker_create_filtered_ms` for more details.
Parameters
----------
ms_path : str
Path to the original measurement set.
new_ms_path : str
Path to the new measurement set.
ra0 : float
Right ascension of the beam centre in radians.
dec0 : float
Declination of the beam centre in radians.
recentre_array : bool, optional
Whether to recentre the array to the new latitude and longitude.
Defaults to False. If True, `current_latitude`, `current_longitude`,
`new_latitude`, and `new_longitude` must be set.
current_latitude : float, optional
Current latitude of the array in radians. Required if `recentre_array`
is True.
current_longitude : float, optional
Current longitude of the array in radians. Required if `recentre_array`
is True.
new_latitude : float, optional
New latitude of the array in radians. Required if `recentre_array`
is True.
new_longitude : float, optional
New longitude of the array in radians. Required if `recentre_array`
is True.
"""
p = Process(target=worker_create_filtered_ms,
args=(ms_path, new_ms_path, ra0, dec0,
recentre_array, current_latitude,
current_longitude, new_latitude,
new_longitude))
p.start()
p.join() # module is fully cleaned when the process exits
[docs]
def check_ms_telescope_type_matches_element_response(ms_path : str,
element_response_model : str = 'default',
logger : Logger = False) -> Tuple[str, str]:
"""
Check the requested `element_response_model` is compatible with the
telescope type in the measurement set. If they don't match, set the element
response model to the default for the telescope type. This function uses
the `check_ms_telescope_type` function in `libuse_everybeam.so`.
- LOFAR: 'hamaker', 'hamakerlba', 'lobes' or 'default'
- OSKAR: 'skala40_wave' or 'default'
Please note that 'hamakerlba', 'lobes' are not tested or supported in
WODEN, and are not recommended for use. They are included for completeness.
Furthermore, please note that WODEN doesn't use a base measurement set
for the MWA primary beam, and so will throw an error here to make sure
you're not doing something the c++ code can't handle.
Parameters
----------
ms_path : str
Path to the measurement set.
element_response_model : str, optional
Requested Element response model to use. Defaults to 'default'.
logger : Logger, optional
Logger to use. Defaults to False; if False, create a new simple logger instance.
Returns
-------
Tuple[str, str]
Tuple of the telescope type and the element response model to use.
"""
if not logger:
logger = simple_logger()
woden_path = importlib_resources.files(wodenpy).joinpath(f"libwoden_double.so")
woden_lib = ctypes.cdll.LoadLibrary(woden_path)
check_ms_telescope_type = woden_lib.check_ms_telescope_type
check_ms_telescope_type.argtypes = [c_char_p]
check_ms_telescope_type.restype = c_char_p
# print(f"MS path: {type(ms_path)}, {ms_path}")
ms_path_ctypes = ctypes.c_char_p(ms_path.encode('utf-8'))
telescope_type = check_ms_telescope_type(ms_path_ctypes).decode('utf-8')
use_element_response_model = False
if telescope_type == 'MWA':
exit_message = "WODEN does not support using a measurement set for the MWA primary beam model. "
exit_message += "It instead calls the EveryBeam Beam2016Implementation code "
exit_message += "directly to input az,za, and uses the array layout from either "
exit_message += "an MWA metafits file, or a user-defined array layout. "
logger.error(exit_message)
exit(exit_message)
elif telescope_type == 'LOFAR':
if element_response_model == 'default':
use_element_response_model = "hamaker"
else:
if element_response_model not in ['hamaker', 'hamakerlba', 'lobes']:
logger.warning(f"Measurement set telescope type is LOFAR, but "
f"element_response_model was set to {element_response_model}, "
"which is not one of ['hamaker', 'hamakerlba','lobes']. "
"Defaulting to 'hamaker'")
use_element_response_model = "hamaker"
else:
use_element_response_model = element_response_model
elif telescope_type == 'OSKAR':
if element_response_model == 'default':
use_element_response_model = "skala40_wave"
else:
if element_response_model != 'skala40_wave':
logger.warning(f"Measurement set telescope type is OSKAR, but element_response_model "
f"was set to {element_response_model}. Changing to 'skala40_wave'")
use_element_response_model = "skala40_wave"
else:
use_element_response_model = element_response_model
else:
use_element_response_model = element_response_model
exit_message = f"Measurement set telescope type is {telescope_type}. "
exit_message += "WODEN currently supports LOFAR, OSKAR EveryBeam from a measurement set. "
exit_message += "Cannot proceed as unknown behaviour will happen in C++ code. "
exit_message += "Exiting now."
logger.error(exit_message)
exit(exit_message)
return telescope_type, use_element_response_model
[docs]
def convert_common_args_to_everybeam_args(ms_path : str, coeff_path : str,
element_response_model : str,
station_idxs : np.ndarray,
freqs : np.ndarray,
mjd_sec_times : np.ndarray) -> Tuple:
"""
Convert input arugments common to all EveryBeam primary beam models
into `ctypes` equivalents to pass to the C++ library.
Parameters
----------
ms_path : str
Path to the measurement set.
coeff_path : str
Path to the coefficients file (only needed for MWA, pass the path
to the hdf5 FEE file).
element_response_model : str
Element response model to use. Can be 'MWA', 'hamaker', 'skala40_wave',
or 'default'.
station_idxs : np.ndarray
Array of station indices to use.
freqs : np.ndarray
Array of frequencies to use.
mjd_sec_times : np.ndarray
Array of times in MJD seconds.
Returns
-------
Tuple[ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_double),ctypes.POINTER(ctypes.c_double)]
Tuple of the converted arguments in the format required by the
EveryBeam library. Order of outputs is identical to order of inputs arguments.
"""
num_stations = len(station_idxs)
num_times = len(mjd_sec_times)
num_freqs = len(freqs)
mjd_sec_times_ctypes = (ctypes.c_double * num_times)()
for i in range(num_times):
mjd_sec_times_ctypes[i] = mjd_sec_times[i]
freqs_ctypes = (ctypes.c_double * num_freqs)()
for i in range(num_freqs):
freqs_ctypes[i] = freqs[i]
station_idxs_ctypes = (ctypes.c_int * num_stations)()
for i in range(num_stations):
station_idxs_ctypes[i] = station_idxs[i]
ms_path_ctypes = ctypes.c_char_p(ms_path.encode('utf-8'))
element_response_model_ctypes = ctypes.c_char_p(element_response_model.encode('utf-8'))
coeff_path_ctypes = ctypes.c_char_p(coeff_path.encode('utf-8'))
return ms_path_ctypes, coeff_path_ctypes, element_response_model_ctypes, station_idxs_ctypes, freqs_ctypes, mjd_sec_times_ctypes
[docs]
def run_everybeam(ras: np.ndarray, decs: np.ndarray,
freqs: np.ndarray,
ms_path : str = False,
beam_ra0: float = np.nan, beam_dec0: float = np.nan,
times: np.ndarray[Time] = False,
station_ids: np.ndarray = False,
element_response_model='default',
apply_beam_norms: bool = True,
mwa_coeff_path : str = False,
mwa_dipole_delays: np.ndarray = False,
mwa_dipole_amps: np.ndarray = np.ones(16),
j2000_latitudes: np.ndarray = False,
j2000_lsts: np.ndarray = False,
iau_order: bool = False,
element_only: bool = False,
parallactic_rotate: bool = False,
logger : Logger = False) -> np.ndarray:
"""
Calculate the Jones matrices for a given set of coordinates, times,
frequencies, and station ids using the EveryBeam library. Currently,
LOFAR and OSKAR beams require a measurement set as an input. The MWA beam
does not need a measurement set, but does need the path to the FEE
coefficients hdf5 file.
For all beam types, `ras`, `decs`, `freqs` are required.
For LOFAR/OSKAR beams, `ms_path`, `beam_ra0`, `beam_dec0`, `times`,
and `station_ids` are required.
For MWA beams, `mwa_coeff_path`, `mwa_dipole_delays`, `j2000_latitudes`,
and `j2000_lsts` are required. `j2000_latitudes` should be the array
latitude as precessed back to J2000, with `j2000_lsts` being the
matching LST in J2000.
The returned Jones matrices will have shape
(num_stations, num_times, num_freqs, num_radec, 2, 2). The MWA beam will
always default to `num_stations=1` as dipole flagging has not been
implemented in the WODEN wrapper. All other beams will have
`num_stations=len(station_ids)`.
Parameters
------------
ras : np.ndarray
Right ascensions of the coordinates in radians.
decs : np.ndarray
Declinations of the coordinates in radians.
freqs : np.ndarray
Array of frequencies (Hz)
ms_path : str
Path to the measurement set to load the EveryBeam telescope from.
beam_ra0 : float
Right ascension (radians) of the beam pointing centre; the beam will
be centered on this position and therefore move with time. Does not
apply to MWA beam as that is locked in az,za.
beam_dec0 : float
Declination (radians) of the beam pointing centre; the beam will
be centered on this position and therefore move with time. Does not
apply to MWA beam as that is locked in az,za.
times : np.ndarray
Array of `astropy.Time` observation times.
station_ids : np.ndarray
Array of station IDs. Will calculation n_stations=len(station_ids)
worth of station beams (does not apply to MWA, which will only
calculate a single beam).
element_response_model : str, optional
The Everybeam element response model to use. Defaults to 'hamaker'.
Avaible options are 'hamaker' (LOFAR), 'skala40_wave' (OSKAR)
apply_beam_norms : bool, optional
Whether to apply beam normalisation. Defaults to True. Achieved by
calculating the beam response at beam centre, and multiplying all
Jones by the inverse of this central beam response (does not apply
to MWA beam).
mwa_coeff_path : str
Path to the coefficients file (needed for MWA, pass the path
to the hdf5 FEE file.)
mwa_dipole_delays : np.ndarray
Array of dipole delays for the MWA stations. Must be of length 16.
mwa_dipole_amps : np.ndarray, optional
Array of dipole amplitudes for the MWA stations. Must be of length 16.
j2000_latitudes : np.ndarray
Latitudes in J2000 coordinates (needed for MWA beam az,za calculations).
j2000_lsts : np.ndarray
Local sidereal times in J2000 coordinates (needed for MWA beam az,za calculations).
iau_order : bool, optional
If True, use IAU polarisation ordering, so set jones[0,0] to the NS dipole
and jones[1,1] to EW. If False, jones[0,0] is EW. Defaults to False
element_only : bool, optional
Whether to use only the element response. Defaults to False. Use this to
look at the dipole response only, not the beam formed response.
parallactic_rotate : bool, optional
Whether to apply parallactic angle rotation. Defaults to False.
logger : Logger, optional
Logger to use. Defaults to False; if False, create a new simple logger instance.
Returns
--------
np.ndarray
The calculated Jones matrices with shape (num_stations, num_times, num_freqs, num_coords, 2, 2).
"""
if type(ms_path) == bool and type(mwa_coeff_path) == bool:
logger.error("Either `ms_path` or `mwa_coeff_path` must be provided. Exiting now.")
exit()
if not logger:
logger = simple_logger()
if mwa_coeff_path:
##Check coeff path exists
if not os.path.exists(mwa_coeff_path):
logger.error(f"MWA coefficient path mwa_coeff_path={mwa_coeff_path} does not exist. Exiting now.")
exit()
if type(j2000_latitudes) == bool or type(j2000_lsts) == bool or type(mwa_dipole_delays) == bool:
logger.error("j2000_latitudes, j2000_lsts, and mwa_dipole_delays must be provided for MWA beam calculations. Exiting now.")
exit()
jones = run_mwa_beam(mwa_dipole_delays,
mwa_coeff_path,
ras, decs,
j2000_lsts, j2000_latitudes, freqs,
amps=mwa_dipole_amps,
apply_beam_norms=apply_beam_norms,
parallactic_rotate=parallactic_rotate,
iau_order=iau_order,
element_only=element_only)
else:
if type(ms_path) == bool:
logger.error("ms_path must be provided for LOFAR and OSKAR beam calculations. Exiting now.")
exit()
telescope_type, checked_element_response_model = check_ms_telescope_type_matches_element_response(ms_path,
element_response_model,
logger)
if type(station_ids) == bool or type(times) == bool or type(beam_ra0) == bool or type(beam_dec0) == bool:
logger.error("station_ids, times, beam_ra0, and beam_dec0 must be provided for LOFAR and OSKAR beam calculations. Exiting now.")
exit()
mjd_sec_times = np.array([time.mjd * 86400.0 for time in times])
if telescope_type == 'LOFAR':
jones = run_lofar_beam(ms_path, checked_element_response_model,
station_ids,
beam_ra0, beam_dec0,
ras, decs, mjd_sec_times, freqs,
apply_beam_norms=apply_beam_norms,
parallactic_rotate=parallactic_rotate,
iau_order=iau_order,
element_only=element_only)
elif telescope_type == 'OSKAR':
jones = run_oskar_beam(ms_path, checked_element_response_model,
station_ids,
beam_ra0, beam_dec0,
ras, decs, mjd_sec_times, freqs,
apply_beam_norms=apply_beam_norms,
parallactic_rotate=parallactic_rotate,
iau_order=iau_order,
element_only=element_only)
else:
logger.error("Unknown telescope type. Exiting now.")
exit()
return jones
[docs]
def run_everybeam_thread(num_threads : int, thread_id : int,
ras: np.ndarray, decs: np.ndarray,
freqs: np.ndarray,
ms_path : str = False,
beam_ra0: float = np.nan, beam_dec0: float = np.nan,
times: np.ndarray[Time] = False,
station_ids: np.ndarray = False,
element_response_model='default',
apply_beam_norms: bool = True,
mwa_coeff_path : str = False,
mwa_dipole_delays: np.ndarray = False,
mwa_dipole_amps: np.ndarray = np.ones(16),
j2000_latitudes: np.ndarray = False,
j2000_lsts: np.ndarray = False,
iau_order: bool = False,
element_only: bool = False,
parallactic_rotate: bool = False,
logger : Logger = False) -> Tuple[np.ndarray, int]:
"""
Thread function called by `run_everybeam_over_threads` to calculate the
EveryBeam response in parrallel. Calls `run_everybeam` with a subset of
the coordinates; see `run_everybeam` for more details of the parameters.
Creates a new EveryBeam telescope object from `ms_path` or a new EveryBeam
MWA Tile beam from `mwa_coeff_path` for each thread.
This has to be done because `concurrent.futures.ProcessPoolExecutor` has
to pickle the function and all it's arguments, and EveryBeam objects can't
be pickled. This is somewhat wasteful but I can't work out a better way
to make things parallel.
Parameters
------------
num_threads : int
Number of threads being in call by `run_everybeam_over_threads`.
thread_id : int
ID of the current thread. Useds to work out what chunk of `ras` and `decs`
to process.
ras : np.ndarray
Right ascensions of the coordinates in radians.
decs : np.ndarray
Declinations of the coordinates in radians.
freqs : np.ndarray
Array of frequencies (Hz)
ms_path : str
Path to the measurement set to load the EveryBeam telescope from.
beam_ra0 : float
Right ascension (radians) of the beam pointing centre; the beam will
be centered on this position and therefore move with time. Does not
apply to MWA beam as that is locked in az,za.
beam_dec0 : float
Declination (radians) of the beam pointing centre; the beam will
be centered on this position and therefore move with time. Does not
apply to MWA beam as that is locked in az,za.
times : np.ndarray
Array of `astropy.Time` observation times.
station_ids : np.ndarray
Array of station IDs. Will calculation n_stations=len(station_ids)
worth of station beams (does not apply to MWA, which will only
calculate a single beam).
element_response_model : str, optional
The Everybeam element response model to use. Defaults to 'hamaker'.
Avaible options are 'hamaker' (LOFAR), 'skala40_wave' (OSKAR)
apply_beam_norms : bool, optional
Whether to apply beam normalisation. Defaults to True. Achieved by
calculating the beam response at beam centre, and multiplying all
Jones by the inverse of this central beam response (does not apply
to MWA beam).
mwa_coeff_path : str
Path to the coefficients file (needed for MWA, pass the path
to the hdf5 FEE file.)
mwa_dipole_delays : np.ndarray
Array of dipole delays for the MWA stations. Must be of length 16.
mwa_dipole_amps : np.ndarray, optional
Array of dipole amplitudes for the MWA stations. Must be of length 16.
j2000_latitudes : np.ndarray
Latitudes in J2000 coordinates (needed for MWA beam az,za calculations).
j2000_lsts : np.ndarray
Local sidereal times in J2000 coordinates (needed for MWA beam az,za calculations).
iau_order : bool, optional
If True, use IAU polarisation ordering, so set jones[0,0] to the NS dipole
and jones[1,1] to EW. If False, jones[0,0] is EW. Defaults to False
element_only : bool, optional
Whether to use only the element response. Defaults to False. Use this to
look at the dipole response only, not the beam formed response.
parallactic_rotate : bool, optional
Whether to apply parallactic angle rotation. Defaults to False.
logger : Logger, optional
Logger to use. Defaults to False; if False, create a new simple logger instance.
Returns
--------
Tuple[np.ndarray, int]
The calculated Jones matrices with shape
(num_stations, num_times, num_freqs, num_coords_in_thread, 2, 2), as
well as the thread ID. Use the thread ID to insert this thread output
into the correct place in the final Jones matrix.
"""
num_coords = len(ras)
coords_per_thread = int(np.ceil(num_coords / num_threads))
low_coord = thread_id * coords_per_thread
high_coord = (thread_id + 1) * coords_per_thread
# print(f"Thread {thread_id} processing coords {low_coord} to {high_coord}")
jones = run_everybeam(ras[low_coord:high_coord],
decs[low_coord:high_coord],
freqs,
ms_path=ms_path,
times=times,
beam_ra0=beam_ra0, beam_dec0=beam_dec0,
mwa_coeff_path=mwa_coeff_path,
mwa_dipole_delays=mwa_dipole_delays,
mwa_dipole_amps=mwa_dipole_amps,
j2000_latitudes=j2000_latitudes,
j2000_lsts=j2000_lsts,
station_ids=station_ids,
element_response_model=element_response_model,
apply_beam_norms=apply_beam_norms,
iau_order=iau_order,
element_only=element_only,
parallactic_rotate=parallactic_rotate,
logger=logger)
# print(f"Thread {thread_id} finished")
return jones, thread_id
[docs]
def run_everybeam_over_threads(num_threads : int,
ras: np.ndarray, decs: np.ndarray,
freqs: np.ndarray,
ms_path : str = False,
beam_ra0: float = np.nan, beam_dec0: float = np.nan,
times: np.ndarray[Time] = False,
station_ids: np.ndarray = False,
element_response_model='default',
apply_beam_norms: bool = True,
mwa_coeff_path : str = False,
mwa_dipole_delays: np.ndarray = False,
mwa_dipole_amps: np.ndarray = np.ones(16),
j2000_latitudes: np.ndarray = False,
j2000_lsts: np.ndarray = False,
iau_order: bool = False,
element_only: bool = False,
parallactic_rotate: bool = False,
logger : Logger = False):
"""
Runs `run_everybeam` in parallel over `num_threads` threads, using
`concurrent.futures.ProcessPoolExecutor`. See `run_everybeam` for more
details of what each parameter does.
Creates a new EveryBeam telescope object from `ms_path` or a new EveryBeam
MWA Tile beam from `mwa_coeff_path` for each thread.
This has to be done because `concurrent.futures.ProcessPoolExecutor` has
to pickle the function and all it's arguments, and EveryBeam objects can't
be pickled. This is somewhat wasteful but I can't work out a better way
to make things parallel (without using MPI on the other end of things).
Parameters
------------
num_threads : int
Number of threads being in call by `run_everybeam_over_threads`.
ras : np.ndarray
Right ascensions of the coordinates in radians.
ras : np.ndarray
Right ascensions of the coordinates in radians.
decs : np.ndarray
Declinations of the coordinates in radians.
freqs : np.ndarray
Array of frequencies (Hz)
ms_path : str
Path to the measurement set to load the EveryBeam telescope from.
beam_ra0 : float
Right ascension (radians) of the beam pointing centre; the beam will
be centered on this position and therefore move with time. Does not
apply to MWA beam as that is locked in az,za.
beam_dec0 : float
Declination (radians) of the beam pointing centre; the beam will
be centered on this position and therefore move with time. Does not
apply to MWA beam as that is locked in az,za.
times : np.ndarray
Array of `astropy.Time` observation times.
station_ids : np.ndarray
Array of station IDs. Will calculation n_stations=len(station_ids)
worth of station beams (does not apply to MWA, which will only
calculate a single beam).
element_response_model : str, optional
The Everybeam element response model to use. Defaults to 'hamaker'.
Avaible options are 'hamaker' (LOFAR), 'skala40_wave' (OSKAR)
apply_beam_norms : bool, optional
Whether to apply beam normalisation. Defaults to True. Achieved by
calculating the beam response at beam centre, and multiplying all
Jones by the inverse of this central beam response (does not apply
to MWA beam).
mwa_coeff_path : str
Path to the coefficients file (needed for MWA, pass the path
to the hdf5 FEE file.)
mwa_dipole_delays : np.ndarray
Array of dipole delays for the MWA stations. Must be of length 16.
mwa_dipole_amps : np.ndarray, optional
Array of dipole amplitudes for the MWA stations. Must be of length 16.
j2000_latitudes : np.ndarray
Latitudes in J2000 coordinates (needed for MWA beam az,za calculations).
j2000_lsts : np.ndarray
Local sidereal times in J2000 coordinates (needed for MWA beam az,za calculations).
iau_order : bool, optional
If True, use IAU polarisation ordering, so set jones[0,0] to the NS dipole
and jones[1,1] to EW. If False, jones[0,0] is EW. Defaults to False
element_only : bool, optional
Whether to use only the element response. Defaults to False. Use this to
look at the dipole response only, not the beam formed response.
parallactic_rotate : bool, optional
Whether to apply parallactic angle rotation. Defaults to False.
logger : Logger, optional
Logger to use. Defaults to False; if False, create a new simple logger instance.
Returns
--------
np.ndarray
The calculated Jones matrices with shape
(num_stations, num_times, num_freqs, num_coord, 2, 2)
"""
with concurrent.futures.ProcessPoolExecutor(max_workers=num_threads) as executor:
future_data = [executor.submit(run_everybeam_thread,
num_threads, thread_id,
ras, decs, freqs,
ms_path=ms_path,
times=times,
beam_ra0=beam_ra0, beam_dec0=beam_dec0,
mwa_coeff_path=mwa_coeff_path,
mwa_dipole_delays=mwa_dipole_delays,
mwa_dipole_amps=mwa_dipole_amps,
j2000_latitudes=j2000_latitudes,
j2000_lsts=j2000_lsts,
station_ids=station_ids,
element_response_model=element_response_model,
apply_beam_norms=apply_beam_norms,
iau_order=iau_order,
element_only=element_only,
parallactic_rotate=parallactic_rotate,
logger=logger)
for thread_id in range(num_threads)]
all_jones_chunks = []
all_thread_ids = []
for future in concurrent.futures.as_completed(future_data):
jones_chunk, thread_id = future.result()
all_jones_chunks.append(jones_chunk)
all_thread_ids.append(thread_id)
if type(mwa_coeff_path) != bool:
num_stations = 1
num_times = len(j2000_lsts)
else:
num_stations = len(station_ids)
num_times = len(times)
num_freqs = len(freqs)
num_coords = len(ras)
all_jones = np.zeros((num_stations, num_times, num_freqs, num_coords, 2, 2), dtype=np.complex128)*np.nan
coords_per_thread = int(np.ceil(num_coords / num_threads))
for jones_chunk, thread_id in zip(all_jones_chunks, all_thread_ids):
low_coord = thread_id * coords_per_thread
high_coord = (thread_id + 1) * coords_per_thread
all_jones[:, :, :, low_coord:high_coord, :, :] = jones_chunk
return all_jones
[docs]
def run_lofar_beam(ms_path : str, element_response_model : bool,
station_idxs : np.ndarray,
beam_ra0 : float, beam_dec0 : float,
ras : np.ndarray, decs : np.ndarray,
mjd_sec_times : np.ndarray,
freqs : np.ndarray,
apply_beam_norms : bool = True,
parallactic_rotate : bool = False,
iau_order : bool = True,
element_only : bool = False):
"""
Run the LOFAR beam model using the EveryBeam library.
This function is a wrapper around the C++ library `libuse_everybeam.so`.
It takes the input parameters and converts them to the appropriate
ctypes types, then calls the C++ function to calculate the beam response.
The output is a numpy array of the beam response, with shape
(num_stations, num_times, num_freqs, num_coords, 2, 2).
Parameters
----------
ms_path : str
Path to the measurement set.
element_response_model : str
Element response model to use. Can be 'hamaker', 'hamakerlba', 'lobes' or 'default'.
station_idxs : np.ndarray
Array of station indices to use.
beam_ra0 : float
Right ascension of the beam center in radians.
beam_dec0 : float
Declination of the beam center in radians.
ras : np.ndarray
Right ascensions of the coordinates in radians.
decs : np.ndarray
Declinations of the coordinates in radians.
mjd_sec_times : np.ndarray
Array of times in MJD seconds.
freqs : np.ndarray
Array of frequencies to use.
apply_beam_norms : bool
Whether to apply beam normalisation. Defaults to True. Achieved by
calculating the beam response at beam centre, and multiplying all
Jones by the inverse of this central beam response.
parallactic_rotate : bool
Whether to apply parallactic angle rotation. Defaults to False.
iau_order : bool
If True, use IAU polarisation ordering, so set jones[0,0] to the NS dipole and jones[1,1] to EW. If False, jones[0,0] is EW.
element_only : bool
Whether to use only the element response. Defaults to False. Use this to
look at the dipole response only, not the beam formed response.
Returns
-------
np.ndarray
The calculated Jones matrices with shape
(num_stations, num_times, num_freqs, num_coords, 2, 2).
"""
woden_path = importlib_resources.files(wodenpy).joinpath(f"libuse_everybeam.so")
woden_lib = ctypes.cdll.LoadLibrary(woden_path)
load_and_run_lofar_beam = woden_lib.load_and_run_lofar_beam
num_stations = len(station_idxs)
num_dirs = len(ras)
num_freqs = len(freqs)
num_times = len(mjd_sec_times)
ras_ctypes = (ctypes.c_double * num_dirs)()
decs_ctypes = (ctypes.c_double * num_dirs)()
for i in range(num_dirs):
ras_ctypes[i] = ras[i]
decs_ctypes[i] = decs[i]
mjd_sec_times_ctypes = (ctypes.c_double * num_times)()
for i in range(num_times):
mjd_sec_times_ctypes[i] = mjd_sec_times[i]
freqs_ctypes = (ctypes.c_double * num_freqs)()
for i in range(num_freqs):
freqs_ctypes[i] = freqs[i]
station_idxs_ctypes = (ctypes.c_int * num_stations)()
for i in range(num_stations):
station_idxs_ctypes[i] = station_idxs[i]
ms_path_ctypes = ctypes.c_char_p(ms_path.encode('utf-8'))
element_response_model_ctypes = ctypes.c_char_p(element_response_model.encode('utf-8'))
jones = ((num_stations*num_times*num_freqs*num_dirs*4)*c_double_complex)()
load_and_run_lofar_beam.argtypes = [c_char_p, c_char_p,
c_int, POINTER(c_int),
c_int, c_double, c_double,
POINTER(c_double), POINTER(c_double),
c_int, POINTER(c_double),
c_int, POINTER(c_double),
c_bool, c_bool, c_bool, c_bool,
POINTER(c_double_complex)]
load_and_run_lofar_beam(ms_path_ctypes,
element_response_model_ctypes,
num_stations, station_idxs_ctypes,
num_dirs,
beam_ra0, beam_dec0,
ras_ctypes, decs_ctypes,
num_times, mjd_sec_times_ctypes,
num_freqs, freqs_ctypes,
apply_beam_norms, parallactic_rotate,
element_only, iau_order,
jones)
# print(jones)
jones_py = np.ctypeslib.as_array(jones, shape=(num_stations*num_times*num_freqs*num_dirs*4))
jones_py = jones_py['real'] + 1j*jones_py['imag']
jones_py = jones_py.reshape(num_stations, num_times, num_freqs, num_dirs, 2, 2)
return jones_py
[docs]
def run_oskar_beam(ms_path : str, element_response_model : bool,
station_idxs : np.ndarray,
beam_ra0 : float, beam_dec0 : float,
ras : np.ndarray, decs : np.ndarray,
mjd_sec_times : np.ndarray,
freqs : np.ndarray,
apply_beam_norms : bool,
parallactic_rotate : bool,
iau_order : bool = True,
element_only : bool = False):
"""
Run the OSKAR beam model using the EveryBeam library.
This function is a wrapper around the C++ library `libuse_everybeam.so`.
It takes the input parameters and converts them to the appropriate
ctypes types, then calls the C++ function to calculate the beam response.
The output is a numpy array of the beam response, with shape
(num_stations, num_times, num_freqs, num_coords, 2, 2).
NOTE: This function is currently identical to the LOFAR beam function.
Both LOFAR and OSKAR call the PhasedArrayBeam class in the C++ library.
These functions have only been lightly tested for functionality. So I've
kept them separate in case it's discovered they need different default
values in the future.
Parameters
----------
ms_path : str
Path to the measurement set.
element_response_model : str
Element response model to use. Can be 'hamaker', 'hamakerlba', 'lobes' or 'default'.
station_idxs : np.ndarray
Array of station indices to use.
beam_ra0 : float
Right ascension of the beam center in radians.
beam_dec0 : float
Declination of the beam center in radians.
ras : np.ndarray
Right ascensions of the coordinates in radians.
decs : np.ndarray
Declinations of the coordinates in radians.
mjd_sec_times : np.ndarray
Array of times in MJD seconds.
freqs : np.ndarray
Array of frequencies to use.
apply_beam_norms : bool
Whether to apply beam normalisation. Defaults to True. Achieved by
calculating the beam response at beam centre, and multiplying all
Jones by the inverse of this central beam response.
parallactic_rotate : bool
Whether to apply parallactic angle rotation. Defaults to False.
iau_order : bool
If True, use IAU polarisation ordering, so set jones[0,0] to the NS dipole and jones[1,1] to EW. If False, jones[0,0] is EW.
element_only : bool
Whether to use only the element response. Defaults to False. Use this to
look at the dipole response only, not the beam formed response.
Returns
-------
np.ndarray
The calculated Jones matrices with shape
(num_stations, num_times, num_freqs, num_coords, 2, 2).
"""
woden_path = importlib_resources.files(wodenpy).joinpath(f"libuse_everybeam.so")
woden_lib = ctypes.cdll.LoadLibrary(woden_path)
load_and_run_oskar_beam = woden_lib.load_and_run_oskar_beam
num_stations = len(station_idxs)
num_dirs = len(ras)
num_freqs = len(freqs)
num_times = len(mjd_sec_times)
ras_ctypes = (ctypes.c_double * num_dirs)()
decs_ctypes = (ctypes.c_double * num_dirs)()
for i in range(num_dirs):
ras_ctypes[i] = ras[i]
decs_ctypes[i] = decs[i]
mjd_sec_times_ctypes = (ctypes.c_double * num_times)()
for i in range(num_times):
mjd_sec_times_ctypes[i] = mjd_sec_times[i]
freqs_ctypes = (ctypes.c_double * num_freqs)()
for i in range(num_freqs):
freqs_ctypes[i] = freqs[i]
station_idxs_ctypes = (ctypes.c_int * num_stations)()
for i in range(num_stations):
station_idxs_ctypes[i] = station_idxs[i]
ms_path_ctypes = ctypes.c_char_p(ms_path.encode('utf-8'))
element_response_model_ctypes = ctypes.c_char_p(element_response_model.encode('utf-8'))
jones = ((num_stations*num_times*num_freqs*num_dirs*4)*c_double_complex)()
load_and_run_oskar_beam.argtypes = [c_char_p, c_char_p,
c_int, POINTER(c_int),
c_int, c_double, c_double,
POINTER(c_double), POINTER(c_double),
c_int, POINTER(c_double),
c_int, POINTER(c_double),
c_bool, c_bool, c_bool, c_bool,
POINTER(c_double_complex)]
load_and_run_oskar_beam(ms_path_ctypes,
element_response_model_ctypes,
num_stations, station_idxs_ctypes,
num_dirs,
beam_ra0, beam_dec0,
ras_ctypes, decs_ctypes,
num_times, mjd_sec_times_ctypes,
num_freqs, freqs_ctypes,
apply_beam_norms, parallactic_rotate,
element_only, iau_order,
jones)
jones_py = np.ctypeslib.as_array(jones, shape=(num_stations*num_times*num_freqs*num_dirs*4))
jones_py = jones_py['real'] + 1j*jones_py['imag']
jones_py = jones_py.reshape(num_stations, num_times, num_freqs, num_dirs, 2, 2)
return jones_py
[docs]
def run_mwa_beam(delays : np.ndarray,
coeff_path : str,
ras : np.ndarray, decs : np.ndarray,
j2000_lsts : np.ndarray, j2000_latitudes : np.ndarray,
freqs : np.ndarray,
amps : np.ndarray = np.ones(16),
apply_beam_norms : bool = False,
parallactic_rotate : bool = True,
iau_order : bool = True,
element_only : bool = False):
"""
Run the MWA beam model using the EveryBeam library.
This function is a wrapper around the C++ library `libuse_everybeam.so`.
It takes the input parameters and converts them to the appropriate
ctypes types, then calls the C++ function to calculate the beam response.
The output is a numpy array of the beam response, with shape
(num_stations, num_times, num_freqs, num_coords, 2, 2).
Parameters
----------
delays : np.ndarray
Array of length 16 of MWA pointing delays
coeff_path : str
Path to the hdf5 FEE file.
ras : np.ndarray
Right ascensions of the coordinates in radians.
decs : np.ndarray
Declinations of the coordinates in radians.
mjd_sec_times : np.ndarray
Array of times in MJD seconds.
j2000_lsts : np.ndarray
Local sidereal times in J2000 coordinates.
j2000_latitudes : np.ndarray
Latitudes in J2000 coordinates.
freqs : np.ndarray
Array of frequencies to use.
amps : np.ndarray
Array of length 16 of MWA dipole amplitudes. Defaults to all ones.
apply_beam_norms : bool
Whether to apply beam normalisation. Defaults to False. The beam seems
to be normalised by EveryBeam by default, so this is not ne eded.
parallactic_rotate : bool
Whether to apply parallactic angle rotation. Defaults to True.
iau_order : bool
If True, use IAU polarisation ordering, so set jones[0,0] to the NS dipole and jones[1,1] to EW. If False, jones[0,0] is EW.
element_only : bool
Whether to use only the element response. Defaults to False. Use this to
look at the dipole response only, not the beam formed response. Does this by settings
all amplitudes bar the first dipole, so defaults to the first dipole only.
Returns
-------
np.ndarray
The calculated Jones matrices with shape
(num_stations, num_times, num_freqs, num_coords, 2, 2).
Note that this is to have a consistent shape with other EveryBeam
functions which can have varying station patterns; at the moment,
num_stations is hard-coded to 1 for MWA.
"""
num_stations = 1
station_idxs = np.array([0])
num_dirs = len(ras)
num_freqs = len(freqs)
num_times = len(j2000_lsts)
azs = np.empty(num_dirs*num_times)
zas = np.empty(num_dirs*num_times)
para_angles = np.empty(num_dirs*num_times)
for comp_ind in range(num_dirs):
comp_has = j2000_lsts - ras[comp_ind]
these_azs, these_els = erfa.hd2ae(comp_has, decs[comp_ind], j2000_latitudes)
these_zas = np.pi/2 - these_els
these_para_angles = erfa.hd2pa(comp_has, decs[comp_ind], j2000_latitudes)
azs[comp_ind*num_times:(comp_ind+1)*num_times] = these_azs
zas[comp_ind*num_times:(comp_ind+1)*num_times] = these_zas
para_angles[comp_ind*num_times:(comp_ind+1)*num_times] = these_para_angles
woden_path = importlib_resources.files(wodenpy).joinpath(f"libuse_everybeam.so")
woden_lib = ctypes.cdll.LoadLibrary(woden_path)
load_and_run_mwa_beam = woden_lib.load_and_run_mwa_beam
zas_ctypes = (ctypes.c_double*(num_dirs*num_times))()
azs_ctypes = (ctypes.c_double*(num_dirs*num_times))()
para_angles_ctypes = (ctypes.c_double*(num_dirs*num_times))()
for i in range(num_dirs*num_times):
zas_ctypes[i] = zas[i]
azs_ctypes[i] = azs[i]
para_angles_ctypes[i] = para_angles[i]
freqs_ctypes = (ctypes.c_double * num_freqs)()
for i in range(num_freqs):
freqs_ctypes[i] = freqs[i]
coeff_path_ctypes = ctypes.c_char_p(coeff_path.encode('utf-8'))
jones = ((num_stations*num_times*num_freqs*num_dirs*4)*c_double_complex)()
delays_ctypes = (ctypes.c_double * 16)()
for i in range(16):
delays_ctypes[i] = delays[i]
amps_ctypes = (ctypes.c_double * 16)()
for i in range(16):
amps_ctypes[i] = amps[i]
load_and_run_mwa_beam.argtypes = [POINTER(c_double), POINTER(c_double),
c_char_p,
c_int, POINTER(c_double), POINTER(c_double),
POINTER(c_double),
c_int, POINTER(c_double),
c_int,
c_bool, c_bool,
POINTER(c_double_complex)]
load_and_run_mwa_beam(delays_ctypes, amps_ctypes,
coeff_path_ctypes,
num_dirs, azs_ctypes, zas_ctypes,
para_angles_ctypes,
num_freqs, freqs_ctypes,
num_times,
parallactic_rotate, iau_order,
jones)
jones_py = np.ctypeslib.as_array(jones, shape=(num_stations*num_times*num_freqs*num_dirs*4))
jones_py = jones_py['real'] + 1j*jones_py['imag']
jones_py = jones_py.reshape(num_stations, num_times, num_freqs, num_dirs, 2, 2)
return jones_py