From e403bc7468d8c5fdc61d0d4ce69088a39502862e Mon Sep 17 00:00:00 2001
From: Rob Savoye
Date: Tue, 2 Jan 2024 12:26:48 -0700
Subject: [PATCH] fix: Refactor to import large datasets in pages and threading

---
 tm_admin/tmdb.py | 42 +++++++++++++++++++++---------------------
 1 file changed, 21 insertions(+), 21 deletions(-)

diff --git a/tm_admin/tmdb.py b/tm_admin/tmdb.py
index a49b6601..b64c9eaa 100755
--- a/tm_admin/tmdb.py
+++ b/tm_admin/tmdb.py
@@ -47,7 +47,7 @@ info = get_cpu_info()
 
 # More threads. Shorter import time, higher CPU load. But this is a
 # pretty low CPU load proces anyway, so more is good.
-cores = info["count"] * 2
+cores = info["count"]
 
 def importThread(
         data: list,
@@ -61,6 +61,7 @@ def importThread(
         db (PostgresClient): A database connection
         tm (TMImport): the input handle
     """
+    # log.debug(f"There are {len(data)} data entries")
     tm.writeAllData(data, tm.table)
 
     return True
@@ -141,7 +142,7 @@ def getColumns(self,
         """
         sql = f"SELECT column_name, data_type,column_default FROM information_schema.columns WHERE table_name = '{table}' ORDER BY dtd_identifier;"
         results = self.tmdb.queryLocal(sql)
-        log.info(f"There are {len(results)} columns in the TM '{table}' table")
+        # log.info(f"There are {len(results)} columns in the TM '{table}' table")
         table = dict()
         for column in results:
             # print(f"FIXME: {column}")
@@ -222,7 +223,10 @@ def writeAllData(self,
             data (list): The table data from TM
             table str(): The table to get the columns for.
         """
-        log.debug(f"Writing block {len(data)} to the database")
+        # log.debug(f"Writing block {len(data)} to the database")
+        if len(data) == 0:
+            return True
+
         builtins = ['int32', 'int64', 'string', 'timestamp', 'bool']
         #bar = Bar('Importing into TMAdmin', max=len(data))
         # columns2 = self.getColumns(table)
@@ -361,7 +365,7 @@ def writeAllData(self,
         # foo = f"str(columns)[1:-1].replace("'", "")
         sql = f"""INSERT INTO {table}({str(columns)[1:-1].replace("'", "")}) VALUES({values[:-2]})"""
         # print(sql)
-        results = self.admindb.queryLocal(sql)
+        results = self.admindb.dbcursor.execute(sql)
 
         #bar.finish()
 
@@ -402,32 +406,28 @@ def main():
     block = 0
     chunk = round(entries / cores)
 
+    # this is the size of the pages in records
     threshold = 10000
     data = list()
     tmpg = list()
+    tmpg = list()
+    for i in range(0, cores + 1):
+        # FIXME: this shouldn't be hardcoded
+        tmpg.append(PostgresClient('localhost/tm4'))
 
     # Some tables in the input database are huge, and can either core
     # dump python, or have performance issues. Past a certain threshold
     # the data needs to be queried in pages instead of the entire table.
-    # For better performance, the page of data is still imported with
-    # threads for better performance.
-    for i in range(0, cores + 1):
-        tmpg.append(PostgresClient(args.outuri))
-
-    index = 0
     if entries > threshold:
-        for block in range(0, entries, chunk):
-            data = doit.getPage(block, chunk)
-            page = round(len(data) / cores)
+        # importThread(data, tmpg[0], doit)
+        with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
             index = 0
-            # importThread(data[block : block + page], tmpg[0], doit)
-            with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
-                block = 0
-                while block <= entries:
-                    # log.debug("Dispatching Block %d:%d" % (block, block + page))
-                    result = executor.submit(importThread, data[block : block + page], PostgresClient(args.outuri), doit)
-                    block += page
-                    index += 1
+            for block in range(0, entries, chunk):
+                data = doit.getPage(block, chunk)
+                page = round(len(data) / cores)
+                # log.debug(f"Dispatching Block {index}")
+                result = executor.submit(importThread, data, tmpg[index], doit)
+                index += 1
             executor.shutdown()
     else:
         data = list
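
Note (not part of the patch): below is a minimal standalone sketch of the paged, threaded import pattern this commit moves toward, so the flow is easier to follow outside the diff. The helper names get_page() and write_all_data() are hypothetical placeholders, not the real TMImport.getPage()/writeAllData() API, and the round-robin over per-worker handles is an assumption, not the commit's exact dispatch logic.

# Sketch only: fetch a large table one page at a time and hand each page to
# a worker thread, keeping one handle per worker so threads never share one.
import concurrent.futures
from typing import Callable

def paged_import(
    total: int,                                   # total rows in the source table
    page_size: int,                               # rows fetched per query
    workers: int,                                 # threads and per-worker handles
    get_page: Callable[[int, int], list],         # (offset, count) -> list of rows
    write_all_data: Callable[[list, int], bool],  # (rows, worker_index) -> ok
):
    """Query the source in pages and dispatch each page to a worker thread."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = []
        for index, offset in enumerate(range(0, total, page_size)):
            rows = get_page(offset, page_size)
            if not rows:
                continue
            # Round-robin over the per-worker handles so no two threads
            # write through the same connection at the same time.
            futures.append(executor.submit(write_all_data, rows, index % workers))
        # Surface any exception raised inside a worker thread.
        for future in concurrent.futures.as_completed(futures):
            future.result()

if __name__ == "__main__":
    # Tiny in-memory stand-in for the real database, just to show the flow.
    source = list(range(0, 25))
    sink = [[] for _ in range(4)]
    paged_import(
        total=len(source),
        page_size=10,
        workers=4,
        get_page=lambda offset, count: source[offset:offset + count],
        write_all_data=lambda rows, i: sink[i].extend(rows) or True,
    )
    print(sum(len(s) for s in sink), "rows imported")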