Commit b4615fd

feat: pipeline setup

Author: Szymon Szyszkowski (committed)
1 parent fbe3e31 commit b4615fd

23 files changed: +1199 -682 lines changed

.env

Lines changed: 5 additions & 0 deletions

@@ -4,3 +4,8 @@ GOOGLE_DOCKER_CREDENTIALS_PATH=/.config/gcloud
 GOOGLE_APPLICATION_CREDENTIALS=/.config/gcloud/service_account_credentials.json
 AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT='google-cloud-platform://?extra__google_cloud_platform__key_path=/.config/gcloud/service_account_credentials.json'
 GCP_PROJECT_ID=open-targets-genetics-dev
+AIRFLOW_UID=503
+AIRFLOW__CORE__MAX_MAP_LENGTH=50000
+AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
+AIRFLOW__CORE__PARALLELISM=500
+AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG=500
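The added AIRFLOW__* variables follow Airflow's AIRFLOW__{SECTION}__{KEY} environment-variable convention, so AIRFLOW__CORE__PARALLELISM, for example, overrides parallelism in the [core] section of airflow.cfg. As a quick sanity check, here is a minimal sketch (a hypothetical helper, not part of this commit) that lists which config entries the .env file overrides:

# check_airflow_env.py -- hypothetical helper, not part of this repository.
# Maps AIRFLOW__SECTION__KEY variables from .env to the airflow.cfg entries
# they override, following Airflow's environment-variable naming convention.
from pathlib import Path

def airflow_overrides(dotenv_path: str = ".env") -> dict[str, tuple[str, str]]:
    overrides = {}
    for line in Path(dotenv_path).read_text().splitlines():
        if not line.startswith("AIRFLOW__"):
            continue  # skip AIRFLOW_UID, GCP_PROJECT_ID, connection URIs, etc.
        name, _, value = line.partition("=")
        _, section, key = name.split("__", 2)
        overrides[name] = (f"[{section.lower()}] {key.lower()}", value)
    return overrides

if __name__ == "__main__":
    for name, (cfg_entry, value) in airflow_overrides().items():
        print(f"{name} -> {cfg_entry} = {value}")

Run against this .env it would report, for instance, AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG -> [core] max_active_tasks_per_dag = 500.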

Makefile

Lines changed: 11 additions & 8 deletions

@@ -6,6 +6,7 @@ APP_NAME ?= $$(cat pyproject.toml| grep -m 1 "name" | cut -d" " -f3 | sed 's/"/
 VERSION := $$(grep '^version' pyproject.toml | sed 's%version = "\(.*\)"%\1%')
 BUCKET_NAME=gs://genetics_etl_python_playground/initialisation/${VERSION}/
 DOCKER_IMAGE := "Orchestration-Airflow"
+TEST_COVERAGE := 40

 .PHONY: $(shell sed -n -e '/^$$/ { n ; /^[^ .\#][^ ]*:/ { s/:.*$$// ; p ; } ; }' $(MAKEFILE_LIST))
 .DEFAULT_GOAL := help
@@ -29,17 +30,18 @@ format: ## run formatting
 	@poetry run python -m ruff check --fix src/$(APP_NAME) tests

 test: ## run unit tests
-	@poetry run python -m pytest tests/*.py
+	@poetry run coverage run -m pytest tests/*.py -s -p no:warnings
+	@poetry run coverage report --omit="tests/*" --fail-under=$(TEST_COVERAGE)

 check: format check-types test ## run all checks

 generate-requirements: ## generate requirements.txt from poetry dependencies to install in the docker image
-	poetry export --without-hashes --with dev --format=requirements.txt > requirements.txt
+	poetry export --without-hashes --with dev --format=requirements.txt > docker/airflow-dev/requirements.txt

 build-airflow-image: generate-requirements ## build local airflow image for the infrastructure
-	docker build . \
+	docker build docker/airflow-dev \
 		--tag extending_airflow:latest \
-		-f Dockerfile \
+		-f docker/airflow-dev/Dockerfile \
 		--no-cache

 build-whl: ## build ot-orchestration package wheel
@@ -53,13 +55,14 @@ build-genetics-etl-image: build-whl ## build local genetics-etl image for the te
 		--build-arg DIST=$(shell find dist -name 'ot_orchestration*')

 test-gwas-catalog-batch-script: ## test harmonisation task
-	# mkdir -p test_batch
-	# gsutil -m rsync -r gs://ot_orchestration/tests/gwas_catalog/basic_test/GCST004600 test_batch
+	mkdir -p test_batch
+	gsutil -m rsync -r gs://ot_orchestration/tests/gwas_catalog/basic_test/GCST004600 test_batch
 	docker run \
 		-v $(HOME)/.config/gcloud:/root/.config/gcloud \
 		-e MANIFEST_PATH=gs://ot_orchestration/tests/gwas_catalog/basic_test/GCST004600/manifest.json \
 		-e GOOGLE_APPLICATION_CREDENTIALS=/root/.config/gcloud/service_account_credentials.json \
 		-ti \
 		--rm \
-		genetics_etl:test
-	# gsutil -m rsync -r test_batch gs://ot_orchestration/tests/gwas_catalog/basic_test/GCST004600
+		genetics_etl:test \
+		-c "ot gwas-catalog-process-in-batch"
+	gsutil -m rsync -r test_batch gs://ot_orchestration/tests/gwas_catalog/basic_test/GCST004600
File renamed without changes.
File renamed without changes.

docker/genetics_etl/.dockerignore

Lines changed: 2 additions & 0 deletions

@@ -0,0 +1,2 @@
+# Ignore generated credentials from google-github-actions/auth
+gha-creds-*.json
File renamed without changes.

docker/genetics_etl/README.md

Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
+# genetics_etl docker image
+
+This is the genetics_etl docker image that is currently used by the GWAS_Catalog_dag batch processing. The image is based on gentropy image from [gentropy repository](https://github.com/opentargets/gentropy)

docs/gwas_catalog_dag.svg

Lines changed: 242 additions & 0 deletions

images/genetics_etl/README.md

Lines changed: 0 additions & 3 deletions
This file was deleted.

poetry.lock

Lines changed: 321 additions & 317 deletions
Some generated files are not rendered by default.

pyproject.toml

Lines changed: 4 additions & 3 deletions

@@ -27,9 +27,9 @@ pyyaml = "^6.0.1"
 google = "^3.0.0"
 pendulum = "^3.0.0"
 apache-airflow-providers-apache-beam = "^5.7.1"
-pandas = "<2.2"
 typing-extensions = "^4.12.2"
 requests = "^2.32.3"
+pandas = "<2.2"

 [tool.poetry.group.dev.dependencies]
 ruff = "^0.4.9"
@@ -95,13 +95,14 @@ ignore = [

 ]

-
 [tool.ruff.lint.flake8-quotes]
 docstring-quotes = "double"

 [tool.ruff.lint.pydocstyle]
 convention = "google"

-
 [tool.pytest]
 pytonpath = "src/"
+
+[tool.pytest.ini_options]
+markers = ["integration_test"]
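The new [tool.pytest.ini_options] table registers an integration_test marker, so tests can be tagged and selected (or deselected) by marker without pytest warning about unknown marks. A generic usage sketch follows; the test name and body are illustrative only, not taken from this repository:

# test_example_integration.py -- illustrative only.
import pytest

@pytest.mark.integration_test
def test_manifest_round_trip():
    # Selected with:  pytest -m integration_test
    # Excluded with:  pytest -m "not integration_test"
    assert True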

src/ot_orchestration/cli/__init__.py

Lines changed: 2 additions & 2 deletions

@@ -4,7 +4,7 @@
 import logging
 from ot_orchestration.cli.fetch_raw_sumstat_paths import fetch_raw_sumstat_paths
 from ot_orchestration.cli.generate_dotenv import generate_dotenv
-from ot_orchestration.cli.process_in_batch import gwas_catalog_process_in_batch
+from ot_orchestration.cli.process_in_batch import gwas_catalog_pipeline

 logging.basicConfig(level=logging.INFO)
 asci_art = """
@@ -24,7 +24,7 @@ def cli():

 cli.add_command(fetch_raw_sumstat_paths)
 cli.add_command(generate_dotenv)
-cli.add_command(gwas_catalog_process_in_batch)
+cli.add_command(gwas_catalog_pipeline)


 __all__ = ["cli"]
src/ot_orchestration/cli/process_in_batch.py

Lines changed: 61 additions & 77 deletions

@@ -1,21 +1,27 @@
 """Process GWAS Catalog summary statistics in batch job."""

-import os
-from ot_orchestration.utils import GCSIOManager
-from ot_orchestration.types import Manifest_Object
+from ot_orchestration.utils import IOManager, GWASCatalogPipelineManifest
 import logging
 import subprocess
 import click
+import os
+import sys

+MANIFEST_PATH_ENV_VAR = "MANIFEST_PATH"

-def harmonise(manifest: Manifest_Object) -> Manifest_Object:
+
+def harmonise_step(
+    manifest: GWASCatalogPipelineManifest,
+) -> GWASCatalogPipelineManifest:
     """Run Harmonisation."""
     raw_path = manifest["rawPath"]
     harmonised_path = manifest["harmonisedPath"]
     study_id = manifest["studyId"]
+    manifest_path = manifest["manifestPath"]
+    pass_harmonisation = manifest["passHarmonisation"]
+    logging.info("Running %s for %s", "harmonisation", study_id)
+
     command = [
-        "poetry",
-        "run",
         "gentropy",
         "step=gwas_catalog_sumstat_preprocess",
         f'step.raw_sumstats_path="{raw_path}"',
@@ -26,34 +32,38 @@ def harmonise(manifest: Manifest_Object) -> Manifest_Object:
         "+step.session.extended_spark_conf={spark.kryoserializer.buffer.max:'500m'}",
         "+step.session.extended_spark_conf={spark.driver.maxResultSize:'5g'}",
     ]
-    if GCSIOManager().exists(harmonised_path):
-        logging.info("Harmonisation result exists for %s. Skipping", study_id)
-        manifest["passHarmonisation"] = True
+    if IOManager().resolve(harmonised_path).exists():
+        if not pass_harmonisation:
+            logging.info("Harmonisation result exists for %s. Skipping", study_id)
+            manifest["passHarmonisation"] = True
         return manifest

+    logging.info("Running command %s", " ".join(command))
+    command = ["echo", "RUNNING!"]
     result = subprocess.run(args=command, capture_output=True)
+    logging.info(result)
     if result.returncode != 0:
         logging.error("Harmonisation for study %s failed!", study_id)
         error_msg = result.stderr.decode()
         logging.error(error_msg)
         manifest["passHarmonisation"] = False
-        logging.info("Dumping manifest to %s", manifest["manifestPath"])
-        GCSIOManager().dump(manifest["manifestPath"], manifest)
-        exit(1)
+        logging.info("Dumping manifest to %s", manifest_path)
+        IOManager().resolve(manifest_path).dump(manifest)
+        sys.exit(1)

     logging.info("Harmonisation for study %s succeded!", study_id)
     manifest["passHarmonisation"] = True
     return manifest


-def qc(manifest: Manifest_Object) -> Manifest_Object:
+def qc_step(manifest: GWASCatalogPipelineManifest) -> GWASCatalogPipelineManifest:
     """Run QC."""
     harmonised_path = manifest["harmonisedPath"]
     qc_path = manifest["qcPath"]
     study_id = manifest["studyId"]
+    manifest_path = manifest["manifestPath"]
+
     command = [
-        "poetry",
-        "run",
         "gentropy",
         "step=summary_statistics_qc",
         f'step.gwas_path="{harmonised_path}"',
@@ -65,9 +75,8 @@ def qc(manifest: Manifest_Object) -> Manifest_Object:
         "+step.session.extended_spark_conf={spark.kryoserializer.buffer.max:'500m'}",
         "+step.session.extended_spark_conf={spark.driver.maxResultSize:'5g'}",
     ]
-    result_exists = GCSIOManager().exists(qc_path)
-    logging.info("Result exists: %s", result_exists)
-    if GCSIOManager().exists(qc_path):
+    result_exists = IOManager().resolve(qc_path).exists()
+    if result_exists:
         logging.info("QC result exists for %s. Skipping", study_id)
         manifest["passQC"] = True
         return manifest
@@ -78,76 +87,51 @@ def qc(manifest: Manifest_Object) -> Manifest_Object:
         error_msg = result.stderr.decode()
         logging.error(error_msg)
         manifest["passQC"] = False
-        logging.info("Dumping manifest to %s", manifest["manifestPath"])
-        GCSIOManager().dump(manifest["manifestPath"], manifest)
+        logging.info("Dumping manifest to %s", manifest_path)
+        IOManager().resolve(manifest_path).dump(manifest)
         exit(1)

     logging.info("QC for study %s succeded!", study_id)
     manifest["passQC"] = True
     return manifest


-def qc_consolidation(manifest: Manifest_Object) -> Manifest_Object:
-    pass
+def qc_consolidation_step(
+    manifest: GWASCatalogPipelineManifest,
+) -> GWASCatalogPipelineManifest:
+    """Check if sumstats pass qc thresholds."""
+    return manifest


-def clumping(manifest: Manifest_Object) -> Manifest_Object:
+def clump_step(manifest: GWASCatalogPipelineManifest) -> GWASCatalogPipelineManifest:
     """Run Clumping."""
-    harmonised_path = manifest["harmonisedPath"]
-    clumping_path = manifest["clumpingPath"]
-    study_id = manifest["studyId"]
-    command = [
-        "poetry",
-        "run",
-        "gentropy",
-        "step=clumping",
-        f'step.gwas_path="{harmonised_path}"',
-        f'step.output_path="{clumping_path}"',
-        f'step.study_id="{study_id}"',
-        "+step.session.extended_spark_conf={spark.jars:'https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar'}",
-        "+step.session.extended_spark_conf={spark.dynamicAllocation.enabled:'false'}",
-        "+step.session.extended_spark_conf={spark.driver.memory:'30g'}",
-        "+step.session.extended_spark_conf={spark.kryoserializer.buffer.max:'500m'}",
-        "+step.session.extended_spark_conf={spark.driver.maxResultSize:'5g'}",
-    ]
-    if GCSIOManager().exists(clumping_path):
-        logging.info("Clumping result exists for %s. Skipping", study_id)
-        manifest["passClumping"] = True
-        return manifest
-
-    result = subprocess.run(args=command, capture_output=True)
-    if result.returncode != 0:
-        logging.error("Clumping for study %s failed!", study_id)
-        error_msg = result.stderr.decode()
-        logging.error(error_msg)
-        manifest["passClumping"] = False
-        logging.info("Dumping manifest to %s", manifest["manifestPath"])
-        GCSIOManager().dump(manifest["manifestPath"], manifest)
-        exit(1)
     return manifest


 @click.command()
-def gwas_catalog_process_in_batch():
-    """Run gwas catalog processing of summary statistics in batch. This includes harmonisation, QC and clumping."""
-    PROCESSING_ORDER = ["harmonisation"]
-    MANIFEST_PATH = os.environ.get("MANIFEST_PATH")
-    if MANIFEST_PATH is None:
-        logging.error("MANIFEST_PATH not set!")
-        exit(1)
-
-    manifest = GCSIOManager().load(MANIFEST_PATH)
-    study = manifest["studyId"]
-    PROCESSING_STEPS = {"harmonisation": harmonise, "qc": qc, "clumping": clumping}
-    for step in PROCESSING_ORDER:
-        if manifest[f"pass{step.capitalize()}"]:
-            logging.info("Skipping %s", step)
-            continue
-        logging.info("Running %s for %s", step, study)
-        manifest = PROCESSING_STEPS[step](manifest)
-        logging.info("Finished %s for %s", step, study)
-
-    GCSIOManager().dump(MANIFEST_PATH, manifest)
-
-
-__all__ = ["gwas_catalog_process_in_batch"]
+def gwas_catalog_pipeline():
+    """Run gwas catalog processing of summary statistics in batch.
+
+    This includes harmonisation, QC and clumping.
+    This command requires setting the `MANIFEST_PATH` in the
+    environment. The variable should be the reference to the path with the
+    manifest file.
+    """
+    logging.debug("Reading MANIFEST_PATH env variable")
+    manifest_path = os.getenv(MANIFEST_PATH_ENV_VAR)
+    if not manifest_path:
+        logging.error("MANIFEST_PATH environment variable is missing")
+        sys.exit(1)
+    logging.debug("MANIFEST_PATH: %s", manifest_path)
+    manifest = GWASCatalogPipelineManifest.from_file(manifest_path)
+    logging.debug("MANIFEST: %s", manifest)
+    # for now dummy implementatin of the pipeline processing order
+    manifest = harmonise_step(manifest)
+    manifest = qc_step(manifest)
+    manifest = qc_consolidation_step(manifest)
+    manifest = clump_step(manifest)
+
+    IOManager().resolve(manifest_path).dump(manifest)
+
+
+__all__ = ["gwas_catalog_pipeline"]
