fix: Refactor to create a list of SQL queries, instead of the data
rsavoye committed Feb 25, 2024
1 parent 7cf8212 commit c70c507
201 changes: 70 additions & 131 deletions tm_admin/tasks/tasks.py
@@ -62,100 +62,9 @@ async def updateThread(
queries (list): The list of SQL queries to execute
db (PostgresClient): A database connection
"""
pbar = tqdm.tqdm(queries)
for sql in pbar:
print(sql)
result = await db.execute(sql)

return True

async def historyThread(
data: list,
db: PostgresClient,
table: str = "tasks",
):
"""Thread to handle importing
Args:
data (list): The list of records to import
db (PostgresClient): A database connection
table (str): The table to update
"""
for entry in data:
# there is only one entry if using row_to_json()
uid = entry['user_id']
pid = entry['project_id']
tid = entry['task_id']
action = entry['action']
# Remove embedded single quotes so the literal below stays valid SQL
if entry['action_text'] is None:
text = "NULL"
else:
text = entry['action_text'].replace("'", "")
timestamp = str(entry['action_date'])
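# NOTE: stripping quotes only works around hand-built SQL strings; a
# parameterized query (asyncpg-style $1 placeholders) would quote the
# values safely instead.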
# FIXME: currently the schema has this as an int, it's actually an enum
func = Taskaction[action]
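# "history||(...)::task_history" appends one composite value to the
# task_history[] array column via PostgreSQL array concatenation.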
sql = f"UPDATE tasks "
sql += f" SET history=history||({pid}, {tid}, {func.value}, '{text}', '{timestamp}', {uid})::task_history"
# sql += f" SET history = (SELECT ARRAY_APPEND(history,({func.value}, '{text}', '{timestamp}', {entry['user_id']})::task_history)) "
sql += f" WHERE id={entry['task_id']} AND project_id={entry['project_id']}"
print(f"{sql};")
try:
result = await db.execute(sql)
except Exception:
log.error(f"Couldn't execute query! '{sql}'")

return True

async def invalidationThread(
data: list,
db: PostgresClient,
):
"""Thread to handle importing
Args:
data (list): The list of records to import
db (PostgresClient): A database connection
"""
pbar = tqdm.tqdm(data)
for record in pbar:
map_timestamp = "NULL"
inval_timestamp = "NULL"
up_timestamp = "NULL"
val_timestamp = "NULL"
date = record['mapped_date']
if date is not None:
map_timestamp = "'{:%Y-%m-%dT%H:%M:%S}'".format(date)
date = record['invalidated_date']
if date is not None:
inval_timestamp = "'{:%Y-%m-%dT%H:%M:%S}'".format(date)
date = record['updated_date']
if date is not None:
up_timestamp = "'{:%Y-%m-%dT%H:%M:%S}'".format(date)
date = record['validated_date']
if date is not None:
val_timestamp = "'{:%Y-%m-%dT%H:%M:%S}'".format(date)
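# The four blocks above repeat one pattern; a small helper (hypothetical)
# would shrink them:
#   def quote_ts(d):
#       return "NULL" if d is None else "'{:%Y-%m-%dT%H:%M:%S}'".format(d)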

vid = "NULL"
if record['validator_id'] is not None:
vid = record['validator_id']

# columns = f"is_closed, mapper_id, mapped_date, invalidator_id, invalidated_date, invalidation_history_id, validator_id, validated_date, updated_date"

sql = f"UPDATE tasks"
# sql += f" SET invalidation_history = (SELECT ARRAY_APPEND(invalidation_history,({record['is_closed']}, {record['mapper_id']}, {map_timestamp}, {record['invalidator_id']}, {inval_timestamp}, {record['invalidation_history_id']}, {vid}, {val_timestamp}, {up_timestamp})::task_invalidation_history)) "
sql += f" SET invalidation_history = invalidation_history||({record['is_closed']}, {record['mapper_id']}, '{record['mapped_date']}', {record['invalidator_id']}, '{record['invalidated_date']}', {record['invalidation_history_id']}, {record['validator_id']}, '{record['validated_date']}', '{record['updated_date']}')::task_invalidation_history"
sql += f"WHERE id={record['task_id']} AND project_id={record['project_id']}"
# print(sql)
result = await db.execute(sql)

@@ -177,22 +86,16 @@ def __init__(self,
self.profile = TasksTable()
super().__init__('tasks')

async def mergeIssues(self,
inpg: PostgresClient,
):
table = "task_mapping_issues"
log.error(f"mergeIssues() Unimplemented!")
timer = Timer(initial_text=f"Merging {table} table...",
text="merging table took {seconds:.0f}s",
logger=log.debug,
)
log.info(f"Merging {table} table...")

async def mergeAnnotations(self,
inpg: PostgresClient,
):
table = "task_annotationstask_annotations"
log.error(f"mergeAnnotations() nimplemented!")
"""
Merge the task_annotation table from Tasking Manager into
TM Admin. This table doesn't actually appear to be currently
used by TM at all.
"""
table = "task_annotations"
log.error(f"mergeAnnotations() Unimplemented as the source is empty!")
timer = Timer(initial_text="Merging {table} table...",
text="merging {table table took {seconds:.0f}s",
logger=log.debug,
@@ -218,10 +121,11 @@ async def mergeAuxTables(self,
# FIXME: in TM, this table is empty
# await self.mergeAnnotations(inpg)

await self.mergeHistory(inpg)

await self.mergeInvalidations(inpg)

# This is now handled by mergeHistory
# await self.mergeIssues(inpg)

async def mergeHistory(self,
@@ -233,43 +137,79 @@ async def mergeHistory(self,
"""
table = 'task_history'
timer = Timer(initial_text=f"Merging {table} table...",
# text=f"merging {table} table took {seconds:.0f}s",
logger=log.debug,
text="merging table took {seconds:.0f}s",
logger=log.debug,
)
log.info(f"Merging {table} table...")
timer.start()

# There is a small amount of data in this table, and we need to
# correlate it to the task history when merging, so read in
# the entire dataset.
sql = f"SELECT * FROM task_mapping_issues ORDER BY id;"
# print(sql)
data = await inpg.execute(sql)
entries = len(data)
log.debug(f"There are {len(data)} records in task_mapping_issues")
issues = dict()
for record in data:
hid = record['task_history_id']
issues[hid] = {'issue': record['issue'],
'category': record['mapping_issue_category_id'],
'count': record['count'],
}

# Now get the data from the history table
sql = f"SELECT * FROM {table}"
data = await inpg.execute(sql)
entries = len(data)
log.debug(f"There are {len(data)} records in {table}")

# Some tables in the input database are huge, and can either core
# dump python, or have performance issues. Past a certain threshold
# the data needs to be queried in pages instead of the entire table.

# FIXME: create an array of SQL queries, so later we can use
# prepared_queries in asyncpg for better performance. We also don't
# need all of the columns from the TM table, since task ID and
# project ID are already part of the table schema.
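# A rough sketch of that idea, assuming the underlying asyncpg
# connection is reachable (PostgresClient may expose it differently):
#   stmt = await conn.prepare(
#       "UPDATE tasks SET history = $1 WHERE id = $2 AND project_id = $3")
#   await stmt.fetch(history_json, task_id, project_id)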
queries = list()
for record in data:
entry = {"user_id": record['user_id']}
entry['action'] = record['action']
entry['action_text'] = record['action_text']
if record['action_date']:
entry['action_date'] = '{:%Y-%m-%dT%H:%M:%S}'.format(record['action_date'])
# If there is an issue, add it to the record in the jsonb column
if record['id'] in issues:
entry.update(issues[record['id']])
asc = str(entry).replace("'", '"').replace("\\'", "'")
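# NOTE: str(entry) plus quote replacement is a crude dict-to-JSON
# conversion; it assumes no embedded quotes remain in the values.
# json.dumps(entry) would be a safer alternative here.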
sql = "UPDATE tasks SET history = '{\"history\": [%s]}' WHERE id=%d AND project_id=%d" % (asc, record['task_id'], record['project_id'])
# print(sql)
queries.append(sql)

entries = len(queries)
# max() guards against a zero chunk when there are fewer queries than cores
chunk = max(1, round(entries / cores))
async with asyncio.TaskGroup() as tg:
for block in range(0, entries, chunk):
outpg = PostgresClient()
# FIXME: this should not be hard coded
await outpg.connect('localhost/tm_admin')
log.debug(f"Dispatching thread {block}:{block + chunk}")
task = tg.create_task(updateThread(queries[block:block + chunk], outpg))
timer.stop()
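# NOTE: asyncio.TaskGroup (Python 3.11+) waits for all dispatched tasks
# before leaving the async-with block, so timer.stop() above measures the
# complete merge.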

async def mergeInvalidations(self,
inpg: PostgresClient,
@@ -320,9 +260,8 @@ async def mergeInvalidations(self,
entry["is_closed_id"] = "false"
# entries[record['task_id']].append(entry)
asc = str(entry).replace("'", '"').replace("\\'", "'")
# UPDATE tasks SET invalidation_history = '{"history": [{"user_id": 35, "mapper_id": 11593853, "invalidator_id": 11596055}]}' WHERE id=35 AND project_id=105;
sql = "UPDATE tasks SET invalidation_history = '{\"history\": [%s]}' WHERE id=%d AND project_id=%d" % (asc, record['task_id'], record['project_id'])
# print(sql)
queries.append(sql)

entries = len(queries)
@@ -335,7 +274,7 @@ async def mergeInvalidations(self,
await outpg.connect('localhost/tm_admin')
log.debug(f"Dispatching thread {block}:{block + chunk}")
task = tg.create_task(updateThread(queries[block:block + chunk], outpg))

async def main():
"""This main function lets this class be run standalone by a bash script."""
