
Commit

fix: Refactor to import large datasets in pages and threading
rsavoye committed Jan 2, 2024
1 parent bee8c33 commit e403bc7
Showing 1 changed file with 21 additions and 21 deletions.
42 changes: 21 additions & 21 deletions tm_admin/tmdb.py
@@ -47,7 +47,7 @@
info = get_cpu_info()
# More threads. Shorter import time, higher CPU load. But this is a
# pretty low CPU load process anyway, so more is good.
cores = info["count"] * 2
cores = info["count"]

def importThread(
data: list,
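For orientation, here is a minimal sketch of how the py-cpuinfo core count typically sizes a thread pool like this importer's; the variable names mirror the diff, but import_page is only a stand-in for the real importThread() above, not the project's code:

from cpuinfo import get_cpu_info   # provided by the py-cpuinfo package
import concurrent.futures

info = get_cpu_info()
cores = info["count"]              # logical core count; the old code doubled this

def import_page(rows: list) -> bool:
    # Stand-in worker: the real importThread() calls tm.writeAllData(rows, tm.table).
    return True

with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
    futures = [executor.submit(import_page, []) for _ in range(cores)]
    print(all(f.result() for f in futures))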
@@ -61,6 +61,7 @@ def importThread(
db (PostgresClient): A database connection
tm (TMImport): the input handle
"""
# log.debug(f"There are {len(data)} data entries")
tm.writeAllData(data, tm.table)

return True
@@ -141,7 +142,7 @@ def getColumns(self,
"""
sql = f"SELECT column_name, data_type,column_default FROM information_schema.columns WHERE table_name = '{table}' ORDER BY dtd_identifier;"
results = self.tmdb.queryLocal(sql)
log.info(f"There are {len(results)} columns in the TM '{table}' table")
# log.info(f"There are {len(results)} columns in the TM '{table}' table")
table = dict()
for column in results:
# print(f"FIXME: {column}")
@@ -222,7 +223,10 @@ def writeAllData(self,
data (list): The table data from TM
table str(): The table to get the columns for.
"""
log.debug(f"Writing block {len(data)} to the database")
# log.debug(f"Writing block {len(data)} to the database")
if len(data) == 0:
return True

builtins = ['int32', 'int64', 'string', 'timestamp', 'bool']
#bar = Bar('Importing into TMAdmin', max=len(data))
# columns2 = self.getColumns(table)
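The new guard simply turns an empty page into a no-op, which is a reasonable defensive check once writeAllData() is fed page-sized slices whose final page may be short or empty; a minimal sketch of the pattern (the names here are illustrative, not the project's API):

def write_page(db, table: str, rows: list) -> bool:
    """Insert one page of rows; an empty page is not an error."""
    if len(rows) == 0:
        return True
    for row in rows:
        # build and execute one INSERT per row here
        pass
    return True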
@@ -361,7 +365,7 @@ def writeAllData(self,
# foo = f"str(columns)[1:-1].replace("'", "")
sql = f"""INSERT INTO {table}({str(columns)[1:-1].replace("'", "")}) VALUES({values[:-2]})"""
# print(sql)
- results = self.admindb.queryLocal(sql)
+ results = self.admindb.dbcursor.execute(sql)

#bar.finish()
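To unpack how that INSERT statement is assembled: str(columns)[1:-1] renders a Python list as a comma-separated string, .replace("'", "") strips the quotes around each column name, and values[:-2] drops the trailing ", ". A small self-contained illustration with made-up data:

columns = ['id', 'name', 'created']
values = ""
for v in [1, "test project", "2024-01-02"]:
    values += f"'{v}', "

table = "projects"
sql = f"""INSERT INTO {table}({str(columns)[1:-1].replace("'", "")}) VALUES({values[:-2]})"""
print(sql)
# INSERT INTO projects(id, name, created) VALUES('1', 'test project', '2024-01-02')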

@@ -402,32 +406,28 @@ def main():
block = 0
chunk = round(entries / cores)

# this is the size of the pages in records
threshold = 10000
data = list()
tmpg = list()

tmpg = list()
for i in range(0, cores + 1):
# FIXME: this shouldn't be hardcoded
tmpg.append(PostgresClient('localhost/tm4'))
# Some tables in the input database are huge, and can either core
# dump python, or have performance issues. Past a certain threshold
# the data needs to be queried in pages instead of the entire table.
# For better performance, the page of data is still imported with
# threads for better performance.
for i in range(0, cores + 1):
tmpg.append(PostgresClient(args.outuri))

index = 0
if entries > threshold:
for block in range(0, entries, chunk):
data = doit.getPage(block, chunk)
page = round(len(data) / cores)
# importThread(data, tmpg[0], doit)
with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
index = 0
# importThread(data[block : block + page], tmpg[0], doit)
with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
block = 0
while block <= entries:
# log.debug("Dispatching Block %d:%d" % (block, block + page))
result = executor.submit(importThread, data[block : block + page], PostgresClient(args.outuri), doit)
block += page
index += 1
for block in range(0, entries, chunk):
data = doit.getPage(block, chunk)
page = round(len(data) / cores)
# log.debug(f"Dispatching Block {index}")
result = executor.submit(importThread, data, tmpg[index], doit)
index += 1
executor.shutdown()
else:
data = list
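Putting the refactor together: above the threshold the source table is read in chunk-sized pages, and each page is handed to a worker thread with its own database handle. A condensed, hedged sketch of that dispatch pattern — getPage and the handle list mimic the diff, while import_thread is a stand-in for importThread() above:

import concurrent.futures

def import_thread(rows: list, db, tm) -> bool:
    # Stand-in for importThread(): the real worker calls tm.writeAllData(rows, tm.table).
    return True

def import_paged(tm, db_handles: list, entries: int, cores: int, threshold: int = 10000) -> bool:
    """Read the source table in pages and import each page on its own thread.

    db_handles should hold one client per page; the diff allocates cores + 1.
    """
    chunk = round(entries / cores)
    if entries <= threshold:
        # Small tables are still imported in one pass.
        return import_thread(tm.getPage(0, entries), db_handles[0], tm)

    index = 0
    futures = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
        for block in range(0, entries, chunk):
            data = tm.getPage(block, chunk)          # one page of source rows
            futures.append(executor.submit(import_thread, data, db_handles[index], tm))
            index += 1
        # leaving the with-block waits for every submitted page to finish
    return all(f.result() for f in futures)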

