Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Parsl dependency to 2025.01.20 #142

Merged
merged 4 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/mpas/bin/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def main(user_config_file: Path) -> None:
machine = user_config["user"]["platform"]
user_resolution = user_config["user"]["resolution"]

experiment_config.update_values(user_config)
experiment_config.update_from(user_config)

experiment_config["user"]["mpas_app"] = mpas_app.as_posix()
experiment_config.dereference()
Expand Down
1 change: 1 addition & 0 deletions apps/mpas/config/default_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ create_lbcs:
scheduler: '{{ platform.scheduler }}'
forecast:
mpas:
domain: regional
execution:
batchargs:
cores: 32
Expand Down
8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ build-backend = "setuptools.build_meta"
name = "chiltepin"
version = "0.0.1"
dependencies = [
"globus-compute-sdk>=2.30.1",
"globus-compute-endpoint>=2.30.1",
"parsl[monitoring] @ git+https://github.com/Parsl/parsl.git@globus_compute_executor.py",
"uwtools @ git+https://github.com/ufs-community/uwtools@v2.4.2#subdirectory=src",
"globus-compute-sdk @ git+https://github.com/globus/globus-compute@main#subdirectory=compute_sdk",
"globus-compute-endpoint @ git+https://github.com/globus/globus-compute@main#subdirectory=compute_endpoint",
"parsl[monitoring]>=2025.01.20",
"uwtools @ git+https://github.com/ufs-community/uwtools@v2.5.1#subdirectory=src",
]
requires-python = ">=3.9.0"
authors = [
Expand Down
64 changes: 33 additions & 31 deletions src/chiltepin/configure.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

import yaml
from globus_compute_sdk import Client, Executor
Expand Down Expand Up @@ -141,7 +141,7 @@ def make_mpi_executor(name: str, config: Dict[str, Any]) -> MPIExecutor:


def make_globus_compute_executor(
name: str, config: Dict[str, Any], client: Client | None = None
name: str, config: Dict[str, Any], client: Optional[Client] = None
) -> GlobusComputeExecutor:
"""Construct a GlobusComputeExecutor from the input configuration

Expand Down Expand Up @@ -186,28 +186,31 @@ def make_globus_compute_executor(
"""
e = GlobusComputeExecutor(
label=name,
executor=Executor(endpoint_id=config["endpoint id"], client=client),
user_endpoint_config={
"engine": config.get("engine", "GlobusComputeEngine"),
"max_mpi_apps": config.get("max mpi apps", 1),
"cores_per_node": config.get("cores per node", 1),
"nodes_per_block": config.get("nodes per block", 1),
"init_blocks": config.get("init blocks", 0),
"min_blocks": config.get("min blocks", 0),
"max_blocks": config.get("max blocks", 1),
"exclusive": config.get("exclusive", True),
"partition": config["partition"],
"account=config": config["account"],
"worker_init": "\n".join(config.get("environment", [])),
},
executor=Executor(
endpoint_id=config["endpoint id"],
client=client,
user_endpoint_config={
"engine": config.get("engine", "GlobusComputeEngine"),
"max_mpi_apps": config.get("max mpi apps", 1),
"cores_per_node": config.get("cores per node", 1),
"nodes_per_block": config.get("nodes per block", 1),
"init_blocks": config.get("init blocks", 0),
"min_blocks": config.get("min blocks", 0),
"max_blocks": config.get("max blocks", 1),
"exclusive": config.get("exclusive", True),
"partition": config["partition"],
"account=config": config["account"],
"worker_init": "\n".join(config.get("environment", [])),
},
),
)
return e


def load(
config: Dict[str, Any],
resources: List[str] | None = None,
client: Client | None = None,
resources: Optional[List[str]] = None,
client: Optional[Client] = None,
) -> Config:
"""Construct a list of Executors from the input configuration dictionary

Expand Down Expand Up @@ -250,17 +253,16 @@ def load(
]
for name, spec in config.items():
if resources is None or name in resources:
match spec["engine"]:
case "HTEX":
# Make a HighThroughputExecutor
executors.append(make_htex_executor(name, spec))
case "MPI":
# Make an MPIExecutor
executors.append(make_mpi_executor(name, spec))
case "GlobusComputeEngine":
# Make a GlobusComputeExecutor for non-MPI jobs
executors.append(make_globus_compute_executor(name, spec, client))
case "GlobusMPIEngine":
# Make a GlobusComputeExecutor for MPI jobs
executors.append(make_globus_compute_executor(name, spec, client))
if spec["engine"] == "HTEX":
# Make a HighThroughputExecutor
executors.append(make_htex_executor(name, spec))
elif spec["engine"] == "MPI":
# Make an MPIExecutor
executors.append(make_mpi_executor(name, spec))
elif spec["engine"] == "GlobusComputeEngine":
# Make a GlobusComputeExecutor for non-MPI jobs
executors.append(make_globus_compute_executor(name, spec, client))
elif spec["engine"] == "GlobusMPIEngine":
# Make a GlobusComputeExecutor for MPI jobs
executors.append(make_globus_compute_executor(name, spec, client))
return Config(executors)
13 changes: 7 additions & 6 deletions src/chiltepin/data.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from concurrent.futures import Future
from typing import Optional

from globus_sdk import TransferClient

Expand All @@ -14,9 +15,9 @@ def transfer_task(
dst_path: str,
timeout: int = 3600,
polling_interval: int = 30,
client: TransferClient | None = None,
client: Optional[TransferClient] = None,
recursive: bool = False,
dependencies: Future | None = None,
dependencies: Optional[Future] = None,
):
"""Trnsfer data asynchronously in a Parsl task

Expand Down Expand Up @@ -83,9 +84,9 @@ def delete_task(
src_path: str,
timeout: int = 3600,
polling_interval: int = 30,
client: TransferClient | None = None,
client: Optional[TransferClient] = None,
recursive: bool = False,
dependencies: Future | None = None,
dependencies: Optional[Future] = None,
):
"""Delete data asynchronously in a Parsl task

Expand Down Expand Up @@ -143,7 +144,7 @@ def transfer(
dst_path: str,
timeout: int = 3600,
polling_interval: int = 30,
client: TransferClient | None = None,
client: Optional[TransferClient] = None,
recursive: bool = False,
):
"""Trnsfer data synchronously with Globus
Expand Down Expand Up @@ -247,7 +248,7 @@ def delete(
src_path: str,
timeout: int = 3600,
polling_interval: int = 30,
client: TransferClient | None = None,
client: Optional[TransferClient] = None,
recursive: bool = False,
):
"""Delete data synchronously with Globus.
Expand Down
19 changes: 10 additions & 9 deletions src/chiltepin/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import subprocess
import sys
import time
from typing import Dict
from typing import Dict, Optional, Union

import yaml
from globus_compute_sdk import Client
Expand Down Expand Up @@ -139,7 +139,7 @@ def get_chiltepin_apps() -> (GlobusApp, GlobusApp):
return (compute_app, transfer_app)


def login() -> Dict[str, Client | TransferClient]:
def login() -> Dict[str, Union[Client, TransferClient]]:
"""Log in to the Chiltepin app

This initiates the Globus login flow to log the user in to the Globus compute
Expand Down Expand Up @@ -177,6 +177,7 @@ def login() -> Dict[str, Client | TransferClient]:
)
)

# Return the clients
return {"compute": compute_client, "transfer": transfer_client}


Expand All @@ -194,7 +195,7 @@ def logout():

def configure(
name: str,
config_dir: str | None = None,
config_dir: Optional[str] = None,
multi: bool = False,
timeout: int = 5,
):
Expand Down Expand Up @@ -284,7 +285,7 @@ def configure(
f.write(f"PATH: {chiltepin_path}:{login_path}\n")


def is_multi(name: str, config_dir: str | None = None) -> bool:
def is_multi(name: str, config_dir: Optional[str] = None) -> bool:
"""Return True if the endpoint is a multi endpoint, False otherwise

Parameters
Expand Down Expand Up @@ -318,7 +319,7 @@ def is_multi(name: str, config_dir: str | None = None) -> bool:
return yaml_config.get("multi_user", False)


def list(config_dir: str | None = None, timeout: int = 60) -> Dict[str, str]:
def list(config_dir: Optional[str] = None, timeout: int = 60) -> Dict[str, str]:
"""Return a list of configured Globus Compute Endpoints

This is a thin wrapper around the globus-compute-endpoint list command.
Expand Down Expand Up @@ -368,7 +369,7 @@ def list(config_dir: str | None = None, timeout: int = 60) -> Dict[str, str]:
return ep_list


def is_running(name: str, config_dir: str | None = None) -> bool:
def is_running(name: str, config_dir: Optional[str] = None) -> bool:
"""Return True if the endpoint is running, otherwise False

Parameters
Expand Down Expand Up @@ -396,7 +397,7 @@ def is_running(name: str, config_dir: str | None = None) -> bool:
return ep_info.get("state", None) == "Running"


def start(name: str, config_dir: str | None = None, timeout: int = 60):
def start(name: str, config_dir: Optional[str] = None, timeout: int = 60):
"""Start the specified Globus Compute Endpoint

This is a thin wrapper around the globus-compute-endpoint start command
Expand Down Expand Up @@ -455,7 +456,7 @@ def start(name: str, config_dir: str | None = None, timeout: int = 60):
assert p.returncode == 0, p.stdout


def stop(name: str, config_dir: str | None = None, timeout: int = 60):
def stop(name: str, config_dir: Optional[str] = None, timeout: int = 60):
"""Stop the specified Globus Compute Endpoint

This is a thin wrapper around the globus-compute-endpoint stop command
Expand Down Expand Up @@ -495,7 +496,7 @@ def stop(name: str, config_dir: str | None = None, timeout: int = 60):
assert p.returncode == 0, p.stdout


def delete(name: str, config_dir: str | None = None, timeout: int = 60):
def delete(name: str, config_dir: Optional[str] = None, timeout: int = 60):
"""Delete the specified Globus Compute Endpoint

This is a thin wrapper around the globus-compute-endpoint delete command
Expand Down
Loading