
Dask Executor Scheduler

A Dask scheduler that uses a Python concurrent.futures.Executor to run tasks.

The motivation for building this was to get Dask to use serverless cloud functions for executing tasks. Serverless cloud functions allow scaling to thousands of concurrent workers, with no cluster to set up and manage. This code has been used with Pywren; see the instructions below.

The implementation is fairly naive: tasks are placed on an in-memory queue and processed by the executor in batches. Tasks accumulate in a batch until the batch reaches a certain size or a timeout occurs, whichever happens first.
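As an illustrative sketch (not the actual implementation), the batching logic amounts to draining a queue until either limit is hit. The batch size and timeout values here are hypothetical:

import queue

BATCH_SIZE = 100      # hypothetical limit on tasks per batch
BATCH_TIMEOUT = 1.0   # hypothetical seconds to wait for the next task before flushing

def next_batch(task_queue):
    """Accumulate tasks until the batch is full or the timeout expires."""
    batch = [task_queue.get()]  # block until at least one task arrives
    while len(batch) < BATCH_SIZE:
        try:
            batch.append(task_queue.get(timeout=BATCH_TIMEOUT))
        except queue.Empty:
            break  # timed out with a partial batch: flush it
    return batch

# The scheduler would then hand each batch to the executor, e.g.
#   futures = [executor.submit(run_task, task) for task in next_batch(q)]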

The tasks are generated by the Dask local scheduler, so there is no guarantee that they will be produced in an order that works well for this style of execution. However, batch-style parallel processing is generally a good fit for this scheduler.

Bookkeeping tasks (i.e. those that don't do any real work) are executed locally.

For testing, it's useful to use a ThreadPoolExecutor. This is the default if no executor is specified.

Have a look in the examples directory to see how to use the scheduler.
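For instance, a minimal sketch of what usage looks like with a ThreadPoolExecutor. The ExecutorScheduler name and its get method are illustrative, not necessarily the package's real entry point; check the examples for the actual API:

from concurrent.futures import ThreadPoolExecutor
import dask.array as da

# Hypothetical entry point; see the examples directory for the real one.
from dask_executor_scheduler import ExecutorScheduler

x = da.ones((1000, 1000), chunks=(100, 100))

# Wrap a standard concurrent.futures executor (a ThreadPoolExecutor for testing).
scheduler = ExecutorScheduler(ThreadPoolExecutor(max_workers=4))

# Pass the scheduler's get function to Dask in the usual way.
result = x.sum().compute(scheduler=scheduler.get)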

Upstream discussion/implementation

See also dask#6220 for discussion about including this in Dask; and dask#6322 for an implementation.

Installation

python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
pip install -e .

Or using Conda (which makes Zarr easier to install):

conda env create -f environment.yml 
conda activate dask_executor_scheduler
pip install -e .

Running locally

Local thread pool:

python examples/threadpool_executor.py

Pywren using a local executor:

python examples/pywren_local_executor.py

Configuring Pywren for Google Cloud

I've created a branch of pywren-ibm-cloud with support for Google Cloud Storage and Google Cloud Run here: https://github.com/tomwhite/pywren-ibm-cloud.

Edit your ~/.pywren_config file as follows, where <BUCKET> is the name of a newly-created bucket:

pywren:
    storage_bucket: <BUCKET>
    storage_backend: gcsfs
    compute_backend: cloudrun

gcsfs:
    project_id: <PROJECT_ID>

cloudrun:
    project_id: <PROJECT_ID>
    region: <REGION>

Run using the Cloud Run executor:

python examples/pywren_cloudrun_executor.py

Pywren runtimes

The default runtime will be built automatically for you when you first run Pywren. To run the examples that use Zarr you will need to build a custom conda runtime, since installing zarr via pip requires compiling numcodecs. Note that this requires https://github.com/pywren/pywren-ibm-cloud to be checked out in the parent directory.

PROJECT_ID=...
PYWREN_LOGLEVEL=DEBUG pywren-ibm-cloud runtime build -f ../pywren-ibm-cloud/runtime/cloudrun/Dockerfile.conda37 "$PROJECT_ID/pywren-cloudrun-conda-v37:latest"

You can run this repeatedly to rebuild the runtime. You can create (or update) the Cloud Run function that uses the runtime with:

pywren-ibm-cloud runtime create "$PROJECT_ID/pywren-cloudrun-conda-v37:latest"

The full docs on runtimes are here: https://github.com/pywren/pywren-ibm-cloud/tree/master/runtime

Example: Rechunking Zarr files

Rechunking Zarr files is a common but surprisingly difficult problem to get right. This thread has an excellent discussion of the problem, along with many suggested approaches and solutions.

The rechunker library is a general-purpose solution, and one that is well suited to Pywren, since the Dask graph is small and the I/O can be offloaded to the cloud without starting a dedicated Dask cluster.
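In outline, a rechunking job with rechunker looks something like this (a sketch using rechunker's rechunk/execute API; the paths, chunk sizes, and memory budget are placeholders):

import zarr
from rechunker import rechunk

source = zarr.open("source.zarr")  # placeholder source array

plan = rechunk(
    source,
    target_chunks=(1000, 100),     # placeholder target chunking
    max_mem="256MB",               # memory budget per task
    target_store="target.zarr",
    temp_store="temp.zarr",        # intermediate store used by the two-pass algorithm
)
plan.execute()  # runs the (small) Dask graph, which Pywren can execute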

The examples directory has a few examples of running rechunker on Zarr files using Pywren.

To run them you will need to create a conda runtime, as explained in the previous section, and a GCS bucket for the Zarr files.

Run using local files and local compute (local Dask and Pywren):

python examples/rechunk_local_storage_local_compute.py delete
python examples/rechunk_local_storage_local_compute.py create
python examples/rechunk_local_storage_local_compute.py rechunk

Run using cloud storage and cloud compute:

PROJECT_ID=...
BUCKET=...
python examples/rechunk_cloud_storage_cloud_compute.py delete $PROJECT_ID $BUCKET
python examples/rechunk_cloud_storage_cloud_compute.py create $PROJECT_ID $BUCKET
python examples/rechunk_cloud_storage_cloud_compute.py rechunk $PROJECT_ID $BUCKET

You can inspect the files in the bucket using the regular CLI tools or the cloud console.
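For example, with the Google Cloud SDK installed:

gsutil ls -r "gs://$BUCKET/"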

Delete the files from the bucket after you have finished:

python examples/rechunk_cloud_storage_cloud_compute.py delete $PROJECT_ID $BUCKET

Related projects

The idea for this came from the work I did in Zappy to run NumPy processing on Pywren.
