diff --git a/.gitattributes b/.gitattributes old mode 100644 new mode 100755 diff --git a/.github/workflows/documentation.yml b/.github/workflows/documentation.yml new file mode 100644 index 0000000..b0e5366 --- /dev/null +++ b/.github/workflows/documentation.yml @@ -0,0 +1,27 @@ +name: documentation + +on: [push, pull_request, workflow_dispatch] + +permissions: + contents: write + +jobs: + docs: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + - name: Install dependencies + run: | + pip install sphinx sphinx_rtd_theme myst_parser + - name: Sphinx build + run: | + sphinx-build docs _build + - name: Deploy to GitHub Pages + uses: peaceiris/actions-gh-pages@v3 + if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }} + with: + publish_branch: gh-pages + github_token: ${{ secrets.GITHUB_TOKEN }} + publish_dir: _build/ + force_orphan: true diff --git a/.gitignore b/.gitignore old mode 100644 new mode 100755 index bc3e6af..78e731e --- a/.gitignore +++ b/.gitignore @@ -8,5 +8,6 @@ communicator/src/communicator containers/ config_dict.yaml cache/ +docs/_build/ logs/ .vscode/ \ No newline at end of file diff --git a/.readthedocs.yaml b/.readthedocs.yaml new file mode 100644 index 0000000..8e6dc7f --- /dev/null +++ b/.readthedocs.yaml @@ -0,0 +1,22 @@ +# Read the Docs configuration file +# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details + +# Required +version: 2 + +# Set the OS, Python version, and other tools you might need +build: + os: ubuntu-22.04 + tools: + python: "3.12" + +# Build documentation in the "docs/" directory with Sphinx +sphinx: + configuration: docs/conf.py + +# Optionally, but recommended, +# declare the Python requirements required to build your documentation +# See https://docs.readthedocs.io/en/stable/guides/reproducible-builds.html +python: + install: + - requirements: docs/requirements.txt \ No newline at end of file diff --git a/LICENSE.md b/LICENSE.md old mode 100644 new mode 100755 diff --git a/README.md b/README.md old mode 100644 new mode 100755 index 9ab39b6..cc8480d --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # Scalable -[v0.5.7](https://github.com/JGCRI/scalable/tree/0.5.7) +[v0.6.0](https://github.com/JGCRI/scalable/tree/0.6.0) Scalable is a Python library which aids in running complex workflows on HPCs by orchestrating multiple containers, requesting appropriate HPC jobs to the scheduler, and providing a python environment for distributed computing. It's designed to be primarily used with JGCRI Climate Models but can be easily adapted for any arbitrary uses. @@ -80,9 +80,9 @@ cluster.add_container(tag="osiris", cpus=8, memory="20G", dirs={"/rcfs/projects/ Before launching the workers, the configuration of worker or container targets needs to be specified. The containers to be launched as workers need to be first added by specifying their tag, number of cpu cores they need, the memory they would need, and the directory on the HPC Host to bind to the containers so that these directories are accessible by the container. ```python -cluster.add_worker(n=3, tag="gcam") -cluster.add_worker(n=2, tag="stitches") -cluster.add_worker(n=3, tag="osiris") +cluster.add_workers(n=3, tag="gcam") +cluster.add_workers(n=2, tag="stitches") +cluster.add_workers(n=3, tag="osiris") ``` Launching workers on the cluster can be done by just adding workers to the cluster. This call will only be successful if the tags used have also had containers with the same tag added beforehand. 
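As a minimal sketch of that constraint (the directory mapping and resource values here are illustrative), a container registered under a tag is what allows workers with the same tag to start:

```python
# Register the container first; the tag must match the one used for the workers.
cluster.add_container(tag="gcam", cpus=8, memory="20G",
                      dirs={"/path/on/host": "/path/in/container"})
# Workers with a matching tag can now be launched.
cluster.add_workers(n=3, tag="gcam")
```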
Removing workers is similarly as easy. @@ -156,12 +156,12 @@ def func3(param): ``` -In the example above, the functions will wait 5, 3, and 10 seconds for the first time they are computed. However, their results will be cached due to the decorator and so, if the functions are ran again with the same arguments, their results are going to be returned from memory instead and they wouldn't sleep. There are arguments which directly can be given to the cacheable decorator. **It is always recommended to specify the return type and the type of arguments for each use.** This ensures expected functioning of the module and for correct caching. --TODO-- +In the example above, the functions will wait 5, 3, and 10 seconds for the first time they are computed. However, their results will be cached due to the decorator and so, if the functions are ran again with the same arguments, their results are going to be returned from memory instead and they wouldn't sleep. There are arguments which directly can be given to the cacheable decorator. **It is always recommended to specify the return type and the type of arguments for each use.** This ensures expected functioning of the module and for correct caching. ## Contact -For any contribution, questions, or requests, please feel free to [open an issue](https://github.com/JGCRI/scalable/issues) or contact us directly: -**Shashank Lamba** [shashank.lamba@pnnl.gov](mailto:shashank.lamba@pnnl.gov) +For any contribution, questions, or requests, please feel free to [open an issue](https://github.com/JGCRI/scalable/issues) or contact us directly:\ +**Shashank Lamba** [shashank.lamba@pnnl.gov](mailto:shashank.lamba@pnnl.gov)\ **Pralit Patel** [pralit.patel@pnnl.gov](mailto:pralit.patel@pnnl.gov) ## [License](https://github.com/JGCRI/scalable/blob/master/LICENSE.md) \ No newline at end of file diff --git a/communicator/src/communicator.go b/communicator/src/communicator.go old mode 100644 new mode 100755 index 5afe589..73c2dfb --- a/communicator/src/communicator.go +++ b/communicator/src/communicator.go @@ -17,9 +17,10 @@ import ( // Changing CONNECTION_TYPE is not recommended const ( - DEFAULT_HOST = "0.0.0.0" - DEFAULT_PORT = "1919" - CONNECTION_TYPE = "tcp" + DEFAULT_HOST = "0.0.0.0" + DEFAULT_PORT = "1919" + CONNECTION_TYPE = "tcp" + NUM_PORT_RETRIES = 5 ) var BUFFER_LEN = 5120 @@ -27,16 +28,17 @@ var BUFFER_LEN = 5120 func main() { arguments := os.Args[1:] listen_port := DEFAULT_PORT - if len(arguments) > 1 { - listen_port = arguments[1] - } else if len(arguments) == 0 { - fmt.Println("Either -s or -c option needed") + argslen := len(arguments) + if argslen == 0 { + fmt.Println("Either -s or -c option needed. 
Use -h for help.") gracefulExit() + } else if argslen > 1 { + listen_port = arguments[1] } if arguments[0] == "-s" { loop := 0 server, err := net.Listen(CONNECTION_TYPE, DEFAULT_HOST+":"+listen_port) - for err != nil && loop < 5 && len(arguments) <= 1 { + for err != nil && loop < NUM_PORT_RETRIES && argslen <= 1 { listen_port = strconv.Itoa(rand.Intn(40000-2000) + 2000) server, err = net.Listen(CONNECTION_TYPE, DEFAULT_HOST+":"+listen_port) loop++ @@ -119,6 +121,13 @@ func main() { received += read } fmt.Print(output.String()) + } else if arguments[0] == "-h" { + fmt.Println("Usage: communicator [OPTION] [PORT]") + fmt.Println("Options:") + fmt.Println(" -s\t\tStart server") + fmt.Println(" -c\t\tStart client") + fmt.Println(" -h\t\tShow help") + fmt.Println("PORT is optional and defaults to 1919 for server use.") } else { fmt.Println("Invalid option") gracefulExit() @@ -141,7 +150,7 @@ func handleRequest(client net.Conn) { buffer := make([]byte, BUFFER_LEN) received := 0 flag := 0 - for received < len(lenBuffer) { + for received < len(lenBuffer) && received < BUFFER_LEN { read, err := client.Read(lenBuffer[received:]) if err != nil { clientClose(err, client) @@ -230,7 +239,7 @@ func handleRequest(client net.Conn) { return } sent = 0 - for sent < len(lastOutput.String()) { + for sent < len(lastOutput.String()) && sent < BUFFER_LEN { wrote, err := client.Write(([]byte(lastOutput.String()))[sent:]) if err != nil { clientClose(err, client) diff --git a/docs/Makefile b/docs/Makefile new file mode 100755 index 0000000..d4bb2cb --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,20 @@ +# Minimal makefile for Sphinx documentation +# + +# You can set these variables from the command line, and also +# from the environment for the first two. +SPHINXOPTS ?= +SPHINXBUILD ?= sphinx-build +SOURCEDIR = . +BUILDDIR = _build + +# Put it first so that "make" without argument is like "make help". +help: + @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) + +.PHONY: help Makefile + +# Catch-all target: route all unknown targets to Sphinx using the new +# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). +%: Makefile + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/docs/_static/custom.css b/docs/_static/custom.css new file mode 100755 index 0000000..e69de29 diff --git a/docs/caching.rst b/docs/caching.rst new file mode 100755 index 0000000..dfde54c --- /dev/null +++ b/docs/caching.rst @@ -0,0 +1,17 @@ +Caching +======= + +.. autofunction:: scalable.cacheable + +.. autoclass:: scalable.GenericType + :exclude-members: __init__ + +.. autoclass:: scalable.FileType + +.. autoclass:: scalable.DirType + +.. autoclass:: scalable.ValueType + +.. autoclass:: scalable.ObjectType + +.. autoclass:: scalable.UtilityType \ No newline at end of file diff --git a/docs/conf.py b/docs/conf.py new file mode 100755 index 0000000..33d36ae --- /dev/null +++ b/docs/conf.py @@ -0,0 +1,45 @@ +# Configuration file for the Sphinx documentation builder. 
+# +# For the full list of built-in configuration values, see the documentation: +# https://www.sphinx-doc.org/en/master/usage/configuration.html + +import os +import sys + +sys.path.insert(0, os.path.abspath('..')) +sys.path.insert(0, os.path.abspath('../scalable')) + +# -- Project information ----------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information + +project = 'Scalable' +copyright = '2024, Joint Global Change Research Institute' +author = 'Shashank Lamba, Pralit Patel' +release = '0.6.0' + +# -- General configuration --------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration + +extensions = ["sphinx.ext.autodoc", "sphinx.ext.todo", "sphinx.ext.viewcode", "sphinx.ext.napoleon"] + +templates_path = ['_templates'] +exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store'] + +autodoc_default_options = { + 'members': True, + 'undoc-members': True, + 'private-members': False, + 'special-members': '__init__', + 'inherited-members': False, + 'show-inheritance': False, + 'no-index': True, +} + +# add_module_names = False + +# -- Options for HTML output ------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output + +html_theme = 'sphinx_rtd_theme' +html_static_path = ['_static'] +html_css_files = ['custom.css'] \ No newline at end of file diff --git a/docs/functions.rst b/docs/functions.rst new file mode 100755 index 0000000..6ddcc02 --- /dev/null +++ b/docs/functions.rst @@ -0,0 +1,15 @@ +Submitting Functions +==================== + +.. autoclass:: scalable.ScalableClient + :exclude-members: submit, map, get_versions, cancel, close + +.. autofunction:: scalable.ScalableClient.submit + +.. autofunction:: scalable.ScalableClient.map + +.. autofunction:: scalable.ScalableClient.get_versions + +.. autofunction:: scalable.ScalableClient.cancel + +.. autofunction:: scalable.ScalableClient.close \ No newline at end of file diff --git a/docs/images/scalable_architecture.png b/docs/images/scalable_architecture.png new file mode 100755 index 0000000..2a001f7 Binary files /dev/null and b/docs/images/scalable_architecture.png differ diff --git a/docs/index.rst b/docs/index.rst new file mode 100755 index 0000000..2615ff6 --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,48 @@ +.. Scalable documentation master file, created by + sphinx-quickstart on Thu Aug 22 10:55:42 2024. + You can adapt this file completely to your liking, but it should at least + contain the root `toctree` directive. + +Scalable Documentation +====================== + +Scalable is a Python library for running complex workflows on HPC systems +efficiently and with minimal manual intervention. It uses a dask backend and a +range of custom programs to achieve this. The figure below shows the general +architecture of Scalable. + +.. image:: images/scalable_architecture.png + :align: center + +These questions can help answering if Scalable would be useful for you: + +* Is your workflow ran on a HPC system and takes a significant amount of time? +* Does your workflow involve pipelines, where outputs from certain functions or + models are passed as inputs to other functions or models? +* Do you want the hardware allocation to be done automatically? + + +Scalable could be useful if one of more of the above questions are affirmative. 
+To incorporate the ability to run functions under different environments, +docker containers can be used. A Dockerfile with multiple targets can be used +to make multiple containers, each with different installed libraries and models. +When adding workers to cluster, it can be specified how many workers of +each type should be added. + +Contents: +--------- + +.. toctree:: + :maxdepth: 1 + + workers + +.. toctree:: + :maxdepth: 1 + + caching + +.. toctree:: + :maxdepth: 1 + + functions diff --git a/docs/make.bat b/docs/make.bat new file mode 100755 index 0000000..32bb245 --- /dev/null +++ b/docs/make.bat @@ -0,0 +1,35 @@ +@ECHO OFF + +pushd %~dp0 + +REM Command file for Sphinx documentation + +if "%SPHINXBUILD%" == "" ( + set SPHINXBUILD=sphinx-build +) +set SOURCEDIR=. +set BUILDDIR=_build + +%SPHINXBUILD% >NUL 2>NUL +if errorlevel 9009 ( + echo. + echo.The 'sphinx-build' command was not found. Make sure you have Sphinx + echo.installed, then set the SPHINXBUILD environment variable to point + echo.to the full path of the 'sphinx-build' executable. Alternatively you + echo.may add the Sphinx directory to PATH. + echo. + echo.If you don't have Sphinx installed, grab it from + echo.https://www.sphinx-doc.org/ + exit /b 1 +) + +if "%1" == "" goto help + +%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% +goto end + +:help +%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% + +:end +popd diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 0000000..90b7cf8 --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,2 @@ +sphinx-rtd-theme +scalable \ No newline at end of file diff --git a/docs/workers.rst b/docs/workers.rst new file mode 100755 index 0000000..c881d4f --- /dev/null +++ b/docs/workers.rst @@ -0,0 +1,16 @@ +Worker Management +================= + + +.. autoclass:: scalable.SlurmCluster + :exclude-members: close, job_cls, set_default_request_quantity + +.. autofunction:: scalable.SlurmCluster.add_container + +.. autofunction:: scalable.SlurmCluster.add_workers + +.. autofunction:: scalable.SlurmCluster.remove_workers + +.. autofunction:: scalable.SlurmCluster.close + +.. 
autofunction:: scalable.SlurmCluster.set_default_request_quantity diff --git a/pyproject.toml b/pyproject.toml old mode 100644 new mode 100755 index 0a6b57d..8f999e6 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,8 @@ dependencies = [ "joblib >= 1.3.2", "xxhash >= 3.4.1", "versioneer >= 0.29", + "numpy >= 1.26.4", + "pandas >= 2.2.3" ] classifiers = [ "Development Status :: 4 - Beta", @@ -48,7 +50,6 @@ test = [ [project.urls] "Github" = "https://github.com/JGCRI/scalable/tree/master/scalable" -"Homepage" = "https://www.pnnl.gov" [project.scripts] scalable_bootstrap = "scalable.utilities:run_bootstrap" diff --git a/scalable/Dockerfile b/scalable/Dockerfile old mode 100644 new mode 100755 index a560961..d7a5f9f --- a/scalable/Dockerfile +++ b/scalable/Dockerfile @@ -16,6 +16,7 @@ RUN apt-get install -y --no-install-recommends python3-rpy2 ENV R_LIBS_USER usr/lib/R/site-library RUN chmod a+w /usr/lib/R/site-library RUN cp -r /usr/lib/R/site-library /usr/local/lib/R/site-library +RUN python3 -m pip install -U pip FROM build_env AS conda RUN wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh @@ -30,7 +31,7 @@ RUN eval "$(/root/miniconda3/bin/conda shell.bash hook)" \ ENV PATH /root/miniconda3/bin:$PATH RUN conda init -FROM build_env as scalable +FROM build_env AS scalable ADD "https://api.github.com/repos/JGCRI/scalable/commits?per_page=1" latest_commit RUN git clone https://github.com/JGCRI/scalable.git /scalable RUN pip3 install /scalable/. @@ -172,7 +173,8 @@ RUN cd /gcamwrapper && sed -i "s/python_requires='>=3.6.*, <4'/python_requires=' RUN cd /gcamwrapper && pip3 install . RUN pip install gcamreader RUN git clone https://github.com/JGCRI/gcam_config.git /gcam_config -RUN pip3 install /gcam_config/. +RUN pip3 install /gcam_config/. +RUN pip3 install dtaidistance scipy COPY --from=scalable /scalable /scalable RUN pip3 install /scalable/. RUN pip3 install --force-reinstall numpy==1.26.4 diff --git a/scalable/__init__.py b/scalable/__init__.py index 3227333..37b9fd5 100755 --- a/scalable/__init__.py +++ b/scalable/__init__.py @@ -1,12 +1,12 @@ -# flake8: noqa -from .core import JobQueueCluster -from .slurm import SlurmCluster -from .caching import * -from .common import SEED -from .client import ScalableClient + from dask.distributed import Security from ._version import get_versions +from .caching import * +from .client import ScalableClient +from .common import SEED +from .core import JobQueueCluster +from .slurm import SlurmCluster __version__ = get_versions()["version"] del get_versions diff --git a/scalable/_version.py b/scalable/_version.py old mode 100644 new mode 100755 diff --git a/scalable/caching.py b/scalable/caching.py old mode 100644 new mode 100755 index 905674c..d61febd --- a/scalable/caching.py +++ b/scalable/caching.py @@ -1,13 +1,16 @@ +import dill import os import pickle -from diskcache import Cache -from xxhash import xxh32 import types + import numpy as np import pandas as pd -import dill -from .common import logger, cachedir, SEED +from diskcache import Cache +from xxhash import xxh32 + +from .common import SEED, cachedir, logger + class GenericType: """The GenericType class is a base class for all types that can be hashed. 
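A hedged aside on how this module is typically used: the type wrappers documented here (`FileType`, `ValueType`, and the other `GenericType` subclasses) are passed to the `cacheable` decorator so that arguments and return values hash reliably. The sketch below assumes the `**arg_types` keywords name the decorated function's parameters; the function body is purely illustrative.

```python
from scalable import cacheable, FileType, ValueType

@cacheable(return_type=ValueType, path=FileType, threshold=ValueType)
def count_exceedances(path, threshold):
    # Count lines in the file at `path` whose numeric value exceeds `threshold`.
    with open(path) as f:
        return sum(1 for line in f if float(line.strip()) > threshold)
```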
@@ -31,8 +34,8 @@ class FileType(GenericType): """ def __hash__(self) -> int: - digest = 0 if os.path.exists(self.value): + digest = 0 with open(self.value, 'rb') as file: x = xxh32(seed=SEED) x.update(str(os.path.basename(self.value)).encode('utf-8')) @@ -52,10 +55,10 @@ class DirType(GenericType): """ def __hash__(self) -> int: - digest = 0 - x = xxh32(seed=SEED) - x.update(str(os.path.basename(self.value)).encode('utf-8')) if os.path.exists(self.value): + digest = 0 + x = xxh32(seed=SEED) + x.update(str(os.path.basename(self.value)).encode('utf-8')) filenames = os.listdir(self.value) filenames = sorted(filenames) for filename in filenames: @@ -103,10 +106,6 @@ def __hash__(self) -> int: x = xxh32(seed=SEED) if isinstance(self.value, list): value_list = self.value - try: - value_list = sorted(self.value) - except: - pass for element in value_list: x.update(hash_to_bytes(hash(convert_to_type(element)))) elif isinstance(self.value, dict): @@ -141,7 +140,7 @@ def __hash__(self) -> int: if isinstance(self.value, np.ndarray): x.update(self.value.tobytes()) elif isinstance(self.value, pd.DataFrame): - x.update(self.value.to_string().encode('utf-8')) + x.update(pickle.dumps(self.value)) digest = x.intdigest() return digest @@ -189,13 +188,13 @@ def convert_to_type(arg): elif isinstance(arg, (np.ndarray, pd.DataFrame)): ret = UtilityType(arg) else: - logger.warn(f"Could not identify type for argument: {arg}. Using default hash function. " + logger.warning(f"Could not identify type for argument: {arg}. Using default hash function. " "For more reliable performance, either wrap the argument in a class with a defined" " __hash__() function or open an issue on the scalable Github: github.com/JGCRI/scalable.") ret = ObjectType(arg) return ret -def cacheable(return_type=None, void=False, recompute=False, store=True, **arg_types): +def cacheable(return_type=None, void=False, check_output=False, recompute=False, store=True, **arg_types): """Decorator function to cache the output of a function. This function is used to cache other functions' outputs for certain @@ -219,6 +218,10 @@ def cacheable(return_type=None, void=False, recompute=False, store=True, **arg_t void : bool, optional Whether the function returns a value or not. A function is void if it does not return a value. The default is False. + check_output : bool, optional + Whether to check the output of a function has the same hash as when + it was stored. Useful to ensure entities like files haven't been + modified since initially stored. The default is False. recompute : bool, optional Whether to recompute the value or not. The default is False. 
store : bool, optional @@ -298,16 +301,18 @@ def inner(*args, **kwargs): raise KeyError(f"Key for function {func.__name__} could not be found.") stored_digest = value[0] new_digest = 0 - if return_type is None: - new_digest = hash(convert_to_type(value[1])) - else: - new_digest = hash(return_type(value[1])) - if new_digest == stored_digest: - ret = value[1] - else: - if not disk.delete(key, True): - logger.warn(f"{func.__name__} could not be deleted from cache after hash" + if check_output: + if return_type is None: + new_digest = hash(convert_to_type(value[1])) + else: + new_digest = hash(return_type(value[1])) + if new_digest == stored_digest: + ret = value[1] + elif not disk.delete(key, True): + logger.warning(f"{func.__name__} could not be deleted from cache after hash" " mismatch.") + else: + ret = value[1] if ret is None: ret = func(*args, **kwargs) if store: @@ -318,7 +323,7 @@ def inner(*args, **kwargs): new_digest = hash(return_type(ret)) value = [new_digest, ret] if not disk.add(key=key, value=value, retry=True): - logger.warn(f"{func.__name__} could not be added to cache.") + logger.warning(f"{func.__name__} could not be added to cache.") disk.close() return ret ret = inner diff --git a/scalable/client.py b/scalable/client.py old mode 100644 new mode 100755 index ff07faf..f27e3fe --- a/scalable/client.py +++ b/scalable/client.py @@ -1,58 +1,40 @@ -from collections.abc import Awaitable -from distributed import Scheduler, Client +from dask.typing import no_default +from distributed import Client from distributed.diagnostics.plugin import SchedulerPlugin - from .common import logger -from .slurm import SlurmJob, SlurmCluster +from .slurm import SlurmCluster -class SlurmSchedulerPlugin(SchedulerPlugin): +class SlurmSchedulerPlugin(SchedulerPlugin): def __init__(self, cluster): self.cluster = cluster super().__init__() - class ScalableClient(Client): + """Client for submitting tasks to a Dask cluster. Inherits the dask + client object. + + Parameters + ---------- + cluster : Cluster + The cluster object to connect to for submitting tasks. + """ def __init__(self, cluster, *args, **kwargs): super().__init__(address = cluster, *args, **kwargs) if isinstance(cluster, SlurmCluster): self.register_scheduler_plugin(SlurmSchedulerPlugin(None)) - def submit( - self, - func, - *args, - key=None, - workers=None, - tag=None, - n=1, - retries=None, - priority=0, - fifo_timeout="100 ms", - allow_other_workers=False, - actor=False, - actors=False, - pure=True, - **kwargs, - ): - """Submit a function application to the scheduler + def submit(self, func, *args, tag=None, n=1, **kwargs): + """Submit a function to be ran by workers in the cluster. Parameters ---------- - func : callable - Callable to be scheduled as ``func(*args,**kwargs)``. If ``func`` - returns a coroutine, it will be run on the main event loop of a - worker. Otherwise ``func`` will be run in a worker's task executor - pool (see ``Worker.executors`` for more information.) - \*args : tuple - Optional positional arguments - key : str - Unique identifier for the task. Defaults to function-name and hash - workers : string or iterable of strings - A set of worker addresses or hostnames on which computations may be - performed. Leave empty to default to all workers (common case) + func : function + Function to be scheduled for execution. + *args : tuple + Optional positional arguments to pass to the function. tag : str (optional) User-defined tag for the container that can run func. 
If not provided, func is assigned to be ran on a random container. @@ -60,48 +42,17 @@ def submit( Number of workers needed to run this task. Meant to be used with tag. Multiple workers can be useful for application level distributed computing. - retries : int (default to 0) - Number of allowed automatic retries if the task fails - priority : Number - Optional prioritization of task. Zero is default. - Higher priorities take precedence - fifo_timeout : str timedelta (default '100ms') - Allowed amount of time between calls to consider the same priority - allow_other_workers : bool (defaults to False) - Used with ``workers``. Indicates whether or not the computations - may be performed on workers that are not in the `workers` set(s). - actor : bool (default False) - Whether this task should exist on the worker as a stateful actor. - actors : bool (default False) - Alias for `actor` - pure : bool (defaults to True) - Whether or not the function is pure. Set ``pure=False`` for - impure functions like ``np.random.random``. Note that if both - ``actor`` and ``pure`` kwargs are set to True, then the value - of ``pure`` will be reverted to False, since an actor is stateful. - \*\*kwargs : dict + **kwargs : dict (optional) Optional key-value pairs to be passed to the function. Examples -------- >>> c = client.submit(add, a, b) - Notes - ----- - The current implementation of a task graph resolution searches for - occurrences of ``key`` and replaces it with a corresponding ``Future`` - result. That can lead to unwanted substitution of strings passed as - arguments to a task if these strings match some ``key`` that already - exists on a cluster. To avoid these situations it is required to use - unique values if a ``key`` is set manually. See - https://github.com/dask/dask/issues/9969 to track progress on resolving - this issue. - Returns ------- Future - If running in asynchronous mode, returns the future. Otherwise - returns the concrete value + Returns the future object that runs the function. Raises ------ @@ -114,22 +65,97 @@ def submit( resources = None if tag is not None: resources = {tag: n} - return super().submit(func, - *args, - key=key, - workers=workers, - resources=resources, - retries=retries, - priority=priority, - fifo_timeout=fifo_timeout, - allow_other_workers=allow_other_workers, - actor=actor, - actors=actors, - pure=False, - **kwargs) - - + return super().submit(func, resources=resources, *args, **kwargs) + def cancel(self, futures, *args, **kwargs): + """ + Cancel running futures + This stops future tasks from being scheduled if they have not yet run + and deletes them if they have already run. After calling, this result + and all dependent results will no longer be accessible + + Parameters + ---------- + futures : future | future, list + One or more futures to cancel (as a list). + *args : tuple + Positional arguments to pass to dask client's cancel method. + **kwargs : dict + Keyword arguments to pass to dask client's cancel method. + """ + return super().cancel(futures, *args, **kwargs) + def close(self, timeout=no_default): + """Close this client + + Clients will also close automatically when your Python session ends + + Parameters + ---------- + timeout : number + Time in seconds after which to raise a + ``dask.distributed.TimeoutError`` + + """ + return super().close(timeout) + def map(self, func, parameters, tag, n, *args, **kwargs): + """Map a function on multiple sets of arguments to run the function + multiple times with different inputs. 
+ + Parameters + ---------- + func : function + Function to be scheduled for execution. + parameters : list of lists + Lists of parameters to be passed to the function. The first list + should have the first parameter values, the second list should have + the second parameter values, and so on. The lists should be of the + same length. + tag : str (optional) + User-defined tag for the container that can run func. If not + provided, func is assigned to be ran on a random container. + n : int (default 1) + Number of workers needed to run this task. Meant to be used with + tag. Multiple workers can be useful for application level + distributed computing. + *args : tuple + Positional arguments to pass to dask client's map method. + **kwargs : dict + Keyword arguments to pass to dask client's map method. + Examples + -------- + >>> def add(a, b): ... + >>> L = client.map(add, [[1, 2, 3], [4, 5, 6]]) + + Returns + ------- + List of futures + Returns a list of future objects, each for a separate run of the + function with the given parameters. + """ + resources = None + if tag is not None: + resources = {tag: n} + return super().map(func=func, iterables=parameters, resources=resources, *args, **kwargs) + + def get_versions(self, check=False, packages = None): + """Return version info for the scheduler, all workers and myself + + Parameters + ---------- + check : bool + Raise ValueError if all required & optional packages do not match. + Default is False. + packages : list + Extra package names to check. + + Examples + -------- + >>> c.get_versions() + + >>> c.get_versions(packages=['sklearn', 'geopandas']) + """ + return super().get_versions(check, packages) + \ No newline at end of file diff --git a/scalable/common.py b/scalable/common.py old mode 100644 new mode 100755 diff --git a/scalable/core.py b/scalable/core.py index f8778f7..310323b 100755 --- a/scalable/core.py +++ b/scalable/core.py @@ -1,105 +1,36 @@ -from contextlib import suppress + +import abc +import asyncio +import copy import os import re import shlex import sys -import abc import tempfile -import threading import time -import copy -import warnings -import asyncio -from dask.utils import parse_bytes +from contextlib import suppress +from dask.utils import parse_bytes from distributed.core import Status from distributed.deploy.spec import ProcessInterface, SpecCluster from distributed.scheduler import Scheduler from distributed.security import Security from distributed.utils import NoOpAwaitable -from .utilities import * -from .support import * - from .common import logger +from .support import * +from .utilities import * DEFAULT_WORKER_COMMAND = "distributed.cli.dask_worker" -# The length of the period (in mins) to check and account for dead workers -CHECK_DEAD_WORKER_PERIOD = 1 - -job_parameters = """ - cpus : int - Akin to the number of cores the current job should have available. - memory : str - Amount of memory the current job should have available. - nanny : bool - Whether or not to start a nanny process. - interface : str - Network interface like 'eth0' or 'ib0'. This will be used both for the - Dask scheduler and the Dask workers interface. If you need a different - interface for the Dask scheduler you can pass it through - the ``scheduler_options`` argument: ``interface=your_worker_interface, - scheduler_options={'interface': your_scheduler_interface}``. - death_timeout : float - Seconds to wait for a scheduler before closing workers - local_directory : str - Dask worker local directory for file spilling. 
- worker_command : list - Command to run when launching a worker. Defaults to - "distributed.cli.dask_worker" - worker_extra_args : list - Additional arguments to pass to `dask-worker` - python : str - Python executable used to launch Dask workers. - Defaults to the Python that is submitting these jobs - hardware : HardwareResources - A shared object containing the hardware resources available to the - cluster. - tag : str - The tag or the container type of the worker to be launched. - container : Container - The container object containing the information about launching the - worker. - launched : list - A list of launched workers which is shared across all workers and the - cluster object to keep track of the workers that have been launched. - security : Security - A security object containing the TLS configuration for the worker. If - True then a temporary security object with a self signed certificate - is created. - run_scripts_path : str - The path where the run scripts are located. Defaults to ./run_scripts. - The run scripts should be in the format _script.sh. - use_run_scripts : bool - Whether or not to use the run scripts. Defaults to True. +job_parameters = """ """.strip() cluster_parameters = """ - silence_logs : str - Log level like "debug", "info", or "error" to emit here if the - scheduler is started locally - asynchronous : bool - Whether or not to run this cluster object with the async/await syntax - name : str - The name of the cluster, which would also be used to name workers. - Defaults to class name. - scheduler_options : dict - Used to pass additional arguments to Dask Scheduler. For example use - ``scheduler_options={'dashboard_address': ':12435'}`` to specify which - port the web dashboard should use or - ``scheduler_options={'host': 'your-host'}`` to specify the host the Dask - scheduler should run on. See :class:`distributed.Scheduler` for more - details. - scheduler_cls : type - Changes the class of the used Dask Scheduler. Defaults to Dask's - :class:`distributed.Scheduler`. - shared_temp_directory : str - Shared directory between scheduler and worker (used for example by - temporary security certificates) defaults to current working directory - if not set. + account : str + Accounting string associated with each worker job. comm_port : int The network port on which the cluster can contact the host config_overwrite : bool @@ -108,7 +39,24 @@ logs_location : str The location to store worker logs. Default to the logs folder in the current directory. - + suppress_logs : bool + Whether or not to suppress logs. Defaults to False. + name : str + The name of the cluster, which would also be used to name workers. + Defaults to class name. + queue : str + Destination queue for each worker job. + run_scripts_path : str + The path where the run scripts are located. Defaults to ./run_scripts. + The run scripts should be in the format _script.sh. + security : Security + A security object containing the TLS configuration for the worker. If + True then a temporary security object with a self signed certificate + is created. + use_run_scripts : bool + Whether or not to use the run scripts. Defaults to True. + walltime : str + Walltime for each worker job. """.strip() @@ -138,7 +86,7 @@ class Job(ProcessInterface, abc.ABC): SLURMCluster """.format(job_parameters=job_parameters) - # Following class attributes should be overridden by extending classes. + # Following class attributes should be overridden by extending classes if necessary. 
cancel_command = None job_id_regexp = r"(?P\d+)" @@ -269,7 +217,6 @@ def __init__( if protocol and "--protocol" not in worker_extra_args: worker_extra_args.extend(["--protocol", protocol]) - # Keep information on process, cores, and memory, for use in subclasses self.worker_memory = parse_bytes(self.memory) if self.memory is not None else None # dask-worker command line build @@ -436,9 +383,12 @@ def __init__( config_overwrite=True, comm_port=None, logs_location=None, + suppress_logs=False, **job_kwargs ): + if comm_port is None: + comm_port = os.getenv("COMM_PORT") if comm_port is None: raise ValueError( "Communicator port not given. You must specify the communicator port " @@ -458,6 +408,9 @@ def __init__( type(self) ) ) + + if interface is None: + interface = "ib0" if dashboard_address is not None: raise ValueError( @@ -515,12 +468,14 @@ def __init__( "cls": scheduler_cls, "options": scheduler_options, } - - self.logs_location = logs_location - if self.logs_location is None: - directory_name = self.job_cls.__name__.replace("Job", "") + "Cluster" - self.logs_location = create_logs_folder("logs", directory_name) + if not suppress_logs: + if logs_location is None: + directory_name = self.job_cls.__name__.replace("Job", "") + "Cluster" + logs_location = create_logs_folder("logs", directory_name) + self.logs_location = logs_location + else: + self.logs_location = None self.shared_temp_directory = shared_temp_directory @@ -549,7 +504,7 @@ def __init__( name=name, ) - def add_worker(self, tag=None, n=0): + def add_workers(self, tag=None, n=0): """Add workers to the cluster. Parameters @@ -564,7 +519,7 @@ def add_worker(self, tag=None, n=0): Examples -------- - >>> cluster.add_worker("gcam", 4) + >>> cluster.add_workers("gcam", 4) """ if self.exited or self.status in (Status.closing, Status.closed): @@ -618,7 +573,7 @@ def remove_workers(self, tag=None, n=0): can_remove.extend([worker_name for worker_name in list(self.worker_spec.keys()) if tag in worker_name]) current = len(can_remove) if n > current: - logger.warn(f"Cannot remove {n} workers. Only {current} workers found, removing all.") + logger.warning(f"Cannot remove {n} workers. Only {current} workers found, removing all.") n = current can_remove = can_remove[:n] if n != 0 and self.status not in (Status.closing, Status.closed): @@ -632,42 +587,6 @@ def remove_workers(self, tag=None, n=0): if self.asynchronous: return NoOpAwaitable() - def _check_dead_workers(self): - """Periodically check for dead workers. - - This function essentially calls self.add_worker() with default - parameters which only syncs the cluster to the current state of - the workers. Any dead workers may be relaunched. - """ - next_call = time.time() - while not self.exited: - self.add_worker() - temp_file = tempfile.NamedTemporaryFile(mode='w+', delete=True) - temp_file_name = temp_file.name - temp_file.close() - with open(temp_file_name, 'w+') as temp_file: - temp_file.write("scalable") - temp_file.flush() - temp_file.seek(0) - temp_file.read() - os.remove(temp_file_name) - next_call = next_call + (60 * CHECK_DEAD_WORKER_PERIOD) - time.sleep(next_call - time.time()) - - def _get_dead_worker_tags(self): - """Get the list of dead workers. - - This function returns the list of workers that are dead. 
- """ - dead_workers = [] - for worker in self.launched: - if worker[0] not in self.workers: - dead_workers.append(worker) - for worker in dead_workers: - del self.worker_spec[worker[0]] - self.launched.remove(worker) - return [worker[1] for worker in dead_workers] - def add_container(self, tag, dirs, path=None, cpus=None, memory=None): """Add containers to enable them launching as workers. @@ -737,7 +656,6 @@ def new_worker_spec(self, tag): Dictionary containing the name and spec for the next worker """ if tag not in self.specifications: - lock = self.new_spec["options"]["shared_lock"] self.specifications[tag] = copy.copy(self.new_spec) if tag not in self.containers: raise ValueError(f"The tag ({tag}) given is not a recognized tag for any of the containers." @@ -775,11 +693,11 @@ def _get_worker_security(self, security): # a shared temp directory should be configured correctly elif self.shared_temp_directory is None: shared_temp_directory = os.getcwd() - warnings.warn( - "Using a temporary security object without explicitly setting a shared_temp_directory: \ -writing temp files to current working directory ({}) instead. You can set this value by \ -using dask for e.g. `dask.config.set({{'jobqueue.pbs.shared_temp_directory': '~'}})`\ -or by setting this value in the config file found in `~/.config/dask/jobqueue.yaml` ".format( + logger.warning( + "Using a temporary security object without explicitly setting a shared_temp_directory: " + "writing temp files to current working directory ({}) instead. You can set this value by " + "using dask for e.g. `dask.config.set({{'jobqueue.pbs.shared_temp_directory': '~'}})` " + "or by setting this value in the config file found in `~/.config/dask/jobqueue.yaml` ".format( shared_temp_directory ), category=UserWarning, @@ -841,7 +759,7 @@ def scale(self, n=None, jobs=0, memory=None, cores=None): Target number of cores """ - logger.warn("This function must only be called internally on exit. " + + logger.warning("This function must only be called internally on exit. " + "Any calls made explicity or during execution can result " + "in undefined behavior. 
" + "If called accidentally, an " + "immediate shutdown and restart of the cluster is recommended.") diff --git a/scalable/scalable_bootstrap.sh b/scalable/scalable_bootstrap.sh index 68756f5..526828b 100755 --- a/scalable/scalable_bootstrap.sh +++ b/scalable/scalable_bootstrap.sh @@ -1,9 +1,13 @@ -#!/bin/bash +#!/usr/bin/env bash + +### EDITABLE CONSTANTS ### GO_VERSION_LINK="https://go.dev/VERSION?m=text" GO_DOWNLOAD_LINK="https://go.dev/dl/*.linux-amd64.tar.gz" SCALABLE_REPO="https://github.com/JGCRI/scalable.git" APPTAINER_VERSION="1.3.2" +DEFAULT_PORT="1919" +CONFIG_FILE="/tmp/.scalable_config" # set -x @@ -14,24 +18,6 @@ GREEN='\033[0;32m' YELLOW='\033[0;33m' NC='\033[0m' -prompt() { - local color="$1" - local prompt_text="$2" - echo -e -n "${color}${prompt_text}${NC}" # Print prompt in specified color - read input -} - -flush() { - read -t 0.1 -n 10000 discard -} - -echo -e "${RED}Connection to HPC/Cloud...${NC}" -flush -prompt "$RED" "Hostname: " -host=$input -flush -prompt "$RED" "Username: " -user=$input if [[ $* == *"-i"* ]]; then while getopts ":i:" flag; do case $flag in @@ -43,6 +29,10 @@ if [[ $* == *"-i"* ]]; then done fi +### FUNCTIONS ### + +### check_exit_code: checks the exit code of the last command and exits if it is non-zero + check_exit_code() { if [ $1 -ne 0 ]; then echo -e "${RED}Command failed with exit code $1${NC}" @@ -51,6 +41,60 @@ check_exit_code() { fi } +### prompt: prompts the user for input + +prompt() { + local color="$1" + local prompt_text="$2" + echo -e -n "${color}${prompt_text}${NC}" + read input +} + +### flush: flushes the input buffer + +flush() { + read -t 0.1 -n 10000 discard +} + +echo -e "${RED}Connection to HPC/Cloud...${NC}" + +choice="N" + +if [[ -f $CONFIG_FILE ]]; then + echo -e "${YELLOW}Found saved configuration file${NC}" + flush + prompt "$RED" "Do you want to use the saved configuration? (Y/n): " + choice=$input +fi + +if [[ "$choice" =~ [Yy]|^[Yy][Ee]|^[Yy][Ee][Ss]$ ]]; then + source $CONFIG_FILE + check_exit_code $? +else + flush + prompt "$RED" "Hostname: " + host=$input + flush + prompt "$RED" "Username: " + user=$input + + flush + prompt "$RED" "Enter Remote Work Directory Name (created in home directory of remote system/if one exists): " + work_dir=$input + + flush + prompt "$RED" "Do you want to save the username, hostname, and work directory for future use? (Y/n): " + save=$input + + if [[ "$save" =~ [Yy]|^[Yy][Ee]|^[Yy][Ee][Ss]$ ]]; then + rm -f $CONFIG_FILE + check_exit_code $? + echo -e "host=$host\nuser=$user\nwork_dir=$work_dir" > $CONFIG_FILE + check_exit_code $? + fi +fi + +### Go version is set to latest ### GO_VERSION=$(ssh $user@$host "curl -s $GO_VERSION_LINK | head -n 1 | tr -d '\n'") check_exit_code $? @@ -59,24 +103,20 @@ DOWNLOAD_LINK="${GO_DOWNLOAD_LINK//\*/$GO_VERSION}" FILENAME=$(basename $DOWNLOAD_LINK) check_exit_code $? -flush -prompt "$RED" "Enter Work Directory Name \ -(created in home directory of remote system or if it already exists): " -work_dir=$input - echo -e "${GREEN}To prevent local environment setup every time on launch, please run the \ scalable_bootstrap script from the same directory each time.${NC}" +if [[ ! -f "Dockerfile" ]]; then + flush + echo -e "${YELLOW}Dockefile not found in current directory. Downloading default Dockerfile from remote...${NC}" + curl -O "https://raw.githubusercontent.com/JGCRI/scalable/master/scalable/Dockerfile" + check_exit_code $? +fi + prompt "$RED" "Do you want to build and transfer containers? 
(Y/n): " transfer=$input build=() if [[ "$transfer" =~ [Yy]|^[Yy][Ee]|^[Yy][Ee][Ss]$ ]]; then - if [[ ! -f "Dockerfile" ]]; then - flush - echo -e "${YELLOW}Dockefile not found in current directory. Downloading from remote...${NC}" - wget "https://raw.githubusercontent.com/JGCRI/scalable/master/Dockerfile" - check_exit_code $? - fi echo -e "${YELLOW}Available container targets: ${NC}" avail=$(sed -n -E 's/^FROM[[:space:]]{1,}[^ ]{1,}[[:space:]]{1,}AS[[:space:]]{1,}([^ ]{1,})$/\1/p' Dockerfile) check_exit_code $? @@ -118,55 +158,40 @@ please delete it from remote and run this script again${NC}" flush ssh -t $user@$host \ "{ - [[ -d \"$work_dir\" ]] && - [[ -d \"$work_dir/logs\" ]] && - echo '$work_dir already exists on remote' -} || -{ - mkdir -p $work_dir - mkdir -p $work_dir/logs -}" -check_exit_code $? - -flush -ssh -t $user@$host \ -"{ - [[ -d \"$work_dir/go\" ]] && - echo '$work_dir/go already exists on remote' -} || -{ - wget $DOWNLOAD_LINK -P $work_dir && - tar -C $work_dir -xzf $work_dir/$FILENAME -}" -check_exit_code $? - -flush -ssh -t $user@$host \ -"{ - [[ -d \"$work_dir/scalable\" ]] && - echo '$work_dir/scalable already exists on remote' -} || + if [[ -d \"$work_dir\" && -d \"$work_dir/logs\" ]]; then + echo '$work_dir already exists on remote' + else + mkdir -p $work_dir + mkdir -p $work_dir/logs + fi +} && { - git clone $SCALABLE_REPO $work_dir/scalable -}" -check_exit_code $? - -GO_PATH=$(ssh $user@$host "cd $work_dir/go/bin/ && pwd") -GO_PATH="$GO_PATH/go" -flush -ssh -t $user@$host \ -"{ - [[ -f \"$work_dir/communicator\" ]] && - echo '$work_dir/communicator file already exists on remote' && - [[ -f \"$work_dir/scalable/communicator/communicator\" ]] && - cp $work_dir/scalable/communicator/communicator $work_dir/. -} || + if [[ -d \"$work_dir/go\" ]]; then + echo '$work_dir/go already exists on remote' + else + echo 'go directory not found on remote...installing version $GO_VERSION (likely latest)' && + wget $DOWNLOAD_LINK -P $work_dir && + tar -C $work_dir -xzf $work_dir/$FILENAME + fi +} && { - cd $work_dir/scalable/communicator && - $GO_PATH mod init communicator && - $GO_PATH build src/communicator.go && - cd && - cp $work_dir/scalable/communicator/communicator $work_dir/. + if [[ -d \"$work_dir/scalable\" ]]; then + echo '$work_dir/scalable already exists on remote' + else + git clone $SCALABLE_REPO $work_dir/scalable + fi +} && +{ + if [[ -f \"$work_dir/communicator\" ]]; then + echo '$work_dir/communicator file already exists on remote' + elif [[ -f \"$work_dir/scalable/communicator/communicator\" ]]; then + cp $work_dir/scalable/communicator/communicator $work_dir/. + else + cd $work_dir/scalable/communicator && + ../../go/bin/go mod init communicator && + ../../go/bin/go build src/communicator.go && + cp communicator ../../. + fi }" check_exit_code $? @@ -179,6 +204,18 @@ mkdir -p tmp-apptainer/cache APPTAINER_TMPDIR="/tmp-apptainer/tmp" APPTAINER_CACHEDIR="/tmp-apptainer/cache" +ssh $user@$host "[[ -f $work_dir/containers/scalable_container.sif ]]" +exist=$(echo $?) +if [[ "$exist" -eq 0 ]]; then + docker images | grep scalable_container + exist=$(echo $?) +fi +if [[ "$exist" -ne 0 ]]; then + echo -e "${YELLOW}Scalable container not found locally or on remote. Building and transferring...${NC}" + transfer=Y + build+=("scalable") +fi + if [[ "$transfer" =~ [Yy]|^[Yy][Ee]|^[Yy][Ee][Ss]$ ]]; then flush @@ -189,6 +226,24 @@ if [[ "$transfer" =~ [Yy]|^[Yy][Ee]|^[Yy][Ee][Ss]$ ]]; then mkdir -p run_scripts check_exit_code $? 
+ rebuild="false" + docker images | grep apptainer_container + if [ "$?" -ne 0 ]; then + rebuild="true" + fi + current_version=$(docker run --rm apptainer_container version) + if [ "$current_version" != "$APPTAINER_VERSION" ]; then + rebuild="true" + fi + if [ "$rebuild" == "true" ]; then + flush + APPTAINER_COMMITISH="v$APPTAINER_VERSION" + docker build --target apptainer --build-arg APPTAINER_COMMITISH=$APPTAINER_COMMITISH \ + --build-arg APPTAINER_TMPDIR=$APPTAINER_TMPDIR --build-arg APPTAINER_CACHEDIR=$APPTAINER_CACHEDIR \ + -t apptainer_container . + check_exit_code $? + fi + for target in "${build[@]}" do flush @@ -212,34 +267,13 @@ if [[ "$transfer" =~ [Yy]|^[Yy][Ee]|^[Yy][Ee][Ss]$ ]]; then chmod +x run_scripts/$target\_script.sh check_exit_code $? - done - - rebuild="false" - docker images | grep apptainer_container - if [ "$?" -ne 0 ]; then - rebuild="true" - fi - current_version=$(docker run --rm apptainer_container version) - if [ "$current_version" != "$APPTAINER_VERSION" ]; then - rebuild="true" - fi - if [ "$rebuild" == "true" ]; then - flush - APPTAINER_COMMITISH="v$APPTAINER_VERSION" - docker build --target apptainer --build-arg APPTAINER_COMMITISH=$APPTAINER_COMMITISH \ - --build-arg APPTAINER_TMPDIR=$APPTAINER_TMPDIR --build-arg APPTAINER_CACHEDIR=$APPTAINER_CACHEDIR \ - -t apptainer_container . - check_exit_code $? - fi - - for target in "${build[@]}" - do flush IMAGE_NAME=$(docker images | grep $target\_container | sed -E 's/[\t ][\t ]*/ /g' | cut -d ' ' -f 1) IMAGE_TAG=$(docker images | grep $target\_container | sed -E 's/[\t ][\t ]*/ /g' | cut -d ' ' -f 2) + flush docker run --rm -v //var/run/docker.sock:/var/run/docker.sock -v /$(pwd):/work -v /$(pwd)/tmp-apptainer:/tmp-apptainer \ - apptainer_container build --userns --force containers/$target\_container.sif docker-daemon://$IMAGE_NAME:$IMAGE_TAG + apptainer_container build --userns --force //work/containers/$target\_container.sif docker-daemon://$IMAGE_NAME:$IMAGE_TAG check_exit_code $? done @@ -252,20 +286,37 @@ flush docker run --rm -v /$(pwd):/host -v /$HOME/.ssh:/root/.ssh scalable_container \ bash -c "chmod 700 /root/.ssh && chmod 600 ~/.ssh/* \ && cd /host \ - && rsync -aP --include '*.sif' containers $user@$host:~/$work_dir \ - && rsync -aP --include '*.sh' run_scripts $user@$host:~/$work_dir \ + && (rsync -aP --include '*.sif' containers $user@$host:~/$work_dir || true) \ + && (rsync -aP --include '*.sh' run_scripts $user@$host:~/$work_dir || true) \ && rsync -aP Dockerfile $user@$host:~/$work_dir" check_exit_code $? -ssh -L 8787:deception.pnl.gov:8787 -t $user@$host \ +COMM_PORT=$DEFAULT_PORT +ssh $user@$host "netstat -tuln | grep :$COMM_PORT" +while [[ $? -eq 0 && "$COMM_PORT" != "8787" ]]; do + COMM_PORT=$(awk -v min=1024 -v max=49151 'BEGIN{srand(); print int(min+rand()*(max-min+1))}') + check_exit_code $? + ssh $user@$host "netstat -tuln | grep :$COMM_PORT" +done + +ssh $user@$host "netstat -tuln | grep :8787" +if [ $? -eq 0 ]; then + echo -e "${RED}Port 8787 is already in use on remote system${NC}" + echo -e "${YELLOW}Not forwarding port 8787, dask dashboard may be unavailable...${NC}" + connect="ssh" +else + connect="ssh -L 8787:$host:8787" +fi + +$connect -t $user@$host \ "{ module load apptainer/$APPTAINER_VERSION && cd $work_dir && $SHELL --rcfile <(echo \". $RC_FILE; python3() { - ./communicator -s >> logs/communicator.log & + ./communicator -s $COMM_PORT >> logs/communicator.log & COMMUNICATOR_PID=\\\$! 
- apptainer exec --userns --compat --home ~/$work_dir --cwd ~/$work_dir ~/$work_dir/containers/scalable_container.sif python3 \\\$@ + apptainer exec --userns --compat --env COMM_PORT=$COMM_PORT --home ~/$work_dir --cwd ~/$work_dir ~/$work_dir/containers/scalable_container.sif python3 \\\$@ kill -9 \\\$COMMUNICATOR_PID } \" ); }" diff --git a/scalable/slurm.py b/scalable/slurm.py index 6e081ee..0a30eab 100755 --- a/scalable/slurm.py +++ b/scalable/slurm.py @@ -5,7 +5,7 @@ from distributed.deploy.spec import ProcessInterface from .common import logger -from .core import Job, JobQueueCluster, job_parameters, cluster_parameters +from .core import Job, JobQueueCluster, cluster_parameters, job_parameters from .support import * from .utilities import * @@ -29,7 +29,6 @@ def __init__( tag=None, hardware=None, logs_location=None, - log=True, shared_lock=None, worker_env_vars=None, **base_class_kwargs @@ -47,11 +46,11 @@ def __init__( self.job_name = job_name self.job_id = None self.job_node = None + self.log_file = None - if log: + if logs_location is not None: self.log_file = os.path.abspath(os.path.join(logs_location, f"{self.name}-{self.tag}.log")) - # All the wanted commands should be set here self.send_command = self.container.get_command(worker_env_vars) self.send_command.extend(self.command_args) @@ -64,7 +63,7 @@ async def _srun_command(self, command): async def _ssh_command(self, command): prefix = ["ssh", self.job_node] if self.log_file: - suffix = [f">>{self.log_file}", "2>&1", "&"] + suffix = [f">> {self.log_file}", "2>&1", "&"] command = command + suffix command = list(map(str, command)) command_str = " ".join(command) @@ -149,30 +148,26 @@ async def close(self): cluster.loop.call_later(RECOVERY_DELAY, cluster._correct_state) class SlurmCluster(JobQueueCluster): - __doc__ = """ Launch Dask on a SLURM cluster + __doc__ = """Launch Dask on a SLURM cluster. Inherits the JobQueueCluster + class. Parameters ---------- - queue : str - Destination queue for each worker job. - project : str - Deprecated: use ``account`` instead. This parameter will be removed in a future version. - account : str - Accounting string associated with each worker job. - {job} {cluster} - walltime : str - Walltime for each worker job. - + *args : tuple + Positional arguments to pass to JobQueueCluster. + **kwargs : dict + Keyword arguments to pass to JobQueueCluster. """.format( - job=job_parameters, cluster=cluster_parameters + cluster=cluster_parameters ) job_cls = SlurmJob def close(self, timeout: float | None = None) -> Awaitable[None] | None: """Close the cluster - This closes all running jobs and the scheduler.""" + This closes all running jobs and the scheduler. Pending jobs belonging + to the user are also cancelled.""" active_jobs = self.hardware.get_active_jobids() jobs_command = "squeue -o \"%i %t\" -u $(whoami) | sed '1d'" result = subprocess.run(jobs_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) @@ -195,6 +190,21 @@ def close(self, timeout: float | None = None) -> Awaitable[None] | None: @staticmethod def set_default_request_quantity(nodes): + """Set the default number of nodes to request when scaling the cluster. + + Static Function. Does not require an instance of the class. + + If set to 1 (the original default), the cluster will request one + hardware node at a time when scaling. If set to a higher number, like 5, + the cluster will request 5 hardware nodes at a time when scaling. 
This + is helpful when each worker may need almost all the resources of a + node and it is more efficient to request multiple nodes at once. + + Parameters + ---------- + nodes : int + Number of nodes to request when scaling the cluster. + """ global DEFAULT_REQUEST_QUANTITY DEFAULT_REQUEST_QUANTITY = nodes \ No newline at end of file diff --git a/scalable/support.py b/scalable/support.py index 404215c..d97806d 100755 --- a/scalable/support.py +++ b/scalable/support.py @@ -1,7 +1,8 @@ -from datetime import datetime import os import re import shlex +from datetime import datetime + def salloc_command(account=None, chdir=None, clusters=None, exclusive=True, gpus=None, name=None, memory=None, nodes=None, partition=None, time=None, extras=None): @@ -85,7 +86,6 @@ def core_command(): """ return ["nproc", "--all"] -# Handle what to do if name is null or invalid def jobid_command(name): """Make the command to get the job id of a job with a given name. @@ -173,7 +173,7 @@ def parse_nodelist(nodelist): def create_logs_folder(folder, worker_name): """Create a folder for logs. Uses the current date and time along with the given worker name to create a unique folder name. - + Parameters ---------- folder : str diff --git a/scalable/utilities.py b/scalable/utilities.py index 879f501..9076293 100755 --- a/scalable/utilities.py +++ b/scalable/utilities.py @@ -1,11 +1,12 @@ -import subprocess -import yaml -import os import asyncio -from dask.utils import parse_bytes -from importlib.resources import files +import os import re +import subprocess import sys +import yaml + +from importlib.resources import files +from dask.utils import parse_bytes from .common import logger @@ -44,24 +45,10 @@ async def get_cmd_comm(port, communicator_path=None): ) return proc -def get_comm_port(logpath=None): - if logpath is None: - logpath = "./communicator.log" - ret = -1 - with open(logpath, 'r') as file: - for line in file: - match = re.search(comm_port_regex, line) - if match: - port = int(match.group(1)) - if 0 <= port <= 65535: - ret = port - break - return ret - def run_bootstrap(): bootstrap_location = files('scalable').joinpath('scalable_bootstrap.sh') - result = subprocess.run(["/bin/bash", bootstrap_location], stdin=sys.stdin, - stdout=sys.stdout, stderr=sys.stderr) + result = subprocess.run([os.environ.get("SHELL"), bootstrap_location.as_posix()], stdin=sys.stdin, + stdout=sys.stdout, stderr=sys.stdout) if result.returncode != 0: sys.exit(result.returncode) @@ -105,8 +92,8 @@ def __init__(self, path=None, path_overwrite=True): self.config_dict = {} cwd = os.getcwd() if path is None: - self.path = os.path.abspath(os.path.join(cwd, "scalable", "config_dict.yaml")) - dockerfile_path = os.path.abspath(os.path.join(cwd, "scalable", "Dockerfile")) + self.path = os.path.abspath(os.path.join(cwd, "config_dict.yaml")) + dockerfile_path = os.path.abspath(os.path.join(cwd, "Dockerfile")) list_avial_command = \ f"sed -n 's/^FROM[[:space:]]\+[^ ]\+[[:space:]]\+AS[[:space:]]\+\([^ ]\+\)$/\\1/p' {dockerfile_path}" result = subprocess.run(list_avial_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) @@ -122,7 +109,7 @@ def __init__(self, path=None, path_overwrite=True): logger.error("Failed to run sed command...manual entry of container info may be required") return if not os.path.exists(self.path): - logger.warn("No resource dict found...making one") + logger.warning("No resource dict found...making one") path_overwrite = True for container in avail_containers: self.config_dict[container] = 
ModelConfig.default_spec() @@ -535,7 +522,6 @@ class Container: def __init__(self, name, spec_dict): """ - Parameters ---------- name : str @@ -546,15 +532,12 @@ def __init__(self, name, spec_dict): be in gigabytes, megabytes, or bytes. '500MB' or '2GB' are valid. A valid spec_dict can look like: { - 'CPUs': 4, - 'Memory': '8G', - 'Path': '/home/user/work/containers/container.sif', - 'Dirs': { - '/home/work/inputs': '/inputs' - '/home/work/shared': '/shared' - } - } - + 'CPUs': 4, + 'Memory': '8G', + 'Path': '/home/user/work/containers/container.sif', + 'Dirs': { + '/home/work/inputs': '/inputs' + '/home/work/shared': '/shared'}} """ self.name = name self.cpus = spec_dict['CPUs'] diff --git a/versioneer.py b/versioneer.py old mode 100644 new mode 100755
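As a closing, hedged illustration of the `spec_dict` layout documented in `Container.__init__` above (the container name, paths, and resource values are illustrative; `Container` is defined in `scalable/utilities.py`):

```python
from scalable.utilities import Container

spec = {
    'CPUs': 4,
    'Memory': '8G',
    'Path': '/home/user/work/containers/gcam_container.sif',
    'Dirs': {
        '/home/work/inputs': '/inputs',
        '/home/work/shared': '/shared',
    },
}
gcam = Container('gcam', spec)  # container named "gcam" with the resources and bind mounts above
```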