import numpy as np
from layup.utilities.file_io.ObjectDataReader import ObjectDataReader
[docs]
def two_line_row_start(line):
"""Checks if the MPC Obs80 line is the first line of a two-line row format."""
note2 = line[14]
return note2 == "S" or note2 == "R" or note2 == "V"
[docs]
def ra_to_deg_ra(ra):
"""Converts the Obs80 RA string to degrees."""
hr = ra[0:2].strip()
hr = float(hr) if hr != "" else 0.0
mn = ra[3:5].strip()
mn = float(mn) if mn != "" else 0.0
sc = ra[6:].strip()
sc = float(sc) if sc != "" else 0.0
deg_ra = 15.0 * (hr + 1.0 / 60.0 * (mn + 1.0 / 60.0 * sc))
return deg_ra
[docs]
def dec_to_deg_dec(dec):
"""Converts the Obs80 Dec string to degrees."""
dg = dec[1:3].strip()
dg = float(dg) if dg != "" else 0.0
mn = dec[4:6].strip()
mn = float(mn) if mn != "" else 0.0
sc = dec[7:].strip()
sc = float(sc) if sc != "" else 0.0
deg_dec = dg + 1.0 / 60.0 * (mn + 1.0 / 60.0 * sc)
sign = dec[0]
if sign == "-":
deg_dec = -deg_dec
return deg_dec
[docs]
def mpctime_to_isotime(mpc_time):
"""Converts from the MPC time formatted string to ISO time format."""
yr = mpc_time[0:4]
mn = mpc_time[5:7]
dy = float(mpc_time[8:])
frac_day, day = np.modf(dy)
frac_hrs, hrs = np.modf(frac_day * 24)
frac_mins, mins = np.modf(frac_hrs * 60)
secs = frac_mins * 60
if np.round(secs, 4) >= 60.0:
secs = np.round(secs, 4) - 60
if secs < 0.0:
secs = 0.0
mins += 1
if mins >= 60:
mins -= 60
hrs += 1
if hrs >= 24:
hrs -= 24
day += 1 # Could mess up the number of days in the month
format_string = "%4s-%2s-%02dT%02d:%02d:%02." + str(4) + "f"
return format_string % (yr, mn, day, hrs, mins, secs)
[docs]
class Obs80DataReader(ObjectDataReader):
"""A class to read in object data files stored in the MPC's obs80
format.
Note that we will ignore the header lines that might accompany the
file.
"""
def __init__(self, filename, **kwargs):
"""A class for reading observational data from an MPC obs80 file.
Parameters
----------
filename : string
Location/name of the data file.
**kwargs: dictionary, optional
Extra arguments
"""
super().__init__(**kwargs)
[docs]
self.filename = filename
# The output data type for the structured array of the obs80 reader
[docs]
self.output_dtype = [
(self._primary_id_column_name, "U10"),
("obsTime", "U25"),
("ra", "f8"),
("dec", "f8"),
("mag", "f4"),
("filt", "U1"),
("stn", "U3"),
("cat", "U1"),
("prg", "U1"),
("sys", "U7"),
("ctr", "i4"),
("pos1", "f8"),
("pos2", "f8"),
("pos3", "f8"),
]
# define the column slices for the obs80 format
[docs]
self.col_names = {
"ObjID": slice(0, 5),
"provID": slice(5, 12),
"prg": slice(13, 14),
"obsTime": slice(15, 32),
"ra": slice(32, 44),
"dec": slice(44, 56),
"mag": slice(65, 70),
"filt": slice(70, 71),
"cat": slice(71, 72),
"obs_code": slice(77, 80),
}
if self._primary_id_column_name not in self.col_names:
raise ValueError(
f"Primary ID column name '{self._primary_id_column_name}' not found in Obs80 column definitions. "
"Valid column names are: " + ", ".join(self.col_names.keys())
)
# A table holding just the object ID for each row. Only populated
# if we try to read data for specific object IDs.
[docs]
self.obj_id_table = None
# A dictionary to hold the number of rows for each object ID. Only populated
# if we try to read data for specific object IDs.
[docs]
self.obj_id_counts = {}
[docs]
def get_reader_info(self):
"""Return a string identifying the current reader name
and input information (for logging and output).
Returns
--------
name : string
The reader information.
"""
return f"Obs80DataReader:{self.filename}"
[docs]
def get_row_count(self):
"""Return the total number of rows in the file.
Note that the obs 80 format allows for two-line rows, so the number of
lines used to store the data is not the same as the number of rows.
Returns
-------
int
Total rows in the file.
"""
row_cnt = 0
with open(self.filename, "r") as f:
for line in f:
# Skip empty lines, header rows, and the starting line of two-line rows.
if line.strip() != "" and not self._is_header_row(line) and not two_line_row_start(line):
row_cnt += 1
return row_cnt
[docs]
def _read_rows_internal(self, block_start=0, block_size=None, **kwargs):
"""Reads in a set number of rows from the input.
Parameters
-----------
block_start : integer, optional
The 0-indexed row number from which
to start reading the data. For example in a CSV file
block_start=2 would skip the first two lines after the header
and return data starting on row=2. Default =0
block_size: integer, optional, default=None
The number of rows to read in.
Use block_size=None to read in all available data.
default =None
**kwargs : dictionary, optional
Extra arguments
Returns
-----------
res : numpy structured array
The data read in from the file.
"""
records = []
with open(self.filename, "r") as f:
curr_block = 0
block_end = block_start + block_size if block_size is not None else None
prev_line = None
check_header = True
for curr_line in f:
if check_header and self._is_header_row(curr_line):
continue
else:
check_header = False
if block_end is not None and curr_block >= block_end:
# We have read enough rows from the file.
break
if two_line_row_start(curr_line):
# We have a two-line row. We will save our current line
# and wait for the next line to merge them as a single row to process.
prev_line = curr_line
continue
# Process our current MPC Obs80 row.
if curr_block >= block_start:
if prev_line is not None:
# We have a two-line row to process.
records.append(self.convert_obs80(prev_line, second_line=curr_line))
# Remove the previous line so we don't process it again.
prev_line = None
else:
# We have a single line to process.
records.append(self.convert_obs80(curr_line))
curr_block += 1
return np.array(records, dtype=self.output_dtype)
[docs]
def _build_id_map(self):
"""Builds a table of just the object IDs"""
if self.obj_id_table is not None:
return
obj_ids = []
with open(self.filename, "r") as f:
check_header = True
for curr_line in f:
if check_header and self._is_header_row(curr_line):
continue
else:
check_header = False
if two_line_row_start(curr_line):
# We have a two-line row, so skip it and only
# add the object ID from the final row.
continue
obj_id = self.get_obs80_id(curr_line)
obj_ids.append(obj_id)
# Count the number of times we see this object ID.
self.obj_id_counts[obj_id] = self.obj_id_counts.get(obj_id, 0) + 1
self.obj_id_table = np.array(obj_ids, dtype=np.dtype([(self._primary_id_column_name, "U10")]))
self.obj_id_table = self._validate_object_id_column(self.obj_id_table)
[docs]
def _read_objects_internal(self, obj_ids, **kwargs):
"""Read in a chunk of data for given object IDs.
Parameters
-----------
obj_ids : list
A list of object IDs to use.
**kwargs : dictionary, optional
Extra arguments
Returns
-----------
res : numpy structured array
The data read in from the file.
"""
self._build_id_map()
# Find which rows map to the requested object ID list
skipped_rows = ~np.isin(self.obj_id_table[self._primary_id_column_name], obj_ids)
records = []
# The index of the current row we are processing to check against skipped_rows.
# We start at -1 because we will increment it before processing the first row.
curr_row_idx = -1
prev_line = None
check_header = True
with open(self.filename, "r") as f:
for curr_line in f:
if check_header and self._is_header_row(curr_line):
continue
else:
check_header = False
if two_line_row_start(curr_line):
# We have a two-line row. We will save our current line
# and wait for the next line to merge them as a single row to process.
prev_line = curr_line
continue
# We're at a potentially processable row, so increment our index.
curr_row_idx += 1
if skipped_rows[curr_row_idx]:
continue
# Process our current MPC Obs80 row.
if prev_line is not None:
records.append(self.convert_obs80(prev_line, curr_line))
# Remove the previous line so we don't process it again.
prev_line = None
else:
# Our row is a single line to process.
records.append(self.convert_obs80(curr_line))
return np.array(records, dtype=self.output_dtype)
[docs]
def get_obs80_id(self, line):
"""Get the object ID from the Obs80 line. Note that we have already confirmed
that `self.primary_id_column_name` is in `self.col_names`.
Parameters
----------
line : str
The line of obs80 data to extract the object ID from.
Returns
-------
str
The object ID extracted from the line.
"""
# Use the object name as object ID if provided, otherwise use the provisional ID.
return line[self.col_names[self._primary_id_column_name]].strip()
[docs]
def convert_obs80(self, line, second_line=None):
"""
Converts a row of obs80 data to a tuple of values.
The second line is optional and may contain the observatory position.
Parameters
----------
line : str
The line of obs80 data to convert.
second_line : str, optional
The optional second line of obs80 data to convert. Default is None.
Returns
-------
tuple
A tuple of values containing the object ID, ISO time, RA in degrees,
Dec in degrees, magnitude, filter, observatory code, catalog, program,
and observatory position (x, y, z).
"""
# Extract the relevant fields from the first mpc obs80 line.
# obj_name = line[self.col_names["obj_name"]].strip()
# prov_id = line[self.col_names["prov_id"]].strip()
prg = line[self.col_names["prg"]].strip()
obstime = line[self.col_names["obsTime"]].strip()
ra = line[self.col_names["ra"]].strip()
dec = line[self.col_names["dec"]].strip()
mag = line[self.col_names["mag"]].strip()
filt = line[self.col_names["filt"]].strip()
cat = line[self.col_names["cat"]].strip()
obs_code = line[self.col_names["obs_code"]].strip()
# Use the object name as object ID if provided, otherwise use the provisional ID.
obj_id = line[self.col_names[self._primary_id_column_name]].strip()
# Do any unit and type conversions. Note that various layup verbs will likely
# convert these values again to their internal formats.
iso_time = mpctime_to_isotime(obstime)
ra_deg, dec_deg = ra_to_deg_ra(ra), dec_to_deg_dec(dec)
mag = float(mag) if mag != "" else 0.0
# Process the observatory position if provided.
obs_geo_x, obs_geo_y, obs_geo_z = np.nan, np.nan, np.nan
ades_sys = ""
if second_line is not None:
# Check that the second line is long enough to contain the observatory position.
if len(second_line) < 80:
raise ValueError(
f"Observatory position line is too short for {obj_id} and line (with length {len(second_line)}: {second_line}"
)
if second_line[77:80].strip() != obs_code:
raise ValueError(
f"Observatory codes do not match in the second line provided for the observatory position. {obs_code} and {second_line[77:80].rstrip()}"
)
unit_flag = second_line[32:34].strip()
if unit_flag in ["1", "2"]:
ades_sys = "ICRF_KM" if unit_flag == "1" else "ICRF_AU"
# For each coordinate, the first character is a sign (+/-) and the next 10 characters are the value.
obs_geo_x = float(second_line[34] + second_line[35:45].strip())
obs_geo_y = float(second_line[46] + second_line[47:57].strip())
obs_geo_z = float(second_line[58] + second_line[59:69].strip())
else:
raise ValueError(
f"Unknown observatory position unit flag '{unit_flag}' in the second line of obs80 data. Should be '1' (km) or '2' (AU)."
)
return (
obj_id,
iso_time,
ra_deg,
dec_deg,
mag,
filt,
obs_code,
cat,
prg,
ades_sys,
399, # The 'ctr' code for the Earth
obs_geo_x,
obs_geo_y,
obs_geo_z,
)