"""
Script to submit data and metadata to a gen3 instance using the sdk and command line tool
"""
import argparse
import json
import os
import subprocess
import sys

import requests
from gen3.auth import Gen3Auth
from gen3.index import Gen3Index
from gen3.submission import Gen3Submission


def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument("--folder", required=True, type=str,
                        help="The outer folder where the simulated data lives, usually "
                             "s/path/to/umccr-dictionary/data/<NAME OF DICT>")
    parser.add_argument("--projects", nargs="*", default=["AusDiab", "FIELD", "BioHEART-CT"],
                        help="The names of the specific projects, space-delimited, which are sub-folders of the "
                             "--folder provided")
    parser.add_argument("--delete-all-metadata", action="store_true",
                        help="If specified, will delete all node metadata below the project level, in order.")
    parser.add_argument("--profile", action="store",
                        help="The name of your gen3-client profile, required for uploading data files to the portal.")
    parser.add_argument("--api-endpoint", action="store",
                        help="The URL of the data commons, e.g. https://data.acdc.ozheart.org")
    parser.add_argument("--credentials", action="store", default="_local/credentials.json",
                        help="The path to the credentials.json with authority to upload to the commons")
    parser.add_argument("--numparallel", action="store", type=int, default=2,
                        help="How many cores to use for uploading in parallel")
    parser.add_argument("--add-subjects", action="store_true", default=False,
                        help="If specified, will skip program and project creation and will add nodes from subjects "
                             "onwards")
    parser.add_argument("--metadata-only", action="store_true", default=False,
                        help="If specified, will only update the metadata json files and will not upload associated "
                             "data files.")
    return parser.parse_args()


def delete_metadata(project_name, folder_path, api_endpoint, credentials_path):
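    """
    Delete all node metadata below the project level for the given project.

    Reads the project's DataImportOrder.txt, drops the program and project
    nodes, and deletes the remaining nodes in reverse import order (leaf
    nodes first) under the hard-coded "program1" program.
    """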
    with open(os.path.join(folder_path, project_name, "DataImportOrder.txt"), "r") as f:
        import_order = [line.rstrip() for line in f]
    import_order.remove("project")
    import_order.remove("program")
    import_order.reverse()
    auth = Gen3Auth(endpoint=api_endpoint, refresh_file=credentials_path)
    sub = Gen3Submission(endpoint=api_endpoint, auth_provider=auth)
    sub.delete_nodes("program1", project_name, import_order)


if __name__ == "__main__":
    # Parsing Args
    args = parse_arguments()

    ####### Deleting current metadata #######
    if args.delete_all_metadata:
        proceed = input(f"Are you sure you want to delete all existing metadata for the projects: {args.projects}? y/n\n")
        if proceed.lower() == "y":
            print("Ok, now proceeding to delete...")
            for project in args.projects:
                delete_metadata(project, args.folder, args.api_endpoint, args.credentials)
            print("Deletion completed, now exiting.")
            sys.exit()
        else:
            print("Ok, now exiting. Please remove the --delete-all-metadata flag and rerun the script.")
            sys.exit()

    ####### Uploading dummy data files to S3 #######
    # Running the below code for each project
    for project in args.projects:
        print(f"Processing project: {project}")
        folder = args.folder
        # Uploading dummy files unless --metadata-only was given
        if not args.metadata_only:
            if args.profile and os.path.exists(os.path.join(folder, project, "dummy_files")):
                upload_path = os.path.join(folder, project, "dummy_files")
                bash_command = f"gen3-client upload --upload-path={upload_path} --profile={args.profile} " \
                               f"--numparallel={args.numparallel}"
                process = subprocess.Popen(bash_command.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                output, error = process.communicate()
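                # Surface upload failures rather than continuing silently; this assumes
                # gen3-client is on PATH and the named profile was set up with `gen3-client configure`.
                if process.returncode != 0:
                    print(f"gen3-client upload failed for {project}:\n{error.decode(errors='replace')}")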

        ####### Creating gen3 SDK class objects for auth, submission, and indexing #######
        script_path = os.path.abspath(os.path.dirname(__file__))
        auth = Gen3Auth(endpoint=args.api_endpoint, refresh_file=args.credentials)
        sub = Gen3Submission(endpoint=args.api_endpoint, auth_provider=auth)
        index = Gen3Index(endpoint=args.api_endpoint, auth_provider=auth)
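        # The index object is used further down to look up each uploaded file's
        # object_id, md5sum, and file_size from the Gen3 index service.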

        # Creating the project under the program
        if not args.add_subjects:
            # hard-coding program1
            sub.create_program({
                "dbgap_accession_number": "prg123",
                "name": "program1",
                "type": "program"
            })
            with open(os.path.join(folder, project, "edited_jsons", "project.json")) as f:
                proj = json.load(f)
            sub.create_project("program1", proj)  # creating the new project within program1

        ####### Adding index properties to metadata JSONs #######
        for line in open(os.path.join(folder, project, "DataImportOrder.txt"), "r"):
            line = line.strip()
            skip_objects = ["program", "project", "acknowledgement", "publication"]
            if line not in skip_objects:
                print(f"uploading {line}")
                try:
                    with open(os.path.join(folder, project, "edited_jsons", f"{line}.json")) as f:
                        jsn = json.load(f)
                    # If uploading metadata and dummy files, and the node name ends with "file",
                    # try to find the file in the Gen3 index and copy over its index properties.
                    if not args.metadata_only:
                        if line.endswith("file"):
                            for file_md in jsn:
                                try:
                                    # Getting the index properties of the data file from the Gen3 index class
                                    indexed_file = index.get_with_params({"file_name": file_md['file_name']})
                                    # Writing the index properties to the metadata keys
                                    file_md['object_id'] = indexed_file['did']
                                    file_md['md5sum'] = indexed_file['hashes']['md5']
                                    file_md['file_size'] = indexed_file['size']
                                except KeyError as e:
                                    print(e)
                                    print(f"{file_md['file_name']} data file not yet uploaded")
                                except requests.exceptions.HTTPError as e:
                                    print(e)
                                    content = e.response.content
                                    print(f"{file_md['file_name']} data file not yet uploaded")
                                except TypeError as e:
                                    print(e)
                                    print(f"{file_md['file_name']} data file not yet uploaded")
                    ####### Submitting updated data file index metadata #######
                    try:
                        sub.submit_record("program1", project, jsn)  # submitting
                    except requests.exceptions.HTTPError as e:
                        content = e.response.content
                        try:
                            content = json.dumps(json.loads(content), indent=4, sort_keys=True)
                        except ValueError:
                            pass
                        raise requests.exceptions.HTTPError(content, response=e.response)
                except FileNotFoundError:
                    print(f"{line} json not found, skipping")