diff --git a/apps/mpas/bin/experiment.py b/apps/mpas/bin/experiment.py index 48fd2ec7..bca2cc76 100644 --- a/apps/mpas/bin/experiment.py +++ b/apps/mpas/bin/experiment.py @@ -25,7 +25,7 @@ def main(user_config_file: Path) -> None: machine = user_config["user"]["platform"] user_resolution = user_config["user"]["resolution"] - experiment_config.update_values(user_config) + experiment_config.update_from(user_config) experiment_config["user"]["mpas_app"] = mpas_app.as_posix() experiment_config.dereference() diff --git a/apps/mpas/config/default_config.yaml b/apps/mpas/config/default_config.yaml index faef2c62..f9c37fd6 100644 --- a/apps/mpas/config/default_config.yaml +++ b/apps/mpas/config/default_config.yaml @@ -152,6 +152,7 @@ create_lbcs: scheduler: '{{ platform.scheduler }}' forecast: mpas: + domain: regional execution: batchargs: cores: 32 diff --git a/pyproject.toml b/pyproject.toml index bfd00a50..54560e2e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,10 +6,10 @@ build-backend = "setuptools.build_meta" name = "chiltepin" version = "0.0.1" dependencies = [ - "globus-compute-sdk>=2.30.1", - "globus-compute-endpoint>=2.30.1", - "parsl[monitoring] @ git+https://github.com/Parsl/parsl.git@globus_compute_executor.py", - "uwtools @ git+https://github.com/ufs-community/uwtools@v2.4.2#subdirectory=src", + "globus-compute-sdk @ git+https://github.com/globus/globus-compute@main#subdirectory=compute_sdk", + "globus-compute-endpoint @ git+https://github.com/globus/globus-compute@main#subdirectory=compute_endpoint", + "parsl[monitoring]>=2025.01.20", + "uwtools @ git+https://github.com/ufs-community/uwtools@v2.5.1#subdirectory=src", ] requires-python = ">=3.9.0" authors = [ diff --git a/src/chiltepin/configure.py b/src/chiltepin/configure.py index bc75c13a..fada7c20 100644 --- a/src/chiltepin/configure.py +++ b/src/chiltepin/configure.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional import yaml from globus_compute_sdk import Client, Executor @@ -141,7 +141,7 @@ def make_mpi_executor(name: str, config: Dict[str, Any]) -> MPIExecutor: def make_globus_compute_executor( - name: str, config: Dict[str, Any], client: Client | None = None + name: str, config: Dict[str, Any], client: Optional[Client] = None ) -> GlobusComputeExecutor: """Construct a GlobusComputeExecutor from the input configuration @@ -186,28 +186,31 @@ def make_globus_compute_executor( """ e = GlobusComputeExecutor( label=name, - executor=Executor(endpoint_id=config["endpoint id"], client=client), - user_endpoint_config={ - "engine": config.get("engine", "GlobusComputeEngine"), - "max_mpi_apps": config.get("max mpi apps", 1), - "cores_per_node": config.get("cores per node", 1), - "nodes_per_block": config.get("nodes per block", 1), - "init_blocks": config.get("init blocks", 0), - "min_blocks": config.get("min blocks", 0), - "max_blocks": config.get("max blocks", 1), - "exclusive": config.get("exclusive", True), - "partition": config["partition"], - "account=config": config["account"], - "worker_init": "\n".join(config.get("environment", [])), - }, + executor=Executor( + endpoint_id=config["endpoint id"], + client=client, + user_endpoint_config={ + "engine": config.get("engine", "GlobusComputeEngine"), + "max_mpi_apps": config.get("max mpi apps", 1), + "cores_per_node": config.get("cores per node", 1), + "nodes_per_block": config.get("nodes per block", 1), + "init_blocks": config.get("init blocks", 0), + "min_blocks": config.get("min blocks", 0), + "max_blocks": config.get("max blocks", 1), + "exclusive": config.get("exclusive", True), + "partition": config["partition"], + "account=config": config["account"], + "worker_init": "\n".join(config.get("environment", [])), + }, + ), ) return e def load( config: Dict[str, Any], - resources: List[str] | None = None, - client: Client | None = None, + resources: Optional[List[str]] = None, + client: Optional[Client] = None, ) -> Config: """Construct a list of Executors from the input configuration dictionary @@ -250,17 +253,16 @@ def load( ] for name, spec in config.items(): if resources is None or name in resources: - match spec["engine"]: - case "HTEX": - # Make a HighThroughputExecutor - executors.append(make_htex_executor(name, spec)) - case "MPI": - # Make an MPIExecutor - executors.append(make_mpi_executor(name, spec)) - case "GlobusComputeEngine": - # Make a GlobusComputeExecutor for non-MPI jobs - executors.append(make_globus_compute_executor(name, spec, client)) - case "GlobusMPIEngine": - # Make a GlobusComputeExecutor for MPI jobs - executors.append(make_globus_compute_executor(name, spec, client)) + if spec["engine"] == "HTEX": + # Make a HighThroughputExecutor + executors.append(make_htex_executor(name, spec)) + elif spec["engine"] == "MPI": + # Make an MPIExecutor + executors.append(make_mpi_executor(name, spec)) + elif spec["engine"] == "GlobusComputeEngine": + # Make a GlobusComputeExecutor for non-MPI jobs + executors.append(make_globus_compute_executor(name, spec, client)) + elif spec["engine"] == "GlobusMPIEngine": + # Make a GlobusComputeExecutor for MPI jobs + executors.append(make_globus_compute_executor(name, spec, client)) return Config(executors) diff --git a/src/chiltepin/data.py b/src/chiltepin/data.py index e7634bbf..f867a841 100644 --- a/src/chiltepin/data.py +++ b/src/chiltepin/data.py @@ -1,4 +1,5 @@ from concurrent.futures import Future +from typing import Optional from globus_sdk import TransferClient @@ -14,9 +15,9 @@ def transfer_task( dst_path: str, timeout: int = 3600, polling_interval: int = 30, - client: TransferClient | None = None, + client: Optional[TransferClient] = None, recursive: bool = False, - dependencies: Future | None = None, + dependencies: Optional[Future] = None, ): """Trnsfer data asynchronously in a Parsl task @@ -83,9 +84,9 @@ def delete_task( src_path: str, timeout: int = 3600, polling_interval: int = 30, - client: TransferClient | None = None, + client: Optional[TransferClient] = None, recursive: bool = False, - dependencies: Future | None = None, + dependencies: Optional[Future] = None, ): """Delete data asynchronously in a Parsl task @@ -143,7 +144,7 @@ def transfer( dst_path: str, timeout: int = 3600, polling_interval: int = 30, - client: TransferClient | None = None, + client: Optional[TransferClient] = None, recursive: bool = False, ): """Trnsfer data synchronously with Globus @@ -247,7 +248,7 @@ def delete( src_path: str, timeout: int = 3600, polling_interval: int = 30, - client: TransferClient | None = None, + client: Optional[TransferClient] = None, recursive: bool = False, ): """Delete data synchronously with Globus. diff --git a/src/chiltepin/endpoint.py b/src/chiltepin/endpoint.py index c445d26f..c180cb57 100644 --- a/src/chiltepin/endpoint.py +++ b/src/chiltepin/endpoint.py @@ -4,7 +4,7 @@ import subprocess import sys import time -from typing import Dict +from typing import Dict, Optional, Union import yaml from globus_compute_sdk import Client @@ -139,7 +139,7 @@ def get_chiltepin_apps() -> (GlobusApp, GlobusApp): return (compute_app, transfer_app) -def login() -> Dict[str, Client | TransferClient]: +def login() -> Dict[str, Union[Client, TransferClient]]: """Log in to the Chiltepin app This initiates the Globus login flow to log the user in to the Globus compute @@ -177,6 +177,7 @@ def login() -> Dict[str, Client | TransferClient]: ) ) + # Return the clients return {"compute": compute_client, "transfer": transfer_client} @@ -194,7 +195,7 @@ def logout(): def configure( name: str, - config_dir: str | None = None, + config_dir: Optional[str] = None, multi: bool = False, timeout: int = 5, ): @@ -284,7 +285,7 @@ def configure( f.write(f"PATH: {chiltepin_path}:{login_path}\n") -def is_multi(name: str, config_dir: str | None = None) -> bool: +def is_multi(name: str, config_dir: Optional[str] = None) -> bool: """Return True if the endpoint is a multi endpoint, False otherwise Parameters @@ -318,7 +319,7 @@ def is_multi(name: str, config_dir: str | None = None) -> bool: return yaml_config.get("multi_user", False) -def list(config_dir: str | None = None, timeout: int = 60) -> Dict[str, str]: +def list(config_dir: Optional[str] = None, timeout: int = 60) -> Dict[str, str]: """Return a list of configured Globus Compute Endpoints This is a thin wrapper around the globus-compute-endpoint list command. @@ -368,7 +369,7 @@ def list(config_dir: str | None = None, timeout: int = 60) -> Dict[str, str]: return ep_list -def is_running(name: str, config_dir: str | None = None) -> bool: +def is_running(name: str, config_dir: Optional[str] = None) -> bool: """Return True if the endpoint is running, otherwise False Parameters @@ -396,7 +397,7 @@ def is_running(name: str, config_dir: str | None = None) -> bool: return ep_info.get("state", None) == "Running" -def start(name: str, config_dir: str | None = None, timeout: int = 60): +def start(name: str, config_dir: Optional[str] = None, timeout: int = 60): """Start the specified Globus Compute Endpoint This is a thin wrapper around the globus-compute-endpoint start command @@ -455,7 +456,7 @@ def start(name: str, config_dir: str | None = None, timeout: int = 60): assert p.returncode == 0, p.stdout -def stop(name: str, config_dir: str | None = None, timeout: int = 60): +def stop(name: str, config_dir: Optional[str] = None, timeout: int = 60): """Stop the specified Globus Compute Endpoint This is a thin wrapper around the globus-compute-endpoint stop command @@ -495,7 +496,7 @@ def stop(name: str, config_dir: str | None = None, timeout: int = 60): assert p.returncode == 0, p.stdout -def delete(name: str, config_dir: str | None = None, timeout: int = 60): +def delete(name: str, config_dir: Optional[str] = None, timeout: int = 60): """Delete the specified Globus Compute Endpoint This is a thin wrapper around the globus-compute-endpoint delete command