diff --git a/apps/audibleAlerts/audibleAlerts/core.py b/apps/audibleAlerts/audibleAlerts/core.py index 6c48c8db5..47d7c03a7 100644 --- a/apps/audibleAlerts/audibleAlerts/core.py +++ b/apps/audibleAlerts/audibleAlerts/core.py @@ -94,6 +94,7 @@ def reaction_handler(self, new_message, element_name, transition, utterance_choi return value = new_message[element_name] self.log.debug(f"Judging reaction for {element_name} change to {repr(value)} using {transition}") + self.log.debug(f"before check {self.latch_transitions=}") last_value = self.latch_transitions.get(transition) self.log.debug(f"{new_message}\n{transition.compare(value)=}, last value was {last_value}, {value != last_value=} {(not transition.compare(last_value))=}") if transition.compare(value) and ( @@ -104,10 +105,12 @@ def reaction_handler(self, new_message, element_name, transition, utterance_choi (not transition.compare(last_value)) ): self.latch_transitions[transition] = value + self.log.debug(f"after update {self.latch_transitions=}") + self.log.debug(f"latched {transition=} with {value=}") last_transition_ts = self.per_transition_cooldown_ts.get(transition, 0) sec_since_trigger = time.time() - last_transition_ts debounce_expired = sec_since_trigger > transition.debounce_sec - self.log.debug(f"Debounced {sec_since_trigger=}") + self.log.debug(f"Checking for debounce: {sec_since_trigger=} {debounce_expired=}") if debounce_expired: utterance = choice(utterance_choices) self.log.debug(f"Submitting speech request: {utterance}") @@ -115,8 +118,10 @@ def reaction_handler(self, new_message, element_name, transition, utterance_choi else: self.log.debug(f"Would have spoken, but it's only been {sec_since_trigger=}") elif transition.compare(last_value) and not transition.compare(value): - # un-latch, so next time we change to a value that compares True we trigger again: + self.log.debug(f"un-latch {transition}, so next time we change to a value that compares True we trigger again. ({last_value=} {value=})") + self.log.debug(f"before {self.latch_transitions=}") del self.latch_transitions[transition] + self.log.debug(f"after {self.latch_transitions=}") else: self.log.debug(f"Got {new_message.device}.{new_message.name} but {transition=} did not match") @@ -169,7 +174,7 @@ def handle_soundboard_switch(self, prop: properties.IndiProperty, new_message): def load_personality(self, personality_name): personality_file = os.path.join(HERE, "personalities", f"{personality_name}.xml") - for cb, device_name, property_name in self._cb_handles: + for cb, device_name, property_name, _ in self._cb_handles: try: self.client.unregister_callback(cb, device_name=device_name, property_name=property_name) except ValueError: @@ -202,7 +207,7 @@ def load_personality(self, personality_name): device_name=device_name, property_name=property_name ) - self._cb_handles.add((cb, device_name, property_name)) + self._cb_handles.add((cb, device_name, property_name, t)) self.log.debug(f"Registered reaction handler on {device_name=} {property_name=} {element_name=} using transition {t}") for idx, utterance in enumerate(reaction.transitions[t]): self.log.debug(f"{reaction.indi_id}: {t}: {utterance}") diff --git a/apps/audibleAlerts/audibleAlerts/personalities/default.xml b/apps/audibleAlerts/audibleAlerts/personalities/default.xml index d3ef3a866..ade4bbc02 100644 --- a/apps/audibleAlerts/audibleAlerts/personalities/default.xml +++ b/apps/audibleAlerts/audibleAlerts/personalities/default.xml @@ -13,6 +13,7 @@ Entregue todas sus empanadas al jefe. 
We, at Mag AOX, recommend Mag AOX. + non è consentito il cappuccino dopo mezzogiorno + + + non è consentito il cappuccino dopo mezzogiorno + + Saving data for {observers.current_observer.pfoa}. diff --git a/apps/dbIngest/dbIngest.py b/apps/dbIngest/dbIngest.py index d0fe63dbd..0282f75be 100644 --- a/apps/dbIngest/dbIngest.py +++ b/apps/dbIngest/dbIngest.py @@ -204,9 +204,10 @@ def setup(self): self.fs_observer.start() def rescan_files(self): + search_paths = [self.config.common_path_prefix / name for name in self.config.data_dirs] with self.conn.cursor() as cur: - ingest.update_file_inventory(cur, self.config.hostname, self.config.data_dirs) - self.log.info(f"Completed startup rescan of file inventory for {self.config.hostname}") + ingest.update_file_inventory(cur, self.config.hostname, search_paths) + self.log.info(f"Completed startup rescan of file inventory for {self.config.hostname} from {search_paths}") def ingest_line(self, line): # avoid infinite loop of modifying log file and getting notified of the modification diff --git a/python/magaox/constants.py b/python/magaox/constants.py index af9374b93..b19c4227c 100644 --- a/python/magaox/constants.py +++ b/python/magaox/constants.py @@ -1,4 +1,5 @@ from enum import Enum +import upath class StateCodes(Enum): FAILURE = -20 # The application has failed should be used when m_shutdown is set for an error. diff --git a/python/magaox/db/ingest.py b/python/magaox/db/ingest.py index a722c9b9e..deeb329d7 100644 --- a/python/magaox/db/ingest.py +++ b/python/magaox/db/ingest.py @@ -52,7 +52,7 @@ def batch_file_origins(cur: psycopg.Cursor, records: list[FileOrigin]): ''', [(rec.origin_host, rec.origin_path, rec.creation_time, rec.modification_time, rec.size_bytes) for rec in records]) cur.execute("COMMIT") -def identify_new_files(cur: psycopg.Cursor, this_host: str, paths: list[str]): +def identify_new_files(cur: psycopg.Cursor, this_host: str, paths: list[pathlib.Path]): '''Returns the paths from ``paths`` that are not already part of the ``file_origins`` table''' if len(paths) == 0: return [] @@ -64,7 +64,7 @@ def identify_new_files(cur: psycopg.Cursor, this_host: str, paths: list[str]): INSERT INTO on_disk_files (path) VALUES (%s) ''' - cur.executemany(query, [(x,) for x in paths]) + cur.executemany(query, [(x.as_posix(),) for x in paths]) # execute_values(cur, query, ) log.debug(f"Loaded {len(paths)} paths into temporary table for new file identification") @@ -126,6 +126,7 @@ def update_file_inventory(cur: psycopg.Cursor, host: str, data_dirs: list[pathli cur.execute("BEGIN") for prefix in data_dirs: for dirpath, dirnames, filenames in os.walk(prefix): + dirpath = pathlib.Path(dirpath) log.info(f"Checking for new files in {dirpath}") new_files = identify_new_files(cur, host, [dirpath / fn for fn in filenames]) if len(new_files) == 0: diff --git a/python/magaox/indi/device.py b/python/magaox/indi/device.py index 77a158042..4459b2183 100644 --- a/python/magaox/indi/device.py +++ b/python/magaox/indi/device.py @@ -75,12 +75,9 @@ def log_level_to_label(levelno): class MagAOXLogFormatter(logging.Formatter): def __init__(self, fmt='%(asctime)s %(levelname)s %(message)s (%(name)s:%(funcName)s:%(lineno)d)'): - super().__init__( - fmt=fmt, - datefmt='%Y-%m-%dT%H:%M:%S.%f000', - ) - # def formatTime(self, record, datefmt): - # return datetime.datetime.fromtimestamp(record.created, datetime.timezone.utc).strftime() + super().__init__(fmt=fmt) + def formatTime(self, record, datefmt): + return datetime.datetime.fromtimestamp(record.created, 
datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f') + '000' class NDJSONLogFormatter(MagAOXLogFormatter): def __init__(self): diff --git a/python/magaox/pack/__init__.py b/python/magaox/pack/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/magaox/pack/core.py b/python/magaox/pack/core.py new file mode 100644 index 000000000..b96652666 --- /dev/null +++ b/python/magaox/pack/core.py @@ -0,0 +1,363 @@ +import os +import time +import typing +from scipy.stats import mode +from tqdm import tqdm +import concurrent.futures +import shutil +import tempfile +import sys +import fixr +import fsspec +import logging +import numcodecs +from numcodecs import Blosc, Delta +import zarr +import numpy as np +from magaox import db + +from ..constants import FOLDER_TIMESTAMP_FORMAT +from ..quicklook.core import datestamp_strings_from_ts + + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) +log.setLevel(logging.DEBUG) + + +class EmptyKeyError(Exception): + pass + +def infer_dtype(val): + if not np.isscalar(val): + raise ValueError(f"Got {repr(val)} ({type(val)}) but this only works on scalar types") + elif isinstance(val, str): + return str + else: + return np.result_type(val) + +def tel2zarr(msg: dict, path: list[str], root, row_count, chunk_size): + flattened_msg_keys = [] + for k in msg: + if not isinstance(msg[k], dict): + p = '/'.join(path + [k]) + # infer dtype from msg[k] + shape = (row_count,) + if isinstance(msg[k], list) and len(msg[k]) > 0: + shape += (len(msg[k]),) + dtype = infer_dtype(msg[k][0]) + elif isinstance(msg[k], list): + # length-zero lists cannot have useful information + continue + else: + dtype = infer_dtype(msg[k]) + arr = root.zeros(p, shape=shape, chunks=(chunk_size,), dtype=dtype) + def accessor(the_msg, k=k): + for part in path: + the_msg = the_msg[part] + return the_msg[k] + flattened_msg_keys.append((p, arr, accessor)) + else: + flattened_msg_keys.extend(tel2zarr(msg[k], path + [k], root, row_count, chunk_size)) + return flattened_msg_keys + +def datetime_to_seconds_nanos(dt): + seconds = int(dt.timestamp()) + nanoseconds = int(dt.microsecond * 1000) + return (seconds, nanoseconds) + +def unpack_one_xrif(local_path, log): + # log.debug(f"Unpacking {local_path}") + with open(local_path, 'rb') as f: + img = fixr.XrifReader(f).copy_data() + # n.b. after reading `img`, file `f` has seeked (sought?) 
to + # the beginning of the timing xrif archive concatenated onto + # the image data + times = fixr.XrifReader(f).copy_data() + img = img.reshape((-1,) + img.shape[2:]) + times = times.reshape((-1,) + (times.shape[-1],)) + return img, times + +def preprocess_path(origin_host, origin_path): + if origin_host == 'exao2': + local_path = origin_path.replace('/opt/MagAOX', '/srv/rtc/data') + elif origin_host == 'exao3': + local_path = origin_path.replace('/opt/MagAOX', '/srv/icc/data') + else: + local_path = origin_path + return local_path + +def infer_common_xrif_cube_size_dtype(paths): + frames = [] + planes = [] + height = [] + width = [] + shapes = set() + img_dtype = None + times_dtype = None + for p in paths: + with open(p, 'rb') as fh: + xr = fixr.XrifReader(fh) + img_dtype = xr.array.dtype + fr, pl, ht, wd = xr.shape + shapes.add(xr.shape) + frames.append(fr) + planes.append(pl) + height.append(ht) + width.append(wd) + xr2 = fixr.XrifReader(fh) + times_dtype = xr2.array.dtype + + common_shape = np.max(frames), mode(planes).mode, mode(height).mode, mode(width).mode + log.debug(f"Guessed {common_shape=} from {shapes=}") + return common_shape, img_dtype, times_dtype + +def repack_cam_channel(camera_channel, root, bounds, cur, chunk_size) -> typing.Optional[tuple[zarr.Array, zarr.Array]]: + other_files_args = bounds + (camera_channel,) + other_files_q_body = ''' + FROM file_origins + WHERE creation_time + BETWEEN %s AND %s + AND origin_path LIKE '%%' || %s || '%%.xrif' + ''' + other_files_count_q = '''SELECT COUNT(*) as count''' + other_files_q_body + cur.execute(other_files_count_q, other_files_args) + xrifs_row_count = cur.fetchone()['count'] + if xrifs_row_count == 0: + log.debug(f"No {camera_channel} frames to process") + return None + else: + log.debug(f"Got {xrifs_row_count} XRIF files to process") + other_files_q = ''' + SELECT + origin_host, origin_path, creation_time, size_bytes + ''' + other_files_q_body + + # xrif archives contain an unpredictable number of frames. + # guessing a shape from the largest (by bytes) archive can get + # confused when there's an outlier xrif (i.e. changed ROI during obs) + # + # instead, maybe just find the most common shape, and drop the rest + # with a warning? + n_shape_samples = 10 + cur.execute(other_files_q + f" order by random() desc limit {n_shape_samples}", other_files_args) + ex_rows = cur.fetchall() + ex_paths = [preprocess_path(r['origin_host'], r['origin_path']) for r in ex_rows] + # skip planes per frame (for now?) 
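+    # Shape-inference note: infer_common_xrif_cube_size_dtype() looks at n_shape_samples randomly
+    # sampled archives and takes the max frame count plus the modal planes/height/width, so a single
+    # outlier archive (e.g. an ROI change mid-observation) does not skew the guess. Archives whose
+    # frame shape does not match (img_height, img_width) are dropped with a warning in the loop below.
+    # Rough illustration (assumed sample values): scipy.stats.mode([64, 64, 128]).mode evaluates to 64.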
+ (frames_per_xrif_chunk, _, img_height, img_width), img_dtype, times_dtype = infer_common_xrif_cube_size_dtype(ex_paths) + times_width = 5 # five time values per frame are stored (idx, acq sec, acq nsec, wrt sec, wrt nsec) + + pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) + + with tempfile.TemporaryDirectory() as td: + rechunk = zarr.open_group(f"file://" + td) + frames_tmp = rechunk.zeros( + 'frames', + shape=(frames_per_xrif_chunk * xrifs_row_count,) + (img_height, img_width), + chunks=(frames_per_xrif_chunk,), + dtype=img_dtype, + ) + times_tmp = rechunk.zeros( + 'times', + shape=(frames_per_xrif_chunk * xrifs_row_count,) + (times_width,), + chunks=(frames_per_xrif_chunk,), + dtype=times_dtype, + ) + + results = [] + cur.execute(other_files_q + " ORDER BY creation_time ASC", bounds + (camera_channel,)) + log.info("Submitting futures...") + total_bytes = 0 + for idx, row in enumerate(cur): + total_bytes += row['size_bytes'] + local_path = preprocess_path(row['origin_host'], row['origin_path']) + unpack_args = local_path, log + results.append(pool.submit(unpack_one_xrif, *unpack_args)) + log.info(f"Submitted {idx + 1} futures, total compressed size {total_bytes} bytes") + start_idx = 0 + # iterate in-order because `other_files_q` has an `ORDER BY` and we want to + # preserve time ordering + for res in tqdm(results): + frames, times = res.result() + if frames.shape[0] != times.shape[0]: + log.warning(f"Discarding {frames.shape[0]} frames because xrif wrote {times.shape[0]} timestamps for this archive") + continue + n_frames = frames.shape[0] + if frames.shape[1:] != (img_height, img_width): + log.warning(f"Skipping {n_frames} frames because {frames.shape=} but {frames_tmp.shape=}") + continue + # log.debug(f"Got {frames.shape=}, {n_frames=}, going in to {frames_tmp.shape=} (guessed {frames_per_xrif_chunk, img_height, img_width=}) at {start_idx=}") + frames_tmp[start_idx:start_idx + n_frames] = frames + times_tmp[start_idx:start_idx + n_frames] = times + start_idx += n_frames + total_frames = start_idx + log.debug(f"Results gotten: {total_frames=}, assigning into final archives") + + compressor = Blosc(cname='zstd', clevel=5, shuffle=Blosc.SHUFFLE) + # filters = [Delta(dtype='u2')] + # test showed no delta compresses better / decompresses faster + filters = [] + cam_frames = root.zeros( + f'frames', + shape=(total_frames,) + (img_height, img_width), + chunks=(chunk_size,), + dtype=img_dtype, + filters=filters, + compressor=compressor, + ) + cam_times = root.zeros( + f'times', + shape=(total_frames,) + (times_width,), + chunks=(chunk_size,), + dtype=times_dtype, + ) + log.info(f"Created {cam_frames} with {filters=} {compressor=}") + start = time.perf_counter() + cam_frames[:] = frames_tmp[:start_idx] + dt = time.perf_counter() - start + log.debug(f"Compressed and wrote frames in {dt} sec") + + start = time.perf_counter() + cam_times[:] = times_tmp[:start_idx] + dt = time.perf_counter() - start + log.debug(f"Compressed and wrote times in {dt} sec") + return cam_frames, cam_times + +def pack_one_obs(start_ts, obs_email, obs_name): + raise NotImplementedError() + semester, night = datestamp_strings_from_ts(start_ts) + title = f"{start_ts.strftime(FOLDER_TIMESTAMP_FORMAT)}_{obs_name}" + + if not obs_email: # can be empty + obs_email = "_no_email_" + + # ... 
/ 2022B / a@b.edu / 2022-02-02_020304_label + simple_observer_prefix = f"{semester}/{obs_email}/{title}.zarr" + root = zarr.open_group(f"file://./output/{simple_observer_prefix}/") + +def main(): + obs_name = 'HD141569_2x2binning_iz_unsats' + sci_chunk_size, wfs_chunk_size = 400, 5 * 2000 + root = zarr.open_group(f"file:///home/jlong/packr/output/{obs_name}.zarr/") + + # dbconfig = db.DbConfig(user='jlong', host='localhost') + os.environ['XTELEMDB_PASSWORD'] = 'extremeAO!' + dbconfig = db.DbConfig(user='xsup') + conn = dbconfig.connect() + cur = conn.cursor() + cur.execute("SELECT * FROM observations WHERE obsName = %s", (obs_name,)) + rows = cur.fetchall() + if len(rows) > 1: + print("Got more than one matching obs:", rows) + res = rows[0] + bounds = start_ts, end_ts = res['start_ts'], res['end_ts'] + + # bounds = '2023-03-13 07:17:30.589645+00', '2023-03-13 07:19:54.23374+00' + start_ts, end_ts = "2024-05-25T07:44:01.893517+00:00", "2024-05-25T09:27:42.979882+00:00" + + if False: + cur.execute(''' + SELECT DISTINCT device, ec FROM telem WHERE ts BETWEEN %s AND %s; + ''', bounds) + device_event_code_pairs = cur.fetchall() + + for dev_ec in device_event_code_pairs: + log.debug(f"Processing {dev_ec}") + telem_query_args = (dev_ec['device'], dev_ec['ec']) + bounds + cur.execute(""" + SELECT COUNT(*) + FROM telem + WHERE + device = %s + AND ec = %s + AND ts BETWEEN %s AND %s + """, telem_query_args) + row_count = cur.fetchone()['count'] + log.debug(f"{dev_ec['device']} {dev_ec['ec']} has {row_count} rows") + + telem_range_query = "SELECT * FROM telem WHERE device = %s AND ec = %s AND ts BETWEEN %s AND %s ORDER BY ts ASC" + cur.execute(telem_range_query + " LIMIT 1", telem_query_args) + one_example = cur.fetchone() + + this_dev_ec_path = f"telem/{dev_ec['device']}/{dev_ec['ec']}" + this_ec_root = root.require_group(this_dev_ec_path) + telem_element_sequence = tel2zarr(one_example['msg'], [], this_ec_root, row_count, 1000) + ts_dtype = [('sec', 'i4'), ('nsec', 'i4')] + ts_arr = this_ec_root.zeros('ts', shape=(row_count,), chunks=(wfs_chunk_size,), dtype=ts_dtype) + chunks = max(1, row_count // wfs_chunk_size + 1) + log.debug(f"Using {row_count=} {wfs_chunk_size=} gives {chunks=}") + # allocate chunks per telem element + per_telem_chunks = [] + buffer_size = min(wfs_chunk_size, row_count) + for path, arr, accessor in telem_element_sequence: + per_telem_chunks.append(np.zeros((buffer_size,) + arr.shape[1:], dtype=arr.dtype)) + log.debug(f"Preallocating {buffer_size} {arr.dtype} elements for {path}") + ts_chunk = np.zeros(buffer_size, dtype=ts_dtype) + for i in range(chunks): + log.debug(f"Chunk {i+1}") + q = telem_range_query + if chunks == 1: + q = telem_range_query + else: + q = telem_range_query + f" LIMIT {wfs_chunk_size}" + if i != 0: + q += f" OFFSET {i * wfs_chunk_size}" + log.debug(f"Querying: {q}") + cur.execute(q, telem_query_args) + + # _count = 0 + for idx, row in enumerate(cur): + # log.debug(f"Chunk {i+1} row {idx}") + sec, nsec = datetime_to_seconds_nanos(row['ts']) + ts_chunk[idx]['sec'] = sec + ts_chunk[idx]['nsec'] = nsec + # log.debug(f"{ts_chunk[idx]=}") + for buffer_array, (path, _, accessor) in zip(per_telem_chunks, telem_element_sequence): + # log.debug(f"{path} {buffer_array[idx]=}") + buffer_array[idx] = accessor(row['msg']) + # _count += 1 + + # assert _count == idx + 1 + slice_start = i * wfs_chunk_size + chunk_len = idx + 1 # for partial chunks: after loop, `idx` is last index, exclusive bound is one more + slice_stop = slice_start + chunk_len + + # assign chunk into 
zarr + for buffer_array, (path, arr, accessor) in zip(per_telem_chunks, telem_element_sequence): + log.debug(f"{path} {arr} {slice_start=} {slice_stop=}") + arr[slice_start:slice_stop] = buffer_array[:chunk_len] + ts_arr[slice_start:slice_stop] = ts_chunk[:chunk_len] + + # loop over other matching files + + for camera_channel in [ + # 'camsci1', + # 'camsci2', + # 'camtip', + # 'camacq', + ]: + log.info(f"Checking for {camera_channel}...") + res = repack_cam_channel(camera_channel, root.create_group('sci/' + camera_channel), bounds, cur, sci_chunk_size) + if res is not None: + frames, times = res + log.debug(f"{frames.info=}") + else: + log.debug(f"No {camera_channel} frames") + for camera_channel in [ + # 'camwfs', + 'camlowfs', + ]: + log.info(f"Checking for {camera_channel}...") + res = repack_cam_channel(camera_channel, root.create_group('wfs/' + camera_channel), bounds, cur, wfs_chunk_size) + if res is not None: + frames, times = res + log.debug(f"{frames.info=}") + else: + log.debug(f"No {camera_channel} frames") + + import IPython + IPython.embed() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/python/magaox/quicklook/commands/_base.py b/python/magaox/quicklook/commands/_base.py index 00250c633..5e9ea4363 100644 --- a/python/magaox/quicklook/commands/_base.py +++ b/python/magaox/quicklook/commands/_base.py @@ -1,22 +1,61 @@ import datetime +from datetime import timezone import socket import typing import psycopg import xconf +import upath import magaox.db.config as dbconfig -from ... import constants +from ... import constants, utils @xconf.config class BaseQuicklookCommand(dbconfig.BaseConfig, xconf.Command): + database : dbconfig.DbConfig = xconf.field(default=dbconfig.DbConfig(), help="PostgreSQL database connection") dry_run : bool = xconf.field(default=False, help="Whether to perform a dry run or actually execute the necessary commands") title : typing.Optional[str] = xconf.field(default=None, help="All or part of the observation name to process") email : typing.Optional[str] = xconf.field(default=None, help="Email address for the observer to process") - semester : typing.Optional[str] = xconf.field(default=None, help="Semester to search in, 202XXA/20XXB format") + semester : typing.Optional[str] = xconf.field(default=utils.get_current_semester(), help="Semester to search in, 202XXA/20XXB format") utc_start : typing.Optional[datetime.datetime] = xconf.field(default=None, help="ISO UTC datetime stamp of earliest observation start time to process (supersedes semester)") utc_end : typing.Optional[datetime.datetime] = xconf.field(default=None, help="ISO UTC datetime stamp of latest observation end time to process (supersedes semester)") - data_roots : list[str] = xconf.field(default_factory=constants.LOOKYLOO_DATA_ROOTS.copy, help=f"Search directory for telem and rawimages subdirectories, repeat to specify multiple roots. (default: {constants.LOOKYLOO_DATA_ROOTS})") + data_roots : list[upath.UPath] = xconf.field(default_factory=constants.LOOKYLOO_DATA_ROOTS.copy, help=f"Search directory for telem and rawimages subdirectories, repeat to specify multiple roots. 
(default: {constants.LOOKYLOO_DATA_ROOTS})") + common_path_prefix : str = xconf.field(default=constants.DEFAULT_PREFIX, help="Prefix for all instrument data and config directories") + + def get_search_start_end_timestamps( + self, + semester : str, + utc_start : typing.Optional[datetime.datetime] = None, + utc_end : typing.Optional[datetime.datetime] = None, + ): + letter = semester[-1].upper() + try: + if len(semester) != 5 or semester[-1].upper() not in ['A', 'B']: + raise ValueError() + year = int(semester[:-1]) + month = 1 if letter == 'A' else 6 + day = 15 if month == 6 else 1 + except ValueError: + raise RuntimeError(f"Got {semester=} but need a 4 digit year + A or B (e.g. 2022A)") + semester_start_dt = datetime.datetime(year, month, day) + semester_start_dt = semester_start_dt.replace(tzinfo=timezone.utc) + start_dt = semester_start_dt + semester_end_dt = datetime.datetime( + year=year + 1 if letter == 'B' else year, + month=1 if letter == 'B' else 6, + day = 15 if letter == 'A' else 1, + ).replace(tzinfo=timezone.utc) + end_dt = semester_end_dt + + if utc_start is not None: + start_dt = utc_start + + if utc_end is not None: + end_dt = utc_end + + if end_dt < start_dt: + raise ValueError("End time is before start time") + return start_dt, end_dt def main(self): raise NotImplementedError("Command subclasses must implement main()") diff --git a/python/magaox/quicklook/commands/bundle.py b/python/magaox/quicklook/commands/bundle.py index 82deb00d4..30eac48c8 100644 --- a/python/magaox/quicklook/commands/bundle.py +++ b/python/magaox/quicklook/commands/bundle.py @@ -11,6 +11,7 @@ from ...utils import parse_iso_datetime, format_timestamp_for_filename, utcnow, get_search_start_end_timestamps from ..core import ( TimestampedFile, + PathRewriteConfig, ObservationSpan, load_file_history, do_quicklook_for_camera, @@ -30,110 +31,111 @@ log = logging.getLogger(__name__) +def generate_path_rewrites(): + return [ + PathRewriteConfig(hostname='exao2', from_path='/opt/MagAOX', to_path='/srv/rtc/data'), + PathRewriteConfig(hostname='exao3', from_path='/opt/MagAOX', to_path='/srv/icc/data'), + ] + @xconf.config class Bundle(BaseQuicklookCommand): - cameras : list[str] = xconf.field(default_factory=lambda: list(constants.ALL_CAMERAS.keys()), help="Camera names (i.e. rawimages subfolder names)") output_dir : upath.UPath = xconf.field(default=upath.UPath('.'), help="Path or URI to destination") + parallel_jobs : int = xconf.field(default=10, help="How many export jobs to start in parallel") + path_rewrites : list[PathRewriteConfig] = xconf.field(default_factory=generate_path_rewrites, help="Rewrite the paths in the inventory (e.g. 
to use an NFS mount to read from another host)") def main(self): - if not self.output_path.is_dir(): - self.output_path.mkdir(parents=True, exist_ok=True) - timestamp_str = format_timestamp_for_filename(utcnow()) - - # Specifying a filename results in no console output, so add it back - if args.verbose or args.dry_run: - console = logging.StreamHandler() - console.setLevel(logging.DEBUG) - logging.getLogger('').addHandler(console) - formatter = logging.Formatter(log_format) - console.setFormatter(formatter) - log.debug(f"Logging to {log_file_path}") + if not self.output_dir.is_dir(): + self.output_dir.mkdir(parents=True, exist_ok=True) + # timestamp_str = format_timestamp_for_filename(utcnow()) + # # Specifying a filename results in no console output, so add it back + # if self.dry_run: + # console = logging.StreamHandler() + # console.setLevel(logging.DEBUG) + # logging.getLogger('').addHandler(console) + # formatter = logging.Formatter(log_format) + # console.setFormatter(formatter) + # log.debug(f"Logging to {log_file_path}") - cameras = args.camera - if args.data_root: - data_roots = [pathlib.Path(x) for x in args.data_root] - else: - data_roots = [pathlib.Path(x) for x in LOOKYLOO_DATA_ROOTS.split(':')] - output_dir = pathlib.Path(args.output_dir) - start_dt, end_dt = get_search_start_end_timestamps(args.semester, args.utc_start, args.utc_end) - new_observation_spans, _ = get_new_observation_spans(data_roots, set(), start_dt, end_dt, ignore_data_integrity=args.ignore_data_integrity) + start_dt, end_dt = get_search_start_end_timestamps(self.semester, self.utc_start, self.utc_end) + new_observation_spans, _ = get_new_observation_spans(start_dt, end_dt, email=self.email, title=self.title) - with futures.ThreadPoolExecutor(max_workers=args.parallel_jobs) as threadpool: + with futures.ThreadPoolExecutor(max_workers=self.parallel_jobs) as threadpool: for span in new_observation_spans: if span.end is None: log.debug(f"Skipping {span} because it is an open interval") continue - if decide_to_process(args, span): - log.info(f"Observation interval to process: {span}") - create_bundle_from_span( - span, - output_dir, - data_roots, - threadpool, - args.dry_run, - cameras, - ) - -def main(): - now = datetime.datetime.now() - this_year = now.year - this_semester = str(this_year) + ("B" if now.month > 6 else "A") - parser = argparse.ArgumentParser(description="Bundle observations for upload") - parser.add_argument('-r', '--dry-run', help="Commands to run are printed in debug output (implies --verbose)", action='store_true') - parser.add_argument('-v', '--verbose', help="Turn on debug output", action='store_true') - parser.add_argument('-t', '--title', help="Title of observation to collect", action='store') - parser.add_argument('-e', '--observer-email', help="Skip observations that are not by this observer (matches substrings, case-independent)", action='store') - parser.add_argument('-p', '--partial-match-ok', help="A partial match (title provided is found anywhere in recorded title) is processed", action='store_true') - parser.add_argument('-s', '--semester', help=f"Semester to search in, default: {this_semester}", default=this_semester) - parser.add_argument('--utc-start', help=f"ISO UTC datetime stamp of earliest observation start time to process (supersedes --semester)", type=parse_iso_datetime) - parser.add_argument('--utc-end', help=f"ISO UTC datetime stamp of latest observation end time to process (ignored in daemon mode)", type=parse_iso_datetime) - parser.add_argument('-c', '--camera', 
help=f"Camera name (i.e. rawimages subfolder name), repeat to specify multiple names. (default: all XRIF-writing cameras found)", action='append') - parser.add_argument('-X', '--data-root', help=f"Search directory for telem and rawimages subdirectories, repeat to specify multiple roots. (default: {LOOKYLOO_DATA_ROOTS.split(':')})", action='append') - parser.add_argument('-D', '--output-dir', help=f"output directory, defaults to current dir", action='store', default=os.getcwd()) - parser.add_argument('-j', '--parallel-jobs', default=8, help="Max number of parallel xrif2fits processes to launch (default: 8; if the number of archives in an interval is smaller than this, fewer processes will be launched)") - parser.add_argument('--ignore-data-integrity', help="[DEBUG USE ONLY]", action='store_true') - args = parser.parse_args() - output_path = pathlib.Path(args.output_dir) - if not output_path.is_dir(): - output_path.mkdir(parents=True, exist_ok=True) - timestamp_str = format_timestamp_for_filename(utcnow()) - log_file_path = f"./lookyloo_bundle_{timestamp_str}.log" if args.verbose or args.dry_run else None - log_format = '%(filename)s:%(lineno)d: [%(levelname)s] %(message)s' - logging.basicConfig( - level='DEBUG' if args.verbose or args.dry_run else 'INFO', - filename=log_file_path, - format=log_format - ) - # Specifying a filename results in no console output, so add it back - if args.verbose or args.dry_run: - console = logging.StreamHandler() - console.setLevel(logging.DEBUG) - logging.getLogger('').addHandler(console) - formatter = logging.Formatter(log_format) - console.setFormatter(formatter) - log.debug(f"Logging to {log_file_path}") - - cameras = args.camera - if args.data_root: - data_roots = [pathlib.Path(x) for x in args.data_root] - else: - data_roots = [pathlib.Path(x) for x in LOOKYLOO_DATA_ROOTS.split(':')] - output_dir = pathlib.Path(args.output_dir) - start_dt, end_dt = get_search_start_end_timestamps(args.semester, args.utc_start, args.utc_end) - new_observation_spans, _ = get_new_observation_spans(data_roots, set(), start_dt, end_dt, ignore_data_integrity=args.ignore_data_integrity) - - with futures.ThreadPoolExecutor(max_workers=args.parallel_jobs) as threadpool: - for span in new_observation_spans: - if span.end is None: - log.debug(f"Skipping {span} because it is an open interval") - continue - if decide_to_process(args, span): + dest_dir = self.output_dir / f'bundle_{format_timestamp_for_filename(span.begin)}_to_{format_timestamp_for_filename(span.end)}' log.info(f"Observation interval to process: {span}") - create_bundle_from_span( + dest_paths = create_bundle_from_span( span, - output_dir, - data_roots, + dest_dir, + self.path_rewrites, threadpool, - args.dry_run, - cameras, - ) \ No newline at end of file + self.dry_run, + self.common_path_prefix, + ) + log.debug(f"Bundled {len(dest_paths)} files to {dest_dir}") + +# def main(): +# now = datetime.datetime.now() +# this_year = now.year +# this_semester = str(this_year) + ("B" if now.month > 6 else "A") +# parser = argparse.ArgumentParser(description="Bundle observations for upload") +# parser.add_argument('-r', '--dry-run', help="Commands to run are printed in debug output (implies --verbose)", action='store_true') +# parser.add_argument('-v', '--verbose', help="Turn on debug output", action='store_true') +# parser.add_argument('-t', '--title', help="Title of observation to collect", action='store') +# parser.add_argument('-e', '--observer-email', help="Skip observations that are not by this observer (matches 
substrings, case-independent)", action='store') +# parser.add_argument('-p', '--partial-match-ok', help="A partial match (title provided is found anywhere in recorded title) is processed", action='store_true') +# parser.add_argument('-s', '--semester', help=f"Semester to search in, default: {this_semester}", default=this_semester) +# parser.add_argument('--utc-start', help=f"ISO UTC datetime stamp of earliest observation start time to process (supersedes --semester)", type=parse_iso_datetime) +# parser.add_argument('--utc-end', help=f"ISO UTC datetime stamp of latest observation end time to process (ignored in daemon mode)", type=parse_iso_datetime) +# parser.add_argument('-c', '--camera', help=f"Camera name (i.e. rawimages subfolder name), repeat to specify multiple names. (default: all XRIF-writing cameras found)", action='append') +# parser.add_argument('-X', '--data-root', help=f"Search directory for telem and rawimages subdirectories, repeat to specify multiple roots. (default: {LOOKYLOO_DATA_ROOTS.split(':')})", action='append') +# parser.add_argument('-D', '--output-dir', help=f"output directory, defaults to current dir", action='store', default=os.getcwd()) +# parser.add_argument('-j', '--parallel-jobs', default=8, help="Max number of parallel xrif2fits processes to launch (default: 8; if the number of archives in an interval is smaller than this, fewer processes will be launched)") +# parser.add_argument('--ignore-data-integrity', help="[DEBUG USE ONLY]", action='store_true') +# args = parser.parse_args() +# output_path = pathlib.Path(args.output_dir) +# if not output_path.is_dir(): +# output_path.mkdir(parents=True, exist_ok=True) +# timestamp_str = format_timestamp_for_filename(utcnow()) +# log_file_path = f"./lookyloo_bundle_{timestamp_str}.log" if args.verbose or args.dry_run else None +# log_format = '%(filename)s:%(lineno)d: [%(levelname)s] %(message)s' +# logging.basicConfig( +# level='DEBUG' if args.verbose or args.dry_run else 'INFO', +# filename=log_file_path, +# format=log_format +# ) +# # Specifying a filename results in no console output, so add it back +# if args.verbose or args.dry_run: +# console = logging.StreamHandler() +# console.setLevel(logging.DEBUG) +# logging.getLogger('').addHandler(console) +# formatter = logging.Formatter(log_format) +# console.setFormatter(formatter) +# log.debug(f"Logging to {log_file_path}") + +# cameras = args.camera +# if args.data_root: +# data_roots = [pathlib.Path(x) for x in args.data_root] +# else: +# data_roots = [pathlib.Path(x) for x in LOOKYLOO_DATA_ROOTS.split(':')] +# output_dir = pathlib.Path(args.output_dir) +# start_dt, end_dt = get_search_start_end_timestamps(args.semester, args.utc_start, args.utc_end) +# new_observation_spans, _ = get_new_observation_spans(data_roots, set(), start_dt, end_dt, ignore_data_integrity=args.ignore_data_integrity) + +# with futures.ThreadPoolExecutor(max_workers=args.parallel_jobs) as threadpool: +# for span in new_observation_spans: +# if span.end is None: +# log.debug(f"Skipping {span} because it is an open interval") +# continue +# if decide_to_process(args, span): +# log.info(f"Observation interval to process: {span}") +# create_bundle_from_span( +# span, +# output_dir, +# data_roots, +# threadpool, +# args.dry_run, +# cameras, +# ) \ No newline at end of file diff --git a/python/magaox/quicklook/core.py b/python/magaox/quicklook/core.py index c815174ff..aacd918f0 100644 --- a/python/magaox/quicklook/core.py +++ b/python/magaox/quicklook/core.py @@ -32,18 +32,32 @@ from concurrent 
import futures import logging import glob -import tempfile -from astropy.io import fits import shutil +import tempfile + +import xconf +from astropy.io import fits +from ..db import fetch, query_observations, query_files from ..utils import parse_iso_datetime, format_timestamp_for_filename, utcnow from ..constants import ( PRETTY_MODIFIED_TIME_FORMAT, LINE_FORMAT_REGEX, LINE_BUFFERED, MODIFIED_TIME_FORMAT, OBSERVERS_DEVICE, HISTORY_FILENAME, FAILED_HISTORY_FILENAME, XRIF2FITS_TIMEOUT_SEC, SLEEP_FOR_TELEMS, ALL_CAMERAS, DEFAULT_SEPARATE, DEFAULT_CUBE, CHECK_INTERVAL_SEC, LOOKYLOO_DATA_ROOTS, QUICKLOOK_PATH, - FOLDER_TIMESTAMP_FORMAT, + FOLDER_TIMESTAMP_FORMAT, DEFAULT_PREFIX ) log = logging.getLogger(__name__) +@xconf.config +class PathRewriteConfig: + hostname : str = xconf.field(help="Hostname under which the file was inventoried") + from_path : str = xconf.field(help="Substring to replace from the inventoried path") + to_path : str = xconf.field(help="Replacement substring for the actual 'physical' path where we read it (e.g. to use an NFS mount to read from another host)") + + def rewrite(self, hostname, path): + if hostname == self.hostname: + return path.replace(self.from_path, self.to_path) + return path + @dataclasses.dataclass(frozen=True, eq=True) class TimestampedFile: path : pathlib.Path @@ -59,6 +73,16 @@ class ObserverTelem: obs : str on : bool + @classmethod + def from_row(cls, row): + print(row) + return cls( + ts=row['ts'], + email=row['email'], + obs=row['obsName'], + on=row['observing'], + ) + def __str__(self): return f"" @@ -137,6 +161,16 @@ def __str__(self, ): endpart = f" to {self.end.strftime(PRETTY_MODIFIED_TIME_FORMAT)}" if self.end is not None else "" return f"" + @classmethod + def from_row(cls, row): + print(row) + return cls( + email=row['email'], + title=row['obsname'], + begin=row['start_ts'], + end=row['end_ts'], + ) + def xfilename_to_utc_timestamp(filename): _, filename = os.path.split(filename) name, ext = os.path.splitext(filename) @@ -294,14 +328,19 @@ def _add_span(email, title, begin, end): return spans, start_dt def get_new_observation_spans( - data_roots: typing.List[pathlib.Path], - existing_observation_spans : typing.Set[ObservationSpan], start_dt : datetime.datetime, end_dt : typing.Optional[datetime.datetime]=None, - ignore_data_integrity : bool=False, + email : typing.Optional[str]=None, + title : typing.Optional[str]=None, + existing_observation_spans : typing.Set[ObservationSpan]=None, ) -> tuple[set[ObservationSpan, datetime.datetime]]: - events = get_observation_telems(data_roots, start_dt, end_dt, ignore_data_integrity) - spans, start_dt = transform_telems_to_spans(events, start_dt, end_dt) + if existing_observation_spans is None: + existing_observation_spans = set() + # events = get_observation_telems(data_roots, start_dt, end_dt, ignore_data_integrity) + # result = fetch('observers', ec="telem_observer", start=start_dt, end=end_dt) + result = query_observations(start_dt=start_dt, end_dt=end_dt, title=title, email=email) + spans = set(ObservationSpan.from_row(r) for r in result) + if len(spans): new_observation_spans = set(spans) - existing_observation_spans return new_observation_spans, start_dt @@ -582,8 +621,8 @@ def convert_xrif( log.debug(f"Extracted {len(paths)} XRIF archives to FITS") return successful_paths, failed_paths -def decide_to_process(args, span): - if args.title is not None: +def decide_to_process(title, email): + if title is not None: title_match = span.title.strip().lower() == args.title.strip().lower() if 
args.partial_match_ok: title_match = title_match or args.title.strip().lower() in span.title.lower() @@ -730,104 +769,121 @@ def check_telem_in_span(span : ObservationSpan, log_file : TimestampedFile): def create_bundle_from_span( span : ObservationSpan, output_dir : pathlib.Path, - data_roots: typing.List[pathlib.Path], + path_rewrites: list[PathRewriteConfig], threadpool : futures.ThreadPoolExecutor, dry_run : bool, - cameras : typing.Optional[typing.List[str]] = None, + common_path_prefix: str=DEFAULT_PREFIX, ): log.info(f"Observation interval to bundle: {span}") - bundle_root = output_dir / f'bundle_{format_timestamp_for_filename(span.begin)}_to_{format_timestamp_for_filename(span.end)}' - log.debug(f"Staging to {bundle_root.as_posix()}") - for subfolder in ['logs', 'telem', 'rawimages']: - subfolder_path = bundle_root / subfolder - if not dry_run: - os.makedirs(subfolder_path, exist_ok=True) - log.debug(f"mkdir {subfolder_path}") - else: - log.debug(f"dry run: mkdir {subfolder_path}") + files = query_files(span.begin, span.end) + copy_tasks = [] + for file_row in files: + real_path = file_row['origin_path'] + path_sans_prefix = pathlib.Path(file_row['origin_path']).relative_to(common_path_prefix) + origin_host = file_row['origin_host'] + for rw in path_rewrites: + real_path = rw.rewrite(origin_host, real_path) + if not os.path.exists(real_path): + raise RuntimeError(f"Cannot bundle {real_path=} for {origin_host}:{file_row['origin_path']} because file does not exist") + + dest = output_dir / path_sans_prefix + copy_tasks.append(threadpool.submit(_copy_task, real_path, dest, dry_run)) + dest_paths = [dst for dst in futures.as_completed(copy_tasks)] + log.debug("dest_paths = %s", dest_paths) + return dest_paths - bundle_contents = [] - for data_root in data_roots: - # collect logs - logs_root = data_root / 'logs' - for devname in find_device_names_in_folder(logs_root, 'binlog'): - log_files = get_matching_paths( - logs_root, - device=devname, - extension='binlog', - newer_than_dt=span.begin, - older_than_dt=span.end, - grab_one_before_start=True, - ) - if not len(log_files): - continue - elif not check_log_in_span(span, log_files[0]): - # drop first file if it was grabbed by optimistic matching for pre-interval-start frames - log_files = log_files[1:] - bundle_contents.extend(stage_for_bundling( - data_root, - log_files, - bundle_root, - dry_run, - threadpool, - )) - # collect telems - telem_root = data_root / 'telem' - for devname in find_device_names_in_folder(telem_root, 'bintel'): - telem_files = get_matching_paths( - telem_root, - device=devname, - extension='bintel', - newer_than_dt=span.begin, - older_than_dt=span.end, - grab_one_before_start=True, - ) - if not len(telem_files): - continue - elif not check_telem_in_span(span, telem_files[0]): - # drop first file if it was grabbed by optimistic matching for pre-interval-start frames - telem_files = telem_files[1:] - bundle_contents.extend(stage_for_bundling( - data_root, - telem_files, - bundle_root, - dry_run, - threadpool, - )) - # for every camera: - if cameras is not None: - images_dirs = list(filter(lambda x: x.is_dir(), [data_root / 'rawimages' / camname for camname in cameras])) - else: - images_dirs = list(filter(lambda x: x.is_dir(), (data_root / 'rawimages').glob('*'))) - log.debug(f"{data_root=} {images_dirs=}") - for imgdir in images_dirs: - log.debug(f"{imgdir=}") - # collect image archives - cam_files = get_matching_paths( - imgdir, - device=imgdir.name, - extension='xrif', - newer_than_dt=span.begin, - 
older_than_dt=span.end, - grab_one_before_start=True, - ) - if not len(cam_files): - continue - elif not check_xrif_in_span(span, data_roots, cam_files[0]): - # drop first file if it was grabbed by optimistic matching for pre-interval-start frames - cam_files = cam_files[1:] - # only make dir if there's a nonzero number of camera archives - images_dest = bundle_root / 'rawimages' / imgdir.name - if not dry_run: - os.makedirs(images_dest, exist_ok=True) - log.debug(f"mkdir {images_dest}") - else: - log.debug(f"dry run: mkdir {images_dest}") - bundle_contents.extend(stage_for_bundling( - data_root, - cam_files, - bundle_root, - dry_run, - threadpool, - )) - return bundle_root \ No newline at end of file + # bundle_root = output_dir / f'bundle_{format_timestamp_for_filename(span.begin)}_to_{format_timestamp_for_filename(span.end)}' + # log.debug(f"Staging to {bundle_root.as_posix()}") + # for subfolder in ['logs', 'telem', 'rawimages']: + # subfolder_path = bundle_root / subfolder + # if not dry_run: + # os.makedirs(subfolder_path, exist_ok=True) + # log.debug(f"mkdir {subfolder_path}") + # else: + # log.debug(f"dry run: mkdir {subfolder_path}") + + # bundle_contents = [] + # for data_root in data_roots: + # # collect logs + # logs_root = data_root / 'logs' + # for devname in find_device_names_in_folder(logs_root, 'binlog'): + # log_files = get_matching_paths( + # logs_root, + # device=devname, + # extension='binlog', + # newer_than_dt=span.begin, + # older_than_dt=span.end, + # grab_one_before_start=True, + # ) + # if not len(log_files): + # continue + # elif not check_log_in_span(span, log_files[0]): + # # drop first file if it was grabbed by optimistic matching for pre-interval-start frames + # log_files = log_files[1:] + # bundle_contents.extend(stage_for_bundling( + # data_root, + # log_files, + # bundle_root, + # dry_run, + # threadpool, + # )) + # # collect telems + # telem_root = data_root / 'telem' + # for devname in find_device_names_in_folder(telem_root, 'bintel'): + # telem_files = get_matching_paths( + # telem_root, + # device=devname, + # extension='bintel', + # newer_than_dt=span.begin, + # older_than_dt=span.end, + # grab_one_before_start=True, + # ) + # if not len(telem_files): + # continue + # elif not check_telem_in_span(span, telem_files[0]): + # # drop first file if it was grabbed by optimistic matching for pre-interval-start frames + # telem_files = telem_files[1:] + # bundle_contents.extend(stage_for_bundling( + # data_root, + # telem_files, + # bundle_root, + # dry_run, + # threadpool, + # )) + # # for every camera: + # if cameras is not None: + # images_dirs = list(filter(lambda x: x.is_dir(), [data_root / 'rawimages' / camname for camname in cameras])) + # else: + # images_dirs = list(filter(lambda x: x.is_dir(), (data_root / 'rawimages').glob('*'))) + # log.debug(f"{data_root=} {images_dirs=}") + # for imgdir in images_dirs: + # log.debug(f"{imgdir=}") + # # collect image archives + # cam_files = get_matching_paths( + # imgdir, + # device=imgdir.name, + # extension='xrif', + # newer_than_dt=span.begin, + # older_than_dt=span.end, + # grab_one_before_start=True, + # ) + # if not len(cam_files): + # continue + # elif not check_xrif_in_span(span, data_roots, cam_files[0]): + # # drop first file if it was grabbed by optimistic matching for pre-interval-start frames + # cam_files = cam_files[1:] + # # only make dir if there's a nonzero number of camera archives + # images_dest = bundle_root / 'rawimages' / imgdir.name + # if not dry_run: + # os.makedirs(images_dest, 
exist_ok=True) + # log.debug(f"mkdir {images_dest}") + # else: + # log.debug(f"dry run: mkdir {images_dest}") + # bundle_contents.extend(stage_for_bundling( + # data_root, + # cam_files, + # bundle_root, + # dry_run, + # threadpool, + # )) + # return bundle_root \ No newline at end of file