import logging
import os
from argparse import Namespace
from pathlib import Path
import numpy as np
import pooch
import spiceypy as spice
from sorcha.ephemeris.simulation_geometry import vec2ra_dec, integrate_light_time
from sorcha.ephemeris.simulation_setup import create_assist_ephemeris, furnish_spiceypy, generate_simulations
from sorcha.ephemeris.simulation_driver import (
EphemerisGeometryParameters,
calculate_rates_and_geometry,
get_vec,
)
from pandas import DataFrame, Series
from numpy.lib.recfunctions import join_by
from layup.convert import convert
from layup.routines import Observation, get_ephem, numpy_to_eigen, predict_sequence
from layup.utilities.data_processing_utilities import (
LayupObservatory,
create_chunks,
get_format,
layup_furnish_spiceypy,
parse_fit_result,
process_data,
process_data_by_id,
skyplane_cov_to_radec_cov,
)
from layup.utilities.file_io import CSVDataReader
from layup.utilities.file_io.file_output import write_csv
from layup.utilities.cli_utilities import warn_or_remove_file
[docs]
logger = logging.getLogger(__name__)
# The list of required input column names. Note: This should not include the
# primary id column name.
[docs]
SPEED_OF_LIGHT = 2.99792458e5 * 86400.0 / AU_KM
[docs]
def layup_get_residual_vectors(v1):
"""
Decomposes n vectors into two unit vectors to facilitate computation of on-sky angles
The decomposition is such that A = (-sin (RA), cos(RA), 0) is in the direction of increasing RA,
and D = (-sin(dec)cos (RA), -sin(dec) sin(RA), cos(dec)) is in the direction of increasing Dec
The triplet (A,D,v1) forms an orthonormal basis of the 3D vector space. Has vectorisation capabilities.
Parameters
----------
v1 : array, shape = (3, n))
The vectors to be decomposed
Returns
-------
A : array, shape = (3, n))
A vectors
D : array, shape = (3, n))
D vectors
"""
[x, y, z] = v1
cosd = np.sqrt(1 - z * z)
A = np.array((-y, x, np.zeros(len(x)))) / cosd
D = np.array((-z * x / cosd, -z * y / cosd, cosd))
return A, D
[docs]
def layup_calculate_rates_and_geometry(pointing: DataFrame, ephem_geom_params: EphemerisGeometryParameters):
"""Calculate rates and geometry for objects within the field of view, with vectorisation capabilities
Parameters
----------
pointing : pandas dataframe
The dataframe containing the pointing database.
ephem_geom_params : EphemerisGeometryParameters
Various parameters necessary to calculate the ephemeris
Returns
-------
: tuple
Tuple containing the ephemeris parameters needed for Sorcha post processing.
"""
r_sun = get_vec(pointing, "r_sun")
r_obs = get_vec(pointing, "r_obs")
v_sun = get_vec(pointing, "v_sun")
v_obs = get_vec(pointing, "v_obs")
ra0, dec0 = vec2ra_dec(ephem_geom_params.rho_hat)
drhodt = ephem_geom_params.v_ast - v_obs
drho_magdt = (1 / ephem_geom_params.rho_mag) * np.einsum("ji,ji->i", ephem_geom_params.rho, drhodt)
ddeltatdt = drho_magdt / (SPEED_OF_LIGHT)
drhodt = ephem_geom_params.v_ast * (1 - ddeltatdt) - v_obs
A, D = layup_get_residual_vectors(ephem_geom_params.rho_hat)
drho_hatdt = (
drhodt / ephem_geom_params.rho_mag
- drho_magdt * ephem_geom_params.rho_hat / ephem_geom_params.rho_mag
)
dradt = np.einsum("ji,ji->i", A, drho_hatdt)
ddecdt = np.einsum("ji,ji->i", D, drho_hatdt)
r_ast_sun = ephem_geom_params.r_ast - r_sun
v_ast_sun = ephem_geom_params.v_ast - v_sun
r_ast_obs = ephem_geom_params.r_ast - r_obs
phase_angle = np.arccos(
np.einsum("ji,ji->i", r_ast_sun, r_ast_obs)
/ (np.linalg.norm(r_ast_sun, axis=0) * np.linalg.norm(r_ast_obs, axis=0))
)
obs_sun = r_obs - r_sun
dobs_sundt = v_obs - v_sun
return (
ephem_geom_params.obj_id,
pointing["FieldID"],
ephem_geom_params.mjd_tai,
pointing["fieldJD_TDB"],
ephem_geom_params.rho_mag * AU_KM,
drho_magdt * AU_KM / (24 * 60 * 60),
ra0,
dradt * 180 / np.pi,
dec0,
ddecdt * 180 / np.pi,
r_ast_sun[0] * AU_KM,
r_ast_sun[1] * AU_KM,
r_ast_sun[2] * AU_KM,
v_ast_sun[0] * AU_KM / (24 * 60 * 60),
v_ast_sun[1] * AU_KM / (24 * 60 * 60),
v_ast_sun[2] * AU_KM / (24 * 60 * 60),
obs_sun[0] * AU_KM,
obs_sun[1] * AU_KM,
obs_sun[2] * AU_KM,
dobs_sundt[0] * AU_KM / (24 * 60 * 60),
dobs_sundt[1] * AU_KM / (24 * 60 * 60),
dobs_sundt[2] * AU_KM / (24 * 60 * 60),
phase_angle * 180 / np.pi,
)
[docs]
def _get_on_sky_data(predictions, orbits_df, obs_pos_vel, primary_id_column_name, args, configs):
'''Calculates the on-sky rates and geometry for a set of orbits at certain pointings.
Parameters
----------
predictions : numpy recarray
Numpy recarray containing the pre-calculated predictions of the objects.
orbits_df : pandas dataframe
The dataframe containing the orbits being predicted.
obs_pos_vel : numpy recarray
Numpy recarray containing the position and velocity of the observatory at the times requested for predictions.
primary_id_column_name : str
The name of the primary ID column.
args : argparse
The argparse object that was created when running from the CLI.
aux : AuxiliaryConfigs object
The LayUp auxiliary arguments.
Returns
-------
rates : numpy recarray
Numpy recarray containing the on-sky rates.
"""'''
observations = []
for i, pos in enumerate(obs_pos_vel):
obs = Observation()
obs.observer_position = [pos["x"], pos["y"], pos["z"]]
obs.observer_velocity = [pos["vx"], pos["vy"], pos["vz"]]
obs.epoch = predictions["epoch_JD_TDB"][i]
observations.append(obs)
# Create simulations
ephem, gm_sun, gm_total = create_assist_ephemeris(args, configs.auxiliary)
furnish_spiceypy(args, configs.auxiliary)
sim_dict = generate_simulations(ephem, gm_sun, gm_total, orbits_df, args)
ephem_geom_params = EphemerisGeometryParameters()
ephem_geom_params.obj_id = predictions[primary_id_column_name]
[ephem_geom_params.rho, ephem_geom_params.r_ast, ephem_geom_params.v_ast, ephem_geom_params.rho_hat] = (
np.empty((4, 3, len(predictions)))
)
ephem_geom_params.rho_mag = np.empty(len(predictions))
[obs_pos, obs_vel, sun_pos, sun_vel] = np.empty((4, len(predictions), 3))
v = sim_dict[predictions[primary_id_column_name][0]]
sim = v["sim"]
ex = v["ex"]
for i, pred in enumerate(predictions):
# Setup - define values used later
obs_pos[i] = observations[i].observer_position
obs_vel[i] = observations[i].observer_velocity
sun_pos[i] = ephem.get_particle("sun", pred["epoch_JD_TDB"] - ephem.jd_ref).xyz
sun_vel[i] = ephem.get_particle("sun", pred["epoch_JD_TDB"] - ephem.jd_ref).vxyz
# Get rest of geometry params
(
[ephem_geom_params.rho[0][i], ephem_geom_params.rho[1][i], ephem_geom_params.rho[2][i]],
ephem_geom_params.rho_mag[i],
_,
[ephem_geom_params.r_ast[0][i], ephem_geom_params.r_ast[1][i], ephem_geom_params.r_ast[2][i]],
[ephem_geom_params.v_ast[0][i], ephem_geom_params.v_ast[1][i], ephem_geom_params.v_ast[2][i]],
) = integrate_light_time(sim, ex, pred["epoch_JD_TDB"] - ephem.jd_ref, obs_pos[i], lt0=0.01)
ephem_geom_params.rho_hat = ephem_geom_params.rho / ephem_geom_params.rho_mag[:, None].T
# Formatting the pointing data
pointing = DataFrame(
{
"FieldID": ephem_geom_params.obj_id,
"fieldJD_TDB": predictions["epoch_JD_TDB"],
"r_obs_x": obs_pos.T[0],
"r_obs_y": obs_pos.T[1],
"r_obs_z": obs_pos.T[2],
"v_obs_x": obs_vel.T[0],
"v_obs_y": obs_vel.T[1],
"v_obs_z": obs_vel.T[2],
"r_sun_x": sun_pos.T[0],
"r_sun_y": sun_pos.T[1],
"r_sun_z": sun_pos.T[2],
"v_sun_x": sun_vel.T[0],
"v_sun_y": sun_vel.T[1],
"v_sun_z": sun_vel.T[2],
}
)
# Get the on-sky rates
onsky = layup_calculate_rates_and_geometry(pointing, ephem_geom_params)
# Only want certain info from onsky
# This method is the fastest for making a recarray, but I've commented a different way in case we don't want to depend on pandas
rates_df = DataFrame(
{
primary_id_column_name: ephem_geom_params.obj_id,
"epoch_JD_TDB": predictions["epoch_JD_TDB"],
"Range_LTC_au": onsky[4] / AU_KM,
"Range_LTC_km": onsky[4],
"RangeRate_LTC_km_s": onsky[5],
# calculate heliocentric distance
"Obj_Sun_LTC_au": np.sqrt(onsky[10] ** 2 + onsky[11] ** 2 + onsky[12] ** 2) / AU_KM,
"Obj_Sun_LTC_km": np.sqrt(onsky[10] ** 2 + onsky[11] ** 2 + onsky[12] ** 2),
"Obj_Sun_x_LTC_km": onsky[10],
"Obj_Sun_y_LTC_km": onsky[11],
"Obj_Sun_z_LTC_km": onsky[12],
"Obj_Sun_vx_LTC_km_s": onsky[13],
"Obj_Sun_vy_LTC_km_s": onsky[14],
"Obj_Sun_vz_LTC_km_s": onsky[15],
"phase_deg": onsky[22],
}
)
rates = rates_df.to_records(index=False)
# rates = np.empty(
# len(predictions),
# dtype=[
# ("provID", "O"),
# ("epoch_JD_TDB", "f8"),
# ("Range_LTC_km", "f8"),
# ("RangeRate_LTC_km_s", "f8"),
# ("Obj_Sun_x_LTC_km", "f8"),
# ("Obj_Sun_y_LTC_km", "f8"),
# ("Obj_Sun_z_LTC_km", "f8"),
# ("Obj_Sun_vx_LTC_km_s", "f8"),
# ("Obj_Sun_vy_LTC_km_s", "f8"),
# ("Obj_Sun_vz_LTC_km_s", "f8"),
# ("phase_deg", "f8"),
# ],
# )
# rates["provID"], rates["epoch_JD_TDB"] = ephem_geom_params.obj_id, predictions["epoch_JD_TDB"]
# rates["Range_LTC_km"], rates["RangeRate_LTC_km_s"] = onsky[4], onsky[5]
# (
# rates["Obj_Sun_x_LTC_km"],
# rates["Obj_Sun_y_LTC_km"],
# rates["Obj_Sun_z_LTC_km"],
# rates["Obj_Sun_vx_LTC_km_s"],
# rates["Obj_Sun_vy_LTC_km_s"],
# rates["Obj_Sun_vz_LTC_km_s"],
# ) = (onsky[10], onsky[11], onsky[12], onsky[13], onsky[14], onsky[15])
# rates["phase_deg"] = onsky[22]
spice.kclear()
return rates
[docs]
def _get_result_dtypes(primary_id_column_name: str):
"""Helper function to create the result dtype with the correct primary ID column name."""
# Define a structured dtype to match the OrbfitResult fields
return np.dtype(
[
(primary_id_column_name, "O"), # Object ID
("epoch_UTC", "O"), # Time for prediction in UTC
("epoch_JD_TDB", "f8"), # Time for prediction in JD TDB
("ra_deg", "f8"),
("dec_deg", "f8"),
("rho_x", "f8"), # The first of the 3 rho unit vector
("rho_y", "f8"),
("rho_z", "f8"),
("obs_cov_xx", "f8"), # The first of 4 of the observer covariance
("obs_cov_yy", "f8"),
("obs_cov_xy", "f8"),
("a_arcsec", "f8"),
("b_arcsec", "f8"),
("PA_deg", "f8"),
]
)
[docs]
def _convert_to_sg(data):
"""This function appends two columns of the RA and Dec in sexagesimal to the input array.
Parameters
----------
data : numpy structured array
The data to be processed.
Returns
-------
input array with ra and dec in sexagesimal appended, called ra_str_hms and dec_str_dms respectively.
"""
ra_deg = (data["ra_deg"] / 15) % 24 # Ensuring ra is within 24 hours/360 degrees
ra_h = ra_deg.astype(int)
dec_deg = data["dec_deg"]
dec_d = np.abs(dec_deg).astype(int)
ra_decimal = (ra_deg % 1) * 60
ra_m = ra_decimal.astype(int)
dec_decimal = (np.abs(dec_deg) % 1) * 60
dec_m = dec_decimal.astype(int)
ra_s = (ra_decimal % 1) * 60 # Take decimal portion again for arcseconds
dec_s = (dec_decimal % 1) * 60
ra = np.empty(len(ra_h), dtype="<U16")
dec = np.empty(len(ra_h), dtype="<U16")
for i in range(len(ra_h)):
ra[i] = f"{ra_h[i]:02} {ra_m[i]:02} {ra_s[i]:05.2f}" # Same format as
dec[i] = (
f"{'-' if dec_deg[i] < 0 else '+'}{dec_d[i]:02} {dec_m[i]:02} {dec_s[i]:04.1f}" # JPL Horizons
)
return np.lib.recfunctions.append_fields(data, ["ra_str_hms", "dec_str_dms"], [ra, dec], usemask=False)
[docs]
def _predict(data, obs_pos_vel, times, cache_dir, primary_id_column_name):
"""This function is called by the parallelization function to call the C++ code.
Parameters
----------
data : nump structured array
The data to be processed.
obs_pos_vel : numpy structured array
The observer position and velocity.
times : list
The times for the predictions, in jd_tdb.
cache_dir : str
The directory to the cached kernels.
primary_id_column_name : str
The name of the primary ID column.
Returns
-------
numpy structured array with the flattened results
"""
if cache_dir is None:
kernels_loc = str(pooch.os_cache("layup"))
else:
kernels_loc = str(cache_dir)
observations = []
for i, pos in enumerate(obs_pos_vel):
obs = Observation()
obs.observer_position = [pos["x"], pos["y"], pos["z"]]
obs.observer_velocity = [pos["vx"], pos["vy"], pos["vz"]]
obs.epoch = times[i]
observations.append(obs)
# Load kernels for time conversion.
layup_furnish_spiceypy(kernels_loc)
predict_results = []
for row in data:
# get the fit result (we don't need the csq and ndof values)
fit = parse_fit_result(row, orbit_colm_flag=False)
pred_res = predict_sequence(get_ephem(kernels_loc), fit, observations, numpy_to_eigen(fit.cov, 6, 6))
for pred in pred_res:
et = spice.str2et(f"jd {pred.epoch} tdb")
utc_time = spice.et2utc(et, "C", 0)
predict_results.append(
(
row[primary_id_column_name],
utc_time,
pred.epoch, # JD TDB
pred.rho[0], # place holder
pred.rho[0], # place holder
pred.rho[0],
pred.rho[1],
pred.rho[2],
pred.obs_cov[0],
pred.obs_cov[2],
pred.obs_cov[1],
0.0,
0.0,
0.0,
)
)
results = np.array(predict_results, dtype=_get_result_dtypes(primary_id_column_name))
results["ra_deg"], results["dec_deg"] = vec2ra_dec([results["rho_x"], results["rho_y"], results["rho_z"]])
# Only calculate covariances if covaraiances are in the input file
if "cov_0_0" in data.dtype.names:
results["a_arcsec"], results["b_arcsec"], results["PA_deg"] = skyplane_cov_to_radec_cov(
results["ra_deg"],
results["dec_deg"],
results["obs_cov_xx"],
results["obs_cov_yy"],
results["obs_cov_xy"],
)
return results
[docs]
def predict(
data,
obscode,
times,
primary_id_column_name="provID",
num_workers=-1,
cache_dir=None,
args=None,
configs=None,
):
"""The function to all that predict functionality interactively, i.e from a notebook or a script.
Parameters
----------
data : numpy structured array
The data to be processed.
obscode : str
The observer code.
times : list
The times for the predictions, in jd_tdb.
primary_id_column_name : str
The name of the primary ID column.
num_workers : int
The number of workers to use for parallelization. If -1, use all available cores.
cache_dir : str or None
The directory to the cached kernels. If None, use the default cache directory.
args : argparse
The argparse object that was created when running from the CLI.
aux : AuxiliaryConfigs object
The LayUp auxiliary arguments. Needed if on-sky rates are requested.
Returns
-------
numpy structured array with the flattened results
"""
if num_workers < 0:
num_workers = os.cpu_count()
Layup_observatory = LayupObservatory(cache_dir=cache_dir)
times_et = np.array([spice.str2et(f"jd {t} tdb") for t in times], dtype="<f8")
obs_data = np.array([(obscode, t) for t in times_et], dtype=[("stn", "<U10"), ("et", "<f8")])
obs_pos_vel = Layup_observatory.obscodes_to_barycentric(obs_data)
predictions = process_data(
data,
n_workers=num_workers,
func=_predict,
obs_pos_vel=obs_pos_vel,
times=times,
cache_dir=cache_dir,
primary_id_column_name=primary_id_column_name,
)
if args.onsky_data:
# generate_simulations (used in _get_on_sky_data) doesn't accept BCART_EQ, accepts COM, KEP, CART and their barycentric equivalents
data = convert(
data,
"BCART",
cache_dir=cache_dir,
primary_id_column_name=args.primary_id_column_name,
)
cols = data.dtype.names
orbits_df = DataFrame(data, columns=cols, index=data[primary_id_column_name])
if primary_id_column_name != "ObjID":
orbits_df = orbits_df.rename(columns={primary_id_column_name: "ObjID"})
# onsky_results = _get_on_sky_data(orbits_df, observations, results, args, configs)
onsky_results = process_data_by_id(
predictions,
n_workers=num_workers,
func=_get_on_sky_data,
primary_id_column_name=primary_id_column_name,
orbits_df=orbits_df,
obs_pos_vel=obs_pos_vel,
args=args,
configs=configs,
)
results = join_by(
[primary_id_column_name, "epoch_JD_TDB"],
predictions,
onsky_results,
usemask=False,
asrecarray=True,
)
return results
else:
return predictions
[docs]
def predict_cli(
cli_args: Namespace,
input_file: str,
start_date: float,
end_date: float,
timestep_day: float,
output_file: str,
cache_dir: Path,
configs: None,
):
"""The function for calling predict through the command line interface.
Parameters
----------
cli_args : Namespace
The command line arguments.
input_file : str
The input file to read the data from.
start_date : float
The start date for the predictions, in jd_tdb.
end_date : float
The end date for the predictions, in jd_tdb.
timestep_day : float
The time step for the predictions, in days.
output_file : str
The output file to write the predictions to.
cache_dir : Path
The directory to the cached kernels.
aux : AuxiliaryConfigs object
The LayUp auxiliary arguments
"""
num_workers = cli_args.n
if num_workers < 0:
num_workers = os.cpu_count()
times = np.arange(start_date, end_date + timestep_day, step=timestep_day)
reader = CSVDataReader(
input_file,
primary_id_column_name=cli_args.primary_id_column_name,
sep="csv",
required_columns=REQUIRED_INPUT_COLUMN_NAMES,
)
chunks = create_chunks(reader, chunk_size=cli_args.chunk)
for chunk in chunks:
# Read the objects from the file
data = reader.read_objects(chunk)
if get_format(data) != "BCART_EQ":
data = convert(
data,
"BCART_EQ",
cache_dir=cache_dir,
primary_id_column_name=cli_args.primary_id_column_name,
)
predictions = predict(
data,
obscode=cli_args.station,
times=times,
num_workers=cli_args.n,
cache_dir=cache_dir,
primary_id_column_name=cli_args.primary_id_column_name,
args=cli_args,
configs=configs,
)
if len(predictions) > 0:
if cli_args.sexagesimal:
predictions = _convert_to_sg(predictions)
write_csv(predictions, output_file, move_columns={"ra_str_hms": 3, "dec_str_dms": 4})
else:
write_csv(predictions, output_file)