generated from CDCgov/template
-
Notifications
You must be signed in to change notification settings - Fork 11
/
biosample_sra_handler.py
executable file
·287 lines (278 loc) · 16.2 KB
/
biosample_sra_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
#!/usr/bin/env python3
########################### Description ##################################
# Functions for NCBI input/output handling
################################################################################
import pandas as pd
from typing import Set, Dict, Any, Tuple, List
import os
from lxml import etree
import sys
import re
import ncbi_handler
import file_handler
import upload_log
from settings import BIOSAMPLE_REGEX, SRA_REGEX
# Check raw reads files listed in metadata file
def check_raw_read_files(submission_name: str, submission_dir: str, metadata: pd.DataFrame) -> Set[str]:
# Pop off the end directories of submission_dir 'submission_files/SRA'
raw_reads_path_default = os.path.join(os.path.split(os.path.split(submission_dir)[0])[0], "raw_reads")
# Separate samples stored in local and cloud
local_df = metadata[metadata["sra-file_location"] == "local"]
file_columns = [col for col in local_df.columns if re.match("sra-file_[1-9]\d*", col)]
validated_files = set()
invalid_raw_files = []
for index, row in local_df.iterrows():
# If multiple files check each one
for column_name in file_columns:
file = row[column_name]
# At least one file required for SRA but extra file columns can have empty slots
if (file is None or file.strip() == "") and column_name != "sra-file_1":
continue
file = file.strip()
# If full path don't use rawreads folderfil
if os.path.isabs(file):
file_path = file
else:
# If just file name use raw reads default path
file_path = os.path.join(raw_reads_path_default, file)
if os.path.isfile(file_path) == False:
invalid_raw_files.append(f"Error: Raw read files for {row['sra-sample_name']} could not be found. Field '{column_name}' must either be the full file path, or if just the file name it must be stored at '<submission_dir>/<submission_name>/raw_reads/<sra-file>'.")
else:
validated_files.add(file_path)
if invalid_raw_files:
for error_msg in invalid_raw_files:
print(error_msg, file=sys.stderr)
sys.exit(1)
return validated_files
# Create files for optional manual submission to repositories biosample and sra
def create_manual_submission_files(database: str, submission_dir: str, metadata: pd.DataFrame, config_dict: Dict[str, Any]) -> None:
if "SRA" in database:
metadata_regex = "^sra-|^organism$|^collection_date$"
rename_columns = {"sra-library_name":"sra-library_ID"}
drop_columns = ["sra-file_location", "sra-loader", "sra-file"]
column_ordered = ["sample_name","library_ID"]
prefix = "sra-"
# Create SRA specific fields
filename_cols = [col for col in metadata.columns.tolist() if re.match("sra-file_[1-9]\d*", col)]
# Correct index for filename column
for col in filename_cols:
# Remove 0 index
if col == "sra-file_1":
rename_columns[col] = "sra-filename"
else:
rename_columns[col] = col.replace("sra-file_", "sra-filename")
elif "BIOSAMPLE" in database:
metadata_regex = "^bs-|^organism$|^collection_date$"
rename_columns = {"bioproject":"bioproject_accession"}
drop_columns = ["bs-title", "bs-comment", "bs-sample_title", "bs-sample_description"]
column_ordered = ["sample_name"]
prefix = "bs-"
else:
print("Error: create_manual_submission_files function only for databases SRA/BioSample. Not '{database}'.", file=sys.stderr)
sys.exit(1)
# Filter only needed columns
database_df = metadata.filter(regex=metadata_regex).copy()
database_df = database_df.drop_duplicates()
# Rename columns for manual submission
database_df = database_df.rename(columns={key: value for key, value in rename_columns.items() if key in database_df})
# Remove columns not required for manual submission
database_df = database_df.drop(columns=[col for col in drop_columns if col in database_df.columns])
# Remove database prefix
database_df.columns = database_df.columns.str.replace(prefix,"")
# Order columns
columns_no_order = list(filter(lambda x: (x not in column_ordered)==True, database_df.columns))
column_ordered = column_ordered + columns_no_order
database_df = database_df.reindex(columns=column_ordered)
file_handler.save_csv(df=database_df, file_path=submission_dir, file_name="metadata.tsv", sep="\t")
# Create submission XML
def create_submission_xml(organism: str, database: str, submission_name: str, config_dict: Dict[str, Any], metadata: pd.DataFrame) -> bytes:
# Submission XML header
root = etree.Element("Submission")
description = etree.SubElement(root, "Description")
title = etree.SubElement(description, "Title")
if "BIOSAMPLE" in database:
if "bs-title" in metadata and pd.notnull(metadata["bs-title"].iloc[0]) and metadata["bs-title"].iloc[0].strip() != 0:
title.text = metadata["bs-title"].iloc[0]
else:
title.text = submission_name + "-BS"
comment = etree.SubElement(description, "Comment")
if "bs-comment" in metadata and pd.notnull(metadata["bs-comment"].iloc[0]) and metadata["bs-comment"].iloc[0].strip() != 0:
comment.text = metadata["bs-comment"].iloc[0]
else:
comment.text = "BioSample Submission"
elif "SRA" in database:
if "sra-title" in metadata and pd.notnull(metadata["sra-title"].iloc[0]) and metadata["sra-title"].iloc[0].strip() != 0:
title.text = metadata["sra-title"].iloc[0]
else:
title.text = submission_name + "-SRA"
comment = etree.SubElement(description, "Comment")
if "sra-comment" in metadata and pd.notnull(metadata["sra-comment"].iloc[0]) and metadata["sra-comment"].iloc[0].strip() != 0:
comment.text = metadata["sra-comment"].iloc[0]
else:
comment.text = "SRA Submission"
# Description info including organization and contact info
organization = etree.SubElement(description, "Organization", type=config_dict["Description"]["Organization"]["Type"], role=config_dict["Description"]["Organization"]["Role"])
org_name = etree.SubElement(organization, "Name")
org_name.text = config_dict["Description"]["Organization"]["Name"]
contact = etree.SubElement(organization, "Contact", email=config_dict["Description"]["Organization"]["Submitter"]["Email"])
name = etree.SubElement(contact, "Name")
first_name = etree.SubElement(name, "First")
first_name.text = config_dict["Description"]["Organization"]["Submitter"]["Name"]["First"]
last_name = etree.SubElement(name, "Last")
last_name.text = config_dict["Description"]["Organization"]["Submitter"]["Name"]["Last"]
if config_dict["Specified_Release_Date"]:
release_date = etree.SubElement(description, "Hold", release_date=config_dict["Specified_Release_Date"])
# XML actions
if "BIOSAMPLE" in database:
database_df = metadata.filter(regex=BIOSAMPLE_REGEX).copy()
database_df = database_df.drop_duplicates()
for index, row in database_df.iterrows():
action = etree.SubElement(root, "Action")
add_data = etree.SubElement(action, "AddData", target_db="BioSample")
data = etree.SubElement(add_data, "Data", content_type="xml")
xmlcontent = etree.SubElement(data, "XmlContent")
biosample = etree.SubElement(xmlcontent, "BioSample", schema_version="2.0")
sampleid = etree.SubElement(biosample, "SampleId")
spuid = etree.SubElement(sampleid, "SPUID", spuid_namespace=config_dict["Spuid_Namespace"])
spuid.text = row["bs-sample_name"]
if ("bs-sample_title" in metadata and pd.notnull(row["bs-sample_title"]) and row["bs-sample_title"].strip != "") or ("bs-sample_description" in metadata and pd.notnull(row["bs-sample_description"]) and row["bs-sample_description"].strip != ""):
descriptor = etree.SubElement(biosample, "Descriptor")
if "bs-sample_title" in metadata and pd.notnull(row["bs-sample_title"]) and row["bs-sample_title"].strip != "":
sample_title = etree.SubElement(descriptor, "Title")
sample_title.text = row["bs-sample_title"]
if "bs-sample_description" in metadata and pd.notnull(row["bs-sample_description"]) and row["bs-sample_description"].strip != "":
sample_description = etree.SubElement(descriptor, "Description")
sample_description.text = row["bs-sample_description"]
organismxml = etree.SubElement(biosample, "Organism")
organismname = etree.SubElement(organismxml, "OrganismName")
organismname.text = row["organism"]
if "bioproject" in metadata and pd.notnull(row["bioproject"]) and row["bioproject"].strip() != "":
bioproject = etree.SubElement(biosample, "BioProject")
primaryid = etree.SubElement(bioproject, "PrimaryId", db="BioProject")
primaryid.text = row["bioproject"]
package = etree.SubElement(biosample, "Package")
package.text = config_dict["BioSample_Package"]
# Attributes
attributes = etree.SubElement(biosample, "Attributes")
# Remove columns with bs-prefix that are not attributes
biosample_cols = [col for col in database_df.columns.tolist() if (col.startswith('bs-')) and (col not in ["bs-sample_name", "bs-package", "bs-title", "bs-comment", "bs-sample_title", "bs-sample_description"])]
for col in biosample_cols:
attribute_value = row[col]
if pd.notnull(attribute_value) and attribute_value.strip() != "":
attribute = etree.SubElement(attributes, "Attribute", attribute_name=col.replace("bs-",""))
attribute.text = row[col]
# Add collection date to Attributes
attribute = etree.SubElement(attributes, "Attribute", attribute_name="collection_date")
attribute.text = row["collection_date"]
identifier = etree.SubElement(add_data, "Identifier")
spuid = etree.SubElement(identifier, "SPUID", spuid_namespace=config_dict["Spuid_Namespace"])
spuid.text = row["bs-sample_name"]
if "SRA" in database:
database_df = metadata.filter(regex=(SRA_REGEX)).copy()
database_df = database_df.drop_duplicates()
file_columns = [col for col in database_df.columns if re.match("sra-file_[1-9]\d*", col)]
for index, row in database_df.iterrows():
action = etree.SubElement(root, "Action")
addfiles = etree.SubElement(action, "AddFiles", target_db="SRA")
for column_name in file_columns:
# extra file columns can be empty but not first file column
if column_name == "sra-file_1" and (row[column_name] is None or row[column_name].strip() == ""):
print(f"Error: metadata must contain a file for {row['sra-sample_name']} in column sra-file_1", file=sys.stderr)
sys.exit(1)
elif column_name != "sra-file_1" and (row[column_name] is None or row[column_name].strip() == ""):
continue
if row["sra-file_location"].strip().lower() == "cloud":
file = etree.SubElement(addfiles, "File", cloud_url = row[column_name].strip())
elif row["sra-file_location"].strip().lower() == "local":
file = etree.SubElement(addfiles, "File", file_path = os.path.basename(row[column_name].strip()))
else:
print("Error: Metadata field file_location must be either cloud or local. Field currently contains: " + row["sra-file_location"].strip().lower(), file=sys.stderr)
sys.exit(1)
datatype = etree.SubElement(file, "DataType")
datatype.text = "generic-data"
# Remove columns with sra- prefix that are not attributes
sra_cols = [col for col in database_df.columns.tolist() if col.startswith('sra-') and not re.match("(sra-sample_name|sra-title|sra-comment|sra-file_location|sra-file_\d*)", col)]
for col in sra_cols:
attribute_value = row[col]
if pd.notnull(attribute_value) and attribute_value.strip() != "":
attribute = etree.SubElement(addfiles, "Attribute", name=col.replace("sra-",""))
attribute.text = row[col]
if pd.notnull(row["bioproject"]) and row["bioproject"].strip() != "":
attribute_ref_id = etree.SubElement(addfiles, "AttributeRefId", name="BioProject")
refid = etree.SubElement(attribute_ref_id, "RefId")
primaryid = etree.SubElement(refid, "PrimaryId")
primaryid.text = row["bioproject"]
attribute_ref_id = etree.SubElement(addfiles, "AttributeRefId", name="BioSample")
refid = etree.SubElement(attribute_ref_id, "RefId")
spuid = etree.SubElement(refid, "SPUID", spuid_namespace=config_dict["Spuid_Namespace"])
spuid.text = metadata.loc[metadata["sra-sample_name"] == row["sra-sample_name"], "bs-sample_name"].iloc[0]
identifier = etree.SubElement(addfiles, "Identifier")
spuid = etree.SubElement(identifier, "SPUID", spuid_namespace=config_dict["Spuid_Namespace"])
spuid.text = row["sra-sample_name"]
# Pretty print xml
xml_str = etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=True)
return xml_str
# Create list of raw read paths inside sra submission folder
def create_raw_reads_list(submission_dir: str, raw_files_list: Set[str]) -> None:
with open(os.path.join(submission_dir, "raw_reads_location.txt"), "w+") as file:
for line in raw_files_list:
file.write(line + "\n")
# Main create function for BioSample/SRA
def create_biosample_sra_submission(organism: str, database: str, submission_name: str, submission_dir: str, config_dict: Dict[str, Any], metadata: pd.DataFrame):
if database == "SRA":
# Validate and write raw reads location
raw_files_list = check_raw_read_files(submission_name=submission_name, submission_dir=submission_dir, metadata=metadata)
create_raw_reads_list(submission_dir=submission_dir, raw_files_list=raw_files_list)
manual_df = metadata.copy()
create_manual_submission_files(database=database, submission_dir=submission_dir, metadata=manual_df, config_dict=config_dict)
xml_str = create_submission_xml(organism=organism, database=database, submission_name=submission_name, metadata=metadata, config_dict=config_dict)
file_handler.save_xml(xml_str, submission_dir)
# Read xml report and get status of the submission
def process_biosample_sra_report(report_file: str, database: str, submission_dir: str) -> Tuple[str, str]:
report_dict, submission_status, submission_id = ncbi_handler.process_report_header(report_file=report_file)
sample_name_prefix = {"BIOSAMPLE":"bs-", "SRA":"sra-"}
sample_info = []
if "Action" not in report_dict["SubmissionStatus"]:
return submission_status, submission_id
try:
# If only a single sample, convert into list for proper formatting
if isinstance(report_dict["SubmissionStatus"]["Action"], list):
action_list = report_dict["SubmissionStatus"]["Action"]
elif isinstance(report_dict["SubmissionStatus"]["Action"], dict):
action_list = [report_dict["SubmissionStatus"]["Action"]]
else:
print(f"Error: Unable to correctly process BioSample report at: {report_file}", file=sys.stderr)
return submission_status, submission_id
for action_dict in action_list:
# Skip if incorrect database
if "@target_db" not in action_dict or action_dict["@target_db"].lower() != database.lower():
continue
# Skip if missing sample info
elif "Response" not in action_dict:
continue
elif isinstance(action_dict["Response"], list) and any("Object" in response_dict for response_dict in action_dict["Response"]) and "@status" in action_dict:
# print(action_dict["Response"])
for index in range(len(action_dict["Response"])):
response_dict = action_dict["Response"][index]
if "Object" in response_dict and all(field in response_dict["Object"] for field in ["@accession", "@spuid"]):
sample_name = response_dict["Object"]["@spuid"]
accession = response_dict["Object"]["@accession"]
sample_name_col = f"{sample_name_prefix[database]}sample_name"
column_prefix = database.lower()
sample_info.append({sample_name_col:sample_name, f"{column_prefix}_status":response_dict["@status"], f"{column_prefix}_accession":accession, f"{column_prefix}_message":""})
break
elif isinstance(action_dict["Response"], dict) and "Object" in action_dict["Response"] and "@status" in action_dict:
if "@accession" in action_dict["Response"]["Object"] and "@spuid" in action_dict["Response"]["Object"]:
sample_name = action_dict["Response"]["Object"]["@spuid"]
accession = action_dict["Response"]["Object"]["@accession"]
sample_name_col = f"{sample_name_prefix[database]}sample_name"
column_prefix = database.lower()
sample_info.append({sample_name_col:sample_name, f"{column_prefix}_status":action_dict["@status"], f"{column_prefix}_accession":accession, f"{column_prefix}_message":""})
except:
pass
if submission_status == "PROCESSED" and not sample_info:
print(f"Error: Unable to process {database} report.xml to retrieve accessions at: {report_file}", file=sys.stderr)
if sample_info:
update_df = pd.DataFrame(sample_info)
upload_log.update_submission_status_csv(submission_dir=submission_dir, update_database=database, update_df=update_df)
return submission_status, submission_id