diff --git a/.gitignore b/.gitignore
index 38083e8fd..07c8bef82 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,3 +10,4 @@ docs/assets/schemas/
 src/airflow/logs/*
 !src/airflow/logs/.gitkeep
 site/
+.env
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index a83aff3ad..12aaa1513 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -50,7 +50,7 @@ repos:
       - id: black
   - repo: https://github.com/alessandrojcm/commitlint-pre-commit-hook
-    rev: v9.9.0
+    rev: v9.10.0
    hooks:
      - id: commitlint
        additional_dependencies: ["@commitlint/config-conventional"]
diff --git a/config/datasets/gcp.yaml b/config/datasets/gcp.yaml
index d34ab1270..f7e79c367 100644
--- a/config/datasets/gcp.yaml
+++ b/config/datasets/gcp.yaml
@@ -15,10 +15,11 @@ catalog_associations: ${datasets.inputs}/v2d/gwas_catalog_v1.0.2-associations_e1
 catalog_studies: ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-studies-r2023-09-11.tsv
 catalog_ancestries: ${datasets.inputs}/v2d/gwas-catalog-v1.0.3-ancestries-r2023-09-11.tsv
 catalog_sumstats_lut: ${datasets.inputs}/v2d/harmonised_list-r2023-09-11.txt
-finngen_phenotype_table_url: https://r9.finngen.fi/api/phenos
 ukbiobank_manifest: gs://genetics-portal-input/ukb_phenotypes/neale2_saige_study_manifest.190430.tsv
 l2g_gold_standard_curation: ${datasets.inputs}/l2g/gold_standard/curation.json
 gene_interactions: ${datasets.inputs}/l2g/interaction # 23.09 data
+finngen_phenotype_table_url: https://r9.finngen.fi/api/phenos
+eqtl_catalogue_paths_imported: ${datasets.inputs}/preprocess/eqtl_catalogue/tabix_ftp_paths_imported.tsv
 
 # Output datasets
 gene_index: ${datasets.outputs}/gene_index
@@ -39,6 +40,8 @@ from_sumstats_pics: ${datasets.credible_set}/from_sumstats
 ukbiobank_study_index: ${datasets.outputs}/ukbiobank_study_index
 l2g_model: ${datasets.outputs}/l2g_model
 l2g_predictions: ${datasets.outputs}/l2g_predictions
+eqtl_catalogue_study_index_out: ${datasets.outputs}/preprocess/eqtl_catalogue/study_index
+eqtl_catalogue_summary_stats_out: ${datasets.outputs}/preprocess/eqtl_catalogue/summary_stats
 
 # Constants
 finngen_release_prefix: FINNGEN_R9
diff --git a/config/step/eqtl_catalogue.yaml b/config/step/eqtl_catalogue.yaml
new file mode 100644
index 000000000..04a958993
--- /dev/null
+++ b/config/step/eqtl_catalogue.yaml
@@ -0,0 +1,4 @@
+_target_: otg.eqtl_catalogue.EqtlCatalogueStep
+eqtl_catalogue_paths_imported: ${datasets.eqtl_catalogue_paths_imported}
+eqtl_catalogue_study_index_out: ${datasets.eqtl_catalogue_study_index_out}
+eqtl_catalogue_summary_stats_out: ${datasets.eqtl_catalogue_summary_stats_out}
diff --git a/docs/development/contributing.md b/docs/development/contributing.md
index f016f8646..3b37d18e8 100644
--- a/docs/development/contributing.md
+++ b/docs/development/contributing.md
@@ -29,7 +29,7 @@ All pipelines in this repository are intended to be run in Google Dataproc. Runn
 
 In order to run the code:
 
-1. Manually edit your local `workflow/dag.yaml` file and comment out the steps you do not want to run.
+1. Manually edit your local `src/airflow/dags/*` file and comment out the steps you do not want to run.
 2. Manually edit your local `pyproject.toml` file and modify the version of the code.
    - This must be different from the version used by any other people working on the repository to avoid any deployment conflicts, so it's a good idea to use your name, for example: `1.2.3+jdoe`.
@@ -37,17 +37,15 @@ In order to run the code:
    - Note that the version must comply with [PEP440 conventions](https://peps.python.org/pep-0440/#normalization), otherwise Poetry will not allow it to be deployed.
    - Do not use underscores or hyphens in your version name. When building the WHL file, they will be automatically converted to dots, which means the file name will no longer match the version and the build will fail. Use dots instead.
-3. Run `make build`.
+3. Manually edit your local `src/airflow/dags/common_airflow.py` and set `OTG_VERSION` to the same version as you did in the previous step.
+
+4. Run `make build`.
    - This will create a bundle containing the necessary code, configuration and dependencies to run the ETL pipeline, and then upload this bundle to Google Cloud.
    - A version specific subpath is used, so uploading the code will not affect any branches but your own.
    - If there was already a code bundle uploaded with the same version number, it will be replaced.
-4. Submit the Dataproc job with `poetry run python workflow/workflow_template.py`
-   - You will need to specify additional parameters, some are mandatory and some are optional. Run with `--help` to see usage.
-   - The script will provision the cluster and submit the job.
-   - The cluster will take a few minutes to get provisioned and running, during which the script will not output anything; this is normal.
-   - Once submitted, you can monitor the progress of your job on this page: https://console.cloud.google.com/dataproc/jobs?project=open-targets-genetics-dev.
-   - On completion (whether successful or a failure), the cluster will be automatically removed, so you don't have to worry about shutting it down to avoid incurring charges.
+5. Open Airflow UI and run the DAG.
+
 ## Contributing checklist
 
 When making changes, and especially when implementing a new module or feature, it's essential to ensure that all relevant sections of the code base are modified.
@@ -57,19 +55,20 @@ When making changes, and especially when implementing a new module or feature, i
 - [ ] Update the documentation and check it with `make build-documentation`. This will start a local server to browse it (URL will be printed, usually `http://127.0.0.1:8000/`)
 
 For more details on each of these steps, see the sections below.
+
 ### Documentation
 * If during development you had a question which wasn't covered in the documentation, and someone explained it to you, add it to the documentation. The same applies if you encountered any instructions in the documentation which were obsolete or incorrect.
 * Documentation autogeneration expressions start with `:::`. They will automatically generate sections of the documentation based on class and method docstrings.
Be sure to update them for:
-  + Dataset definitions in `docs/reference/dataset` (example: `docs/reference/dataset/study_index/study_index_finngen.md`)
-  + Step definition in `docs/reference/step` (example: `docs/reference/step/finngen.md`)
+  + Dataset definitions in `docs/python_api/datasource/STEP` (example: `docs/python_api/datasource/finngen/study_index.md`)
+  + Step definition in `docs/python_api/step/STEP.md` (example: `docs/python_api/step/finngen.md`)
 
 ### Configuration
 * Input and output paths in `config/datasets/gcp.yaml`
 * Step configuration in `config/step/STEP.yaml` (example: `config/step/finngen.yaml`)
 
 ### Classes
-* Dataset class in `src/org/dataset/` (example: `src/otg/dataset/study_index.py` → `StudyIndexFinnGen`)
-* Step main running class in `src/org/STEP.py` (example: `src/org/finngen.py`)
+* Dataset class in `src/otg/datasource/STEP` (example: `src/otg/datasource/finngen/study_index.py` → `FinnGenStudyIndex`)
+* Step main running class in `src/otg/STEP.py` (example: `src/otg/finngen.py`)
 
 ### Tests
 * Test study fixture in `tests/conftest.py` (example: `mock_study_index_finngen` in that module)
diff --git a/docs/python_api/datasource/eqtl_catalogue/study_index.md b/docs/python_api/datasource/eqtl_catalogue/study_index.md
new file mode 100644
index 000000000..c1e675200
--- /dev/null
+++ b/docs/python_api/datasource/eqtl_catalogue/study_index.md
@@ -0,0 +1,4 @@
+---
+title: Study Index
+---
+::: otg.datasource.eqtl_catalogue.study_index.EqtlCatalogueStudyIndex
diff --git a/docs/python_api/datasource/eqtl_catalogue/summary_stats.md b/docs/python_api/datasource/eqtl_catalogue/summary_stats.md
new file mode 100644
index 000000000..62f2b4be2
--- /dev/null
+++ b/docs/python_api/datasource/eqtl_catalogue/summary_stats.md
@@ -0,0 +1,4 @@
+---
+title: Summary Stats
+---
+::: otg.datasource.eqtl_catalogue.summary_stats.EqtlCatalogueSummaryStats
diff --git a/docs/python_api/step/eqtl_catalogue.md b/docs/python_api/step/eqtl_catalogue.md
new file mode 100644
index 000000000..9d96f3486
--- /dev/null
+++ b/docs/python_api/step/eqtl_catalogue.md
@@ -0,0 +1,4 @@
+---
+title: eQTL Catalogue
+---
+::: otg.eqtl_catalogue.EqtlCatalogueStep
diff --git a/src/airflow/dags/common_airflow.py b/src/airflow/dags/common_airflow.py
index e62a95d5a..ea7855e0e 100644
--- a/src/airflow/dags/common_airflow.py
+++ b/src/airflow/dags/common_airflow.py
@@ -106,7 +106,9 @@ def create_cluster(
     # Create a disk config section if it does not exist.
     cluster_config[worker_section].setdefault("disk_config", dict())
     # Specify the number of local SSDs.
-    cluster_config[worker_section]["num_local_ssds"] = num_local_ssds
+    cluster_config[worker_section]["disk_config"][
+        "num_local_ssds"
+    ] = num_local_ssds
 
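Worth spelling out why the change above matters: the Dataproc API expects `num_local_ssds` inside the `disk_config` sub-dictionary, not at the top level of the worker config. A minimal sketch of the resulting structure (the surrounding keys and values are illustrative, not taken from the codebase):

```python
# Minimal sketch of the corrected nesting; "num_instances" is an illustrative sibling key.
cluster_config = {"worker_config": {"num_instances": 2}}
worker_section = "worker_config"
num_local_ssds = 1

# Create a disk config section if it does not exist, then nest the SSD count inside it.
cluster_config[worker_section].setdefault("disk_config", dict())
cluster_config[worker_section]["disk_config"]["num_local_ssds"] = num_local_ssds

assert cluster_config == {
    "worker_config": {"num_instances": 2, "disk_config": {"num_local_ssds": 1}}
}
```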
     # Return the cluster creation operator.
     return DataprocCreateClusterOperator(
diff --git a/src/airflow/dags/dag_preprocess.py b/src/airflow/dags/dag_preprocess.py
index 345586080..1a52a495a 100644
--- a/src/airflow/dags/dag_preprocess.py
+++ b/src/airflow/dags/dag_preprocess.py
@@ -8,7 +8,7 @@
 
 CLUSTER_NAME = "otg-preprocess"
 
-ALL_STEPS = ["finngen", "ld_index", "variant_annotation"]
+ALL_STEPS = ["finngen", "eqtl_catalogue", "ld_index", "variant_annotation"]
 
 
 with DAG(
diff --git a/src/otg/assets/schemas/study_locus.json b/src/otg/assets/schemas/study_locus.json
index 8b987e630..97f9da3bf 100644
--- a/src/otg/assets/schemas/study_locus.json
+++ b/src/otg/assets/schemas/study_locus.json
@@ -36,36 +36,6 @@
       "nullable": true,
       "type": "double"
     },
-    {
-      "metadata": {},
-      "name": "oddsRatio",
-      "nullable": true,
-      "type": "double"
-    },
-    {
-      "metadata": {},
-      "name": "oddsRatioConfidenceIntervalLower",
-      "nullable": true,
-      "type": "double"
-    },
-    {
-      "metadata": {},
-      "name": "oddsRatioConfidenceIntervalUpper",
-      "nullable": true,
-      "type": "double"
-    },
-    {
-      "metadata": {},
-      "name": "betaConfidenceIntervalLower",
-      "nullable": true,
-      "type": "double"
-    },
-    {
-      "metadata": {},
-      "name": "betaConfidenceIntervalUpper",
-      "nullable": true,
-      "type": "double"
-    },
     {
       "metadata": {},
       "name": "pValueMantissa",
diff --git a/src/otg/common/spark_helpers.py b/src/otg/common/spark_helpers.py
index 90fd8afb2..bd22d91c9 100644
--- a/src/otg/common/spark_helpers.py
+++ b/src/otg/common/spark_helpers.py
@@ -250,6 +250,38 @@ def normalise_column(
     )
 
 
+def neglog_pvalue_to_mantissa_and_exponent(p_value: Column) -> tuple[Column, Column]:
+    """Compute the p-value mantissa and exponent from the negative log10 p-value.
+
+    Args:
+        p_value (Column): negative log10 p-value (double)
+
+    Returns:
+        tuple[Column, Column]: mantissa and exponent of the p-value
+
+    Examples:
+        >>> (
+        ...     spark.createDataFrame([(4.56, 'a'),(2109.23, 'b')], ['negLogPv', 'label'])
+        ...     .select('negLogPv',*neglog_pvalue_to_mantissa_and_exponent(f.col('negLogPv')))
+        ...     .show()
+        ... )
+        +--------+--------------+--------------+
+        |negLogPv|pValueMantissa|pValueExponent|
+        +--------+--------------+--------------+
+        |    4.56|         2.754|            -5|
+        | 2109.23|         5.888|         -2110|
+        +--------+--------------+--------------+
+
+    """
+    exponent: Column = f.ceil(p_value)
+    # The mantissa is 10 raised to the fractional remainder, rounded to avoid floating point noise:
+    mantissa: Column = f.round(f.pow(f.lit(10), (exponent - p_value)), 3)
+
+    return (
+        mantissa.cast(t.DoubleType()).alias("pValueMantissa"),
+        (-1 * exponent).cast(t.IntegerType()).alias("pValueExponent"),
+    )
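As a quick sanity check of the conversion, here is a plain-Python mirror of the Spark expressions above (the helper name is ours, for illustration only):

```python
import math

def neglog_to_mantissa_and_exponent(neg_log_pv: float) -> tuple[float, int]:
    # Mirrors the Spark logic: exponent = ceil(negLogPv), mantissa = 10 ** (exponent - negLogPv).
    exponent = math.ceil(neg_log_pv)
    mantissa = round(10 ** (exponent - neg_log_pv), 3)
    return mantissa, -exponent

mantissa, exponent = neglog_to_mantissa_and_exponent(4.56)
assert (mantissa, exponent) == (2.754, -5)
# Round trip: 2.754e-5 reproduces the original negative log10 p-value up to rounding.
assert math.isclose(-math.log10(mantissa * 10.0**exponent), 4.56, rel_tol=1e-3)
```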
+
+
 def calculate_neglog_pvalue(
     p_value_mantissa: Column, p_value_exponent: Column
 ) -> Column:
diff --git a/src/otg/common/utils.py b/src/otg/common/utils.py
index 608038eec..d05810670 100644
--- a/src/otg/common/utils.py
+++ b/src/otg/common/utils.py
@@ -86,9 +86,9 @@ def calculate_confidence_interval(
         +---------------+---------------+----+--------------+---------------------------+---------------------------+
         |pvalue_mantissa|pvalue_exponent|beta|standard_error|betaConfidenceIntervalLower|betaConfidenceIntervalUpper|
         +---------------+---------------+----+--------------+---------------------------+---------------------------+
-        |            2.5|            -10| 0.5|           0.2|                        0.3|                        0.7|
-        |            3.0|             -5| 1.0|          null|         0.7603910153486024|         1.2396089846513976|
-        |            1.5|             -8|-0.2|           0.1|       -0.30000000000000004|                       -0.1|
+        |            2.5|            -10| 0.5|           0.2|        0.10799999999999998|                      0.892|
+        |            3.0|             -5| 1.0|          null|         0.5303663900832607|         1.4696336099167393|
+        |            1.5|             -8|-0.2|           0.1|                     -0.396|       -0.00400000000000...|
         +---------------+---------------+----+--------------+---------------------------+---------------------------+
 
     """
@@ -104,8 +104,13 @@ def calculate_confidence_interval(
     ).otherwise(standard_error)
 
     # Calculate upper and lower confidence interval using the z-score for a 95% confidence level:
-    ci_lower = (beta - standard_error).alias("betaConfidenceIntervalLower")
-    ci_upper = (beta + standard_error).alias("betaConfidenceIntervalUpper")
+    z_score_095 = 1.96
+    ci_lower = (beta - z_score_095 * standard_error).alias(
+        "betaConfidenceIntervalLower"
+    )
+    ci_upper = (beta + z_score_095 * standard_error).alias(
+        "betaConfidenceIntervalUpper"
+    )
 
     return (ci_lower, ci_upper)
diff --git a/src/otg/datasource/eqtl_catalogue/__init__.py b/src/otg/datasource/eqtl_catalogue/__init__.py
new file mode 100644
index 000000000..9632698b0
--- /dev/null
+++ b/src/otg/datasource/eqtl_catalogue/__init__.py
@@ -0,0 +1,3 @@
+"""eQTL Catalogue datasource classes."""
+
+from __future__ import annotations
diff --git a/src/otg/datasource/eqtl_catalogue/study_index.py b/src/otg/datasource/eqtl_catalogue/study_index.py
new file mode 100644
index 000000000..5ae5c3bfd
--- /dev/null
+++ b/src/otg/datasource/eqtl_catalogue/study_index.py
@@ -0,0 +1,151 @@
+"""Study Index for eQTL Catalogue data source."""
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, List
+
+import pyspark.sql.functions as f
+
+from otg.dataset.study_index import StudyIndex
+
+if TYPE_CHECKING:
+    from pyspark.sql import DataFrame
+    from pyspark.sql.column import Column
+
+
+class EqtlCatalogueStudyIndex(StudyIndex):
+    """Study index dataset from eQTL Catalogue."""
+
+    @staticmethod
+    def _all_attributes() -> List[Column]:
+        """A helper function to return all study index attribute expressions.
+
+        Returns:
+            List[Column]: all study index attribute expressions.
+        """
+        study_attributes = [
+            # Project ID, example: "GTEx_V8".
+            f.col("study").alias("projectId"),
+            # Partial study ID, example: "GTEx_V8_Adipose_Subcutaneous". This ID is only converted
+            # to its final form once the summary statistics are parsed, because it must also include a gene ID.
+            f.concat(f.col("study"), f.lit("_"), f.col("qtl_group")).alias("studyId"),
+            # Summary stats location.
+            f.col("ftp_path").alias("summarystatsLocation"),
+            # Constant value fields.
+            f.lit(True).alias("hasSumstats"),
+            f.lit("eqtl").alias("studyType"),
+        ]
+        tissue_attributes = [
+            # Human readable tissue label, example: "Adipose - Subcutaneous".
+            f.col("tissue_label").alias("traitFromSource"),
+            # Ontology identifier for the tissue, for example: "UBERON:0001157".
+            f.array(
+                f.regexp_replace(
+                    f.regexp_replace(
+                        f.col("tissue_ontology_id"),
+                        "UBER_",
+                        "UBERON_",
+                    ),
+                    "_",
+                    ":",
+                )
+            ).alias("traitFromSourceMappedIds"),
+        ]
+        sample_attributes = [
+            f.lit(838).cast("long").alias("nSamples"),
+            f.lit("838 (281 females and 557 males)").alias("initialSampleSize"),
+            f.array(
+                f.struct(
+                    f.lit(715).cast("long").alias("sampleSize"),
+                    f.lit("European American").alias("ancestry"),
+                ),
+                f.struct(
+                    f.lit(103).cast("long").alias("sampleSize"),
+                    f.lit("African American").alias("ancestry"),
+                ),
+                f.struct(
+                    f.lit(12).cast("long").alias("sampleSize"),
+                    f.lit("Asian American").alias("ancestry"),
+                ),
+                f.struct(
+                    f.lit(16).cast("long").alias("sampleSize"),
+                    f.lit("Hispanic or Latino").alias("ancestry"),
+                ),
+            ).alias("discoverySamples"),
+        ]
+        publication_attributes = [
+            f.lit("32913098").alias("pubmedId"),
+            f.lit(
+                "The GTEx Consortium atlas of genetic regulatory effects across human tissues"
+            ).alias("publicationTitle"),
+            f.lit("GTEx Consortium").alias("publicationFirstAuthor"),
+            f.lit("2020-09-11").alias("publicationDate"),
+            f.lit("Science").alias("publicationJournal"),
+        ]
+        return (
+            study_attributes
+            + tissue_attributes
+            + sample_attributes
+            + publication_attributes
+        )
+
+    @classmethod
+    def from_source(
+        cls: type[EqtlCatalogueStudyIndex],
+        eqtl_studies: DataFrame,
+    ) -> EqtlCatalogueStudyIndex:
+        """Ingest study level metadata from eQTL Catalogue.
+
+        Args:
+            eqtl_studies (DataFrame): ingested but unprocessed eQTL Catalogue studies.
+
+        Returns:
+            EqtlCatalogueStudyIndex: preliminary processed study index for eQTL Catalogue studies.
+        """
+        return EqtlCatalogueStudyIndex(
+            _df=eqtl_studies.select(*cls._all_attributes()).withColumn(
+                "ldPopulationStructure",
+                cls.aggregate_and_map_ancestries(f.col("discoverySamples")),
+            ),
+            _schema=cls.get_schema(),
+        )
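A note on the tissue ontology normalisation in `_all_attributes` above: the source table sometimes carries truncated identifiers such as `UBER_0002190`, which the nested `regexp_replace` calls repair and convert to CURIE form. A plain-`re` illustration (the helper name is hypothetical):

```python
import re

def normalise_tissue_id(tissue_ontology_id: str) -> str:
    # Repair the truncated "UBER_" prefix, then switch to the "UBERON:NNNNNNN" CURIE form.
    fixed = re.sub("UBER_", "UBERON_", tissue_ontology_id)
    return re.sub("_", ":", fixed)

assert normalise_tissue_id("UBER_0002190") == "UBERON:0002190"    # truncated form from the source TSV
assert normalise_tissue_id("UBERON_0001157") == "UBERON:0001157"  # already-correct form is preserved
```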
+ """ + partial_to_full_study_id = ( + summary_stats_df.select(f.col("studyId")) + .distinct() + .select( + f.col("studyId").alias("fullStudyId"), # PROJECT_QTLGROUP_GENEID + f.regexp_extract(f.col("studyId"), r"(.*)_[\_]+", 1).alias( + "studyId" + ), # PROJECT_QTLGROUP + ) + .groupBy("studyId") + .agg(f.collect_list("fullStudyId").alias("fullStudyIdList")) + ) + study_index_df = ( + study_index_df.join(partial_to_full_study_id, "studyId", "inner") + .withColumn("fullStudyId", f.explode("fullStudyIdList")) + .drop("fullStudyIdList") + .withColumn("geneId", f.regexp_extract(f.col("studyId"), r".*_([\_]+)", 1)) + .drop("fullStudyId") + ) + return EqtlCatalogueStudyIndex(_df=study_index_df, _schema=cls.get_schema()) diff --git a/src/otg/datasource/eqtl_catalogue/summary_stats.py b/src/otg/datasource/eqtl_catalogue/summary_stats.py new file mode 100644 index 000000000..0b11b6759 --- /dev/null +++ b/src/otg/datasource/eqtl_catalogue/summary_stats.py @@ -0,0 +1,93 @@ +"""Summary statistics ingestion for eQTL Catalogue.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +import pyspark.sql.functions as f +import pyspark.sql.types as t + +from otg.common.utils import parse_pvalue +from otg.dataset.summary_statistics import SummaryStatistics + +if TYPE_CHECKING: + from pyspark.sql import DataFrame + from pyspark.sql.column import Column + + +@dataclass +class EqtlCatalogueSummaryStats(SummaryStatistics): + """Summary statistics dataset for eQTL Catalogue.""" + + @staticmethod + def _full_study_id_regexp() -> Column: + """Constructs a full study ID from the URI. + + Returns: + Column: expression to extract a full study ID from the URI. + """ + # Example of a URI which is used for parsing: + # "gs://genetics_etl_python_playground/input/preprocess/eqtl_catalogue/imported/GTEx_V8/ge/Adipose_Subcutaneous.tsv.gz". + + # Regular expession to extract project ID from URI. Example: "GTEx_V8". + _project_id = f.regexp_extract( + f.input_file_name(), + r"imported/([^/]+)/.*", + 1, + ) + # Regular expression to extract QTL group from URI. Example: "Adipose_Subcutaneous". + _qtl_group = f.regexp_extract(f.input_file_name(), r"([^/]+)\.tsv\.gz", 1) + # Extracting gene ID from the column. Example: "ENSG00000225630". + _gene_id = f.col("gene_id") + + # We can now construct the full study ID based on all fields. + # Example: "GTEx_V8_Adipose_Subcutaneous_ENSG00000225630". + return f.concat(_project_id, f.lit("_"), _qtl_group, f.lit("_"), _gene_id) + + @classmethod + def from_source( + cls: type[EqtlCatalogueSummaryStats], + summary_stats_df: DataFrame, + ) -> EqtlCatalogueSummaryStats: + """Ingests all summary stats for all eQTL Catalogue studies. + + Args: + summary_stats_df (DataFrame): an ingested but unprocessed summary statistics dataframe from eQTL Catalogue. + + Returns: + EqtlCatalogueSummaryStats: a processed summary statistics dataframe for eQTL Catalogue. + """ + processed_summary_stats_df = ( + summary_stats_df.select( + # Construct study ID from the appropriate columns. + cls._full_study_id_regexp().alias("studyId"), + # Add variant information. + f.concat_ws( + "_", + f.col("chromosome"), + f.col("position"), + f.col("ref"), + f.col("alt"), + ).alias("variantId"), + f.col("chromosome"), + f.col("position").cast(t.IntegerType()), + # Parse p-value into mantissa and exponent. + *parse_pvalue(f.col("pvalue")), + # Add beta, standard error, and allele frequency information. 
+ f.col("beta").cast("double"), + f.col("se").cast("double").alias("standardError"), + f.col("maf").cast("float").alias("effectAlleleFrequencyFromSource"), + ) + # Drop rows which don't have proper position or beta value. + .filter( + f.col("position").cast(t.IntegerType()).isNotNull() + & (f.col("beta") != 0) + ) + ) + + # Initialise a summary statistics object. + return cls( + _df=processed_summary_stats_df, + _schema=cls.get_schema(), + ) diff --git a/src/otg/datasource/gwas_catalog/associations.py b/src/otg/datasource/gwas_catalog/associations.py index 1229d17ec..642e656c2 100644 --- a/src/otg/datasource/gwas_catalog/associations.py +++ b/src/otg/datasource/gwas_catalog/associations.py @@ -1042,64 +1042,6 @@ def from_source( f.col("OR or BETA"), f.col("95% CI (TEXT)"), ).alias("beta"), - # odds ratio of the association - GWASCatalogAssociations._harmonise_odds_ratio( - GWASCatalogAssociations._normalise_risk_allele( - f.col("STRONGEST SNP-RISK ALLELE") - ), - f.col("referenceAllele"), - f.col("alternateAllele"), - f.col("OR or BETA"), - f.col("95% CI (TEXT)"), - ).alias("oddsRatio"), - # CI lower of the beta value - GWASCatalogAssociations._harmonise_beta_ci( - GWASCatalogAssociations._normalise_risk_allele( - f.col("STRONGEST SNP-RISK ALLELE") - ), - f.col("referenceAllele"), - f.col("alternateAllele"), - f.col("OR or BETA"), - f.col("95% CI (TEXT)"), - f.col("P-VALUE"), - "lower", - ).alias("betaConfidenceIntervalLower"), - # CI upper for the beta value - GWASCatalogAssociations._harmonise_beta_ci( - GWASCatalogAssociations._normalise_risk_allele( - f.col("STRONGEST SNP-RISK ALLELE") - ), - f.col("referenceAllele"), - f.col("alternateAllele"), - f.col("OR or BETA"), - f.col("95% CI (TEXT)"), - f.col("P-VALUE"), - "upper", - ).alias("betaConfidenceIntervalUpper"), - # CI lower of the odds ratio value - GWASCatalogAssociations._harmonise_odds_ratio_ci( - GWASCatalogAssociations._normalise_risk_allele( - f.col("STRONGEST SNP-RISK ALLELE") - ), - f.col("referenceAllele"), - f.col("alternateAllele"), - f.col("OR or BETA"), - f.col("95% CI (TEXT)"), - f.col("P-VALUE"), - "lower", - ).alias("oddsRatioConfidenceIntervalLower"), - # CI upper of the odds ratio value - GWASCatalogAssociations._harmonise_odds_ratio_ci( - GWASCatalogAssociations._normalise_risk_allele( - f.col("STRONGEST SNP-RISK ALLELE") - ), - f.col("referenceAllele"), - f.col("alternateAllele"), - f.col("OR or BETA"), - f.col("95% CI (TEXT)"), - f.col("P-VALUE"), - "upper", - ).alias("oddsRatioConfidenceIntervalUpper"), # p-value of the association, string: split into exponent and mantissa. 
diff --git a/src/otg/datasource/gwas_catalog/associations.py b/src/otg/datasource/gwas_catalog/associations.py
index 1229d17ec..642e656c2 100644
--- a/src/otg/datasource/gwas_catalog/associations.py
+++ b/src/otg/datasource/gwas_catalog/associations.py
@@ -1042,64 +1042,6 @@ def from_source(
                     f.col("OR or BETA"),
                     f.col("95% CI (TEXT)"),
                 ).alias("beta"),
-                # odds ratio of the association
-                GWASCatalogAssociations._harmonise_odds_ratio(
-                    GWASCatalogAssociations._normalise_risk_allele(
-                        f.col("STRONGEST SNP-RISK ALLELE")
-                    ),
-                    f.col("referenceAllele"),
-                    f.col("alternateAllele"),
-                    f.col("OR or BETA"),
-                    f.col("95% CI (TEXT)"),
-                ).alias("oddsRatio"),
-                # CI lower of the beta value
-                GWASCatalogAssociations._harmonise_beta_ci(
-                    GWASCatalogAssociations._normalise_risk_allele(
-                        f.col("STRONGEST SNP-RISK ALLELE")
-                    ),
-                    f.col("referenceAllele"),
-                    f.col("alternateAllele"),
-                    f.col("OR or BETA"),
-                    f.col("95% CI (TEXT)"),
-                    f.col("P-VALUE"),
-                    "lower",
-                ).alias("betaConfidenceIntervalLower"),
-                # CI upper for the beta value
-                GWASCatalogAssociations._harmonise_beta_ci(
-                    GWASCatalogAssociations._normalise_risk_allele(
-                        f.col("STRONGEST SNP-RISK ALLELE")
-                    ),
-                    f.col("referenceAllele"),
-                    f.col("alternateAllele"),
-                    f.col("OR or BETA"),
-                    f.col("95% CI (TEXT)"),
-                    f.col("P-VALUE"),
-                    "upper",
-                ).alias("betaConfidenceIntervalUpper"),
-                # CI lower of the odds ratio value
-                GWASCatalogAssociations._harmonise_odds_ratio_ci(
-                    GWASCatalogAssociations._normalise_risk_allele(
-                        f.col("STRONGEST SNP-RISK ALLELE")
-                    ),
-                    f.col("referenceAllele"),
-                    f.col("alternateAllele"),
-                    f.col("OR or BETA"),
-                    f.col("95% CI (TEXT)"),
-                    f.col("P-VALUE"),
-                    "lower",
-                ).alias("oddsRatioConfidenceIntervalLower"),
-                # CI upper of the odds ratio value
-                GWASCatalogAssociations._harmonise_odds_ratio_ci(
-                    GWASCatalogAssociations._normalise_risk_allele(
-                        f.col("STRONGEST SNP-RISK ALLELE")
-                    ),
-                    f.col("referenceAllele"),
-                    f.col("alternateAllele"),
-                    f.col("OR or BETA"),
-                    f.col("95% CI (TEXT)"),
-                    f.col("P-VALUE"),
-                    "upper",
-                ).alias("oddsRatioConfidenceIntervalUpper"),
                 # p-value of the association, string: split into exponent and mantissa.
                 *GWASCatalogAssociations._parse_pvalue(f.col("P-VALUE")),
                 # Capturing phenotype granularity at the association level
diff --git a/src/otg/datasource/gwas_catalog/summary_statistics.py b/src/otg/datasource/gwas_catalog/summary_statistics.py
index ccb97a77e..141cc7785 100644
--- a/src/otg/datasource/gwas_catalog/summary_statistics.py
+++ b/src/otg/datasource/gwas_catalog/summary_statistics.py
@@ -8,11 +8,12 @@
 import pyspark.sql.functions as f
 import pyspark.sql.types as t
 
+from otg.common.spark_helpers import neglog_pvalue_to_mantissa_and_exponent
 from otg.common.utils import convert_odds_ratio_to_beta, parse_pvalue
 from otg.dataset.summary_statistics import SummaryStatistics
 
 if TYPE_CHECKING:
-    from pyspark.sql import DataFrame
+    from pyspark.sql import SparkSession
 
 
 @dataclass
@@ -22,53 +23,120 @@ class GWASCatalogSummaryStatistics(SummaryStatistics):
     @classmethod
     def from_gwas_harmonized_summary_stats(
         cls: type[GWASCatalogSummaryStatistics],
-        sumstats_df: DataFrame,
-        study_id: str,
+        spark: SparkSession,
+        sumstats_file: str,
     ) -> GWASCatalogSummaryStatistics:
         """Create summary statistics object from summary statistics flatfile, harmonized by the GWAS Catalog.
 
+        Things got slightly complicated given the GWAS Catalog harmonization pipelines changed recently, so we had to
+        accommodate both formats.
+
         Args:
-            sumstats_df (DataFrame): Harmonized dataset read as a spark dataframe from GWAS Catalog.
-            study_id (str): GWAS Catalog study accession.
+            spark (SparkSession): spark session
+            sumstats_file (str): path to a harmonised GWAS Catalog summary statistics file; the study ID is parsed from the file name.
 
         Returns:
             GWASCatalogSummaryStatistics: Summary statistics object.
         """
+        sumstats_df = spark.read.csv(sumstats_file, sep="\t", header=True).withColumn(
+            "studyId",
+            f.upper(
+                f.regexp_extract(f.input_file_name(), r"(GCST\d+)\.h\.tsv\.gz$", 1)
+            ),
+        )
+
+        # Parsing variant id fields:
+        chromosome = (
+            f.col("hm_chrom")
+            if "hm_chrom" in sumstats_df.columns
+            else f.col("chromosome")
+        ).cast(t.StringType())
+        position = (
+            f.col("hm_pos")
+            if "hm_pos" in sumstats_df.columns
+            else f.col("base_pair_location")
+        ).cast(t.IntegerType())
+        ref_allele = (
+            f.col("hm_other_allele")
+            if "hm_other_allele" in sumstats_df.columns
+            else f.col("other_allele")
+        )
+        alt_allele = (
+            f.col("hm_effect_allele")
+            if "hm_effect_allele" in sumstats_df.columns
+            else f.col("effect_allele")
+        )
+
+        # Parsing p-value (get a tuple with mantissa and exponent):
+        p_value_expression = (
+            parse_pvalue(f.col("p_value"))
+            if "p_value" in sumstats_df.columns
+            else neglog_pvalue_to_mantissa_and_exponent(f.col("neg_log_10_p_value"))
+        )
+
         # The effect allele frequency is an optional column, we have to test if it is there:
-        allele_frequency_expression = (
-            f.col("hm_effect_allele_frequency").cast(t.FloatType())
-            if "hm_effect_allele_frequency" in sumstats_df.columns
+        allele_frequency = (
+            f.col("effect_allele_frequency")
+            if "effect_allele_frequency" in sumstats_df.columns
             else f.lit(None)
-        )
+        ).cast(t.FloatType())
 
         # Do we have sample size? This expression captures 99.7% of sample size columns.
-        sample_size_expression = (
-            f.col("n").cast(t.IntegerType())
-            if "n" in sumstats_df.columns
-            else f.lit(None).cast(t.IntegerType())
+        sample_size = (f.col("n") if "n" in sumstats_df.columns else f.lit(None)).cast(
+            t.IntegerType()
         )
 
+        # Depending on the input we might have beta, but the column might not be there at all;
+        # the old format also names it differently:
+        beta_expression = (
+            f.col("hm_beta")
+            if "hm_beta" in sumstats_df.columns
+            else f.col("beta")
+            if "beta" in sumstats_df.columns
+            # If no column, create one:
+            else f.lit(None)
+        ).cast(t.DoubleType())
+
+        # We might have odds ratio or hazard ratio, which are handled the same way:
+        odds_ratio_expression = (
+            f.col("hm_odds_ratio")
+            if "hm_odds_ratio" in sumstats_df.columns
+            else f.col("odds_ratio")
+            if "odds_ratio" in sumstats_df.columns
+            else f.col("hazard_ratio")
+            if "hazard_ratio" in sumstats_df.columns
+            # If no column, create one:
+            else f.lit(None)
+        ).cast(t.DoubleType())
+
+        # Standard error is mandatory, but it is named differently in the new format:
+        standard_error = f.col("standard_error").cast(t.DoubleType())
+
         # Processing columns of interest:
         processed_sumstats_df = (
             sumstats_df
             # Dropping rows which don't have a proper position:
             .select(
-                # Adding study identifier:
-                f.lit(study_id).cast(t.StringType()).alias("studyId"),
+                "studyId",
                 # Adding variant identifier:
-                f.col("hm_variant_id").alias("variantId"),
-                f.col("hm_chrom").alias("chromosome"),
-                f.col("hm_pos").cast(t.IntegerType()).alias("position"),
+                f.concat_ws(
+                    "_",
+                    chromosome,
+                    position,
+                    ref_allele,
+                    alt_allele,
+                ).alias("variantId"),
+                chromosome.alias("chromosome"),
+                position.alias("position"),
                 # Parsing p-value mantissa and exponent:
-                *parse_pvalue(f.col("p_value")),
+                *p_value_expression,
                 # Converting/calculating effect and confidence interval:
                 *convert_odds_ratio_to_beta(
-                    f.col("hm_beta").cast(t.DoubleType()),
-                    f.col("hm_odds_ratio").cast(t.DoubleType()),
-                    f.col("standard_error").cast(t.DoubleType()),
+                    beta_expression,
+                    odds_ratio_expression,
+                    standard_error,
                 ),
-                allele_frequency_expression.alias("effectAlleleFrequencyFromSource"),
-                sample_size_expression.alias("sampleSize"),
+                allele_frequency.alias("effectAlleleFrequencyFromSource"),
+                sample_size.alias("sampleSize"),
             )
             .filter(
                 # Dropping associations where no harmonized position is available:
@@ -78,7 +146,8 @@ def from_gwas_harmonized_summary_stats(
                 (f.col("beta") != 0)
             )
             .orderBy(f.col("chromosome"), f.col("position"))
-            .repartition(400)
+            # Median study size is 200 MB, the maximum is 2.6 GB:
+            .repartition(20)
         )
 
         # Initializing summary statistics object:
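The chained conditional expressions above all implement the same "first existing column, else null" rule for reconciling the two harmonisation formats. A compact way to read them (the helper below is our sketch, not part of the codebase):

```python
from pyspark.sql import functions as f
from pyspark.sql.column import Column

def first_existing_column(df_columns: list[str], *candidates: str) -> Column:
    # Return the first candidate column present in the dataframe; fall back to a null literal.
    for name in candidates:
        if name in df_columns:
            return f.col(name)
    return f.lit(None)

# e.g. the beta expression above is equivalent to:
#   first_existing_column(sumstats_df.columns, "hm_beta", "beta").cast(t.DoubleType())
```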
diff --git a/src/otg/eqtl_catalogue.py b/src/otg/eqtl_catalogue.py
new file mode 100644
index 000000000..c57f95ed3
--- /dev/null
+++ b/src/otg/eqtl_catalogue.py
@@ -0,0 +1,67 @@
+"""Step to run eQTL Catalogue study table ingestion."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+from omegaconf import MISSING
+
+from otg.common.session import Session
+from otg.datasource.eqtl_catalogue.study_index import EqtlCatalogueStudyIndex
+from otg.datasource.eqtl_catalogue.summary_stats import EqtlCatalogueSummaryStats
+
+
+@dataclass
+class EqtlCatalogueStep:
+    """eQTL Catalogue ingestion step.
+
+    Attributes:
+        session (Session): Session object.
+        eqtl_catalogue_paths_imported (str): eQTL Catalogue input files for the harmonised and imported data.
+        eqtl_catalogue_study_index_out (str): Output path for the eQTL Catalogue study index dataset.
+        eqtl_catalogue_summary_stats_out (str): Output path for the eQTL Catalogue summary stats.
+    """
+
+    session: Session = MISSING
+
+    eqtl_catalogue_paths_imported: str = MISSING
+    eqtl_catalogue_study_index_out: str = MISSING
+    eqtl_catalogue_summary_stats_out: str = MISSING
+
+    def __post_init__(self: EqtlCatalogueStep) -> None:
+        """Run step."""
+        # Fetch study index.
+        df = self.session.spark.read.option("delimiter", "\t").csv(
+            self.eqtl_catalogue_paths_imported, header=True
+        )
+        # Process partial study index. At this point, it is not complete because we don't have the gene IDs, which we
+        # will only get once the summary stats are ingested.
+        study_index_df = EqtlCatalogueStudyIndex.from_source(df).df
+
+        # Fetch summary stats.
+        input_filenames = [row.summarystatsLocation for row in study_index_df.collect()]
+        summary_stats_df = (
+            self.session.spark.read.option("delimiter", "\t")
+            .csv(input_filenames, header=True)
+            .repartition(1280)
+        )
+        # Process summary stats.
+        summary_stats_df = EqtlCatalogueSummaryStats.from_source(summary_stats_df).df
+
+        # Add geneId column to the study index.
+        study_index_df = EqtlCatalogueStudyIndex.add_gene_id_column(
+            study_index_df,
+            summary_stats_df,
+        ).df
+
+        # Write study index.
+        study_index_df.write.mode(self.session.write_mode).parquet(
+            self.eqtl_catalogue_study_index_out
+        )
+        # Write summary stats.
+        (
+            summary_stats_df.sortWithinPartitions("position")
+            .write.partitionBy("chromosome")
+            .mode(self.session.write_mode)
+            .parquet(self.eqtl_catalogue_summary_stats_out)
+        )
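In normal operation the step is wired up through its Hydra config (`config/step/eqtl_catalogue.yaml` above). A rough sketch of a direct invocation, assuming `Session` can be constructed with its defaults; the paths are placeholders, not real locations:

```python
from otg.common.session import Session
from otg.eqtl_catalogue import EqtlCatalogueStep

# The dataclass runs the whole ingestion from __post_init__; paths below are placeholders.
EqtlCatalogueStep(
    session=Session(),
    eqtl_catalogue_paths_imported="gs://bucket/input/preprocess/eqtl_catalogue/tabix_ftp_paths_imported.tsv",
    eqtl_catalogue_study_index_out="gs://bucket/output/preprocess/eqtl_catalogue/study_index",
    eqtl_catalogue_summary_stats_out="gs://bucket/output/preprocess/eqtl_catalogue/summary_stats",
)
```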
""" session: Session = MISSING raw_sumstats_path: str = MISSING out_sumstats_path: str = MISSING - study_id: str = MISSING def __post_init__(self: GWASCatalogSumstatsPreprocessStep) -> None: """Run step.""" # Extract self.session.logger.info(self.raw_sumstats_path) self.session.logger.info(self.out_sumstats_path) - self.session.logger.info(self.study_id) - # Reading dataset: - raw_dataset = self.session.spark.read.csv( - self.raw_sumstats_path, header=True, sep="\t" - ) self.session.logger.info( - f"Number of single point associations: {raw_dataset.count()}" + f"Ingesting summary stats from: {self.raw_sumstats_path}" ) # Processing dataset: GWASCatalogSummaryStatistics.from_gwas_harmonized_summary_stats( - raw_dataset, self.study_id + self.session.spark, self.raw_sumstats_path ).df.write.mode(self.session.write_mode).parquet(self.out_sumstats_path) self.session.logger.info("Processing dataset successfully completed.") diff --git a/tests/conftest.py b/tests/conftest.py index 6ed3572f4..8ad6a7083 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -23,6 +23,8 @@ from otg.dataset.v2g import V2G from otg.dataset.variant_annotation import VariantAnnotation from otg.dataset.variant_index import VariantIndex +from otg.datasource.eqtl_catalogue.study_index import EqtlCatalogueStudyIndex +from otg.datasource.eqtl_catalogue.summary_stats import EqtlCatalogueSummaryStats from otg.datasource.finngen.study_index import FinnGenStudyIndex from otg.datasource.finngen.summary_stats import FinnGenSummaryStats from otg.datasource.gwas_catalog.associations import GWASCatalogAssociations @@ -163,6 +165,24 @@ def mock_summary_stats_finngen(spark: SparkSession) -> FinnGenSummaryStats: ) +@pytest.fixture() +def mock_study_index_eqtl_catalogue(spark: SparkSession) -> EqtlCatalogueStudyIndex: + """Mock EqtlCatalogueStudyIndex dataset.""" + return EqtlCatalogueStudyIndex( + _df=mock_study_index_data(spark), + _schema=StudyIndex.get_schema(), + ) + + +@pytest.fixture() +def mock_summary_stats_eqtl_catalogue(spark: SparkSession) -> EqtlCatalogueSummaryStats: + """Mock EqtlCatalogueSummaryStats dataset.""" + return EqtlCatalogueSummaryStats( + _df=mock_summary_statistics_data(spark), + _schema=SummaryStatistics.get_schema(), + ) + + @pytest.fixture() def mock_study_index_ukbiobank(spark: SparkSession) -> UKBiobankStudyIndex: """Mock StudyIndexUKBiobank dataset.""" @@ -202,11 +222,6 @@ def mock_study_locus_data(spark: SparkSession) -> DataFrame: .withColumnSpec("chromosome", percentNulls=0.1) .withColumnSpec("position", percentNulls=0.1) .withColumnSpec("beta", percentNulls=0.1) - .withColumnSpec("oddsRatio", percentNulls=0.1) - .withColumnSpec("oddsRatioConfidenceIntervalLower", percentNulls=0.1) - .withColumnSpec("oddsRatioConfidenceIntervalUpper", percentNulls=0.1) - .withColumnSpec("betaConfidenceIntervalLower", percentNulls=0.1) - .withColumnSpec("betaConfidenceIntervalUpper", percentNulls=0.1) .withColumnSpec("effectAlleleFrequencyFromSource", percentNulls=0.1) .withColumnSpec("standardError", percentNulls=0.1) .withColumnSpec("subStudyDescription", percentNulls=0.1) @@ -395,11 +410,6 @@ def mock_summary_statistics_data(spark: SparkSession) -> DataFrame: # Making sure p-values are below 1: ).build() - # Because some of the columns are not strictly speaking required, they are dropped now: - data_spec = data_spec.drop( - "betaConfidenceIntervalLower", "betaConfidenceIntervalUpper" - ) - return data_spec @@ -455,16 +465,6 @@ def sample_gwas_catalog_ancestries_lut(spark: SparkSession) -> DataFrame: ) 
 
 
-@pytest.fixture()
-def sample_gwas_catalog_harmonised_sumstats(spark: SparkSession) -> DataFrame:
-    """Sample GWAS harmonised sumstats sample data."""
-    return spark.read.csv(
-        "tests/data_samples/gwas_summary_stats_sample.tsv.gz",
-        sep="\t",
-        header=True,
-    )
-
-
 @pytest.fixture()
 def sample_gwas_catalog_harmonised_sumstats_list(spark: SparkSession) -> DataFrame:
     """Sample GWAS harmonised sumstats sample data."""
@@ -516,6 +516,30 @@ def sample_finngen_summary_stats(spark: SparkSession) -> DataFrame:
     )
 
 
+@pytest.fixture()
+def sample_eqtl_catalogue_studies(spark: SparkSession) -> DataFrame:
+    """Sample eQTL Catalogue studies."""
+    # For reference, the sample file was generated with the following command:
+    # curl https://raw.githubusercontent.com/eQTL-Catalogue/eQTL-Catalogue-resources/master/tabix/tabix_ftp_paths_imported.tsv | head -n11 > tests/data_samples/eqtl_catalogue_studies_sample.tsv
+    with open("tests/data_samples/eqtl_catalogue_studies_sample.tsv") as eqtl_catalogue:
+        tsv = eqtl_catalogue.read()
+    rdd = spark.sparkContext.parallelize([tsv])
+    return spark.read.csv(rdd, sep="\t", header=True)
+
+
+@pytest.fixture()
+def sample_eqtl_catalogue_summary_stats(spark: SparkSession) -> DataFrame:
+    """Sample eQTL Catalogue summary stats."""
+    # For reference, the sample file was generated with the following commands:
+    # mkdir -p tests/data_samples/imported/GTEx_V8/ge
+    # curl ftp://ftp.ebi.ac.uk/pub/databases/spot/eQTL/imported/GTEx_V8/ge/Adipose_Subcutaneous.tsv.gz | gzip -cd | head -n11 | gzip -c > tests/data_samples/imported/GTEx_V8/ge/Adipose_Subcutaneous.tsv.gz
+    # It's important for the test file to be named in exactly this way, because the eQTL Catalogue study ID is populated based on the input file name.
+    return spark.read.option("delimiter", "\t").csv(
+        "tests/data_samples/imported/GTEx_V8/ge/Adipose_Subcutaneous.tsv.gz",
+        header=True,
+    )
+
+
 @pytest.fixture()
 def sample_ukbiobank_studies(spark: SparkSession) -> DataFrame:
     """Sample UKBiobank manifest."""
diff --git a/tests/data_samples/eqtl_catalogue_studies_sample.tsv b/tests/data_samples/eqtl_catalogue_studies_sample.tsv
new file mode 100644
index 000000000..f756ecd6e
--- /dev/null
+++ b/tests/data_samples/eqtl_catalogue_studies_sample.tsv
@@ -0,0 +1,11 @@
+study	qtl_group	tissue_ontology_id	tissue_ontology_term	tissue_label	condition_label	quant_method	ftp_path
+GTEx_V8	Adipose_Subcutaneous	UBER_0002190	subcutaneous adipose tissue	Adipose - Subcutaneous	naive	ge	ftp://ftp.ebi.ac.uk/pub/databases/spot/eQTL/imported/GTEx_V8/ge/Adipose_Subcutaneous.tsv.gz
+GTEx_V8	Adipose_Visceral_Omentum	UBER_0010414	omental fat pad	Adipose - Visceral (Omentum)	naive	ge	ftp://ftp.ebi.ac.uk/pub/databases/spot/eQTL/imported/GTEx_V8/ge/Adipose_Visceral_Omentum.tsv.gz
+GTEx_V8	Adrenal_Gland	UBER_0002369	adrenal gland	Adrenal Gland	naive	ge	ftp://ftp.ebi.ac.uk/pub/databases/spot/eQTL/imported/GTEx_V8/ge/Adrenal_Gland.tsv.gz
+GTEx_V8	Artery_Aorta	UBER_0001496	ascending aorta	Artery - Aorta	naive	ge	ftp://ftp.ebi.ac.uk/pub/databases/spot/eQTL/imported/GTEx_V8/ge/Artery_Aorta.tsv.gz
+GTEx_V8	Artery_Coronary	UBER_0001621	coronary artery	Artery - Coronary	naive	ge	ftp://ftp.ebi.ac.uk/pub/databases/spot/eQTL/imported/GTEx_V8/ge/Artery_Coronary.tsv.gz
+GTEx_V8	Artery_Tibial	UBER_0007610	tibial artery	Artery - Tibial	naive	ge	ftp://ftp.ebi.ac.uk/pub/databases/spot/eQTL/imported/GTEx_V8/ge/Artery_Tibial.tsv.gz
+GTEx_V8	Brain_Amygdala	UBER_0001876	amygdala	Brain - Amygdala	naive	ge	ftp://ftp.ebi.ac.uk/pub/databases/spot/eQTL/imported/GTEx_V8/ge/Brain_Amygdala.tsv.gz
+GTEx_V8	Brain_Anterior_cingulate_cortex_BA24	UBER_0009835	anterior cingulate cortex	Brain - Anterior cingulate cortex (BA24)	naive	ge	ftp://ftp.ebi.ac.uk/pub/databases/spot/eQTL/imported/GTEx_V8/ge/Brain_Anterior_cingulate_cortex_BA24.tsv.gz
+GTEx_V8	Brain_Caudate_basal_ganglia	UBER_0001873	caudate nucleus	Brain - Caudate (basal ganglia)	naive	ge	ftp://ftp.ebi.ac.uk/pub/databases/spot/eQTL/imported/GTEx_V8/ge/Brain_Caudate_basal_ganglia.tsv.gz
+GTEx_V8	Brain_Cerebellar_Hemisphere	UBER_0002037	cerebellum	Brain - Cerebellar Hemisphere	naive	ge	ftp://ftp.ebi.ac.uk/pub/databases/spot/eQTL/imported/GTEx_V8/ge/Brain_Cerebellar_Hemisphere.tsv.gz
diff --git a/tests/data_samples/gwas_summary_stats_sample.tsv.gz b/tests/data_samples/gwas_summary_stats_sample.tsv.gz
deleted file mode 100644
index 9a9c5f735..000000000
Binary files a/tests/data_samples/gwas_summary_stats_sample.tsv.gz and /dev/null differ
diff --git a/tests/data_samples/imported/GTEx_V8/ge/Adipose_Subcutaneous.tsv.gz b/tests/data_samples/imported/GTEx_V8/ge/Adipose_Subcutaneous.tsv.gz
new file mode 100644
index 000000000..74c0844a4
Binary files /dev/null and b/tests/data_samples/imported/GTEx_V8/ge/Adipose_Subcutaneous.tsv.gz differ
diff --git a/tests/data_samples/new_format_GCST90293086.h.tsv.gz b/tests/data_samples/new_format_GCST90293086.h.tsv.gz
new file mode 100644
index 000000000..a15d475df
Binary files /dev/null and b/tests/data_samples/new_format_GCST90293086.h.tsv.gz differ
diff --git a/tests/data_samples/old_format_GCST006090.h.tsv.gz b/tests/data_samples/old_format_GCST006090.h.tsv.gz
new file mode 100644
index 000000000..76a207ccf
Binary files /dev/null and b/tests/data_samples/old_format_GCST006090.h.tsv.gz differ
diff --git a/tests/datasource/eqtl_catalogue/test_eqtl_catalogue_study_index.py b/tests/datasource/eqtl_catalogue/test_eqtl_catalogue_study_index.py
new file mode 100644
index 000000000..cc97914bf
--- /dev/null
+++ b/tests/datasource/eqtl_catalogue/test_eqtl_catalogue_study_index.py
@@ -0,0 +1,20 @@
+"""Tests for study index dataset from eQTL Catalogue."""
+
+from __future__ import annotations
+
+from pyspark.sql import DataFrame
+
+from otg.dataset.study_index import StudyIndex
+from otg.datasource.eqtl_catalogue.study_index import EqtlCatalogueStudyIndex
+
+
+def test_eqtl_catalogue_study_index_from_source(
+    sample_eqtl_catalogue_studies: DataFrame,
+) -> None:
+    """Test study index from source."""
+    assert isinstance(
+        EqtlCatalogueStudyIndex.from_source(
+            sample_eqtl_catalogue_studies,
+        ),
+        StudyIndex,
+    )
diff --git a/tests/datasource/eqtl_catalogue/test_eqtl_catalogue_summary_stats.py b/tests/datasource/eqtl_catalogue/test_eqtl_catalogue_summary_stats.py
new file mode 100644
index 000000000..26132f986
--- /dev/null
+++ b/tests/datasource/eqtl_catalogue/test_eqtl_catalogue_summary_stats.py
@@ -0,0 +1,18 @@
+"""Tests for summary statistics dataset from eQTL Catalogue."""
+
+from __future__ import annotations
+
+from pyspark.sql import DataFrame
+
+from otg.dataset.summary_statistics import SummaryStatistics
+from otg.datasource.eqtl_catalogue.summary_stats import EqtlCatalogueSummaryStats
+
+
+def test_eqtl_catalogue_summary_stats_from_source(
+    sample_eqtl_catalogue_summary_stats: DataFrame,
+) -> None:
+    """Test summary statistics from source."""
+    assert isinstance(
+        EqtlCatalogueSummaryStats.from_source(sample_eqtl_catalogue_summary_stats),
+        SummaryStatistics,
+    )
diff --git a/tests/datasource/gwas_catalog/test_gwas_catalog_summary_statistics.py b/tests/datasource/gwas_catalog/test_gwas_catalog_summary_statistics.py
index 548ac0f30..9bfb677c1 100644
--- a/tests/datasource/gwas_catalog/test_gwas_catalog_summary_statistics.py
+++ b/tests/datasource/gwas_catalog/test_gwas_catalog_summary_statistics.py
@@ -4,19 +4,109 @@
 
 from typing import TYPE_CHECKING
 
+import pyspark.sql.functions as f
+import pytest
+
+from otg.dataset.summary_statistics import SummaryStatistics
 from otg.datasource.gwas_catalog.summary_statistics import GWASCatalogSummaryStatistics
 
 if TYPE_CHECKING:
-    from pyspark.sql import DataFrame
-
-
-def test_gwas_catalog_summary_statistics_from_gwas_harmonized_summary_stats(
-    sample_gwas_catalog_harmonised_sumstats: DataFrame,
-) -> None:
-    """Test GWASCatalogSummaryStatistics creation with mock data."""
-    assert isinstance(
-        GWASCatalogSummaryStatistics.from_gwas_harmonized_summary_stats(
-            sample_gwas_catalog_harmonised_sumstats, "GCST000000"
-        ),
-        GWASCatalogSummaryStatistics,
+    from pyspark.sql import SparkSession
+    from pytest import FixtureRequest
+
+
+class TestGWASCatalogSummaryStatistics:
+    """Test suite for GWAS Catalog summary stats ingestion."""
+
+    @pytest.fixture(scope="class")
+    def gwas_catalog_summary_statistics__new_format(
+        self: TestGWASCatalogSummaryStatistics,
+        spark: SparkSession,
+    ) -> GWASCatalogSummaryStatistics:
+        """Fixture creating a GWASCatalogSummaryStatistics instance from a new-format sample file."""
+        return GWASCatalogSummaryStatistics.from_gwas_harmonized_summary_stats(
+            spark, "tests/data_samples/new_format_GCST90293086.h.tsv.gz"
+        )
+
+    @pytest.fixture(scope="class")
+    def gwas_catalog_summary_statistics__old_format(
+        self: TestGWASCatalogSummaryStatistics,
+        spark: SparkSession,
+    ) -> GWASCatalogSummaryStatistics:
+        """Fixture creating a GWASCatalogSummaryStatistics instance from an old-format sample file."""
+        return GWASCatalogSummaryStatistics.from_gwas_harmonized_summary_stats(
+            spark, "tests/data_samples/old_format_GCST006090.h.tsv.gz"
+        )
+
+    @pytest.fixture(scope="class")
+    def test_dataset_instance(
+        self: TestGWASCatalogSummaryStatistics, request: FixtureRequest
+    ) -> GWASCatalogSummaryStatistics:
+        """Meta fixture to return the value of any requested fixture."""
+        return request.getfixturevalue(request.param)
+
+    @pytest.mark.parametrize(
+        "test_dataset_instance",
+        [
+            "gwas_catalog_summary_statistics__old_format",
+            "gwas_catalog_summary_statistics__new_format",
+        ],
+        indirect=True,
+    )
+    def test_return_type(
+        self: TestGWASCatalogSummaryStatistics,
+        test_dataset_instance: SummaryStatistics,
+    ) -> None:
+        """Testing return type."""
+        assert isinstance(test_dataset_instance, SummaryStatistics)
+
+    @pytest.mark.parametrize(
+        "test_dataset_instance",
+        [
+            "gwas_catalog_summary_statistics__old_format",
+            "gwas_catalog_summary_statistics__new_format",
+        ],
+        indirect=True,
+    )
+    def test_p_value_parsed_correctly(
+        self: TestGWASCatalogSummaryStatistics,
+        test_dataset_instance: SummaryStatistics,
+    ) -> None:
+        """Testing parsed p-value."""
+        assert (
+            test_dataset_instance.df.filter(f.col("pValueMantissa").isNotNull()).count()
+            > 1
+        )
+
+    @pytest.mark.parametrize(
+        "test_dataset_instance",
+        [
+            "gwas_catalog_summary_statistics__old_format",
+            "gwas_catalog_summary_statistics__new_format",
+        ],
+        indirect=True,
+    )
+    def test_effect_parsed_correctly(
+        self: TestGWASCatalogSummaryStatistics,
+        test_dataset_instance: SummaryStatistics,
+    ) -> None:
+        """Testing properly parsed effect."""
+        assert test_dataset_instance.df.filter(f.col("beta").isNotNull()).count() > 1
+
+    @pytest.mark.parametrize(
+        "test_dataset_instance",
+        [
+            "gwas_catalog_summary_statistics__old_format",
+            "gwas_catalog_summary_statistics__new_format",
+        ],
+        indirect=True,
     )
+    def test_study_id(
+        self: TestGWASCatalogSummaryStatistics,
+        test_dataset_instance: SummaryStatistics,
+    ) -> None:
+        """Testing that study IDs are parsed correctly."""
+        assert (
+            test_dataset_instance.df.filter(f.col("studyId").startswith("GCST")).count()
+            == test_dataset_instance.df.count()
+        )
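For readers unfamiliar with the indirect-parametrization trick used throughout this test suite, here it is in isolation (a generic sketch, not tied to this repository):

```python
import pytest

@pytest.fixture()
def small() -> int:
    return 1

@pytest.fixture()
def large() -> int:
    return 1000

@pytest.fixture()
def value(request: pytest.FixtureRequest) -> int:
    # Meta fixture: resolve whichever fixture name was passed via indirect parametrization.
    return request.getfixturevalue(request.param)

@pytest.mark.parametrize("value", ["small", "large"], indirect=True)
def test_positive(value: int) -> None:
    assert value > 0
```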