-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathImportFile.py
executable file
·412 lines (331 loc) · 18 KB
/
ImportFile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
"""This module handles all operations involving importing reports."""
import shutil
import platform
import ctypes
import csv
from tempfile import TemporaryDirectory
from os import path, makedirs
from PyQt5.QtCore import QDate, Qt
from PyQt5.QtWidgets import QWidget, QDialog, QDialogButtonBox, QLabel
from PyQt5.QtGui import QStandardItemModel, QStandardItem
from PyQt5 import QtWidgets
import GeneralUtils
from Constants import *
from Counter4 import Counter4To5Converter
from ui import ImportReportTab, ReportResultWidget
from ManageVendors import Vendor
from FetchData import CompletionStatus
from Settings import SettingsModel
from ManageDB import UpdateDatabaseWorker
class ProcessResult:
"""This holds the results of an import process
:param vendor: The target vendor
:param report_type: The target report type
"""
def __init__(self, vendor: Vendor, report_type: str):
self.vendor = vendor
self.report_type = report_type
self.completion_status = CompletionStatus.SUCCESSFUL
self.message = ""
self.file_name = ""
self.file_dir = ""
self.file_path = ""
def get_c5_equivalent(counter4_report_type: str) -> str:
return COUNTER_4_REPORT_EQUIVALENTS[counter4_report_type]
class ImportReportController:
"""Controls the Import Report tab
:param vendors: The list of vendors in the system
:param settings: The user's settings
:param import_report_widget: The import report widget.
:param import_report_ui: The UI for the import_report_widget.
"""
def __init__(self, vendors: list, settings: SettingsModel, import_report_widget: QWidget,
import_report_ui: ImportReportTab.Ui_import_report_tab):
# region General
self.import_report_widget = import_report_widget
self.vendors = vendors
self.date = QDate.currentDate()
self.selected_vendor_index = -1
self.selected_c5_report_type_index = -1
self.selected_c4_report_type_index = -1
self.c5_selected_file_path: str = ""
self.c4_selected_file_paths: list = []
self.settings = settings
self.result_dialog = None
# endregion
# region Vendors
self.vendor_combo_box = import_report_ui.vendor_combo_box
self.vendor_list_model = QStandardItemModel(self.vendor_combo_box)
self.vendor_combo_box.setModel(self.vendor_list_model)
self.vendor_combo_box.currentIndexChanged.connect(self.on_vendor_selected)
self.update_vendors_ui()
self.selected_vendor_index = self.vendor_combo_box.currentIndex()
# endregion
# region Counter 5
self.c5_report_type_combo_box = import_report_ui.c5_report_type_combo_box
self.c5_report_type_model = QStandardItemModel(self.c5_report_type_combo_box)
self.c5_report_type_combo_box.setModel(self.c5_report_type_model)
self.c5_report_type_combo_box.currentIndexChanged.connect(self.on_c5_report_type_selected)
for report_type in ALL_REPORTS:
item = QStandardItem(report_type)
item.setEditable(False)
self.c5_report_type_model.appendRow(item)
self.selected_c5_report_type_index = self.c5_report_type_combo_box.currentIndex()
self.c5_select_file_btn = import_report_ui.c5_select_file_button
self.c5_select_file_btn.clicked.connect(self.on_c5_select_file_clicked)
self.c5_selected_file_edit = import_report_ui.c5_selected_file_edit
self.c5_import_report_button = import_report_ui.c5_import_report_button
self.c5_import_report_button.clicked.connect(self.on_c5_import_clicked)
# endregion
# region Counter 4
self.c4_report_type_combo_box = import_report_ui.c4_report_type_combo_box
self.c4_report_type_model = QStandardItemModel(self.c4_report_type_combo_box)
self.c4_report_type_combo_box.setModel(self.c4_report_type_model)
self.c4_report_type_combo_box.currentIndexChanged.connect(self.on_c4_report_type_selected)
self.c4_report_type_equiv_label = import_report_ui.c4_report_type_equiv_label
for report_type in COUNTER_4_REPORT_EQUIVALENTS.keys():
item = QStandardItem(report_type)
item.setEditable(False)
self.c4_report_type_model.appendRow(item)
self.selected_c4_report_type_index = self.c4_report_type_combo_box.currentIndex()
self.c4_report_type_equiv_label.setText(get_c5_equivalent(self.c4_report_type_combo_box.currentText()))
self.c4_select_file_btn = import_report_ui.c4_select_file_button
self.c4_select_file_btn.clicked.connect(self.on_c4_select_file_clicked)
self.c4_selected_files_frame = import_report_ui.c4_selected_files_frame
self.c4_selected_files_frame_layout = self.c4_selected_files_frame.layout()
self.c4_import_report_button = import_report_ui.c4_import_report_button
self.c4_import_report_button.clicked.connect(self.on_c4_import_clicked)
# endregion
# region Date
self.year_date_edit = import_report_ui.report_year_date_edit
self.year_date_edit.setDate(self.date)
self.year_date_edit.dateChanged.connect(self.on_date_changed)
# endregion
def on_vendors_changed(self, vendors: list):
"""Handles the signal emitted when the system's vendor list is updated
:param vendors: An updated list of the system's vendors
"""
self.update_vendors(vendors)
self.update_vendors_ui()
self.selected_vendor_index = self.vendor_combo_box.currentIndex()
def update_vendors(self, vendors: list):
""" Updates the local copy of vendors that support report import
:param vendors: A list of vendors
"""
self.vendors = vendors
def update_vendors_ui(self):
"""Updates the UI to show vendors that support report import"""
self.vendor_list_model.clear()
for vendor in self.vendors:
item = QStandardItem(vendor.name)
item.setEditable(False)
self.vendor_list_model.appendRow(item)
def on_vendor_selected(self, index: int):
"""Handles the signal emitted when a vendor is selected"""
self.selected_vendor_index = index
def on_c5_report_type_selected(self, index: int):
"""Handles the signal emitted when a report type is selected"""
self.selected_c5_report_type_index = index
def on_c4_report_type_selected(self, index: int):
"""Handles the signal emitted when a report type is selected"""
self.selected_c4_report_type_index = index
self.c4_report_type_equiv_label.setText(get_c5_equivalent(self.c4_report_type_combo_box.currentText()))
def on_date_changed(self, date: QDate):
"""Handles the signal emitted when the target date is changed"""
self.date = date
def on_c5_select_file_clicked(self):
"""Handles the signal emitted when the select file button is clicked"""
file_path = GeneralUtils.choose_file(TSV_FILTER + CSV_FILTER)
if file_path:
self.c5_selected_file_path = file_path
file_name = file_path.split("/")[-1]
self.c5_selected_file_edit.setText(file_name)
def on_c4_select_file_clicked(self):
"""Handles the signal emitted when the select file button is clicked"""
file_paths = GeneralUtils.choose_files(TSV_AND_CSV_FILTER)
if file_paths:
self.c4_selected_file_paths = file_paths
file_names = [file_path.split("/")[-1] for file_path in file_paths]
# Remove existing options from ui
for i in reversed(range(self.c4_selected_files_frame_layout.count())):
widget = self.c4_selected_files_frame_layout.itemAt(i).widget()
# remove it from the layout list
self.c4_selected_files_frame_layout.removeWidget(widget)
# remove it from the gui
widget.deleteLater()
# Add new file names
for file_name in file_names:
label = QLabel(file_name)
self.c4_selected_files_frame_layout.addWidget(label)
def on_c5_import_clicked(self):
"""Handles the signal emitted when the import button is clicked"""
if self.selected_vendor_index == -1:
GeneralUtils.show_message("Select a vendor")
return
elif self.selected_c5_report_type_index == -1:
GeneralUtils.show_message("Select a report type")
return
elif self.c5_selected_file_path == "":
GeneralUtils.show_message("Select a file")
return
vendor = self.vendors[self.selected_vendor_index]
report_type = ALL_REPORTS[self.selected_c5_report_type_index]
process_result = self.import_report(vendor, report_type, self.c5_selected_file_path)
self.show_results([process_result])
def on_c4_import_clicked(self):
"""Handles the signal emitted when the import button is clicked"""
if self.selected_vendor_index == -1:
GeneralUtils.show_message("Select a vendor")
return
elif not self.c4_selected_file_paths:
GeneralUtils.show_message("Select a file")
return
vendor = self.vendors[self.selected_vendor_index]
report_types = get_c5_equivalent(self.c4_report_type_combo_box.currentText())
# Check if target C5 file already exists
existing_report_types = []
for report_type in report_types.split(", "):
if self.check_if_c5_report_exists(vendor.name, report_type):
existing_report_types.append(report_type)
# Confirm overwrite
if existing_report_types:
if not GeneralUtils.ask_confirmation(f"COUNTER 5 [{', '.join(existing_report_types)}] already exist in the "
"database for this vendor, do you want to overwrite them?"):
return
with TemporaryDirectory("") as dir_path:
converter = Counter4To5Converter(self.vendors[self.selected_vendor_index],
self.c4_report_type_combo_box.currentText(),
self.c4_selected_file_paths,
dir_path + path.sep,
self.year_date_edit.date())
try:
c5_file_paths = converter.do_conversion()
except Exception as e:
process_result = ProcessResult(vendor, report_types)
process_result.completion_status = CompletionStatus.FAILED
process_result.message = "Error converting file. " + str(e)
self.show_results([process_result])
return
if not c5_file_paths: # If nothing was processed
process_result = ProcessResult(vendor, report_types)
process_result.completion_status = CompletionStatus.FAILED
process_result.message = "No COUNTER 5 report was created, make sure the COUNTER 4 input files are " \
"correct"
self.show_results([process_result])
return
process_results = []
for report_type in c5_file_paths:
file_path = c5_file_paths[report_type]
process_result = self.import_report(vendor, report_type, file_path)
process_results.append(process_result)
self.show_results(process_results)
def import_report(self, vendor: Vendor, report_type: str, origin_file_path: str) -> ProcessResult:
""" Imports the selected file using the selected parameters
:param vendor: The target vendor
:param report_type: The target report type
:param origin_file_path: The path of the file to be imported
:raises Exception: If anything goes wrong while importing the report
"""
process_result = ProcessResult(vendor, report_type)
try:
dest_file_dir = GeneralUtils.get_yearly_file_dir(self.settings.yearly_directory, vendor.name, self.date)
dest_file_name = GeneralUtils.get_yearly_file_name(vendor.name, report_type, self.date)
dest_file_path = f"{dest_file_dir}{dest_file_name}"
# Verify that dest_file_dir exists
if not path.isdir(dest_file_dir):
makedirs(dest_file_dir)
# Validate report header
delimiter = DELIMITERS[origin_file_path[-4:].lower()]
file = open(origin_file_path, 'r', encoding='utf-8-sig')
reader = csv.reader(file, delimiter=delimiter, quotechar='\"')
if file.mode == 'r':
header = {}
for row in range(HEADER_ROWS): # reads header row data
cells = next(reader)
if cells:
key = cells[0].lower()
if key != HEADER_ENTRIES[row]:
raise Exception('File has invalid header (missing row ' + HEADER_ENTRIES[row] + ')')
else:
header[key] = cells[1].strip()
else:
raise Exception('File has invalid header (missing row ' + HEADER_ENTRIES[row] + ')')
for row in range(BLANK_ROWS):
cells = next(reader)
if cells:
if cells[0].strip():
raise Exception('File has invalid header (not enough blank rows)')
if header['report_id'] != report_type:
raise Exception('File has invalid header (wrong Report_Id)')
if not header['created']:
raise Exception('File has invalid header (no Created date)')
else:
raise Exception('Could not open file')
# Copy selected_file_path to dest_file_path
self.copy_file(origin_file_path, dest_file_path)
process_result.file_dir = dest_file_dir
process_result.file_name = dest_file_name
process_result.file_path = dest_file_path
process_result.completion_status = CompletionStatus.SUCCESSFUL
# Save protected tsv file
protected_file_dir = f"{PROTECTED_DATABASE_FILE_DIR}{self.date.toString('yyyy')}/{vendor.name}/"
if not path.isdir(protected_file_dir):
makedirs(protected_file_dir)
if platform.system() == "Windows":
ctypes.windll.kernel32.SetFileAttributesW(PROTECTED_DATABASE_FILE_DIR, 2) # Hide folder
protected_file_path = f"{protected_file_dir}{dest_file_name}"
self.copy_file(origin_file_path, protected_file_path)
# Add file to database
database_worker = UpdateDatabaseWorker([{'file': protected_file_path,
'vendor': vendor.name,
'year': int(self.date.toString('yyyy'))}],
False)
database_worker.work()
except Exception as e:
process_result.message = f"Exception: {e}"
process_result.completion_status = CompletionStatus.FAILED
return process_result
def check_if_c5_report_exists(self, vendor_name, report_type) -> bool:
protected_file_dir = f"{PROTECTED_DATABASE_FILE_DIR}{self.date.toString('yyyy')}/{vendor_name}/"
protected_file_name = GeneralUtils.get_yearly_file_name(vendor_name, report_type, self.date)
protected_file_path = f"{protected_file_dir}{protected_file_name}"
return path.isfile(protected_file_path)
def copy_file(self, origin_path: str, dest_path: str):
"""Copies a file from origin_path to dest_path"""
shutil.copy2(origin_path, dest_path)
def show_results(self, process_results: list):
"""Shows the result of the import process to the user
:param process_results: The results of the import process
"""
self.result_dialog = QDialog(self.import_report_widget, flags=Qt.WindowCloseButtonHint)
self.result_dialog.setWindowTitle("Import Result")
vertical_layout = QtWidgets.QVBoxLayout(self.result_dialog)
vertical_layout.setContentsMargins(5, 5, 5, 5)
for process_result in process_results:
report_result_widget = QWidget(self.result_dialog)
report_result_ui = ReportResultWidget.Ui_ReportResultWidget()
report_result_ui.setupUi(report_result_widget)
vendor = process_result.vendor
report_type = process_result.report_type
report_result_ui.report_type_label.setText(f"{vendor.name} - {report_type}")
report_result_ui.success_label.setText(process_result.completion_status.value)
if process_result.completion_status == CompletionStatus.SUCCESSFUL:
report_result_ui.message_label.hide()
report_result_ui.retry_frame.hide()
report_result_ui.file_label.setText(f"Saved as: {process_result.file_name}")
report_result_ui.file_label.mousePressEvent = \
lambda event, file_path = process_result.file_path: GeneralUtils.open_file_or_dir(file_path)
report_result_ui.folder_button.clicked.connect(
lambda: GeneralUtils.open_file_or_dir(process_result.file_dir))
report_result_ui.success_label.setText("Successful!")
report_result_ui.retry_frame.hide()
elif process_result.completion_status == CompletionStatus.FAILED:
report_result_ui.file_frame.hide()
report_result_ui.retry_frame.hide()
report_result_ui.message_label.setText(process_result.message)
vertical_layout.addWidget(report_result_widget)
button_box = QtWidgets.QDialogButtonBox(QDialogButtonBox.Ok, self.result_dialog)
button_box.setCenterButtons(True)
button_box.accepted.connect(self.result_dialog.accept)
vertical_layout.addWidget(button_box)
self.result_dialog.show()