#!/usr/bin/env python3
"""Wrapper script to run the GPU WODEN code. Author: J.L.B. Line
"""
import numpy as np
import sys
import os
from time import time
import argparse
from queue import Queue
from threading import Thread
from ctypes import POINTER, c_double, c_float
from typing import Union, Tuple, List
from astropy.table import Table
from astropy.io import fits
from wodenpy.use_libwoden.beam_settings import BeamTypes
from wodenpy.use_libwoden.woden_settings import create_woden_settings, setup_lsts_and_phase_centre
from wodenpy.use_libwoden.visibility_set import setup_visi_set_array, load_visibility_set
from wodenpy.wodenpy_setup.run_setup import get_parser, check_args, get_code_version
from wodenpy.use_libwoden.use_libwoden import load_in_woden_library
from wodenpy.observational.calc_obs import get_uvfits_date_and_position_constants, calc_jdcal
from wodenpy.skymodel.woden_skymodel import crop_below_horizon
from wodenpy.skymodel.read_skymodel import get_skymodel_tables, read_radec_count_components, create_source_catalogue_from_python_sources
from wodenpy.skymodel.chunk_sky_model import create_skymodel_chunk_map, reshape_chunked_skymodel_map_sets, Skymodel_Chunk_Map
from wodenpy.array_layout.create_array_layout import calc_XYZ_diffs, enh2xyz
from wodenpy.use_libwoden.array_layout_struct import Array_Layout
from wodenpy.uvfits.wodenpy_uvfits import make_antenna_table, make_baseline_date_arrays, create_uvfits
from wodenpy.phase_rotate.remove_phase_track import remove_phase_tracking
from wodenpy.use_libwoden.shapelets import create_sbf
from wodenpy.use_libwoden.create_woden_struct_classes import Woden_Struct_Classes
from wodenpy.skymodel.read_fits_skymodel import read_fits_skymodel_chunks, calc_everybeam_for_components
from wodenpy.use_libwoden.skymodel_structs import setup_source_catalogue, Source_Python
import concurrent.futures
from line_profiler import profile, LineProfiler
from wodenpy.primary_beam.use_everybeam import run_everybeam
##Constants
R2D = 180.0 / np.pi
D2R = np.pi / 180.0
MWA_LAT = -26.703319405555554
MWA_LONG = 116.67081523611111
MWA_HEIGHT = 377.827
VELC = 299792458.0
SOLAR2SIDEREAL = 1.00274
sbf_N = 101
sbf_L = 10001
sbf_c = 5000
sbf_dx = 0.01
##This call is so we can use it as a type annotation
woden_struct_classes = Woden_Struct_Classes()
Woden_Settings = woden_struct_classes.Woden_Settings
Visi_Set = woden_struct_classes.Visi_Set
Source_Catalogue = woden_struct_classes.Source_Catalogue
def _run_run_woden(run_woden, woden_settings : Woden_Settings,
visibility_set : Visi_Set, source_catalogue : Source_Catalogue,
array_layout : Array_Layout, sbf : np.ndarray):
"""This is silly, but for profiling purposes, we need to wrap the `run_woden`
C/GPU function in another function so it's easy to pick out when profiling.
We can search for just `_run_run_woden` in the profiling output to see how
long we're spending on the GPU (just searching for `run_woden` doesn't
seem to work, at least in `yappi`.)
"""
run_woden(woden_settings, visibility_set, source_catalogue, array_layout,
sbf)
[docs]
def woden_thread(all_loaded_python_sources : List[List[Source_Python]],
all_loaded_sources_orders : List[int], round_num : int,
run_woden, woden_settings : Woden_Settings, #type: ignore
visibility_set : Visi_Set, #type: ignore
array_layout : Array_Layout,
woden_struct_classes : Woden_Struct_Classes,
sbf : np.ndarray, beamtype : int,
precision : str = "double"):
"""
This function runs WODEN C/GPU code on a separate thread, processing source catalogues from a queue until the queue is empty.
Parameters
----------
all_loaded_python_sources : List[List[Source_Python]]
A list of lists of `Source_Python` to be processed, as ouput by `read_skymodel_thread`,
where each list of `Source_Python` matches a chunk_map in `chunked_skymodel_map_set`.
Each entry is a list itself as the Shapelet chunking can result in multiple sources
per thread per round, as the chunking happens by sky direction as well as number of
shapelet coefficients.
all_loaded_sources_orders : List[int]
The order of the sources in `all_loaded_python_sources` as matched to `chunked_skymodel_map_set`;
orders also output by `read_skymodel_thread` (different threads finish in different times so
the order of the sources in `all_loaded_python_sources` may not match the order of the chunk_maps)
round_num : int
The round number of the processing
run_woden : _NamedFuncPointer
A pointer to the WODEN GPU function to be run.
woden_settings : Woden_Settings
The WODEN settings to be used.
visibility_set : Visi_Set
The visibility set to write outputs to.
array_layout : Array_Layout
The array layout to be used.
woden_struct_classes : Woden_Struct_Classes
The WODEN struct classes to be used
sbf : np.ndarray
The shapelet basis function array
beamtype : int
The value of the type of beam being used, e.g. BeamTypes.FEE_BEAM.value
"""
python_sources = []
##grab all the python sources, and reorder/flatten them to match the order of
##chunked_skymodel_maps. Not actually sure we need to do this, but if we
##change this order it'll mean we have to change the order of a bunch of
##tests. I don't think it costs much, and saves a bunch of developing time
ordering = np.argsort(all_loaded_sources_orders)
for order in ordering:
sources = all_loaded_python_sources[order]
for source in sources:
python_sources.append(source)
##Create a ctypes Source_Catalogue from the python sources to feed the GPU
source_catalogue = create_source_catalogue_from_python_sources(python_sources,
woden_struct_classes,
beamtype, precision)
print(f"Sending set {round_num} to GPU")
start = time()
#Run the GPU code in a wrapper function so we can easily profile it
_run_run_woden(run_woden, woden_settings, visibility_set,
source_catalogue, array_layout, sbf)
end = time()
print(f"Set {round_num} has returned from the GPU after {end-start:.1f} seconds")
return 0
[docs]
def read_skymodel_thread(thread_id : int, num_threads : int,
chunked_skymodel_map_sets: List[Skymodel_Chunk_Map],
lsts : np.ndarray, latitudes : np.ndarray,
args : argparse.Namespace,
beamtype : int,
main_table : Table, shape_table : Table,
v_table : Table = False, q_table : Table = False,
u_table : Table = False, p_table : Table = False) -> Tuple[List[Source_Python], int]:
"""
Reads a list of `Skymodel_Chunk_Map` types in `chunked_skymodel_map_sets`
from the given astropy tables, into a list of `Source_Python` types.
Depending on the type of primary beam specified by `beamtype`
Parameters
==========
thread_id : int
The ID of the current thread.
num_threads : int
The total number of threads.
chunked_skymodel_map_sets : List[Skymodel_Chunk_Map]
A list of chunked skymodel map sets.
lsts : np.ndarray
Local Sidereal Times (LSTs) for all time steps.
latitudes : np.ndarray
Latitudes of the array at all time steps; when doing array precession,
these will all be slightly different.
args : argparse.Namespace
Command-line arguments namespace.
beamtype : int
The value of the type of beam being used, e.g. BeamTypes.FEE_BEAM.value
main_table : Table
Main table containing skymodel data.
shape_table : Table
Table containing Shapelet data.
v_table : Table, optional
Table containing V polarization data (default is False).
q_table : Table, optional
Table containing Q polarization data (default is False).
u_table : Table, optional
Table containing U polarization data (default is False).
p_table : Table, optional
Table containing P polarization data (default is False).
profile : bool, optional
Whether to profile the function (default is False).
Returns
=======
tuple : Tuple[List[Source_Python], int]
A tuple containing the list of `Source_Python`s and the thread number,
where thead number is `thread_id % num_threads`; this can be used to
match the order of recovered data to the order of the chunked maps.
"""
set_ind = thread_id // num_threads
thread_num = thread_id % num_threads
start = time()
chunk_maps = chunked_skymodel_map_sets[set_ind][thread_num]
n_p, n_g, n_s, n_c = 0, 0, 0, 0
for ind, chunk_map in enumerate(chunk_maps):
n_p += chunk_map.n_points
n_g += chunk_map.n_gauss
n_s += chunk_map.n_shapes
n_c += chunk_map.n_shape_coeffs
print(f"From Set {set_ind} thread num {thread_num} reading {n_p} points, {n_g} gauss, {n_s} shape, {n_c} shape coeffs using thread id {thread_id}")
if len(chunk_maps) == 0:
print(f"Thread {thread_id} has no work to do")
return [], thread_num
if args.profile:
profiler = LineProfiler()
##Add whatever functions that are called in `read_fits_skymodel_chunks`
##here to profile them
profiler.add_function(read_fits_skymodel_chunks)
profiler.add_function(calc_everybeam_for_components)
profiler.add_function(run_everybeam)
profiler.enable()
python_sources = read_fits_skymodel_chunks(args, main_table, shape_table,
chunk_maps,
args.num_freq_channels, args.num_time_steps,
beamtype,
lsts, latitudes,
v_table, q_table,
u_table, p_table,
args.precision)
end = time()
print(f"Finshed thread {thread_id} in {end-start:.1f} seconds")
if args.profile:
profiler.disable()
profile_filename = f"line_profile_{os.getpid()}.lprof"
print("Dumping profile to", profile_filename)
profiler.dump_stats(profile_filename)
return python_sources, thread_num
[docs]
@profile
def main(argv=None):
"""Runs the WODEN simulator, given command line inputs. Does fancy things
like reading in the sky model and running the GPU code in parallel; the
sky model lazy load allows us to simulate sky models that cannot fit into
RAM.
Parameters
----------
argv : _type_, optional
Will be parsed from the command line if run in main script. Can be
passed in explicitly for easy testing
"""
##Grab the parser and parse some args
parser = get_parser()
args = parser.parse_args(argv)
##Check that the input arguments make sense
args = check_args(args)
##Find out what git/release version we are using, and where the code lives
gitlabel = get_code_version()
lst_deg, gst0_deg, degpdy, ut1utc = get_uvfits_date_and_position_constants(latitude=args.latitude, longitude=args.longitude,
height=args.array_height, date=args.date)
int_jd, float_jd = calc_jdcal(args.date)
jd_date = int_jd + float_jd
##Check the uvfits prepend to make sure we end in .uvfits
output_uvfits_prepend = args.output_uvfits_prepend
if output_uvfits_prepend[-7:] == '.uvfits': output_uvfits_prepend = output_uvfits_prepend[-7:]
##Generate the woden_struct_classes, which are used to mimic the C
##structs. Must be made dynamically, as the precision can change
woden_struct_classes = Woden_Struct_Classes(args.precision)
##Depending on what precision was selected by the user, load in the
##C/CUDA library and return the `run_woden` function
run_woden = load_in_woden_library(woden_struct_classes)
num_baselines = int(((args.num_antennas - 1)*args.num_antennas) / 2)
if args.do_autos:
num_visis = args.num_time_steps*args.num_freq_channels*(num_baselines + args.num_antennas)
else:
num_visis = args.num_time_steps*args.num_freq_channels*num_baselines
##populates a ctype equivalent of woden_settings struct to pass
##to the C library
woden_settings = create_woden_settings(woden_struct_classes.Woden_Settings(),
args, jd_date, lst_deg)
##fill the lst and mjds fields, precessing if necessary
lsts, latitudes = setup_lsts_and_phase_centre(woden_settings)
##calculate the array layout
array_layout = calc_XYZ_diffs(woden_settings, args)
# ##read in and chunk the sky model=======================================
print("Doing the initial reading/mapping of sky model into chunks")
t_before = time()
comp_counter = read_radec_count_components(args.cat_filename)
t_after = time()
print(f"Mapping took {(t_after - t_before)/60.0:.1f} mins")
crop_by_component = True
if args.sky_crop_sources:
crop_by_component = False
comp_counter = crop_below_horizon(woden_settings.lsts[0],
woden_settings.latitude,
comp_counter,
crop_by_component=crop_by_component)
max_chunks_per_set=args.num_threads
chunked_skymodel_map_sets = create_skymodel_chunk_map(comp_counter,
args.chunking_size, woden_settings.num_baselines,
args.num_freq_channels,
args.num_time_steps,
num_threads=args.num_threads,
max_chunks_per_set=max_chunks_per_set,
max_dirs=args.max_sky_directions)
if args.dry_run:
##User only wants to check whether the arguments would have worked or not
pass
else:
##Create the shapelet basis functions
sbf = create_sbf(precision=args.precision)
##Create an array of visibility_sets, which get fed into run_woden
##and store the output visibilities
visi_set_array = setup_visi_set_array(woden_struct_classes.Visi_Set,
len(args.band_nums), num_visis,
precision=args.precision)
###---------------------------------------------------------------------
### heavy lifting area - here we setup running the sky model reading
### and running GPU code at the same time. Means we can limit the
### absolute amount of RAM used, and save time doing IO at the same
### time as compute
num_threads = args.num_threads
num_rounds = len(chunked_skymodel_map_sets)
main_table, shape_table, v_table, q_table, u_table, p_table = get_skymodel_tables(args.cat_filename)
with concurrent.futures.ProcessPoolExecutor(max_workers=num_threads) as executor:
gpu_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
gpu_calc = None # To store the future of the calculation thread
# Loop through multiple rounds of data reading and calculation
for round_num in range(num_rounds):
future_data = [executor.submit(read_skymodel_thread, i + round_num * num_threads,
num_threads, chunked_skymodel_map_sets,
lsts, latitudes,
args,
woden_settings.beamtype,
main_table, shape_table,
v_table, q_table, u_table, p_table)
for i in range(num_threads)]
all_loaded_python_sources = []
all_loaded_sources_orders = []
for future in concurrent.futures.as_completed(future_data):
python_sources, order = future.result()
all_loaded_python_sources.append(python_sources)
all_loaded_sources_orders.append(order)
# Wait for previous calculation to complete (if there was one)
if gpu_calc is not None:
meh = gpu_calc.result()
gpu_calc = gpu_executor.submit(woden_thread,
all_loaded_python_sources,
all_loaded_sources_orders,
round_num,
run_woden, woden_settings,
visi_set_array, array_layout,
woden_struct_classes, sbf,
woden_settings.beamtype,
args.precision)
# # Wait for the final calculation to complete
if gpu_calc is not None:
meh = gpu_calc.result()
### we've now calculated all the visibilities
###---------------------------------------------------------------------
##I think we want to X,Y,Z to be in the current frame for writing
##out to the uvfits, so calculate again
X,Y,Z = enh2xyz(args.east, args.north, args.height, args.latitude*D2R)
##X,Y,Z are stored in a 2D array in units of seconds in the uvfits file
XYZ_array = np.empty((args.num_antennas,3))
XYZ_array[:,0] = X
XYZ_array[:,1] = Y
XYZ_array[:,2] = Z
##If we were to grab them out of the code used in the simulation,
##need to do this
# expec_num = woden_settings.num_time_steps*args.num_antennas
# XYZ_array[:,0] = np.ctypeslib.as_array(array_layout.ant_X, shape=(expec_num, ))[:args.num_antennas]
# XYZ_array[:,1] = np.ctypeslib.as_array(array_layout.ant_Y, shape=(expec_num, ))[:args.num_antennas]
# XYZ_array[:,2] = np.ctypeslib.as_array(array_layout.ant_Z, shape=(expec_num, ))[:args.num_antennas]
##Get the central frequency channels, used in the uvfits header
central_freq_chan = int(np.floor(args.num_freq_channels / 2.0))
##Useful number
num_baselines = int(((args.num_antennas - 1)*args.num_antennas) / 2)
##Loop over coarse frequency band and convert visibilities output
##from woden.c into uvfits files
for band_ind, band in enumerate(args.band_nums):
output_uvfits_name = output_uvfits_prepend + '_band%02d.uvfits' %band
band_low_freq = args.lowest_channel_freq + (band - 1)*args.coarse_band_width
central_freq_chan_value = band_low_freq + central_freq_chan*args.freq_res
uus,vvs,wws,v_container = load_visibility_set(visibility_set=visi_set_array[band_ind],
num_baselines=num_baselines,
num_freq_channels=args.num_freq_channels,
num_time_steps=args.num_time_steps,
precision=args.precision,
do_autos=args.do_autos,
num_ants=args.num_antennas)
if args.remove_phase_tracking:
frequencies = band_low_freq + np.arange(args.num_freq_channels)*args.freq_res
v_container = remove_phase_tracking(frequencies=frequencies,
wws_seconds=wws,
num_time_steps=args.num_time_steps,
v_container=v_container,
num_baselines=num_baselines)
hdu_ant = make_antenna_table(XYZ_array=XYZ_array,telescope_name=args.telescope_name,
num_antennas=args.num_antennas, freq_cent=central_freq_chan_value,
date=args.date, gst0_deg=gst0_deg, degpdy=degpdy,
ut1utc=ut1utc, longitude=args.longitude, latitude=args.latitude,
array_height=args.array_height,
ant_names=args.ant_names)
baselines_array, date_array = make_baseline_date_arrays(args.num_antennas,
args.date, args.num_time_steps, args.time_res,
do_autos=args.do_autos)
create_uvfits(v_container=v_container, freq_cent=central_freq_chan_value,
ra_point=args.ra0, dec_point=args.dec0,
output_uvfits_name=output_uvfits_name,
uu=uus, vv=vvs, ww=wws, baselines_array=baselines_array,
date_array=date_array,
central_freq_chan=central_freq_chan,
ch_width=args.freq_res, int_jd=int_jd,
hdu_ant=hdu_ant, gitlabel=gitlabel,
longitude=args.longitude, latitude=args.latitude,
array_height=args.array_height,
telescope_name=args.telescope_name,
IAU_order=args.IAU_order,
comment=args.command)
if __name__ == "__main__":
main()