From dd0cc6cbdc39cce03dddda07a2b84d476722291d Mon Sep 17 00:00:00 2001 From: Mike McCann Date: Thu, 23 Oct 2025 10:34:05 -0700 Subject: [PATCH] Implement --filter_monotonic_time as a base level QC step. The coordinate variable (*time) for each data variable needs to be monotonically increasing. The data is unusable if this is not the case. Remove any data (the default behaviour) where time is not monotonically increasing --- src/data/nc42netcdfs.py | 324 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 317 insertions(+), 7 deletions(-) diff --git a/src/data/nc42netcdfs.py b/src/data/nc42netcdfs.py index 877a7adf..6411d3ce 100755 --- a/src/data/nc42netcdfs.py +++ b/src/data/nc42netcdfs.py @@ -276,26 +276,324 @@ def _get_available_variables( self.logger.debug(" Variables to extract: %s", vars_to_extract) return vars_to_extract + def _find_time_coordinate(self, src_group: netCDF4.Group) -> str: + """Find the time coordinate variable in a group using introspection. + + Returns: + str: Name of the time coordinate variable, or empty string if not found + """ + # Strategy 1: Look for variables with "time" in the name (most common) + time_vars = [var_name for var_name in src_group.variables if "time" in var_name.lower()] + if time_vars: + # Prefer variables that start with 'time' (like time_NAL9602) + time_vars.sort(key=lambda x: (not x.lower().startswith("time"), x)) + self.logger.debug("Found time coordinate %s via name pattern", time_vars[0]) + return time_vars[0] + + # Strategy 2: Look for variables with time-like units + for var_name, var in src_group.variables.items(): + if hasattr(var, "units"): + units = getattr(var, "units", "").lower() + time_patterns = ["seconds since", "days since", "hours since"] + if any(pattern in units for pattern in time_patterns): + self.logger.debug("Found time coordinate %s via units", var_name) + return var_name + + # Strategy 3: Look for unlimited dimension (backup) + for dim_name, dim in src_group.dimensions.items(): + if 
dim.isunlimited() and dim_name in src_group.variables: + self.logger.debug("Found time coordinate %s via unlimited dimension", dim_name) + return dim_name + + self.logger.debug("No time coordinate found in group") + return "" + + def _get_time_filters_for_variables( + self, src_group: netCDF4.Group, vars_to_extract: list[str] + ) -> dict[str, dict]: + """Get time filtering information for time coordinates used by vars_to_extract. + + Returns: + dict: Map of time_coord_name -> {"indices": list[int], "filtered": bool} + """ + time_filters = {} + + # Check if time filtering is enabled + if not getattr(self.args, "filter_monotonic_time", True): + return time_filters + + # Find all time coordinates used by variables in extraction list + time_coords_found = set() + for var_name in vars_to_extract: + if var_name in src_group.variables: + var = src_group.variables[var_name] + + # Check each dimension to see if it's a time coordinate + for dim_name in var.dimensions: + if dim_name in src_group.variables: + dim_var = src_group.variables[dim_name] + + # Check if this dimension variable is a time coordinate + if self._is_time_variable(dim_name, dim_var): + time_coords_found.add(dim_name) + + # Now process each unique time coordinate found + for time_coord_name in time_coords_found: + time_var = src_group.variables[time_coord_name] + time_data = time_var[:] + mono_indices = self._get_monotonic_indices(time_data) + + # Check if filtering was actually needed + filtered = len(mono_indices) < len(time_data) + if filtered: + self.logger.info( + "Time coordinate %s: filtered %d non-monotonic points (%d -> %d)", + time_coord_name, + len(time_data) - len(mono_indices), + len(time_data), + len(mono_indices), + ) + + time_filters[time_coord_name] = {"indices": mono_indices, "filtered": filtered} + + return time_filters + + def _is_time_variable(self, var_name: str, var) -> bool: + """Check if a variable is a time coordinate variable.""" + # Check name pattern + if "time" in 
var_name.lower(): + return True + + # Check units + if hasattr(var, "units"): + units = getattr(var, "units", "").lower() + time_patterns = ["seconds since", "days since", "hours since"] + if any(pattern in units for pattern in time_patterns): + return True + + return False + + def _get_monotonic_indices(self, time_data) -> list[int]: + """Get indices for monotonic time values from time data array.""" + mono_indices = [] + if len(time_data) > 0: + mono_indices.append(0) # Always include first point + + for i in range(1, len(time_data)): + if time_data[i] > time_data[mono_indices[-1]]: + mono_indices.append(i) + + return mono_indices + + def _get_monotonic_time_indices(self, src_group: netCDF4.Group) -> tuple[list[int], bool]: + """Get indices for monotonically increasing time data. + + Returns: + list[int]: List of indices for monotonic time points + bool: True if filtering was applied + """ + # Check if time filtering is enabled + if not getattr(self.args, "filter_monotonic_time", True): + return [], False + + # Find the time coordinate variable using introspection + time_var_name = self._find_time_coordinate(src_group) + if not time_var_name: + # No time variable found, return all data + return [], False + + time_var = src_group.variables[time_var_name] + time_data = time_var[:] + + # Find monotonically increasing indices + mono_indices = [] + if len(time_data) > 0: + mono_indices.append(0) # Always include first point + + for i in range(1, len(time_data)): + if time_data[i] > time_data[mono_indices[-1]]: + mono_indices.append(i) + else: + self.logger.debug( + "Non-monotonic time value at index %d: %s <= %s (var: %s)", + i, + time_data[i], + time_data[mono_indices[-1]], + time_var_name, + ) + + total_points = len(time_data) + filtered_points = len(mono_indices) + + if filtered_points < total_points: + self.logger.warning( + "Filtered %d non-monotonic time points (kept %d/%d) for variable %s", + total_points - filtered_points, + filtered_points, + total_points, + 
time_var_name, + ) + return mono_indices, True + + return mono_indices, False + + def _copy_variable_with_appropriate_time_filter( + self, + src_group: netCDF4.Group, + dst_dataset: netCDF4.Dataset, + var_name: str, + time_filters: dict[str, dict], + ): + """Copy a variable with appropriate time filtering applied.""" + try: + src_var = src_group.variables[var_name] + + # Create variable in destination + dst_var = dst_dataset.createVariable( + var_name, + src_var.dtype, + src_var.dimensions, + zlib=True, + complevel=6, + shuffle=True, + fletcher32=True, + ) + + # Check if this variable itself is a time coordinate that needs filtering + if var_name in time_filters and time_filters[var_name]["filtered"]: + # This is a time coordinate variable that needs filtering + time_indices = time_filters[var_name]["indices"] + dst_var[:] = src_var[:][time_indices] + self.logger.debug("Applied time filtering to time coordinate %s", var_name) + + # Check if this variable depends on any filtered time dimensions + elif src_var.dimensions: + # Find which (if any) of this variable's dimensions are filtered time coordinates + filtered_dims = {} + for dim_name in src_var.dimensions: + if dim_name in time_filters and time_filters[dim_name]["filtered"]: + filtered_dims[dim_name] = time_filters[dim_name]["indices"] + + if filtered_dims: + # Apply filtering for the appropriate dimensions + self._apply_multidimensional_time_filter( + src_var, dst_var, var_name, filtered_dims + ) + else: + # No time filtering needed + dst_var[:] = src_var[:] + else: + # Scalar variable or no dimensions + dst_var[:] = src_var[:] + + # Copy attributes + for attr_name in src_var.ncattrs(): + dst_var.setncattr(attr_name, src_var.getncattr(attr_name)) + + self.logger.debug(" Copied variable: %s", var_name) + + except Exception as e: # noqa: BLE001 + self.logger.warning("Failed to copy variable %s: %s", var_name, e) + + def _apply_multidimensional_time_filter( + self, src_var, dst_var, var_name: str, filtered_dims: 
dict[str, list[int]] + ): + """Apply time filtering to a multi-dimensional variable.""" + # For now, handle the common case where time is the first dimension + if len(filtered_dims) == 1: + dim_name = list(filtered_dims.keys())[0] + time_indices = filtered_dims[dim_name] + + if src_var.dimensions[0] == dim_name: + # Time is first dimension + if len(src_var.dimensions) == 1: + # 1D variable + dst_var[:] = src_var[:][time_indices] + else: + # Multi-dimensional with time as first dimension + dst_var[:] = src_var[:][time_indices, ...] + self.logger.debug( + "Applied time filtering to variable %s (dim: %s)", var_name, dim_name + ) + else: + # Time dimension is not first - more complex indexing needed + self.logger.warning( + "Variable %s has filtered time dimension %s but not as first dimension - " + "copying all data", + var_name, + dim_name, + ) + dst_var[:] = src_var[:] + else: + # Multiple time dimensions filtered - complex case + self.logger.warning( + "Variable %s has multiple filtered time dimensions - copying all data", var_name + ) + dst_var[:] = src_var[:] + + def _create_dimensions_with_time_filters( + self, + src_group: netCDF4.Group, + dst_dataset: netCDF4.Dataset, + dims_needed: set[str], + time_filters: dict[str, dict], + ): + """Create dimensions in the destination dataset, adjusting time dimensions if filtered.""" + for dim_name in dims_needed: + if dim_name in src_group.dimensions: + src_dim = src_group.dimensions[dim_name] + + # Check if this dimension corresponds to a filtered time variable + if dim_name in time_filters and time_filters[dim_name]["filtered"]: + # Use the number of filtered time points + filtered_size = len(time_filters[dim_name]["indices"]) + size = filtered_size if not src_dim.isunlimited() else None + self.logger.debug( + "Created filtered time dimension %s: %s -> %s", + dim_name, + len(src_dim), + size or filtered_size, + ) + else: + size = len(src_dim) if not src_dim.isunlimited() else None + + 
dst_dataset.createDimension(dim_name, size) + def _create_netcdf_file( self, src_group: netCDF4.Group, vars_to_extract: list[str], output_file: Path ): - """Create a new NetCDF file with the specified variables.""" + """Create a new NetCDF file with the specified variables and monotonic time.""" + # Get time filtering information for each time variable + time_filters = self._get_time_filters_for_variables(src_group, vars_to_extract) + with netCDF4.Dataset(output_file, "w", format="NETCDF4") as dst_dataset: # Copy global attributes self._copy_global_attributes(src_group, dst_dataset) - # Create dimensions + # Add note about time filtering if applied + if any(tf["filtered"] for tf in time_filters.values()): + dst_dataset.setncattr( + "processing_note", "Non-monotonic time values filtered out during extraction" + ) + + # Create dimensions - may need to adjust time dimension sizes dims_needed = self._get_required_dimensions(src_group, vars_to_extract) - self._create_dimensions(src_group, dst_dataset, dims_needed) + self._create_dimensions_with_time_filters( + src_group, dst_dataset, dims_needed, time_filters + ) - # Copy coordinate variables + # Copy coordinate variables with time filtering coord_vars = self._get_coordinate_variables(src_group, dims_needed, vars_to_extract) for var_name in coord_vars: - self._copy_variable(src_group, dst_dataset, var_name) + self._copy_variable_with_appropriate_time_filter( + src_group, dst_dataset, var_name, time_filters + ) - # Copy requested variables + # Copy requested variables with time filtering for var_name in vars_to_extract: - self._copy_variable(src_group, dst_dataset, var_name) + self._copy_variable_with_appropriate_time_filter( + src_group, dst_dataset, var_name, time_filters + ) def _copy_global_attributes(self, src_group: netCDF4.Group, dst_dataset: netCDF4.Dataset): """Copy global attributes from source to destination.""" @@ -438,6 +736,18 @@ def process_command_line(self): action="store_true", help="Use with --noinput 
to not re-process existing downloaded log files", ) + parser.add_argument( + "--filter_monotonic_time", + action="store_true", + default=True, + help="Filter out non-monotonic time values (default: True)", + ) + parser.add_argument( + "--no_filter_monotonic_time", + dest="filter_monotonic_time", + action="store_false", + help="Keep all time values, including non-monotonic ones", + ) parser.add_argument( "--start", action="store",