1
1
"""Study Index for UKB PPP (EUR) data source."""
2
+
2
3
from __future__ import annotations
3
4
4
5
import pyspark .sql .functions as f
@@ -29,9 +30,7 @@ def from_source(
29
30
"""
30
31
# In order to populate the nSamples column, we need to peek inside the summary stats dataframe.
31
32
num_of_samples = (
32
- spark
33
- .read
34
- .parquet (raw_summary_stats_path )
33
+ spark .read .parquet (raw_summary_stats_path )
35
34
.filter (f .col ("chromosome" ) == "22" )
36
35
.groupBy ("studyId" )
37
36
.agg (f .first ("N" ).cast ("integer" ).alias ("nSamples" ))
@@ -45,29 +44,25 @@ def from_source(
45
44
f .lit ("UKB_PPP_EUR" ).alias ("projectId" ),
46
45
f .col ("_gentropy_study_id" ).alias ("studyId" ),
47
46
f .col ("UKBPPP_ProteinID" ).alias ("traitFromSource" ),
48
- f .lit ("UBERON_0001969" ).alias ("tissueFromSourceId " ),
47
+ f .lit ("UBERON_0001969" ).alias ("biosampleFromSourceId " ),
49
48
f .col ("ensembl_id" ).alias ("geneId" ),
50
49
f .lit (True ).alias ("hasSumstats" ),
51
50
f .col ("_gentropy_summary_stats_link" ).alias ("summarystatsLocation" ),
52
51
)
53
52
.join (num_of_samples , "studyId" , "inner" )
54
53
)
55
54
# Add population structure.
56
- study_index_df = (
57
- study_index_df
58
- .withColumn (
59
- "discoverySamples" ,
60
- f .array (
61
- f .struct (
62
- f .col ("nSamples" ).cast ("integer" ).alias ("sampleSize" ),
63
- f .lit ("European" ).alias ("ancestry" ),
64
- )
55
+ study_index_df = study_index_df .withColumn (
56
+ "discoverySamples" ,
57
+ f .array (
58
+ f .struct (
59
+ f .col ("nSamples" ).cast ("integer" ).alias ("sampleSize" ),
60
+ f .lit ("European" ).alias ("ancestry" ),
65
61
)
66
- )
67
- .withColumn (
68
- "ldPopulationStructure" ,
69
- cls .aggregate_and_map_ancestries (f .col ("discoverySamples" )),
70
- )
62
+ ),
63
+ ).withColumn (
64
+ "ldPopulationStructure" ,
65
+ cls .aggregate_and_map_ancestries (f .col ("discoverySamples" )),
71
66
)
72
67
73
68
return StudyIndex (
0 commit comments