import logging
import os
from pathlib import Path
from typing import Literal
import numpy as np
from sorcha.ephemeris.simulation_geometry import equatorial_to_ecliptic
from sorcha.ephemeris.simulation_parsing import parse_orbit_row
from sorcha.ephemeris.simulation_setup import _create_assist_ephemeris
from layup.utilities.data_processing_utilities import (
get_cov_columns,
get_format,
has_cov_columns,
process_data,
)
from layup.utilities.file_io import CSVDataReader, HDF5DataReader
from layup.utilities.file_io.file_output import write_csv, write_hdf5
from layup.utilities.layup_configs import LayupConfigs
from layup.utilities.orbit_conversion import (
covariance_cometary_xyz,
covariance_eq_to_ecl,
covariance_keplerian_xyz,
parse_covariance_row_to_CART,
universal_cometary,
universal_keplerian,
)
[docs]
logger = logging.getLogger(__name__)
# Columns which may be added to the output data by the orbit fitting process
[docs]
ORBIT_FIT_COLS = [
("csq", "f8"), # Chi-square value
("ndof", "i4"), # Number of degrees of freedom
("niter", "i4"), # Number of iterations
("method", "O"), # Method used for orbit fitting
("flag", "i4"), # Single-character flag indicating success of the fit
]
# Columns which use degrees as units in each orbit format
[docs]
degree_columns = {
"BCOM": ["inc", "node", "argPeri"],
"COM": ["inc", "node", "argPeri"],
"BKEP": ["inc", "node", "argPeri", "ma"],
"KEP": ["inc", "node", "argPeri", "ma"],
}
# Add this to MJD to convert to JD
[docs]
MJD_TO_JD_CONVERSTION = 2400000.5
[docs]
def get_output_column_names_and_types(primary_id_column_name, has_covariance, extra_cols_to_keep):
"""
Get the output column names and types for the converted data.
Parameters
----------
primary_id_column_name : str
The name of the column in the data that contains the primary ID of the object.
has_covariance : bool
Whether the data has covariance information.
extra_cols_to_keep : list
List of tuples containing extra column names and dtypes to keep in the output data.
Returns
-------
tuple
A tuple containing two elements:
- A dictionary mapping orbit formats to the required column names for that format.
- A list of default column dtypes for the output data.
"""
# Required column names for each orbit format
required_column_names = {
"BCART": [primary_id_column_name, "FORMAT", "x", "y", "z", "xdot", "ydot", "zdot", "epochMJD_TDB"],
"BCART_EQ": [primary_id_column_name, "FORMAT", "x", "y", "z", "xdot", "ydot", "zdot", "epochMJD_TDB"],
"BCOM": [
primary_id_column_name,
"FORMAT",
"q",
"e",
"inc",
"node",
"argPeri",
"t_p_MJD_TDB",
"epochMJD_TDB",
],
"BKEP": [primary_id_column_name, "FORMAT", "a", "e", "inc", "node", "argPeri", "ma", "epochMJD_TDB"],
"CART": [primary_id_column_name, "FORMAT", "x", "y", "z", "xdot", "ydot", "zdot", "epochMJD_TDB"],
"COM": [
primary_id_column_name,
"FORMAT",
"q",
"e",
"inc",
"node",
"argPeri",
"t_p_MJD_TDB",
"epochMJD_TDB",
],
"KEP": [primary_id_column_name, "FORMAT", "a", "e", "inc", "node", "argPeri", "ma", "epochMJD_TDB"],
}
# Default column dtypes across all orbit formats. Note that the ordering of the dtypes matches
# the ordering of the column names in REQUIRED_COLUMN_NAMES.
default_column_dtypes = ["O", "<U8", "<f8", "<f8", "<f8", "<f8", "<f8", "<f8", "<f8"]
default_column_dtypes.extend([dtype for _, dtype in extra_cols_to_keep])
if has_covariance:
# Flattened 6x6 covariance matrix
default_column_dtypes += ["f8"] * 36
for format in required_column_names:
for col_name, _ in extra_cols_to_keep:
# Add the column name and dtype to the default column dtypes
required_column_names[format].append(col_name)
if has_covariance:
# Add the covariance columns to the required column names
required_column_names[format] += get_cov_columns()
return required_column_names, default_column_dtypes
[docs]
def _apply_convert(data, convert_to, cache_dir=None, primary_id_column_name=None, extra_cols_to_keep=[]):
"""
Apply the appropriate conversion function to the data
Parameters
----------
data : numpy structured array
The data to convert.
convert_to : str
The orbital format to convert the data to. Must be one of: "BCART", "BCART_EQ", "BCOM", "BKEP", "CART", "COM", "KEP"
cache_dir : str, optional
The base directory for downloaded files.
primary_id_column_name : str, optional
The name of the column in the data that contains the primary ID of the object.
extra_cols_to_keep : list, optional
List of tuples containing extra column names and dtypes to keep in the output data.
Returns
-------
data : numpy structured array
The converted data
"""
if len(data) == 0:
return data
expected_formats = ["BCART", "BCART_EQ", "BCOM", "BKEP", "CART", "COM", "KEP"]
if convert_to not in expected_formats:
raise ValueError(f"Invalid conversion type {convert_to}. Must be one of: {expected_formats}")
has_covariance = has_cov_columns(data)
logger.debug(f"Data has covariance: {has_covariance}")
# In addition to columns requested by the user, to be kept in the output, add in extra columns
# that may have been added by the orbit fitting process.
cols_to_keep = extra_cols_to_keep + ORBIT_FIT_COLS
# Filter the extra columns to keep to only those that are present in the data
cols_to_keep = [(col, dtype) for col, dtype in cols_to_keep if col in data.dtype.names]
logger.debug(f"Columns to keep: {cols_to_keep}")
# Now generate the required and columns names and dtypes for the output data
required_colum_names, default_column_dtypes = get_output_column_names_and_types(
primary_id_column_name, has_covariance, cols_to_keep
)
# Fetch layup configs to get the necessary auxiliary data
config = LayupConfigs()
ephem, gm_sun, gm_total = _create_assist_ephemeris(config.auxiliary, cache_dir)
# Construct the output dtype for the converted data
output_dtype = [
(col, dtype)
for col, dtype in zip(required_colum_names[convert_to], default_column_dtypes, strict=False)
]
# For each row in the data, convert the orbit to the desired format
results = []
for d in data:
# Note that the format value may be "NONE" for rows which were from layup orbit fit failures
# For these we simply output NaNs rather than try to convert.
row = (np.nan,) * 6
cov = np.full((6, 6), np.nan)
if d["FORMAT"] != "NONE":
# First we convert our data into equatorial barycentric cartesian coordinates,
# regardless of the input format. That allows us to simplify the conversion
# process below by only having the logic to convert from BCART_EQ to the other formats.
sun = ephem.get_particle("Sun", d["epochMJD_TDB"] + MJD_TO_JD_CONVERSTION - ephem.jd_ref)
if not isinstance(d["FORMAT"], str):
raise ValueError(f"FORMAT column must be a string {d['FORMAT']}.")
if d["FORMAT"] == "BCART_EQ":
# We don't use parse_orbit_row here because we already have the BCART_EQ coordinates
x, y, z, xdot, ydot, zdot = d["x"], d["y"], d["z"], d["xdot"], d["ydot"], d["zdot"]
else:
x, y, z, xdot, ydot, zdot = parse_orbit_row(
d, d["epochMJD_TDB"] + MJD_TO_JD_CONVERSTION, ephem, {}, gm_sun, gm_total
)
# Parse our 6x6 cartesian covariance matrix if it exists regardless of the input format.
# Note that this does not differentiate between BCART, BCART_EQ, and CART covariance matrices, and
# we handle whether or not it will be barycentric further below.
if has_covariance:
cov = parse_covariance_row_to_CART(d, gm_total, gm_sun)
# For each possible output format, covert our BCART_EQ coordinates to the requested format.
if convert_to == "BCART_EQ":
# Already in equatorial BCART so simply use the parsed coordinates
row = x, y, z, xdot, ydot, zdot
elif convert_to == "BCART":
# Convert our covariance matrix from equatorial to ecliptic.
cov = covariance_eq_to_ecl(cov)
# Convert to BCART by converting to ecliptic coordinates
equatorial_coords = np.array((x, y, z))
equatorial_velocities = np.array((xdot, ydot, zdot))
ecliptic_coords = np.array(equatorial_to_ecliptic(equatorial_coords))
ecliptic_velocities = np.array(equatorial_to_ecliptic(equatorial_velocities))
row = tuple(np.concatenate([ecliptic_coords, ecliptic_velocities]))
elif convert_to == "CART":
# Convert our covariance matrix from equatorial to ecliptic.
cov = covariance_eq_to_ecl(cov)
# Convert to CART by subtracting the Sun's position and velocity from the Barycentric Cartesian equatorial coordinates
sun = ephem.get_particle("Sun", d["epochMJD_TDB"] + MJD_TO_JD_CONVERSTION - ephem.jd_ref)
equatorial_coords = np.array((x, y, z)) - np.array((sun.x, sun.y, sun.z))
equatorial_velocities = np.array((xdot, ydot, zdot)) - np.array((sun.vx, sun.vy, sun.vz))
# Convert to ecliptic coordinates
ecliptic_coords = np.array(equatorial_to_ecliptic(equatorial_coords))
ecliptic_velocities = np.array(equatorial_to_ecliptic(equatorial_velocities))
row = tuple(np.concatenate([ecliptic_coords, ecliptic_velocities]))
elif convert_to == "BCOM":
if has_covariance:
# Our covariance matrix is already in equatorial cartesian so no tranformation
# and we can use the BCART_EQ coordinates to convert our covariance matrix to the cometary format.
cov = covariance_cometary_xyz(gm_total, x, y, z, xdot, ydot, zdot, d["epochMJD_TDB"], cov)
# Convert back to ecliptic from parse_orbit_row's equatorial output.
ecliptic_coords = np.array(equatorial_to_ecliptic([x, y, z]))
ecliptic_velocities = np.array(equatorial_to_ecliptic([xdot, ydot, zdot]))
x, y, z, xdot, ydot, zdot = tuple(np.concatenate([ecliptic_coords, ecliptic_velocities]))
# Use universal_cometary to convert to BCOM with mu = gm_total (used for barycentric)
row = universal_cometary(gm_total, x, y, z, xdot, ydot, zdot, d["epochMJD_TDB"])
elif convert_to == "COM":
# Convert out of barycentric by subtracting the Sun's position and velocity from the BCART coordinates
equatorial_coords = np.array((x, y, z)) - np.array((sun.x, sun.y, sun.z))
equatorial_velocities = np.array((xdot, ydot, zdot)) - np.array((sun.vx, sun.vy, sun.vz))
if has_covariance:
# We now use our cartesian equatorial coordinates, which have been adjusted to
# no longer be barycentric, to convert our covariance matrix to the cometary format.
cov = covariance_cometary_xyz(
gm_sun,
equatorial_coords[0],
equatorial_coords[1],
equatorial_coords[2],
equatorial_velocities[0],
equatorial_velocities[1],
equatorial_velocities[2],
d["epochMJD_TDB"],
cov,
)
# Convert back to ecliptic from parse_orbit_row's equatorial output.
ecliptic_coords = np.array(equatorial_to_ecliptic(equatorial_coords))
ecliptic_velocities = np.array(equatorial_to_ecliptic(equatorial_velocities))
x, y, z, xdot, ydot, zdot = tuple(np.concatenate([ecliptic_coords, ecliptic_velocities]))
# Use universal_cometary to convert to COM with mu = gm_sun
row = universal_cometary(gm_sun, x, y, z, xdot, ydot, zdot, d["epochMJD_TDB"])
elif convert_to == "BKEP":
if has_covariance:
# Our covariance matrix is already in equatorial cartesian so using the BCART coordinates
# we can convert our covariance matrix to barycentric keplerian.
cov = covariance_keplerian_xyz(
gm_total, x, y, z, xdot, ydot, zdot, d["epochMJD_TDB"], cov
)
# Convert back to ecliptic from parse_orbit_row's equatorial output.
ecliptic_coords = np.array(equatorial_to_ecliptic([x, y, z]))
ecliptic_velocities = np.array(equatorial_to_ecliptic([xdot, ydot, zdot]))
x, y, z, xdot, ydot, zdot = tuple(np.concatenate([ecliptic_coords, ecliptic_velocities]))
# Use universal_keplerian to convert to BKEP with mu = gm_total
row = universal_keplerian(gm_total, x, y, z, xdot, ydot, zdot, d["epochMJD_TDB"])
elif convert_to == "KEP":
# Convert out of barycentric by subtracting the Sun's position and velocity from the Barycentric Cartesian coordinates
sun = ephem.get_particle("Sun", d["epochMJD_TDB"] + MJD_TO_JD_CONVERSTION - ephem.jd_ref)
equatorial_coords = np.array((x, y, z)) - np.array((sun.x, sun.y, sun.z))
equatorial_velocities = np.array((xdot, ydot, zdot)) - np.array((sun.vx, sun.vy, sun.vz))
if has_covariance:
# We now use our cartesian equatorial coordinates, which have been adjusted to
# no longer be barycentric, to convert our covariance matrix to the keplerian format.
cov = covariance_keplerian_xyz(
gm_sun,
equatorial_coords[0],
equatorial_coords[1],
equatorial_coords[2],
equatorial_velocities[0],
equatorial_velocities[1],
equatorial_velocities[2],
d["epochMJD_TDB"],
cov,
)
# Convert back to ecliptic from parse_orbit_row's equatorial output.
ecliptic_coords = np.array(equatorial_to_ecliptic(equatorial_coords))
ecliptic_velocities = np.array(equatorial_to_ecliptic(equatorial_velocities))
# Use universal_keplerian to convert to KEP with mu = gm_sun
x, y, z, xdot, ydot, zdot = tuple(np.concatenate([ecliptic_coords, ecliptic_velocities]))
row = universal_keplerian(gm_sun, x, y, z, xdot, ydot, zdot, d["epochMJD_TDB"])
else:
raise ValueError(f"Invalid conversion type {convert_to}. Must be one of: {expected_formats}")
row += (d["epochMJD_TDB"],)
row += tuple(d[col] for col, _ in cols_to_keep)
# If the covariance matrix is present, convert it to a flattened tuple for output.
cov_res = tuple(val for val in cov.flatten()) if has_covariance else tuple()
# Turn our converted row into a structured array
output_format = convert_to if d["FORMAT"] != "NONE" else "NONE"
result_struct_array = np.array(
[(d[primary_id_column_name], output_format) + row + cov_res],
dtype=output_dtype,
)
results.append(result_struct_array)
# Convert the list of results to a numpy structured array
output = np.squeeze(np.array(results)) if len(results) > 1 else results[0]
# The outputs of the sorcha orbit conversion utilities are always in radians, so convert to degrees for any such columns.
for col in degree_columns.get(convert_to, []):
# Convert from radians to degrees and wrap to [0, 360)
output[col] = (output[col] * 180 / np.pi) % 360
return output
[docs]
def convert(
data,
convert_to,
num_workers=1,
cache_dir=None,
primary_id_column_name="ObjID",
extra_cols_to_keep=[],
):
"""
Convert a structured numpy array to a different orbital format with support for parallel processing.
Parameters
----------
data : numpy structured array
The data to convert.
convert_to : str
The format to convert the data to. Must be one of: "BCART_EQ", "BCOM", "BKEP", "CART", "COM", "KEP"
num_workers : int, optional (default=1)
The number of workers to use for parallel processing.
cache_dir : str, optional
The base directory for downloaded files.
primary_id_column_name : str, optional (default="ObjID")
The name of the column in the data that contains the primary ID of the object.
extra_cols_to_keep : list, optional
List of tuples containing additional column names and dtypes to keep in the output data.
Returns
-------
data : numpy structured array
The converted data
"""
if num_workers == 1:
return _apply_convert(
data,
convert_to,
cache_dir=cache_dir,
primary_id_column_name=primary_id_column_name,
extra_cols_to_keep=extra_cols_to_keep,
)
# Parallelize the conversion of the data across the requested number of workers
return process_data(
data,
num_workers,
_apply_convert,
convert_to=convert_to,
cache_dir=cache_dir,
primary_id_column_name=primary_id_column_name,
extra_cols_to_keep=extra_cols_to_keep,
)
[docs]
def convert_cli(
input: str,
output_file_stem: str,
convert_to: Literal["BCART", "BCART_EQ", "BCOM", "BKEP", "CART", "COM", "KEP"],
file_format: Literal["csv", "hdf5"] = "csv",
chunk_size: int = 10_000,
num_workers: int = -1,
cli_args: dict = None,
):
"""
Convert an orbit file from one format to another with support for parallel processing.
Note that the output file will be written in the caller's current working directory.
Parameters
----------
input : str
The path to the input file.
output_file_stem : str
The stem of the output file.
convert_to : str
The format to convert the input file to. Must be one of: "BCART", "BCART_EQ", "BCOM", "BKEP", "CART", "COM", "KEP"
file_format : str, optional (default="csv")
The format of the output file. Must be one of: "csv", "hdf5"
chunk_size : int, optional (default=10_000)
The number of rows to read in at a time.
num_workers : int, optional (default=-1)
The number of workers to use for parallel processing of the individual
chunk. If -1, the number of workers will be set to the number of CPUs on
the system.
cli_args : argparse, optional (default=None)
The argparse object that was created when running from the CLI.
"""
input_file = Path(input)
if file_format == "csv":
output_file = Path(f"{output_file_stem}.{file_format.lower()}")
else:
output_file = (
Path(f"{output_file_stem}")
if output_file_stem.endswith(".h5")
else Path(f"{output_file_stem}.h5")
)
primary_id_column_name = cli_args.primary_id_column_name if cli_args else "ObjID"
if num_workers < 0:
num_workers = os.cpu_count()
# Open the input file and read the first line
reader_class = INPUT_READERS.get(file_format)
if reader_class is None:
logger.error(f"Invalid file format: {file_format}. Must be one of: 'csv', 'hdf5'.")
raise ValueError(f"Invalid file format: {file_format}. Must be one of: 'csv', 'hdf5'.")
sample_reader = reader_class(
input_file,
format_column_name="FORMAT",
primary_id_column_name=primary_id_column_name,
)
sample_data = sample_reader.read_rows(block_start=0, block_size=1)
# Check orbit format in the file
input_format = get_format(sample_data)
# Check that the input format is not already the desired format
if convert_to == input_format:
logger.error("Input file is already in the desired format")
# Reopen the file now that we know the input format and can validate the column names
required_columns_names, _ = get_output_column_names_and_types(
primary_id_column_name,
False, # We don't need to know if the input data has covariance columns for basic validation.
[], # No additional columns to keep
)
required_columns = required_columns_names[input_format]
full_reader = reader_class(
input_file,
format_column_name="FORMAT",
primary_id_column_name=primary_id_column_name,
required_column_names=required_columns,
)
# Calculate the start and end indices for each chunk, as a list of tuples
# of the form (start, end) where start is the starting index of the chunk
# and the last index of the chunk + 1.
total_rows = full_reader.get_row_count()
chunks = [(i, min(i + chunk_size, total_rows)) for i in range(0, total_rows, chunk_size)]
cache_dir = cli_args.ar_data_file_path if cli_args else None
for chunk_start, chunk_end in chunks:
# Read the chunk of data
chunk_data = full_reader.read_rows(block_start=chunk_start, block_size=chunk_end - chunk_start)
# Parallelize conversion of this chunk of data.
converted_data = convert(
chunk_data,
convert_to,
num_workers=num_workers,
cache_dir=cache_dir,
primary_id_column_name=primary_id_column_name,
)
# Write out the converted data in in the requested file format.
if file_format == "hdf5":
write_hdf5(converted_data, output_file, key="data")
else:
write_csv(converted_data, output_file)
logger.info(f"Data has been written to {output_file}")