Dask clusters getting stuck during pangeo-forge workloads #88

Open · rabernat opened this issue Mar 29, 2021 · 8 comments

@rabernat (Contributor)

I have been running full size Pangeo Forge recipes on Pangeo Cloud, using Dask Gateway clusters of 10-50 workers. This has been a good opportunity to see how things perform at scale.

In general, I'm pretty happy. I've processed several large datasets (pangeo-forge/staged-recipes#23, pangeo-forge/staged-recipes#24) more-or-less successfully.

Example Recipe Workflow

Set up Dask Cluster

```python
import subprocess
import logging
from distributed import WorkerPlugin

class PipPlugin(WorkerPlugin):
    """
    Install packages on a worker as it starts up.

    Parameters
    ----------
    packages : List[str]
        A list of packages to install with pip on startup.
    """
    def __init__(self, packages):
        self.packages = packages

    def setup(self, worker):
        # Called once on each worker as it starts up; installs the listed packages with pip.
        logger = logging.getLogger("distributed.worker")
        subprocess.call(['python', '-m', 'pip', 'install', '--no-deps', '--upgrade'] + self.packages)
        logger.info("Installed %s", self.packages)

from dask_gateway import Gateway

gateway = Gateway()
options = gateway.cluster_options()
options.worker_memory = 4  # memory limit per worker, in GB
options.worker_cores = 1   # one thread per worker
cluster = gateway.new_cluster(options)
client = cluster.get_client()

plugin = PipPlugin(
    ['git+https://github.com/pangeo-data/rechunker.git',
     'git+https://github.com/rabernat/xarray.git@zarr-chunk-fixes',
     'git+https://github.com/rabernat/pangeo-forge.git@write-with-zarr-and-local-cache'
    ]
)
client.register_worker_plugin(plugin)
cluster.scale(50)

from prefect.executors import DaskExecutor
prefect_executor = DaskExecutor(
    address=cluster.scheduler_address,
    client_kwargs={"security": cluster.security}
)
```

Create Recipe and Configure Storage

```python
import pandas as pd
from pangeo_forge.recipe import NetCDFtoZarrSequentialRecipe

url_base = 'https://dsrs.atmos.umd.edu/DATA/soda3.4.2/ORIGINAL/ocean/'

dates = pd.date_range(start='1993-01-04', end='2019-12-19', freq='5D')
date_string = [d.strftime('%Y_%m_%d') for d in dates]
urls = [url_base + f'soda3.4.2_5dy_ocean_or_{dstring}.nc' for dstring in date_string]

recipe = NetCDFtoZarrSequentialRecipe(
    input_urls=urls,
    sequence_dim="time",
    inputs_per_chunk=1,
    cache_inputs=True
)

# configure storage

import os
from pangeo_forge.storage import FSSpecTarget, CacheFSSpecTarget
import json
import gcsfs

with open('pangeo-181919-cc01a4fbb1ef.json') as fp:
    token = json.load(fp)
fs_target = gcsfs.GCSFileSystem(token=token, cache_timeout=-1, cache_type="none")
fs_scratch = gcsfs.GCSFileSystem(cache_timeout=-1, cache_type="none")
recipe_name = 'soda3.4.2_5dy_ocean_or'
target_path = f'gs://pangeo-forge-us-central1/pangeo-forge/soda/{recipe_name}'
cache_path = os.environ['PANGEO_SCRATCH'] + f'pangeo-forge-cache/{recipe_name}'
cache_target = CacheFSSpecTarget(fs_scratch, cache_path)
target = FSSpecTarget(fs_target, target_path)
recipe.input_cache = cache_target
recipe.target = target
```

Run Recipe on Cluster

```python
from pangeo_forge.executors import PrefectPipelineExecutor
pipelines = recipe.to_pipelines()
executor = PrefectPipelineExecutor()
flow = executor.pipelines_to_plan(pipelines)
state = flow.run(executor=prefect_executor)  # triggers many hours of work
```
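
As a side note, the returned `state` object can be inspected afterwards to check whether everything finished cleanly. A minimal sketch, assuming Prefect 0.x semantics where the flow state's `.result` maps tasks to their states:

```python
# Sketch: inspect the Prefect flow state after the run.
# Assumes Prefect 0.x, where `state.result` is a dict mapping tasks to their states.
if state.is_successful():
    print("flow finished successfully")
else:
    for task, task_state in state.result.items():
        if task_state.is_failed():
            print(task, task_state.message)
```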

The problem

However, with pangeo-forge/staged-recipes#23, which involves nearly 2000 input files, I started to see some strange behavior from Dask. Specifically, it looks like one worker ends up holding all of the remaining tasks and then becomes unresponsive, which grinds the whole flow to a halt.

Here's what the dashboard looks like in this state:
[screenshot: frozen_cluster]

Here's a similar example with fewer workers:
[screenshot: stuck_workers]

(Note that most workers have zero tasks to process, while one has all the rest. However, that worker has stopped working.)

Here's what the worker page looks like:
[screenshot: frozen_worker]

In the worker log for the stuck worker, I see messages about "Comm closed".

```
distributed.worker - INFO - Start worker at: tls://10.36.27.69:43361
distributed.worker - INFO - Listening to: tls://10.36.27.69:43361
distributed.worker - INFO - dashboard at: 10.36.27.69:8787
distributed.worker - INFO - Waiting to connect to: tls://dask-098a0a4542fa4de0a7cbc3e00b8d3212.prod:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 1
distributed.worker - INFO - Memory: 4.29 GB
distributed.worker - INFO - Local Directory: /home/jovyan/dask-worker-space/dask-worker-space/worker-mzjd2u0g
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Starting Worker plugin <__main__.PipPlugin object at 0x7fc2ba85ce80>-294a0cab-7425-4b2d-860f-a4897f5e363c
distributed.worker - INFO - Installed ['git+https://github.com/pangeo-data/rechunker.git', 'git+https://github.com/rabernat/xarray.git@zarr-chunk-fixes', 'git+https://github.com/rabernat/pangeo-forge.git@write-with-zarr-and-local-cache']
distributed.worker - INFO - Registered to: tls://dask-098a0a4542fa4de0a7cbc3e00b8d3212.prod:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Connection to scheduler broken. Reconnecting...
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Starting Worker plugin <__main__.PipPlugin object at 0x7fc2ba85cb50>-77274dc9-8886-4c12-84ad-0dab61ee97f6
distributed.worker - INFO - Installed ['git+https://github.com/pangeo-data/rechunker.git', 'git+https://github.com/rabernat/xarray.git@zarr-chunk-fixes', 'git+https://github.com/rabernat/pangeo-forge.git@write-with-zarr-and-local-cache']
distributed.worker - INFO - Registered to: tls://dask-098a0a4542fa4de0a7cbc3e00b8d3212.prod:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Connection to scheduler broken. Reconnecting...
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Comm closed
```
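
For anyone reproducing this, here is a rough sketch (not part of the original workflow above) of how the per-worker task backlog and worker logs can be pulled from the client while the cluster is in this state:

```python
# Sketch: inspect the stuck cluster from the notebook (assumes `client` from the setup above).

# Tasks currently assigned to each worker; the stuck worker should show a large backlog
# while the others are empty.
for worker, tasks in client.processing().items():
    print(worker, len(tasks))

# Pull recent log lines from every worker (or pass workers=[...] to target the stuck one)
# to look for "Comm closed" / "Connection to scheduler broken" messages.
logs = client.get_worker_logs()
```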

It would be great if a Dask expert could help us get to the bottom of this.

@rabernat (Contributor, Author) commented Mar 29, 2021

A possibly related issue is that there is a huge amount of variability in the time taken by `store_chunk` tasks: sometimes 90 s, sometimes >800 s.

[screenshot: store_chunk task durations]
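
One way to quantify this is to pull the task stream from the client and compute `store_chunk` durations per worker. A rough sketch, assuming `client` from the setup above and that the dask task keys contain the stage name, as they appear to in the dashboard:

```python
from collections import defaultdict

# Sketch: measure store_chunk durations from the task stream.
durations_by_worker = defaultdict(list)
for record in client.get_task_stream():
    if "store_chunk" in record["key"]:
        for ss in record["startstops"]:
            # "compute" entries cover the actual task execution time
            if ss["action"] == "compute":
                durations_by_worker[record["worker"]].append(ss["stop"] - ss["start"])

for worker, durations in sorted(durations_by_worker.items()):
    print(worker, f"n={len(durations)}", f"min={min(durations):.0f}s", f"max={max(durations):.0f}s")
```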

@martindurant (Contributor)

Do you have profiler data on what's happening during store_chunk? Do you have typical times on a local process/thread? Do you think this might be hitting a cloud object store bottleneck or throttle?
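One low-effort way to capture that (a sketch, not something already in the workflow above) would be to wrap the flow run in Dask's `performance_report` context manager, which writes the task stream, worker profiles, and bandwidth information to a standalone HTML file:

```python
from dask.distributed import performance_report

# Sketch: capture scheduler/worker profiling for the whole run
# (assumes `flow` and `prefect_executor` from the setup above; the filename is arbitrary).
with performance_report(filename="soda-recipe-report.html"):
    state = flow.run(executor=prefect_executor)
```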

@rabernat (Contributor, Author)

I'm running it with a Dask profile now. Also, having better logging on the workers (#84) would help a lot.
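
For reference, a sketch of how the aggregated worker-side profile for just the `store_chunk` tasks could be dumped to an HTML file (the key prefix and the filename are assumptions on my part):

```python
# Sketch: save the aggregated worker profile for tasks whose keys start with "store_chunk"
# (assumes `client` from above; the filename is arbitrary).
client.profile(key="store_chunk", filename="store_chunk_profile.html")
```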

@rabernat (Contributor, Author)

Some workers are consistently ~10x faster than others. What could explain that?

[screenshot: per-worker task timings]

@rabernat (Contributor, Author)

For a recent test, I was able to eliminate the problem of high variance among workers by setting `copy_input_to_local_file=False` (see #87).
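
For context, a sketch of what that looks like, assuming `copy_input_to_local_file` is accepted as a recipe constructor keyword like the other options (see #87 for the discussion):

```python
# Sketch: same recipe as above, but without copying cached inputs to worker-local files.
# Assumes copy_input_to_local_file is a constructor keyword of this recipe class (see #87).
recipe = NetCDFtoZarrSequentialRecipe(
    input_urls=urls,
    sequence_dim="time",
    inputs_per_chunk=1,
    cache_inputs=True,
    copy_input_to_local_file=False,
)
```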

@martindurant (Contributor)

What kind of local storage do they have?

@rabernat (Contributor, Author)

Some kind of Kubernetes volume mount; I don't know the details. It's whatever our dask-gateway workers are configured with.
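
If it helps, here is a rough sketch (not verified against our deployment) of how each worker's local scratch directory and free space could be checked from the client:

```python
import shutil

def local_storage_info(dask_worker):
    # dask_worker.local_directory is the scratch path (e.g. .../dask-worker-space/... above)
    usage = shutil.disk_usage(dask_worker.local_directory)
    return {
        "local_directory": dask_worker.local_directory,
        "free_gb": usage.free / 1e9,
        "total_gb": usage.total / 1e9,
    }

# client.run executes the function on every worker; a parameter named `dask_worker`
# is automatically filled with the worker object.
client.run(local_storage_info)
```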
