Skip to content

Commit

Permalink
#3 1.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
docktermj committed May 28, 2021
1 parent 7f61d21 commit f862a71
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 32 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
[markdownlint](https://dlaa.me/markdownlint/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.1.0] - 2018-09-30

### Added to 1.1.0

- Shipped with SenzingAPI 1.2.0

## [1.0.0] - 2018-09-18

### Added to 1.0.0
Expand Down
79 changes: 47 additions & 32 deletions G2Loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
from G2Database import G2Database
from G2ConfigTables import G2ConfigTables
from G2Project import G2Project
from G2Module import G2Module
from G2Engine import G2Engine
from G2Product import G2Product
from G2Exception import G2ModuleException, G2ModuleResolveMissingResEnt, G2ModuleLicenseException
from CompressedFile import openPossiblyCompressedFile, isCompressedFile
import DumpStack
Expand Down Expand Up @@ -150,21 +151,22 @@ def checkMiminumMemory():
time.sleep(3)

#---------------------------------------
def startSetup(doPurge, doLicense):
def startSetup(doPurge, doLicense, g2iniPath, debugTrace):
#--check for minimum system memory
checkMiminumMemory()

#--check the product version and license
try:
g2_module = G2Module('pyG2Purge', g2iniPath, debugTrace)
g2_module.init()
g2_product = G2Product()
g2_product.init('pyG2LicenseVersion', g2iniPath, debugTrace)
except G2ModuleException as ex:
print('ERROR: could not start the G2 module at ' + g2iniPath)
print('ERROR: could not start the G2 product module at ' + g2iniPath)
print(ex)
exit(1)

if (doLicense):
licInfo = g2_module.license()
verInfo = g2_module.version()
licInfo = json.loads(g2_product.license())
verInfo = json.loads(g2_product.version())
print("****LICENSE****")
if 'VERSION' in verInfo: print(" Version: " + verInfo['VERSION'] + " (" + verInfo['BUILD_DATE'] + ")")
if 'customer' in licInfo: print(" Customer: " + licInfo['customer'])
Expand All @@ -174,12 +176,25 @@ def startSetup(doPurge, doLicense):
if 'contract' in licInfo: print(" Contract: " + licInfo['contract'])
print("***************")

g2_product.destroy()
del g2_product

#--purge the repository
try:
g2_engine = G2Engine()
g2_engine.init('pyG2Purge', g2iniPath, debugTrace)
except G2ModuleException as ex:
print('ERROR: could not start the G2 engine at ' + g2iniPath)
print(ex)
exit(1)

if (doPurge):
print('Purging G2 database ...')
g2_module.purgeRepository(False)
g2_engine.purgeRepository(False)

g2_engine.destroy()
del g2_engine

g2_module.destroy()
del g2_module

#---------------------------------------
def loadProject():
Expand Down Expand Up @@ -208,7 +223,7 @@ def loadProject():
os.remove(filename)

#-- This may look weird but ctypes/ffi have problems with the native code and fork
setupProcess = Process(target=startSetup, args=(purgeFirst and not testMode, True))
setupProcess = Process(target=startSetup, args=(purgeFirst and not testMode, True, g2iniPath, debugTrace))
setupProcess.start()
setupProcess.join()
if setupProcess.exitcode != 0:
Expand Down Expand Up @@ -271,7 +286,7 @@ def loadProject():
threadId=0
while (numThreadsLeft>0):
threadId+=1
threadList.append(Process(target=sendToG2, args=(threadId, workQueue, min(maxThreadsPerProcess, numThreadsLeft))))
threadList.append(Process(target=sendToG2, args=(threadId, workQueue, min(maxThreadsPerProcess, numThreadsLeft), g2iniPath, debugTrace, threadStop, workloadStats, dsrcAction)))
numThreadsLeft-=maxThreadsPerProcess;
for thread in threadList:
thread.start()
Expand Down Expand Up @@ -415,7 +430,7 @@ def loadProject():
if exitCode:
print('Process aborted after %s minutes' % elapsedMins)
else:
print('Process completed sucessfully in %s minutes' % elapsedMins)
print('Process completed successfully in %s minutes' % elapsedMins)

return exitCode

Expand Down Expand Up @@ -447,15 +462,15 @@ def prepareG2db():
return True

#---------------------------------------
def sendToG2(threadId_, workQueue_, numThreads_):
def sendToG2(threadId_, workQueue_, numThreads_, g2iniPath, debugTrace, threadStop, workloadStats, dsrcAction):
global numProcessed
numProcessed = 0

try:
g2_module = G2Module('pyG2Module' + str(threadId_), g2iniPath, debugTrace)
g2_module.init()
g2_engine = G2Engine()
g2_engine.init('pyG2Engine' + str(threadId_), g2iniPath, debugTrace)
except G2ModuleException as ex:
print('ERROR: could not start the G2 module at ' + g2iniPath)
print('ERROR: could not start the G2 engine at ' + g2iniPath)
print(ex)
with threadStop.get_lock():
threadStop.value = 1
Expand All @@ -465,26 +480,26 @@ def sendToG2(threadId_, workQueue_, numThreads_):
if (numThreads_ > 1):
threadList = []
for myid in range(numThreads_):
threadList.append(threading.Thread(target=g2Thread, args=(str(threadId_)+"-"+str(myid), workQueue_, g2_module)))
threadList.append(threading.Thread(target=g2Thread, args=(str(threadId_)+"-"+str(myid), workQueue_, g2_engine, threadStop, workloadStats, dsrcAction)))
for thread in threadList:
thread.start()
for thread in threadList:
thread.join()
else:
g2Thread(str(threadId_), workQueue_, g2_module)
g2Thread(str(threadId_), workQueue_, g2_engine, threadStop, workloadStats, dsrcAction)

except: pass

if workloadStats > 0:
pprint.pprint(g2_module.stats())
pprint.pprint(g2_engine.stats())

try: g2_module.destroy()
try: g2_engine.destroy()
except: pass

return

#---------------------------------------
def g2Thread(threadId_, workQueue_, g2Module_):
def g2Thread(threadId_, workQueue_, g2Engine_, threadStop, workloadStats, dsrcAction):
''' g2 thread function '''
global numProcessed

Expand All @@ -496,19 +511,19 @@ def g2Thread(threadId_, workQueue_, g2Module_):
row = None
continue

#--call g2module
#--call g2engine
numProcessed += 1
if (workloadStats > 0 and (numProcessed%(maxThreadsPerProcess*sqlCommitSize)) == 0):
print(g2Module_.stats())
print(g2Engine_.stats())

try:
if dsrcAction == 'S':
ret = g2Module_.processWithResponse(row)
ret = g2Engine_.processWithResponse(row)
else:
g2Module_.process(row)
g2Engine_.process(row)
except G2ModuleLicenseException as err:
print(err)
print('ERROR: G2Module licensing error!')
print('ERROR: G2Engine licensing error!')
with threadStop.get_lock():
threadStop.value = 1
return
Expand Down Expand Up @@ -573,8 +588,8 @@ def signal_handler(signal, frame):
iniParser.read(iniFileName)
try: g2dbUri = iniParser.get('g2', 'G2Connection')
except: g2dbUri = None
try: odsDbUri = iniParser.get('g2', 'ODSConnection')
except: odsDbUri = None
####try: odsDbUri = iniParser.get('g2', 'ODSConnection')
####except: odsDbUri = None
try: configTableFile = iniParser.get('g2', 'G2ConfigFile')
except: configTableFile = None
try: g2iniPath = os.path.expanduser(iniParser.get('g2', 'iniPath'))
Expand Down Expand Up @@ -628,8 +643,8 @@ def signal_handler(signal, frame):
if not g2dbUri:
print('ERROR: A G2 database connection is not specified!')
sys.exit(1)
if not odsDbUri:
print('ERROR: A ODS database connection is not specified!')
####if not odsDbUri:
#### print('ERROR: A ODS database connection is not specified!')
sys.exit(1)
if not configTableFile:
print('ERROR: A G2 setup configuration file is not specified')
Expand Down Expand Up @@ -657,7 +672,7 @@ def signal_handler(signal, frame):

g2Dbo.close()

#--set globals for the g2 module
#--set globals for the g2 engine
maxThreadsPerProcess=4

#--all good, lets start loading!
Expand Down

0 comments on commit f862a71

Please sign in to comment.