Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
324 changes: 317 additions & 7 deletions src/data/nc42netcdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,26 +276,324 @@ def _get_available_variables(
self.logger.debug(" Variables to extract: %s", vars_to_extract)
return vars_to_extract

def _find_time_coordinate(self, src_group: netCDF4.Group) -> str:
"""Find the time coordinate variable in a group using introspection.

Returns:
str: Name of the time coordinate variable, or empty string if not found
"""
# Strategy 1: Look for variables with "time" in the name (most common)
time_vars = [var_name for var_name in src_group.variables if "time" in var_name.lower()]
if time_vars:
# Prefer variables that start with 'time' (like time_NAL9602)
time_vars.sort(key=lambda x: (not x.lower().startswith("time"), x))
self.logger.debug("Found time coordinate %s via name pattern", time_vars[0])
return time_vars[0]

# Strategy 2: Look for variables with time-like units
for var_name, var in src_group.variables.items():
if hasattr(var, "units"):
units = getattr(var, "units", "").lower()
time_patterns = ["seconds since", "days since", "hours since"]
if any(pattern in units for pattern in time_patterns):
self.logger.debug("Found time coordinate %s via units", var_name)
return var_name

# Strategy 3: Look for unlimited dimension (backup)
for dim_name, dim in src_group.dimensions.items():
if dim.isunlimited() and dim_name in src_group.variables:
self.logger.debug("Found time coordinate %s via unlimited dimension", dim_name)
return dim_name

self.logger.debug("No time coordinate found in group")
return ""

def _get_time_filters_for_variables(
self, src_group: netCDF4.Group, vars_to_extract: list[str]
) -> dict[str, dict]:
"""Get time filtering information for time coordinates used by vars_to_extract.

Returns:
dict: Map of time_coord_name -> {"indices": list[int], "filtered": bool}
"""
time_filters = {}

# Check if time filtering is enabled
if not getattr(self.args, "filter_monotonic_time", True):
return time_filters

# Find all time coordinates used by variables in extraction list
time_coords_found = set()
for var_name in vars_to_extract:
if var_name in src_group.variables:
var = src_group.variables[var_name]

# Check each dimension to see if it's a time coordinate
for dim_name in var.dimensions:
if dim_name in src_group.variables:
dim_var = src_group.variables[dim_name]

# Check if this dimension variable is a time coordinate
if self._is_time_variable(dim_name, dim_var):
time_coords_found.add(dim_name)

# Now process each unique time coordinate found
for time_coord_name in time_coords_found:
time_var = src_group.variables[time_coord_name]
time_data = time_var[:]
mono_indices = self._get_monotonic_indices(time_data)

# Check if filtering was actually needed
filtered = len(mono_indices) < len(time_data)
if filtered:
self.logger.info(
"Time coordinate %s: filtered %d non-monotonic points (%d -> %d)",
time_coord_name,
len(time_data) - len(mono_indices),
len(time_data),
len(mono_indices),
)

time_filters[time_coord_name] = {"indices": mono_indices, "filtered": filtered}

return time_filters

def _is_time_variable(self, var_name: str, var) -> bool:
"""Check if a variable is a time coordinate variable."""
# Check name pattern
if "time" in var_name.lower():
return True

# Check units
if hasattr(var, "units"):
units = getattr(var, "units", "").lower()
time_patterns = ["seconds since", "days since", "hours since"]
if any(pattern in units for pattern in time_patterns):
return True

return False

def _get_monotonic_indices(self, time_data) -> list[int]:
"""Get indices for monotonic time values from time data array."""
mono_indices = []
if len(time_data) > 0:
mono_indices.append(0) # Always include first point

for i in range(1, len(time_data)):
if time_data[i] > time_data[mono_indices[-1]]:
mono_indices.append(i)

return mono_indices

def _get_monotonic_time_indices(self, src_group: netCDF4.Group) -> tuple[list[int], bool]:
"""Get indices for monotonically increasing time data.

Returns:
list[int]: List of indices for monotonic time points
bool: True if filtering was applied
"""
# Check if time filtering is enabled
if not getattr(self.args, "filter_monotonic_time", True):
return [], False

# Find the time coordinate variable using introspection
time_var_name = self._find_time_coordinate(src_group)
if not time_var_name:
# No time variable found, return all data
return [], False

time_var = src_group.variables[time_var_name]
time_data = time_var[:]

# Find monotonically increasing indices
mono_indices = []
if len(time_data) > 0:
mono_indices.append(0) # Always include first point

for i in range(1, len(time_data)):
if time_data[i] > time_data[mono_indices[-1]]:
mono_indices.append(i)
else:
self.logger.debug(
"Non-monotonic time value at index %d: %s <= %s (var: %s)",
i,
time_data[i],
time_data[mono_indices[-1]],
time_var_name,
)

total_points = len(time_data)
filtered_points = len(mono_indices)

if filtered_points < total_points:
self.logger.warning(
"Filtered %d non-monotonic time points (kept %d/%d) for variable %s",
total_points - filtered_points,
filtered_points,
total_points,
time_var_name,
)
return mono_indices, True

return mono_indices, False

def _copy_variable_with_appropriate_time_filter(
self,
src_group: netCDF4.Group,
dst_dataset: netCDF4.Dataset,
var_name: str,
time_filters: dict[str, dict],
):
"""Copy a variable with appropriate time filtering applied."""
try:
src_var = src_group.variables[var_name]

# Create variable in destination
dst_var = dst_dataset.createVariable(
var_name,
src_var.dtype,
src_var.dimensions,
zlib=True,
complevel=6,
shuffle=True,
fletcher32=True,
)

# Check if this variable itself is a time coordinate that needs filtering
if var_name in time_filters and time_filters[var_name]["filtered"]:
# This is a time coordinate variable that needs filtering
time_indices = time_filters[var_name]["indices"]
dst_var[:] = src_var[:][time_indices]
self.logger.debug("Applied time filtering to time coordinate %s", var_name)

# Check if this variable depends on any filtered time dimensions
elif src_var.dimensions:
# Find which (if any) of this variable's dimensions are filtered time coordinates
filtered_dims = {}
for dim_name in src_var.dimensions:
if dim_name in time_filters and time_filters[dim_name]["filtered"]:
filtered_dims[dim_name] = time_filters[dim_name]["indices"]

if filtered_dims:
# Apply filtering for the appropriate dimensions
self._apply_multidimensional_time_filter(
src_var, dst_var, var_name, filtered_dims
)
else:
# No time filtering needed
dst_var[:] = src_var[:]
else:
# Scalar variable or no dimensions
dst_var[:] = src_var[:]

# Copy attributes
for attr_name in src_var.ncattrs():
dst_var.setncattr(attr_name, src_var.getncattr(attr_name))

self.logger.debug(" Copied variable: %s", var_name)

except Exception as e: # noqa: BLE001
self.logger.warning("Failed to copy variable %s: %s", var_name, e)

def _apply_multidimensional_time_filter(
self, src_var, dst_var, var_name: str, filtered_dims: dict[str, list[int]]
):
"""Apply time filtering to a multi-dimensional variable."""
# For now, handle the common case where time is the first dimension
if len(filtered_dims) == 1:
dim_name = list(filtered_dims.keys())[0]
time_indices = filtered_dims[dim_name]

if src_var.dimensions[0] == dim_name:
# Time is first dimension
if len(src_var.dimensions) == 1:
# 1D variable
dst_var[:] = src_var[:][time_indices]
else:
# Multi-dimensional with time as first dimension
dst_var[:] = src_var[:][time_indices, ...]
self.logger.debug(
"Applied time filtering to variable %s (dim: %s)", var_name, dim_name
)
else:
# Time dimension is not first - more complex indexing needed
self.logger.warning(
"Variable %s has filtered time dimension %s but not as first dimension - "
"copying all data",
var_name,
dim_name,
)
dst_var[:] = src_var[:]
else:
# Multiple time dimensions filtered - complex case
self.logger.warning(
"Variable %s has multiple filtered time dimensions - copying all data", var_name
)
dst_var[:] = src_var[:]

def _create_dimensions_with_time_filters(
self,
src_group: netCDF4.Group,
dst_dataset: netCDF4.Dataset,
dims_needed: set[str],
time_filters: dict[str, dict],
):
"""Create dimensions in the destination dataset, adjusting time dimensions if filtered."""
for dim_name in dims_needed:
if dim_name in src_group.dimensions:
src_dim = src_group.dimensions[dim_name]

# Check if this dimension corresponds to a filtered time variable
if dim_name in time_filters and time_filters[dim_name]["filtered"]:
# Use the number of filtered time points
filtered_size = len(time_filters[dim_name]["indices"])
size = filtered_size if not src_dim.isunlimited() else None
self.logger.debug(
"Created filtered time dimension %s: %s -> %s",
dim_name,
len(src_dim),
size or filtered_size,
)
else:
size = len(src_dim) if not src_dim.isunlimited() else None

dst_dataset.createDimension(dim_name, size)

def _create_netcdf_file(
    self, src_group: netCDF4.Group, vars_to_extract: list[str], output_file: Path
):
    """Create a new NetCDF file with the specified variables and monotonic time.

    Dimensions and variables are created exactly once, through the
    time-filter-aware helpers, so that dimension sizes and variable data agree
    with the filtered time coordinates.

    Args:
        src_group: Group to read variables, dimensions and attributes from.
        vars_to_extract: Names of the variables to copy.
        output_file: Path of the NetCDF4 file to write.
    """
    # Get time filtering information for each time variable
    time_filters = self._get_time_filters_for_variables(src_group, vars_to_extract)

    with netCDF4.Dataset(output_file, "w", format="NETCDF4") as dst_dataset:
        # Copy global attributes
        self._copy_global_attributes(src_group, dst_dataset)

        # Add note about time filtering if applied
        if any(tf["filtered"] for tf in time_filters.values()):
            dst_dataset.setncattr(
                "processing_note", "Non-monotonic time values filtered out during extraction"
            )

        # Create dimensions - may need to adjust time dimension sizes
        dims_needed = self._get_required_dimensions(src_group, vars_to_extract)
        self._create_dimensions_with_time_filters(
            src_group, dst_dataset, dims_needed, time_filters
        )

        # Copy coordinate variables with time filtering
        coord_vars = self._get_coordinate_variables(src_group, dims_needed, vars_to_extract)
        for var_name in coord_vars:
            self._copy_variable_with_appropriate_time_filter(
                src_group, dst_dataset, var_name, time_filters
            )

        # Copy requested variables with time filtering
        for var_name in vars_to_extract:
            self._copy_variable_with_appropriate_time_filter(
                src_group, dst_dataset, var_name, time_filters
            )

def _copy_global_attributes(self, src_group: netCDF4.Group, dst_dataset: netCDF4.Dataset):
"""Copy global attributes from source to destination."""
Expand Down Expand Up @@ -438,6 +736,18 @@ def process_command_line(self):
action="store_true",
help="Use with --noinput to not re-process existing downloaded log files",
)
parser.add_argument(
"--filter_monotonic_time",
action="store_true",
default=True,
help="Filter out non-monotonic time values (default: True)",
)
parser.add_argument(
"--no_filter_monotonic_time",
dest="filter_monotonic_time",
action="store_false",
help="Keep all time values, including non-monotonic ones",
)
parser.add_argument(
"--start",
action="store",
Expand Down