Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…thon into il-l2g-negative-gs
  • Loading branch information
ireneisdoomed committed Nov 27, 2023
2 parents 4e4e4f5 + 024d084 commit 5de5b77
Show file tree
Hide file tree
Showing 29 changed files with 684 additions and 175 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ docs/assets/schemas/
src/airflow/logs/*
!src/airflow/logs/.gitkeep
site/
.env
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ repos:
- id: black

- repo: https://github.com/alessandrojcm/commitlint-pre-commit-hook
rev: v9.9.0
rev: v9.10.0
hooks:
- id: commitlint
additional_dependencies: ["@commitlint/config-conventional"]
Expand Down
5 changes: 4 additions & 1 deletion config/datasets/gcp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ catalog_associations: ${datasets.inputs}/v2d/gwas_catalog_v1.0.2-associations_e1
catalog_studies: ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-studies-r2023-09-11.tsv
catalog_ancestries: ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-ancestries-r2023-09-11.tsv
catalog_sumstats_lut: ${datasets.inputs}/v2d/harmonised_list-r2023-09-11.txt
finngen_phenotype_table_url: https://r9.finngen.fi/api/phenos
ukbiobank_manifest: gs://genetics-portal-input/ukb_phenotypes/neale2_saige_study_manifest.190430.tsv
l2g_gold_standard_curation: ${datasets.inputs}/l2g/gold_standard/curation.json
gene_interactions: ${datasets.inputs}/l2g/interaction # 23.09 data
finngen_phenotype_table_url: https://r9.finngen.fi/api/phenos
eqtl_catalogue_paths_imported: ${datasets.inputs}/preprocess/eqtl_catalogue/tabix_ftp_paths_imported.tsv

# Output datasets
gene_index: ${datasets.outputs}/gene_index
Expand All @@ -39,6 +40,8 @@ from_sumstats_pics: ${datasets.credible_set}/from_sumstats
ukbiobank_study_index: ${datasets.outputs}/ukbiobank_study_index
l2g_model: ${datasets.outputs}/l2g_model
l2g_predictions: ${datasets.outputs}/l2g_predictions
eqtl_catalogue_study_index_out: ${datasets.outputs}/preprocess/eqtl_catalogue/study_index
eqtl_catalogue_summary_stats_out: ${datasets.outputs}/preprocess/eqtl_catalogue/summary_stats

# Constants
finngen_release_prefix: FINNGEN_R9
Expand Down
4 changes: 4 additions & 0 deletions config/step/eqtl_catalogue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
_target_: otg.eqtl_catalogue.EqtlCatalogueStep
eqtl_catalogue_paths_imported: ${datasets.eqtl_catalogue_paths_imported}
eqtl_catalogue_study_index_out: ${datasets.eqtl_catalogue_study_index_out}
eqtl_catalogue_summary_stats_out: ${datasets.eqtl_catalogue_summary_stats_out}
23 changes: 11 additions & 12 deletions docs/development/contributing.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,23 @@ All pipelines in this repository are intended to be run in Google Dataproc. Runn

In order to run the code:

1. Manually edit your local `workflow/dag.yaml` file and comment out the steps you do not want to run.
1. Manually edit your local `src/airflow/dags/*` file and comment out the steps you do not want to run.

2. Manually edit your local `pyproject.toml` file and modify the version of the code.
- This must be different from the version used by any other people working on the repository to avoid any deployment conflicts, so it's a good idea to use your name, for example: `1.2.3+jdoe`.
- You can also add a brief branch description, for example: `1.2.3+jdoe.myfeature`.
- Note that the version must comply with [PEP440 conventions](https://peps.python.org/pep-0440/#normalization), otherwise Poetry will not allow it to be deployed.
- Do not use underscores or hyphens in your version name. When building the WHL file, they will be automatically converted to dots, which means the file name will no longer match the version and the build will fail. Use dots instead.

3. Run `make build`.
3. Manually edit your local `src/airflow/dags/common_airflow.py` and set `OTG_VERSION` to the same version as you did in the previous step.

4. Run `make build`.
- This will create a bundle containing the neccessary code, configuration and dependencies to run the ETL pipeline, and then upload this bundle to Google Cloud.
- A version specific subpath is used, so uploading the code will not affect any branches but your own.
- If there was already a code bundle uploaded with the same version number, it will be replaced.

4. Submit the Dataproc job with `poetry run python workflow/workflow_template.py`
- You will need to specify additional parameters, some are mandatory and some are optional. Run with `--help` to see usage.
- The script will provision the cluster and submit the job.
- The cluster will take a few minutes to get provisioned and running, during which the script will not output anything, this is normal.
- Once submitted, you can monitor the progress of your job on this page: https://console.cloud.google.com/dataproc/jobs?project=open-targets-genetics-dev.
- On completion (whether successful or a failure), the cluster will be automatically removed, so you don't have to worry about shutting it down to avoid incurring charges.
5. Open Airflow UI and run the DAG.


## Contributing checklist
When making changes, and especially when implementing a new module or feature, it's essential to ensure that all relevant sections of the code base are modified.
Expand All @@ -57,19 +55,20 @@ When making changes, and especially when implementing a new module or feature, i
- [ ] Update the documentation and check it with `make build-documentation`. This will start a local server to browse it (URL will be printed, usually `http://127.0.0.1:8000/`)

For more details on each of these steps, see the sections below.

### Documentation
* If during development you had a question which wasn't covered in the documentation, and someone explained it to you, add it to the documentation. The same applies if you encountered any instructions in the documentation which were obsolete or incorrect.
* Documentation autogeneration expressions start with `:::`. They will automatically generate sections of the documentation based on class and method docstrings. Be sure to update them for:
+ Dataset definitions in `docs/reference/dataset` (example: `docs/reference/dataset/study_index/study_index_finngen.md`)
+ Step definition in `docs/reference/step` (example: `docs/reference/step/finngen.md`)
+ Dataset definitions in `docs/python_api/datasource/STEP` (example: `docs/python_api/datasource/finngen/study_index.md`)
+ Step definition in `docs/python_api/step/STEP.md` (example: `docs/python_api/step/finngen.md`)

### Configuration
* Input and output paths in `config/datasets/gcp.yaml`
* Step configuration in `config/step/STEP.yaml` (example: `config/step/finngen.yaml`)

### Classes
* Dataset class in `src/org/dataset/` (example: `src/otg/dataset/study_index.py``StudyIndexFinnGen`)
* Step main running class in `src/org/STEP.py` (example: `src/org/finngen.py`)
* Dataset class in `src/otg/datasource/STEP` (example: `src/otg/datasource/finngen/study_index.py``FinnGenStudyIndex`)
* Step main running class in `src/otg/STEP.py` (example: `src/otg/finngen.py`)

### Tests
* Test study fixture in `tests/conftest.py` (example: `mock_study_index_finngen` in that module)
Expand Down
4 changes: 4 additions & 0 deletions docs/python_api/datasource/eqtl_catalogue/study_index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
title: Study Index
---
::: otg.datasource.eqtl_catalogue.study_index.EqtlCatalogueStudyIndex
4 changes: 4 additions & 0 deletions docs/python_api/datasource/eqtl_catalogue/summary_stats.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
title: Summary Stats
---
::: otg.datasource.eqtl_catalogue.summary_stats.EqtlCatalogueSummaryStats
4 changes: 4 additions & 0 deletions docs/python_api/step/eqtl_catalogue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
title: eQTL Catalogue
---
::: otg.eqtl_catalogue.EqtlCatalogueStep
4 changes: 3 additions & 1 deletion src/airflow/dags/common_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ def create_cluster(
# Create a disk config section if it does not exist.
cluster_config[worker_section].setdefault("disk_config", dict())
# Specify the number of local SSDs.
cluster_config[worker_section]["num_local_ssds"] = num_local_ssds
cluster_config[worker_section]["disk_config"][
"num_local_ssds"
] = num_local_ssds

# Return the cluster creation operator.
return DataprocCreateClusterOperator(
Expand Down
2 changes: 1 addition & 1 deletion src/airflow/dags/dag_preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

CLUSTER_NAME = "otg-preprocess"

ALL_STEPS = ["finngen", "ld_index", "variant_annotation"]
ALL_STEPS = ["finngen", "eqtl_catalogue", "ld_index", "variant_annotation"]


with DAG(
Expand Down
30 changes: 0 additions & 30 deletions src/otg/assets/schemas/study_locus.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,36 +36,6 @@
"nullable": true,
"type": "double"
},
{
"metadata": {},
"name": "oddsRatio",
"nullable": true,
"type": "double"
},
{
"metadata": {},
"name": "oddsRatioConfidenceIntervalLower",
"nullable": true,
"type": "double"
},
{
"metadata": {},
"name": "oddsRatioConfidenceIntervalUpper",
"nullable": true,
"type": "double"
},
{
"metadata": {},
"name": "betaConfidenceIntervalLower",
"nullable": true,
"type": "double"
},
{
"metadata": {},
"name": "betaConfidenceIntervalUpper",
"nullable": true,
"type": "double"
},
{
"metadata": {},
"name": "pValueMantissa",
Expand Down
32 changes: 32 additions & 0 deletions src/otg/common/spark_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,38 @@ def normalise_column(
)


def neglog_pvalue_to_mantissa_and_exponent(p_value: Column) -> tuple[Column, Column]:
"""Computing p-value mantissa and exponent based on the negative 10 based logarithm of the p-value.
Args:
p_value (Column): Neg-log p-value (string)
Returns:
tuple[Column, Column]: mantissa and exponent of the p-value
Examples:
>>> (
... spark.createDataFrame([(4.56, 'a'),(2109.23, 'b')], ['negLogPv', 'label'])
... .select('negLogPv',*neglog_pvalue_to_mantissa_and_exponent(f.col('negLogPv')))
... .show()
... )
+--------+------------------+--------------+
|negLogPv| pValueMantissa|pValueExponent|
+--------+------------------+--------------+
| 4.56| 3.63078054770101| -5|
| 2109.23|1.6982436524618154| -2110|
+--------+------------------+--------------+
<BLANKLINE>
"""
exponent: Column = f.ceil(p_value)
mantissa: Column = f.pow(f.lit(10), (p_value - exponent + f.lit(1)))

return (
mantissa.cast(t.DoubleType()).alias("pValueMantissa"),
(-1 * exponent).cast(t.IntegerType()).alias("pValueExponent"),
)


def calculate_neglog_pvalue(
p_value_mantissa: Column, p_value_exponent: Column
) -> Column:
Expand Down
15 changes: 10 additions & 5 deletions src/otg/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ def calculate_confidence_interval(
+---------------+---------------+----+--------------+---------------------------+---------------------------+
|pvalue_mantissa|pvalue_exponent|beta|standard_error|betaConfidenceIntervalLower|betaConfidenceIntervalUpper|
+---------------+---------------+----+--------------+---------------------------+---------------------------+
| 2.5| -10| 0.5| 0.2| 0.3| 0.7|
| 3.0| -5| 1.0| null| 0.7603910153486024| 1.2396089846513976|
| 1.5| -8|-0.2| 0.1| -0.30000000000000004| -0.1|
| 2.5| -10| 0.5| 0.2| 0.10799999999999998| 0.892|
| 3.0| -5| 1.0| null| 0.5303663900832607| 1.4696336099167393|
| 1.5| -8|-0.2| 0.1| -0.396| -0.00400000000000...|
+---------------+---------------+----+--------------+---------------------------+---------------------------+
<BLANKLINE>
"""
Expand All @@ -104,8 +104,13 @@ def calculate_confidence_interval(
).otherwise(standard_error)

# Calculate upper and lower confidence interval:
ci_lower = (beta - standard_error).alias("betaConfidenceIntervalLower")
ci_upper = (beta + standard_error).alias("betaConfidenceIntervalUpper")
z_score_095 = 1.96
ci_lower = (beta - z_score_095 * standard_error).alias(
"betaConfidenceIntervalLower"
)
ci_upper = (beta + z_score_095 * standard_error).alias(
"betaConfidenceIntervalUpper"
)

return (ci_lower, ci_upper)

Expand Down
3 changes: 3 additions & 0 deletions src/otg/datasource/eqtl_catalogue/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""eQTL Catalogue datasource classes."""

from __future__ import annotations
151 changes: 151 additions & 0 deletions src/otg/datasource/eqtl_catalogue/study_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
"""Study Index for eQTL Catalogue data source."""
from __future__ import annotations

from typing import TYPE_CHECKING, List

import pyspark.sql.functions as f

from otg.dataset.study_index import StudyIndex

if TYPE_CHECKING:
from pyspark.sql import DataFrame
from pyspark.sql.column import Column


class EqtlCatalogueStudyIndex(StudyIndex):
"""Study index dataset from eQTL Catalogue."""

@staticmethod
def _all_attributes() -> List[Column]:
"""A helper function to return all study index attribute expressions.
Returns:
List[Column]: all study index attribute expressions.
"""
study_attributes = [
# Project ID, example: "GTEx_V8".
f.col("study").alias("projectId"),
# Partial study ID, example: "GTEx_V8_Adipose_Subcutaneous". This ID will be converted to final only when
# summary statistics are parsed, because it must also include a gene ID.
f.concat(f.col("study"), f.lit("_"), f.col("qtl_group")).alias("studyId"),
# Summary stats location.
f.col("ftp_path").alias("summarystatsLocation"),
# Constant value fields.
f.lit(True).alias("hasSumstats"),
f.lit("eqtl").alias("studyType"),
]
tissue_attributes = [
# Human readable tissue label, example: "Adipose - Subcutaneous".
f.col("tissue_label").alias("traitFromSource"),
# Ontology identifier for the tissue, for example: "UBERON:0001157".
f.array(
f.regexp_replace(
f.regexp_replace(
f.col("tissue_ontology_id"),
"UBER_",
"UBERON_",
),
"_",
":",
)
).alias("traitFromSourceMappedIds"),
]
sample_attributes = [
f.lit(838).cast("long").alias("nSamples"),
f.lit("838 (281 females and 557 males)").alias("initialSampleSize"),
f.array(
f.struct(
f.lit(715).cast("long").alias("sampleSize"),
f.lit("European American").alias("ancestry"),
),
f.struct(
f.lit(103).cast("long").alias("sampleSize"),
f.lit("African American").alias("ancestry"),
),
f.struct(
f.lit(12).cast("long").alias("sampleSize"),
f.lit("Asian American").alias("ancestry"),
),
f.struct(
f.lit(16).cast("long").alias("sampleSize"),
f.lit("Hispanic or Latino").alias("ancestry"),
),
).alias("discoverySamples"),
]
publication_attributes = [
f.lit("32913098").alias("pubmedId"),
f.lit(
"The GTEx Consortium atlas of genetic regulatory effects across human tissues"
).alias("publicationTitle"),
f.lit("GTEx Consortium").alias("publicationFirstAuthor"),
f.lit("2020-09-11").alias("publicationDate"),
f.lit("Science").alias("publicationJournal"),
]
return (
study_attributes
+ tissue_attributes
+ sample_attributes
+ publication_attributes
)

@classmethod
def from_source(
cls: type[EqtlCatalogueStudyIndex],
eqtl_studies: DataFrame,
) -> EqtlCatalogueStudyIndex:
"""Ingest study level metadata from eQTL Catalogue.
Args:
eqtl_studies (DataFrame): ingested but unprocessed eQTL Catalogue studies.
Returns:
EqtlCatalogueStudyIndex: preliminary processed study index for eQTL Catalogue studies.
"""
return EqtlCatalogueStudyIndex(
_df=eqtl_studies.select(*cls._all_attributes()).withColumn(
"ldPopulationStructure",
cls.aggregate_and_map_ancestries(f.col("discoverySamples")),
),
_schema=cls.get_schema(),
)

@classmethod
def add_gene_id_column(
cls: type[EqtlCatalogueStudyIndex],
study_index_df: DataFrame,
summary_stats_df: DataFrame,
) -> EqtlCatalogueStudyIndex:
"""Add a geneId column to the study index and explode.
While the original list contains one entry per tissue, what we consider as a single study is one mini-GWAS for
an expression of a _particular gene_ in a particular study. At this stage we have a study index with partial
study IDs like "PROJECT_QTLGROUP", and a summary statistics object with full study IDs like
"PROJECT_QTLGROUP_GENEID", so we need to perform a merge and explosion to obtain our final study index.
Args:
study_index_df (DataFrame): preliminary study index for eQTL Catalogue studies.
summary_stats_df (DataFrame): summary statistics dataframe for eQTL Catalogue data.
Returns:
EqtlCatalogueStudyIndex: final study index for eQTL Catalogue studies.
"""
partial_to_full_study_id = (
summary_stats_df.select(f.col("studyId"))
.distinct()
.select(
f.col("studyId").alias("fullStudyId"), # PROJECT_QTLGROUP_GENEID
f.regexp_extract(f.col("studyId"), r"(.*)_[\_]+", 1).alias(
"studyId"
), # PROJECT_QTLGROUP
)
.groupBy("studyId")
.agg(f.collect_list("fullStudyId").alias("fullStudyIdList"))
)
study_index_df = (
study_index_df.join(partial_to_full_study_id, "studyId", "inner")
.withColumn("fullStudyId", f.explode("fullStudyIdList"))
.drop("fullStudyIdList")
.withColumn("geneId", f.regexp_extract(f.col("studyId"), r".*_([\_]+)", 1))
.drop("fullStudyId")
)
return EqtlCatalogueStudyIndex(_df=study_index_df, _schema=cls.get_schema())
Loading

0 comments on commit 5de5b77

Please sign in to comment.