
Commit

fix: Wait for futures to complete
robsavoye committed Feb 6, 2024
1 parent 4f9ae3d commit 7da8eb5
Showing 1 changed file with 23 additions and 9 deletions.
32 changes: 23 additions & 9 deletions tm_admin/tmdb.py
@@ -187,13 +187,16 @@ def getAllData(self,
        keys = self.columns

        columns = str(keys)[1:-1].replace("'", "")
        # this is actually faster than using row_to_json(), and the
        # data is a little easier to navigate.
        sql = f"SELECT {columns} FROM {table}"
        # sql = f"SELECT row_to_json({table}) as row FROM {table}"
        results = self.tmdb.queryLocal(sql)
        # print(sql)
        self.tmdb.dbcursor.execute(sql)
        results = self.tmdb.dbcursor.fetchall()

        log.info(f"There are {len(results)} records in the TM '{table}' table")
        data = list()
        # this is actually faster than using row_to_json(), and the
        # data is a little easier to navigate.
        for record in results:
            table = dict(zip(keys, record))
            data.append(table)
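For reference, the fetch-and-zip idiom this hunk switches to can be sketched standalone. This is a minimal sketch, assuming a psycopg2-style DB-API cursor rather than the project's PostgresClient wrapper; the connection string, table, and column names are hypothetical:

```python
import psycopg2  # assumed driver; any DB-API 2.0 cursor behaves the same

conn = psycopg2.connect("dbname=tm4")  # hypothetical connection string
cursor = conn.cursor()

keys = ["id", "username", "mapping_level"]       # hypothetical column list
columns = str(keys)[1:-1].replace("'", "")       # same trick as the hunk: "id, username, ..."
cursor.execute(f"SELECT {columns} FROM users")   # hypothetical table name

# One dict per row, keyed by column name; the in-tree comment says this
# is faster than row_to_json() and easier to navigate.
data = [dict(zip(keys, record)) for record in cursor.fetchall()]
```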
@@ -409,13 +412,17 @@ def main():
    tmpg = list()

    tmpg = list()
    for i in range(0, cores + 1):
        # FIXME: this shouldn't be hardcoded
        tmpg.append(PostgresClient('localhost/tm4'))
    if entries >= threshold:
        for i in range(0, cores + 1):
            tmpg.append(PostgresClient(args.outuri))
    else:
        tmpg.append(PostgresClient(args.outuri))

    # Some tables in the input database are huge, and can either core
    # dump python, or have performance issues. Past a certain threshold
    # the data needs to be queried in pages instead of the entire table.
    if entries > threshold:
        futures = list()
        with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
            index = 0
            for block in range(0, entries, chunk):
@@ -424,8 +431,11 @@
                # log.debug(f"Dispatching Block {index}")
                # importThread(data, tmpg[0], doit)
                result = executor.submit(importThread, data, tmpg[index], doit)
                futures.append(result)
                index += 1
            executor.shutdown()
            for future in concurrent.futures.as_completed(futures):
                log.debug(f"Thread {future} completed")
            executor.shutdown()
    else:
        data = list()
        # You have to love subtle cultural spelling differences.
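The heart of this fix is keeping every Future that executor.submit() returns and draining them with as_completed() before shutting down, so no import thread is abandoned mid-write. A self-contained sketch of the pattern, with a made-up worker standing in for importThread():

```python
import concurrent.futures

def import_chunk(rows):  # stand-in for importThread(data, tmpg[i], doit)
    return len(rows)

chunks = [[1, 2, 3], [4, 5], [6]]
futures = list()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    for rows in chunks:
        futures.append(executor.submit(import_chunk, rows))
    # as_completed() yields each future as it finishes; the loop only
    # ends once every submitted task has run to completion.
    for future in concurrent.futures.as_completed(futures):
        print(f"chunk done, {future.result()} rows")
```

Exiting the with block already implies shutdown(wait=True), so the explicit executor.shutdown() in the hunk is harmless but redundant once the futures have been drained.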
@@ -436,22 +446,26 @@

    # entries = len(data)
    # log.debug(f"There are {entries} entries in {args.table}")
    # chunk = round(entries / cores)
    chunk = round(entries / cores)

    if entries < threshold:
        importThread(data, tmpg[0], doit)
        quit()

    index = 0
    futures = list()
    with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
        block = 0
        while block <= entries:
            log.debug("Dispatching Block %d:%d" % (block, block + chunk))
            # importThread(data, tmpg[0], doit)
            result = executor.submit(importThread, data[block : block + chunk], tmpg[index], doit)
            futures.append(result)
            block += chunk
            index += 1
        executor.shutdown()
        done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.ALL_COMPLETED)
        for future in done:
            log.debug(f"Thread {future} completed")
        executor.shutdown()

    # cleanup the connections
    for conn in tmpg:
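The second dispatch loop waits with concurrent.futures.wait() instead of as_completed(). Note that wait() returns a (done, not_done) named tuple of sets rather than yielding futures one by one, which is what the corrected lines above reflect. A minimal sketch with a made-up task:

```python
import concurrent.futures

def square(n):  # hypothetical task
    return n * n

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(square, n) for n in range(8)]
    # Block until everything finishes; done and not_done are sets of futures.
    done, not_done = concurrent.futures.wait(
        futures, return_when=concurrent.futures.ALL_COMPLETED
    )
    assert not not_done  # with ALL_COMPLETED and no timeout, nothing is pending
    for future in done:
        print(future.result())
```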
