Skip to content

Commit

Permalink
refactor: simplify passing kwargs to a PySpark job
Browse files Browse the repository at this point in the history
  • Loading branch information
tskir committed Oct 9, 2023
1 parent ac78d10 commit 3686d8a
Showing 1 changed file with 19 additions and 45 deletions.
64 changes: 19 additions & 45 deletions src/otg/preprocess/finngen.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from __future__ import annotations

import argparse
import sys
from urllib.request import urlopen

from pyspark.sql import SparkSession
Expand All @@ -11,28 +11,32 @@
from otg.dataset.finngen.summary_stats import FinnGenSummaryStatistics


def main(args) -> None:
def main(
finngen_phenotype_table_url,
finngen_release_prefix,
finngen_summary_stats_url_prefix,
finngen_summary_stats_url_suffix,
finngen_study_index_out,
finngen_summary_stats_out,
spark_write_mode,
) -> None:
"""Ingest FinnGen data."""
# Initialise Spark session.
spark = SparkSession.builder.master("yarn").appName("ingest_finngen").getOrCreate()

# Fetch study index.
json_data = urlopen(args.finngen_phenotype_table_url).read().decode("utf-8")
json_data = urlopen(finngen_phenotype_table_url).read().decode("utf-8")
rdd = spark.sparkContext.parallelize([json_data])
study_index_raw = spark.read.json(rdd)
# Process study index.
study_index = FinnGenStudyIndex.from_source(
study_index_raw,
args.finngen_release_prefix,
args.finngen_summary_stats_url_prefix,
args.finngen_summary_stats_url_suffix,
finngen_release_prefix,
finngen_summary_stats_url_prefix,
finngen_summary_stats_url_suffix,
).df
# Save study index.
(
study_index.write.mode(args.spark_write_mode).parquet(
args.finngen_study_index_out
)
)
(study_index.write.mode(spark_write_mode).parquet(finngen_study_index_out))

# Fetch summary stats.
input_filenames = [row.summarystatsLocation for row in study_index.collect()]
Expand All @@ -47,41 +51,11 @@ def main(args) -> None:
(
summary_stats.sortWithinPartitions("position")
.write.partitionBy("studyId", "chromosome")
.mode(args.spark_write_mode)
.parquet(args.finngen_summary_stats_out)
.mode(spark_write_mode)
.parquet(finngen_summary_stats_out)
)


parser = argparse.ArgumentParser(description="Process FinnGen data")
parser.add_argument(
"--finngen_phenotype_table_url",
help="URL to ingest FinnGen phenotype table JSON data from",
)
parser.add_argument(
"--finngen_release_prefix", help="Prefix which will be added to all study IDs"
)
parser.add_argument(
"--finngen_summary_stats_url_prefix",
help="Prefix for each summary stats file URL",
)
parser.add_argument(
"--finngen_summary_stats_url_suffix",
help="Suffix for each summary stats file URL",
)
parser.add_argument(
"--finngen_study_index_out",
help="Output URL in Google Storage to save the study index",
)
parser.add_argument(
"--finngen_summary_stats_out",
help="Output URL in Google Storage to save the summary stats",
)
parser.add_argument(
"--spark_write_mode",
help="Spark write mode which is applied to both study index and summary stats",
)


if __name__ == "__main__":
args = parser.parse_args()
main(args)
kwargs = dict([arg[2:].split("=") for arg in sys.argv[1:]])
main(**kwargs)

0 comments on commit 3686d8a

Please sign in to comment.