Source code for layup.predict

import logging
import os
from argparse import Namespace
from pathlib import Path

import numpy as np
import pooch
import spiceypy as spice
from sorcha.ephemeris.simulation_geometry import vec2ra_dec, integrate_light_time
from sorcha.ephemeris.simulation_setup import create_assist_ephemeris, furnish_spiceypy, generate_simulations
from sorcha.ephemeris.simulation_driver import (
    EphemerisGeometryParameters,
    calculate_rates_and_geometry,
    get_vec,
)
from pandas import DataFrame, Series
from numpy.lib.recfunctions import join_by

from layup.convert import convert
from layup.routines import Observation, get_ephem, numpy_to_eigen, predict_sequence
from layup.utilities.data_processing_utilities import (
    LayupObservatory,
    create_chunks,
    get_format,
    layup_furnish_spiceypy,
    parse_fit_result,
    process_data,
    process_data_by_id,
    skyplane_cov_to_radec_cov,
)
from layup.utilities.file_io import CSVDataReader
from layup.utilities.file_io.file_output import write_csv
from layup.utilities.cli_utilities import warn_or_remove_file

[docs] logger = logging.getLogger(__name__)
# The list of required input column names. Note: This should not include the # primary id column name.
[docs] AU_M = 149597870700
[docs] AU_KM = AU_M / 1000.0
[docs] SPEED_OF_LIGHT = 2.99792458e5 * 86400.0 / AU_KM
[docs] REQUIRED_INPUT_COLUMN_NAMES = [ "epochMJD_TDB", "FORMAT", ]
[docs] def layup_get_residual_vectors(v1): """ Decomposes n vectors into two unit vectors to facilitate computation of on-sky angles The decomposition is such that A = (-sin (RA), cos(RA), 0) is in the direction of increasing RA, and D = (-sin(dec)cos (RA), -sin(dec) sin(RA), cos(dec)) is in the direction of increasing Dec The triplet (A,D,v1) forms an orthonormal basis of the 3D vector space. Has vectorisation capabilities. Parameters ---------- v1 : array, shape = (3, n)) The vectors to be decomposed Returns ------- A : array, shape = (3, n)) A vectors D : array, shape = (3, n)) D vectors """ [x, y, z] = v1 cosd = np.sqrt(1 - z * z) A = np.array((-y, x, np.zeros(len(x)))) / cosd D = np.array((-z * x / cosd, -z * y / cosd, cosd)) return A, D
[docs] def layup_calculate_rates_and_geometry(pointing: DataFrame, ephem_geom_params: EphemerisGeometryParameters): """Calculate rates and geometry for objects within the field of view, with vectorisation capabilities Parameters ---------- pointing : pandas dataframe The dataframe containing the pointing database. ephem_geom_params : EphemerisGeometryParameters Various parameters necessary to calculate the ephemeris Returns ------- : tuple Tuple containing the ephemeris parameters needed for Sorcha post processing. """ r_sun = get_vec(pointing, "r_sun") r_obs = get_vec(pointing, "r_obs") v_sun = get_vec(pointing, "v_sun") v_obs = get_vec(pointing, "v_obs") ra0, dec0 = vec2ra_dec(ephem_geom_params.rho_hat) drhodt = ephem_geom_params.v_ast - v_obs drho_magdt = (1 / ephem_geom_params.rho_mag) * np.einsum("ji,ji->i", ephem_geom_params.rho, drhodt) ddeltatdt = drho_magdt / (SPEED_OF_LIGHT) drhodt = ephem_geom_params.v_ast * (1 - ddeltatdt) - v_obs A, D = layup_get_residual_vectors(ephem_geom_params.rho_hat) drho_hatdt = ( drhodt / ephem_geom_params.rho_mag - drho_magdt * ephem_geom_params.rho_hat / ephem_geom_params.rho_mag ) dradt = np.einsum("ji,ji->i", A, drho_hatdt) ddecdt = np.einsum("ji,ji->i", D, drho_hatdt) r_ast_sun = ephem_geom_params.r_ast - r_sun v_ast_sun = ephem_geom_params.v_ast - v_sun r_ast_obs = ephem_geom_params.r_ast - r_obs phase_angle = np.arccos( np.einsum("ji,ji->i", r_ast_sun, r_ast_obs) / (np.linalg.norm(r_ast_sun, axis=0) * np.linalg.norm(r_ast_obs, axis=0)) ) obs_sun = r_obs - r_sun dobs_sundt = v_obs - v_sun return ( ephem_geom_params.obj_id, pointing["FieldID"], ephem_geom_params.mjd_tai, pointing["fieldJD_TDB"], ephem_geom_params.rho_mag * AU_KM, drho_magdt * AU_KM / (24 * 60 * 60), ra0, dradt * 180 / np.pi, dec0, ddecdt * 180 / np.pi, r_ast_sun[0] * AU_KM, r_ast_sun[1] * AU_KM, r_ast_sun[2] * AU_KM, v_ast_sun[0] * AU_KM / (24 * 60 * 60), v_ast_sun[1] * AU_KM / (24 * 60 * 60), v_ast_sun[2] * AU_KM / (24 * 60 * 60), obs_sun[0] * AU_KM, obs_sun[1] * AU_KM, obs_sun[2] * AU_KM, dobs_sundt[0] * AU_KM / (24 * 60 * 60), dobs_sundt[1] * AU_KM / (24 * 60 * 60), dobs_sundt[2] * AU_KM / (24 * 60 * 60), phase_angle * 180 / np.pi, )
[docs] def _get_on_sky_data(predictions, orbits_df, obs_pos_vel, primary_id_column_name, args, configs): '''Calculates the on-sky rates and geometry for a set of orbits at certain pointings. Parameters ---------- predictions : numpy recarray Numpy recarray containing the pre-calculated predictions of the objects. orbits_df : pandas dataframe The dataframe containing the orbits being predicted. obs_pos_vel : numpy recarray Numpy recarray containing the position and velocity of the observatory at the times requested for predictions. primary_id_column_name : str The name of the primary ID column. args : argparse The argparse object that was created when running from the CLI. aux : AuxiliaryConfigs object The LayUp auxiliary arguments. Returns ------- rates : numpy recarray Numpy recarray containing the on-sky rates. """''' observations = [] for i, pos in enumerate(obs_pos_vel): obs = Observation() obs.observer_position = [pos["x"], pos["y"], pos["z"]] obs.observer_velocity = [pos["vx"], pos["vy"], pos["vz"]] obs.epoch = predictions["epoch_JD_TDB"][i] observations.append(obs) # Create simulations ephem, gm_sun, gm_total = create_assist_ephemeris(args, configs.auxiliary) furnish_spiceypy(args, configs.auxiliary) sim_dict = generate_simulations(ephem, gm_sun, gm_total, orbits_df, args) ephem_geom_params = EphemerisGeometryParameters() ephem_geom_params.obj_id = predictions[primary_id_column_name] [ephem_geom_params.rho, ephem_geom_params.r_ast, ephem_geom_params.v_ast, ephem_geom_params.rho_hat] = ( np.empty((4, 3, len(predictions))) ) ephem_geom_params.rho_mag = np.empty(len(predictions)) [obs_pos, obs_vel, sun_pos, sun_vel] = np.empty((4, len(predictions), 3)) v = sim_dict[predictions[primary_id_column_name][0]] sim = v["sim"] ex = v["ex"] for i, pred in enumerate(predictions): # Setup - define values used later obs_pos[i] = observations[i].observer_position obs_vel[i] = observations[i].observer_velocity sun_pos[i] = ephem.get_particle("sun", pred["epoch_JD_TDB"] - ephem.jd_ref).xyz sun_vel[i] = ephem.get_particle("sun", pred["epoch_JD_TDB"] - ephem.jd_ref).vxyz # Get rest of geometry params ( [ephem_geom_params.rho[0][i], ephem_geom_params.rho[1][i], ephem_geom_params.rho[2][i]], ephem_geom_params.rho_mag[i], _, [ephem_geom_params.r_ast[0][i], ephem_geom_params.r_ast[1][i], ephem_geom_params.r_ast[2][i]], [ephem_geom_params.v_ast[0][i], ephem_geom_params.v_ast[1][i], ephem_geom_params.v_ast[2][i]], ) = integrate_light_time(sim, ex, pred["epoch_JD_TDB"] - ephem.jd_ref, obs_pos[i], lt0=0.01) ephem_geom_params.rho_hat = ephem_geom_params.rho / ephem_geom_params.rho_mag[:, None].T # Formatting the pointing data pointing = DataFrame( { "FieldID": ephem_geom_params.obj_id, "fieldJD_TDB": predictions["epoch_JD_TDB"], "r_obs_x": obs_pos.T[0], "r_obs_y": obs_pos.T[1], "r_obs_z": obs_pos.T[2], "v_obs_x": obs_vel.T[0], "v_obs_y": obs_vel.T[1], "v_obs_z": obs_vel.T[2], "r_sun_x": sun_pos.T[0], "r_sun_y": sun_pos.T[1], "r_sun_z": sun_pos.T[2], "v_sun_x": sun_vel.T[0], "v_sun_y": sun_vel.T[1], "v_sun_z": sun_vel.T[2], } ) # Get the on-sky rates onsky = layup_calculate_rates_and_geometry(pointing, ephem_geom_params) # Only want certain info from onsky # This method is the fastest for making a recarray, but I've commented a different way in case we don't want to depend on pandas rates_df = DataFrame( { primary_id_column_name: ephem_geom_params.obj_id, "epoch_JD_TDB": predictions["epoch_JD_TDB"], "Range_LTC_au": onsky[4] / AU_KM, "Range_LTC_km": onsky[4], "RangeRate_LTC_km_s": onsky[5], # calculate heliocentric distance "Obj_Sun_LTC_au": np.sqrt(onsky[10] ** 2 + onsky[11] ** 2 + onsky[12] ** 2) / AU_KM, "Obj_Sun_LTC_km": np.sqrt(onsky[10] ** 2 + onsky[11] ** 2 + onsky[12] ** 2), "Obj_Sun_x_LTC_km": onsky[10], "Obj_Sun_y_LTC_km": onsky[11], "Obj_Sun_z_LTC_km": onsky[12], "Obj_Sun_vx_LTC_km_s": onsky[13], "Obj_Sun_vy_LTC_km_s": onsky[14], "Obj_Sun_vz_LTC_km_s": onsky[15], "phase_deg": onsky[22], } ) rates = rates_df.to_records(index=False) # rates = np.empty( # len(predictions), # dtype=[ # ("provID", "O"), # ("epoch_JD_TDB", "f8"), # ("Range_LTC_km", "f8"), # ("RangeRate_LTC_km_s", "f8"), # ("Obj_Sun_x_LTC_km", "f8"), # ("Obj_Sun_y_LTC_km", "f8"), # ("Obj_Sun_z_LTC_km", "f8"), # ("Obj_Sun_vx_LTC_km_s", "f8"), # ("Obj_Sun_vy_LTC_km_s", "f8"), # ("Obj_Sun_vz_LTC_km_s", "f8"), # ("phase_deg", "f8"), # ], # ) # rates["provID"], rates["epoch_JD_TDB"] = ephem_geom_params.obj_id, predictions["epoch_JD_TDB"] # rates["Range_LTC_km"], rates["RangeRate_LTC_km_s"] = onsky[4], onsky[5] # ( # rates["Obj_Sun_x_LTC_km"], # rates["Obj_Sun_y_LTC_km"], # rates["Obj_Sun_z_LTC_km"], # rates["Obj_Sun_vx_LTC_km_s"], # rates["Obj_Sun_vy_LTC_km_s"], # rates["Obj_Sun_vz_LTC_km_s"], # ) = (onsky[10], onsky[11], onsky[12], onsky[13], onsky[14], onsky[15]) # rates["phase_deg"] = onsky[22] spice.kclear() return rates
[docs] def _get_result_dtypes(primary_id_column_name: str): """Helper function to create the result dtype with the correct primary ID column name.""" # Define a structured dtype to match the OrbfitResult fields return np.dtype( [ (primary_id_column_name, "O"), # Object ID ("epoch_UTC", "O"), # Time for prediction in UTC ("epoch_JD_TDB", "f8"), # Time for prediction in JD TDB ("ra_deg", "f8"), ("dec_deg", "f8"), ("rho_x", "f8"), # The first of the 3 rho unit vector ("rho_y", "f8"), ("rho_z", "f8"), ("obs_cov_xx", "f8"), # The first of 4 of the observer covariance ("obs_cov_yy", "f8"), ("obs_cov_xy", "f8"), ("a_arcsec", "f8"), ("b_arcsec", "f8"), ("PA_deg", "f8"), ] )
[docs] def _convert_to_sg(data): """This function appends two columns of the RA and Dec in sexagesimal to the input array. Parameters ---------- data : numpy structured array The data to be processed. Returns ------- input array with ra and dec in sexagesimal appended, called ra_str_hms and dec_str_dms respectively. """ ra_deg = (data["ra_deg"] / 15) % 24 # Ensuring ra is within 24 hours/360 degrees ra_h = ra_deg.astype(int) dec_deg = data["dec_deg"] dec_d = np.abs(dec_deg).astype(int) ra_decimal = (ra_deg % 1) * 60 ra_m = ra_decimal.astype(int) dec_decimal = (np.abs(dec_deg) % 1) * 60 dec_m = dec_decimal.astype(int) ra_s = (ra_decimal % 1) * 60 # Take decimal portion again for arcseconds dec_s = (dec_decimal % 1) * 60 ra = np.empty(len(ra_h), dtype="<U16") dec = np.empty(len(ra_h), dtype="<U16") for i in range(len(ra_h)): ra[i] = f"{ra_h[i]:02} {ra_m[i]:02} {ra_s[i]:05.2f}" # Same format as dec[i] = ( f"{'-' if dec_deg[i] < 0 else '+'}{dec_d[i]:02} {dec_m[i]:02} {dec_s[i]:04.1f}" # JPL Horizons ) return np.lib.recfunctions.append_fields(data, ["ra_str_hms", "dec_str_dms"], [ra, dec], usemask=False)
[docs] def _predict(data, obs_pos_vel, times, cache_dir, primary_id_column_name): """This function is called by the parallelization function to call the C++ code. Parameters ---------- data : nump structured array The data to be processed. obs_pos_vel : numpy structured array The observer position and velocity. times : list The times for the predictions, in jd_tdb. cache_dir : str The directory to the cached kernels. primary_id_column_name : str The name of the primary ID column. Returns ------- numpy structured array with the flattened results """ if cache_dir is None: kernels_loc = str(pooch.os_cache("layup")) else: kernels_loc = str(cache_dir) observations = [] for i, pos in enumerate(obs_pos_vel): obs = Observation() obs.observer_position = [pos["x"], pos["y"], pos["z"]] obs.observer_velocity = [pos["vx"], pos["vy"], pos["vz"]] obs.epoch = times[i] observations.append(obs) # Load kernels for time conversion. layup_furnish_spiceypy(kernels_loc) predict_results = [] for row in data: # get the fit result (we don't need the csq and ndof values) fit = parse_fit_result(row, orbit_colm_flag=False) pred_res = predict_sequence(get_ephem(kernels_loc), fit, observations, numpy_to_eigen(fit.cov, 6, 6)) for pred in pred_res: et = spice.str2et(f"jd {pred.epoch} tdb") utc_time = spice.et2utc(et, "C", 0) predict_results.append( ( row[primary_id_column_name], utc_time, pred.epoch, # JD TDB pred.rho[0], # place holder pred.rho[0], # place holder pred.rho[0], pred.rho[1], pred.rho[2], pred.obs_cov[0], pred.obs_cov[2], pred.obs_cov[1], 0.0, 0.0, 0.0, ) ) results = np.array(predict_results, dtype=_get_result_dtypes(primary_id_column_name)) results["ra_deg"], results["dec_deg"] = vec2ra_dec([results["rho_x"], results["rho_y"], results["rho_z"]]) # Only calculate covariances if covaraiances are in the input file if "cov_0_0" in data.dtype.names: results["a_arcsec"], results["b_arcsec"], results["PA_deg"] = skyplane_cov_to_radec_cov( results["ra_deg"], results["dec_deg"], results["obs_cov_xx"], results["obs_cov_yy"], results["obs_cov_xy"], ) return results
[docs] def predict( data, obscode, times, primary_id_column_name="provID", num_workers=-1, cache_dir=None, args=None, configs=None, ): """The function to all that predict functionality interactively, i.e from a notebook or a script. Parameters ---------- data : numpy structured array The data to be processed. obscode : str The observer code. times : list The times for the predictions, in jd_tdb. primary_id_column_name : str The name of the primary ID column. num_workers : int The number of workers to use for parallelization. If -1, use all available cores. cache_dir : str or None The directory to the cached kernels. If None, use the default cache directory. args : argparse The argparse object that was created when running from the CLI. aux : AuxiliaryConfigs object The LayUp auxiliary arguments. Needed if on-sky rates are requested. Returns ------- numpy structured array with the flattened results """ if num_workers < 0: num_workers = os.cpu_count() Layup_observatory = LayupObservatory(cache_dir=cache_dir) times_et = np.array([spice.str2et(f"jd {t} tdb") for t in times], dtype="<f8") obs_data = np.array([(obscode, t) for t in times_et], dtype=[("stn", "<U10"), ("et", "<f8")]) obs_pos_vel = Layup_observatory.obscodes_to_barycentric(obs_data) predictions = process_data( data, n_workers=num_workers, func=_predict, obs_pos_vel=obs_pos_vel, times=times, cache_dir=cache_dir, primary_id_column_name=primary_id_column_name, ) if args.onsky_data: # generate_simulations (used in _get_on_sky_data) doesn't accept BCART_EQ, accepts COM, KEP, CART and their barycentric equivalents data = convert( data, "BCART", cache_dir=cache_dir, primary_id_column_name=args.primary_id_column_name, ) cols = data.dtype.names orbits_df = DataFrame(data, columns=cols, index=data[primary_id_column_name]) if primary_id_column_name != "ObjID": orbits_df = orbits_df.rename(columns={primary_id_column_name: "ObjID"}) # onsky_results = _get_on_sky_data(orbits_df, observations, results, args, configs) onsky_results = process_data_by_id( predictions, n_workers=num_workers, func=_get_on_sky_data, primary_id_column_name=primary_id_column_name, orbits_df=orbits_df, obs_pos_vel=obs_pos_vel, args=args, configs=configs, ) results = join_by( [primary_id_column_name, "epoch_JD_TDB"], predictions, onsky_results, usemask=False, asrecarray=True, ) return results else: return predictions
[docs] def predict_cli( cli_args: Namespace, input_file: str, start_date: float, end_date: float, timestep_day: float, output_file: str, cache_dir: Path, configs: None, ): """The function for calling predict through the command line interface. Parameters ---------- cli_args : Namespace The command line arguments. input_file : str The input file to read the data from. start_date : float The start date for the predictions, in jd_tdb. end_date : float The end date for the predictions, in jd_tdb. timestep_day : float The time step for the predictions, in days. output_file : str The output file to write the predictions to. cache_dir : Path The directory to the cached kernels. aux : AuxiliaryConfigs object The LayUp auxiliary arguments """ num_workers = cli_args.n if num_workers < 0: num_workers = os.cpu_count() times = np.arange(start_date, end_date + timestep_day, step=timestep_day) reader = CSVDataReader( input_file, primary_id_column_name=cli_args.primary_id_column_name, sep="csv", required_columns=REQUIRED_INPUT_COLUMN_NAMES, ) chunks = create_chunks(reader, chunk_size=cli_args.chunk) for chunk in chunks: # Read the objects from the file data = reader.read_objects(chunk) if get_format(data) != "BCART_EQ": data = convert( data, "BCART_EQ", cache_dir=cache_dir, primary_id_column_name=cli_args.primary_id_column_name, ) predictions = predict( data, obscode=cli_args.station, times=times, num_workers=cli_args.n, cache_dir=cache_dir, primary_id_column_name=cli_args.primary_id_column_name, args=cli_args, configs=configs, ) if len(predictions) > 0: if cli_args.sexagesimal: predictions = _convert_to_sg(predictions) write_csv(predictions, output_file, move_columns={"ra_str_hms": 3, "dec_str_dms": 4}) else: write_csv(predictions, output_file)