"""Wrappers around the EveryBeam library to calculate primary beam values.
The core functionality is in the C++ library `libuse_everybeam.so`, which in
turn contains wrappers about the EveryBeam library. Functions here call the
C++ library to calculate the primary beam values, and return them to Python
usable data types, via `ctypes`.
See https://everybeam.readthedocs.io/en/latest/index.html for more information
on the EveryBeam library."""
import numpy as np
from astropy.coordinates import ITRS, SkyCoord, AltAz, EarthLocation
from astropy.time import Time, TimeDelta
import astropy.units as u
import argparse
from wodenpy.use_libwoden.create_woden_struct_classes import Woden_Struct_Classes
import erfa
from typing import Union, Tuple
import concurrent.futures
from line_profiler import profile
import os
import astropy
import wodenpy
import importlib_resources
from wodenpy.use_libwoden.skymodel_structs import c_double_complex
from ctypes import c_char_p, c_int, c_double, POINTER, c_bool
import ctypes
from wodenpy.wodenpy_setup.woden_logger import simple_logger
from logging import Logger
from multiprocessing import Process, Queue
from wodenpy.use_libwoden.beam_settings import BeamTypes
import ctypes
from wodenpy.array_layout.create_array_layout import convert_ecef_to_enh, convert_enh_to_ecef, enh2xyz
from erfa import gc2gd, gd2gc
##This call is so we can use it as a type annotation
woden_struct_classes = Woden_Struct_Classes()
Source_Catalogue = woden_struct_classes.Source_Catalogue
Woden_Settings = woden_struct_classes.Woden_Settings
[docs]
def worker_get_num_stations(ms_path : str, q : Queue) -> int:
"""
Worker function to get the number of stations in the measurement set
given by `ms_path`, and put the in the queue `q`.
This is done in a separate process because it uses `python-casacore`.
Running `import casacore` creates a `c++` state with a number of library paths
and casacore global variables. If `libuse_everybeam.so` has been built with
against a different casacore library, this causes epic intermittent errors
and segfaults. To void this, run the import in this separate thread process,
which isolates the state of the `c++` library.
TODO: All calls to `python-casacore` should be done via direct calls to
casacore in c++ in `libuse_everybeam.so`. This removes all conflicts;
however, this is a large task and will take time to implement. For now,
this is a workaround to avoid the segfaults and errors.
Parameters
----------
ms_path : str
Path to the measurement set.
q : Queue
Queue to put the number of stations in.
"""
from casacore.tables import table
with table(ms_path + '/ANTENNA') as t:
num_stations = len(t)
q.put(num_stations)
[docs]
def get_num_stations(ms_path : str) -> int:
"""
Get the number of stations in a measurement set given by `ms_path`.
Runs :func:`~worker_get_num_stations` in a separate process to
avoid `python-casacore` clashing with WODEN-built `libuse_everybeam.so`.
See :func:`~worker_get_num_stations` for more details.
Parameters
----------
ms_path : str
Path to the measurement set.
Returns
-------
int
Number of stations in the measurement set.
"""
q = Queue()
p = Process(target=worker_get_num_stations, args=(ms_path, q,))
p.start()
p.join()
num_stations = q.get()
return num_stations
[docs]
def enu_basis(lat : float, lon : float) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Calculate the ENU basis vectors in terms of earth-centred earth-fixed coords,
for a given latitude `lat` and longitude `lon`.
Parameters
----------
lat : float
Latitude in radians.
lon : float
Longitude in radians.
Returns
-------
Tuple[np.ndarray, np.ndarray, np.ndarray]
Tuple of the east, north, and up basis vectors.
"""
east = np.array([-np.sin(lon), np.cos(lon), 0.0])
north = np.array([-np.sin(lat) * np.cos(lon),
-np.sin(lat) * np.sin(lon),
np.cos(lat)])
up = np.array([np.cos(lat) * np.cos(lon),
np.cos(lat) * np.sin(lon),
np.sin(lat)])
return east, north, up
[docs]
def calc_coordinate_axes(positions : np.ndarray,
central_lat : float, central_lon : float):
"""
Internal to a LOFAR measurement set, inside the `LOFAR_ANTENNA_FIELD` table,
there is a column called `COORDINATE_AXES`. This has something to do with
defining what the normal direction of the incoming radation is relative
to a particular station. If moving a measurement set from one lat/long to
another, this must be recalculated. Below is a best guess by JLBLine at how
to do this. See the following doc string copied from EveryBeam which
describes the coordinate system, from the
`EveryBeam/cpp/antenna.h::Antenna::CoordinateSystem` function
.. seealso:: from the EveryBeam documentation:
Station coordinate system.
A right handed, cartesian, local coordinate system with coordinate axes
`p`, `q`, and `r` is associated with each antenna field.
The r-axis is orthogonal to the antenna field, and points towards the
local pseudo zenith.
The q-axis is the northern bisector of the X and Y dipoles, i.e.
it is the reference direction from which the orientation of the dual
dipole antennae is determined. The q-axis points towards the North at
the core. At remote sites it is defined as the intersection of the
antenna field plane and a plane parallel to the meridian plane at the
core. This ensures the reference directions at all sites are similar.
The p-axis is orthogonal to both other axes, and points towards the East
at the core.
The axes and origin of the anntena field coordinate system are expressed
as vectors in the geocentric, cartesian, ITRF coordinate system, in
meters.
"LOFAR Reference Plane and Reference Direction", M.A. Brentjens,
LOFAR-ASTRON-MEM-248.
.. todo::
find out how to do this properly. As described above, everything is
supposed to be calculated relative to the array centre, or for remote stations,
"the intersection of the antenna field plane and a plane parallel to the
meridian plane at the core". Wtf does that mean? Below, I calculate things
relative to the local ENU for each station. This means the calculations
for remote stations are way off, as the curvature of the earth changes
what local up is the further you get from the core. If people need accurate
remote stations, this needs to be fixed (or just don't move the telescope).
Parameters
----------
positions : np.ndarray
Array of station positions in ECEF coordinates. Shape (num_positions, 3).
central_lat : float
Latitude of the array centre in radians.
central_lon : float
Longitude of the array centre in radians.
Returns
-------
coordinate_axes : np.ndarray
Array of coordinate axes for each station. Shape (num_positions, 3, 3).
coordinate_axes[:, 0, :] are the `p` vectors, coordinate_axes[:, 1, :]
are the `q` vectors, and coordinate_axes[:, 2, :] are the `r` vectors,
as described in the docstring above.
"""
arrX, arrY, arrZ = gd2gc(1, central_lon, central_lat, 0)
arr_centre = np.array([arrX, arrY, arrZ])
num_positions = positions.shape[0]
coordinate_axes = np.zeros((num_positions, 3, 3), dtype=np.float64)
for pos in range(num_positions):
# delta = arr_centre - positions[pos, :]
delta = positions[pos, :] - arr_centre
lon, lat, height = gc2gd(1, positions[pos, :])
east, north, up = enu_basis(lat, lon)
##IF doing everything relative to the array centre, then
##use the array centre as the ENU basis
# east, north, up = enu_basis(central_lat, central_lon)
east_comp = np.dot(delta, east)
north_comp = np.dot(delta, north)
up_comp = np.dot(delta, up)
# Reconstruct component vectors
east_vec = east_comp * east
north_vec = north_comp * north
up_vec = up_comp * up
east_vec /= np.linalg.norm(east_vec)
north_vec /= np.linalg.norm(north_vec)
up_vec /= np.linalg.norm(up_vec)
if lon < central_lon:
east_vec *= -1
if lat < central_lat:
north_vec *= -1
coordinate_axes[pos, 0, :] = east_vec
coordinate_axes[pos, 1, :] = north_vec
coordinate_axes[pos, 2, :] = up_vec
return coordinate_axes
[docs]
def worker_create_filtered_ms(ms_path : str, new_ms_path : str,
ra0 : float, dec0 : float,
recentre_array : bool = False,
current_latitude : float = False,
current_longitude : float = False,
new_latitude : float = False,
new_longitude : float = False) -> None:
"""
Create a reduced measurement to path `new_ms_path` with only the first time
and frequency channel of the measurement set `ms_path`. Set the delay and
reference directions to the given RA/Dec `ra0,dec0`.
The `EveryBeam` functions that `WODEN` wraps read the beam centre value
directly from the measurement set. Rather than downloading a specific
MS which exactly the pointing we want, we just make the smallest possible
copy of the MS with the first time and frequency channel, and set
the delay and reference direction to what we want.
If `recentre_array` is True, also perform the necessary calculations and
updates to move the array centre from `current_latitude` and `current_longitude`
to `new_latitude` and `new_longitude`.
This function is run in a separate process to avoid `python-casacore`
clashing with WODEN-built `libuse_everybeam.so`. Running `import casacore`
creates a `c++` state with a number of library paths and casacore global variables.
If `libuse_everybeam.so` has been built with against a different casacore library,
this causes epic intermittent errors and segfaults. To void this, run the import
in this separate thread process, which isolates the state of the `c++` library.
Parameters
----------
ms_path : str
Path to the original measurement set.
new_ms_path : str
Path to the new measurement set.
ra0 : float
Right ascension of the beam centre in radians.
dec0 : float
Declination of the beam centre in radians.
recentre_array : bool, optional
Whether to recentre the array to the new latitude and longitude.
Defaults to False. If True, `current_latitude`, `current_longitude`,
`new_latitude`, and `new_longitude` must be set.
current_latitude : float, optional
Current latitude of the array in radians. Required if `recentre_array`
is True.
current_longitude : float, optional
Current longitude of the array in radians. Required if `recentre_array`
is True.
new_latitude : float, optional
New latitude of the array in radians. Required if `recentre_array`
is True.
new_longitude : float, optional
New longitude of the array in radians. Required if `recentre_array`
is True.
"""
from casacore.tables import table, taql
##Find out telescope type
with table(ms_path + "/OBSERVATION", ack=False) as tb:
telescope_name = tb.getcol("TELESCOPE_NAME")[0] # Get first entry
##First up, read in original MS, and filter it to only have
## the first time and frequency channel
with table(ms_path, readonly=True) as ms:
time_col = ms.getcol("TIME")
ddid_col = ms.getcol("DATA_DESC_ID")
first_time = time_col[0] # First timestamp
first_ddid = ddid_col[0] # First frequency channel
# Use TaQL (Table Query Language) to select the subset efficiently
query = f"SELECT * FROM {ms_path} WHERE TIME = {first_time} AND DATA_DESC_ID = {first_ddid}"
filtered_ms = taql(query)
filtered_ms.copy(new_ms_path, deep=True)
filtered_ms.close()
with table(new_ms_path+'::FIELD', readonly=False) as field_table:
field_table.putcol('DELAY_DIR', np.array([[[ra0, dec0]]]))
field_table.putcol('REFERENCE_DIR', np.array([[[ra0, dec0]]]))
if telescope_name == "LOFAR":
field_table.putcol('LOFAR_TILE_BEAM_DIR', np.array([[[ra0, dec0]]]))
if recentre_array:
with table(ms_path + '/ANTENNA', readonly=True) as t:
num_ants = len(t)
num_antennas = num_ants
ant_locations = np.array([t.getcell('POSITION', ant) for ant in range(num_ants)])
##convert from ECEF to ENH, as WODEN starts with enh coords
east, north, height = convert_ecef_to_enh(ant_locations[:,0],
ant_locations[:,1], ant_locations[:,2],
current_longitude, current_latitude)
ecef_X, ecef_Y, ecef_Z = convert_enh_to_ecef(east, north, height,
new_longitude, new_latitude)
with table(new_ms_path+'::ANTENNA', readonly=False) as antenna:
new_locations = np.zeros((num_antennas, 3), dtype=np.float64)
new_locations[:, 0] = ecef_X
new_locations[:, 1] = ecef_Y
new_locations[:, 2] = ecef_Z
antenna.putcol('POSITION', new_locations)
if telescope_name == "LOFAR":
antenna.putcol('LOFAR_PHASE_REFERENCE', new_locations)
if telescope_name == "LOFAR":
with table(new_ms_path+'::LOFAR_ANTENNA_FIELD', readonly=False) as antenna:
lof_ant_position = antenna.getcol('POSITION')
east, north, height = convert_ecef_to_enh(lof_ant_position[:,0],
lof_ant_position[:,1], lof_ant_position[:,2],
current_longitude, current_latitude)
ecef_X, ecef_Y, ecef_Z = convert_enh_to_ecef(east, north, height,
new_longitude, new_latitude)
new_locations = np.zeros((num_antennas, 3), dtype=np.float64)
new_locations[:, 0] = ecef_X
new_locations[:, 1] = ecef_Y
new_locations[:, 2] = ecef_Z
antenna.putcol('POSITION', new_locations)
new_coord_axes = calc_coordinate_axes(new_locations,
new_latitude,
new_longitude)
with table(new_ms_path+'::LOFAR_ANTENNA_FIELD', readonly=False) as antenna:
coord_axes = antenna.getcol('COORDINATE_AXES')
antenna.putcol('COORDINATE_AXES', new_coord_axes)
arrX, arrY, arrZ = gd2gc(1, new_longitude, new_latitude, 0)
arr_centre = np.array([arrX, arrY, arrZ])
for station in range(num_antennas):
offsets = antenna.getcell('ELEMENT_OFFSET', station)
offsets += ant_locations[station, :]
cur_lon, cur_lat, height = gc2gd(1, ant_locations[station, :])
east, north, height = convert_ecef_to_enh(offsets[:,0],
offsets[:,1], offsets[:,2],
cur_lon, cur_lat)
new_lon, new_lat, height = gc2gd(1, new_locations[station, :])
ecef_X, ecef_Y, ecef_Z = convert_enh_to_ecef(east, north, height,
new_lon, new_lat)
new_offsets = np.zeros_like(offsets)
new_offsets[:, 0] = ecef_X - new_locations[station, 0]
new_offsets[:, 1] = ecef_Y - new_locations[station, 1]
new_offsets[:, 2] = ecef_Z - new_locations[station, 2]
antenna.putcell('ELEMENT_OFFSET', station, new_offsets)
for station in range(num_antennas):
offsets = antenna.getcell('TILE_ELEMENT_OFFSET', station)
offsets += ant_locations[station, :]
cur_lon, cur_lat, height = gc2gd(1, ant_locations[station, :])
east, north, height = convert_ecef_to_enh(offsets[:,0],
offsets[:,1], offsets[:,2],
cur_lon, cur_lat)
new_lon, new_lat, height = gc2gd(1, new_locations[station, :])
ecef_X, ecef_Y, ecef_Z = convert_enh_to_ecef(east, north, height,
new_lon, new_lat)
new_offsets = np.zeros_like(offsets)
new_offsets[:, 0] = ecef_X - new_locations[station, 0]
new_offsets[:, 1] = ecef_Y - new_locations[station, 1]
new_offsets[:, 2] = ecef_Z - new_locations[station, 2]
antenna.putcell('TILE_ELEMENT_OFFSET', station, new_offsets)
[docs]
def create_filtered_ms(ms_path : str, new_ms_path : str,
ra0 : float, dec0 : float,
recentre_array : bool = False,
current_latitude : float = False,
current_longitude : float = False,
new_latitude : float = False,
new_longitude : float = False):
"""
Create a reduced measurement to path `new_ms_path` with only the first time
and frequency channel of the measurement set `ms_path`. Set the delay and
reference directions to the given RA/Dec `ra0,dec0`. If `recentre_array`
is True, also perform the necessary calculations and updates to move the
array centre from `current_latitude` and `current_longitude` to
`new_latitude` and `new_longitude`.
Runs :func:`~worker_create_filtered_ms` in a separate process to
avoid `python-casacore` clashing with WODEN-built `libuse_everybeam.so`.
See :func:`~worker_create_filtered_ms` for more details.
Parameters
----------
ms_path : str
Path to the original measurement set.
new_ms_path : str
Path to the new measurement set.
ra0 : float
Right ascension of the beam centre in radians.
dec0 : float
Declination of the beam centre in radians.
recentre_array : bool, optional
Whether to recentre the array to the new latitude and longitude.
Defaults to False. If True, `current_latitude`, `current_longitude`,
`new_latitude`, and `new_longitude` must be set.
current_latitude : float, optional
Current latitude of the array in radians. Required if `recentre_array`
is True.
current_longitude : float, optional
Current longitude of the array in radians. Required if `recentre_array`
is True.
new_latitude : float, optional
New latitude of the array in radians. Required if `recentre_array`
is True.
new_longitude : float, optional
New longitude of the array in radians. Required if `recentre_array`
is True.
"""
p = Process(target=worker_create_filtered_ms,
args=(ms_path, new_ms_path, ra0, dec0,
recentre_array, current_latitude,
current_longitude, new_latitude,
new_longitude))
p.start()
p.join() # module is fully cleaned when the process exits
[docs]
def check_ms_telescope_type_matches_element_response(ms_path : str,
element_response_model : str = 'default',
logger : Logger = False) -> Tuple[str, str]:
"""
Check the requested `element_response_model` is compatible with the
telescope type in the measurement set. If they don't match, set the element
response model to the default for the telescope type. This function uses
the `check_ms_telescope_type` function in `libuse_everybeam.so`.
Accpetable combinations of telescope type and `element_response_model` are:
- MWA: 'MWA' or 'default'
- LOFAR: 'hamaker', 'hamakerlba', 'lobes' or 'default'
- OSKAR: 'skala40_wave' or 'default'
Please note that 'hamakerlba', 'lobes' are not tested or supported in
WODEN, and are not recommended for use. They are included for completeness.
Parameters
----------
ms_path : str
Path to the measurement set.
element_response_model : str, optional
Requested Element response model to use. Defaults to 'default'.
logger : Logger, optional
Logger to use. Defaults to False; if False, create a new simple logger instance.
Returns
-------
Tuple[str, str]
Tuple of the telescope type and the element response model to use.
"""
if not logger:
logger = simple_logger()
woden_path = importlib_resources.files(wodenpy).joinpath(f"libwoden_double.so")
woden_lib = ctypes.cdll.LoadLibrary(woden_path)
check_ms_telescope_type = woden_lib.check_ms_telescope_type
check_ms_telescope_type.argtypes = [c_char_p]
check_ms_telescope_type.restype = c_char_p
# print(f"MS path: {type(ms_path)}, {ms_path}")
ms_path_ctypes = ctypes.c_char_p(ms_path.encode('utf-8'))
telescope_type = check_ms_telescope_type(ms_path_ctypes).decode('utf-8')
use_element_response_model = False
if telescope_type == 'MWA':
if element_response_model == 'default':
use_element_response_model = "MWA"
else:
if element_response_model != 'MWA':
logger.warning(f"Measurement set telescope type is MWA, but element_response_model was set to {element_response_model}. Changing to 'MWA'")
use_element_response_model = "MWA"
else:
use_element_response_model = element_response_model
elif telescope_type == 'LOFAR':
if element_response_model == 'default':
use_element_response_model = "hamaker"
else:
if element_response_model not in ['hamaker', 'hamakerlba', 'lobes']:
logger.warning(f"Measurement set telescope type is LOFAR, but "
f"element_response_model was set to {element_response_model}, "
"which is not one of ['hamaker', 'hamakerlba','lobes']. "
"Defaulting to 'hamaker'")
use_element_response_model = "hamaker"
else:
use_element_response_model = element_response_model
elif telescope_type == 'OSKAR':
if element_response_model == 'default':
use_element_response_model = "skala40_wave"
else:
if element_response_model != 'skala40_wave':
logger.warning(f"Measurement set telescope type is OSKAR, but element_response_model "
f"was set to {element_response_model}. Changing to 'skala40_wave'")
use_element_response_model = "skala40_wave"
else:
use_element_response_model = element_response_model
else:
use_element_response_model = element_response_model
exit_message = f"Measurement set telescope type is {telescope_type}. "
exit_message += "WODEN currently supports LOFAR, MWA, OSKAR EveryBeam."
exit_message += "Cannot proceed as unknown behaviour will happen in C++ code. "
exit_message += "Exiting now."
logger.error(exit_message)
exit(exit_message)
return telescope_type, use_element_response_model
[docs]
def convert_common_args_to_everybeam_args(ms_path : str, coeff_path : str,
element_response_model : str,
station_idxs : np.ndarray,
freqs : np.ndarray,
mjd_sec_times : np.ndarray) -> Tuple:
"""
Convert input arugments common to all EveryBeam primary beam models
into `ctypes` equivalents to pass to the C++ library.
Parameters
----------
ms_path : str
Path to the measurement set.
coeff_path : str
Path to the coefficients file (only needed for MWA, pass the path
to the hdf5 FEE file).
element_response_model : str
Element response model to use. Can be 'MWA', 'hamaker', 'skala40_wave',
or 'default'.
station_idxs : np.ndarray
Array of station indices to use.
freqs : np.ndarray
Array of frequencies to use.
mjd_sec_times : np.ndarray
Array of times in MJD seconds.
Returns
-------
Tuple[ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_double),ctypes.POINTER(ctypes.c_double)]
Tuple of the converted arguments in the format required by the
EveryBeam library. Order of outputs is identical to order of inputs arguments.
"""
num_stations = len(station_idxs)
num_times = len(mjd_sec_times)
num_freqs = len(freqs)
mjd_sec_times_ctypes = (ctypes.c_double * num_times)()
for i in range(num_times):
mjd_sec_times_ctypes[i] = mjd_sec_times[i]
freqs_ctypes = (ctypes.c_double * num_freqs)()
for i in range(num_freqs):
freqs_ctypes[i] = freqs[i]
station_idxs_ctypes = (ctypes.c_int * num_stations)()
for i in range(num_stations):
station_idxs_ctypes[i] = station_idxs[i]
ms_path_ctypes = ctypes.c_char_p(ms_path.encode('utf-8'))
element_response_model_ctypes = ctypes.c_char_p(element_response_model.encode('utf-8'))
coeff_path_ctypes = ctypes.c_char_p(coeff_path.encode('utf-8'))
return ms_path_ctypes, coeff_path_ctypes, element_response_model_ctypes, station_idxs_ctypes, freqs_ctypes, mjd_sec_times_ctypes
[docs]
def run_everybeam(ms_path : str, coeff_path : str,
ras: np.ndarray, decs: np.ndarray,
beam_ra0: float, beam_dec0: float,
j2000_latitudes: np.ndarray, j2000_lsts: np.ndarray,
times: np.ndarray, freqs: np.ndarray,
station_ids: np.ndarray,
element_response_model='default',
apply_beam_norms: bool = True,
iau_order: bool = False,
element_only: bool = False,
parallactic_rotate: bool = False,
logger : Logger = False) -> np.ndarray:
"""
Calculate the Jones matrices for a given set of coordinates, times,
frequencies, and station ids using the EveryBeam library.
`j2000_latitudes` should be the array latitude as precessed back to J2000,
with `j2000_lsts` being the matching LST in J2000.
Parameters
------------
ms_path : str
Path to the measurement set to load the EveryBeam telescope from.
coeff_path : str
Path to the coefficients file (only needs a value for MWA, pass the path
to the hdf5 FEE file. Otherwise just pass "").
ras : np.ndarray
Right ascensions of the coordinates in radians.
decs : np.ndarray
Declinations of the coordinates in radians.
beam_ra0 : float
Right ascension of the beam center in radians.
beam_dec0 : float
Declination of the beam center in radians.
j2000_latitudes : np.ndarray
Latitudes in J2000 coordinates.
j2000_lsts : np.ndarray
Local sidereal times in J2000 coordinates.
times : np.ndarray
Array of observation times.
freqs : np.ndarray
Array of frequencies.
telescope : (eb.Telescope):
Telescope object from the EveryBeam library.
station_ids : np.ndarray
Array of station IDs.
element_response_model : str, optional
The Everybeam element response model to use. Defaults to 'hamaker'.
Avaible options are 'hamaker' (LOFAR), 'skala40_wave' (OSKAR), and 'MWA' (MWA).
apply_beam_norms : bool, optional
Whether to apply beam normalisation. Defaults to True. Achieved by
calculating the beam response at beam centre, and multiplying all
Jones by the inverse of this central beam response.
iau_order : bool, optional
If True, use IAU polarisation ordering, so set jones[0,0] to the NS dipole and jones[1,1] to EW. If False, jones[0,0] is EW.
element_only : bool, optional
Whether to use only the element response. Defaults to False. Use this to
look at the dipole response only, not the beam formed response.
parallactic_rotate : bool, optional
Whether to apply parallactic angle rotation. Defaults to False.
logger : Logger, optional
Logger to use. Defaults to False; if False, create a new simple logger instance.
Returns
--------
np.ndarray
The calculated Jones matrices with shape (num_stations, num_times, num_freqs, num_coords, 2, 2).
"""
if not logger:
logger = simple_logger()
telescope_type, checked_element_response_model = check_ms_telescope_type_matches_element_response(ms_path,
element_response_model,
logger)
num_stations = len(station_ids)
num_times = len(times)
num_freqs = len(freqs)
num_coords = len(ras)
mjd_sec_times = np.array([time.mjd * 86400.0 for time in times])
if telescope_type == 'MWA':
jones = run_mwa_beam(ms_path, checked_element_response_model,
coeff_path, station_ids,
ras, decs, mjd_sec_times,
j2000_lsts, j2000_latitudes, freqs,
apply_beam_norms=apply_beam_norms,
parallactic_rotate=parallactic_rotate,
iau_order=iau_order,
element_only=element_only)
elif telescope_type == 'LOFAR':
jones = run_lofar_beam(ms_path, checked_element_response_model,
station_ids,
beam_ra0, beam_dec0,
ras, decs, mjd_sec_times, freqs,
apply_beam_norms=apply_beam_norms,
parallactic_rotate=parallactic_rotate,
iau_order=iau_order,
element_only=element_only)
elif telescope_type == 'OSKAR':
jones = run_oskar_beam(ms_path, checked_element_response_model,
station_ids,
beam_ra0, beam_dec0,
ras, decs, mjd_sec_times, freqs,
apply_beam_norms=apply_beam_norms,
parallactic_rotate=parallactic_rotate,
iau_order=iau_order,
element_only=element_only)
else:
logger.error("Unknown telescope type. Exiting now.")
exit()
return jones
[docs]
def run_everybeam_thread(num_threads : int, thread_id : int,
ms_path : str, coeff_path : str,
ras : np.ndarray, decs : np.ndarray,
ra0 : float, dec0 : float,
j2000_latitudes : np.ndarray, j2000_lsts : np.ndarray,
times : np.ndarray, freqs : np.ndarray,
station_ids : np.ndarray,
apply_beam_norms : bool = True,
iau_order : bool = False,
element_only : bool = False,
parallactic_rotate : bool = True,
element_response_model='default',
use_local_mwa : bool = True) -> Tuple[np.ndarray, int]:
"""
Thread function called by `run_everybeam_over_threads` to calculate the
EveryBeam response in parrallel. Calls `run_everybeam` with a subset of
the coordinates; see `run_everybeam` for more details of the parameters.
Creates a new EveryBeam telescope object from `ms_path` for each thread.
This has to be done because `concurrent.futures.ProcessPoolExecutor` has
to pickle the function and all it's arguments, and EveryBeam objects can't
be pickled. This is somewhat wasteful but I can't work out a better way
to make things parallel.
Parameters
------------
num_threads : int
Number of threads being in call by `run_everybeam_over_threads`.
thread_id : int
ID of the current thread. Useds to work out what chunk of `ras` and `decs`
to process.
ms_path : str
Path to the measurement set to load the EveryBeam telescope from.
coeff_path : str
Path to the coefficients file (only needs a value for MWA, pass the path
to the hdf5 FEE file. Otherwise just pass "").
ras : np.ndarray
Right ascensions of the coordinates in radians.
decs : np.ndarray
Declinations of the coordinates in radians.
ra0 : float
Right ascension of the beam center in radians.
dec0 : float
Declination of the beam center in radians.
j2000_latitudes : np.ndarray
Latitudes in J2000 coordinates.
j2000_lsts : np.ndarray
Local sidereal times in J2000 coordinates.
times : np.ndarray
Array of observation times.
freqs : np.ndarray
Array of frequencies.
station_ids : np.ndarray
Array of station IDs.
apply_beam_norms : bool, optional
Whether to apply beam normalisation. Defaults to True. Achieved by
calculating the beam response at beam centre, and multiplying all
Jones by the inverse of this central beam response.
iau_order : bool, optional
If True, use IAU polarisation ordering, so set jones[0,0] to the NS dipole and jones[1,1] to EW. If False, jones[0,0] is EW.
element_only : bool, optional
Whether to use only the element response. Defaults to False. Use this to
look at the dipole response only, not the beam formed response.
parallactic_rotate : bool, optional
Whether to apply parallactic angle rotation. Defaults to False.
element_response_model : str, optional
The Everybeam element response model to use. Defaults to 'hamaker'.
Avaible options are 'hamaker' (LOFAR), 'skala40_wave' (OSKAR), and 'MWA' (MWA).
use_local_mwa : bool, optional
Whether to use the local MWA model. Defaults to True. The local MWA model
takes za/az instead of RA/Dec.
Returns
--------
Tuple[np.ndarray, int]
The calculated Jones matrices with shape
(num_stations, num_times, num_freqs, num_coords_in_thread, 2, 2), as
well as the thread ID. Use the thread ID to insert this thread output
into the correct place in the final Jones matrix.
"""
num_coords = len(ras)
coords_per_thread = int(np.ceil(num_coords / num_threads))
low_coord = thread_id * coords_per_thread
high_coord = (thread_id + 1) * coords_per_thread
print(f"Thread {thread_id} processing coords {low_coord} to {high_coord}")
jones = run_everybeam(ms_path, coeff_path,
ras[low_coord:high_coord],
decs[low_coord:high_coord],
ra0, dec0, j2000_latitudes, j2000_lsts,
times, freqs,
station_ids,
element_response_model=element_response_model,
apply_beam_norms=apply_beam_norms,
iau_order=iau_order,
element_only=element_only,
parallactic_rotate=parallactic_rotate)
print(f"Thread {thread_id} finished")
return jones, thread_id
[docs]
def run_everybeam_over_threads(num_threads : int,
ms_path : str,
coeff_path : str,
ras : np.ndarray, decs : np.ndarray,
ra0 : float, dec0 : float,
j2000_latitudes : np.ndarray, j2000_lsts : np.ndarray,
times : np.ndarray, freqs : np.ndarray,
station_ids : np.ndarray,
apply_beam_norms : bool = True,
iau_order : bool = False,
element_only : bool = False,
parallactic_rotate : bool = True,
use_local_mwa : bool = True,
element_response_model='default'):
"""
Runs `run_everybeam` in parallel over `num_threads` threads, using
`concurrent.futures.ProcessPoolExecutor`. See `run_everybeam` for more
details of what each parameter does.
Creates a new EveryBeam telescope object from `ms_path` for each thread.
This has to be done because `concurrent.futures.ProcessPoolExecutor` has
to pickle the function and all it's arguments, and EveryBeam objects can't
be pickled. This is somewhat wasteful but I can't work out a better way
to make things parallel.
Parameters
------------
num_threads : int
Number of threads being in call by `run_everybeam_over_threads`.
ms_path : str
Path to the measurement set to load the EveryBeam telescope from.
coeff_path : str
Path to the coefficients file (only needs a value for MWA, pass the path
to the hdf5 FEE file. Otherwise just pass "").
ras : np.ndarray
Right ascensions of the coordinates in radians.
decs : np.ndarray
Declinations of the coordinates in radians.
ra0 : float
Right ascension of the beam center in radians.
dec0 : float
Declination of the beam center in radians.
j2000_latitudes : np.ndarray
Latitudes in J2000 coordinates.
j2000_lsts : np.ndarray
Local sidereal times in J2000 coordinates.
times : np.ndarray
Array of observation times.
freqs : np.ndarray
Array of frequencies.
station_ids : np.ndarray
Array of station IDs.
apply_beam_norms : bool, optional
Whether to apply beam normalisation. Defaults to True. Achieved by
calculating the beam response at beam centre, and multiplying all
Jones by the inverse of this central beam response.
iau_order : bool, optional
If True, use IAU polarisation ordering, so set jones[0,0] to the NS dipole and jones[1,1] to EW. If False, jones[0,0] is EW.
element_only : bool, optional
Whether to use only the element response. Defaults to False. Use this to
look at the dipole response only, not the beam formed response.
parallactic_rotate : bool, optional
Whether to apply parallactic angle. Defaults to False.
para_angle_offset : float, optional
Offset to add to the parallactic angle. Defaults to 0.
element_response_model : str, optional
The Everybeam element response model to use. Defaults to 'hamaker'.
Avaible options are 'hamaker' (LOFAR), 'skala40_wave' (OSKAR), and 'MWA' (MWA).
use_local_mwa : bool, optional
Whether to use the local MWA model. Defaults to True. The local MWA model
takes za/az instead of RA/Dec.
Returns
--------
np.ndarray
The calculated Jones matrices with shape
(num_stations, num_times, num_freqs, num_coord, 2, 2)
"""
with concurrent.futures.ProcessPoolExecutor(max_workers=num_threads) as executor:
future_data = [executor.submit(run_everybeam_thread,
num_threads, thread_id,
ms_path,
coeff_path,
ras, decs,
ra0, dec0,
j2000_latitudes, j2000_lsts,
times, freqs,
station_ids,
apply_beam_norms=apply_beam_norms,
iau_order=iau_order,
element_only=element_only,
parallactic_rotate=parallactic_rotate,
element_response_model=element_response_model,
use_local_mwa=use_local_mwa)
for thread_id in range(num_threads)]
all_jones_chunks = []
all_thread_ids = []
for future in concurrent.futures.as_completed(future_data):
jones_chunk, thread_id = future.result()
all_jones_chunks.append(jones_chunk)
all_thread_ids.append(thread_id)
num_stations = len(station_ids)
num_times = len(times)
num_freqs = len(freqs)
num_coords = len(ras)
all_jones = np.zeros((num_stations, num_times, num_freqs, num_coords, 2, 2), dtype=np.complex128)*np.nan
coords_per_thread = int(np.ceil(num_coords / num_threads))
for jones_chunk, thread_id in zip(all_jones_chunks, all_thread_ids):
low_coord = thread_id * coords_per_thread
high_coord = (thread_id + 1) * coords_per_thread
all_jones[:, :, :, low_coord:high_coord, :, :] = jones_chunk
return all_jones
[docs]
def run_lofar_beam(ms_path : str, element_response_model : bool,
station_idxs : np.ndarray,
beam_ra0 : float, beam_dec0 : float,
ras : np.ndarray, decs : np.ndarray,
mjd_sec_times : np.ndarray,
freqs : np.ndarray,
apply_beam_norms : bool = True,
parallactic_rotate : bool = False,
iau_order : bool = True,
element_only : bool = False):
"""
Run the LOFAR beam model using the EveryBeam library.
This function is a wrapper around the C++ library `libuse_everybeam.so`.
It takes the input parameters and converts them to the appropriate
ctypes types, then calls the C++ function to calculate the beam response.
The output is a numpy array of the beam response, with shape
(num_stations, num_times, num_freqs, num_coords, 2, 2).
Parameters
----------
ms_path : str
Path to the measurement set.
element_response_model : str
Element response model to use. Can be 'hamaker', 'hamakerlba', 'lobes' or 'default'.
station_idxs : np.ndarray
Array of station indices to use.
beam_ra0 : float
Right ascension of the beam center in radians.
beam_dec0 : float
Declination of the beam center in radians.
ras : np.ndarray
Right ascensions of the coordinates in radians.
decs : np.ndarray
Declinations of the coordinates in radians.
mjd_sec_times : np.ndarray
Array of times in MJD seconds.
freqs : np.ndarray
Array of frequencies to use.
apply_beam_norms : bool
Whether to apply beam normalisation. Defaults to True. Achieved by
calculating the beam response at beam centre, and multiplying all
Jones by the inverse of this central beam response.
parallactic_rotate : bool
Whether to apply parallactic angle rotation. Defaults to False.
iau_order : bool
If True, use IAU polarisation ordering, so set jones[0,0] to the NS dipole and jones[1,1] to EW. If False, jones[0,0] is EW.
element_only : bool
Whether to use only the element response. Defaults to False. Use this to
look at the dipole response only, not the beam formed response.
Returns
-------
np.ndarray
The calculated Jones matrices with shape
(num_stations, num_times, num_freqs, num_coords, 2, 2).
"""
woden_path = importlib_resources.files(wodenpy).joinpath(f"libuse_everybeam.so")
woden_lib = ctypes.cdll.LoadLibrary(woden_path)
load_and_run_lofar_beam = woden_lib.load_and_run_lofar_beam
num_stations = len(station_idxs)
num_dirs = len(ras)
num_freqs = len(freqs)
num_times = len(mjd_sec_times)
ras_ctypes = (ctypes.c_double * num_dirs)()
decs_ctypes = (ctypes.c_double * num_dirs)()
for i in range(num_dirs):
ras_ctypes[i] = ras[i]
decs_ctypes[i] = decs[i]
mjd_sec_times_ctypes = (ctypes.c_double * num_times)()
for i in range(num_times):
mjd_sec_times_ctypes[i] = mjd_sec_times[i]
freqs_ctypes = (ctypes.c_double * num_freqs)()
for i in range(num_freqs):
freqs_ctypes[i] = freqs[i]
station_idxs_ctypes = (ctypes.c_int * num_stations)()
for i in range(num_stations):
station_idxs_ctypes[i] = station_idxs[i]
ms_path_ctypes = ctypes.c_char_p(ms_path.encode('utf-8'))
element_response_model_ctypes = ctypes.c_char_p(element_response_model.encode('utf-8'))
jones = ((num_stations*num_times*num_freqs*num_dirs*4)*c_double_complex)()
load_and_run_lofar_beam.argtypes = [c_char_p, c_char_p,
c_int, POINTER(c_int),
c_int, c_double, c_double,
POINTER(c_double), POINTER(c_double),
c_int, POINTER(c_double),
c_int, POINTER(c_double),
c_bool, c_bool, c_bool, c_bool,
POINTER(c_double_complex)]
load_and_run_lofar_beam(ms_path_ctypes,
element_response_model_ctypes,
num_stations, station_idxs_ctypes,
num_dirs,
beam_ra0, beam_dec0,
ras_ctypes, decs_ctypes,
num_times, mjd_sec_times_ctypes,
num_freqs, freqs_ctypes,
apply_beam_norms, parallactic_rotate,
element_only, iau_order,
jones)
# print(jones)
jones_py = np.ctypeslib.as_array(jones, shape=(num_stations*num_times*num_freqs*num_dirs*4))
jones_py = jones_py['real'] + 1j*jones_py['imag']
jones_py = jones_py.reshape(num_stations, num_times, num_freqs, num_dirs, 2, 2)
return jones_py
[docs]
def run_oskar_beam(ms_path : str, element_response_model : bool,
station_idxs : np.ndarray,
beam_ra0 : float, beam_dec0 : float,
ras : np.ndarray, decs : np.ndarray,
mjd_sec_times : np.ndarray,
freqs : np.ndarray,
apply_beam_norms : bool,
parallactic_rotate : bool,
iau_order : bool = True,
element_only : bool = False):
"""
Run the OSKAR beam model using the EveryBeam library.
This function is a wrapper around the C++ library `libuse_everybeam.so`.
It takes the input parameters and converts them to the appropriate
ctypes types, then calls the C++ function to calculate the beam response.
The output is a numpy array of the beam response, with shape
(num_stations, num_times, num_freqs, num_coords, 2, 2).
NOTE: This function is currently identical to the LOFAR beam function.
Both LOFAR and OSKAR call the PhasedArrayBeam class in the C++ library.
These functions have only been lightly tested for functionality. So I've
kept them separate incase it's discovered they need different default
values in the future.
Parameters
----------
ms_path : str
Path to the measurement set.
element_response_model : str
Element response model to use. Can be 'hamaker', 'hamakerlba', 'lobes' or 'default'.
station_idxs : np.ndarray
Array of station indices to use.
beam_ra0 : float
Right ascension of the beam center in radians.
beam_dec0 : float
Declination of the beam center in radians.
ras : np.ndarray
Right ascensions of the coordinates in radians.
decs : np.ndarray
Declinations of the coordinates in radians.
mjd_sec_times : np.ndarray
Array of times in MJD seconds.
freqs : np.ndarray
Array of frequencies to use.
apply_beam_norms : bool
Whether to apply beam normalisation. Defaults to True. Achieved by
calculating the beam response at beam centre, and multiplying all
Jones by the inverse of this central beam response.
parallactic_rotate : bool
Whether to apply parallactic angle rotation. Defaults to False.
iau_order : bool
If True, use IAU polarisation ordering, so set jones[0,0] to the NS dipole and jones[1,1] to EW. If False, jones[0,0] is EW.
element_only : bool
Whether to use only the element response. Defaults to False. Use this to
look at the dipole response only, not the beam formed response.
Returns
-------
np.ndarray
The calculated Jones matrices with shape
(num_stations, num_times, num_freqs, num_coords, 2, 2).
"""
woden_path = importlib_resources.files(wodenpy).joinpath(f"libuse_everybeam.so")
woden_lib = ctypes.cdll.LoadLibrary(woden_path)
load_and_run_oskar_beam = woden_lib.load_and_run_oskar_beam
num_stations = len(station_idxs)
num_dirs = len(ras)
num_freqs = len(freqs)
num_times = len(mjd_sec_times)
ras_ctypes = (ctypes.c_double * num_dirs)()
decs_ctypes = (ctypes.c_double * num_dirs)()
for i in range(num_dirs):
ras_ctypes[i] = ras[i]
decs_ctypes[i] = decs[i]
mjd_sec_times_ctypes = (ctypes.c_double * num_times)()
for i in range(num_times):
mjd_sec_times_ctypes[i] = mjd_sec_times[i]
freqs_ctypes = (ctypes.c_double * num_freqs)()
for i in range(num_freqs):
freqs_ctypes[i] = freqs[i]
station_idxs_ctypes = (ctypes.c_int * num_stations)()
for i in range(num_stations):
station_idxs_ctypes[i] = station_idxs[i]
ms_path_ctypes = ctypes.c_char_p(ms_path.encode('utf-8'))
element_response_model_ctypes = ctypes.c_char_p(element_response_model.encode('utf-8'))
jones = ((num_stations*num_times*num_freqs*num_dirs*4)*c_double_complex)()
load_and_run_oskar_beam.argtypes = [c_char_p, c_char_p,
c_int, POINTER(c_int),
c_int, c_double, c_double,
POINTER(c_double), POINTER(c_double),
c_int, POINTER(c_double),
c_int, POINTER(c_double),
c_bool, c_bool, c_bool, c_bool,
POINTER(c_double_complex)]
load_and_run_oskar_beam(ms_path_ctypes,
element_response_model_ctypes,
num_stations, station_idxs_ctypes,
num_dirs,
beam_ra0, beam_dec0,
ras_ctypes, decs_ctypes,
num_times, mjd_sec_times_ctypes,
num_freqs, freqs_ctypes,
apply_beam_norms, parallactic_rotate,
element_only, iau_order,
jones)
jones_py = np.ctypeslib.as_array(jones, shape=(num_stations*num_times*num_freqs*num_dirs*4))
jones_py = jones_py['real'] + 1j*jones_py['imag']
jones_py = jones_py.reshape(num_stations, num_times, num_freqs, num_dirs, 2, 2)
return jones_py
[docs]
def run_mwa_beam(ms_path : str, element_response_model : bool,
coeff_path : str,
station_idxs : np.ndarray,
ras : np.ndarray, decs : np.ndarray,
mjd_sec_times : np.ndarray,
j2000_lsts : np.ndarray, j2000_latitudes : np.ndarray,
freqs : np.ndarray,
apply_beam_norms : bool = False,
parallactic_rotate : bool = True,
iau_order : bool = True,
element_only : bool = False):
"""
Run the MWA beam model using the EveryBeam library.
This function is a wrapper around the C++ library `libuse_everybeam.so`.
It takes the input parameters and converts them to the appropriate
ctypes types, then calls the C++ function to calculate the beam response.
The output is a numpy array of the beam response, with shape
(num_stations, num_times, num_freqs, num_coords, 2, 2).
Parameters
----------
ms_path : str
Path to the measurement set.
element_response_model : str
Element response model to use. Can be 'hamaker', 'hamakerlba', 'lobes' or 'default'.
coeff_path : str
Path to the hdf5 FEE file.
station_idxs : np.ndarray
Array of station indices to use. For now this should always be np.zeros(1),
as there is not yet a way to pass bespoke dipole amplitudes to the C++ code.
Hence all stations will be the same, so passing multiple station ids will
just increase computation time for no gain.
ras : np.ndarray
Right ascensions of the coordinates in radians.
decs : np.ndarray
Declinations of the coordinates in radians.
mjd_sec_times : np.ndarray
Array of times in MJD seconds.
j2000_lsts : np.ndarray
Local sidereal times in J2000 coordinates.
j2000_latitudes : np.ndarray
Latitudes in J2000 coordinates.
freqs : np.ndarray
Array of frequencies to use.
apply_beam_norms : bool
Whether to apply beam normalisation. Defaults to False. The beam seems
to be normalised by EveryBeam by default, so this is not needed.
parallactic_rotate : bool
Whether to apply parallactic angle rotation. Defaults to True.
iau_order : bool
If True, use IAU polarisation ordering, so set jones[0,0] to the NS dipole and jones[1,1] to EW. If False, jones[0,0] is EW.
element_only : bool
Whether to use only the element response. Defaults to False. Use this to
look at the dipole response only, not the beam formed response.
Returns
-------
np.ndarray
The calculated Jones matrices with shape
(num_stations, num_times, num_freqs, num_coords, 2, 2).
"""
num_stations = len(station_idxs)
num_dirs = len(ras)
num_freqs = len(freqs)
num_times = len(mjd_sec_times)
azs = np.empty(num_dirs*num_times)
zas = np.empty(num_dirs*num_times)
para_angles = np.empty(num_dirs*num_times)
for comp_ind in range(num_dirs):
comp_has = j2000_lsts - ras[comp_ind]
these_azs, these_els = erfa.hd2ae(comp_has, decs[comp_ind], j2000_latitudes)
these_zas = np.pi/2 - these_els
these_para_angles = erfa.hd2pa(comp_has, decs[comp_ind], j2000_latitudes)
azs[comp_ind*num_times:(comp_ind+1)*num_times] = these_azs
zas[comp_ind*num_times:(comp_ind+1)*num_times] = these_zas
para_angles[comp_ind*num_times:(comp_ind+1)*num_times] = these_para_angles
woden_path = importlib_resources.files(wodenpy).joinpath(f"libuse_everybeam.so")
woden_path = "/home/jack-line/software/WODEN_dev/build/cmake_testing/GPU_or_C_code/libuse_everybeam.so"
woden_lib = ctypes.cdll.LoadLibrary(woden_path)
load_and_run_mwa_beam = woden_lib.load_and_run_mwa_beam
zas_ctypes = (ctypes.c_double*(num_dirs*num_times))()
azs_ctypes = (ctypes.c_double*(num_dirs*num_times))()
para_angles_ctypes = (ctypes.c_double*(num_dirs*num_times))()
for i in range(num_dirs*num_times):
zas_ctypes[i] = zas[i]
azs_ctypes[i] = azs[i]
para_angles_ctypes[i] = para_angles[i]
ms_path_ctypes, coeff_path_ctypes, \
element_response_model_ctypes, \
station_idxs_ctypes, freqs_ctypes, \
mjd_sec_times_ctypes = convert_common_args_to_everybeam_args(ms_path,
coeff_path, element_response_model,
station_idxs, freqs, mjd_sec_times)
jones = ((num_stations*num_times*num_freqs*num_dirs*4)*c_double_complex)()
load_and_run_mwa_beam.argtypes = [c_char_p, c_char_p, c_char_p,
c_int, POINTER(c_int),
c_int,
POINTER(c_double), POINTER(c_double),
POINTER(c_double),
c_int, POINTER(c_double),
c_int, POINTER(c_double),
c_bool, c_bool, c_bool, c_bool,
POINTER(c_double_complex)]
load_and_run_mwa_beam(ms_path_ctypes,
element_response_model_ctypes,
coeff_path_ctypes,
num_stations, station_idxs_ctypes,
num_dirs,
azs_ctypes, zas_ctypes,
para_angles_ctypes,
num_times, mjd_sec_times_ctypes,
num_freqs, freqs_ctypes,
apply_beam_norms, parallactic_rotate, element_only,
iau_order,
jones)
jones_py = np.ctypeslib.as_array(jones, shape=(num_stations*num_times*num_freqs*num_dirs*4))
jones_py = jones_py['real'] + 1j*jones_py['imag']
jones_py = jones_py.reshape(num_stations, num_times, num_freqs, num_dirs, 2, 2)
return jones_py