feat: implement Google Batch preprocessing
tskir committed Dec 3, 2023
1 parent 2e12439 commit 1e02545
Showing 2 changed files with 61 additions and 0 deletions.
17 changes: 17 additions & 0 deletions src/batch/README.md
@@ -0,0 +1,17 @@
## How to run a Google Batch job

Set up:
```bash
export MODULE=eqtl_catalogue
```

Run:
```bash
# Upload code.
gsutil -m cp -r . gs://genetics_etl_python_playground/batch/code
# Submit batch job.
gcloud batch jobs submit \
"${MODULE//_/-}-$(date --utc +"%Y%m%d-%H%M%S")" \
--config <(python3 ${MODULE}.py) \
--location europe-west1
```
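
The file passed via `--config` is the Google Batch job definition that the module script prints when run without arguments (see `eqtl_catalogue.py` below); process substitution feeds it straight to `gcloud`. A minimal sketch of a local sanity check before submitting, assuming the script emits JSON (which `gcloud batch jobs submit --config` accepts) and that its dependencies are installed locally:

```python
# Sketch: pretty-print the generated job config before submitting it.
# Assumes eqtl_catalogue.py prints JSON and that pandas, batch_common
# and spark_prep are importable in the local environment.
import json
import subprocess

raw = subprocess.run(
    ["python3", "eqtl_catalogue.py"],
    capture_output=True,
    text=True,
    check=True,
).stdout
print(json.dumps(json.loads(raw), indent=2))
```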
44 changes: 44 additions & 0 deletions src/batch/eqtl_catalogue.py
@@ -0,0 +1,44 @@
"""Cloud Batch pipeline to preprocess and partition the eQTL Catalogue data."""

from __future__ import annotations

import sys

import pandas as pd
from batch_common import generate_job_config
from spark_prep import SparkPrep

EQTL_CATALOGUE_IMPORTED_PATH = "https://raw.githubusercontent.com/eQTL-Catalogue/eQTL-Catalogue-resources/master/tabix/tabix_ftp_paths_imported.tsv"
EQTL_CATALOGUE_OUTPUT_BASE = (
"gs://genetics_etl_python_playground/1-smart-mirror/summary_stats"
)


if __name__ == "__main__":
    # Are we running inside a Google Batch task or locally?
    args = sys.argv[1:]
    if args:
        # We are running inside Google Batch; process the single study assigned to this task.
        batch_index = int(args[0])
        # Read the study index and select one study.
        df = pd.read_table(EQTL_CATALOGUE_IMPORTED_PATH)
        record = df.loc[batch_index].to_dict()
        # Process the study.
        worker = SparkPrep(
            input_uri=record["ftp_path"],
            analysis_type="eQTL",
            source_id="eQTL_Catalogue",
            project_id="GTEx_V8",
            study_id=record["qtl_group"],
            output_base_path=EQTL_CATALOGUE_OUTPUT_BASE,
        )
        worker.process()
    else:
        # We are running locally; generate the Batch job config.
        # For this, we only need the total number of tasks (one per study).
        number_of_tasks = len(pd.read_table(EQTL_CATALOGUE_IMPORTED_PATH))
        print(
            generate_job_config(
                step_name="eqtl_catalogue", number_of_tasks=number_of_tasks
            )
        )
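
`generate_job_config` and `SparkPrep` are imported from `batch_common` and `spark_prep`, which are not part of this commit. For orientation only, here is a minimal sketch of what a helper along the lines of `generate_job_config` could return, assuming the standard Google Batch job schema and that the task index reaches the script through the Batch-provided `BATCH_TASK_INDEX` environment variable; the actual implementation may differ:

```python
# Hypothetical sketch only: the real generate_job_config lives in batch_common,
# which is not included in this commit.
import json


def generate_job_config(step_name: str, number_of_tasks: int) -> str:
    """Return a Google Batch job definition (JSON) with one task per study."""
    job = {
        "taskGroups": [
            {
                "taskSpec": {
                    "runnables": [
                        {
                            "script": {
                                # Each task handles the study at its own index;
                                # Batch exposes that index as $BATCH_TASK_INDEX.
                                "text": f"python3 {step_name}.py $BATCH_TASK_INDEX",
                            }
                        }
                    ],
                },
                "taskCount": number_of_tasks,
                # Arbitrary example value for the number of concurrent tasks.
                "parallelism": 20,
            }
        ],
        "logsPolicy": {"destination": "CLOUD_LOGGING"},
    }
    return json.dumps(job, indent=2)
```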
