diff --git a/.gitignore b/.gitignore index ded85e0bd..848f6a007 100644 --- a/.gitignore +++ b/.gitignore @@ -136,3 +136,8 @@ dmypy.json # Pyre type checker .pyre/ *.egg-info + +# R2D2 credentials +r2d2_credentials.yaml +**/r2d2_credentials.yaml +.swell/r2d2_credentials.yaml diff --git a/docs/configs/r2d2_v3_credentials.md b/docs/configs/r2d2_v3_credentials.md new file mode 100644 index 000000000..a5b7b04f4 --- /dev/null +++ b/docs/configs/r2d2_v3_credentials.md @@ -0,0 +1,100 @@ +# R2D2 v3 Credentials Configuration + +This document explains how to configure R2D2 v3 credentials for SWELL workflows. + +## Overview + +SWELL now uses R2D2 v3 for metadata-driven data storage and retrieval. R2D2 v3 requires authentication credentials to access the centralized API. SWELL automatically loads these credentials from a YAML configuration file. + +## Quick Setup + +1. **Create the credentials directory:** + ```bash + mkdir -p ~/.swell + ``` + +2. **Create the credentials file:** + ```bash + cp /path/to/swell/r2d2_credentials.yaml ~/.swell/r2d2_credentials.yaml + ``` + +3. **Edit with your credentials:** + ```bash + vim ~/.swell/r2d2_credentials.yaml + ``` + +4. **Set secure permissions:** + ```bash + chmod 600 ~/.swell/r2d2_credentials.yaml + ``` + +## Credentials File Format + +Create `~/.swell/r2d2_credentials.yaml` with the following structure: + +```yaml +# R2D2 v3 credentials file +# Save this as ~/.swell/r2d2_credentials.yaml +# Set permissions: chmod 600 ~/.swell/r2d2_credentials.yaml + +# Required credentials +user: your_username # Your R2D2 username +api_key: your_api_key # Your R2D2 API key + +# Platform-specific values (automatically determined by SWELL with an option to use YAML-first) +# host: discover-gmao # Automatically set based on platform +# compiler: intel # Automatically set based on platform + +``` + +## Required Fields + +| Field | Description | Example | +|-------|-------------|---------| +| `user` | Your R2D2 username | `jdoe` | +| `api_key` | Your R2D2 API authentication key | `abcd1234-ef56-7890-abcd-1234567890ab` | + +## Platform-Specific Fields (Automatically Set) + +| Field | Description | NCCS Discover Value | +|-------|-------------|---------------------| +| `host` | Compute host identifier | `discover-gmao` | +| `compiler` | Compiler type used | `intel` | + +**Important**: `host` and `compiler` are automatically determined by SWELL based on your platform configuration. You can also set these manually in your credentials file. + +### Loading Precedence + +The credential loading follows this priority order: + +1. **Environment Variables** (highest priority) +2. **YAML Configuration File** +3. 
**Platform Detection** (for host/compiler only) + +**For host and compiler specifically:** +- YAML `host`/`compiler` values override platform detection +- Platform detection is used as fallback when not specified in YAML + +### Platform-Specific Configuration + +SWELL automatically determines `host` and `compiler` based on your platform: + +| Platform | R2D2 Host | R2D2 Compiler | Notes | +|----------|-----------|---------------|-------| +| `nccs_discover_sles15` | `discover-gmao` | `intel` | NCCS Discover SLES15 | +| `nccs_discover_cascade` | `discover-gmao` | `intel` | NCCS Discover Cascade | + + + +## Environment Variables Set + +When loaded, the following environment variables are set: + +- `R2D2_USER`: Your R2D2 username +- `R2D2_API_KEY`: Your R2D2 API key +- `R2D2_HOST`: Compute host name +- `R2D2_COMPILER`: Compiler type + + diff --git a/pycodestyle.cfg b/pycodestyle.cfg index a1bae2aaa..c8aad235a 100644 --- a/pycodestyle.cfg +++ b/pycodestyle.cfg @@ -8,4 +8,5 @@ max-line-length = 100 indent-size = 4 statistics = True -exclude = ._*, build, .venv +exclude = ._*, build, .venv, venv, .git, __pycache__, .tox, .eggs, *.egg-info, dist + diff --git a/r2d2_credentials.yaml b/r2d2_credentials.yaml new file mode 100644 index 000000000..22940f3ee --- /dev/null +++ b/r2d2_credentials.yaml @@ -0,0 +1,13 @@ +# R2D2 v3 credentials file +# Save this as ~/.swell/r2d2_credentials.yaml +# Set permissions: chmod 600 ~/.swell/r2d2_credentials.yaml + +# R2D2 v3 API credentials +user: your_username +api_key: your_api_key_here + +# Platform configuration +host: discover-gmao +compiler: intel + + diff --git a/src/swell/__init__.py b/src/swell/__init__.py index aa855f5fd..d02359e0e 100644 --- a/src/swell/__init__.py +++ b/src/swell/__init__.py @@ -9,4 +9,4 @@ repo_directory = os.path.dirname(__file__) # Set the version for swell -__version__ = '1.10.0' +__version__ = '1.20.0' diff --git a/src/swell/deployment/platforms/nccs_discover_cascade/modules b/src/swell/deployment/platforms/nccs_discover_cascade/modules index 63b7bfeec..bfcd4f8e1 100644 --- a/src/swell/deployment/platforms/nccs_discover_cascade/modules +++ b/src/swell/deployment/platforms/nccs_discover_cascade/modules @@ -37,9 +37,7 @@ module load other/mepo # Load r2d2 modules # ----------------- module use -a /discover/nobackup/projects/gmao/advda/JediOpt/modulefiles/core -module load solo/sles15_skylab9 -module load py-boto3 -module load r2d2/sles15_spack19 +module load r2d2-client/112025 # Load eva and jedi_bundle # ------------------------ diff --git a/src/swell/deployment/platforms/nccs_discover_cascade/r2d2_config.yaml b/src/swell/deployment/platforms/nccs_discover_cascade/r2d2_config.yaml index 053f0dbb9..deca7be52 100755 --- a/src/swell/deployment/platforms/nccs_discover_cascade/r2d2_config.yaml +++ b/src/swell/deployment/platforms/nccs_discover_cascade/r2d2_config.yaml @@ -1,22 +1,21 @@ databases: - ${USER}: class: LocalDB root: {{r2d2_local_path}} cache_fetch: false - gmao-shared: - class: LocalDB - root: /discover/nobackup/projects/gmao/advda/R2D2DataStore/Shared - cache_fetch: false +# v3 API configuration (for centralized nccs-gmao data) +data_hub: nccs-gmao +data_store: r2d2-experiments-nccs-gmao +compute_host: discover-gmao-intel -# when fetching data, in which order should the databases accessed? +# Fetch order: try local first, then API fetch_order: - ${USER} - - gmao-shared -# when storing data, in which order should the databases accessed? 
-store_order: - - ${USER} +# Optional: credentials file for v3 API authentication +# credentials_file: ~/.swell/swell-r2d2-credentials.yaml +# Cache settings cache_name: ${USER} +cache_fetch: false diff --git a/src/swell/deployment/platforms/nccs_discover_sles15/modules b/src/swell/deployment/platforms/nccs_discover_sles15/modules index 111e667a6..13e1edd7c 100644 --- a/src/swell/deployment/platforms/nccs_discover_sles15/modules +++ b/src/swell/deployment/platforms/nccs_discover_sles15/modules @@ -37,15 +37,16 @@ module load other/mepo # Load r2d2 modules # ----------------- module use -a /discover/nobackup/projects/gmao/advda/JediOpt/modulefiles/core -module load solo/sles15_skylab9 -module load py-boto3 -module load r2d2/sles15_spack19 +module load r2d2-client/112025 # Load eva and jedi_bundle # ------------------------ module load eva/sles15_milan_1.6.5 module load jedi_bundle/sles15_skylab9 +# Remove any r2d2 v2 paths from PYTHONPATH to prevent conflicts +export PYTHONPATH=$(echo $PYTHONPATH | tr ':' '\n' | grep -v "/r2d2/sles15_spack19/" | tr '\n' ':' | sed 's/:$//') + # Set the swell paths # ------------------- PATH={{swell_bin_path}}:$PATH @@ -57,3 +58,4 @@ ulimit -S -s unlimited ulimit -S -v unlimited umask 022 + diff --git a/src/swell/deployment/platforms/nccs_discover_sles15/r2d2_config.yaml b/src/swell/deployment/platforms/nccs_discover_sles15/r2d2_config.yaml index 053f0dbb9..deca7be52 100755 --- a/src/swell/deployment/platforms/nccs_discover_sles15/r2d2_config.yaml +++ b/src/swell/deployment/platforms/nccs_discover_sles15/r2d2_config.yaml @@ -1,22 +1,21 @@ databases: - ${USER}: class: LocalDB root: {{r2d2_local_path}} cache_fetch: false - gmao-shared: - class: LocalDB - root: /discover/nobackup/projects/gmao/advda/R2D2DataStore/Shared - cache_fetch: false +# v3 API configuration (for centralized nccs-gmao data) +data_hub: nccs-gmao +data_store: r2d2-experiments-nccs-gmao +compute_host: discover-gmao-intel -# when fetching data, in which order should the databases accessed? +# Fetch order: try local first, then API fetch_order: - ${USER} - - gmao-shared -# when storing data, in which order should the databases accessed? -store_order: - - ${USER} +# Optional: credentials file for v3 API authentication +# credentials_file: ~/.swell/swell-r2d2-credentials.yaml +# Cache settings cache_name: ${USER} +cache_fetch: false diff --git a/src/swell/suites/3dfgat_cycle/flow.cylc b/src/swell/suites/3dfgat_cycle/flow.cylc index 9dbdf1b89..42a135561 100644 --- a/src/swell/suites/3dfgat_cycle/flow.cylc +++ b/src/swell/suites/3dfgat_cycle/flow.cylc @@ -77,12 +77,8 @@ LinkGeosOutput-{{model_component}} => GenerateBClimatology-{{model_component}} # Data assimilation preperation - GetObservations-{{model_component}} - GenerateBClimatologyByLinking-{{model_component}} :fail? => GenerateBClimatology-{{model_component}} - - LinkGeosOutput-{{model_component}} => RunJediFgatExecutable-{{model_component}} StageJediCycle-{{model_component}} => RunJediFgatExecutable-{{model_component}} - GenerateBClimatologyByLinking-{{model_component}}? 
| GenerateBClimatology-{{model_component}} => RunJediFgatExecutable-{{model_component}} + GenerateBClimatology-{{model_component}} => RunJediFgatExecutable-{{model_component}} GetObservations-{{model_component}} => RunJediFgatExecutable-{{model_component}} # Run analysis diagnostics @@ -94,14 +90,17 @@ RunJediFgatExecutable-{{model_component}} => EvaIncrement-{{model_component}} {% if 'cice6' in models[model_component]["marine_models"] %} PrepareAnalysis-{{model_component}} => RunJediConvertStateSoca2ciceExecutable-{{model_component}} - RunJediConvertStateSoca2ciceExecutable-{{model_component}} => SaveRestart-{{model_component}} + # RunJediConvertStateSoca2ciceExecutable-{{model_component}} => SaveRestart-{{model_component}} + RunJediConvertStateSoca2ciceExecutable-{{model_component}} => MoveDaRestart-{{model_component}} RunJediConvertStateSoca2ciceExecutable-{{model_component}} => CleanCycle-{{model_component}} {% else %} - PrepareAnalysis-{{model_component}} => SaveRestart-{{model_component}} + # PrepareAnalysis-{{model_component}} => SaveRestart-{{model_component}} + PrepareAnalysis-{{model_component}} => MoveDaRestart-{{model_component}} {% endif %} + # Temporarily disable saving restarts # Move restart to next cycle - SaveRestart-{{model_component}} => MoveDaRestart-{{model_component}} + # SaveRestart-{{model_component}} => MoveDaRestart-{{model_component}} # Save analysis output # RunJediFgatExecutable-{{model_component}} => SaveAnalysis-{{model_component}} @@ -192,9 +191,6 @@ [[MoveDaRestart-{{model_component}}]] script = "swell task MoveDaRestart $config -d $datetime -m {{model_component}}" - [[SaveRestart-{{model_component}}]] - script = "swell task SaveRestart $config -d $datetime -m {{model_component}}" - [[StageJedi-{{model_component}}]] script = "swell task StageJedi $config -m {{model_component}}" @@ -213,9 +209,6 @@ --{{key}} = {{value}} {%- endfor %} - [[GenerateBClimatologyByLinking-{{model_component}}]] - script = "swell task GenerateBClimatologyByLinking $config -d $datetime -m {{model_component}}" - {% if 'cice6' in models["geos_marine"]["marine_models"] %} [[RunJediConvertStateSoca2ciceExecutable-{{model_component}}]] @@ -253,8 +246,8 @@ --{{key}} = {{value}} {%- endfor %} - [[SaveRestart-{{model_component}}]] - script = "swell task SaveRestart $config -d $datetime -m {{model_component}}" + # [[SaveRestart-{{model_component}}]] + # script = "swell task SaveRestart $config -d $datetime -m {{model_component}}" [[SaveObsDiags-{{model_component}}]] script = "swell task SaveObsDiags $config -d $datetime -m {{model_component}}" diff --git a/src/swell/suites/3dvar/flow.cylc b/src/swell/suites/3dvar/flow.cylc index 3e598bfaa..56813cde8 100644 --- a/src/swell/suites/3dvar/flow.cylc +++ b/src/swell/suites/3dvar/flow.cylc @@ -47,14 +47,7 @@ {% if cycle_time[model_component] %} # Task triggers for: {{model_component}} # ------------------ - # Get background - GetBackground-{{model_component}} - - # Get observations - GetObservations-{{model_component}} - # GenerateBClimatology, for ocean it is cycle dependent - GenerateBClimatologyByLinking-{{model_component}} :fail? 
=> GenerateBClimatology-{{model_component}} GetBackground-{{model_component}} => GenerateBClimatology-{{model_component}} # Perform staging that is cycle dependent @@ -65,7 +58,7 @@ StageJedi-{{model_component}}[^] => RunJediVariationalExecutable-{{model_component}} StageJediCycle-{{model_component}} => RunJediVariationalExecutable-{{model_component}} GetBackground-{{model_component}} => RunJediVariationalExecutable-{{model_component}} - GenerateBClimatologyByLinking-{{model_component}}? | GenerateBClimatology-{{model_component}} => RunJediVariationalExecutable-{{model_component}} + GenerateBClimatology-{{model_component}} => RunJediVariationalExecutable-{{model_component}} GetObservations-{{model_component}} => RunJediVariationalExecutable-{{model_component}} # EvaObservations @@ -132,9 +125,6 @@ [[GetObservations-{{model_component}}]] script = "swell task GetObservations $config -d $datetime -m {{model_component}}" - [[GenerateBClimatologyByLinking-{{model_component}}]] - script = "swell task GenerateBClimatologyByLinking $config -d $datetime -m {{model_component}}" - [[GenerateBClimatology-{{model_component}}]] script = "swell task GenerateBClimatology $config -d $datetime -m {{model_component}}" platform = {{platform}} diff --git a/src/swell/suites/3dvar_cycle/flow.cylc b/src/swell/suites/3dvar_cycle/flow.cylc index 8630756b0..0db57a255 100644 --- a/src/swell/suites/3dvar_cycle/flow.cylc +++ b/src/swell/suites/3dvar_cycle/flow.cylc @@ -76,12 +76,8 @@ LinkGeosOutput-{{model_component}} => GenerateBClimatology-{{model_component}} # Data assimilation things - GetObservations-{{model_component}} - GenerateBClimatologyByLinking-{{model_component}} :fail? => GenerateBClimatology-{{model_component}} - - LinkGeosOutput-{{model_component}} => RunJediVariationalExecutable-{{model_component}} StageJediCycle-{{model_component}} => RunJediVariationalExecutable-{{model_component}} - GenerateBClimatologyByLinking-{{model_component}}? 
| GenerateBClimatology-{{model_component}} => RunJediVariationalExecutable-{{model_component}} + GenerateBClimatology-{{model_component}} => RunJediVariationalExecutable-{{model_component}} GetObservations-{{model_component}} => RunJediVariationalExecutable-{{model_component}} # Run analysis diagnostics @@ -93,14 +89,16 @@ EvaIncrement-{{model_component}} => PrepareAnalysis-{{model_component}} {% if 'cice6' in models[model_component]["marine_models"] %} PrepareAnalysis-{{model_component}} => RunJediConvertStateSoca2ciceExecutable-{{model_component}} - RunJediConvertStateSoca2ciceExecutable-{{model_component}} => SaveRestart-{{model_component}} + # RunJediConvertStateSoca2ciceExecutable-{{model_component}} => SaveRestart-{{model_component}} + RunJediConvertStateSoca2ciceExecutable-{{model_component}} => MoveDaRestart-{{model_component}} RunJediConvertStateSoca2ciceExecutable-{{model_component}} => CleanCycle-{{model_component}} {% else %} - PrepareAnalysis-{{model_component}} => SaveRestart-{{model_component}} + # PrepareAnalysis-{{model_component}} => SaveRestart-{{model_component}} + PrepareAnalysis-{{model_component}} => MoveDaRestart-{{model_component}} {% endif %} # Move restart to next cycle - SaveRestart-{{model_component}} => MoveDaRestart-{{model_component}} + # SaveRestart-{{model_component}} => MoveDaRestart-{{model_component}} # Save analysis output # RunJediVariationalExecutable-{{model_component}} => SaveAnalysis-{{model_component}} @@ -192,9 +190,6 @@ [[MoveDaRestart-{{model_component}}]] script = "swell task MoveDaRestart $config -d $datetime -m {{model_component}}" - [[SaveRestart-{{model_component}}]] - script = "swell task SaveRestart $config -d $datetime -m {{model_component}}" - [[StageJedi-{{model_component}}]] script = "swell task StageJedi $config -m {{model_component}}" @@ -253,8 +248,8 @@ --{{key}} = {{value}} {%- endfor %} - [[SaveRestart-{{model_component}}]] - script = "swell task SaveRestart $config -d $datetime -m {{model_component}}" + # [[SaveRestart-{{model_component}}]] + # script = "swell task SaveRestart $config -d $datetime -m {{model_component}}" [[SaveObsDiags-{{model_component}}]] script = "swell task SaveObsDiags $config -d $datetime -m {{model_component}}" diff --git a/src/swell/tasks/base/task_base.py b/src/swell/tasks/base/task_base.py index eb6d82d26..1fa9c7f84 100644 --- a/src/swell/tasks/base/task_base.py +++ b/src/swell/tasks/base/task_base.py @@ -278,11 +278,22 @@ def create_task( ensemblePacket: Optional[str] ) -> taskBase: + # Load R2D2 credentials before importing any task modules + # ------------------------------------------------------- + from swell.utilities.config import Config + from swell.utilities.r2d2 import load_r2d2_credentials + from swell.utilities.logger import get_logger + + # Get platform info from config to load credentials + temp_logger = get_logger('R2D2Setup') + temp_config = Config(config, temp_logger, task, model) + load_r2d2_credentials(temp_logger, temp_config.__platform__) + # Convert camel case string to snake case task_lower = camel_case_to_snake_case(task) # Import class based on user selected task - task_class = getattr(importlib.import_module('swell.tasks.'+task_lower), task) + task_class = getattr(importlib.import_module('swell.tasks.' 
+ task_lower), task) # Return task object return task_class(config, datetime, model, ensemblePacket, task) diff --git a/src/swell/tasks/get_background.py b/src/swell/tasks/get_background.py index 01f3f557f..9aa77dcf2 100644 --- a/src/swell/tasks/get_background.py +++ b/src/swell/tasks/get_background.py @@ -13,14 +13,13 @@ import isodate import os -from r2d2 import fetch - +import r2d2 # -------------------------------------------------------------------------------------------------- r2d2_model_dict = { 'geos_atmosphere': 'geos', - 'geos_marine': 'mom6_cice6_UFS', + 'geos_marine': 'mom6', # 'mom6_cice6_UFS' } @@ -29,7 +28,6 @@ class GetBackground(taskBase): def execute(self) -> None: - """Acquires background files for a given experiment and cycle Parameters @@ -95,7 +93,7 @@ def execute(self) -> None: # Check for a sensible frequency # ------------------------------ - if (window_length_dur/bkg_freq_dur) % 2: + if (window_length_dur / bkg_freq_dur) % 2: self.logger.abort('Window length not divisible by background frequency') # Loop over window @@ -122,7 +120,7 @@ def execute(self) -> None: # Loop over background files in the R2D2 config and fetch # ------------------------------------------------------- - self.logger.info('Background steps being fetched: '+' '.join(str(e) for e in bkg_steps)) + self.logger.info('Background steps being fetched: ' + ' '.join(str(e) for e in bkg_steps)) # Get r2d2 dictionary r2d2_dict = self.jedi_rendering.render_interface_model('r2d2') @@ -148,16 +146,19 @@ def execute(self) -> None: # --------------------------------------------------- target_file = background_time.strftime(target_file_template) - fetch( - date=forecast_start_time, + file_extension = file_type.split('.')[-1] if '.' in file_type else 'nc' + + r2d2.fetch( + item='forecast', target_file=target_file, - model=r2d2_model_dict[model_component], - file_type=file_type, - fc_date_rendering='analysis', - step=bkg_step, + model=r2d2_model_dict[model_component], # 'mom6' need to be registered mom6 + experiment=background_experiment, + file_extension=file_extension, resolution=horizontal_resolution, - type='fc', - experiment=background_experiment) + step=bkg_step, + date=forecast_start_time.strftime('%Y-%m-%dT%H:%M:%SZ'), + file_type=file_type, + ) # Change permission os.chmod(target_file, 0o644) diff --git a/src/swell/tasks/get_observations.py b/src/swell/tasks/get_observations.py index c9f91e35f..72fdc3bb4 100644 --- a/src/swell/tasks/get_observations.py +++ b/src/swell/tasks/get_observations.py @@ -17,8 +17,15 @@ from swell.tasks.base.task_base import taskBase from swell.utilities.r2d2 import create_r2d2_config from swell.utilities.datetime_util import datetime_formats -from r2d2 import fetch +import r2d2 +# -------------------------------------------------------------------------------------------------- + +# R2D2 model name mapping +r2d2_model_dict = { + 'geos_atmosphere': 'geos', + 'geos_marine': 'mom6', +} # -------------------------------------------------------------------------------------------------- @@ -26,7 +33,6 @@ class GetObservations(taskBase): def execute(self) -> None: - """ Acquires observation files for a given experiment and cycle. 
@@ -92,8 +98,8 @@ def execute(self) -> None: # Parse config # ------------ - obs_experiment = self.config.obs_experiment() obs_providers = self.config.obs_provider() + obs_experiment = self.config.obs_experiment() background_time_offset = self.config.background_time_offset() observations = self.config.observations() window_length = self.config.window_length() @@ -102,6 +108,10 @@ def execute(self) -> None: r2d2_local_path = self.config.r2d2_local_path() cycling_varbc = self.config.cycling_varbc(None) + # Get model component + model_component = self.get_model() + r2d2_model = r2d2_model_dict.get(model_component, model_component) + # Set the observing system records path self.jedi_rendering.set_obs_records_path(self.config.observing_system_records_path(None)) @@ -112,6 +122,8 @@ def execute(self) -> None: dto=True) background_time = self.da_window_params.background_time(window_offset, background_time_offset) + background_time_iso = self.da_window_params.background_time_iso(window_offset, + background_time_offset) # Determine the input observation files to be fetched, this mainly depends on # the observation file organization in R2D2. In other words, they could be @@ -154,14 +166,22 @@ def execute(self) -> None: obs_window_begin = dt.strftime(obs_time, datetime_formats['iso_format']) target_file = os.path.join(self.cycle_dir(), f'{observation}.{obs_num}.nc4') combine_input_files.append(target_file) - fetch(date=obs_window_begin, - target_file=target_file, - provider=obs_provider, - ignore_missing=True, - obs_type=observation, - time_window=obs_window_length, - type='ob', - experiment=obs_experiment) + + fetch_criteria = { + 'item': 'observation', # Required for r2d2 v3 + 'provider': obs_provider, # What we registered with + 'observation_type': observation, # From filename + 'file_extension': 'nc4', + 'window_start': obs_window_begin, # From filename timestamp + 'window_length': obs_window_length, # From filename + 'target_file': target_file, # Where to save + } + + try: + r2d2.fetch(**fetch_criteria) + self.logger.info(f"Successfully fetched {target_file}") + except Exception as e: + self.logger.info(f"Failed to fetch {target_file}: {str(e)}") # Check how many of the combine_input_files exist in the cycle directory. 
# If all of them are missing proceed without creating an observation input @@ -170,21 +190,17 @@ def execute(self) -> None: # ----------------------------------------------------------------------- if not any([os.path.exists(f) for f in combine_input_files]): self.logger.info(f'None of the {observation} files exist for this cycle!') - # continue else: jedi_obs_file = observation_dict['obs space']['obsdatain']['engine']['obsfile'] self.logger.info(f'Processing observation file {jedi_obs_file}') - # If obs_list_dto has one member, then just rename the file # --------------------------------------------------------- if len(obs_list_dto) == 1: os.rename(combine_input_files[0], jedi_obs_file) else: self.read_and_combine(combine_input_files, jedi_obs_file) - # Change permission os.chmod(jedi_obs_file, 0o644) - # Observations were found for this provider, so we can break the provider loop break @@ -205,49 +221,64 @@ def execute(self) -> None: if self.cycle_time_dto() == self.start_cycle_point_dto(): self.logger.info(f'Process bias file {target_bccoef} for the first cycle') self.logger.info(f'Process bias file {target_bccovr} for the first cycle') - else: self.logger.info(f'Using bias files from the previous cycle') previous_bias_coef = self.previous_cycle_bias(target_bccoef, window_length) previous_bias_covr = self.previous_cycle_bias(target_bccovr, window_length) - # Link the previous bias file to the current cycle directory self.logger.info(f'Linking {previous_bias_coef} to {target_bccoef}') self.geos.linker(previous_bias_coef, target_bccoef, dst_dir=self.cycle_dir()) self.logger.info(f'Linking {previous_bias_covr} to {target_bccovr}') self.geos.linker(previous_bias_covr, target_bccovr, dst_dir=self.cycle_dir()) - fetch_required = False - # Determine the bias file type + # Determine the bias file extension and map to R2D2 file_type enum if observation == 'aircraft_temperature': - bias_file_type = 'acftbias' + bias_file_ext = 'acftbias' + bias_file_type = 'obsbias_coefficients' # Official JCSDA enum + bias_err_type = 'obsbias_coeff_errors' # Official JCSDA enum + # TODO: Do we want to use bias corrections for winds? + # TODO: Confirm for extension and err_type. 
Bias files exist for aircraft_wind elif observation == 'aircraft_wind': - bias_file_type = 'null' + bias_file_type = None # Option A: Skip + # Option B: Enable (if bias files should be used for aircraft_wind) + # bias_file_ext = 'acftbias' + # bias_coef_type = 'obsbias_coefficients' + # bias_err_type = 'obsbias_coeff_errors' else: - bias_file_type = 'satbias' + # Satellite observations + bias_file_ext = 'satbias' + bias_file_type = 'satbias' # Official JCSDA enum + bias_err_type = 'obsbias_coeff_errors' # Official JCSDA enum # This will skip the fetch if we are cycling VarBC - if bias_file_type != 'null': - if bias_file_type != 'null' and fetch_required: + if bias_file_type is not None: + if fetch_required: + # Fetch coefficients file (.acftbias or .satbias) self.logger.info(f'Processing bias file {target_bccoef}') - fetch(date=background_time, - target_file=target_bccoef, - provider='gsi', - obs_type=observation, - type='bc', - experiment=obs_experiment, - file_type=bias_file_type) - - self.logger.info(f'Processing bias file {target_bccovr}') - fetch(date=background_time, - target_file=target_bccovr, - provider='gsi', - obs_type=observation, - type='bc', - experiment=obs_experiment, - file_type=bias_file_type+'_cov') - + r2d2.fetch( + item='bias_correction', + target_file=target_bccoef, + model=r2d2_model, + experiment=obs_experiment, + provider='gsi', + observation_type=observation, + file_extension=bias_file_ext, + file_type=bias_file_type, + date=background_time_iso + ) + + r2d2.fetch( + item='bias_correction', + target_file=target_bccovr, + model=r2d2_model, + experiment=obs_experiment, + provider='gsi', + observation_type=observation, + file_extension=bias_file_ext + '_cov', + file_type=bias_err_type, # obsbias_coeff_errors Official JCSDA enum + date=background_time_iso + ) # Change permission os.chmod(target_bccoef, 0o644) os.chmod(target_bccovr, 0o644) @@ -263,13 +294,17 @@ def execute(self) -> None: self.logger.info(f'Processing satellite time lapse file {target_file}') - fetch(date=background_time, - target_file=target_file, - provider='gsi', - obs_type=observation, - type='bc', - experiment=obs_experiment, - file_type='tlapse') + r2d2.fetch( + item='bias_correction', + target_file=target_file, + model=r2d2_model, + experiment=obs_experiment, + provider='gsi', + observation_type=observation, + file_extension='tlapse', + file_type='obsbias_tlapse', # Official JCSDA enum + date=background_time_iso + ) # Change permission os.chmod(target_file, 0o644) @@ -338,8 +373,8 @@ def create_obs_time_list( window_end_dto: dt ) -> list: - day_before_dto = window_begin_dto-timedelta(days=1) - day_after_dto = window_end_dto+timedelta(days=1) + day_before_dto = window_begin_dto - timedelta(days=1) + day_after_dto = window_end_dto + timedelta(days=1) # Create a full list of all the observation times that starts from day_before_dto # and ends at day_after_dto using obs_times @@ -471,15 +506,18 @@ def read_and_combine(self, input_filenames: list, output_filename: str) -> None: # Fill value needs to be assigned while creating variables # -------------------------------------------------------- - subset_var = out_group.createVariable(var_name, - variable_data.dtype, - var_dims, - fill_value=group[var_name]. 
- getncattr('_FillValue')) + subset_var = out_group.createVariable( + var_name, + variable_data.dtype, + var_dims, + fill_value=group[var_name].getncattr('_FillValue') + ) for attr_name in group[var_name].ncattrs(): if attr_name == '_FillValue': continue - subset_var.setncattr(attr_name, group[var_name].getncattr(attr_name)) + subset_var.setncattr( + attr_name, group[var_name].getncattr(attr_name) + ) # Write subset data to the new file # -------------------------------- diff --git a/src/swell/tasks/save_obs_diags.py b/src/swell/tasks/save_obs_diags.py index 8868e3de7..d6be09c76 100644 --- a/src/swell/tasks/save_obs_diags.py +++ b/src/swell/tasks/save_obs_diags.py @@ -8,8 +8,8 @@ # -------------------------------------------------------------------------------------------------- import os +import r2d2 from swell.tasks.base.task_base import taskBase -from r2d2 import store from swell.utilities.r2d2 import create_r2d2_config from swell.utilities.run_jedi_executables import check_obs @@ -37,7 +37,7 @@ def execute(self) -> None: self.jedi_rendering.add_key('marine_models', self.config.marine_models(None)) # Get window beginning - window_begin = self.da_window_params.window_begin(window_offset) + window_begin = self.da_window_params.window_begin(window_offset) # dto background_time = self.da_window_params.background_time(window_offset, background_time_offset) @@ -57,29 +57,75 @@ def execute(self) -> None: # Load the observation dictionary observation_dict = self.jedi_rendering.render_interface_observations(observation) - # Check if observation was used, here we don't care about the output + # Check if observation was used - this checks INPUT file exists and has data + input_obs_file = observation_dict['obs space']['obsdatain']['engine']['obsfile'] + self.logger.info(f'Checking input observation file: {input_obs_file}') + use_obs = check_obs(self.jedi_rendering.observing_system_records_path, observation, observation_dict, self.cycle_time_dto()) + + self.logger.info(f'Checking observation {observation}: use_obs = {use_obs}') + if not use_obs: + self.logger.info(f'Input observation file analysis for {observation}:') + self.logger.info(f' Expected file: {input_obs_file}') + # Check if file exists and is readable + # --------------------------------------- + try: + import netCDF4 as nc + dataset = nc.Dataset(input_obs_file, 'r') + dims = {dim_name: dim.size for dim_name, dim in dataset.dimensions.items()} + self.logger.info(f' File exists but dimensions: {dims}') + dataset.close() + except Exception as e: + self.logger.info(f' File exists but error reading: {str(e)}') + + self.logger.info(f' Skipping {observation}') continue - # Store observation files - # ----------------------- + # Store diagnostic/feedback files produced by JEDI executables + # (e.g., variational, hofx, localensembleda). 
+ # -------------------------------------------------------------- + name = observation_dict['obs space']['name'] obs_path_file = observation_dict['obs space']['obsdataout']['engine']['obsfile'] + self.logger.info(f'Looking for diagnostic output file: {obs_path_file}') + # Check for need to add 0000 to the file if not os.path.exists(obs_path_file): obs_path_file_name, obs_path_file_ext = os.path.splitext(obs_path_file) obs_path_file_0000 = obs_path_file_name + '_0000' + obs_path_file_ext + self.logger.info(f'Primary file not found, checking: {obs_path_file_0000}') + if not os.path.exists(obs_path_file_0000): - self.logger.abort(f'No observation file found for {obs_path_file} or ' + - f'{obs_path_file_0000}') + self.logger.info(f'Diagnostic output files not found for {observation}:') + self.logger.info(f' Expected: {obs_path_file}') + self.logger.info(f' Expected: {obs_path_file_0000}') + self.logger.info(f' RunJediVariationalExecutable did not run successfully') + self.logger.info(f' Skipping storage of {observation} diagnostic file') + continue obs_path_file = obs_path_file_0000 - store(date=window_begin, - provider='ncdiag', - source_file=obs_path_file, - obs_type=name, - type='ob', - experiment=self.experiment_id()) + self.logger.info(f'Found diagnostic output file: {obs_path_file}') + + # Store to R2D2 + # --------------- + + try: + r2d2.store( + item='feedback', + experiment=self.experiment_id(), + observation_type=name, + file_extension=obs_path_file.split('.')[-1], + window_length='PT6H', + window_start=window_begin, + source_file=obs_path_file, + member=-9999, + ) + self.logger.info(f'Successfully stored feedback file for {observation}') + + except Exception as e: + self.logger.info(f'Failed to store feedback file for {observation}: {str(e)}') + # Don't abort - continue with other observations + continue diff --git a/src/swell/utilities/data_assimilation_window_params.py b/src/swell/utilities/data_assimilation_window_params.py index 03ea6d540..04446cf22 100644 --- a/src/swell/utilities/data_assimilation_window_params.py +++ b/src/swell/utilities/data_assimilation_window_params.py @@ -97,6 +97,14 @@ def background_time(self, window_offset: str, background_time_offset: str) -> st # ---------------------------------------------------------------------------------------------- + def background_time_iso(self, window_offset: str, background_time_offset: str) -> str: + + background_time_offset_dur = isodate.parse_duration(background_time_offset) + background_time_dto = self.__current_cycle_dto__ - background_time_offset_dur + return background_time_dto.strftime(datetime_formats['iso_format']) + + # ---------------------------------------------------------------------------------------------- + def local_background_time_iso(self, window_offset: str, window_type: str) -> str: local_background_time = self.__get_local_background_time__(window_type, window_offset) diff --git a/src/swell/utilities/r2d2.py b/src/swell/utilities/r2d2.py index d29bce306..b39ad8b41 100644 --- a/src/swell/utilities/r2d2.py +++ b/src/swell/utilities/r2d2.py @@ -8,6 +8,7 @@ import os +import yaml from swell.swell_path import get_swell_path from swell.utilities.jinja2 import template_string_jinja2 @@ -23,6 +24,10 @@ def create_r2d2_config( r2d2_local_path: str ) -> None: + # Load R2D2 v3 credentials from ~/.swell/r2d2_credentials.yaml + # ----------------------------------------------------------- + load_r2d2_credentials(logger, platform) + # R2D2 config file that will be created r2d2_config_file = 
os.path.join(cycle_dir, 'r2d2_config.yaml')

@@ -55,4 +60,105 @@ def create_r2d2_config(
     f.write(r2d2_config_file_template_str)

+
+def _get_platform_r2d2_config(logger: Logger, platform: str = None) -> tuple:
+    if not platform:
+        logger.info("No platform specified, cannot determine R2D2 host/compiler")
+        return None, None
+
+    # Platform-specific R2D2 configurations
+    platform_configs = {
+        'nccs_discover_sles15': {
+            'host': 'discover-gmao',
+            'compiler': 'intel'
+        },
+        'nccs_discover_cascade': {
+            'host': 'discover-gmao',
+            'compiler': 'intel'
+        },
+        'aws': {
+            'host': 'aws-gmao',
+            'compiler': 'intel'  # or 'gnu' depending on AWS setup
+        },
+        'generic': {
+            'host': None,
+            'compiler': None
+        }
+    }
+
+    if platform in platform_configs:
+        config = platform_configs[platform]
+        logger.info(f"Using R2D2 configuration for platform '{platform}': "
+                    f"host={config['host']}, compiler={config['compiler']}")
+        return config['host'], config['compiler']
+    else:
+        logger.warning(f"Unknown platform '{platform}', cannot determine R2D2 host/compiler")
+        return None, None
+
+
+def load_r2d2_credentials(
+    logger: Logger,
+    platform: str = None,
+    yaml_path: str = "~/.swell/r2d2_credentials.yaml"
+) -> None:
+    """
+    Load R2D2 v3 credentials from a YAML file and set environment variables.
+    Host and compiler come from the YAML file when present, otherwise from the
+    platform configuration.
+
+    Args:
+        logger: SWELL logger instance
+        platform: Platform name (e.g., 'nccs_discover_sles15')
+        yaml_path: Path to R2D2 credentials YAML file
+    """
+    yaml_path = os.path.expanduser(yaml_path)
+
+    # Determine platform-specific host and compiler
+    r2d2_host, r2d2_compiler = _get_platform_r2d2_config(logger, platform)
+
+    # Load credentials from YAML file if it exists
+    credentials = {}
+    if os.path.exists(yaml_path):
+        logger.info(f"Loading R2D2 v3 credentials from {yaml_path}")
+        try:
+            with open(yaml_path, 'r') as yaml_file:
+                credentials = yaml.safe_load(yaml_file)
+        except Exception as e:
+            logger.error(f"Error loading R2D2 credentials from {yaml_path}: {e}")
+            logger.info("Continuing with existing environment variables...")
+            credentials = {}
+    else:
+        logger.info(f"R2D2 credentials file not found at {yaml_path}")
+        logger.info("R2D2 v3 will use existing environment variables if set")
+
+    # Set user credentials from YAML file
+    if 'user' in credentials and 'R2D2_USER' not in os.environ:
+        os.environ['R2D2_USER'] = credentials['user']
+
+    if 'api_key' in credentials and 'R2D2_API_KEY' not in os.environ:
+        os.environ['R2D2_API_KEY'] = credentials['api_key']
+
+    # Set host and compiler (YAML config takes precedence over platform detection)
+    if 'host' in credentials and 'R2D2_HOST' not in os.environ:
+        os.environ['R2D2_HOST'] = credentials['host']
+        logger.info(f"Set R2D2_HOST={credentials['host']} from YAML file"
+                    f" (overrides platform value '{r2d2_host}')")
+
+    elif r2d2_host and 'R2D2_HOST' not in os.environ:
+        os.environ['R2D2_HOST'] = r2d2_host
+        logger.info(f"Set R2D2_HOST={r2d2_host} from platform configuration")
+
+    # Set compiler
+    if 'compiler' in credentials and 'R2D2_COMPILER' not in os.environ:
+        os.environ['R2D2_COMPILER'] = credentials['compiler']
+        logger.info(f"Set R2D2_COMPILER={credentials['compiler']} from YAML file"
+                    f" (overrides platform value '{r2d2_compiler}')")
+
+    elif r2d2_compiler and 'R2D2_COMPILER' not in os.environ:
+        os.environ['R2D2_COMPILER'] = r2d2_compiler
+        logger.info(f"Set R2D2_COMPILER={r2d2_compiler} from platform configuration")
+
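+    # If neither the YAML file nor the pre-existing environment provides
+    # 'user' and 'api_key', R2D2_USER and R2D2_API_KEY remain unset here and
+    # the r2d2 client will fail to authenticate when it is first used.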
logger.info("R2D2 v3 credentials loaded successfully") + + # ---------------------------------------------------------------------------------------------- diff --git a/src/swell/utilities/scripts/README_ingest_files.md b/src/swell/utilities/scripts/README_ingest_files.md new file mode 100644 index 000000000..7a955885f --- /dev/null +++ b/src/swell/utilities/scripts/README_ingest_files.md @@ -0,0 +1,768 @@ +# ingest_files.py - R2D2 v3 File Ingestion Standalone Script + +## Overview + +`ingest_files.py` is a command-line utility for batch ingesting data files to R2D2 v3. It automatically parses filenames, extracts metadata, and stores files with proper R2D2 indexing. + +**Features**: +- ✅ Batch processing (can ingest entire directories) +- ✅ Metadata extraction from filenames +- ✅ Dry-run mode (test before actual ingestion) +- ✅ Duplicate file detection (tracks ingested files) +- ✅ Ingests observations, bias corrections, and backgrounds +- ✅ Error logging + +--- + +## Quick Start +- [Installation](#installation) +- [Register Your Experiment](#register-your-experiment) +- [Quick Start](#quick-start) +- [Usage](#usage) +- [Examples](#examples) +- [More Usage](#more-usage) + +--- + +## Installation + +### Prerequisites + +**Quick Setup** (Recommended - Use This!): + +Use the provided setup scripts: + +```bash +# Load R2D2 client module +source load_r2d2.sh + +# For production R2D2: +# Edit prod_setup_env.sh with your credentials, then: +source prod_setup_env.sh +``` + +**Manual Setup** (Alternative): + +1. **Load R2D2 Client Module**: + ```bash + mod_swell + module use -a /discover/nobackup/projects/gmao/advda/JediOpt/modulefiles/core + module load r2d2-client/sles15_0604 + ``` + +2. **R2D2 Credentials**: Ensure `~/.swell/r2d2_credentials.yaml` exists with: + ```yaml + compute_host: "discover-gmao-intel" + storage_host: "discover" + user: "your_username" + api_version: "v3" + url: "https://r2d2-api.jcsda.org" + ``` + + Or set environment variables: + ```bash + export R2D2_USER=your_username + export R2D2_API_KEY=your_api_key + export R2D2_HOST=discover-gmao + export R2D2_COMPILER=intel + ``` + +3. **Python 3.7+** with `r2d2` package + +### Script Location + +All scripts are in the same directory: + +```bash +/swell/src/swell/utilities/scripts/ +├── ingest_files.py # This script +├── load_r2d2.sh # Load R2D2 module +├── prod_setup_env.sh # Production environment setup +└── README_INGEST_FILES.md # This documentation +``` + +**Usage**: +```bash +# From the scripts directory: +cd /path/to/swell/src/swell/utilities/scripts/ + +# Setup environment +source load_r2d2.sh + +# Run the script +python ingest_files.py /path/to/files/ bias_correction --ingest +``` + +--- + +## Register Your Experiment + +**Before ingesting files**, you must register your experiment in R2D2 v3. 
+ +### Quick Registration Script + +```python +import r2d2 +import os + +# Set your details +experiment_name = 'my-experiment' # Change this +user = os.environ.get('R2D2_USER', 'your_username') +host = os.environ.get('R2D2_HOST', 'discover-gmao') +compiler = os.environ.get('R2D2_COMPILER', 'intel') + +# Register experiment +r2d2.R2D2Client.register_experiment( + name=experiment_name, + compute_host=f'{host}-{compiler}', + user=user, + lifetime='science' # Options: debug, science, publication, release +) + +print(f"Registered experiment: {experiment_name}") +``` + +### Lifetime Options + +| Lifetime | Duration | Use Case | +|----------|----------|----------| +| `debug` | Days/weeks | Testing, development | +| `science` | Months | Research experiments | +| `publication` | Years | Published results | +| `release` | Permanent | Operational/reference data | + +### Check if Experiment Exists + +```python +import r2d2 +import os + +user = os.environ.get('R2D2_USER') +results = r2d2.R2D2Client.search_experiment(user=user) + +for exp in results: + print(f"{exp['name']}: {exp['lifetime']}") +``` + +### Update Experiment Lifetime + +```python +import r2d2 + +r2d2.R2D2Client.update_experiment( + name='my-experiment', + key='lifetime', + value='publication' # Change to new lifetime +) +``` + +--- + +## Quick Start + +### Step 0: Setup Environment + +```bash +# First, set up R2D2 environment +source load_r2d2.sh + +# Or for production with credentials: +# Edit prod_setup_env.sh first, then: +source prod_setup_env.sh +``` + +### 1. Dry Run (Test Mode) +```bash +# Test without actually ingesting +python ingest_files.py /path/to/files/ bias_correction +``` + +### 2. Actual Ingestion +```bash +# Actually ingest files to R2D2 +python ingest_files.py /path/to/files/ bias_correction --ingest +``` + +### 3. 
Single File
+```bash
+# Ingest one specific file
+python ingest_files.py /path/to/file.acftbias bias_correction --ingest
+```
+
+---
+
+## Usage
+
+### Basic Syntax
+
+```bash
+python ingest_files.py <path> <item_type> [--ingest]
+```
+
+### Arguments
+
+| Argument | Required | Description | Values |
+|----------|----------|-------------|--------|
+| `path` | Yes | File or directory path | File path or directory |
+| `item_type` | Yes | Type of data | `observation`, `bias_correction`, `background`, `forecast` |
+| `--ingest` | No | Actually ingest (omit for dry run) | Flag |
+
+### Item Types
+
+| Item Type | R2D2 Item | Description |
+|-----------|-----------|-------------|
+| `observation` | `observation` | IODA observation files |
+| `bias_correction` | `bias_correction` | Bias correction coefficients/errors |
+| `background` | `forecast` | Model backgrounds |
+| `forecast` | `forecast` | Model forecasts |
+
+---
+
+## Supported File Types
+
+### Observations (`.nc4`, `.nc`)
+
+**Extensions**: `.nc4`, `.nc`
+
+**R2D2 Parameters**:
+- `item`: `'observation'`
+- `provider`: Auto-detected from path or filename
+- `observation_type`: Extracted from filename
+- `file_extension`: `'nc4'` or `'nc'`
+- `window_start`: Extracted from filename timestamp
+- `window_length`: `'PT6H'` (default)
+
+**Example Filename**:
+```
+gdas.aircraft_temperature.2023-10-09T15:00:00Z.nc4
+```
+
+---
+
+### Bias Corrections (`.acftbias`, `.satbias`, `.tlapse`, `*_cov`)
+
+**Extensions**: `.acftbias`, `.acftbias_cov`, `.satbias`, `.satbias_cov`, `.tlapse`
+
+**R2D2 Parameters**:
+- `item`: `'bias_correction'`
+- `model`: `'geos'` (default)
+- `experiment`: Extracted from filename
+- `provider`: Extracted from filename
+- `observation_type`: Extracted from filename
+- `file_extension`: Actual file extension
+- `file_type`: Mapped from extension to R2D2 enum
+- `date`: Extracted from filename timestamp
+
+**Extension → file_type Mapping**:
+```python
+'acftbias' → 'obsbias_coefficients'
+'acftbias_cov' → 'obsbias_coeff_errors'
+'satbias' → 'satbias'
+'satbias_cov' → 'obsbias_coeff_errors'
+'tlapse' → 'obsbias_tlapse'
+```
+
+**Example Filenames**:
+```
+gsi.x0050.bc.aircraft_temperature.2023-10-09T15:00:00Z.acftbias
+gsi.x0050.bc.aircraft_temperature.2023-10-09T15:00:00Z.acftbias_cov
+gsi.x0050.bc.amsua_n19.2023-10-09T15:00:00Z.satbias
+gsi.x0050.bc.amsua_n19.2023-10-09T15:00:00Z.satbias_cov
+gsi.x0050.bc.amsua_n19.2023-10-09T15:00:00Z.tlapse
+```
+
+---
+
+### Backgrounds/Forecasts (`.nc4`, `.nc`, `.res`)
+
+**Extensions**: `.nc4`, `.nc`, `.res`
+
+**R2D2 Parameters**:
+- `item`: `'forecast'`
+- `model`: Auto-detected (default: `'geos'`)
+- `experiment`: Hardcoded or extracted
+- `resolution`: Hardcoded or extracted
+- `file_type`: `'bkg'` or `'fc'`
+- `file_extension`: Actual extension
+- `date`: Extracted from filename
+- `step`: Forecast length
+
+**Example Filenames**:
+```
+geos.C180.x0050.bkg.20231009_12z.nc4
+mom6.72x36.s2s.MOM.res.20231009.nc
+```
+
+---
+
+## Filename Requirements
+
+### General Format
+
+All filenames must be **dot-separated** with at least **4 parts**:
+
+```
+part1.part2.part3.part4.extension
+```
+
+### Bias Correction Format (Required)
+
+```
+{provider}.{experiment}.bc.{observation_type}.{timestamp}.{extension}
+
+Example:
+gsi.x0050.bc.aircraft_temperature.2023-10-09T15:00:00Z.acftbias
+│   │     │  │                    │                     │
+│   │     │  │                    │                     └─ Extension
+│   │     │  │                    └─ ISO 8601 timestamp
+│   │     │  └─ Observation type
+│   │     └─ "bc" indicator (bias correction)
+│   └─ Experiment ID
+└─ Provider
+```
+
+**Requirements**:
+- Minimum **6 parts**
(provider, experiment, bc, obs_type, timestamp, extension) +- Timestamp must be **ISO 8601** format: `YYYY-MM-DDTHH:MM:SSZ` +- Extension must match one of: `acftbias`, `acftbias_cov`, `satbias`, `satbias_cov`, `tlapse` + +### Observation Format + +``` +{provider}.{observation_type}.{timestamp}.{extension} + +Example: +gdas.aircraft_temperature.2023-10-09T15:00:00Z.nc4 +``` + +### Background/Forecast Format + +``` +{model}.{resolution}.{experiment}.{file_type}.{date}.{extension} + +Example: +geos.C180.x0050.bkg.20231009_12z.nc4 +``` + +--- + +## Examples + +### Example 1: Ingest All Bias Corrections for an Experiment + +**Scenario**: You have a directory with all bias correction files for experiment `x0050` + +**Directory Structure**: +``` +/discover/.../gsi/bc/x0050/2023-10-09/ +├── gsi.x0050.bc.aircraft_temperature.2023-10-09T15:00:00Z.acftbias +├── gsi.x0050.bc.aircraft_temperature.2023-10-09T15:00:00Z.acftbias_cov +├── gsi.x0050.bc.amsua_n19.2023-10-09T15:00:00Z.satbias +├── gsi.x0050.bc.amsua_n19.2023-10-09T15:00:00Z.satbias_cov +└── gsi.x0050.bc.amsua_n19.2023-10-09T15:00:00Z.tlapse +``` + +**Step 1: Dry Run** +```bash +python ingest_files.py \ + /discover/nobackup/projects/gmao/advda/R2D2DataStore/Shared/gsi/bc/x0050/2023-10-09/ \ + bias_correction +``` + +**Expected Output**: +``` +DRY RUN bias_correction files from: /discover/.../2023-10-09/ +Found 5 files + +gsi.x0050.bc.aircraft_temperature.2023-10-09T15:00:00Z.acftbias + BIAS CORRECTION: + provider=gsi, experiment=x0050 + model=geos, obs_type=aircraft_temperature + file_extension=acftbias, file_type=obsbias_coefficients + date=2023-10-09T15:00:00Z + DRY RUN + +[... similar for other 4 files ...] + +Successfully processed 5/5 files + +This was a DRY RUN. Use --ingest to actually ingest files +``` + +**Step 2: Actual Ingestion** +```bash +python ingest_files.py \ + /discover/nobackup/projects/gmao/advda/R2D2DataStore/Shared/gsi/bc/x0050/2023-10-09/ \ + bias_correction \ + --ingest +``` + +**Expected Output**: +``` +INGESTING bias_correction files from: /discover/.../2023-10-09/ +Found 5 files + +gsi.x0050.bc.aircraft_temperature.2023-10-09T15:00:00Z.acftbias + BIAS CORRECTION: + provider=gsi, experiment=x0050 + model=geos, obs_type=aircraft_temperature + file_extension=acftbias, file_type=obsbias_coefficients + date=2023-10-09T15:00:00Z + SUCCESS + +[... similar for other 4 files ...] + +Successfully processed 5/5 files +``` + +--- + +### Example 2: Ingest Single Bias Correction File + +```bash +python ingest_files.py \ + /path/to/gsi.x0050.bc.aircraft_temperature.2023-10-09T15:00:00Z.acftbias \ + bias_correction \ + --ingest +``` + +--- + +### Example 3: Ingest Observations for a Month + +**Directory Structure**: +``` +/obs/aircraft/2023-10/ +├── gdas.aircraft_temperature.2023-10-01T00:00:00Z.nc4 +├── gdas.aircraft_temperature.2023-10-01T06:00:00Z.nc4 +├── ... +└── gdas.aircraft_temperature.2023-10-31T18:00:00Z.nc4 +``` + +**Command**: +```bash +python ingest_files.py \ + /obs/aircraft/2023-10/ \ + observation \ + --ingest +``` + +--- + +### Example 4: Batch Ingest Multiple Dates + +```bash +# Script to ingest bias corrections for multiple dates + +DATES=("2023-10-09" "2023-10-10" "2023-10-11") +BASE_PATH="/discover/nobackup/projects/gmao/advda/R2D2DataStore/Shared/gsi/bc/x0050" + +for date in "${DATES[@]}"; do + echo "Processing $date..." 
+ python ingest_files.py \ + "${BASE_PATH}/${date}/" \ + bias_correction \ + --ingest +done +``` + +--- + +## Output Interpretation + +### Success Output + +``` +gsi.x0050.bc.aircraft_temperature.2023-10-09T15:00:00Z.acftbias + BIAS CORRECTION: + provider=gsi, experiment=x0050 + model=geos, obs_type=aircraft_temperature + file_extension=acftbias, file_type=obsbias_coefficients + date=2023-10-09T15:00:00Z + SUCCESS +``` + +**Meaning**: File successfully ingested to R2D2 + +--- + +### Error Output + +``` +gsi.x0050.bc.aircraft_temperature.2023-10-09T15:00:00Z.acftbias + BIAS CORRECTION: + provider=gsi, experiment=x0050 + model=geos, obs_type=aircraft_temperature + file_extension=acftbias, file_type=obsbias_coefficients + date=2023-10-09T15:00:00Z + ERROR: R2D2Client.store_bias_correction() got an unexpected keyword argument 'window_start' +``` + +**Meaning**: Ingestion failed - check error message for details + +--- + +### Skipped Output + +``` +**** gsi.x0050.bc.aircraft_temperature.2023-10-09T15:00:00Z.acftbias - already ingested +``` + +**Meaning**: File was previously ingested (tracked in `ingested_files.txt`) + +--- + +### Summary Output + +``` +Successfully processed 143/144 files +Skipped 1 files (already ingested or invalid format) +``` + +**Meaning**: +- 143 files ingested successfully +- 1 file skipped (already ingested or invalid) + +--- + +### Failed Files Output + +``` +Failed to ingest 1 file(s): + gsi.x0050.bc.bad_filename.acftbias: Ingestion failed +``` + +**Meaning**: Lists all files that failed with reasons + +--- + +## Troubleshooting + +### Issue 1: "Failed to import r2d2" + +**Error**: +``` +ImportError: Failed to import r2d2 +Load module: module load r2d2-client/sles15_0604 +``` + +**Solution**: +```bash +module load r2d2-client/sles15_0604 +``` + +--- + +### Issue 2: "File doesn't have a valid extension" + +**Error**: +``` +File /path/to/file.txt doesn't have a valid extension for bias_correction +Valid extensions: .acftbias, .satbias, .tlapse, .acftbias_cov, .satbias_cov +``` + +**Solution**: Ensure file has correct extension for the item type + +--- + +### Issue 3: "Not enough parts" + +**Error**: +``` +Skip filename.nc4 - not enough parts +``` + +**Solution**: Filename must have at least 4 dot-separated parts + +**Bad**: `file.nc4` (2 parts) +**Good**: `provider.obs_type.timestamp.nc4` (4 parts) + +--- + +### Issue 4: "400 Client Error: BAD REQUEST" + +**Error**: +``` +ERROR: 400 Client Error: BAD REQUEST for url: https://r2d2-api.jcsda.org:443/... +Content info: "Record does not exist in MySQL Database..." +``` + +**Possible Causes**: +1. **Invalid `file_type`**: Using extension instead of R2D2 enum + - Bad: `file_type='acftbias'` + - Good: `file_type='obsbias_coefficients'` + +2. **Experiment not registered**: Register experiment first + ```python + r2d2.register( + item='experiment', + name='x0050', + compute_host='discover-gmao-intel', + user='your_username', + lifetime='science' + ) + ``` + +3. 
**Model not registered**: Contact R2D2 admins + +**Solution**: Check the script's `file_ext_to_type` mapping is correct + +--- + +### Issue 5: "Permission denied" + +**Error**: +``` +ERROR: Permission denied for data_store 'r2d2-experiments-nccs-gmao' +``` + +**Solution**: Check your R2D2 credentials and permissions with R2D2 admin + +--- + +### Issue 6: Can't Find Failed File (143/144 success) + +**Solution**: Look for files without "SUCCESS" in output: + +```bash +# If you saved output to file +grep -B 10 "ERROR" output.log + +# Or look for files without SUCCESS +grep "BIAS CORRECTION:" output.log -A 1 | grep -v "SUCCESS" +``` + +The failed file will be the one with an ERROR message or missing SUCCESS. + +--- + +## More Usage + +### Tracking Ingested Files + +The script maintains an `ingested_files.txt` file in the current directory: + +```bash +# View ingested files +cat ingested_files.txt + +# Clear ingestion history (re-ingest everything) +rm ingested_files.txt +``` + +--- + +### Custom Provider Detection + +Edit the `guess_provider_from_path()` function: + +```python +def guess_provider_from_path(file_path): + path_lower = file_path.lower() + if 'ncdiag' in path_lower: + return 'ncdiag' + elif 'my_custom_provider' in path_lower: + return 'my_provider' + # Add more... + else: + return 'unknown' +``` + +--- + +### Custom Model Detection + +Edit the `register_bias_correction()` function: + +```python +# Around line 190-200 +path_lower = file_path.lower() +if 'gfs' in path_lower: + model = 'gfs' +elif 'fv3' in path_lower: + model = 'fv3' +else: + model = 'geos' # default +``` + +--- + +## Best Practices + +### 1. Always Dry Run First +```bash +# Test first +python ingest_files.py /path/to/files/ bias_correction + +# Then ingest +python ingest_files.py /path/to/files/ bias_correction --ingest +``` + +### 2. Process by Date/Experiment +```bash +# Good: Process one date at a time +python ingest_files.py /path/to/bc/x0050/2023-10-09/ bias_correction --ingest + +# Avoid: Processing entire experiment at once (harder to debug) +python ingest_files.py /path/to/bc/x0050/ bias_correction --ingest +``` + +### 3. Verify in R2D2 +```python +# After ingestion, verify with R2D2 search +import r2d2 + +results = r2d2.search( + item='bias_correction', + model='geos', + experiment='x0050', + observation_type='aircraft_temperature', + date='2023-10-09T15:00:00Z' +) + +print(f"Found {len(results)} files") +``` + +--- + +## File Organization + +### Directory Structure + +``` +R2D2DataStore/ +├── Shared/ +│ ├── gsi/ +│ │ └── bc/ +│ │ ├── x0050/ +│ │ │ ├── 2023-10-09/ +│ │ │ │ ├── *.acftbias +│ │ │ │ ├── *.acftbias_cov +│ │ │ │ ├── *.satbias +│ │ │ │ ├── *.satbias_cov +│ │ │ │ └── *.tlapse +│ │ │ └── 2023-10-10/ +│ │ └── x0051/ +│ └── obs/ +│ ├── aircraft/ +│ │ └── 2023-10/ +│ │ └── *.nc4 +│ └── satellite/ +└── Experiments/ + └── [auto-managed by R2D2] +``` + +--- + +## Some Limitations + +1. **Hardcoded values**: Some parameters (model, resolution) are hardcoded +2. **Limited validation**: Minimal validation of file contents +3. **Filename dependency**: Relies heavily on filename format + +--- + +## Future Enhancements + +Planned improvements: +- [ ] Create a Swell suite for file ingestion. 
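+
+---
+
+## Verifying a Stored File (Optional)
+
+A quick end-to-end check after ingestion is to fetch one file back using the
+same metadata the script stored. This is a minimal sketch that mirrors the
+`r2d2.store()` call made by `ingest_observation()`; the provider, observation
+type, timestamp, and output path are placeholder values, so substitute the
+ones printed by your own ingestion run.
+
+```python
+import r2d2
+
+# Fetch a single ingested observation back out of R2D2 v3. The criteria
+# below mirror what ingest_observation() passed to r2d2.store().
+r2d2.fetch(
+    item='observation',
+    provider='ncdiag',                        # placeholder provider
+    observation_type='aircraft_temperature',  # placeholder observation type
+    file_extension='nc4',
+    window_start='2023-10-09T15:00:00Z',      # placeholder timestamp
+    window_length='PT6H',
+    target_file='/tmp/fetch_check.nc4',       # placeholder destination
+)
+```
+
+If the fetch succeeds, both the metadata record and the file itself are
+retrievable from the data store.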
diff --git a/src/swell/utilities/scripts/ingest_files.py b/src/swell/utilities/scripts/ingest_files.py
new file mode 100755
index 000000000..8c70023cb
--- /dev/null
+++ b/src/swell/utilities/scripts/ingest_files.py
@@ -0,0 +1,362 @@
+#!/usr/bin/env python3
+"""
+Simple generic script to ingest data files to r2d2 v3.
+Works for observations, bias corrections, and backgrounds from providers
+such as ncdiag, odas, and gdas_marine.
+"""
+
+import os
+import glob
+
+# Color codes for printing
+RED = "\033[91m"
+GREEN = "\033[92m"
+YELLOW = "\033[93m"
+BLUE = "\033[94m"
+RESET = "\033[0m"
+
+try:
+    import r2d2
+
+except ImportError as e:
+    raise ImportError(
+        f"Failed to import r2d2: {e}\nLoad module: module load r2d2-client/sles15_0604"
+    )
+
+INGESTED_FILE = "ingested_files.txt"
+
+
+def load_ingested():
+    if os.path.exists(INGESTED_FILE):
+        with open(INGESTED_FILE, 'r') as f:
+            return set(line.strip() for line in f)
+    return set()
+
+
+def save_ingested(filename):
+    with open(INGESTED_FILE, 'a') as f:
+        f.write(filename + '\n')
+
+
+def guess_provider_from_path(file_path):
+    """Find provider from file path"""
+    path_lower = file_path.lower()
+    if 'ncdiag' in path_lower:
+        return 'ncdiag'
+    elif 'odas' in path_lower:
+        return 'odas'
+    elif 'gdas' in path_lower:
+        return 'gdas_marine'
+    else:
+        return 'unknown'
+
+
+def ingest_observation(filename, file_path, parts, dry_run=True):
+    """Ingest observation files using observation-specific parameters"""
+
+    file_ext = parts[-1]
+    provider = guess_provider_from_path(file_path)
+
+    if len(parts) >= 6:
+        obs_type = parts[-3]  # third from end (before timestamp and extension)
+        timestamp = parts[-2]  # second from end (before extension)
+        window_length = 'PT6H'  # default for all
+    else:
+        print(f"{file_path} cannot be ingested - not enough parts")
+        return False
+
+    print(f"\n{BLUE}{filename}{RESET}")
+    print(f"  {YELLOW}OBSERVATION:{RESET} provider={provider}, "
+          f"obs_type={obs_type}, time={timestamp}")
+
+    if dry_run:
+        print(f"  {YELLOW}DRY RUN{RESET}")
+        print(f"timestamp is {timestamp}")
+        return True
+
+    try:
+        r2d2.store(
+            item='observation',
+            provider=provider,
+            observation_type=obs_type,
+            file_extension=file_ext,
+            window_start=timestamp,
+            window_length=window_length,
+            source_file=file_path
+        )
+        # no need to specify data_store='r2d2-experiments-nccs-gmao'
+        # this will be set by credentials
+
+        print(f"  {GREEN}SUCCESS{RESET}")
+        save_ingested(filename)
+        return True
+    except Exception as e:
+        print(f"  {RED}ERROR:{RESET} {e}")
+        return False
+
+
+def ingest_background(filename, file_path, parts, dry_run=True):
+    """Ingest background/forecast files using forecast-specific parameters"""
+
+    # Guess model from filename/path
+    name_lower = filename.lower()
+    if 'mom6' in name_lower or 'ocean' in name_lower:
+        model = 'mom6_cice6_UFS'
+    elif 'cice' in name_lower or 'ice' in name_lower:
+        model = 'mom6_cice6_UFS'
+    else:
+        model = 'geos'  # default
+
+    # Extract timestamp - try different patterns
+    timestamp = None
+    for part in parts:
+        # Look for YYYYMMDD pattern
+        if len(part) == 8 and part.isdigit():
+            year, month, day = part[:4], part[4:6], part[6:8]
+            timestamp = f"{year}-{month}-{day}T12:00:00Z"
+            break
+        # Look for YYYYMMDDHH pattern
+        elif len(part) == 10 and part.isdigit():
+            year, month, day, hour = part[:4], part[4:6], part[6:8], part[8:10]
+            timestamp = f"{year}-{month}-{day}T{hour}:00:00Z"
+            break
+
+    if not timestamp:
+        timestamp = "2023-10-09T12:00:00Z"  # fallback
+
+    print(f"\n{BLUE}{filename}{RESET}")
+    print(f"  {YELLOW}BACKGROUND:{RESET} model={model}, time={timestamp}")
+
+    if dry_run:
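+        # Dry run: metadata has been parsed and printed above; r2d2.store is
+        # deliberately not called.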
+        print(f"  {YELLOW}DRY RUN{RESET}")
+        return True
+
+    try:
+        r2d2.store(
+            item='forecast',
+            model='mom6',  # hardcoded for testing; the guessed model above is unused
+            experiment='s2s',  # hardcoded test experiment
+            file_extension='res',  # hardcoded; file_ext from the filename is unused
+            resolution='72x36',  # hardcoded test resolution
+            step='P1DT12H',
+            date=timestamp,
+            file_type='MOM.res',
+            source_file=file_path
+        )
+        print(f"  {GREEN}SUCCESS{RESET}")
+        save_ingested(filename)
+        return True
+    except Exception as e:
+        print(f"  {RED}ERROR:{RESET} {e}")
+        return False
+
+
+def ingest_bias_correction(filename, file_path, parts, dry_run=True):
+    """Ingest bias correction files."""
+
+    # Parse filename: gsi.x0050.bc.aircraft_temperature.2023-10-09T15:00:00Z.acftbias
+    # Parts would be: ['gsi', 'x0050', 'bc', 'aircraft_temperature',
+    #                  '2023-10-09T15:00:00Z', 'acftbias']
+
+    if len(parts) < 6:
+        print(f"{file_path} cannot be ingested - not enough parts")
+        return False
+
+    provider = parts[0]  # 'gsi'
+    experiment = parts[1]  # 'x0050'
+    obs_type = parts[3]  # 'aircraft_temperature'
+    timestamp = parts[4]  # '2023-10-09T15:00:00Z'
+    file_ext = parts[-1]  # 'acftbias'
+
+    # Determine file_type from file_extension
+    # Map file_extension -> file_type (R2D2 enum)
+    # Using the file extension as file_type since the R2D2 instance accepts these
+    # Following the official JCSDA enums:
+    #   satbias, tlapse, obsbias_tlapse,
+    #   obsbias_coeff_errors, obsbias_coefficients
+
+    file_ext_to_type = {
+        # Aircraft bias corrections
+        'acftbias': 'obsbias_coefficients',  # Aircraft bias coefficients
+        'acftbias_cov': 'obsbias_coeff_errors',  # Aircraft bias coefficient errors
+
+        # Satellite bias corrections
+        'satbias': 'satbias',
+        'satbias_cov': 'obsbias_coeff_errors',  # Coefficient errors (same as aircraft)
+
+        # Timelapse
+        'tlapse': 'obsbias_tlapse',
+    }
+
+    # Fall back to the raw extension if it is not in the map
+    file_type = file_ext_to_type.get(file_ext, file_ext)
+
+    # TODO: Add model determination
+    # Determine model from observation type or path:
+    # aircraft and most conventional obs -> geos;
+    # satellite radiances could be geos or gfs, default to geos
+    # path_lower = file_path.lower()
+    # if 'gfs' in path_lower:
+    #     model = 'gfs'
+    # elif 'fv3' in path_lower:
+    #     model = 'fv3'
+    # else:
+    model = 'geos'  # default
+
+    print(f"\n{BLUE}{filename}{RESET}")
+    print(f"  {YELLOW}BIAS CORRECTION:{RESET}")
+    print(f"    provider={provider}, experiment={experiment}")
+    print(f"    model={model}, obs_type={obs_type}")
+    print(f"    file_extension={file_ext}, file_type={file_type}")
+    print(f"    date={timestamp}")
+
+    if dry_run:
+        print(f"  {YELLOW}DRY RUN{RESET}")
+        return True
+
+    try:
+        r2d2.store(
+            item='bias_correction',
+            source_file=file_path,
+            model=model,
+            experiment=experiment,  # CRITICAL: experiment-specific
+            provider=provider,  # From filename
+            observation_type=obs_type,
+            file_extension=file_ext,
+            date=timestamp,
+            file_type=file_type,  # Map extension to R2D2 enum
+        )
+
+        # window_length='PT6H' is not required for bias correction
+
+        print(f"  {GREEN}SUCCESS{RESET}")
+        save_ingested(filename)
+        return True
+    except Exception as e:
+        print(f"  {RED}ERROR:{RESET} {e}")
+        return False
+
+
+def ingest_files(file_path, item_type, dry_run=True):
+    """Ingest files found recursively under file_path."""
+
+    # Load already ingested files
+    ingested = load_ingested()
+
+    # Define valid extensions based on item type
+    valid_extensions = {
+        'observation': ['.nc4', '.nc'],
+        'bias_correction': ['.acftbias', '.satbias', '.tlapse', '.acftbias_cov', '.satbias_cov'],
+        'background': ['.nc4', '.nc', '.res'],
+        'forecast': ['.nc4', '.nc', '.res']
+    }
+
+    extensions = valid_extensions.get(item_type, ['.nc4', '.nc'])
+
+    # If it's a file, just process that file
+    if os.path.isfile(file_path) or os.path.islink(file_path):
+        # Check if the file has a valid extension for this item type
+        if any(file_path.endswith(ext) for ext in extensions):
+            files = [file_path]
+        else:
+            print(f"{RED}File {file_path} doesn't have a valid extension for {item_type}{RESET}")
+            print(f"{YELLOW}Valid extensions: {', '.join(extensions)}{RESET}")
+            return
+    elif os.path.isdir(file_path):
+        # If it's a directory, find all matching files recursively
+        files = []
+        for ext in extensions:
+            files.extend(glob.glob(os.path.join(file_path, "**/*" + ext), recursive=True))
+    else:
+        print(f"{RED}Path not found: {file_path}{RESET}")
+        return
+
+    print(f"{YELLOW}Found {len(files)} files{RESET}")
+
+    success_count = 0
+    failed_files = []
+    skipped_files = []
+
+    for link_path in files:
+        # Use the link name for parsing metadata
+        filename = os.path.basename(link_path)
+
+        # Resolve to the actual file for the source_file parameter
+        if os.path.islink(link_path):
+            actual_file_path = os.path.realpath(link_path)
+            print(f"{BLUE}Link: {filename}{RESET}")
+            print(f"  -> {actual_file_path}")
+        else:
+            actual_file_path = link_path
+
+        # Check if already ingested
+        if filename in ingested:
+            print(f"{YELLOW}**** {filename} - already ingested{RESET}")
+            skipped_files.append(filename)
+            continue
+
+        # Split the filename by "." - parse the LINK name, not the original
+        parts = filename.split(".")
+
+        if len(parts) < 4:
+            print(f"{YELLOW}Skip {filename} - not enough parts{RESET}")
+            skipped_files.append(filename)
+            continue
+
+        # Call the appropriate ingestion function based on item type.
+        # Pass both the link filename (for parsing) and the actual file path (for r2d2.store)
+        if item_type == "observation":
+            success = ingest_observation(filename, actual_file_path, parts, dry_run)
+        elif item_type in ["background", "forecast"]:
+            success = ingest_background(filename, actual_file_path, parts, dry_run)
+        elif item_type in ["bias_coefficient", "bias_correction"]:
+            success = ingest_bias_correction(filename, actual_file_path, parts, dry_run)
+        else:
+            print(f"{RED}Unknown item type: {item_type}{RESET}")
+            failed_files.append((filename, "Unknown item type"))
+            continue
+
+        if success:
+            success_count += 1
+        else:
+            failed_files.append((filename, "Ingestion failed"))
+
+    # Print summary
+    print(f"\n{GREEN}Successfully processed {success_count}/{len(files)} files{RESET}")
+
+    if skipped_files:
+        print(f"{YELLOW}Skipped {len(skipped_files)} "
+              f"files (already ingested or invalid format){RESET}")
+
+    if failed_files:
+        print(f"\n{RED}Failed to ingest {len(failed_files)} file(s):{RESET}")
+        for filename, reason in failed_files:
+            print(f"  {RED}-{RESET} {filename}: {reason}")
+
+
+def main():
+    import argparse
+
+    parser = argparse.ArgumentParser(description="Ingest files to r2d2 v3")
+    parser.add_argument('path', help="File or directory path to ingest")
+    parser.add_argument(
+        'item_type',
+        help="Item type: observation, background, forecast, bias_correction")
+    parser.add_argument(
+        '--ingest',
+        action='store_true',
+        help="Actually ingest (default is dry run)")
+
+    args = parser.parse_args()
+
+    dry_run = not args.ingest
+
+    print(f"{YELLOW}{'DRY RUN' if dry_run else 'INGESTING'} {args.item_type} files from: "
+          f"{args.path}{RESET}")
+    ingest_files(args.path, args.item_type, dry_run=dry_run)
+
+    if dry_run:
+        print(f"\n{YELLOW}This was a DRY RUN. Use --ingest to actually ingest files{RESET}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/swell/utilities/scripts/load_r2d2.sh b/src/swell/utilities/scripts/load_r2d2.sh
new file mode 100644
index 000000000..ee567ab96
--- /dev/null
+++ b/src/swell/utilities/scripts/load_r2d2.sh
@@ -0,0 +1,6 @@
+# Load spack-stack modules
+mod_swell
+# Load the r2d2-client installation
+module use -a /discover/nobackup/projects/gmao/advda/JediOpt/modulefiles/core
+module load r2d2-client/112025
+
diff --git a/src/swell/utilities/scripts/prod_setup_env.sh b/src/swell/utilities/scripts/prod_setup_env.sh
new file mode 100644
index 000000000..48531d2b8
--- /dev/null
+++ b/src/swell/utilities/scripts/prod_setup_env.sh
@@ -0,0 +1,17 @@
+unset R2D2_SERVER_HOST
+unset R2D2_SERVER_PORT
+
+# Replace with your R2D2 username and API key
+export R2D2_USER=username
+export R2D2_API_KEY=api_key
+export R2D2_HOST=discover-gmao
+export R2D2_COMPILER=intel
+
+source venv_client/bin/activate
+
+echo " R2D2 Production environment:"
+echo " R2D2_API_KEY: [set]"
+echo " R2D2_SERVER_HOST: $R2D2_SERVER_HOST (should be empty)"
+echo " R2D2_SERVER_PORT: $R2D2_SERVER_PORT (should be empty)"
+echo " - Client should default to https://r2d2-api.jcsda.org"
+echo " R2D2_HOST: $R2D2_HOST"
+echo " R2D2_COMPILER: $R2D2_COMPILER"
\ No newline at end of file
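
A typical end-to-end session with these scripts might look like the following sketch; the data path and experiment directory are illustrative, and `prod_setup_env.sh` must first be edited with real credentials:

```bash
# Either load the r2d2-client module...
source src/swell/utilities/scripts/load_r2d2.sh
# ...or use the production API environment instead (edit credentials first):
# source src/swell/utilities/scripts/prod_setup_env.sh

# Dry run first (the default), then ingest for real with --ingest
python src/swell/utilities/scripts/ingest_files.py /path/to/bc/x0050/2023-10-09/ bias_correction
python src/swell/utilities/scripts/ingest_files.py /path/to/bc/x0050/2023-10-09/ bias_correction --ingest
```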