Source code for run_woden

#!/usr/bin/env python3
"""Wrapper script to run the GPU WODEN code. Author: J.L.B. Line
"""
import numpy as np
import sys
import os
import importlib_resources
import wodenpy
from time import time
import argparse
from threading import Thread
from ctypes import POINTER, c_double, c_float, c_int, create_string_buffer, string_at
import ctypes
from typing import Union, Tuple, List
from astropy.table import Table
from astropy.io import fits
from wodenpy.use_libwoden.beam_settings import BeamTypes, BeamGroups
from wodenpy.use_libwoden.woden_settings import fill_woden_settings_python, setup_lsts_and_phase_centre, Woden_Settings_Python, convert_woden_settings_to_ctypes
from wodenpy.use_libwoden.visibility_set import setup_visi_set_array, load_visibility_set, Visi_Set_Python
from wodenpy.wodenpy_setup.run_setup import get_parser, check_args, get_code_version
from wodenpy.use_libwoden.use_libwoden import load_in_run_woden
from wodenpy.observational.calc_obs import get_uvfits_date_and_position_constants, calc_jdcal
from wodenpy.skymodel.woden_skymodel import crop_below_horizon
from wodenpy.skymodel.read_skymodel import get_skymodel_tables, read_radec_count_components, create_source_catalogue_from_python_sources
from wodenpy.skymodel.chunk_sky_model import create_skymodel_chunk_map, reshape_chunked_skymodel_map_sets, Skymodel_Chunk_Map
from wodenpy.array_layout.create_array_layout import calc_XYZ_diffs, enh2xyz, Array_Layout_Python, convert_array_layout_to_ctypes, setup_array_layout_python
from wodenpy.use_libwoden.array_layout_struct import Array_Layout_Ctypes
from wodenpy.uvfits.wodenpy_uvfits import make_antenna_table, make_baseline_date_arrays, create_uvfits
from wodenpy.phase_rotate.remove_phase_track import remove_phase_tracking
from wodenpy.use_libwoden.shapelets import create_sbf
from wodenpy.use_libwoden.create_woden_struct_classes import Woden_Struct_Classes
from wodenpy.skymodel.read_fits_skymodel import read_fits_skymodel_chunks
from wodenpy.use_libwoden.skymodel_structs import setup_source_catalogue, Source_Python
import concurrent.futures
from logging import Logger
import logging
from concurrent.futures import ProcessPoolExecutor, as_completed
from line_profiler import profile, LineProfiler
from wodenpy.primary_beam.use_everybeam import run_everybeam
from wodenpy.wodenpy_setup.woden_logger import get_log_callback, set_logger_header, simple_logger, log_chosen_beamtype, summarise_input_args, set_woden_logger
from copy import deepcopy
import multiprocessing
from datetime import timedelta
import traceback
import shutil

##Constants
R2D = 180.0 / np.pi
D2R = np.pi / 180.0
MWA_LAT = -26.703319405555554
MWA_LONG = 116.67081523611111
MWA_HEIGHT = 377.827
VELC = 299792458.0
SOLAR2SIDEREAL = 1.00274

sbf_N = 101
sbf_L = 10001
sbf_c = 5000
sbf_dx = 0.01

##This call is so we can use it as a type annotation
woden_struct_classes = Woden_Struct_Classes()
Woden_Settings = woden_struct_classes.Woden_Settings
Visi_Set = woden_struct_classes.Visi_Set
Source_Catalogue = woden_struct_classes.Source_Catalogue

[docs] def woden_worker(thread_ind : int, all_loaded_python_sources : List[List[Source_Python]], all_loaded_sources_orders : List[int], round_num : int, woden_settings_python : Woden_Settings_Python, array_layout_python : Array_Layout_Python, visi_sets_python : List[Visi_Set_Python], beamtype : int, logging_level : int = logging.DEBUG, precision : str = "double", logger : Logger = False, profile = False) -> Tuple[List[Visi_Set_Python], int, int]: """ This function runs WODEN CPU code on a separate thread, processing source catalogues from a queue until the queue is empty. Parameters ---------- all_loaded_python_sources : List[List[Source_Python]] A list of lists of `Source_Python` to be processed, as ouput by `read_skymodel_worker`, where each list of `Source_Python` matches a chunk_map in `chunked_skymodel_map_set`. Each entry is a list itself as the Shapelet chunking can result in multiple sources per thread per round, as the chunking happens by sky direction as well as number of shapelet coefficients. all_loaded_sources_orders : List[int] The order of the sources in `all_loaded_python_sources` as matched to `chunked_skymodel_map_set`; orders also output by `read_skymodel_worker` (different threads finish in different times so the order of the sources in `all_loaded_python_sources` may not match the order of the chunk_maps) round_num : int The round number of the processing run_woden : _NamedFuncPointer A pointer to the WODEN GPU function to be run. woden_settings : Woden_Settings The WODEN settings to be used. array_layout : Array_Layout_Ctypes The array layout to be used. woden_struct_classes : Woden_Struct_Classes The WODEN struct classes to be used sbf : np.ndarray The shapelet basis function array beamtype : int The value of the type of beam being used, e.g. BeamTypes.FEE_BEAM.value """ try: if woden_settings_python.do_gpu == 1: device = 'GPU' else: device = 'CPU' woden_struct_classes = Woden_Struct_Classes(precision) ##Depending on what precision was selected by the user, load in the # ##C/CUDA library and return the `run_woden` function # run_woden = load_in_woden_library(woden_struct_classes) woden_lib_path = importlib_resources.files(wodenpy).joinpath(f"libwoden_{woden_struct_classes.precision}.so") woden_lib = ctypes.cdll.LoadLibrary(woden_lib_path) ##This lets the WODEN C/C++/GPU code write to the logger c_log_callback = get_log_callback(logger, logging_level) woden_lib.set_log_callback(c_log_callback) run_woden = load_in_run_woden(woden_lib, woden_struct_classes) python_sources = [] ##grab all the python sources, and reorder/flatten them to match the order of ##chunked_skymodel_maps. Not actually sure we need to do this, but if we ##change this order it'll mean we have to change the order of a bunch of ##tests. I don't think it costs much, and saves a bunch of developing time ordering = np.argsort(all_loaded_sources_orders) for order in ordering: sources = all_loaded_python_sources[order] for source in sources: python_sources.append(source) if len(python_sources) == 0: logger.warning(f"Visibility processing thread {thread_ind} has no work to do") # print(f"Visibility processing thread {thread_ind} has no work to do") return visi_sets_python, thread_ind, round_num ##Create a ctypes Source_Catalogue from the python sources to feed the GPU source_catalogue = create_source_catalogue_from_python_sources(python_sources, woden_struct_classes, beamtype, precision) woden_settings = woden_struct_classes.Woden_Settings() woden_settings = convert_woden_settings_to_ctypes(woden_settings_python, woden_settings) visibility_set = setup_visi_set_array(woden_struct_classes.Visi_Set, woden_settings.num_bands, woden_settings.num_visis, precision=precision) array_layout = Array_Layout_Ctypes() array_layout = convert_array_layout_to_ctypes(array_layout_python, array_layout) sbf = create_sbf(precision=precision) ##We send all sky models chunks to the GPU at once, and then process them if device == 'GPU': logger.info(f"Sending Sky set {round_num} to the {device}") ##We send each chunk to a separate CPU thread, so report the thread ind else: logger.info(f"Sending Sky set {round_num} chunk {thread_ind} to the {device}") # print(f"Sending Sky set {round_num} thread {thread_ind} to CPU") start = time() run_woden(woden_settings, visibility_set, source_catalogue, array_layout, sbf) end = time() if device == 'GPU': logger.info(f"Sky set {round_num} has returned from the {device} after {end-start:.1f} seconds") else: logger.info(f"Sky set {round_num} chunk {thread_ind} has returned from the {device} after {end-start:.1f} seconds") for band_ind in range(woden_settings_python.num_bands): visi_sets_python[band_ind].us_metres = deepcopy(np.ctypeslib.as_array(visibility_set[band_ind].us_metres, shape=(woden_settings_python.num_visis,))) visi_sets_python[band_ind].vs_metres = deepcopy(np.ctypeslib.as_array(visibility_set[band_ind].vs_metres, shape=(woden_settings_python.num_visis,))) visi_sets_python[band_ind].ws_metres = deepcopy(np.ctypeslib.as_array(visibility_set[band_ind].ws_metres, shape=(woden_settings_python.num_visis,))) visi_sets_python[band_ind].sum_visi_XX_real += np.ctypeslib.as_array(visibility_set[band_ind].sum_visi_XX_real, shape=(woden_settings_python.num_visis,)) visi_sets_python[band_ind].sum_visi_XX_imag += np.ctypeslib.as_array(visibility_set[band_ind].sum_visi_XX_imag, shape=(woden_settings_python.num_visis,)) visi_sets_python[band_ind].sum_visi_XY_real += np.ctypeslib.as_array(visibility_set[band_ind].sum_visi_XY_real, shape=(woden_settings_python.num_visis,)) visi_sets_python[band_ind].sum_visi_XY_imag += np.ctypeslib.as_array(visibility_set[band_ind].sum_visi_XY_imag, shape=(woden_settings_python.num_visis,)) visi_sets_python[band_ind].sum_visi_YX_real += np.ctypeslib.as_array(visibility_set[band_ind].sum_visi_YX_real, shape=(woden_settings_python.num_visis,)) visi_sets_python[band_ind].sum_visi_YX_imag += np.ctypeslib.as_array(visibility_set[band_ind].sum_visi_YX_imag, shape=(woden_settings_python.num_visis,)) visi_sets_python[band_ind].sum_visi_YY_real += np.ctypeslib.as_array(visibility_set[band_ind].sum_visi_YY_real, shape=(woden_settings_python.num_visis,)) visi_sets_python[band_ind].sum_visi_YY_imag += np.ctypeslib.as_array(visibility_set[band_ind].sum_visi_YY_imag, shape=(woden_settings_python.num_visis,)) return visi_sets_python, thread_ind, round_num except Exception as e: return f"woden_worker: {e}\n{traceback.format_exc()}"
[docs] def read_skymodel_worker(thread_id : int, num_threads : int, chunked_skymodel_map_sets: List[Skymodel_Chunk_Map], lsts : np.ndarray, latitudes : np.ndarray, args : argparse.Namespace, beamtype : int, main_table : Table, shape_table : Table, v_table : Table = False, q_table : Table = False, u_table : Table = False, p_table : Table = False, logger : Logger = False) -> Tuple[List[Source_Python], int]: """ Reads a list of `Skymodel_Chunk_Map` types in `chunked_skymodel_map_sets` from the given astropy tables, into a list of `Source_Python` types. Depending on the type of primary beam specified by `beamtype` Parameters ========== thread_id : int The ID of the current thread. num_threads : int The total number of threads. chunked_skymodel_map_sets : List[Skymodel_Chunk_Map] A list of chunked skymodel map sets. lsts : np.ndarray Local Sidereal Times (LSTs) for all time steps. latitudes : np.ndarray Latitudes of the array at all time steps; when doing array precession, these will all be slightly different. args : argparse.Namespace Command-line arguments namespace. beamtype : int The value of the type of beam being used, e.g. BeamTypes.FEE_BEAM.value main_table : Table Main table containing skymodel data. shape_table : Table Table containing Shapelet data. v_table : Table, optional Table containing V polarization data (default is False). q_table : Table, optional Table containing Q polarization data (default is False). u_table : Table, optional Table containing U polarization data (default is False). p_table : Table, optional Table containing P polarization data (default is False). profile : bool, optional Whether to profile the function (default is False). Returns ======= tuple : Tuple[List[Source_Python], int] A tuple containing the list of `Source_Python`s and the thread number, where thead number is `thread_id % num_threads`; this can be used to match the order of recovered data to the order of the chunked maps. """ if not logger: logger = simple_logger(args.log_level) try: set_ind = thread_id // num_threads thread_num = thread_id % num_threads start = time() chunk_maps = chunked_skymodel_map_sets[set_ind][thread_num] n_p, n_g, n_s, n_c = 0, 0, 0, 0 for ind, chunk_map in enumerate(chunk_maps): n_p += chunk_map.n_points n_g += chunk_map.n_gauss n_s += chunk_map.n_shapes n_c += chunk_map.n_shape_coeffs msg = f"From sky set {set_ind} thread num {thread_num} reading {n_p} points, {n_g} gauss, {n_s} shape, {n_c} shape coeffs" # msg = f'cmon man {thread_id}' logger.info(msg) if len(chunk_maps) == 0: logger.warning(f"Sky model reading thread {thread_id} has no work to do") # print(f"Sky model reading thread {thread_id} has no work to do") return [], thread_num if args.profile: profiler = LineProfiler() ##Add whatever functions that are called in `read_fits_skymodel_chunks` ##here to profile them profiler.add_function(read_fits_skymodel_chunks) profiler.add_function(run_everybeam) profiler.enable() python_sources = read_fits_skymodel_chunks(args, main_table, shape_table, chunk_maps, args.num_freq_channels, args.num_time_steps, beamtype, lsts, latitudes, v_table, q_table, u_table, p_table, args.precision) end = time() if beamtype in BeamGroups.eb_beam_values: logger.info(f"Finshed sky set {set_ind} reading thread num {thread_num} in {end-start:.1f} seconds") else: logger.debug(f"Finshed sky set {set_ind} reading thread num {thread_num} in {end-start:.1f} seconds") if args.profile: profiler.disable() profile_filename = f"wod_read_skymodel_worker_{os.getpid()}_{set_ind:04d}.lprof" # logger.info("Dumping profile to", profile_filename) profiler.dump_stats(profile_filename) return python_sources, thread_num except Exception as e: return f"read_skymodel_worker: {e}\n{traceback.format_exc()}"
[docs] def sum_components_in_chunked_skymodel_map_sets(chunked_skymodel_map_sets): """ Sums the number of components in a list of `Skymodel_Chunk_Map` types. Parameters ========== chunked_skymodel_map_sets : List[Skymodel_Chunk_Map] A list of chunked skymodel map sets. Returns ======= int : The total number of components in the list of `Skymodel_Chunk_Map` types. """ n_points, n_gauss, n_shapes, n_shape_coeffs = 0, 0, 0, 0 for chunk_map_set in chunked_skymodel_map_sets: for chunk_maps in chunk_map_set: if len(chunk_maps) > 0: for chunk_map in chunk_maps: n_points += chunk_map.n_points n_gauss += chunk_map.n_gauss n_shapes += chunk_map.n_shapes n_shape_coeffs += chunk_map.n_shape_coeffs return n_points, n_gauss, n_shapes, n_shape_coeffs
def get_future_result(future, logger): try: result = future.result() if isinstance(result, str): logger.error(result) sys.exit(result) else: return result except Exception as e: msg = f"Exception in getting result from future: {e}" logger.error(msg) sys.exit(msg) def run_woden_processing(num_threads, num_rounds, chunked_skymodel_map_sets, lsts, latitudes, args, beamtype, main_table, shape_table, v_table, q_table, u_table, p_table, woden_settings_python, array_layout_python, visi_sets_python, logger = False, serial_mode = False): if not logger: logger = simple_logger(args.log_level) total_n_points, total_n_gauss, total_n_shapes, total_n_shape_coeffs = sum_components_in_chunked_skymodel_map_sets(chunked_skymodel_map_sets) total_comps = total_n_points + total_n_gauss + total_n_shapes + total_n_shape_coeffs if serial_mode: for round_num in range(num_rounds): logger.info(f"Reading set {round_num} sky models") all_loaded_python_sources = [] all_loaded_sources_orders = [] for i in range(num_threads): python_sources, order = read_skymodel_worker(i + round_num * num_threads, num_threads, chunked_skymodel_map_sets, lsts, latitudes, args, beamtype, main_table, shape_table, v_table, q_table, u_table, p_table, logger) all_loaded_python_sources.append(python_sources) all_loaded_sources_orders.append(order) visi_set_python, _, completed_round = woden_worker(0, all_loaded_python_sources, all_loaded_sources_orders, round_num, woden_settings_python, array_layout_python, visi_sets_python[0,:], beamtype, args.log_level, args.precision, profile=args.profile, logger=logger) visi_sets_python[0, :] = visi_set_python done_n_points, done_n_gauss, done_n_shapes, done_n_shape_coeffs = sum_components_in_chunked_skymodel_map_sets(chunked_skymodel_map_sets[:completed_round + 1]) done_comps = done_n_points + done_n_gauss + done_n_shapes + done_n_shape_coeffs logger.info(f"Have completed {done_comps} of {total_comps} components calcs ({(done_comps/total_comps)*100:.1f}%)") logger.debug(f"\t{done_n_points} of {total_n_points} points\n" f"\t{done_n_gauss} of {total_n_gauss} gauss\n" f"\t{done_n_shapes} of {total_n_shapes} shapelets\n" f"\t{done_n_shape_coeffs} of {total_n_shape_coeffs} shape coeffs") else: if args.cpu_mode: num_visi_threads = num_threads device = 'CPU' else: num_visi_threads = 1 device = 'GPU' # logger.info(f"Running WODEN with {num_threads} threads for sky model reading and {num_visi_threads} threads for visibility processing") if device == 'CPU': with ProcessPoolExecutor(max_workers=num_threads) as sky_model_executor, ProcessPoolExecutor(max_workers=num_visi_threads) as visi_executor: # # Loop through multiple rounds of data reading and calculation for round_num in range(num_rounds): logger.info(f"Reading set {round_num} sky models") future_data_sky = [sky_model_executor.submit(read_skymodel_worker, i + round_num * num_threads, num_threads, chunked_skymodel_map_sets, lsts, latitudes, args, beamtype, main_table, shape_table, v_table, q_table, u_table, p_table, logger) for i in range(num_threads)] all_loaded_python_sources = [0]*num_threads all_loaded_sources_orders = [0]*num_threads for future in future_data_sky: python_sources, order = get_future_result(future, logger) all_loaded_python_sources[order] = python_sources all_loaded_sources_orders[order] = order future_data_visi = [visi_executor.submit(woden_worker, i, [all_loaded_python_sources[i]], #all_loaded_python_sources [all_loaded_sources_orders[i]], #all_loaded_sources_orders round_num, woden_settings_python, array_layout_python, visi_sets_python[i, :], beamtype, args.log_level, args.precision, logger, args.profile) for i in range(num_visi_threads)] completed = 0 for future in concurrent.futures.as_completed(future_data_visi): visi_set_python, thread_ind, completed_round = get_future_result(future, logger) visi_sets_python[thread_ind, :] = visi_set_python completed += 1 if completed == num_visi_threads: done_n_points, done_n_gauss, done_n_shapes, done_n_shape_coeffs = sum_components_in_chunked_skymodel_map_sets(chunked_skymodel_map_sets[:completed_round + 1]) done_comps = done_n_points + done_n_gauss + done_n_shapes + done_n_shape_coeffs logger.info(f"Have completed {done_comps} of {total_comps} components calcs ({(done_comps/total_comps)*100:.1f}%)") logger.debug(f"\t{done_n_points} of {total_n_points} points\n" f"\t{done_n_gauss} of {total_n_gauss} gauss\n" f"\t{done_n_shapes} of {total_n_shapes} shapelets\n" f"\t{done_n_shape_coeffs} of {total_n_shape_coeffs} shape coeffs") else: with concurrent.futures.ProcessPoolExecutor(max_workers=num_threads) as sky_model_executor: visi_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) gpu_calc = None # To store the future of the calculation thread # # Loop through multiple rounds of data reading and calculation for round_num in range(num_rounds): logger.info(f"Reading set {round_num} sky models") future_data_sky = [sky_model_executor.submit(read_skymodel_worker, i + round_num * num_threads, num_threads, chunked_skymodel_map_sets, lsts, latitudes, args, beamtype, main_table, shape_table, v_table, q_table, u_table, p_table, logger) for i in range(num_threads)] all_loaded_python_sources = [] all_loaded_sources_orders = [] for future in concurrent.futures.as_completed(future_data_sky): python_sources, order = get_future_result(future, logger) all_loaded_python_sources.append(python_sources) all_loaded_sources_orders.append(order) # Wait for previous calculation to complete (if there was one) if gpu_calc is not None: # visi_set_python, _, completed_round = gpu_calc.result() visi_set_python, _, completed_round = get_future_result(gpu_calc, logger) visi_sets_python[0, :] = visi_set_python done_n_points, done_n_gauss, done_n_shapes, done_n_shape_coeffs = sum_components_in_chunked_skymodel_map_sets(chunked_skymodel_map_sets[:completed_round + 1]) done_comps = done_n_points + done_n_gauss + done_n_shapes + done_n_shape_coeffs logger.info(f"Have completed {done_comps} of {total_comps} components calcs ({(done_comps/total_comps)*100:.1f}%)") logger.debug(f"\t{done_n_points} of {total_n_points} points\n" f"\t{done_n_gauss} of {total_n_gauss} gauss\n" f"\t{done_n_shapes} of {total_n_shapes} shapelets\n" f"\t{done_n_shape_coeffs} of {total_n_shape_coeffs} shape coeffs") gpu_calc = visi_executor.submit(woden_worker, 0, all_loaded_python_sources, all_loaded_sources_orders, round_num, woden_settings_python, array_layout_python, visi_sets_python[0,:], beamtype, args.log_level, args.precision, logger, args.profile) # # Wait for the final calculation to complete if gpu_calc is not None: # visi_set_python, _, completed_round = gpu_calc.result() visi_set_python, _, completed_round = get_future_result(gpu_calc, logger) visi_sets_python[0, :] = visi_set_python done_n_points, done_n_gauss, done_n_shapes, done_n_shape_coeffs = sum_components_in_chunked_skymodel_map_sets(chunked_skymodel_map_sets[:completed_round + 1]) done_comps = done_n_points + done_n_gauss + done_n_shapes + done_n_shape_coeffs logger.info(f"Have completed {done_comps} of {total_comps} components calcs ({(done_comps/total_comps)*100:.1f}%)") logger.debug(f"\t{done_n_points} of {total_n_points} points\n" f"\t{done_n_gauss} of {total_n_gauss} gauss\n" f"\t{done_n_shapes} of {total_n_shapes} shapelets\n" f"\t{done_n_shape_coeffs} of {total_n_shape_coeffs} shape coeffs") logger.info("Finished all rounds of processing") return visi_sets_python
[docs] @profile def main(argv=None): """Runs the WODEN simulator, given command line inputs. Does fancy things like reading in the sky model and running the GPU code in parallel; the sky model lazy load allows us to simulate sky models that cannot fit into RAM. Parameters ---------- argv : _type_, optional Will be parsed from the command line if run in main script. Can be passed in explicitly for easy testing """ woden_start = time() ##Find out what git/release version we are using, and where the code lives gitlabel = get_code_version() if not argv: argv = sys.argv[1:] if '--version' in argv: print(f"You are using wodenpy version/git hash: {gitlabel}") sys.exit() ##Grab the parser and parse some args parser = get_parser() args = parser.parse_args(argv) ##Check that the input arguments make sense args = check_args(args) if args.num_threads == 1: serial_mode = True else: serial_mode = False main_logger = set_woden_logger(args.log_level, args.log_file_name) set_logger_header(main_logger, gitlabel) summarise_input_args(main_logger, args) lst_deg, gst0_deg, degpdy, ut1utc = get_uvfits_date_and_position_constants(latitude=args.latitude, longitude=args.longitude, height=args.array_height, date=args.date) jd_midnight, float_jd = calc_jdcal(args.date) jd_date = jd_midnight + float_jd ##Generate the woden_struct_classes, which are used to mimic the C ##structs. Must be made dynamically, as the precision can change woden_struct_classes = Woden_Struct_Classes(args.precision) num_baselines = int(((args.num_antennas - 1)*args.num_antennas) / 2) if args.do_autos: num_visis = args.num_time_steps*args.num_freq_channels*(num_baselines + args.num_antennas) else: num_visis = args.num_time_steps*args.num_freq_channels*num_baselines woden_settings_python = fill_woden_settings_python(args, jd_date, lst_deg) ##fill the lst and mjds fields, precessing if necessary lsts, latitudes = setup_lsts_and_phase_centre(woden_settings_python, main_logger) ##Precessing the array can be expensive if there are 1000s of time steps ##so only calculate it if we need to if args.dry_run: array_layout_python = setup_array_layout_python(woden_settings_python, args) else: ##calculate the array layout array_layout_python = calc_XYZ_diffs(woden_settings_python, args, main_logger) ##report what beam type we are using log_chosen_beamtype(main_logger, woden_settings_python, args) # ##read in and chunk the sky model======================================= main_logger.info("Doing the initial mapping of sky model") t_before = time() comp_counter = read_radec_count_components(args.cat_filename) t_after = time() main_logger.info(f"Sky model mapping took {(t_after - t_before)/60.0:.1f} mins") crop_by_component = True if args.sky_crop_sources: crop_by_component = False comp_counter = crop_below_horizon(woden_settings_python.lsts[0], woden_settings_python.latitude, comp_counter, main_logger, crop_by_component=crop_by_component) main_logger.debug(comp_counter.info_string()) if serial_mode: num_threads = 1 else: num_threads = args.num_threads if args.cpu_mode: max_chunks_per_set=num_threads else: max_chunks_per_set=32 # max_chunks_per_set=args.num_threads*3 chunked_skymodel_map_sets = create_skymodel_chunk_map(comp_counter, args.chunking_size, woden_settings_python.num_baselines, args.num_freq_channels, args.num_time_steps, num_threads=num_threads, max_chunks_per_set=max_chunks_per_set, max_dirs=args.max_sky_directions, beamtype_value=woden_settings_python.beamtype) if args.dry_run: ##User only wants to check whether the arguments would have worked or not main_logger.info("As --dry-run was selected, exiting now before calculating visibilities") else: woden_lib_path = importlib_resources.files(wodenpy).joinpath(f"libwoden_{args.precision}.so") main_logger.info(f"Will load libwoden from {woden_lib_path}") ##Create the shapelet basis functions sbf = create_sbf(precision=args.precision) ##Create an array of visibility_sets, which get fed into run_woden ##and store the output visibilities visi_set_array = setup_visi_set_array(woden_struct_classes.Visi_Set, len(args.band_nums), num_visis, precision=args.precision) if woden_settings_python.do_gpu or serial_mode: num_visi_threads = 1 else: num_visi_threads = args.num_threads ##Something to hold all visibility outputs for all threads, for all bands visi_sets_python = [[Visi_Set_Python() for _ in range(len(args.band_nums))] for _ in range(num_visi_threads)] visi_sets_python = np.array(visi_sets_python) ##initialise to zeros for thread_ind in range(num_visi_threads): for band_ind in range(len(args.band_nums)): visi_sets_python[thread_ind, band_ind].sum_visi_XX_real = np.zeros(num_visis) visi_sets_python[thread_ind, band_ind].sum_visi_XX_imag = np.zeros(num_visis) visi_sets_python[thread_ind, band_ind].sum_visi_XY_real = np.zeros(num_visis) visi_sets_python[thread_ind, band_ind].sum_visi_XY_imag = np.zeros(num_visis) visi_sets_python[thread_ind, band_ind].sum_visi_YX_real = np.zeros(num_visis) visi_sets_python[thread_ind, band_ind].sum_visi_YX_imag = np.zeros(num_visis) visi_sets_python[thread_ind, band_ind].sum_visi_YY_real = np.zeros(num_visis) visi_sets_python[thread_ind, band_ind].sum_visi_YY_imag = np.zeros(num_visis) visi_sets_python[thread_ind, band_ind].us_metres = np.zeros(num_visis) visi_sets_python[thread_ind, band_ind].vs_metres = np.zeros(num_visis) visi_sets_python[thread_ind, band_ind].ws_metres = np.zeros(num_visis) ###--------------------------------------------------------------------- ### heavy lifting area - here we setup running the sky model reading ### and running CPU/GPU code at the same time. Means we can limit the ### absolute amount of RAM used, and save time doing IO at the same ### time as compute # num_threads = args.num_threads num_rounds = len(chunked_skymodel_map_sets) main_table, shape_table, v_table, q_table, u_table, p_table = get_skymodel_tables(args.cat_filename) msg = "" if serial_mode: para_mode = "serial" else: para_mode = "parallel" if args.cpu_mode: msg = f"Running in {para_mode} on CPU mode with {num_threads} threads" else: msg = f"Running in {para_mode} on GPU.\nWill read sky model using {num_threads} threads" msg += f"\nThere are {num_rounds} sets of sky models to run" main_logger.info(msg) run_woden_processing(num_threads, num_rounds, chunked_skymodel_map_sets, lsts, latitudes, args, woden_settings_python.beamtype, main_table, shape_table, v_table, q_table, u_table, p_table, woden_settings_python, array_layout_python, visi_sets_python, main_logger, serial_mode) ### we've now calculated all the visibilities ###--------------------------------------------------------------------- visi_sets_python_combined = visi_sets_python[0, :] for thread_ind in range(1, num_visi_threads): for band_ind in range(len(args.band_nums)): visi_sets_python_combined[band_ind].sum_visi_XX_real += visi_sets_python[thread_ind, band_ind].sum_visi_XX_real visi_sets_python_combined[band_ind].sum_visi_XX_imag += visi_sets_python[thread_ind, band_ind].sum_visi_XX_imag visi_sets_python_combined[band_ind].sum_visi_XY_real += visi_sets_python[thread_ind, band_ind].sum_visi_XY_real visi_sets_python_combined[band_ind].sum_visi_XY_imag += visi_sets_python[thread_ind, band_ind].sum_visi_XY_imag visi_sets_python_combined[band_ind].sum_visi_YX_real += visi_sets_python[thread_ind, band_ind].sum_visi_YX_real visi_sets_python_combined[band_ind].sum_visi_YX_imag += visi_sets_python[thread_ind, band_ind].sum_visi_YX_imag visi_sets_python_combined[band_ind].sum_visi_YY_real += visi_sets_python[thread_ind, band_ind].sum_visi_YY_real visi_sets_python_combined[band_ind].sum_visi_YY_imag += visi_sets_python[thread_ind, band_ind].sum_visi_YY_imag ##I think we want to X,Y,Z to be in the current frame for writing ##out to the uvfits, so calculate again X,Y,Z = enh2xyz(args.east, args.north, args.height, args.latitude*D2R) ##X,Y,Z are stored in a 2D array in units of seconds in the uvfits file XYZ_array = np.empty((args.num_antennas,3)) XYZ_array[:,0] = X XYZ_array[:,1] = Y XYZ_array[:,2] = Z ##Get the central frequency channels, used in the uvfits header central_freq_chan = int(np.floor(args.num_freq_channels / 2.0)) ##Useful number num_baselines = int(((args.num_antennas - 1)*args.num_antennas) / 2) ##Loop over coarse frequency band and convert visibilities output ##from woden.c into uvfits files for band_ind, band in enumerate(args.band_nums): output_uvfits_name = args.output_uvfits_prepend + '_band%02d.uvfits' %band band_low_freq = args.lowest_channel_freq + (band - 1)*args.coarse_band_width central_freq_chan_value = band_low_freq + central_freq_chan*args.freq_res uus,vvs,wws,v_container = load_visibility_set(visibility_set=visi_sets_python_combined[band_ind], num_baselines=num_baselines, num_freq_channels=args.num_freq_channels, num_time_steps=args.num_time_steps, precision=args.precision, do_autos=args.do_autos, num_ants=args.num_antennas) if args.remove_phase_tracking: frequencies = band_low_freq + np.arange(args.num_freq_channels)*args.freq_res v_container = remove_phase_tracking(frequencies=frequencies, wws_seconds=wws, num_time_steps=args.num_time_steps, v_container=v_container, num_baselines=num_baselines) hdu_ant = make_antenna_table(XYZ_array=XYZ_array,telescope_name=args.telescope_name, num_antennas=args.num_antennas, freq_cent=central_freq_chan_value, date=args.date, gst0_deg=gst0_deg, degpdy=degpdy, ut1utc=ut1utc, longitude=args.longitude, latitude=args.latitude, array_height=args.array_height, ant_names=args.ant_names) baselines_array, date_array = make_baseline_date_arrays(args.num_antennas, args.date, args.num_time_steps, args.time_res, do_autos=args.do_autos) create_uvfits(v_container=v_container, freq_cent=central_freq_chan_value, ra_point=args.ra0, dec_point=args.dec0, output_uvfits_name=output_uvfits_name, uu=uus, vv=vvs, ww=wws, baselines_array=baselines_array, date_array=date_array, central_freq_chan=central_freq_chan, ch_width=args.freq_res, jd_midnight=jd_midnight, hdu_ant=hdu_ant, gitlabel=gitlabel, longitude=args.longitude, latitude=args.latitude, array_height=args.array_height, telescope_name=args.telescope_name, IAU_order=args.IAU_order, comment=args.command) if args.pointed_ms_file_name and not args.dry_run: main_logger.info(f"Deleting {args.pointed_ms_file_name}...") shutil.rmtree(args.pointed_ms_file_name) main_logger.info(f"Done") woden_end = time() time_passed = timedelta(seconds=woden_end - woden_start) main_logger.info(f"Full run took {time_passed}") main_logger.info("WODEN is done. Closing the log. S'later")
if __name__ == "__main__": main()