diff --git a/src/data/nc42netcdfs.py b/src/data/nc42netcdfs.py index 877a7adf..6411d3ce 100755 --- a/src/data/nc42netcdfs.py +++ b/src/data/nc42netcdfs.py @@ -276,26 +276,324 @@ def _get_available_variables( self.logger.debug(" Variables to extract: %s", vars_to_extract) return vars_to_extract + def _find_time_coordinate(self, src_group: netCDF4.Group) -> str: + """Find the time coordinate variable in a group using introspection. + + Returns: + str: Name of the time coordinate variable, or empty string if not found + """ + # Strategy 1: Look for variables with "time" in the name (most common) + time_vars = [var_name for var_name in src_group.variables if "time" in var_name.lower()] + if time_vars: + # Prefer variables that start with 'time' (like time_NAL9602) + time_vars.sort(key=lambda x: (not x.lower().startswith("time"), x)) + self.logger.debug("Found time coordinate %s via name pattern", time_vars[0]) + return time_vars[0] + + # Strategy 2: Look for variables with time-like units + for var_name, var in src_group.variables.items(): + if hasattr(var, "units"): + units = getattr(var, "units", "").lower() + time_patterns = ["seconds since", "days since", "hours since"] + if any(pattern in units for pattern in time_patterns): + self.logger.debug("Found time coordinate %s via units", var_name) + return var_name + + # Strategy 3: Look for unlimited dimension (backup) + for dim_name, dim in src_group.dimensions.items(): + if dim.isunlimited() and dim_name in src_group.variables: + self.logger.debug("Found time coordinate %s via unlimited dimension", dim_name) + return dim_name + + self.logger.debug("No time coordinate found in group") + return "" + + def _get_time_filters_for_variables( + self, src_group: netCDF4.Group, vars_to_extract: list[str] + ) -> dict[str, dict]: + """Get time filtering information for time coordinates used by vars_to_extract. + + Returns: + dict: Map of time_coord_name -> {"indices": list[int], "filtered": bool} + """ + time_filters = {} + + # Check if time filtering is enabled + if not getattr(self.args, "filter_monotonic_time", True): + return time_filters + + # Find all time coordinates used by variables in extraction list + time_coords_found = set() + for var_name in vars_to_extract: + if var_name in src_group.variables: + var = src_group.variables[var_name] + + # Check each dimension to see if it's a time coordinate + for dim_name in var.dimensions: + if dim_name in src_group.variables: + dim_var = src_group.variables[dim_name] + + # Check if this dimension variable is a time coordinate + if self._is_time_variable(dim_name, dim_var): + time_coords_found.add(dim_name) + + # Now process each unique time coordinate found + for time_coord_name in time_coords_found: + time_var = src_group.variables[time_coord_name] + time_data = time_var[:] + mono_indices = self._get_monotonic_indices(time_data) + + # Check if filtering was actually needed + filtered = len(mono_indices) < len(time_data) + if filtered: + self.logger.info( + "Time coordinate %s: filtered %d non-monotonic points (%d -> %d)", + time_coord_name, + len(time_data) - len(mono_indices), + len(time_data), + len(mono_indices), + ) + + time_filters[time_coord_name] = {"indices": mono_indices, "filtered": filtered} + + return time_filters + + def _is_time_variable(self, var_name: str, var) -> bool: + """Check if a variable is a time coordinate variable.""" + # Check name pattern + if "time" in var_name.lower(): + return True + + # Check units + if hasattr(var, "units"): + units = getattr(var, "units", "").lower() + time_patterns = ["seconds since", "days since", "hours since"] + if any(pattern in units for pattern in time_patterns): + return True + + return False + + def _get_monotonic_indices(self, time_data) -> list[int]: + """Get indices for monotonic time values from time data array.""" + mono_indices = [] + if len(time_data) > 0: + mono_indices.append(0) # Always include first point + + for i in range(1, len(time_data)): + if time_data[i] > time_data[mono_indices[-1]]: + mono_indices.append(i) + + return mono_indices + + def _get_monotonic_time_indices(self, src_group: netCDF4.Group) -> tuple[list[int], bool]: + """Get indices for monotonically increasing time data. + + Returns: + list[int]: List of indices for monotonic time points + bool: True if filtering was applied + """ + # Check if time filtering is enabled + if not getattr(self.args, "filter_monotonic_time", True): + return [], False + + # Find the time coordinate variable using introspection + time_var_name = self._find_time_coordinate(src_group) + if not time_var_name: + # No time variable found, return all data + return [], False + + time_var = src_group.variables[time_var_name] + time_data = time_var[:] + + # Find monotonically increasing indices + mono_indices = [] + if len(time_data) > 0: + mono_indices.append(0) # Always include first point + + for i in range(1, len(time_data)): + if time_data[i] > time_data[mono_indices[-1]]: + mono_indices.append(i) + else: + self.logger.debug( + "Non-monotonic time value at index %d: %s <= %s (var: %s)", + i, + time_data[i], + time_data[mono_indices[-1]], + time_var_name, + ) + + total_points = len(time_data) + filtered_points = len(mono_indices) + + if filtered_points < total_points: + self.logger.warning( + "Filtered %d non-monotonic time points (kept %d/%d) for variable %s", + total_points - filtered_points, + filtered_points, + total_points, + time_var_name, + ) + return mono_indices, True + + return mono_indices, False + + def _copy_variable_with_appropriate_time_filter( + self, + src_group: netCDF4.Group, + dst_dataset: netCDF4.Dataset, + var_name: str, + time_filters: dict[str, dict], + ): + """Copy a variable with appropriate time filtering applied.""" + try: + src_var = src_group.variables[var_name] + + # Create variable in destination + dst_var = dst_dataset.createVariable( + var_name, + src_var.dtype, + src_var.dimensions, + zlib=True, + complevel=6, + shuffle=True, + fletcher32=True, + ) + + # Check if this variable itself is a time coordinate that needs filtering + if var_name in time_filters and time_filters[var_name]["filtered"]: + # This is a time coordinate variable that needs filtering + time_indices = time_filters[var_name]["indices"] + dst_var[:] = src_var[:][time_indices] + self.logger.debug("Applied time filtering to time coordinate %s", var_name) + + # Check if this variable depends on any filtered time dimensions + elif src_var.dimensions: + # Find which (if any) of this variable's dimensions are filtered time coordinates + filtered_dims = {} + for dim_name in src_var.dimensions: + if dim_name in time_filters and time_filters[dim_name]["filtered"]: + filtered_dims[dim_name] = time_filters[dim_name]["indices"] + + if filtered_dims: + # Apply filtering for the appropriate dimensions + self._apply_multidimensional_time_filter( + src_var, dst_var, var_name, filtered_dims + ) + else: + # No time filtering needed + dst_var[:] = src_var[:] + else: + # Scalar variable or no dimensions + dst_var[:] = src_var[:] + + # Copy attributes + for attr_name in src_var.ncattrs(): + dst_var.setncattr(attr_name, src_var.getncattr(attr_name)) + + self.logger.debug(" Copied variable: %s", var_name) + + except Exception as e: # noqa: BLE001 + self.logger.warning("Failed to copy variable %s: %s", var_name, e) + + def _apply_multidimensional_time_filter( + self, src_var, dst_var, var_name: str, filtered_dims: dict[str, list[int]] + ): + """Apply time filtering to a multi-dimensional variable.""" + # For now, handle the common case where time is the first dimension + if len(filtered_dims) == 1: + dim_name = list(filtered_dims.keys())[0] + time_indices = filtered_dims[dim_name] + + if src_var.dimensions[0] == dim_name: + # Time is first dimension + if len(src_var.dimensions) == 1: + # 1D variable + dst_var[:] = src_var[:][time_indices] + else: + # Multi-dimensional with time as first dimension + dst_var[:] = src_var[:][time_indices, ...] + self.logger.debug( + "Applied time filtering to variable %s (dim: %s)", var_name, dim_name + ) + else: + # Time dimension is not first - more complex indexing needed + self.logger.warning( + "Variable %s has filtered time dimension %s but not as first dimension - " + "copying all data", + var_name, + dim_name, + ) + dst_var[:] = src_var[:] + else: + # Multiple time dimensions filtered - complex case + self.logger.warning( + "Variable %s has multiple filtered time dimensions - copying all data", var_name + ) + dst_var[:] = src_var[:] + + def _create_dimensions_with_time_filters( + self, + src_group: netCDF4.Group, + dst_dataset: netCDF4.Dataset, + dims_needed: set[str], + time_filters: dict[str, dict], + ): + """Create dimensions in the destination dataset, adjusting time dimensions if filtered.""" + for dim_name in dims_needed: + if dim_name in src_group.dimensions: + src_dim = src_group.dimensions[dim_name] + + # Check if this dimension corresponds to a filtered time variable + if dim_name in time_filters and time_filters[dim_name]["filtered"]: + # Use the number of filtered time points + filtered_size = len(time_filters[dim_name]["indices"]) + size = filtered_size if not src_dim.isunlimited() else None + self.logger.debug( + "Created filtered time dimension %s: %s -> %s", + dim_name, + len(src_dim), + size or filtered_size, + ) + else: + size = len(src_dim) if not src_dim.isunlimited() else None + + dst_dataset.createDimension(dim_name, size) + def _create_netcdf_file( self, src_group: netCDF4.Group, vars_to_extract: list[str], output_file: Path ): - """Create a new NetCDF file with the specified variables.""" + """Create a new NetCDF file with the specified variables and monotonic time.""" + # Get time filtering information for each time variable + time_filters = self._get_time_filters_for_variables(src_group, vars_to_extract) + with netCDF4.Dataset(output_file, "w", format="NETCDF4") as dst_dataset: # Copy global attributes self._copy_global_attributes(src_group, dst_dataset) - # Create dimensions + # Add note about time filtering if applied + if any(tf["filtered"] for tf in time_filters.values()): + dst_dataset.setncattr( + "processing_note", "Non-monotonic time values filtered out during extraction" + ) + + # Create dimensions - may need to adjust time dimension sizes dims_needed = self._get_required_dimensions(src_group, vars_to_extract) - self._create_dimensions(src_group, dst_dataset, dims_needed) + self._create_dimensions_with_time_filters( + src_group, dst_dataset, dims_needed, time_filters + ) - # Copy coordinate variables + # Copy coordinate variables with time filtering coord_vars = self._get_coordinate_variables(src_group, dims_needed, vars_to_extract) for var_name in coord_vars: - self._copy_variable(src_group, dst_dataset, var_name) + self._copy_variable_with_appropriate_time_filter( + src_group, dst_dataset, var_name, time_filters + ) - # Copy requested variables + # Copy requested variables with time filtering for var_name in vars_to_extract: - self._copy_variable(src_group, dst_dataset, var_name) + self._copy_variable_with_appropriate_time_filter( + src_group, dst_dataset, var_name, time_filters + ) def _copy_global_attributes(self, src_group: netCDF4.Group, dst_dataset: netCDF4.Dataset): """Copy global attributes from source to destination.""" @@ -438,6 +736,18 @@ def process_command_line(self): action="store_true", help="Use with --noinput to not re-process existing downloaded log files", ) + parser.add_argument( + "--filter_monotonic_time", + action="store_true", + default=True, + help="Filter out non-monotonic time values (default: True)", + ) + parser.add_argument( + "--no_filter_monotonic_time", + dest="filter_monotonic_time", + action="store_false", + help="Keep all time values, including non-monotonic ones", + ) parser.add_argument( "--start", action="store",