feat: implement Google Batch preprocessing
tskir committed Dec 3, 2023
1 parent 2e12439 commit 002c00a
Showing 3 changed files with 89 additions and 0 deletions.
17 changes: 17 additions & 0 deletions src/batch/README.md
@@ -0,0 +1,17 @@
## How to run a batch job

Set up:
```bash
export MODULE=eqtl_catalogue
```

Run:
```bash
# Upload code.
gsutil -m cp -r . gs://genetics_etl_python_playground/batch/code
# Submit batch job.
gcloud batch jobs submit \
"${MODULE//_/-}-$(date --utc +"%Y%m%d-%H%M%S")" \
--config <(python3 ${MODULE}.py) \
--location europe-west1
```
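
The job name passed to `gcloud batch jobs submit` is built from the module name and a UTC timestamp. Underscores are replaced with hyphens, presumably because Batch job names only accept lowercase letters, digits, and hyphens. As a rough illustration, the same naming rule can be sketched in Python; `make_job_name` is a hypothetical helper and is not part of this commit:

```python
from __future__ import annotations

from datetime import datetime, timezone


def make_job_name(module: str, now: datetime | None = None) -> str:
    """Build a job name such as 'eqtl-catalogue-20231203-101500'.

    Hypothetical helper mirroring the shell expression in the README.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    # Mirrors ${MODULE//_/-}: replace every underscore with a hyphen.
    prefix = module.replace("_", "-")
    # Mirrors date --utc +"%Y%m%d-%H%M%S".
    stamp = now.astimezone(timezone.utc).strftime("%Y%m%d-%H%M%S")
    return f"{prefix}-{stamp}"


if __name__ == "__main__":
    print(make_job_name("eqtl_catalogue"))
```

Because the timestamp has one-second resolution, two submissions of the same module within the same second would produce the same name.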
44 changes: 44 additions & 0 deletions src/batch/eqtl_catalogue.py
@@ -0,0 +1,44 @@
"""Cloud Batch pipeline to preprocess and partition the eQTL Catalogue data."""

from __future__ import annotations

import sys

import pandas as pd
from batch_common import generate_job_config
from spark_prep import SparkPrep

EQTL_CATALOGUE_IMPORTED_PATH = "https://raw.githubusercontent.com/eQTL-Catalogue/eQTL-Catalogue-resources/master/tabix/tabix_ftp_paths_imported.tsv"
EQTL_CATALOGUE_OUTPUT_BASE = (
    "gs://genetics_etl_python_playground/1-smart-mirror/summary_stats"
)


if __name__ == "__main__":
    # Are we running on Google Cloud?
    args = sys.argv[1:]
    if args:
        # We are running inside Google Cloud, let's process one file.
        batch_index = int(args[0])
        # Read the study index and select one study.
        df = pd.read_table(EQTL_CATALOGUE_IMPORTED_PATH)
        record = df.loc[batch_index].to_dict()
        # Process the study.
        worker = SparkPrep(
            input_uri=record["ftp_path"],
            analysis_type="eQTL",
            source_id="eQTL_Catalogue",
            project_id="GTEx_V8",
            study_id=record["qtl_group"],
            output_base_path=EQTL_CATALOGUE_OUTPUT_BASE,
        )
        worker.process()
    else:
        # We are running locally. Let's generate the job config.
        # For this, we only really need the total number of jobs to run.
        number_of_tasks = len(pd.read_table(EQTL_CATALOGUE_IMPORTED_PATH))
        print(
            generate_job_config(
                step_name="eqtl_catalogue", number_of_tasks=number_of_tasks
            )
        )
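
The `batch_common` module that provides `generate_job_config` is not among the files changed in this commit, so its output is not visible here. As a minimal sketch of what such a function might emit, the example below builds a Batch job config with one task per study. The function name `generate_job_config_sketch`, the `parallelism` default, the runner invocation, and the bucket path mounted at `/mnt/share` are all assumptions inferred from the README and `runner.sh`, not the actual implementation.

```python
from __future__ import annotations

import json


def generate_job_config_sketch(
    step_name: str, number_of_tasks: int, parallelism: int = 50
) -> str:
    """Return a Batch job config as a JSON string (illustrative only)."""
    config = {
        "taskGroups": [
            {
                "taskSpec": {
                    # Assumption: each task runs runner.sh with the module name.
                    "runnables": [
                        {
                            "script": {
                                "text": f"bash /mnt/share/code/runner.sh {step_name}"
                            }
                        }
                    ],
                    # Assumption: the bucket path from the README is mounted
                    # at /mnt/share, where runner.sh expects code and logs.
                    "volumes": [
                        {
                            "gcs": {
                                "remotePath": "genetics_etl_python_playground/batch"
                            },
                            "mountPath": "/mnt/share",
                        }
                    ],
                },
                # One task per study; each task receives its own BATCH_TASK_INDEX.
                "taskCount": number_of_tasks,
                "parallelism": min(parallelism, number_of_tasks),
            }
        ],
        "logsPolicy": {"destination": "CLOUD_LOGGING"},
    }
    return json.dumps(config, indent=4)


if __name__ == "__main__":
    print(generate_job_config_sketch("eqtl_catalogue", number_of_tasks=3))
```

Printing the config to standard output is what lets the README pass it to `gcloud` through process substitution (`--config <(python3 ${MODULE}.py)`).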
28 changes: 28 additions & 0 deletions src/batch/runner.sh
@@ -0,0 +1,28 @@
export MODULE=$1

# Create directory for logs, if necessary.
mkdir -p /mnt/share/logs/$MODULE

# Redirect all subsequent logs.
# The $BATCH_TASK_INDEX variable is set by Google Batch.
exec &> /mnt/share/logs/$MODULE/log.$BATCH_TASK_INDEX.log

echo ">>> Update APT"
sudo apt -y update

echo ">>> Install packages"
sudo apt -y install python3 python3-pip python3-setuptools

echo ">>> Update PIP and setuptools"
sudo python3 -m pip install --upgrade pip setuptools
echo $?

echo ">>> Install Python requirements"
sudo python3 -m pip install -r /mnt/share/code/requirements.txt
echo $?

echo ">>> Run script"
sudo python3 /mnt/share/code/${MODULE}.py ${BATCH_TASK_INDEX}
# Save the script's exit code before the next command overwrites $?.
EXIT_CODE=$?

echo ">>> Completed"
echo $EXIT_CODE
