diff --git a/src/data/AUV.py b/src/data/AUV.py
index 75accb25..c8bef718 100755
--- a/src/data/AUV.py
+++ b/src/data/AUV.py
@@ -9,9 +9,8 @@
 """

 import logging
-from datetime import UTC, datetime
+from datetime import datetime

-import coards
 import numpy as np
 import xarray as xr

@@ -28,29 +27,6 @@ def monotonic_increasing_time_indices(time_array: np.array) -> np.ndarray:
     return np.array(monotonic)


-class AUV:
-    def add_global_metadata(self):
-        iso_now = datetime.now(UTC).isoformat() + "Z"
-
-        self.nc_file.netcdf_version = "4"
-        self.nc_file.Conventions = "CF-1.6"
-        self.nc_file.date_created = iso_now
-        self.nc_file.date_update = iso_now
-        self.nc_file.date_modified = iso_now
-        self.nc_file.featureType = "trajectory"
-
-        self.nc_file.comment = ""
-
-        self.nc_file.time_coverage_start = (
-            coards.from_udunits(self.time[0], self.time.units).isoformat() + "Z"
-        )
-        self.nc_file.time_coverage_end = (
-            coards.from_udunits(self.time[-1], self.time.units).isoformat() + "Z"
-        )
-
-        self.nc_file.distribution_statement = "Any use requires prior approval from MBARI"
-
-
 def nudge_positions(  # noqa: C901, PLR0912, PLR0913, PLR0915
     nav_longitude: xr.DataArray,
     nav_latitude: xr.DataArray,
diff --git a/src/data/combine.py b/src/data/combine.py
index f39a1bd0..e29963cb 100755
--- a/src/data/combine.py
+++ b/src/data/combine.py
@@ -181,12 +181,12 @@ def global_metadata(self):
         metadata["license"] = metadata["distribution_statement"]
         metadata["useconst"] = "Not intended for legal use. Data may contain inaccuracies."
         metadata["history"] = f"Created by {self.commandline} on {iso_now}"
-
+        log_file = self.args.log_file
         metadata["title"] = (
-            f"Calibrated AUV sensor data from {self.args.auv_name} mission {self.args.mission}"
+            f"Combined LRAUV data from {log_file} - relevant variables extracted for STOQS"
         )
         metadata["summary"] = (
-            "Observational oceanographic data obtained from an Autonomous"
+            "Observational oceanographic data obtained from a Long Range Autonomous"
             " Underwater Vehicle mission with measurements at"
             " original sampling intervals. The data have been processed"
             " by MBARI's auv-python software."
@@ -557,13 +557,23 @@ def combine_groups(self):
         group_files = sorted(src_dir.glob(f"{Path(log_file).stem}_{GROUP}_*.nc"))
         self.combined_nc = xr.Dataset()
         for group_file in group_files:
-            self.logger.info("Found group file: %s", group_file)
+            self.logger.info("Group file: %s", group_file.name)
             # Make nudged_longitude, nudged_latitude = self._nudge_pos() call on when appropriate
+            # Loop through each variable in the group file and add it to the combined_nc member list
+            with xr.open_dataset(group_file) as ds:
+                for orig_var in ds.variables:
+                    if orig_var.lower().endswith("time"):
+                        self.logger.debug("Skipping time variable: %s", orig_var)
+                        continue
+                    new_group = group_file.stem.split(f"{GROUP}_")[1].replace("_", "").lower()
+                    new_var = new_group + "_" + orig_var.lower()
+                    self.logger.info("Adding variable %-65s %s", f"{orig_var} as", new_var)
+                    self.combined_nc[new_var] = ds[orig_var]

     def write_netcdf(self) -> None:
         log_file = self.args.log_file
         netcdfs_dir = Path(BASE_LRAUV_PATH, Path(log_file).parent)
-        out_fn = Path(netcdfs_dir, f"{self.args.log_file.stem}_cal.nc")
+        out_fn = Path(netcdfs_dir, f"{Path(log_file).stem}_cal.nc")

         self.combined_nc.attrs = self.global_metadata()
         self.logger.info("Writing combined group data to %s", out_fn)
@@ -641,5 +651,5 @@ def process_command_line(self):
     combine.process_command_line()
     start = time.time()
     combine.combine_groups()
-    ##combine.write_netcdf()
+    combine.write_netcdf()
     combine.logger.info("Time to process: %.2f seconds", (time.time() - start))
diff --git a/src/data/correct_log_times.py b/src/data/correct_log_times.py
index 6604cf22..0a417f84 100755
--- a/src/data/correct_log_times.py
+++ b/src/data/correct_log_times.py
@@ -18,7 +18,6 @@
 from pathlib import Path
 from shutil import copyfile

-from AUV import AUV
 from logs2netcdfs import AUV_NetCDF
 from readauvlog import log_record

@@ -41,7 +40,7 @@
 TIME = "time"


-class TimeCorrect(AUV):
+class TimeCorrect:
     logger = logging.getLogger(__name__)
     _handler = logging.StreamHandler()
     _handler.setFormatter(AUV_NetCDF._formatter)
diff --git a/src/data/logs2netcdfs.py b/src/data/logs2netcdfs.py
index 28da0359..33d4e315 100755
--- a/src/data/logs2netcdfs.py
+++ b/src/data/logs2netcdfs.py
@@ -17,15 +17,17 @@
 import subprocess
 import sys
 import time
+from datetime import UTC, datetime
 from http import HTTPStatus
 from pathlib import Path

 import aiofiles
+import coards
 import numpy as np
 import requests
 from aiohttp import ClientSession
 from aiohttp.client_exceptions import ClientConnectorError
-from AUV import AUV, monotonic_increasing_time_indices
+from AUV import monotonic_increasing_time_indices
 from netCDF4 import Dataset
 from readauvlog import log_record

@@ -57,7 +59,7 @@ class CustomException(Exception):
     pass


-class AUV_NetCDF(AUV):
+class AUV_NetCDF:
     logger = logging.getLogger(__name__)
     _handler = logging.StreamHandler()
     _formatter = logging.Formatter(
@@ -662,6 +664,27 @@ def _remove_bad_values(self, netcdf_filename):
         self.nc_file.close()
         self.logger.info("Wrote (without bad values) %s", netcdf_filename)

+    def add_global_metadata(self):
+        iso_now = datetime.now(UTC).isoformat() + "Z"
+
+        self.nc_file.netcdf_version = "4"
+        self.nc_file.Conventions = "CF-1.6"
+        self.nc_file.date_created = iso_now
+        self.nc_file.date_update = iso_now
+        self.nc_file.date_modified = iso_now
+        self.nc_file.featureType = "trajectory"
+
+        self.nc_file.comment = ""
+
+        self.nc_file.time_coverage_start = (
+            coards.from_udunits(self.time[0], self.time.units).isoformat() + "Z"
+        )
+        self.nc_file.time_coverage_end = (
+            coards.from_udunits(self.time[-1], self.time.units).isoformat() + "Z"
+        )
+
+        self.nc_file.distribution_statement = "Any use requires prior approval from MBARI"
+
     def _process_log_file(self, log_filename, netcdf_filename, src_dir=None):
         log_data = self.read(log_filename)
         if Path(netcdf_filename).exists():
diff --git a/src/data/nc42netcdfs.py b/src/data/nc42netcdfs.py
index 32584a18..aa3ace2b 100755
--- a/src/data/nc42netcdfs.py
+++ b/src/data/nc42netcdfs.py
@@ -12,12 +12,13 @@
 import logging
 import os
 import sys
+from datetime import UTC, datetime
 from pathlib import Path
 from typing import Any

+import git
 import netCDF4
 import pooch
-import xarray as xr

 # Local directory that serves as the work area for log_files and netcdf files
 BASE_LRAUV_WEB = "https://dods.mbari.org/data/lrauv/"
@@ -99,7 +100,9 @@
     "DeadReckonUsingMultipleVelocitySources": [
         {
             "name": "fix_residual_percent_distance_traveled",
-            "rename": "fix_residual_percent_distance_traveled_DeadReckonUsingMultipleVelocitySources",  # noqa: E501
+            "rename": (
+                "fix_residual_percent_distance_traveled_DeadReckonUsingMultipleVelocitySources"
+            ),
         },
         {"name": "longitude", "rename": "pose_longitude_DeadReckonUsingMultipleVelocitySources"},
         {"name": "latitude", "rename": "pose_latitude_DeadReckonUsingMultipleVelocitySources"},
@@ -209,7 +212,7 @@ def extract_groups_to_files_netcdf4(self, log_file: str) -> Path:
         self.logger.info("Extracting data from %s", input_file)
         with netCDF4.Dataset(input_file, "r") as src_dataset:
             # Extract root group first
-            self._extract_root_group(src_dataset, log_file, netcdfs_dir)
+            self._extract_root_group(log_file, "/", src_dataset, netcdfs_dir)

             # Extract all other groups
             all_groups = list(src_dataset.groups.keys())
@@ -218,11 +221,13 @@
                 if group_name != "/" and group_name not in all_groups:
                     self.logger.warning("Group %s not found in %s", group_name, input_file)
                     continue
-                self._extract_single_group(src_dataset, group_name, log_file, netcdfs_dir)
+                self._extract_single_group(log_file, group_name, src_dataset, netcdfs_dir)

         return netcdfs_dir

-    def _extract_root_group(self, src_dataset: netCDF4.Dataset, log_file: str, output_dir: Path):
+    def _extract_root_group(
+        self, log_file: str, group_name: str, src_dataset: netCDF4.Dataset, output_dir: Path
+    ):
         """Extract variables from the root group to _{GROUP}_Universals.nc."""
         root_parms = SCIENG_PARMS.get("/", [])
         if not root_parms:
@@ -234,7 +239,9 @@ def _extract_root_group(self, src_dataset: netCDF4.Dataset, log_file: str, outpu

             if vars_to_extract:
                 output_file = output_dir / f"{Path(log_file).stem}_{GROUP}_Universals.nc"
-                self._create_netcdf_file(src_dataset, vars_to_extract, output_file)
+                self._create_netcdf_file(
+                    log_file, group_name, src_dataset, vars_to_extract, output_file
+                )
                 self.logger.info("Extracted root group '/' to %s", output_file)
             else:
                 self.logger.warning("No requested variables found in root group '/'")
@@ -243,7 +250,11 @@
             self.logger.warning("Could not extract root group '/': %s", e)

     def _extract_single_group(
-        self, src_dataset: netCDF4.Dataset, group_name: str, log_file: str, output_dir: Path
+        self,
+        log_file: str,
+        group_name: str,
+        src_dataset: netCDF4.Dataset,
+        output_dir: Path,
     ):
         "Extract a single group to its own NetCDF file named like _{GROUP}_.nc."
         group_parms = SCIENG_PARMS[group_name]
@@ -256,7 +267,9 @@ def _extract_single_group(

             if vars_to_extract:
                 output_file = output_dir / f"{Path(log_file).stem}_{GROUP}_{group_name}.nc"
-                self._create_netcdf_file(src_group, vars_to_extract, output_file)
+                self._create_netcdf_file(
+                    log_file, group_name, src_group, vars_to_extract, output_file
+                )
                 self.logger.info("Extracted %s to %s", group_name, output_file)
             else:
                 self.logger.warning("No requested variables found in group %s", group_name)
@@ -310,7 +323,7 @@ def _find_time_coordinate(self, src_group: netCDF4.Group) -> str:
         return ""

     def _get_time_filters_for_variables(
-        self, src_group: netCDF4.Group, vars_to_extract: list[str]
+        self, log_file: str, group_name: str, src_group: netCDF4.Group, vars_to_extract: list[str]
     ) -> dict[str, dict]:
         """Get time filtering information for time coordinates used by vars_to_extract.

@@ -325,6 +338,10 @@

         # Find all time coordinates used by variables in extraction list
         time_coords_found = set()
+        self.logger.debug(
+            "=================================== Group: %s =======================================",
+            group_name,
+        )
         for var_name in vars_to_extract:
             if var_name in src_group.variables:
                 var = src_group.variables[var_name]
@@ -342,28 +359,32 @@
         for time_coord_name in time_coords_found:
             time_var = src_group.variables[time_coord_name]
             time_data = time_var[:]
+            self.logger.debug("Time coordinate %s: %d points", time_coord_name, len(time_data))
             mono_indices = self._get_monotonic_indices(time_data)

             # Check if filtering was actually needed
             filtered = len(mono_indices) < len(time_data)
+            comment = ""
             if filtered:
-                self.logger.info(
-                    "Time coordinate %s: filtered %d non-monotonic points (%d -> %d), %.2f%%",
-                    time_coord_name,
-                    len(time_data) - len(mono_indices),
-                    len(time_data),
-                    len(mono_indices),
-                    100 * (len(time_data) - len(mono_indices)) / len(time_data),
+                comment = (
+                    f"Filtered {len(time_data) - len(mono_indices)} non-monotonic points "
+                    f"({len(time_data)} -> {len(mono_indices)}), "
+                    f"{100 * (len(time_data) - len(mono_indices)) / len(time_data):.2f}%"
                 )
+                self.logger.info("Time coordinate %s: %s", time_coord_name, comment)

-            time_filters[time_coord_name] = {"indices": mono_indices, "filtered": filtered}
+            time_filters[time_coord_name] = {
+                "indices": mono_indices,
+                "filtered": filtered,
+                "comment": comment,
+            }

         return time_filters

     def _is_time_variable(self, var_name: str, var) -> bool:
         """Check if a variable is a time coordinate variable."""
         # Check name pattern
-        if "time" in var_name.lower():
+        if var_name.lower().endswith("time"):
             return True

         # Check units
@@ -379,37 +400,7 @@ def _get_monotonic_indices(self, time_data) -> list[int]:
         """Get indices for monotonic time values from time data array."""
         mono_indices = []
         if len(time_data) > 0:
-            mono_indices.append(0)  # Always include first point
-
-            for i in range(1, len(time_data)):
-                if time_data[i] > time_data[mono_indices[-1]]:
-                    mono_indices.append(i)
-
-        return mono_indices
-
-    def _get_monotonic_time_indices(self, src_group: netCDF4.Group) -> tuple[list[int], bool]:
-        """Get indices for monotonically increasing time data.
-
-        Returns:
-            list[int]: List of indices for monotonic time points
-            bool: True if filtering was applied
-        """
-        # Check if time filtering is enabled
-        if not getattr(self.args, "filter_monotonic_time", True):
-            return [], False
-
-        # Find the time coordinate variable using introspection
-        time_var_name = self._find_time_coordinate(src_group)
-        if not time_var_name:
-            # No time variable found, return all data
-            return [], False
-
-        time_var = src_group.variables[time_var_name]
-        time_data = time_var[:]
-
-        # Find monotonically increasing indices
-        mono_indices = []
-        if len(time_data) > 0:
+            # TODO: What if first point is not valid? May need to add a pre-filtering step.
             mono_indices.append(0)  # Always include first point

             for i in range(1, len(time_data)):
@@ -417,27 +408,13 @@
                     mono_indices.append(i)
                 else:
                     self.logger.debug(
-                        "Non-monotonic time value at index %d: %s <= %s (var: %s)",
+                        "Non-monotonic time value at index %8d: %17.6f <= %17.6f",
                         i,
                         time_data[i],
                         time_data[mono_indices[-1]],
-                        time_var_name,
                     )

-        total_points = len(time_data)
-        filtered_points = len(mono_indices)
-
-        if filtered_points < total_points:
-            self.logger.warning(
-                "Filtered %d non-monotonic time points (kept %d/%d) for variable %s",
-                total_points - filtered_points,
-                filtered_points,
-                total_points,
-                time_var_name,
-            )
-            return mono_indices, True
-
-        return mono_indices, False
+        return mono_indices

     def _copy_variable_with_appropriate_time_filter(
         self,
@@ -466,6 +443,7 @@
             # This is a time coordinate variable that needs filtering
             time_indices = time_filters[var_name]["indices"]
             dst_var[:] = src_var[:][time_indices]
+            dst_var.setncattr("comment", time_filters[var_name]["comment"])
             self.logger.debug("Applied time filtering to time coordinate %s", var_name)

         # Check if this variable depends on any filtered time dimensions
@@ -561,21 +539,34 @@
         dst_dataset.createDimension(dim_name, size)

-    def _create_netcdf_file(
-        self, src_group: netCDF4.Group, vars_to_extract: list[str], output_file: Path
+    def _create_netcdf_file(  # noqa: PLR0913
+        self,
+        log_file: str,
+        group_name: str,
+        src_group: netCDF4.Group,
+        vars_to_extract: list[str],
+        output_file: Path,
     ):
         """Create a new NetCDF file with the specified variables and monotonic time."""
         # Get time filtering information for each time variable
-        time_filters = self._get_time_filters_for_variables(src_group, vars_to_extract)
+        time_filters = self._get_time_filters_for_variables(
+            log_file, group_name, src_group, vars_to_extract
+        )

         with netCDF4.Dataset(output_file, "w", format="NETCDF4") as dst_dataset:
             # Copy global attributes
             self._copy_global_attributes(src_group, dst_dataset)

+            # Add standard global attributes
+            log_file = self.args.log_file
+            for attr_name, attr_value in self.global_metadata(log_file, group_name).items():
+                dst_dataset.setncattr(attr_name, attr_value)
+
             # Add note about time filtering if applied
             if any(tf["filtered"] for tf in time_filters.values()):
                 dst_dataset.setncattr(
-                    "processing_note", "Non-monotonic time values filtered out during extraction"
+                    "processing_note",
+                    "Non-monotonic time values filtered from original, see comment in variables",
                 )

             # Create dimensions - may need to adjust time dimension sizes
@@ -659,37 +650,49 @@ def _copy_variable(self, src_group: netCDF4.Group, dst_dataset: netCDF4.Dataset,
         except Exception as e:  # noqa: BLE001
             self.logger.warning("Failed to copy variable %s: %s", var_name, e)

-    def extract_groups_to_files(self, input_file, output_dir):
-        """Extract each group to a separate NetCDF file."""
-        output_dir = Path(output_dir)
-        output_dir.mkdir(exist_ok=True, parents=True)
-
-        all_groups = self.get_groups_netcdf4(input_file)
-
-        self.logger.info("Extracting data from %s", input_file)
-        for group_name, group_parms in SCIENG_PARMS.items():
-            if group_name not in all_groups:
-                self.logger.warning("Group %s not found in %s", group_name, input_file)
-                continue
-            try:
-                self.logger.info("  Group %s", group_name)
-                ds = xr.open_dataset(input_file, group=group_name)
-                output_file = output_dir / f"{group_name}.nc"
-                # Output only the variables of interest
-                parms = [p["name"] for p in group_parms if "name" in p]
-                self.logger.debug("  Variables to extract: %s", parms)
-                ds = ds[parms]
-                ds.to_netcdf(path=str(output_file), format="NETCDF4")
-                ds.close()
-                self.logger.info("Extracted %s to %s", group_name, output_file)
-            except (FileNotFoundError, OSError, ValueError):
-                self.logger.warning("Could not extract %s", group_name)
-            except KeyError:
-                self.logger.warning("Variable %s not found in group %s", parms, group_name)
-            except TypeError:
-                self.logger.warning(
-                    "Type error processing group %s: %s", group_name, sys.exc_info()
-                )
+    def global_metadata(self, log_file: str, group_name: str):
+        """Use instance variables to return a dictionary of
+        metadata specific for the data that are written
+        """
+        repo = git.Repo(search_parent_directories=True)
+        try:
+            gitcommit = repo.head.object.hexsha
+        except (ValueError, BrokenPipeError) as e:
+            self.logger.warning(
+                "could not get head commit sha for %s: %s",
+                repo.remotes.origin.url,
+                e,
+            )
+            gitcommit = ""
+        iso_now = datetime.now(UTC).isoformat() + "Z"
+
+        metadata = {}
+        metadata["netcdf_version"] = "4"
+        metadata["Conventions"] = "CF-1.6"
+        metadata["date_created"] = iso_now
+        metadata["date_update"] = iso_now
+        metadata["date_modified"] = iso_now
+
+        metadata["distribution_statement"] = "Any use requires prior approval from MBARI"
+        metadata["license"] = metadata["distribution_statement"]
+        metadata["useconst"] = "Not intended for legal use. Data may contain inaccuracies."
+        metadata["history"] = f"Created by {self.commandline} on {iso_now}"
+        log_file = self.args.log_file
+        metadata["title"] = f"Extracted LRAUV data from {log_file}, Group: {group_name}"
+        metadata["source"] = (
+            f"MBARI LRAUV data extracted from {log_file}"
+            f" with execution of '{self.commandline}' at {iso_now}"
+            f" using git commit {gitcommit} from"
+            f" software at 'https://github.com/mbari-org/auv-python'"
+        )
+        metadata["summary"] = (
+            "Observational oceanographic data obtained from a Long Range Autonomous"
+            " Underwater Vehicle mission with measurements at original sampling"
+            f" intervals. The data in group {group_name} have been extracted from the"
+            " original .nc4 log file with non-monotonic time values removed using"
+            " MBARI's auv-python software"
+        )
+        return metadata

     def process_command_line(self):
         examples = "Examples:" + "\n\n"
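
The changes above consolidate the two overlapping time-index helpers into a single _get_monotonic_indices() that keeps only strictly increasing time values before the group data are written. As an illustration of that filtering idea only — a minimal standalone sketch using plain NumPy and a made-up sample array, not the project's classes or log files:

import numpy as np


def monotonic_indices(time_data) -> list[int]:
    """Indices whose time values strictly increase; the first point is always kept."""
    mono_indices: list[int] = []
    if len(time_data) > 0:
        mono_indices.append(0)
        for i in range(1, len(time_data)):
            # Compare against the last *kept* value so runs of repeated or
            # backward-jumping timestamps are dropped entirely.
            if time_data[i] > time_data[mono_indices[-1]]:
                mono_indices.append(i)
    return mono_indices


if __name__ == "__main__":
    # Hypothetical epoch seconds with a repeated value and a backward jump
    t = np.array([0.0, 1.0, 1.0, 2.0, 1.5, 3.0])
    keep = monotonic_indices(t)
    print(keep)     # [0, 1, 3, 5]
    print(t[keep])  # [0. 1. 2. 3.]

Comparing each sample against the last kept value, rather than its immediate predecessor, is what makes the filtered index list safe to use for a monotonic NetCDF time coordinate.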