From 3686d8aeb2a23e766df63e1d24aab6282a21b2ad Mon Sep 17 00:00:00 2001
From: Kirill Tsukanov
Date: Mon, 9 Oct 2023 17:11:31 +0100
Subject: [PATCH] refactor: simplify passing kwargs to a PySpark job

---
 src/otg/preprocess/finngen.py | 64 +++++++++++------------------------
 1 file changed, 19 insertions(+), 45 deletions(-)

diff --git a/src/otg/preprocess/finngen.py b/src/otg/preprocess/finngen.py
index 0883824a3..5132b968c 100644
--- a/src/otg/preprocess/finngen.py
+++ b/src/otg/preprocess/finngen.py
@@ -2,7 +2,7 @@
 
 from __future__ import annotations
 
-import argparse
+import sys
 from urllib.request import urlopen
 
 from pyspark.sql import SparkSession
@@ -11,28 +11,32 @@
 from otg.dataset.finngen.summary_stats import FinnGenSummaryStatistics
 
 
-def main(args) -> None:
+def main(
+    finngen_phenotype_table_url,
+    finngen_release_prefix,
+    finngen_summary_stats_url_prefix,
+    finngen_summary_stats_url_suffix,
+    finngen_study_index_out,
+    finngen_summary_stats_out,
+    spark_write_mode,
+) -> None:
     """Ingest FinnGen data."""
     # Initialise Spark session.
     spark = SparkSession.builder.master("yarn").appName("ingest_finngen").getOrCreate()
 
     # Fetch study index.
-    json_data = urlopen(args.finngen_phenotype_table_url).read().decode("utf-8")
+    json_data = urlopen(finngen_phenotype_table_url).read().decode("utf-8")
     rdd = spark.sparkContext.parallelize([json_data])
     study_index_raw = spark.read.json(rdd)
     # Process study index.
     study_index = FinnGenStudyIndex.from_source(
         study_index_raw,
-        args.finngen_release_prefix,
-        args.finngen_summary_stats_url_prefix,
-        args.finngen_summary_stats_url_suffix,
+        finngen_release_prefix,
+        finngen_summary_stats_url_prefix,
+        finngen_summary_stats_url_suffix,
     ).df
     # Save study index.
-    (
-        study_index.write.mode(args.spark_write_mode).parquet(
-            args.finngen_study_index_out
-        )
-    )
+    (study_index.write.mode(spark_write_mode).parquet(finngen_study_index_out))
 
     # Fetch summary stats.
     input_filenames = [row.summarystatsLocation for row in study_index.collect()]
@@ -47,41 +51,11 @@ def main(args) -> None:
     (
         summary_stats.sortWithinPartitions("position")
         .write.partitionBy("studyId", "chromosome")
-        .mode(args.spark_write_mode)
-        .parquet(args.finngen_summary_stats_out)
+        .mode(spark_write_mode)
+        .parquet(finngen_summary_stats_out)
     )
 
 
-parser = argparse.ArgumentParser(description="Process FinnGen data")
-parser.add_argument(
-    "--finngen_phenotype_table_url",
-    help="URL to ingest FinnGen phenotype table JSON data from",
-)
-parser.add_argument(
-    "--finngen_release_prefix", help="Prefix which will be added to all study IDs"
-)
-parser.add_argument(
-    "--finngen_summary_stats_url_prefix",
-    help="Prefix for each summary stats file URL",
-)
-parser.add_argument(
-    "--finngen_summary_stats_url_suffix",
-    help="Suffix for each summary stats file URL",
-)
-parser.add_argument(
-    "--finngen_study_index_out",
-    help="Output URL in Google Storage to save the study index",
-)
-parser.add_argument(
-    "--finngen_summary_stats_out",
-    help="Output URL in Google Storage to save the summary stats",
-)
-parser.add_argument(
-    "--spark_write_mode",
-    help="Spark write mode which is applied to both study index and summary stats",
-)
-
-
 if __name__ == "__main__":
-    args = parser.parse_args()
-    main(args)
+    kwargs = dict([arg[2:].split("=") for arg in sys.argv[1:]])
+    main(**kwargs)
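
Note on the new invocation convention (not part of the diff): main() no longer receives
an argparse namespace. The __main__ block builds its keyword arguments directly from
sys.argv, stripping the leading "--" from each argument (arg[2:]) and splitting the rest
on "=". Every argument therefore has to take the exact form --<parameter_name>=<value>,
the key must match a parameter of main() exactly, and the value must not itself contain
an "=" character, since split() is called without a maxsplit and an extra "=" would make
dict() raise. A minimal invocation sketch, assuming the script is launched with
spark-submit; all URLs, bucket names and the release prefix below are placeholders, not
values taken from this patch:

    spark-submit finngen.py \
        --finngen_phenotype_table_url=https://example.org/finngen_phenotypes.json \
        --finngen_release_prefix=FINNGEN_R9 \
        --finngen_summary_stats_url_prefix=gs://example-bucket/finngen/summary_stats/ \
        --finngen_summary_stats_url_suffix=.gz \
        --finngen_study_index_out=gs://example-bucket/finngen/study_index \
        --finngen_summary_stats_out=gs://example-bucket/finngen/summary_stats \
        --spark_write_mode=overwrite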