From 8eeda2c78d8b4bd83ae793bdf1283f2ec76449eb Mon Sep 17 00:00:00 2001 From: Kirill Tsukanov Date: Sun, 3 Dec 2023 18:18:25 +0000 Subject: [PATCH] feat: implement Google Batch preprocessing --- src/batch/README.md | 17 ++++++++++ src/batch/batch_common.py | 67 +++++++++++++++++++++++++++++++++++++ src/batch/eqtl_catalogue.py | 44 ++++++++++++++++++++++++ src/batch/runner.sh | 28 ++++++++++++++++ 4 files changed, 156 insertions(+) create mode 100644 src/batch/README.md create mode 100644 src/batch/batch_common.py create mode 100644 src/batch/eqtl_catalogue.py create mode 100755 src/batch/runner.sh diff --git a/src/batch/README.md b/src/batch/README.md new file mode 100644 index 000000000..8f6f5a74e --- /dev/null +++ b/src/batch/README.md @@ -0,0 +1,17 @@ +## How to run a batch job + +Set up: +```bash +export MODULE=eqtl_catalogue +``` + +Run: +```bash +# Upload code. +gsutil -m cp -r . gs://genetics_etl_python_playground/batch/code +# Submit batch job. +gcloud batch jobs submit \ + "${MODULE//_/-}-$(date --utc +"%Y%m%d-%H%M%S")" \ + --config <(python3 ${MODULE}.py) \ + --location europe-west1 +``` diff --git a/src/batch/batch_common.py b/src/batch/batch_common.py new file mode 100644 index 000000000..531784eac --- /dev/null +++ b/src/batch/batch_common.py @@ -0,0 +1,67 @@ +"""Shared utilities for running non-Spark preprocessing on Google Batch.""" + +from __future__ import annotations + +import json + + +def generate_job_config( + step_name: str, + number_of_tasks: int, + cpu_per_job: int = 16, + max_parallelism: int = 50, +) -> str: + """Generate configuration for a Google Batch job. + + Args: + step_name (str): Name of the module which will run the step, without the ".py" extension. + number_of_tasks (int): How many tasks are there going to be in the batch. + cpu_per_job (int): How many CPUs to allocate for each VM worker for each job. Must be a power of 2, maximum value is 64. + max_parallelism (int): The maximum number of concurrently running tasks. + + Returns: + str: Google Batch job config in JSON format. + """ + config = { + "taskGroups": [ + { + "taskSpec": { + "runnables": [ + { + "script": { + "text": f"bash /mnt/share/code/runner.sh {step_name}", + } + } + ], + "computeResource": { + "cpuMilli": cpu_per_job * 1000, + "memoryMib": cpu_per_job * 3200, + }, + "volumes": [ + { + "gcs": { + "remotePath": "genetics_etl_python_playground/batch" + }, + "mountPath": "/mnt/share", + } + ], + "maxRetryCount": 1, + "maxRunDuration": "3600s", + }, + "taskCount": number_of_tasks, + "parallelism": min(number_of_tasks, max_parallelism), + } + ], + "allocationPolicy": { + "instances": [ + { + "policy": { + "machineType": f"n2d-standard-{cpu_per_job}", + "provisioningModel": "SPOT", + } + } + ] + }, + "logsPolicy": {"destination": "CLOUD_LOGGING"}, + } + return json.dumps(config, indent=4) diff --git a/src/batch/eqtl_catalogue.py b/src/batch/eqtl_catalogue.py new file mode 100644 index 000000000..43add2aa3 --- /dev/null +++ b/src/batch/eqtl_catalogue.py @@ -0,0 +1,44 @@ +"""Cloud Batch pipeline to preprocess and partition the eQTL Catalogue data.""" + +from __future__ import annotations + +import sys + +import pandas as pd +from batch_common import generate_job_config +from spark_prep import SparkPrep + +EQTL_CATALOGUE_IMPORTED_PATH = "https://raw.githubusercontent.com/eQTL-Catalogue/eQTL-Catalogue-resources/master/tabix/tabix_ftp_paths_imported.tsv" +EQTL_CATALOGUE_OUTPUT_BASE = ( + "gs://genetics_etl_python_playground/1-smart-mirror/summary_stats" +) + + +if __name__ == "__main__": + # Are we running on Google Cloud? + args = sys.argv[1:] + if args: + # We are running inside Google Cloud, let's process one file. + batch_index = int(args[0]) + # Read the study index and select one study. + df = pd.read_table(EQTL_CATALOGUE_IMPORTED_PATH) + record = df.loc[batch_index].to_dict() + # Process the study. + worker = SparkPrep( + input_uri=record["ftp_path"], + analysis_type="eQTL", + source_id="eQTL_Catalogue", + project_id="GTEx_V8", + study_id=record["qtl_group"], + output_base_path=EQTL_CATALOGUE_OUTPUT_BASE, + ) + worker.process() + else: + # We are running locally. Let's generate the job config. + # For this, we only really need the total number of jobs to run. + number_of_tasks = len(pd.read_table(EQTL_CATALOGUE_IMPORTED_PATH)) + print( + generate_job_config( + step_name="eqtl_catalogue", number_of_tasks=number_of_tasks + ) + ) diff --git a/src/batch/runner.sh b/src/batch/runner.sh new file mode 100755 index 000000000..c33c7ca39 --- /dev/null +++ b/src/batch/runner.sh @@ -0,0 +1,28 @@ +export MODULE=$1 + +# Create directory for logs, if necessary. +mkdir -p /mnt/share/logs/$MODULE + +# Redirect all subsequent logs. +# The $BATCH_TASK_INDEX variable is set by Google Batch. +exec &> /mnt/share/logs/$MODULE/log.$BATCH_TASK_INDEX.log + +echo ">>> Update APT" +sudo apt -y update + +echo ">>> Install packages" +sudo apt -y install python3 python3-pip python3-setuptools + +echo ">>> Update PIP and setuptools" +sudo python3 -m pip install --upgrade pip setuptools +echo $? + +echo ">>> Install packages" +sudo python3 -m pip install -r /mnt/share/code/requirements.txt +echo $? + +echo ">>> Run script" +sudo python3 /mnt/share/code/${MODULE}.py ${BATCH_TASK_INDEX} + +echo ">>> Completed" +echo $?