diff --git a/src/batch/README.md b/src/batch/README.md
new file mode 100644
index 000000000..45e0c9930
--- /dev/null
+++ b/src/batch/README.md
@@ -0,0 +1,17 @@
+## How to run data ingestion using Google Batch
+
+Specify which data source to ingest:
+```bash
+export MODULE=eqtl_catalogue
+```
+
+Run:
+```bash
+# Upload code.
+gsutil -m cp -r . gs://genetics_etl_python_playground/batch/code
+# Submit batch job.
+gcloud batch jobs submit \
+    "${MODULE//_/-}-$(date --utc +"%Y%m%d-%H%M%S")" \
+    --config <(python3 ${MODULE}.py) \
+    --location europe-west1
+```
diff --git a/src/batch/batch_common.py b/src/batch/batch_common.py
new file mode 100644
index 000000000..09b10cbc7
--- /dev/null
+++ b/src/batch/batch_common.py
@@ -0,0 +1,80 @@
+"""Shared utilities for running non-Spark ingestion on Google Batch."""
+
+from __future__ import annotations
+
+import json
+
+
+def generate_job_config(
+    step_name: str,
+    number_of_tasks: int,
+    cpu_per_job: int = 16,
+    max_parallelism: int = 50,
+) -> str:
+    """Generate configuration for a Google Batch job.
+
+    Args:
+        step_name (str): Name of the module which will run the step, without the ".py" extension.
+        number_of_tasks (int): How many tasks are there going to be in the batch.
+        cpu_per_job (int): How many CPUs to allocate for each VM worker for each job. Must be a power of 2, maximum value is 64.
+        max_parallelism (int): The maximum number of concurrently running tasks.
+
+    Returns:
+        str: Google Batch job config in JSON format.
+
+    Raises:
+        ValueError: If cpu_per_job is not a power of 2 between 1 and 64.
+    """
+    # Enforce the documented constraint up front: an invalid value would
+    # otherwise silently produce a non-existent machine type below.
+    if not (1 <= cpu_per_job <= 64 and cpu_per_job & (cpu_per_job - 1) == 0):
+        raise ValueError(
+            f"cpu_per_job must be a power of 2 between 1 and 64, got {cpu_per_job}."
+        )
+    config = {
+        "taskGroups": [
+            {
+                "taskSpec": {
+                    "runnables": [
+                        {
+                            "script": {
+                                # runner.sh bootstraps dependencies and then
+                                # executes the requested module for each task.
+                                "text": f"bash /mnt/share/code/runner.sh {step_name}",
+                            }
+                        }
+                    ],
+                    "computeResource": {
+                        "cpuMilli": cpu_per_job * 1000,
+                        # Memory is requested proportionally to the CPU count
+                        # (3200 MiB per vCPU).
+                        "memoryMib": cpu_per_job * 3200,
+                    },
+                    "volumes": [
+                        {
+                            "gcs": {
+                                "remotePath": "genetics_etl_python_playground/batch"
+                            },
+                            "mountPath": "/mnt/share",
+                        }
+                    ],
+                    "maxRetryCount": 1,
+                    "maxRunDuration": "3600s",
+                },
+                "taskCount": number_of_tasks,
+                "parallelism": min(number_of_tasks, max_parallelism),
+            }
+        ],
+        "allocationPolicy": {
+            "instances": [
+                {
+                    "policy": {
+                        "machineType": f"n2d-standard-{cpu_per_job}",
+                        # Spot VMs cut cost; preempted tasks are retried once
+                        # (maxRetryCount above).
+                        "provisioningModel": "SPOT",
+                    }
+                }
+            ]
+        },
+        "logsPolicy": {"destination": "CLOUD_LOGGING"},
+    }
+    return json.dumps(config, indent=4)
diff --git a/src/batch/runner.sh b/src/batch/runner.sh
new file mode 100755
index 000000000..c33c7ca39
--- /dev/null
+++ b/src/batch/runner.sh
@@ -0,0 +1,35 @@
+#!/bin/bash
+# Bootstrap a Google Batch VM and run one ingestion task.
+# Usage: runner.sh MODULE
+# The $BATCH_TASK_INDEX variable is set by Google Batch.
+
+export MODULE=$1
+
+# Create directory for logs, if necessary.
+mkdir -p "/mnt/share/logs/${MODULE}"
+
+# Redirect all subsequent stdout and stderr to a per-task log file.
+exec &> "/mnt/share/logs/${MODULE}/log.${BATCH_TASK_INDEX}.log"
+
+echo ">>> Update APT"
+sudo apt -y update
+
+echo ">>> Install packages"
+sudo apt -y install python3 python3-pip python3-setuptools
+
+echo ">>> Update PIP and setuptools"
+sudo python3 -m pip install --upgrade pip setuptools
+echo $?
+
+echo ">>> Install Python requirements"
+sudo python3 -m pip install -r /mnt/share/code/requirements.txt
+echo $?
+
+echo ">>> Run script"
+sudo python3 "/mnt/share/code/${MODULE}.py" "${BATCH_TASK_INDEX}"
+# Capture the script's exit status immediately: any intervening command
+# (including echo) would overwrite $?.
+EXIT_CODE=$?
+
+echo ">>> Completed"
+echo $EXIT_CODE