From 94143aaa8ae9ae4559b3d6ca83f231a612a2ef21 Mon Sep 17 00:00:00 2001 From: "Walter.Kolczynski" Date: Mon, 6 Jan 2025 15:37:17 -0600 Subject: [PATCH] Reinstate product groups Testing with full-sized GEFS found that the sheer number of tasks overloads rocoto, resulting in `rocotorun` taking over 10 min to complete or hanging entirely. To reduce the number of tasks, product groups are reimplemented so that multiple forecast hours are processed in a single task. However, the implementation is a little different than previously. The jobs where groups are enabled (atmos_products, oceanice_products, and wavepostsbs) have a new variable, `MAX_TASKS`, that controls how many groups to use. This setting is currently *per member*. The forecast hours to be processed are then divided into this many groups as evenly as possible without crossing forecast segment boundaries. The walltime for those jobs is then multiplied by the number of forecast hours in the largest group. A number of helper methods are added to Tasks to determine these groups and make a standard metatask variable dict in a centralized location. There is also a function to multiply the walltime, but this may be better off relocated to wxflow with the other time functions. As part of switching from a single value to a list, hours are no longer passed by rocoto as zero-padded values. The lists are comma-delimited (without spaces) and split apart in the job stub (`jobs/rocoto/*`), so each j-job call is still a single forecast hour. The offline post (upp) job is not broken into groups, since it really isn't used outside the analysis anymore. Gempak jobs that run over multiple forecast hours also aren't broken into groups yet. 
Resolves #2999 --- jobs/rocoto/atmos_products.sh | 20 ++- jobs/rocoto/oceanice_products.sh | 18 +- jobs/rocoto/wavepostsbs.sh | 19 ++- parm/config/gefs/config.atmos_products | 4 +- parm/config/gefs/config.oceanice_products | 4 +- parm/config/gefs/config.resources | 5 +- parm/config/gefs/config.wavepostsbs | 3 + parm/config/gfs/config.atmos_products | 4 +- parm/config/gfs/config.oceanice_products | 3 + parm/config/gfs/config.resources | 7 +- parm/config/gfs/config.wavepostsbs | 3 + workflow/rocoto/gefs_tasks.py | 72 ++++---- workflow/rocoto/gfs_tasks.py | 56 +++--- workflow/rocoto/tasks.py | 199 +++++++++++++++++++++- 14 files changed, 331 insertions(+), 86 deletions(-) diff --git a/jobs/rocoto/atmos_products.sh b/jobs/rocoto/atmos_products.sh index f6adbcf861..5d54597519 100755 --- a/jobs/rocoto/atmos_products.sh +++ b/jobs/rocoto/atmos_products.sh @@ -15,13 +15,17 @@ if (( status != 0 )); then exit "${status}"; fi export job="atmos_products" export jobid="${job}.$$" -# Negatation needs to be before the base -fhr3_base="10#${FHR3}" -export FORECAST_HOUR=$(( ${fhr3_base/10#-/-10#} )) +# shellcheck disable=SC2153 +IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" -############################################################### -# Execute the JJOB -############################################################### -"${HOMEgfs}/jobs/JGLOBAL_ATMOS_PRODUCTS" +export FORECAST_HOUR +for FORECAST_HOUR in "${fhr_list[@]}"; do + ############################################################### + # Execute the JJOB + ############################################################### + "${HOMEgfs}/jobs/JGLOBAL_ATMOS_PRODUCTS" + status=$? + [[ ${status} -ne 0 ]] && exit "${status}" +done -exit $? 
+exit 0 diff --git a/jobs/rocoto/oceanice_products.sh b/jobs/rocoto/oceanice_products.sh index 2a3b617d05..d72afcb99f 100755 --- a/jobs/rocoto/oceanice_products.sh +++ b/jobs/rocoto/oceanice_products.sh @@ -15,11 +15,17 @@ if (( status != 0 )); then exit "${status}"; fi export job="oceanice_products" export jobid="${job}.$$" -export FORECAST_HOUR=$(( 10#${FHR3} )) +# shellcheck disable=SC2153 +IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" -############################################################### -# Execute the JJOB -############################################################### -"${HOMEgfs}/jobs/JGLOBAL_OCEANICE_PRODUCTS" +export FORECAST_HOUR +for FORECAST_HOUR in "${fhr_list[@]}"; do + ############################################################### + # Execute the JJOB + ############################################################### + "${HOMEgfs}/jobs/JGLOBAL_OCEANICE_PRODUCTS" + status=$? + [[ ${status} -ne 0 ]] && exit "${status}" +done -exit $? +exit 0 diff --git a/jobs/rocoto/wavepostsbs.sh b/jobs/rocoto/wavepostsbs.sh index f4789210d8..b2eec789ae 100755 --- a/jobs/rocoto/wavepostsbs.sh +++ b/jobs/rocoto/wavepostsbs.sh @@ -5,17 +5,24 @@ source "${HOMEgfs}/ush/preamble.sh" ############################################################### # Source FV3GFS workflow modules #. ${HOMEgfs}/ush/load_fv3gfs_modules.sh -. ${HOMEgfs}/ush/load_ufswm_modules.sh +source "${HOMEgfs}/ush/load_ufswm_modules.sh" status=$? -[[ ${status} -ne 0 ]] && exit ${status} +[[ ${status} -ne 0 ]] && exit "${status}" export job="wavepostsbs" export jobid="${job}.$$" ############################################################### -# Execute the JJOB -${HOMEgfs}/jobs/JGLOBAL_WAVE_POST_SBS -status=$? 
-[[ ${status} -ne 0 ]] && exit ${status} +# shellcheck disable=SC2153 +IFS=', ' read -r -a fhr_list <<< "${FHR_LIST}" + +export FHR3 +for FORECAST_HOUR in "${fhr_list[@]}"; do + FHR3=$(printf '%03d' "${FORECAST_HOUR}") + # Execute the JJOB + "${HOMEgfs}/jobs/JGLOBAL_WAVE_POST_SBS" + status=$? + [[ ${status} -ne 0 ]] && exit "${status}" +done exit 0 diff --git a/parm/config/gefs/config.atmos_products b/parm/config/gefs/config.atmos_products index e8aae324e1..d1f36a7bc9 100644 --- a/parm/config/gefs/config.atmos_products +++ b/parm/config/gefs/config.atmos_products @@ -8,8 +8,8 @@ echo "BEGIN: config.atmos_products" # Get task specific resources . "${EXPDIR}/config.resources" atmos_products -# No. of forecast hours to process in a single job -export NFHRS_PER_GROUP=3 +# Maximum number of rocoto tasks per member +export MAX_TASKS=25 # Scripts used by this job export INTERP_ATMOS_MASTERSH="${USHgfs}/interp_atmos_master.sh" diff --git a/parm/config/gefs/config.oceanice_products b/parm/config/gefs/config.oceanice_products index 3b8b064947..6bb604d0ca 100644 --- a/parm/config/gefs/config.oceanice_products +++ b/parm/config/gefs/config.oceanice_products @@ -9,7 +9,7 @@ source "${EXPDIR}/config.resources" oceanice_products export OCEANICEPRODUCTS_CONFIG="${PARMgfs}/post/oceanice_products_gefs.yaml" -# No. 
of forecast hours to process in a single job -export NFHRS_PER_GROUP=3 +# Maximum number of rocoto tasks per member +export MAX_TASKS=25 echo "END: config.oceanice_products" diff --git a/parm/config/gefs/config.resources b/parm/config/gefs/config.resources index 68f81c1039..b221e89fe9 100644 --- a/parm/config/gefs/config.resources +++ b/parm/config/gefs/config.resources @@ -234,6 +234,7 @@ case ${step} in ;; "atmos_products") + # Walltime is per forecast hour; will be multipled by group size export walltime="00:15:00" export ntasks=24 export threads_per_task=1 @@ -250,6 +251,7 @@ case ${step} in ;; "oceanice_products") + # Walltime is per forecast hour; will be multipled by group size export walltime="00:15:00" export ntasks=1 export tasks_per_node=1 @@ -258,7 +260,8 @@ case ${step} in ;; "wavepostsbs") - export walltime="03:00:00" + # Walltime is per forecast hour; will be multipled by group size + export walltime="00:15:00" export ntasks=1 export threads_per_task=1 export tasks_per_node=$(( max_tasks_per_node / threads_per_task )) diff --git a/parm/config/gefs/config.wavepostsbs b/parm/config/gefs/config.wavepostsbs index 82cec321da..b43ea33d40 100644 --- a/parm/config/gefs/config.wavepostsbs +++ b/parm/config/gefs/config.wavepostsbs @@ -8,6 +8,9 @@ echo "BEGIN: config.wavepostsbs" # Get task specific resources source "${EXPDIR}/config.resources" wavepostsbs +# Maximum number of rocoto tasks per member +export MAX_TASKS=25 + # Subgrid info for grib2 encoding export WAV_SUBGRBSRC="" export WAV_SUBGRB="" diff --git a/parm/config/gfs/config.atmos_products b/parm/config/gfs/config.atmos_products index 451f5eff86..5b6e4067b5 100644 --- a/parm/config/gfs/config.atmos_products +++ b/parm/config/gfs/config.atmos_products @@ -8,8 +8,8 @@ echo "BEGIN: config.atmos_products" # Get task specific resources . "${EXPDIR}/config.resources" atmos_products -# No. 
of forecast hours to process in a single job -export NFHRS_PER_GROUP=3 +## Maximum number of rocoto tasks per member +export MAX_TASKS=25 # Scripts used by this job export INTERP_ATMOS_MASTERSH="${USHgfs}/interp_atmos_master.sh" diff --git a/parm/config/gfs/config.oceanice_products b/parm/config/gfs/config.oceanice_products index 9e5c5b1c68..a618cbe10c 100644 --- a/parm/config/gfs/config.oceanice_products +++ b/parm/config/gfs/config.oceanice_products @@ -7,6 +7,9 @@ echo "BEGIN: config.oceanice_products" # Get task specific resources source "${EXPDIR}/config.resources" oceanice_products +# Maximum number of rocoto tasks per member +export MAX_TASKS=25 + export OCEANICEPRODUCTS_CONFIG="${PARMgfs}/post/oceanice_products.yaml" # No. of forecast hours to process in a single job diff --git a/parm/config/gfs/config.resources b/parm/config/gfs/config.resources index eeb33716c0..c8eb7592be 100644 --- a/parm/config/gfs/config.resources +++ b/parm/config/gfs/config.resources @@ -191,8 +191,9 @@ case ${step} in ;; "wavepostsbs") - walltime_gdas="00:20:00" - walltime_gfs="03:00:00" + # Walltime is per forecast hour; will be multipled by group size + walltime_gdas="00:15:00" + walltime_gfs="00:15:00" ntasks=8 threads_per_task=1 tasks_per_node=$(( max_tasks_per_node / threads_per_task )) @@ -911,6 +912,7 @@ case ${step} in ;; "oceanice_products") + # Walltime is per forecast hour; will be multipled by group size walltime="00:15:00" ntasks=1 tasks_per_node=1 @@ -944,6 +946,7 @@ case ${step} in ;; "atmos_products") + # Walltime is per forecast hour; will be multipled by group size walltime="00:15:00" ntasks=24 threads_per_task=1 diff --git a/parm/config/gfs/config.wavepostsbs b/parm/config/gfs/config.wavepostsbs index 82cec321da..b43ea33d40 100644 --- a/parm/config/gfs/config.wavepostsbs +++ b/parm/config/gfs/config.wavepostsbs @@ -8,6 +8,9 @@ echo "BEGIN: config.wavepostsbs" # Get task specific resources source "${EXPDIR}/config.resources" wavepostsbs +# Maximum number of rocoto 
tasks per member +export MAX_TASKS=25 + # Subgrid info for grib2 encoding export WAV_SUBGRBSRC="" export WAV_SUBGRB="" diff --git a/workflow/rocoto/gefs_tasks.py b/workflow/rocoto/gefs_tasks.py index ca29bcdf1e..293236e5a6 100644 --- a/workflow/rocoto/gefs_tasks.py +++ b/workflow/rocoto/gefs_tasks.py @@ -190,39 +190,57 @@ def _atmosoceaniceprod(self, component: str): fhout_ice_gfs = self._configs['base']['FHOUT_ICE_GFS'] products_dict = {'atmos': {'config': 'atmos_products', 'history_path_tmpl': 'COM_ATMOS_MASTER_TMPL', - 'history_file_tmpl': f'{self.run}.t@Hz.master.grb2f#fhr#'}, + 'history_file_tmpl': f'{self.run}.t@Hz.master.grb2f#fhr3_last#'}, 'ocean': {'config': 'oceanice_products', 'history_path_tmpl': 'COM_OCEAN_HISTORY_TMPL', - 'history_file_tmpl': f'{self.run}.ocean.t@Hz.{fhout_ocn_gfs}hr_avg.f#fhr_next#.nc'}, + 'history_file_tmpl': f'{self.run}.ocean.t@Hz.{fhout_ocn_gfs}hr_avg.f#fhr3_next#.nc'}, 'ice': {'config': 'oceanice_products', 'history_path_tmpl': 'COM_ICE_HISTORY_TMPL', - 'history_file_tmpl': f'{self.run}.ice.t@Hz.{fhout_ice_gfs}hr_avg.f#fhr#.nc'}} + 'history_file_tmpl': f'{self.run}.ice.t@Hz.{fhout_ice_gfs}hr_avg.f#fhr3_last#.nc'}} component_dict = products_dict[component] config = component_dict['config'] history_path_tmpl = component_dict['history_path_tmpl'] history_file_tmpl = component_dict['history_file_tmpl'] + max_tasks = self._configs[config]['MAX_TASKS'] resources = self.get_resource(config) + fhrs = self._get_forecast_hours('gefs', self._configs[config], component) + + # when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs + is_replay = self._configs[config]['REPLAY_ICS'] + if is_replay and component in ['atmos'] and 0 in fhrs: + fhrs.remove(0) + + # ocean/ice components do not have fhr 0 as they are averaged output + if component in ['ocean', 'ice'] and 0 in fhrs: + fhrs.remove(0) + + fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) + + # Adjust walltime based on the largest group + 
largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group) + history_path = self._template_to_rocoto_cycstring(self._base[history_path_tmpl], {'MEMDIR': 'mem#member#'}) deps = [] data = f'{history_path}/{history_file_tmpl}' dep_dict = {'type': 'data', 'data': data, 'age': 120} deps.append(rocoto.add_dependency(dep_dict)) - dep_dict = {'type': 'metatask', 'name': 'gefs_fcst_mem#member#'} + dep_dict = {'type': 'task', 'name': 'gefs_fcst_mem#member#_#seg_dep#'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps, dep_condition='or') postenvars = self.envars.copy() postenvar_dict = {'ENSMEM': '#member#', 'MEMDIR': 'mem#member#', - 'FHR3': '#fhr#', + 'FHR_LIST': '#fhr_list#', 'COMPONENT': component} for key, value in postenvar_dict.items(): postenvars.append(rocoto.create_envar(name=key, value=str(value))) - task_name = f'gefs_{component}_prod_mem#member#_f#fhr#' + task_name = f'gefs_{component}_prod_mem#member#_#fhr_label#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, @@ -233,22 +251,6 @@ def _atmosoceaniceprod(self, component: str): 'log': f'{self.rotdir}/logs/@Y@m@d@H/{task_name}.log', 'maxtries': '&MAXTRIES;'} - fhrs = self._get_forecast_hours('gefs', self._configs[config], component) - - # when replaying, atmos component does not have fhr 0, therefore remove 0 from fhrs - is_replay = self._configs[config]['REPLAY_ICS'] - if is_replay and component in ['atmos'] and 0 in fhrs: - fhrs.remove(0) - - # ocean/ice components do not have fhr 0 as they are averaged output - if component in ['ocean', 'ice'] and 0 in fhrs: - fhrs.remove(0) - - fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} - if component in ['ocean']: - fhrs_next = fhrs[1:] + [fhrs[-1] + (fhrs[-1] - fhrs[-2])] - fhr_var_dict['fhr_next'] = ' '.join([f"{fhr:03d}" for fhr in fhrs_next]) - 
fhr_metatask_dict = {'task_name': f'gefs_{component}_prod_#member#', 'task_dict': task_dict, 'var_dict': fhr_var_dict} @@ -308,22 +310,35 @@ def atmos_ensstat(self): return task def wavepostsbs(self): + deps = [] - dep_dict = {'type': 'metatask', 'name': f'gefs_fcst_mem#member#'} + dep_dict = {'type': 'task', 'name': f'gefs_fcst_mem#member#_#seg_dep#'} deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) + fhrs = self._get_forecast_hours('gefs', self._configs['wavepostsbs'], 'wave') + is_replay = self._configs['wavepostsbs']['REPLAY_ICS'] + if is_replay: + fhrs = [fhr for fhr in fhrs if fhr not in [0, 1, 2]] + + max_tasks = self._configs['wavepostsbs']['MAX_TASKS'] + fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) + wave_post_envars = self.envars.copy() postenvar_dict = {'ENSMEM': '#member#', 'MEMDIR': 'mem#member#', - 'FHR3': '#fhr#', + 'FHR_LIST': '#fhr_list#', } for key, value in postenvar_dict.items(): wave_post_envars.append(rocoto.create_envar(name=key, value=str(value))) resources = self.get_resource('wavepostsbs') - task_name = f'gefs_wave_post_grid_mem#member#_f#fhr#' + # Adjust walltime based on the largest group + largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group) + + task_name = f'gefs_wave_post_grid_mem#member#_#fhr_label#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, @@ -335,13 +350,6 @@ def wavepostsbs(self): 'maxtries': '&MAXTRIES;' } - fhrs = self._get_forecast_hours('gefs', self._configs['wavepostsbs'], 'wave') - is_replay = self._configs['wavepostsbs']['REPLAY_ICS'] - if is_replay: - fhrs = [fhr for fhr in fhrs if fhr not in [0, 1, 2]] - - fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} - fhr_metatask_dict = {'task_name': f'gefs_wave_post_grid_#member#', 'task_dict': task_dict, 'var_dict': fhr_var_dict} 
diff --git a/workflow/rocoto/gfs_tasks.py b/workflow/rocoto/gfs_tasks.py index d2a3e43719..1b52186f65 100644 --- a/workflow/rocoto/gfs_tasks.py +++ b/workflow/rocoto/gfs_tasks.py @@ -1029,7 +1029,7 @@ def atmanlupp(self): def atmanlprod(self): postenvars = self.envars.copy() - postenvar_dict = {'FHR3': '-001'} + postenvar_dict = {'FHR_LIST': '-1'} for key, value in postenvar_dict.items(): postenvars.append(rocoto.create_envar(name=key, value=str(value))) @@ -1126,21 +1126,36 @@ def _atmosoceaniceprod(self, component: str): products_dict = {'atmos': {'config': 'atmos_products', 'history_path_tmpl': 'COM_ATMOS_MASTER_TMPL', - 'history_file_tmpl': f'{self.run}.t@Hz.master.grb2f#fhr#'}, + 'history_file_tmpl': f'{self.run}.t@Hz.master.grb2f#fhr3_last#'}, 'ocean': {'config': 'oceanice_products', 'history_path_tmpl': 'COM_OCEAN_HISTORY_TMPL', - 'history_file_tmpl': f'{self.run}.ocean.t@Hz.6hr_avg.f#fhr_next#.nc'}, + 'history_file_tmpl': f'{self.run}.ocean.t@Hz.6hr_avg.f#fhr3_next#.nc'}, 'ice': {'config': 'oceanice_products', 'history_path_tmpl': 'COM_ICE_HISTORY_TMPL', - 'history_file_tmpl': f'{self.run}.ice.t@Hz.6hr_avg.f#fhr#.nc'}} + 'history_file_tmpl': f'{self.run}.ice.t@Hz.6hr_avg.f#fhr3_last#.nc'}} component_dict = products_dict[component] config = component_dict['config'] history_path_tmpl = component_dict['history_path_tmpl'] history_file_tmpl = component_dict['history_file_tmpl'] + max_tasks = self._configs[config]['MAX_TASKS'] + resources = self.get_resource(component_dict['config']) + + fhrs = self._get_forecast_hours(self.run, self._configs[config], component) + + # ocean/ice components do not have fhr 0 as they are averaged output + if component in ['ocean', 'ice'] and 0 in fhrs: + fhrs.remove(0) + + fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) + + # Adjust walltime based on the largest group + largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = 
Tasks.multiply_HMS(resources['walltime'], largest_group) + postenvars = self.envars.copy() - postenvar_dict = {'FHR3': '#fhr#', 'COMPONENT': component} + postenvar_dict = {'FHR_LIST': '#fhr_list#', 'COMPONENT': component} for key, value in postenvar_dict.items(): postenvars.append(rocoto.create_envar(name=key, value=str(value))) @@ -1154,9 +1169,8 @@ def _atmosoceaniceprod(self, component: str): dependencies = rocoto.create_dependency(dep=deps, dep_condition='or') cycledef = 'gdas_half,gdas' if self.run in ['gdas'] else self.run - resources = self.get_resource(component_dict['config']) - task_name = f'{self.run}_{component}_prod_f#fhr#' + task_name = f'{self.run}_{component}_prod_#fhr_label#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, @@ -1168,17 +1182,6 @@ def _atmosoceaniceprod(self, component: str): 'maxtries': '&MAXTRIES;' } - fhrs = self._get_forecast_hours(self.run, self._configs[config], component) - - # ocean/ice components do not have fhr 0 as they are averaged output - if component in ['ocean', 'ice'] and 0 in fhrs: - fhrs.remove(0) - - fhr_var_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} - - if component in ['ocean']: - fhrs_next = fhrs[1:] + [fhrs[-1] + (fhrs[-1] - fhrs[-2])] - fhr_var_dict['fhr_next'] = ' '.join([f"{fhr:03d}" for fhr in fhrs_next]) metatask_dict = {'task_name': f'{self.run}_{component}_prod', 'task_dict': task_dict, 'var_dict': fhr_var_dict} @@ -1194,13 +1197,21 @@ def wavepostsbs(self): deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) + fhrs = self._get_forecast_hours('gfs', self._configs['wavepostsbs'], 'wave') + max_tasks = self._configs['wavepostsbs']['MAX_TASKS'] + fhr_var_dict = self.get_grouped_fhr_dict(fhrs=fhrs, ngroups=max_tasks) + wave_post_envars = self.envars.copy() - postenvar_dict = {'FHR3': '#fhr#'} + postenvar_dict = {'FHR_LIST': '#fhr_list#'} for key, value in postenvar_dict.items(): 
wave_post_envars.append(rocoto.create_envar(name=key, value=str(value))) resources = self.get_resource('wavepostsbs') - task_name = f'{self.run}_wavepostsbs_f#fhr#' + # Adjust walltime based on the largest group + largest_group = max([len(grp.split(',')) for grp in fhr_var_dict['fhr_list'].split(' ')]) + resources['walltime'] = Tasks.multiply_HMS(resources['walltime'], largest_group) + + task_name = f'{self.run}_wavepostsbs_#fhr_label#' task_dict = {'task_name': task_name, 'resources': resources, 'dependency': dependencies, @@ -1212,12 +1223,9 @@ def wavepostsbs(self): 'maxtries': '&MAXTRIES;' } - fhrs = self._get_forecast_hours('gfs', self._configs['wavepostsbs'], 'wave') - - fhr_metatask_dict = {'fhr': ' '.join([f"{fhr:03d}" for fhr in fhrs])} metatask_dict = {'task_name': f'{self.run}_wavepostsbs', 'task_dict': task_dict, - 'var_dict': fhr_metatask_dict} + 'var_dict': fhr_var_dict} task = rocoto.create_task(metatask_dict) diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py index d9c769ffbe..7a42c6594c 100644 --- a/workflow/rocoto/tasks.py +++ b/workflow/rocoto/tasks.py @@ -3,7 +3,7 @@ import numpy as np from applications.applications import AppConfig import rocoto.rocoto as rocoto -from wxflow import Template, TemplateConstants, to_timedelta +from wxflow import Template, TemplateConstants, to_timedelta, timedelta_to_HMS from typing import List __all__ = ['Tasks'] @@ -176,6 +176,203 @@ def _get_forecast_hours(run, config, component='atmos') -> List[str]: return fhrs + @staticmethod + def get_job_groups(fhrs: List[int], ngroups: int, breakpoints: List[int] = None) -> List[dict]: + ''' + Split forecast hours into a number of groups, obeying a list of pre-set breakpoints. + + Takes a list of forecast hours and splits it into a number of groups while obeying + a list of pre-set breakpoints and recording which segment each belongs to. 
+ + Parameters + ---------- + fhrs: List[int] + List of forecast hours to break into groups + ngroups: int + Number of groups to split the forecast hours into + breakpoints: List[int] + List of preset forecast hour break points to use (default: []) + + Returns + ------- + List[dict]: List of dicts, where each dict contains two keys: + 'fhrs': the forecast hours for that group + 'seg': the forecast segment (from the original breakpoint list) + the group belong to + ''' + if breakpoints is None: + breakpoints = [] + + num_segs = len(breakpoints) + 1 + if num_segs > ngroups: + raise ValueError(f"Number of segments ({num_segs}) is greater than the number of groups ({ngroups}") + + if ngroups > len(fhrs): + ngroups = len(fhrs) + + # First, split at segment boundaries + fhrs_segs = [grp.tolist() for grp in np.array_split(fhrs, [fhrs.index(bpnt) + 1 for bpnt in breakpoints])] + seg_lens = [len(seg) for seg in fhrs_segs] + + # Initialize each segment to be split into one job group + ngroups_segs = [1 for _ in range(0, len(fhrs_segs))] + + # For remaining job groups, iteratively assign to the segment with the most + # hours per group + for _ in range(0, ngroups - len(fhrs_segs)): + current_lens = [size / weight for size, weight in zip(seg_lens, ngroups_segs)] + index_max = max(range(len(current_lens)), key=current_lens.__getitem__) + ngroups_segs[index_max] += 1 + + # Now that we know how many groups each forecast segment should be split into, + # Split them and flatten to a single list. 
+ groups = [] + for seg_num, (fhrs_seg, ngroups_seg) in enumerate(zip(fhrs_segs, ngroups_segs)): + [groups.append({'fhrs': grp.tolist(), 'seg': seg_num}) for grp in np.array_split(fhrs_seg, ngroups_seg)] + + return groups + + @staticmethod + def test_job_groups(): + test_array = list(range(0, 24)) + + # Test simple splitting with no breakpoints + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0}, + {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, + {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 0}, + {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 0}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4) == test_groups + + # Test with a break point that aligns with normal split point + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0}, + {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, + {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 1}, + {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 1}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[11]) == test_groups + + # Test with a break point not at a normal splilt point + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5, 6, 7], 'seg': 0}, + {'fhrs': [8, 9, 10, 11, 12, 13, 14], 'seg': 0}, + {'fhrs': [15, 16, 17, 18, 19], 'seg': 1}, + {'fhrs': [20, 21, 22, 23], 'seg': 1}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[14]) == test_groups + + # Test highly skewed break point + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5, 6, 7], 'seg': 0}, + {'fhrs': [8, 9, 10, 11, 12, 13, 14, 15], 'seg': 0}, + {'fhrs': [16, 17, 18, 19, 20, 21, 22], 'seg': 0}, + {'fhrs': [23], 'seg': 1}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[22]) == test_groups + + # Test with two break points that align + test_groups = [{'fhrs': [0, 1, 2, 3, 4, 5], 'seg': 0}, + {'fhrs': [6, 7, 8, 9, 10, 11], 'seg': 0}, + {'fhrs': [12, 13, 14, 15, 16, 17], 'seg': 1}, + {'fhrs': [18, 19, 20, 21, 22, 23], 'seg': 2}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=4, breakpoints=[11, 17]) == test_groups + + # Test with two 
skewed break points + test_groups = [{'fhrs': [0, 1], 'seg': 0}, + {'fhrs': [2, 3, 4, 5, 6, 7], 'seg': 1}, + {'fhrs': [8, 9, 10, 11, 12], 'seg': 1}, + {'fhrs': [13, 14, 15, 16, 17], 'seg': 1}, + {'fhrs': [18, 19, 20, 21, 22], 'seg': 1}, + {'fhrs': [23], 'seg': 2}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=6, breakpoints=[1, 22]) == test_groups + + # Test slightly irregular break points + test_groups = [{'fhrs': [0, 1, 2, 3], 'seg': 0}, + {'fhrs': [4, 5, 6], 'seg': 0}, + {'fhrs': [7, 8, 9, 10], 'seg': 1}, + {'fhrs': [11, 12, 13, 14], 'seg': 1}, + {'fhrs': [15, 16, 17, 18], 'seg': 1}, + {'fhrs': [19, 20, 21, 22, 23], 'seg': 2}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=6, breakpoints=[6, 18]) == test_groups + + # Test more groups than fhrs available + test_array = list(range(0, 6)) + test_groups = [{'fhrs': [0], 'seg': 0}, + {'fhrs': [1], 'seg': 0}, + {'fhrs': [2], 'seg': 0}, + {'fhrs': [3], 'seg': 0}, + {'fhrs': [4], 'seg': 0}, + {'fhrs': [5], 'seg': 0}] + assert Tasks.get_job_groups(fhrs=test_array, ngroups=15) == test_groups + + def get_grouped_fhr_dict(self, fhrs: List[int], ngroups: int) -> dict: + ''' + Prepare a metatask dictionary for forecast hour groups. + + Takes a list of forecast hours and splits it into a number of groups while not + crossing forecast segment boundaries. Then use that to prepare a dict with key + variable lists for use in a rocoto metatask. 
+ + Parameters + ---------- + fhrs: List[int] + List of forecast hours to break into groups + ngroups: int + Number of groups to split the forecast hours into + + Returns + ------- + dict: Several variable lists for use in rocoto metatasks: + fhr_list: list of comma-separated lists of fhr groups + fhr_label: list of labels corrsponding to the fhr range + fhr3_last: list of the last fhr in each group, formatted to three digits + fhr3_next: list of the fhr that would follow each group, formatted to + three digits + seg_dep: list of segments each group belongs to + ''' + fhr_breakpoints = self.options['fcst_segments'][1:-1] + group_dicts = Tasks.get_job_groups(fhrs=fhrs, ngroups=ngroups, breakpoints=fhr_breakpoints) + + fhrs_group = [dct['fhrs'] for dct in group_dicts] + fhrs_first = [grp[0] for grp in fhrs_group] + fhrs_last = [grp[-1] for grp in fhrs_group] + fhrs_next = fhrs_first[1:] + [fhrs_last[-1] + (fhrs[-1] - fhrs[-2])] + grp_str = [f'f{grp[0]:03d}-f{grp[-1]:03d}' if len(grp) > 1 else f'f{grp[0]:03d}' for grp in fhrs_group] + seg_deps = [f'seg{dct["seg"]}' for dct in group_dicts] + + fhr_var_dict = {'fhr_list': ' '.join(([','.join(str(fhr) for fhr in grp) for grp in fhrs_group])), + 'fhr_label': ' '.join(grp_str), + 'seg_dep': ' '.join(seg_deps), + 'fhr3_last': ' '.join([f'{fhr:03d}' for fhr in fhrs_last]), + 'fhr3_next': ' '.join([f'{fhr:03d}' for fhr in fhrs_next]) + } + + return fhr_var_dict + + @staticmethod + def multiply_HMS(hms_timedelta: str, multiplier: int | float) -> str: + ''' + Multiplies an HMS timedelta string + + Parameters + ---------- + hms_timedelta: str + String represnting a time delta in HH:MM:SS format + multiplier: int | float + Value to multiply the time delta by + + Returns + ------- + str: String represnting a time delta in HH:MM:SS format + + ''' + input_timedelta = to_timedelta(hms_timedelta) + output_timedelta = input_timedelta * multiplier + return timedelta_to_HMS(output_timedelta) + + @staticmethod + def test_multiply_HMS(): + 
assert Tasks.multiply_HMS('00:10:00', 2) == '00:20:00' + assert Tasks.multiply_HMS('00:30:00', 10) == '05:00:00' + assert Tasks.multiply_HMS('01:15:00', 4) == '05:00:00' + assert Tasks.multiply_HMS('00:05:00', 1.5) == '00:07:30' + assert Tasks.multiply_HMS('00:40:00', 2.5) == '01:40:00' + assert Tasks.multiply_HMS('00:10:00', 1) == '00:10:00' + def get_resource(self, task_name): """ Given a task name (task_name) and its configuration (task_names),