#! /usr/bin/env python3
import argparse
import importlib
import json
import math
import os
import pathlib
import select
import signal
import subprocess
import sys
import tempfile
import textwrap
import threading
import time
from contextlib import suppress
from datetime import datetime
from glob import glob
from multiprocessing import Process, Queue, Value, Manager
from queue import Empty, Full
import DumpStack
import G2Paths
from CompressedFile import fileRowParser, isCompressedFile, openPossiblyCompressedFile
from G2ConfigTables import G2ConfigTables
from G2IniParams import G2IniParams
from G2Project import G2Project
from senzing import G2Config, G2ConfigMgr, G2Diagnostic, G2Engine, G2Exception, G2Product, \
G2LicenseException, G2NotFoundException
__all__ = []
__version__ = '2.2.6' # See https://www.python.org/dev/peps/pep-0396/
__date__ = '2018-09-18'
__updated__ = '2023-12-14'
# -----------------------------------------------------------------------------
# Exceptions
# -----------------------------------------------------------------------------
class UnconfiguredDataSourceException(Exception):

    def __init__(self, data_source_name):
        super().__init__(f"Datasource {data_source_name} not configured. See https://senzing.zendesk.com/hc/en-us/articles/360010784333 on how to configure data sources in the config file.")
# -----------------------------------------------------------------------------
# Class: Governor
#
# Dummy class for when no governor is imported
# -----------------------------------------------------------------------------
class Governor:

    def __init__(self, *args, **kwargs):
        return

    def govern(self, *args, **kwargs):
        """ Main function to trigger action(s) """
        return
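
# The stub above only defines the interface G2Loader expects from a governor: a
# constructor and a govern() method. The class below is purely an illustrative
# sketch of what a pluggable governor might look like; the class name, threshold
# and wait interval are assumptions and are not part of G2Loader. A real governor
# would typically watch a database metric (for example PostgreSQL transaction ID
# age) and pause loading until it recovers.
class ExampleThrottleGovernor:
    """ Hypothetical governor: sleeps while a caller-supplied backlog metric is too high """

    def __init__(self, threshold=100000, wait_secs=5, **kwargs):
        self.threshold = threshold
        self.wait_secs = wait_secs

    def govern(self, current_backlog=0, **kwargs):
        # In a real governor, re-read the metric on each iteration instead of trusting the argument
        while current_backlog > self.threshold:
            time.sleep(self.wait_secs)
            current_backlog = 0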
# ---------------------------------------------------------------------
# G2Loader
# ---------------------------------------------------------------------
def check_resources_and_startup(return_queue, thread_count, do_purge):
    """ Check system resources, calculate a safe number of threads when argument not specified on command line """

    try:
        diag = G2Diagnostic()
        diag.init('pyG2Diagnostic', g2module_params, cli_args.debugTrace)
    except G2Exception as ex:
        print('\nERROR: Could not start G2Diagnostic for check_resources_and_startup()')
        print(f' {ex}')
        return_queue.put(-1)
        return

    try:
        g2_engine = init_engine('pyG2StartSetup', g2module_params, cli_args.debugTrace, prime_engine=False)
    except G2Exception as ex:
        print('ERROR: Could not start the G2 engine for check_resources_and_startup()')
        print(f' {ex}')
        return_queue.put(-1)
        return

    try:
        g2_configmgr = G2ConfigMgr()
        g2_configmgr.init('pyG2ConfigMgr', g2module_params, cli_args.debugTrace)
    except G2Exception as ex:
        print('ERROR: Could not start G2ConfigMgr for check_resources_and_startup()')
        print(f' {ex}')
        return_queue.put(-1)
        return

    try:
        g2_product = G2Product()
        g2_product.init('pyG2LicenseVersion', g2module_params, cli_args.debugTrace)
    except G2Exception as ex:
        print('ERROR: Could not start G2Product for check_resources_and_startup()')
        print(f' {ex}\n')
        return_queue.put(-1)
        return

    lic_info = json.loads(g2_product.license())
    ver_info = json.loads(g2_product.version())

    # Get the configuration list
    try:
        response = bytearray()
        g2_configmgr.getConfigList(response)
        config_list = json.loads(response.decode())
    except G2Exception as ex:
        print('ERROR: Could not get config list in check_resources_and_startup()')
        print(f' {ex}')
        return_queue.put(-1)
        return

    # Get the active config ID
    try:
        response = bytearray()
        g2_engine.getActiveConfigID(response)
        active_cfg_id = int(response.decode())
    except G2Exception as ex:
        print('ERROR: Could not get the active config in check_resources_and_startup()')
        print(f' {ex}')
        return_queue.put(-1)
        return

    # Get details for the currently active ID
    active_cfg_details = [details for details in config_list['CONFIGS'] if details['CONFIG_ID'] == active_cfg_id]
    config_comments = active_cfg_details[0]['CONFIG_COMMENTS']
    config_created = active_cfg_details[0]['SYS_CREATE_DT']

    # Get database info
    # {'Hybrid Mode': False, 'Database Details': [{'Name': '/home/ant/senzprojs/3.1.2.22182/var/sqlite/G2C.db', 'Type': 'sqlite3'}]}
    # {'Hybrid Mode': True, 'Database Details': [{'Name': '/home/ant/senzprojs/3.1.2.22182/var/sqlite/G2C.db', 'Type': 'sqlite3'}, {'Name': '/home/ant/senzprojs/3.1.2.22182/var/sqlite/G2C_LIB.db', 'Type': 'sqlite3'}, {'Name': '/home/ant/senzprojs/3.1.2.22182/var/sqlite/G2C_RES.db', 'Type': 'sqlite3'}]}
    # {'Hybrid Mode': False, 'Database Details': [{'Name': 'localhost', 'Type': 'postgresql'}]}
    # {'Hybrid Mode': True, 'Database Details': [{'Name': 'localhost', 'Type': 'postgresql'}]}
    try:
        response = bytearray()
        diag.getDBInfo(response)
        db_info = json.loads(response.decode())
    except G2Exception as ex:
        print('ERROR: Could not get the DB info in check_resources_and_startup()')
        print(f' {ex}')
        return_queue.put(-1)
        return

    print(textwrap.dedent(f'''\n\
        Version & Config Details
        ------------------------
        Senzing Version: {ver_info["VERSION"] + " (" + ver_info["BUILD_DATE"] + ")" if "VERSION" in ver_info else ""}
        Configuration Parameters: {ini_file_name if ini_file_name else 'Read from SENZING_ENGINE_CONFIGURATION_JSON env var'}
        Instance Config ID: {active_cfg_id}
        Instance Config Comments: {config_comments}
        Instance Config Created: {config_created}
        Hybrid Database: {'Yes' if db_info['Hybrid Mode'] else 'No'}
        Database(s): '''), end='')

    for idx, db in enumerate(db_info['Database Details']):
        print(f'{" " * 32 if idx > 0 else ""}{db["Type"]} - {db["Name"]}')
        ini_db_types.append(db['Type'].upper())

    if len(set(ini_db_types)) != 1:
        print('\nERROR: No database detected in init parms or mixed databases in a hybrid setup!')
        print(f' {ini_db_types}')
        return_queue.put(-1)
        return

    print(textwrap.dedent(f'''\n\
        License Details
        ---------------
        Customer: {lic_info["customer"]}
        Type: {lic_info["licenseType"]}
        Records: {lic_info["recordLimit"]}
        Expiration: {lic_info["expireDate"]}
        Contract: {lic_info["contract"]}
    '''))

    physical_cores = diag.getPhysicalCores()
    logical_cores = diag.getLogicalCores()
    available_mem = diag.getAvailableMemory() / 1024 / 1024 / 1024.0
    total_mem = diag.getTotalSystemMemory() / 1024 / 1024 / 1024.0

    pause_msg = 'WARNING: Pausing for warning message(s)...'
    db_tune_article = 'https://senzing.zendesk.com/hc/en-us/articles/360016288254-Tuning-Your-Database'
    critical_error = warning_issued = False
    max_time_per_insert = 0.5

    # Limit the number of threads for SQLite; it doesn't benefit from more and can slow down (6 is approximate)
    max_sqlite_threads = 6
    sqlite_limit_msg = ''
    sqlite_warned = False

    # Obtain the default value to use for the max amount of memory to use
    calc_max_avail_mem = vars(g2load_parser)["_option_string_actions"]["-ntm"].const

    # Allow for higher factor when logical cores are available
    calc_cores_factor = 2 if physical_cores != logical_cores else 1.2

    print(textwrap.dedent(f'''\n\
        System Resources
        ----------------
        Physical cores: {physical_cores}
        Logical cores: {logical_cores if physical_cores != logical_cores else ''}
        Total memory (GB): {total_mem:.1f}
        Available memory (GB): {available_mem:.1f}
    '''))

    if not cli_args.thread_count:

        # Allow for 1 GB / thread
        thread_calc_from_mem = math.ceil(available_mem / 100 * calc_max_avail_mem)
        possible_num_threads = math.ceil(physical_cores * calc_cores_factor)

        # Is the number of safely calculated threads <= the threads supportable by 80% of available mem?
        if possible_num_threads <= thread_calc_from_mem:
            thread_count = possible_num_threads
            calc_thread_msg = '- Using maximum calculated number of threads. This can likely be increased if there is no database running locally.'
        # Else if thread_calc_from_mem (number of threads supportable by 80% of mem) is greater than the number of physical cores, use that many threads
        elif thread_calc_from_mem >= physical_cores:
            thread_count = thread_calc_from_mem
            calc_thread_msg = '- Additional processing capability is available, but not enough memory to safely support a higher thread count.'
        # Low available memory compared to physical cores x factor, set to use half the safely calculated value
        else:
            thread_count = math.ceil(thread_calc_from_mem / 2)
            calc_thread_msg = f'- WARNING: System has less than 1 GB ({(thread_calc_from_mem / physical_cores):.2f} GB) available per physical core.\n \
                Number of threads will be significantly reduced, you may see further warnings and should check your resources.'

        # If a value was specified for -ntm override the thread count with the min of calc value or possible_num_threads
        thread_count = min(math.ceil(available_mem / 100 * cli_args.threadCountMem), possible_num_threads) if cli_args.threadCountMem else thread_count
        mem_percent = calc_max_avail_mem if not cli_args.threadCountMem else cli_args.threadCountMem
        mem_msg = 'available' if not cli_args.threadCountMem else 'requested (-ntm)'
        calc_thread_msg = calc_thread_msg if not cli_args.threadCountMem else ''

        # Don't reformat and move the closing ''')), it's neater to do this than try and move the cursor for the next optional calc_thread_msg
        print(textwrap.dedent(f'''
            Number of threads
            -----------------
            - Number of threads arg (-nt) not specified. Calculating number of threads using {mem_percent}% of {mem_msg} memory.
            - Monitor system resources. Use command line argument -nt to increase (or decrease) the number of threads on subsequent use.'''))

        # Add extra message if args.threadCountMem wasn't specified
        if calc_thread_msg:
            print(f' {calc_thread_msg}')
        print()

    # Limit number of threads when sqlite, unless -nt arg specified
    if 'SQLITE3' in ini_db_types:
        if cli_args.thread_count:
            thread_count = cli_args.thread_count
            if thread_count > max_sqlite_threads:
                sqlite_limit_msg = f' - WARNING: Greater than {max_sqlite_threads} could be slower when using SQLite'
                sqlite_warned = True
        else:
            thread_count = min(thread_count, max_sqlite_threads)
            sqlite_limit_msg = f' - Default is {max_sqlite_threads} when using SQLite, test higher/lower with -nt argument'

    # 2.5GB per process - .5GB per thread
    min_recommend_cores = math.ceil(thread_count / 4 + 1)
    num_processes = math.ceil(float(thread_count) / cli_args.max_threads_per_process)
    min_recommend_mem = (num_processes * 2.5 + thread_count * .5)

    print(textwrap.dedent(f'''\n\
        Resources Requested
        -------------------
        Number of threads: {thread_count} {sqlite_limit_msg}
        Threads calculated: {'Yes' if not cli_args.thread_count else 'No, -nt argument was specified'}
        Threads per process: {cli_args.max_threads_per_process}
        Number of processes: {num_processes}
        Min recommended cores: {min_recommend_cores}
        Min recommended memory (GB): {min_recommend_mem:.1f}
    '''))

    if sqlite_warned:
        print(pause_msg, flush=True)
        time.sleep(10)

    # Skip perf check if specified on CLI args or container env var
    if not cli_args.skipDBPerf and not env_var_skip_dbperf:

        print(textwrap.dedent('''\n\
            Database Performance
            --------------------
        '''))

        db_perf_response = bytearray()
        diag.checkDBPerf(3, db_perf_response)
        perf_info = json.loads(db_perf_response.decode())

        num_recs_inserted = perf_info.get('numRecordsInserted', None)
        if num_recs_inserted:
            insert_time = perf_info['insertTime']
            time_per_insert = (1.0 * insert_time / num_recs_inserted) if num_recs_inserted > 0 else 999

            print(textwrap.indent(textwrap.dedent(f'''\
                Records inserted: {num_recs_inserted:,}
                Period for inserts: {insert_time} ms
                Average per insert: {time_per_insert:.1f} ms
            '''), ' '))

            if time_per_insert > max_time_per_insert:
                warning_issued = True
                print(textwrap.dedent(f'''\
                    WARNING: Database performance of {time_per_insert:.1f} ms per insert is slower than the recommended minimum of {max_time_per_insert:.1f} ms per insert
                    For database tuning please refer to: {db_tune_article}
                '''))
        else:
            print('\nERROR: Database performance tests failed!\n')

    if physical_cores < min_recommend_cores:
        warning_issued = True
        print(f'WARNING: System has fewer ({physical_cores}) than the minimum recommended cores ({min_recommend_cores}) for this configuration\n')

    if min_recommend_mem > available_mem:
        critical_error = True
        print(f'!!!!! CRITICAL WARNING: SYSTEM HAS LESS MEMORY AVAILABLE ({available_mem:.1f} GB) THAN THE MINIMUM RECOMMENDED ({min_recommend_mem:.1f} GB) !!!!!\n')

    if critical_error or warning_issued:
        print(pause_msg, flush=True)
        time.sleep(10 if critical_error else 3)

    # Purge repository
    if do_purge:
        print('\nPurging Senzing database...')
        g2_engine.purgeRepository(False)

    # Clean up (in reverse order of initialization)
    g2_product.destroy()
    del g2_product

    g2_configmgr.destroy()
    del g2_configmgr

    g2_engine.destroy()
    del g2_engine

    diag.destroy()
    del diag

    # Return values are put in a queue
    return_queue.put(thread_count)
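
# check_resources_and_startup() reports its result through the queue rather than a
# return value because it is intended to run in its own process. The helper below is
# only an illustrative sketch of that calling pattern; the function name, argument
# defaults and purge flag value are assumptions, not the loader's actual startup code.
def example_startup_check(requested_threads=0, purge=False):
    """ Hypothetical helper: run the resource check in a child process and collect the result """
    result_queue = Queue()
    proc = Process(target=check_resources_and_startup, args=(result_queue, requested_threads, purge))
    proc.start()
    calculated_threads = result_queue.get()  # -1 signals a startup failure
    proc.join()
    return calculated_threads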
def perform_load():
    """ Main processing when not in redo only mode """

    exit_code = 0
    DumpStack.listen()
    proc_start_time = time.time()

    # Prepare the G2 configuration
    g2_config_json = bytearray()

    temp_queue = Queue()
    get_initial_g2_config_process = Process(target=get_initial_g2_config_process_wrapper, args=(temp_queue, g2module_params, g2_config_json))
    get_initial_g2_config_process.start()
    g2_config_json = temp_queue.get(block=True)
    result_of_get_initial_g2_config = temp_queue.get(block=True)
    get_initial_g2_config_process.join()

    if not result_of_get_initial_g2_config:
        return 1, 0

    g2_config_tables = G2ConfigTables(g2_config_json)

    g2_project = G2Project(g2_config_tables, dsrcAction, cli_args.projectFileName, cli_args.projectFileSpec, cli_args.tmpPath)
    if not g2_project.success:
        return 1, 0

    # Enhance the G2 configuration, by adding data sources and entity types
    if not cli_args.testMode:
        temp_queue = Queue()
        enhance_g2_config_process = Process(target=enhance_g2_config_process_wrapper, args=(temp_queue, g2_project, g2module_params, g2_config_json, cli_args.configuredDatasourcesOnly))
        enhance_g2_config_process.start()
        g2_config_json = temp_queue.get(block=True)
        result_of_enhance_g2_config = temp_queue.get(block=True)
        enhance_g2_config_process.join()
        if not result_of_enhance_g2_config:
            return 1, 0

    # Start loading
    for sourceDict in g2_project.sourceList:

        file_path = sourceDict['FILE_PATH']
        orig_file_path = file_path
        shuf_detected = False

        cnt_rows = cnt_bad_parse = cnt_bad_umf = cnt_good_umf = api_errors.value = 0
        dsrc_action_add_count.value = dsrc_action_del_count.value = dsrc_action_reeval_count.value = 0
        g2_project.clearStatPack()

        if cli_args.testMode:
            print(f'\nTesting {file_path}, CTRL-C to end test at any time...\n')
        else:
            if dsrcAction == 'D':
                print(f'\n{"-"*30} Deleting {"-"*30}\n')
            elif dsrcAction == 'X':
                print(f'\n{"-"*30} Reevaluating {"-"*30}\n')
            else:
                print(f'\n{"-"*30} Loading {"-"*30}\n')

        # Drop to a single thread for small files (under 500 KB, or 100 KB if compressed)
        if os.path.getsize(file_path) < (100000 if isCompressedFile(file_path) else 500000):
            print(' Dropping to single thread due to small file size')
            transport_thread_count = 1
        else:
            transport_thread_count = default_thread_count

        # Shuffle the source file for performance, unless directed not to or in test mode or single threaded
        if not cli_args.noShuffle and not cli_args.testMode and transport_thread_count > 1:

            if isCompressedFile(file_path):
                print('INFO: Not shuffling compressed file. Please ensure the data was shuffled before compressing!\n')

            # If it looks like the source file was previously shuffled by G2Loader don't do it again
            elif SHUF_NO_DEL_TAG in file_path or SHUF_TAG in file_path:
                shuf_detected = True
                print(f'INFO: Not shuffling source file, previously shuffled. {SHUF_TAG} or {SHUF_NO_DEL_TAG} in file name\n')
                if SHUF_NO_DEL_TAG in file_path and cli_args.shuffleNoDelete:
                    print(f'INFO: Source files with {SHUF_NO_DEL_TAG} in the name are not deleted by G2Loader. Argument -snd (--shuffleNoDelete) used\n')
                time.sleep(10)
            else:

                # Add timestamp to no delete shuffled files
                shuf_file_suffix = SHUF_NO_DEL_TAG + datetime.now().strftime("%Y%m%d_%H-%M-%S") if cli_args.shuffleNoDelete else SHUF_TAG
                plib_file_path = pathlib.Path(file_path).resolve()
                shuf_file_path = pathlib.Path(str(plib_file_path) + shuf_file_suffix)

                # Look for previously shuffled files in original path...
                if not cli_args.shuffFilesIgnore:

                    prior_shuf_files = [str(pathlib.Path(p).resolve()) for p in glob(file_path + SHUF_TAG_GLOB)]

                    # ...and shuffle redirect path if specified
                    if cli_args.shuffFileRedirect:
                        redirect_glob = str(shuf_path_redirect.joinpath(plib_file_path.name)) + SHUF_TAG_GLOB
                        prior_shuf_files.extend([str(pathlib.Path(p).resolve()) for p in glob(redirect_glob)])

                    if prior_shuf_files:
                        print(f'\nFound previously shuffled files matching {plib_file_path.name}...\n')
                        prior_shuf_files.sort()
                        for psf in prior_shuf_files:
                            print(f' {psf}')

                        print(textwrap.dedent(f'''
                            Pausing for {SHUF_RESPONSE_TIMEOUT} seconds... (This check can be skipped with the command line argument --shuffFilesIgnore (-sfi) )
                            The above listed files may not contain the same data as the input file.
                            If you wish to use one of the above, please check and compare the files to ensure the previously shuffled file is what you expect.
                            To quit and use a previously shuffled file hit <Enter>. To continue, wait {SHUF_RESPONSE_TIMEOUT} seconds or type c and <Enter>
                        '''))

                        # Wait SHUF_RESPONSE_TIMEOUT seconds to allow the user to use a prior shuffle; timeout and continue if automated
                        while True:
                            r, _, _ = select.select([sys.stdin], [], [], SHUF_RESPONSE_TIMEOUT)
                            if r:
                                # Read without hitting enter?
                                read_input = sys.stdin.readline()
                                if read_input == '\n':
                                    sys.exit(0)
                                elif read_input.lower() == 'c\n':
                                    break
                                else:
                                    print('<Enter> to quit or type c and <Enter> to continue: ')
                            else:
                                break
                else:
                    print(f'INFO: Skipping check for previously shuffled files for {plib_file_path.name}')

                # If redirecting the shuffled file modify to redirect path
                if cli_args.shuffFileRedirect:
                    shuf_file_path = shuf_path_redirect.joinpath(shuf_file_path.name)

                print(f'\nShuffling file to: {shuf_file_path}\n')

                cmd = f'shuf {file_path} > {shuf_file_path}'
                if sourceDict['FILE_FORMAT'] not in ('JSON', 'UMF'):
                    cmd = f'head -n1 {file_path} > {shuf_file_path} && tail -n+2 {file_path} | shuf >> {shuf_file_path}'

                try:
                    process = subprocess.run(cmd, shell=True, check=True)
                except subprocess.CalledProcessError as ex:
                    print(f'\nERROR: Shuffle command failed: {ex}')
                    return 1, 0

                file_path = str(shuf_file_path)

        file_reader = openPossiblyCompressedFile(file_path, 'r')
        # --file_reader = safe_csv_reader(csv.reader(csvFile, fileFormat), cnt_bad_parse)

        # Use previously stored header row, so get rid of this one
        if sourceDict['FILE_FORMAT'] not in ('JSON', 'UMF'):
            next(file_reader)

        # Start processes and threads for this file
        thread_list, work_queue = start_loader_process_and_threads(transport_thread_count)
        if thread_stop.value != 0:
            return exit_code, 0

        # Start processing rows from source file
        file_start_time = time.time()
        batch_start_time = time.perf_counter()

        cnt_rows = batch_time_governing = time_redo.value = time_governing.value = dsrc_action_diff.value = 0

        while True:

            try:
                row = next(file_reader)
            except StopIteration:
                break
            except Exception as ex:
                cnt_rows += 1
                cnt_bad_parse += 1
                print(f'WARNING: Could not read row {cnt_rows}, {ex}')
                continue

            # Increment row count so it agrees with the line count and references to bad rows are correct
            cnt_rows += 1

            # Skip records
            if not cli_args.redoMode and cli_args.skipRecords and cnt_rows < cli_args.skipRecords + 1:
                if cnt_rows == 1:
                    print(f'INFO: Skipping the first {cli_args.skipRecords} records...')
                continue

            # Skip blank rows or records that error; errors are written to the errors file if not disabled
            row_data = fileRowParser(row, sourceDict, cnt_rows, errors_file=errors_file, errors_short=cli_args.errorsShort, errors_disable=cli_args.errorsFileDisable)
            if not row_data:
                cnt_bad_parse += 1
                continue

            # Don't do any transformation if this is raw UMF
            ok_to_continue = True
            if sourceDict['FILE_FORMAT'] != 'UMF':

                # Update with file defaults
                if 'DATA_SOURCE' not in row_data and 'DATA_SOURCE' in sourceDict:
                    row_data['DATA_SOURCE'] = sourceDict['DATA_SOURCE']

                if cli_args.testMode:
                    mapping_response = g2_project.testJsonRecord(row_data, cnt_rows, sourceDict)
                    if mapping_response[0]:
                        cnt_bad_umf += 1
                        ok_to_continue = False

                # --force a LOAD_ID if one isn't present (why do we do this??)
                if 'LOAD_ID' not in row_data:
                    row_data['LOAD_ID'] = sourceDict['FILE_NAME']

            # Put the record on the queue
            if ok_to_continue:
                cnt_good_umf += 1
                if not cli_args.testMode:
                    while True:
                        try:
                            # Assist in indicating what type of record this is for processing thread
                            # Detect and set here if dsrc action was set as reeval on args
                            work_queue.put((row_data, True if dsrcAction == 'X' else False), True, 1)
                        except Full:
                            # Check to see if any threads have died
                            if not all((thread.is_alive() for thread in thread_list)):
                                print(textwrap.dedent('''\n\
                                    ERROR: Thread(s) have shutdown unexpectedly!
                                    - This typically happens when memory resources are exhausted and the system randomly kills processes.
                                    - Please review: https://senzing.zendesk.com/hc/en-us/articles/115000856453
                                    - Check output from the following command for out of memory messages.
                                    - dmesg -e
                                '''))
                                return 1, cnt_bad_parse
                            continue
                        break

            if cnt_rows % cli_args.loadOutputFrequency == 0:
                batch_speed = int(cli_args.loadOutputFrequency / (time.perf_counter() - (batch_start_time - batch_time_governing))) if time.perf_counter() - batch_start_time != 0 else 1
                print(f' {cnt_rows:,} rows processed at {time_now()}, {batch_speed:,} records per second{f", {api_errors.value:,} API errors" if api_errors.value > 0 else ""}', flush=True)
                batch_start_time = time.perf_counter()
                batch_time_governing = 0

            # Process redo during ingestion
            if cnt_rows % cli_args.redoInterruptFrequency == 0 and not cli_args.testMode and not cli_args.noRedo:
                if process_redo(work_queue, True, 'Waiting for processing queue to empty to start redo...'):
                    print('\nERROR: Could not process redo record!\n')

            # Check to see if any threads threw errors or control-c pressed and shut down
            if thread_stop.value != 0:
                exit_code = thread_stop.value
                break

            # Check if any of the threads died without throwing errors
            if not all((thread.is_alive() for thread in thread_list)):
                print('\nERROR: Thread failure!')
                break

            # Governor called for each record
            # Called here instead of when reading from queue to allow queue to act as a small buffer
            try:
                rec_gov_start = time.perf_counter()
                record_governor.govern()
                rec_gov_stop = time.perf_counter()
                with time_governing.get_lock():
                    time_governing.value += (rec_gov_stop - rec_gov_start)
                batch_time_governing += rec_gov_stop - rec_gov_start
            except Exception as err:
                shutdown(f'\nERROR: Calling per record governor: {err}')

            # Stop processing this file if the stop-on-record value is reached
            if not cli_args.redoMode and cli_args.stopOnRecord and cnt_rows >= cli_args.stopOnRecord:
                print(f'\nINFO: Stopping at record {cnt_rows}, --stopOnRecord (-sr) argument was set')
                break

        # Process redo at end of processing a source. Wait for queue to empty of ingest records first
        if thread_stop.value == 0 and not cli_args.testMode and not cli_args.noRedo:
            if process_redo(work_queue, True, 'Source file processed, waiting for processing queue to empty to start redo...'):
                print('\nERROR: Could not process redo record!\n')

        end_time = time.time()
        end_time_str = time_now(True)

        # Close input file
        file_reader.close()
        if sourceDict['FILE_SOURCE'] == 'S3':
            print(" Removing temporary file created by S3 download " + file_path)
            os.remove(file_path)

        # Remove shuffled file unless run with -snd or prior shuffle detected and not small file/low thread count
        if not cli_args.shuffleNoDelete \
           and not shuf_detected \
           and not cli_args.noShuffle \
           and not cli_args.testMode \
           and transport_thread_count > 1:
            with suppress(Exception):
                print(f'\nDeleting shuffled file: {shuf_file_path}')
                os.remove(shuf_file_path)

        # Stop processes and threads
        stop_loader_process_and_threads(thread_list, work_queue)

        # Print load stats if not error or ctrl-c
        if exit_code in (0, 9):

            processing_secs = end_time - file_start_time
            elapsed_secs = time.time() - file_start_time
            elapsed_mins = round(elapsed_secs / 60, 1)

            # Calculate approximate transactions/sec, remove timings that aren't part of ingest
            file_tps = int((cnt_good_umf + cnt_bad_parse + cnt_bad_umf) / (processing_secs - time_governing.value - time_starting_engines.value - time_redo.value)) if processing_secs > 0 else 0

            # Use good records count instead of 0 on small fast files
            file_tps = file_tps if file_tps > 0 else cnt_good_umf

            if shuf_detected:
                shuf_msg = 'Shuffling skipped, file was previously shuffled by G2Loader'
            elif transport_thread_count > 1:
                if cli_args.noShuffle:
                    shuf_msg = 'Not shuffled (-ns was specified)'
                else:
                    shuf_msg = shuf_file_path if cli_args.shuffleNoDelete and 'shuf_file_path' in locals() else 'Shuffled file deleted (-snd to keep after load)'
            else:
                shuf_msg = 'File wasn\'t shuffled, small size or number of threads was 1'

            # Format with separator if specified
            skip_records = f'{cli_args.skipRecords:,}' if cli_args.skipRecords and cli_args.skipRecords != 0 else ''
            stop_on_record = f'{cli_args.stopOnRecord:,}' if cli_args.stopOnRecord and cli_args.stopOnRecord != 0 else ''

            # Set error log file to blank or disabled msg if no errors or arg disabled it
            errors_log_file = errors_file.name if errors_file else ''
            if not api_errors.value and not cnt_bad_parse:
                errors_log_file = 'No errors'
            if cli_args.errorsFileDisable:
                errors_log_file = 'Disabled with -ed'

            rec_dsrc_action = dsrc_action_add_count.value + dsrc_action_del_count.value + dsrc_action_reeval_count.value if dsrc_action_diff.value else 0
            if dsrcAction != 'A':
                rec_dsrc_action = f'Not considered, explicit mode specified ({dsrc_action_names[dsrcAction]})'
            rec_dsrc_action = rec_dsrc_action if isinstance(rec_dsrc_action, str) else f'{rec_dsrc_action:,}'

            print(textwrap.dedent(f'''\n\
                Processing Information
                ----------------------
                Arguments: {" ".join(sys.argv[1:])}
                Action: {dsrc_action_names[dsrcAction]}
                Repository purged: {'Yes' if (cli_args.purgeFirst or cli_args.forcePurge) else 'No'}
                Source File: {pathlib.Path(orig_file_path).resolve()}
                Shuffled into: {shuf_msg}
                Total records: {cnt_good_umf + cnt_bad_parse + cnt_bad_umf:,}
                \tGood records: {cnt_good_umf:,}
                \tBad records: {cnt_bad_parse:,}
                \tIncomplete records: {cnt_bad_umf:,}
                Records specifying action: {rec_dsrc_action}
                \tAdds: {dsrc_action_add_count.value:,}
                \tDeletes: {dsrc_action_del_count.value:,}
                \tReeval: {dsrc_action_reeval_count.value:,}
                Errors:
                \tErrors log file: {errors_log_file}
                \tFailed API calls: {api_errors.value:,}
                Skipped records: {skip_records + ' (-skr was specified)' if cli_args.skipRecords else 'Not requested'}
                Stop on record: {stop_on_record + ' (-sr was specified)' if cli_args.stopOnRecord else 'Not requested'}
                Total elapsed time: {elapsed_mins} mins
                \tStart time: {datetime.fromtimestamp(file_start_time).strftime('%I:%M:%S%p').lower()}
                \tEnd time: {end_time_str}
                \tTime processing redo: {str(round(time_redo.value / 60, 1)) + ' mins' if not cli_args.testMode and not cli_args.noRedo else 'Redo disabled (-n)'}
                \tTime paused in governor(s): {round(time_governing.value / 60, 1)} mins
                Records per second: {file_tps:,}
            '''))

        # Don't process next source file if errors
        if exit_code:
            break

    elapsed_mins = round((time.time() - proc_start_time) / 60, 1)
    if exit_code:
        print(f'\nProcess aborted at {time_now()} after {elapsed_mins} minutes')
    else:
        print(f'\nProcess completed successfully at {time_now()} in {elapsed_mins} minutes')

    if cli_args.noRedo:
        print(textwrap.dedent(f'''\n
            Process Redo Records
            --------------------
            Loading is complete but the processing of redo records was disabled with --noRedo (-n).
            All source records have been entity resolved, there may be minor clean up of some entities
            required. Please review: https://senzing.zendesk.com/hc/en-us/articles/360007475133
            If redo records are not being processed separately by another G2Loader instance in redo only
            mode - or another process you've built to manage redo records - run G2Loader again in redo
            only mode:
            ./G2Loader.py --redoMode {"--iniFile " + str(ini_file_name) if ini_file_name else ""}
        '''))

    if cli_args.testMode:

        report = g2_project.getTestResults('F')
        report += '\nPress (s)ave to file or (q)uit when reviewing is complete\n'

        less = subprocess.Popen(["less", "-FMXSR"], stdin=subprocess.PIPE)
        # less returns BrokenPipe if q is hit to exit before the end of the report
        try:
            less.stdin.write(report.encode('utf-8'))
        except IOError:
            pass
        less.stdin.close()
        less.wait()

    # Governor called for each source
    try:
        source_governor.govern()
    except Exception as err:
        shutdown(f'\nERROR: Calling per source governor: {err}')

    return exit_code, cnt_bad_parse
def start_loader_process_and_threads(transport_thread_count):

    thread_list = []
    work_queue = None

    # Start transport threads, bypass if in test mode
    if not cli_args.testMode:

        thread_stop.value = 0
        work_queue = Queue(transport_thread_count * 10)
        num_threads_left = transport_thread_count
        thread_id = 0

        while num_threads_left > 0:
            thread_id += 1
            thread_list.append(Process(target=send_to_g2, args=(thread_id, work_queue, min(cli_args.max_threads_per_process, num_threads_left), cli_args.debugTrace, thread_stop, cli_args.noWorkloadStats, dsrcAction)))
            num_threads_left -= cli_args.max_threads_per_process

        for thread in thread_list:
            thread.start()

    return thread_list, work_queue
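
# As an illustration of the fan-out above (numbers are examples only, not defaults):
# with transport_thread_count = 10 and --max_threads_per_process of 4, the while loop
# creates ceil(10 / 4) = 3 send_to_g2() processes running 4, 4 and 2 engine threads.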
def send_to_g2(thread_id_, work_queue_, num_threads_, debug_trace, thread_stop, no_workload_stats, dsrc_action):

    g2_engines = []

    try:
        g2_engine = init_engine(f'pyG2Engine{thread_id_}', g2module_params, debug_trace, prime_engine=True, add_start_time=True)
        g2_engines.append(g2_engine)
    except G2Exception as ex:
        print('ERROR: Could not start the G2 engine for send_to_g2()')
        print(f' {ex}')
        with thread_stop.get_lock():
            thread_stop.value = 1
        return

    try:
        thread_list = []

        for myid in range(num_threads_):
            thread_list.append(threading.Thread(target=g2_thread, args=(f'{thread_id_}-{myid}', work_queue_, g2_engine, thread_stop, dsrc_action)))

        for thread in thread_list:
            thread.start()

        # Periodically output engine workload stats
        if not no_workload_stats:

            break_stop = False

            while thread_stop.value == 0:

                start_time = time.time()
                while (time.time() - start_time) <= cli_args.workloadOutputFrequency:

                    # Check if stopping between sleeps; set break_stop to prevent workload stats
                    if thread_stop.value != 0:
                        break_stop = True
                        break

                    # Sleep in small periods; don't sleep the full amount or it can block stop_loader_process_and_threads() finishing
                    time.sleep(2)

                if break_stop:
                    break

                for engine in g2_engines:
                    dump_workload_stats(engine)

        for thread in thread_list:
            thread.join()

    except Exception:
        with thread_stop.get_lock():
            thread_stop.value = 1
        pass

    # Final workload stats as finishing up
    if not no_workload_stats:
        dump_workload_stats(g2_engine)

    with suppress(Exception):
        g2_engine.destroy()
        del g2_engine

    return
def g2_thread(_, work_queue_, g2_engine_, thread_stop, dsrc_action_args):
    """ g2 thread function """

    def g2thread_error(msg, action):
        """ Write out errors during processing """

        call_error = textwrap.dedent(f'''
            {str(datetime.now())} ERROR: {action} - {msg}
            Data source: {data_source}
            Record ID: {record_id}
            Record Type: {"Redo" if is_redo_record else "Ingest"}
            {row if not cli_args.errorsShort else " "}
        ''')

        # If logging to error file is disabled print instead
        if cli_args.errorsFileDisable:
            print(call_error, flush=True, end='\033[F\033[F' if cli_args.errorsShort else '\n')

        if not cli_args.errorsFileDisable:
            try:
                errors_file.write(f'\n\n{call_error.strip()}')
                errors_file.flush()
            except Exception as ex:
                print(f'\nWARNING: Unable to write API error to {errors_file.name}')
                print(f' {ex}', flush=True)
                # If can't write to file, write to terminal
                print(call_error, flush=True, end='\033[F\033[F' if cli_args.errorsShort else '\n')

        # Increment value to report at end of processing each source
        with api_errors.get_lock():
            api_errors.value += 1

    # For each work queue item
    while thread_stop.value == 0 or work_queue_.empty() is False:

        try:
            row = work_queue_.get(True, 1)
        except Empty:
            row = None
            continue

        # Unpack tuple from the work queue into the data and indicator for being a redo record
        row, is_redo_record = row

        dsrc_action_str = None

        # Start with dsrc_action set to what was used as the CLI arg or default of add
        dsrc_action = dsrc_action_args

        # Record is JSON
        data_source = row.get('DATA_SOURCE', '')
        record_id = str(row.get('RECORD_ID', ''))

        # Is the record from the work queue specifically a redo record to be processed during redo time/mode?
        if is_redo_record:
            dsrc_action = 'X'
        # If not, it's a normal ingestion record from file or project
        else:
            # If -D and -X were not specified, check each record for dsrc_action and use it instead of default add mode
            # Consideration of dsrc_action is only valid in default add mode
            if not cli_args.deleteMode and not cli_args.reprocessMode:

                # Use the DSRC_ACTION from inbound row?
                # Check if the inbound row specifies dsrc_action, use it and override CLI args (X, D, default is A) if present
                # This provides functionality of sending in input file with multiple dsrc actions
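                # For illustration only, a hypothetical input record that overrides the default
                # add action might look like (field names other than DSRC_ACTION are example mappings):
                #   {"DATA_SOURCE": "CUSTOMERS", "RECORD_ID": "1001", "DSRC_ACTION": "D"}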
                row_dsrc_action = row.get('DSRC_ACTION', None)
                dsrc_action = row_dsrc_action if row_dsrc_action else dsrc_action
                dsrc_action = dsrc_action.upper() if isinstance(dsrc_action, str) else dsrc_action

                # If the row dsrc_action differs from the CLI ARGs dsrc_action mode, log the fact to print info at end of data source
                if dsrc_action != dsrc_action_args:

                    # Not enabled, could quickly fill up redirected logging files
                    if dsrc_action_diff.value != 1:
                        with dsrc_action_diff.get_lock():
                            dsrc_action_diff.value = 1

                    if dsrc_action == 'A':
                        with dsrc_action_add_count.get_lock():
                            dsrc_action_add_count.value += 1

                    if dsrc_action == 'D':
                        with dsrc_action_del_count.get_lock():
                            dsrc_action_del_count.value += 1

                    if dsrc_action == 'X':
                        with dsrc_action_reeval_count.get_lock():
                            dsrc_action_reeval_count.value += 1

        try:
            # Catch invalid dsrc_actions and push to error log and log as an API error
            if dsrc_action not in ("A", "D", "X"):
                g2thread_error('Unknown dsrc_action', dsrc_action)
                continue

            if dsrc_action == 'A':
                dsrc_action_str = 'addRecord()'
                g2_engine_.addRecord(data_source, record_id, json.dumps(row, sort_keys=True))

            if dsrc_action == 'D':
                dsrc_action_str = 'deleteRecord()'
                g2_engine_.deleteRecord(data_source, record_id)

            if dsrc_action == 'X':
                dsrc_action_str = 'reevaluateRecord()'

                # Check if the redo record is a REPAIR_ENTITY one, call reevaluateEntity() if so
                # {'UMF_PROC': {'NAME': 'REPAIR_ENTITY', 'PARAMS': [{'PARAM': {'NAME': 'ENTITY_ID', 'VALUE': '32705738'}}]}}
                if not data_source and not record_id:
                    entity_id = row.get("UMF_PROC", {}).get("PARAMS", {})[0].get("PARAM", {}).get("VALUE", None)
                    if entity_id:
                        g2_engine_.reevaluateEntity(entity_id)
                    else:
                        g2thread_error("Unable to process redo record format!", dsrc_action_str)
                else:
                    g2_engine_.reevaluateRecord(data_source, record_id, 0)

        except G2LicenseException as ex:
            print('\nERROR: G2Engine licensing error!')
            print(f' {ex}')
            with thread_stop.get_lock():
                thread_stop.value = 1
            return
        except G2NotFoundException as ex:
            # Don't error if record for redo can't be located
            if is_redo_record:
                pass
            else: