#!/usr/bin/env python3
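"""Upload documents to Overview from a CSV full of metadata.

Each CSV row describes one document: one column (chosen with --url-field or
--local-file-field) says where to fetch the file, an optional --title-field
column supplies the title shown in Overview, and the entire row is attached
to the document as metadata.

Hypothetical example (the column names `path` and `headline` are only an
illustration): given a documents.csv whose header row is
`path,headline,author`, upload every listed file with

    ./overview-upload-csv API_TOKEN documents.csv --local-file-field=path --title-field=headline

Run with --help for the full list of options.
"""
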
import argparse
import csv
import logging
import pathlib
import queue
import sys
import threading
import urllib.error
import urllib.parse
import urllib.request

import requests.exceptions

import overview_upload


def open_local_file(filename, logger, field, line_number):
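    """Yield an open binary file object for filename, or nothing at all.

    If filename does not name an existing file, log a warning that points at
    the offending CSV line and yield nothing, so the row is skipped.
    """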
    if not pathlib.Path(filename).is_file():
        logger.warning('Skipping CSV line %d: %s is not a file', line_number, filename)
        return

    with open(filename, 'rb') as f:
        yield f


def open_url(url, logger, field, line_number):
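    """Yield an open HTTP response for url, or nothing at all.

    Log a warning (and yield nothing, so the row is skipped) when the URL is
    empty, has no scheme or network location, or cannot be fetched.
    """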
    error_prefix = 'Skipping CSV line {}'.format(line_number)

    url = url.strip()
    if not url:
        logger.warning('%s: the %s field is empty', error_prefix, field)
        return

    try:
        parts = urllib.parse.urlparse(url)
        if not parts.scheme:
            logger.warning('%s: "%s" does not start with http: or https:', error_prefix, url)
            return
        elif not parts.netloc:
            logger.warning('%s: "%s" does not include a network location', error_prefix, url)
            return
    except ValueError as err:
        logger.warning('%s: "%s" is not a valid URL: %s', error_prefix, url, str(err))
        return

    try:
        with urllib.request.urlopen(url) as f:
            yield f
    except urllib.error.URLError as err:
        logger.warning('Error in CSV on line %d: cannot open %s: %s', line_number, url, str(err))


def build_upload_call(upload, logger, field, title_field, open_file):
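    """Return a process_row(row, line_number) callable for the worker threads.

    The callable looks up the file location in row[field] and the title in
    row[title_field], opens the file with open_file(), and uploads it with the
    whole row attached as document metadata. Rows with missing values and
    failed uploads are logged and skipped.
    """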
    def process_row(row, line_number):
        path = row[field]
        if not path:
            logger.warning('Skipping CSV line %d: missing value for %s', line_number, field)
            return

        title = row[title_field]
        if not title:
            logger.warning('Skipping CSV line %d: missing value for %s', line_number, title_field)
            return

        for in_file in open_file(path, logger, field, line_number):
            try:
                upload.send_file_if_conditions_met(in_file, title, metadata=row)
            except requests.exceptions.RequestException as err:
                logger.warning('Failed upload: %s', str(err))

    return process_row


def process_row_queue(row_queue, process_row):
"""Calls process_row(*row_queue.get()) forever.
Set this to be a daemon thread, so that the program will exit. (Python
concurrency is silly; this is a quick hack that works because we know we'll
be exiting the program after these threads are all done.)
"""
while True:
row = row_queue.get()
process_row(*row)
row_queue.task_done()
def main():
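    """Parse arguments, read the CSV, and upload every row to Overview."""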
    parser = argparse.ArgumentParser(description='Upload to Overview from a spreadsheet full of metadata')
    parser.add_argument('api_token', help='API token from http://localhost:9000/documentsets/[ID]/api-tokens')
    parser.add_argument('csv', help='path to CSV listing files to upload')
    parser.add_argument('--server', help='URL of Overview server, defaults to http://localhost:9000', default='http://localhost:9000')

    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--url-field', help='CSV column containing URLs of files to stream to Overview')
    group.add_argument('--local-file-field', help='CSV column containing filenames to stream to Overview')

    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument('--ocr', dest='ocr', help='Run OCR on PDF pages that are only images', action="store_true")
    group.add_argument('--no-ocr', dest='ocr', help='Skip OCR always (for speed)', action="store_false")

    parser.add_argument('--split-by-page', dest='split_by_page', help='Turn input files into one document per page', action="store_true", default=False)
    parser.add_argument('--lang', dest='lang', default='en', help='2-char ISO document language code (used for OCR and text analysis)')
    parser.add_argument('--n-concurrent-uploads', type=int, default=1, help='Number of simultaneous uploads: useful when --url-field gives slow-but-plentiful connections, like S3')
    parser.add_argument('--title-field', help='CSV column containing titles to display in Overview (default url/local-file)')
    parser.add_argument('--create-document-set-with-title', dest='create_with_title', help='Create a new document set and then add files')

    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument('--metadata-schema-json-file', help='JSON file containing desired document set metadata schema')
    group.add_argument('--metadata-schema-json-string', help='JSON data containing desired document set metadata schema')
    group.add_argument('--metadata-schema-field-names', help='List of comma-separated metadata field names for desired document set')

    parser.set_defaults(ocr=True)
    args = parser.parse_args()

    logger = logging.getLogger('overview-upload-csv')
    logger.setLevel(logging.DEBUG)
    logger.addHandler(logging.StreamHandler())

    metadata_schema = None  # don't alter document set's existing schema by default
    if args.metadata_schema_json_file is not None:
        metadata_schema = overview_upload.read_metadata_json_file(args.metadata_schema_json_file, logger)
    elif args.metadata_schema_json_string is not None:
        metadata_schema = overview_upload.parse_metadata_json(args.metadata_schema_json_string, logger)
    elif args.metadata_schema_field_names is not None:
        metadata_schema = overview_upload.parse_metadata_from_delimited_string_of_fields(args.metadata_schema_field_names)

    with open(args.csv, encoding='utf-8') as f:
        reader = csv.DictReader(f)

        field = None
        open_file = None
        if args.url_field is not None:
            if args.url_field not in reader.fieldnames:
                logger.error('You requested url-field `%s`, but the CSV does not contain it', args.url_field)
                sys.exit(1)
            field = args.url_field
            open_file = open_url
        else:
            if args.local_file_field not in reader.fieldnames:
                logger.error('You requested local-file-field `%s`, but the CSV does not contain it', args.local_file_field)
                sys.exit(1)
            field = args.local_file_field
            open_file = open_local_file

        if args.title_field and args.title_field not in reader.fieldnames:
            logger.error('You requested title-field `%s`, but the CSV does not contain it', args.title_field)
            sys.exit(1)

        if args.create_with_title:
            if metadata_schema is None:
                metadata_schema = overview_upload.DefaultMetadataSchema
            response = overview_upload.create_document_set(args.server, args.api_token, args.create_with_title, metadata_schema=metadata_schema, logger=logger)
            logger.info('Created document set "%s" with ID %d', args.create_with_title, response['documentSet']['id'])
            api_token = response['apiToken']['token']
        else:
            if metadata_schema is not None:
                raise NotImplementedError('TODO: alter the metadata schema of an existing document set')
            api_token = args.api_token

        upload = overview_upload.Upload(args.server, api_token, logger=logger)
        process_row = build_upload_call(upload, logger, field, args.title_field or field, open_file)

        # Multi-threading:
        #
        # Goal: call process_row(row, csvLineNumber) for every row in the CSV,
        # in any order, and then exit.
        #
        # Implementation:
        #
        # * Producer: the main thread keeps a backlog of rows in row_queue.
        #   (row_queue.put() blocks when the queue is too large, which means we
        #   support an unlimited number of CSV rows.)
        # * Consumers: each worker thread (running process_row_queue) reads
        #   from the queue and executes process_row() with the values it gets.
        # * Completion: Python weirdness: we call row_queue.join() after we're
        #   done producing; it returns when all threads signal that they've
        #   completed all their tasks (by calling row_queue.task_done()). The
        #   threads never die, and [adam, 2017-08-01] I can't see that Python
        #   actually provides a mechanism for killing them. But they're daemon
        #   threads, and the main process exits after we're done, so they'll
        #   exit as well.
        #
        # I marvel that the Python Standard Library provides this queue module
        # without an obvious mechanism for killing it.
        row_queue = queue.Queue(args.n_concurrent_uploads * 2)

        # Start consumers
        row_processors = []
        for i in range(args.n_concurrent_uploads):
            row_processor = threading.Thread(target=process_row_queue, args=(row_queue, process_row))
            row_processor.daemon = True
            row_processor.start()
            row_processors.append(row_processor)

        # Produce all the tasks
        for (i, row) in enumerate(reader):
            row_queue.put((row, i + 1))

        # Wait until the consumers have processed all the tasks
        row_queue.join()

        # POST to finish creating the document set
        upload.finish(
            ocr=args.ocr,
            split_by_page=args.split_by_page,
            lang=args.lang
        )


if __name__ == '__main__':
    main()