diff --git a/tm_admin/tasks/tasks.py b/tm_admin/tasks/tasks.py index afad99d9..504cbe56 100755 --- a/tm_admin/tasks/tasks.py +++ b/tm_admin/tasks/tasks.py @@ -62,100 +62,9 @@ async def updateThread( queries (list): The list of SQL queries to execute db (PostgresClient): A database connection """ - # pbar = tqdm.tqdm(queries) + pbar = tqdm.tqdm(queries) + for sql in pbar: # for sql in queries: - # for sql in pbar: - print(sql) - result = await db.execute(sql) - - return True - -async def historyThread( - data: list, - db: PostgresClient, - table: str = "tasks", -): - """Thread to handle importing - - Args: - data (list): The list of records to import - db (PostgresClient): A database connection - table (str): The table to update - """ - # pbar = tqdm(data) - for entry in data: - # there is only one entry if using row_to_json() - id = entry['id'] - uid = entry['user_id'] - pid = entry['project_id'] - tid = entry['task_id'] - action = entry['action'] - date = entry['action_date'] - # Remove embedded single quotes - text = str() - if entry['action_text'] is None: - text = "NULL" - else: - text = entry['action_text'].replace("'", "") - timestamp = str(entry['action_date']) - # timestamp = "{%Y-%m-%dT%H:%M:%S}".format(date) - # timestamp = datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f") - # entry['action_date'] = timestamp - # FIXME: currently the schema has this as an int, it's actully an enum - func = eval(f"Taskaction.{action}") - # columns = f"id, project_id, history.action, history.action_text, history.action_date, history.user_id" - # nested = f"{record['id']}, {record['project_id']}, {func.value}, '{text}', '{timestamp}', {record['user_id']}" - sql = f"UPDATE tasks " - sql += f" SET history=history||({pid}, {tid}, {func.value}, '{text}', '{timestamp}', {uid})::task_history" - # sql += f" SET history = (SELECT ARRAY_APPEND(history,({func.value}, '{text}', '{timestamp}', {entry['user_id']})::task_history)) " - sql += f" WHERE id={entry['task_id']} AND project_id={entry['project_id']}" - print(f"{sql};") - #try: - result = db.execute(sql) - #except: - # log.error(f"Couldn't execute query! '{sql}'") - - return True - -async def invalidationThread( - data: list, - db: PostgresClient, -): - """Thread to handle importing - - Args: - data (list): The list of records to import - db (PostgresClient): A database connection - """ - pbar = tqdm.tqdm(data) - for record in pbar: - map_timestamp = "NULL" - inval_timestamp = "NULL" - up_timestamp = "NULL" - val_timestamp = "NULL" - date = record['mapped_date'] - if date is not None: - map_timestamp = "'{:%Y-%m-%dT%H:%M:%S}'".format(date) - date = record['invalidated_date'] - if date is not None: - inval_timestamp = "'{:%Y-%m-%dT%H:%M:%S}'".format(date) - date = record['updated_date'] - if date is not None: - up_timestamp = "'{:%Y-%m-%dT%H:%M:%S}'".format(date) - date = record['validated_date'] - if date is not None: - val_timestamp = "'{:%Y-%m-%dT%H:%M:%S}'".format(date) - - vid = "NULL" - if record['validator_id'] is not None: - vid = record['validator_id'] - - # columns = f"is_closed, mapper_id, mapped_date, invalidator_id, invalidated_date, invalidation_history_id, validator_id, validated_date, updated_date" - - sql = f"UPDATE tasks" - # sql += f" SET invalidation_history = (SELECT ARRAY_APPEND(invalidation_history,({record['is_closed']}, {record['mapper_id']}, {map_timestamp}, {record['invalidator_id']}, {inval_timestamp}, {record['invalidation_history_id']}, {vid}, {val_timestamp}, {up_timestamp})::task_invalidation_history)) " - sql += f" SET invalidation_history = invalidation_history||({record['is_closed']}, {record['mapper_id']}, '{record['mapped_date']}', {record['invalidator_id']}, '{record['invalidated_date']}', {record['invalidation_history_id']}, {record['validator_id']}, '{record['validated_date']}', '{record['updated_date']}')::task_invalidation_history" - sql += f"WHERE id={record['task_id']} AND project_id={record['project_id']}" # print(sql) result = await db.execute(sql) @@ -177,22 +86,16 @@ def __init__(self, self.profile = TasksTable() super().__init__('tasks') - async def mergeIssues(self, - inpg: PostgresClient, - ): - table = "task_mapping_issues" - log.error(f"mergeIssues() Unimplemented!") - timer = Timer(initial_text=f"Merging {table} table...", - text="merging table took {seconds:.0f}s", - logger=log.debug, - ) - log.info(f"Merging {table} table...") - async def mergeAnnotations(self, inpg: PostgresClient, ): - table = "task_annotationstask_annotations" - log.error(f"mergeAnnotations() nimplemented!") + """ + Merge the task_annotation table from Tasking Manager into + TM Admin. This table doesn't actually appear to be currently + used by TM at all. + """ + table = "task_annotations" + log.error(f"mergeAnnotations() Unimplemented as the source is empty!") timer = Timer(initial_text="Merging {table} table...", text="merging {table table took {seconds:.0f}s", logger=log.debug, @@ -218,10 +121,11 @@ async def mergeAuxTables(self, # FIXME: in TM, this table is empty # await self.mergeAnnotations(inpg) - # await self.mergeHistory(inpg) + await self.mergeHistory(inpg) await self.mergeInvalidations(inpg) + # This is now handled by mergeHistory # await self.mergeIssues(inpg) async def mergeHistory(self, @@ -233,43 +137,79 @@ async def mergeHistory(self, """ table = 'task_history' timer = Timer(initial_text=f"Merging {table} table...", - # text=f"merging {table} table took {seconds:.0f}s", - logger=log.debug, + text="merging table took {seconds:.0f}s", + logger=log.debug, ) log.info(f"Merging {table} table...") - # pg = PostgresClient() - # await pg.connect('localhost/tm4') - # sql = f"SELECT MIN(project_id),MAX(project_id) FROM task_history" - # Get the number of records - # sql = f"SELECT reltuples::bigint AS estimate FROM pg_class WHERE oid = 'public.task_history'::regclass;" - # entries = await pg.getRecordCount(table) + timer.start() + # There is a small amount of data in this table, and we need to + # coorelate it to the task history when merging, so read in + # the entire dataset. + sql = f"SELECT * FROM task_mapping_issues ORDER BY id;" + # print(sql) + data = await inpg.execute(sql) + entries = len(data) + log.debug(f"There are {len(data)} records in task_mapping_issues") + issues = dict() + # pbar = tqdm.tqdm(data) + # for record in pbar: + for record in data: + hid = record['task_history_id'] + issues[hid] = {'issue': record['issue'], + 'category': record['mapping_issue_category_id'], + 'count': record['count'], + } + + # Now get the data from the history table sql = f"SELECT * FROM {table}" # print(sql) - timer.start() data = await inpg.execute(sql) entries = len(data) log.debug(f"There are {len(data)} records in {table}") - timer.stop() chunk = round(entries/cores) - blocks = list() - # Some tables in the input database are huge, and can either core - # dump python, or have performance issues. Past a certain threshold - # the data needs to be queried in pages instead of the entire table - # This is a huge table, we can't read in the entire thing + # FIXME: create an array of SQL queries, so later we can use + # prepared_queries in asyncpg for better performance. We also don't + # need all of the columns from the TM table, since task ID and + # project ID are already part of the table schema. + queries = list() + # pbar = tqdm.tqdm(data) + #for record in pbar: + for record in data: + entry = {"user_id": record['user_id']} + # entry['action'] = Taskaction(record['action']).name + entry['action'] = record['action'] + entry['action_text'] = record['action_text'] + if record['action_date']: + entry['action_date'] = '{:%Y-%m-%dT%H:%M:%S}'.format(record['action_date']) + # If there is an issue, add it to the record in the jsonb column + if record['id'] in issues: + entry.update(issues[record['id']]) + # entry['issue'] = issues['issue'] + # entry['category'] = issues['category'] + # entry['count'] = issues['count'] + asc = str(entry).replace("'", '"').replace("\\'", "'") + sql = "UPDATE tasks SET history = '{\"history\": [%s]}' WHERE id=%d AND project_id=%d" % (asc, record['task_id'], record['project_id']) + # print(sql) + queries.append(sql) + + entries = len(queries) + chunk = round(entries/cores) + import copy async with asyncio.TaskGroup() as tg: for block in range(0, entries, chunk): # for index in range(0, cores): outpg = PostgresClient() + # FIXME: this should not be hard coded await outpg.connect('localhost/tm_admin') - log.debug(f"Dispatching thread {block}:{block + chunk}") - # await licensesThread(data, outpg) - await historyThread(data[block:block + chunk], outpg) - # task = tg.create_task(historyThread(data[block:block + chunk], outpg)) - - # result = historyThread(data, adminpg[index], f"{table}{index}_view") + foo = copy.copy(queries[block:block + chunk -1]) + log.debug(f"Dispatching thread {block}:{block + chunk - 1}") +# await updateThread(foo, outpg) + # await updateThread(queries[block:block + chunk], outpg) + task = tg.create_task(updateThread(foo, outpg)) + timer.stop() async def mergeInvalidations(self, inpg: PostgresClient, @@ -320,9 +260,8 @@ async def mergeInvalidations(self, entry["is_closed_id"] = "false" # entries[record['task_id']].append(entry) asc = str(entry).replace("'", '"').replace("\\'", "'") - # UPDATE tasks SET invalidation_history = '{"history": [{"user_id": 35, "mapper_id": 11593853, "invalidator_id": 11596055}]}' WHERE id=35 AND project_id=105; sql = "UPDATE tasks SET invalidation_history = '{\"history\": [%s]}' WHERE id=%d AND project_id=%d" % (asc, record['task_id'], record['project_id']) - print(sql) + # print(sql) queries.append(sql) entries = len(queries) @@ -335,7 +274,7 @@ async def mergeInvalidations(self, await outpg.connect('localhost/tm_admin') log.debug(f"Dispatching thread {block}:{block + chunk}") #await updateThread(queries[block:block + chunk], outpg) - task = tg.create_task(updateThread(data[block:block + chunk], outpg)) + task = tg.create_task(updateThread(queries[block:block + chunk], outpg)) async def main(): """This main function lets this class be run standalone by a bash script."""