Skip to content

Commit

Permalink
feat: implement Google Batch preprocessing
Browse files Browse the repository at this point in the history
  • Loading branch information
tskir committed Dec 3, 2023
1 parent 2e12439 commit 8eeda2c
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 0 deletions.
17 changes: 17 additions & 0 deletions src/batch/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
## How to run a batch job

Set up:
```bash
export MODULE=eqtl_catalogue
```

Run:
```bash
# Upload code.
gsutil -m cp -r . gs://genetics_etl_python_playground/batch/code
# Submit batch job.
gcloud batch jobs submit \
"${MODULE//_/-}-$(date --utc +"%Y%m%d-%H%M%S")" \
--config <(python3 ${MODULE}.py) \
--location europe-west1
```
67 changes: 67 additions & 0 deletions src/batch/batch_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Shared utilities for running non-Spark preprocessing on Google Batch."""

from __future__ import annotations

import json


def generate_job_config(
    step_name: str,
    number_of_tasks: int,
    cpu_per_job: int = 16,
    max_parallelism: int = 50,
) -> str:
    """Generate configuration for a Google Batch job.

    Args:
        step_name (str): Name of the module which will run the step, without the ".py" extension.
        number_of_tasks (int): How many tasks are there going to be in the batch.
        cpu_per_job (int): How many CPUs to allocate for each VM worker for each job. Must be a power of 2, maximum value is 64.
        max_parallelism (int): The maximum number of concurrently running tasks.

    Returns:
        str: Google Batch job config in JSON format.

    Raises:
        ValueError: If cpu_per_job is not a power of 2 between 1 and 64.
    """
    # Enforce the documented constraint here instead of emitting an invalid
    # "n2d-standard-N" machine type that Google Batch would only reject at
    # submission time. (n & (n - 1) == 0 is the power-of-two check.)
    if not (1 <= cpu_per_job <= 64 and cpu_per_job & (cpu_per_job - 1) == 0):
        raise ValueError(
            f"cpu_per_job must be a power of 2 no greater than 64, got {cpu_per_job}."
        )
    config = {
        "taskGroups": [
            {
                "taskSpec": {
                    "runnables": [
                        {
                            "script": {
                                # runner.sh bootstraps Python and launches
                                # <step_name>.py with the task's index.
                                "text": f"bash /mnt/share/code/runner.sh {step_name}",
                            }
                        }
                    ],
                    "computeResource": {
                        "cpuMilli": cpu_per_job * 1000,
                        # 3200 MiB per CPU matches the n2d-standard memory ratio.
                        "memoryMib": cpu_per_job * 3200,
                    },
                    "volumes": [
                        {
                            "gcs": {
                                "remotePath": "genetics_etl_python_playground/batch"
                            },
                            "mountPath": "/mnt/share",
                        }
                    ],
                    "maxRetryCount": 1,
                    "maxRunDuration": "3600s",
                },
                "taskCount": number_of_tasks,
                # Never request more parallel workers than there are tasks.
                "parallelism": min(number_of_tasks, max_parallelism),
            }
        ],
        "allocationPolicy": {
            "instances": [
                {
                    "policy": {
                        "machineType": f"n2d-standard-{cpu_per_job}",
                        # SPOT VMs are preemptible but much cheaper; safe here
                        # because tasks are retried (maxRetryCount above).
                        "provisioningModel": "SPOT",
                    }
                }
            ]
        },
        "logsPolicy": {"destination": "CLOUD_LOGGING"},
    }
    return json.dumps(config, indent=4)
44 changes: 44 additions & 0 deletions src/batch/eqtl_catalogue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Cloud Batch pipeline to preprocess and partition the eQTL Catalogue data."""

from __future__ import annotations

import sys

import pandas as pd
from batch_common import generate_job_config
from spark_prep import SparkPrep

EQTL_CATALOGUE_IMPORTED_PATH = "https://raw.githubusercontent.com/eQTL-Catalogue/eQTL-Catalogue-resources/master/tabix/tabix_ftp_paths_imported.tsv"
EQTL_CATALOGUE_OUTPUT_BASE = (
    "gs://genetics_etl_python_playground/1-smart-mirror/summary_stats"
)


if __name__ == "__main__":
    # Dual-mode entry point: Google Batch invokes this script with the task
    # index as the sole CLI argument; a local invocation passes no arguments
    # and only generates the job config.
    cli_args = sys.argv[1:]
    if not cli_args:
        # Local mode: print the Batch job config to stdout. The only dynamic
        # input needed is the total number of studies in the source table.
        study_count = len(pd.read_table(EQTL_CATALOGUE_IMPORTED_PATH))
        print(
            generate_job_config(
                step_name="eqtl_catalogue", number_of_tasks=study_count
            )
        )
    else:
        # Cloud mode: each task processes exactly one study, selected by the
        # task index Google Batch passed on the command line.
        task_index = int(cli_args[0])
        study_table = pd.read_table(EQTL_CATALOGUE_IMPORTED_PATH)
        study_record = study_table.loc[task_index].to_dict()
        SparkPrep(
            input_uri=study_record["ftp_path"],
            analysis_type="eQTL",
            source_id="eQTL_Catalogue",
            project_id="GTEx_V8",
            study_id=study_record["qtl_group"],
            output_base_path=EQTL_CATALOGUE_OUTPUT_BASE,
        ).process()
28 changes: 28 additions & 0 deletions src/batch/runner.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env bash
# Bootstrap script run by each Google Batch task: installs Python and the
# project requirements, then runs the requested module with this task's index.
# Usage: runner.sh <module-name-without-.py>

export MODULE=$1

# Create directory for logs, if necessary.
mkdir -p "/mnt/share/logs/${MODULE}"

# Redirect all subsequent logs.
# The $BATCH_TASK_INDEX variable is set by Google Batch.
exec &> "/mnt/share/logs/${MODULE}/log.${BATCH_TASK_INDEX}.log"

echo ">>> Update APT"
sudo apt -y update

echo ">>> Install system packages"
sudo apt -y install python3 python3-pip python3-setuptools

echo ">>> Update PIP and setuptools"
sudo python3 -m pip install --upgrade pip setuptools
echo $?

echo ">>> Install Python requirements"
sudo python3 -m pip install -r /mnt/share/code/requirements.txt
echo $?

echo ">>> Run script"
sudo python3 "/mnt/share/code/${MODULE}.py" "${BATCH_TASK_INDEX}"
# Capture the script's exit status immediately: any intervening command
# (including echo) would overwrite $?, which is why the original version
# always logged 0 here regardless of whether the run failed.
status=$?

echo ">>> Completed"
echo "${status}"

0 comments on commit 8eeda2c

Please sign in to comment.