"""Wrappers around pyuvdata.UVBeam to calculate primary beam values.
See https://pyuvdata.readthedocs.io/en/latest/uvbeam.html for more information
on the UVBeam class.
"""
from pyuvdata import ShortDipoleBeam, BeamInterface, UVBeam
from pyuvdata.analytic_beam import AnalyticBeam
import numpy as np
import erfa
from astropy.coordinates import SkyCoord, EarthLocation
import astropy.units as u
import concurrent.futures
from typing import Tuple
from copy import deepcopy
import os
import shutil
import time
from wodenpy.use_libwoden.skymodel_structs import Components_Python
import sys
from wodenpy.wodenpy_setup.woden_logger import simple_logger
from logging import Logger
[docs]
def setup_MWA_uvbeams(hdf5_path: str, freqs: np.ndarray,
delays: np.ndarray = None,
amplitudes: np.ndarray = None,
pixels_per_deg : float = 5) -> np.array:
"""
Setup the MWA uvbeam objects for a given set of frequencies and delays.
Delays should be as appears in the metafits file, so 16 integers.
Amplitudes should have a length which is a multiple of 32; 16 amplitudes
for each dipole, for each tile. So if you want three unqiue tile beams,
submit an amplitude array of length 96.
Parameters
------------
hdf5_path : str
Path to the hdf5 file containing the MWA FEE beam model.
freqs : np.ndarray
Array of frequencies in Hz.
delays : np.ndarray, optional
Array of delays as integers as written in the metafits file. If not
given, all delays set to 0.
amplitudes : np.ndarray, optional
Array of amplitudes. If None, amplitudes of 1.0 used everywhere.
Should be in format output from wodenpy.run_setup.check_args a.k.a 16 amps for NS
dipole for tile 1, 16 amps for EW dipole for tile 1, 16 amps for NS dipole for tile 2, etc.
Should be a multiple of 32.
pixels_per_deg : float, optional
Number of pixels per degree for UVBeam to use. Defaults to 5.
Returns
--------
np.array
Array of MWA uvbeam objects, of length len(amplitudes)/32.
"""
if delays is None:
use_delays = np.zeros((2, 16), dtype='int')
else:
check = delays.size / 16
if check != 1:
sys.exit(f"Delays array size {delays.size} is equal to 16 "
"WODEN only points all tiles/polarisations to the same delay ."
"Exiting now as can't proceed sensibly.")
use_delays = np.empty((2, 16), dtype='int')
use_delays[0, :] = delays
use_delays[1, :] = delays
if amplitudes is None:
amplitudes = np.ones(32, dtype='float')
else:
check = amplitudes.size % 32
if check != 0:
sys.exit(f"Amplitudes array size {amplitudes.size} is not a multiple of 32. "
"Expected 32 delays per tile so exiting now.")
##need to go a certain distance outside the requested range
##as UVBeam does interpolation over frequencies
freq_range = [freqs.min()-2*1.28e6, freqs.max()+2*1.28e+6]
num_beams = int(amplitudes.size / 32)
uvbeam_objs = np.empty(num_beams, dtype=object)
for beam_ind in range(num_beams):
use_amplitudes = np.empty((2, 16), dtype='float')
pol1 = amplitudes[beam_ind*32:beam_ind*32+16]
pol2 = amplitudes[beam_ind*32+16:(beam_ind+1)*32]
use_amplitudes[0, :] = pol1
use_amplitudes[1, :] = pol2
mwabeam = UVBeam.from_file(hdf5_path,
pixels_per_deg=pixels_per_deg,
delays=use_delays,
amplitudes=use_amplitudes,
freq_range=freq_range)
mwabeam.peak_normalize()
uvbeam_objs[beam_ind] = mwabeam
return uvbeam_objs
[docs]
def setup_HERA_uvbeams_from_CST(filenames: list[str], freqs: np.ndarray[float],
logger : Logger = False) -> np.array:
"""
Setup the HERA uvbeam object, using a list of CST simulation files
and corresponding frequencies.
Currently only supports a single beam object, so the output
is a 1D array of length 1. Can be extended to support multiple beams
by modifying filenames and freqs to be 2D arrays. However, currently only
have access to a single set of CST files.
Parameters
------------
filenames : list[str]
A list of paths to CST simulation files containing the HERA beam
at various frequencies
freqs : np.ndarray
Array of frequencies in Hz.
logger : Logger, optional
A logger object to log messages. If False, a default logger is created.
Defaults to False.
Returns
--------
np.array
Array of HERA uvbeam objects, currently of length 1.
"""
if logger is False:
logger = simple_logger()
if len(filenames) != len(freqs):
msg = "Number of filenames and frequencies submitted to HERA pyvudata UVBeam do not match.\n"
msg += "WODEN must exit now as it doesn't know how to run the beam."
logger.error(msg)
raise ValueError(msg)
herabeam = UVBeam.from_file(
filenames, beam_type='e-field', frequency=freqs,
feed_pol='x', rotate_pol=True, telescope_name='HERA',
feed_name='PAPER_dipole', feed_version='0.1',
model_name='E-field pattern - Rigging height 4.9m',
model_version='1.0',
)
herabeam.peak_normalize()
return np.array([herabeam], dtype=object)
[docs]
def setup_HERA_uvbeams_from_single_file(filepath: str,
logger : Logger = False) -> np.array:
"""
Setup the HERA uvbeam object, using the single file path to a UVBeam
file (e.g. a beam FITS file). Will try to initialise the UVBeam object
using the `pyuvdata.UVBeam.from_file` method.
Currently only supports a single beam object, so the output
is a 1D array of length 1. Can be extended to support multiple beams
by modifying to taking in a list of filepaths.
Parameters
------------
filepath : str
Path to a single UVBeam file (e.g. a FITS file) containing the HERA beam
logger : Logger, optional
A logger object to log messages. If False, a default logger is created.
Defaults to False.
Returns
--------
np.array
Array of HERA uvbeam objects, currently of length 1.
"""
if logger is False:
logger = simple_logger()
herabeam = UVBeam.from_file(filepath)
herabeam.peak_normalize()
return np.array([herabeam], dtype=object)
[docs]
def run_uvbeam(uvbeam_objs: np.ndarray[UVBeam],
ras: np.ndarray, decs: np.ndarray,
j2000_latitudes: np.ndarray, j2000_lsts: np.ndarray,
freqs: np.ndarray,
iau_order: bool = False,
parallactic_rotate: bool = True,
freq_interp : bool = True,
logger : Logger = False) -> np.ndarray:
"""
Calculate the Jones matrices for a given set of directions, times (via `j2000_lsts`
and `j2000_latitudes`), frequencies, and stations (a.k.a tiles for MWA),
using an array of UVBeam objects.
`j2000_latitudes` should be the array latitude as precessed back to J2000,
with `j2000_lsts` being the matching LST in J2000. Uses these values to
calculate the azimuth and zenith angles for each station at each time.
Parameters
------------
uvbeam_objs: np.ndarray[UVBeam]
Array of initiliased UVBeam objects.
ras : np.ndarray
Right ascensions of the coordinates in radians.
decs : np.ndarray
Declinations of the coordinates in radians.
j2000_latitudes : np.ndarray
Latitudes in J2000 coordinates.
j2000_lsts : np.ndarray
Local sidereal times in J2000 coordinates.
freqs : np.ndarray
Array of frequencies.
iau_order : bool, optional
Whether to return the Jones matrices in IAU order (NS = X, EW = Y). Defaults to False.
parallactic_rotate : bool, optional
Whether to apply parallactic angle rotation. Defaults to True.
freq_interp : bool, optional
Whether to use frequency interpolation. Defaults to True. If True, uses
the default interpolation method of UVBeam.
logger : Logger, optional
A logger object to log messages. If False, a default logger is created.
Defaults to False.
Returns
--------
np.ndarray
The calculated Jones matrices with shape (num_stations, num_times, num_freqs, num_coords, 2, 2).
"""
if not logger:
logger = simple_logger()
num_stations = len(uvbeam_objs)
num_times = len(j2000_lsts)
num_freqs = len(freqs)
num_coords = len(ras)
beam = BeamInterface(uvbeam_objs[0], beam_type="efield")
num_base_freqs = len(uvbeam_objs[0].freq_array)
##Turn off freq interp if we have less than 3 frequencies
##Do a warning first time we do it
if num_base_freqs < 3:
freq_interp = False
if not getattr(run_uvbeam, "_warned_three_freqs", False):
msg = "Number of frequencies in the UVBeam object is less than 3.\n"
msg += "WODEN will proceed, but will switch off frequency interpolation.\n"
msg += "UVBeam needs at least three frequencies to perform default interpolation.\n"
msg += "Further warnings of this type will be suppressed.\n"
logger.warning(msg)
run_uvbeam._warned_three_freqs = True
# use the faster interpolation method if appropriate
if beam._isuvbeam and beam.beam.pixel_coordinate_system == "az_za":
interpol_fn = "az_za_map_coordinates"
else:
interpol_fn = None
check_azza_domain = False
all_output_jones = np.zeros((num_stations, num_times, num_freqs, num_coords, 2, 2), dtype=np.complex128)*np.nan
if parallactic_rotate:
coords = SkyCoord(ras*u.rad, decs*u.rad, frame='icrs')
for time_ind in range(num_times):
comp_has = j2000_lsts[time_ind] - ras
azs, els = erfa.hd2ae(comp_has, decs, j2000_latitudes[time_ind])
zas = np.pi/2 - els
##pyuvdata defines azimuth as 0 at East, increasing to 90 at North
##astropy, erfa, and wodenpy define azimuth as 0 at North, increasing to 90 at East
##So need to rotate azimuth by 90 degrees and invert rotation direction
##classic astronomy
use_azs = np.pi/2 - azs
where_neg_az = np.nonzero(use_azs < 0)
use_azs[where_neg_az] = use_azs[where_neg_az] + np.pi * 2.
# use_azs = azs
if parallactic_rotate:
has = j2000_lsts[time_ind] - ras
para_angles = erfa.hd2pa(has, decs, j2000_latitudes[time_ind])
rot_matrix = np.empty((num_coords, 2,2))
rot_matrix[:,0,0] = np.sin(-para_angles)
rot_matrix[:,0,1] = -np.cos(-para_angles)
rot_matrix[:,1,0] = -np.cos(-para_angles)
rot_matrix[:,1,1] = -np.sin(-para_angles)
for station_ind, beam_obj in enumerate(uvbeam_objs):
beam = BeamInterface(beam_obj, beam_type="efield")
if freq_interp:
response = beam.compute_response(az_array=use_azs,
za_array=zas,
freq_array=freqs,
check_azza_domain=check_azza_domain
)
else:
freq_interp_kind = "nearest"
spline_opts = None
response = beam.compute_response(az_array=use_azs,
za_array=zas,
freq_array=freqs,
interpolation_function=interpol_fn,
freq_interp_kind=freq_interp_kind,
spline_opts=spline_opts,
check_azza_domain=check_azza_domain
)
response[:,:,:,np.isnan(zas)] = np.nan
##Correct for MWA beam convention
all_output_jones[station_ind, time_ind, :, :, 0, 0] = response[1,0,:,:]
all_output_jones[station_ind, time_ind, :, :, 0, 1] = -response[0,0,:,:]
all_output_jones[station_ind, time_ind, :, :, 1, 0] = response[1,1,:,:]
all_output_jones[station_ind, time_ind, :, :, 1, 1] = -response[0,1,:,:]
if parallactic_rotate:
##Parallactic angle doesn't change per station or freq, only by
##time and direction
rot_jones = np.einsum('ijklm,kmn->ijkln', all_output_jones[:, time_ind, :, :, :, :], rot_matrix)
all_output_jones[:, time_ind, :, :, :, :] = rot_jones
##different telescope beams seem to come out with different east-west/ north-south
##polarisation orders. So change whether we reorder based on telescope name??
reorder = False
if uvbeam_objs[0].telescope_name == "HERA":
if not iau_order:
reorder = True
else:
if iau_order:
reorder = True
if reorder:
##swap all_output_jones[:,:,:,:,0,0] with all_output_jones[:,:,:,:,1,1]
all_output_jones[:, :, :, :, [0, 1], [0, 1]] = all_output_jones[:, :, :, :, [1, 0], [1, 0]]
##swap all_output_jones[:,:,:,:,0,1] with all_output_jones[:,:,:,:,1,0]
all_output_jones[:, :, :, :, [0, 1], [1, 0]] = all_output_jones[:, :, :, :, [1, 0], [0, 1]]
return all_output_jones
[docs]
def calc_uvbeam_for_components(components : Components_Python, uvbeam_objs : np.array,
all_freqs : np.ndarray,
j2000_latitudes : np.ndarray, j2000_lsts : np.ndarray,
iau_order : bool = True,
parallactic_rotate : bool = True,
logger : Logger = False) -> None:
"""For a set of `components` with filled `ras,decs` attributes,
calculate the primary beam Jones matrices for all UVBeam objects
in `uvbeam_objs` at the given frequencies `all_freqs`. Uses `j2000_latitudes`
and `j2000_lsts` to calculate `az,za` for all time steps.
Stores the results in the `components` `gxs,Dxs,Dys,gys`
attributes. The beam values will eventually be used by the compiled code,
either on the CPU or GPU.
Parameters
----------
components : Components_Python
An initialised Components_Python object with the ras, decs attributes
filled with the coordinates of the components. Should also have
the gxs, Dxs, Dys, gys attributes setup as an array of np.float32 or
np.float64 of length `len(uvbeam_objs)*len(all_freqs)*len(j2000_lsts)*len(components.ras)`.
float32 or float64 depends on if running simulation in "float" or "double" mode.
uvbeam_objs: np.ndarray[UVBeam]
Array of initiliased UVBeam objects.
all_freqs : np.ndarray
Array of frequencies in Hz.
j2000_latitudes : np.ndarray
Latitudes in J2000 coordinates.
j2000_lsts : np.ndarray
Local sidereal times in J2000 coordinates.
freqs : np.ndarray
Array of frequencies.
iau_order : bool, optional
Whether to return the Jones matrices in IAU order (NS = X, EW = Y). Defaults to False.
parallactic_rotate : bool, optional
parallactic_rotate : bool, optional
Whether to apply parallactic angle rotation. Defaults to True.
logger : Logger, optional
A logger object to log messages. If False, a default logger is created.
Defaults to False.
"""
if not logger:
logger = simple_logger()
all_jones = run_uvbeam(uvbeam_objs, components.ras, components.decs,
j2000_latitudes, j2000_lsts,
all_freqs,
iau_order=iau_order,
parallactic_rotate=parallactic_rotate,
logger=logger)
##all_jones comes out as shape (num_beams, num_times, num_freqs, num_components, 2, 2)
##WODEN wants this to be flat. I've intentionally made the shape so we can just
##use the flatten call
gxs = all_jones[:,:,:,:, 0,0]
Dxs = all_jones[:,:,:,:, 0,1]
Dys = all_jones[:,:,:,:, 1,0]
gys = all_jones[:,:,:,:, 1,1]
components.gxs = gxs.flatten()
components.Dxs = Dxs.flatten()
components.Dys = Dys.flatten()
components.gys = gys.flatten()