From 61c1b9d315431043814bf17e16fe0123ecfb9424 Mon Sep 17 00:00:00 2001
From: Rob Savoye
Date: Thu, 28 Dec 2023 17:20:38 -0700
Subject: [PATCH] fix: Make importing the primary table multi-threaded to
 improve performance

---
 tm_admin/tmdb.py | 55 ++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 51 insertions(+), 4 deletions(-)

diff --git a/tm_admin/tmdb.py b/tm_admin/tmdb.py
index a9f79b9f..c1a533d1 100755
--- a/tm_admin/tmdb.py
+++ b/tm_admin/tmdb.py
@@ -31,6 +31,8 @@ from progress.bar import Bar, PixelBar
 from tm_admin.types_tm import Userrole, Mappinglevel, Organizationtype, Taskcreationmode, Projectstatus, Permissions, Projectpriority, Projectdifficulty, Mappingtypes, Editors, Teamvisibility, Taskstatus
 from tm_admin.yamlfile import YamlFile
+import concurrent.futures
+from cpuinfo import get_cpu_info
 
 # from tm_admin.users.users import createSQLValues
 # from tm_admin.organizations.organizations import createSQLValues
 
@@ -40,6 +42,25 @@
 import tm_admin as tma
 rootdir = tma.__path__[0]
 
+# The number of threads is based on the number of CPU cores
+info = get_cpu_info()
+cores = info["count"]
+
+def importThread(
+    data: list,
+    db: PostgresClient,
+    tm,
+):
+    """Thread to handle importing.
+
+    Args:
+        data (list): The list of records to import
+        db (PostgresClient): A database connection (currently unused; writeAllData() uses tm.admindb)
+        tm (TMImport): The TMImport instance
+    """
+    tm.writeAllData(data, tm.table)
+
+    return True
 
 class TMImport(object):
     def __init__(self,
@@ -71,6 +92,7 @@ def __init__(self,
         self.admindb = PostgresClient(outuri)
         self.columns = list()
         self.data = list()
+        self.table = table
         yaml = YamlFile(f"{rootdir}/{table}/{table}.yaml")
         # yaml.dump()
 
@@ -155,6 +177,7 @@ def writeAllData(self,
             data (list): The table data from TM
             table str(): The table to get the columns for.
         """
+        # log.debug(f"Writing a block of {len(data)} records to the database")
         builtins = ['int32', 'int64', 'string', 'timestamp', 'bool']
         #bar = Bar('Importing into TMAdmin', max=len(data))
         # columns2 = self.getColumns(table)
@@ -328,13 +351,37 @@ def main():
     )
 
     doit = TMImport(args.inuri, args.outuri, args.table)
-    table = args.table
     # You have to love subtle cultural spelling differences.
-    if table == 'organizations':
+    data = list()
+    if args.table == 'organizations':
         data = doit.getAllData('organisations')
     else:
-        data = doit.getAllData(table)
-    doit.writeAllData(data, table)
+        data = doit.getAllData(args.table)
+
+    entries = len(data)
+    log.debug(f"There are {entries} entries in {args.table}")
+    chunk = round(entries / cores)
+
+    tmpg = list()  # one database connection per thread, plus a spare
+    for _ in range(cores + 1):
+        tmpg.append(PostgresClient(args.outuri))
+
+    # Small datasets aren't worth the threading overhead
+    if entries < 10000:
+        importThread(data, tmpg[0], doit)
+        return
+
+    index = 0
+    with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
+        block = 0
+        while block < entries:
+            log.debug("Dispatching Block %d:%d" % (block, block + chunk))
+            result = executor.submit(importThread, data[block : block + chunk], tmpg[index], doit)
+            block += chunk
+            index += 1
+        # for future in tqdm(futures, desc=f"Dispatching Block {block}:{block + chunk}", total=chunk):
+        #     future.result()
+        executor.shutdown()
 
 if __name__ == "__main__":
     """This is just a hook so this file can be run standalone during development."""
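
Note on the dispatch loop: the commented-out tqdm lines suggest the intent was to collect the futures returned by executor.submit() and wait on them, but as committed future.result() is never called, so an exception raised inside a worker thread is silently discarded. Below is a minimal, self-contained sketch of the same chunked-dispatch pattern with an explicit join; import_chunk(), the sample data, and the core count are illustrative stand-ins, not code from this repository.

#!/usr/bin/env python3
# Sketch of the chunked ThreadPoolExecutor dispatch used in the patch,
# with the futures collected and joined so worker errors surface.
import concurrent.futures

def import_chunk(records: list) -> bool:
    # Stand-in for importThread(); a real worker would write the
    # records to the database, e.g. tm.writeAllData(records, tm.table).
    return True

data = list(range(100000))  # stand-in for the rows read from the TM database
cores = 4                   # the patch derives this from get_cpu_info()["count"]
chunk = round(len(data) / cores)

futures = list()
with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
    block = 0
    while block < len(data):
        futures.append(executor.submit(import_chunk, data[block : block + chunk]))
        block += chunk
    # Block until every chunk is written; result() re-raises any
    # exception thrown inside a worker thread.
    for future in concurrent.futures.as_completed(futures):
        future.result()

Leaving the with block also joins the workers, but only the explicit result() calls propagate their outcomes, which is what the commented-out loop appears to have been reaching for.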