From d3bb3b211fe9fd7ae4e49954e98bc104bd820402 Mon Sep 17 00:00:00 2001
From: Michael Terry
Date: Mon, 17 Jul 2023 10:37:19 -0400
Subject: [PATCH] feat: show a progress bar during MS de-id

---
 cumulus_etl/cli_utils.py       | 13 +++++++
 cumulus_etl/common.py          |  5 ++-
 cumulus_etl/deid/mstool.py     | 73 +++++++++++++++++++++++++++++++++--
 cumulus_etl/etl/convert/cli.py | 13 +------
 4 files changed, 87 insertions(+), 17 deletions(-)

diff --git a/cumulus_etl/cli_utils.py b/cumulus_etl/cli_utils.py
index ca397906..89f62212 100644
--- a/cumulus_etl/cli_utils.py
+++ b/cumulus_etl/cli_utils.py
@@ -7,6 +7,8 @@
 import time
 import urllib.parse
 
+import rich.progress
+
 from cumulus_etl import errors, loaders
 
 
@@ -90,3 +92,14 @@ def is_url_available(url: str, retry: bool = True) -> bool:
                 time.sleep(3)
 
     return False
+
+
+def make_progress_bar() -> rich.progress.Progress:
+    # The default columns don't change to elapsed time when finished.
+    columns = [
+        rich.progress.TextColumn("[progress.description]{task.description}"),
+        rich.progress.BarColumn(),
+        rich.progress.TaskProgressColumn(),
+        rich.progress.TimeRemainingColumn(elapsed_when_finished=True),
+    ]
+    return rich.progress.Progress(*columns)
diff --git a/cumulus_etl/common.py b/cumulus_etl/common.py
index 21d08d71..8b1ca457 100644
--- a/cumulus_etl/common.py
+++ b/cumulus_etl/common.py
@@ -209,13 +209,14 @@ def warn_mode():
 _first_header = True
 
 
-def print_header(name: str) -> None:
+def print_header(name: str | None = None) -> None:
     """Prints a section break to the console, with a name for the user"""
     global _first_header
     if not _first_header:
         print("###############################################################")
     _first_header = False
-    print(name)
+    if name:
+        print(name)
 
 
 ###############################################################################
diff --git a/cumulus_etl/deid/mstool.py b/cumulus_etl/deid/mstool.py
index 22a931a3..e9df0310 100644
--- a/cumulus_etl/deid/mstool.py
+++ b/cumulus_etl/deid/mstool.py
@@ -5,10 +5,11 @@
 """
 
 import asyncio
+import glob
 import os
 import sys
 
-from cumulus_etl import common, errors
+from cumulus_etl import cli_utils, common, errors
 
 MSTOOL_CMD = "Microsoft.Health.Fhir.Anonymizer.R4.CommandLineTool"
 
@@ -23,7 +24,7 @@ async def run_mstool(input_dir: str, output_dir: str) -> None:
 
     The input must be in ndjson format. And the output will be as well.
     """
-    common.print_header("De-identifying data...")
+    common.print_header()
 
     process = await asyncio.create_subprocess_exec(
         MSTOOL_CMD,
@@ -34,10 +35,76 @@ async def run_mstool(input_dir: str, output_dir: str) -> None:
         stdout=asyncio.subprocess.DEVNULL,
         stderr=asyncio.subprocess.PIPE,
     )
-    _, stderr = await process.communicate()
+
+    _, stderr = await _wait_for_completion(process, input_dir, output_dir)
 
     if process.returncode != 0:
         print(
             f"An error occurred while de-identifying the input resources:\n\n{stderr.decode('utf8')}", file=sys.stderr
         )
         raise SystemExit(errors.MSTOOL_FAILED)
+
+
+async def _wait_for_completion(
+    process: asyncio.subprocess.Process, input_dir: str, output_dir: str
+) -> tuple[bytes | None, bytes | None]:
+    """
+    Waits for the MS tool to finish, with a nice little progress bar, returns stdout and stderr
+
+    Each returned stream is the bytes that communicate() gathered, or None if that stream was not piped.
+    """
+    with cli_utils.make_progress_bar() as progress:
+        task = progress.add_task("De-identifying data…", total=1)
+        target = _count_file_sizes(f"{input_dir}/*.ndjson")
+
+        # Start communicate() exactly once and merely poll it. Polling with wait_for(process.communicate(), ...)
+        # would cancel communicate() on every timeout (which can lose stderr read so far), and looping on
+        # process.returncode would skip communicate() entirely if the process had already exited.
+        communicate = asyncio.create_task(process.communicate())
+        try:
+            while True:
+                # Wait for completion for a moment
+                done, _ = await asyncio.wait([communicate], timeout=1)
+                if done:
+                    break
+
+                # MS tool isn't done yet, let's calculate percentage finished so far,
+                # by comparing full PHI and de-identified file sizes.
+                # They won't perfectly match up (de-id should be smaller), but it's something.
+                current = _count_file_sizes(f"{output_dir}/*")
+                percentage = _compare_file_sizes(target, current)
+                progress.update(task, completed=percentage)
+
+            stdout, stderr = communicate.result()
+        finally:
+            communicate.cancel()  # no-op once finished; avoids a dangling task if we are interrupted
+
+        progress.update(task, completed=1)
+
+    return stdout, stderr
+
+
+def _compare_file_sizes(target: dict[str, int], current: dict[str, int]) -> float:
+    """Gives one fraction (0.0 to 1.0) for how far toward the target file sizes we are currently"""
+    total_expected = sum(target.values())
+    if not total_expected:
+        return 0.0  # nothing to measure against (no input, or only empty files); also avoids dividing by zero
+
+    total_current = 0
+    for filename, size in current.items():
+        if filename in target:
+            total_current += target[filename]  # use target size, because current (de-identified) files will be smaller
+        else:  # an in-progress file is being written out
+            total_current += size
+    return min(total_current / total_expected, 1.0)  # unexpected extra output must not push us past 100%
+
+
+def _count_file_sizes(pattern: str) -> dict[str, int]:
+    """Returns all files that match the given pattern and their sizes"""
+    sizes: dict[str, int] = {}
+    for filename in glob.glob(pattern):
+        try:
+            sizes[os.path.basename(filename)] = os.path.getsize(filename)
+        except OSError:
+            continue  # the directory may be written to concurrently; skip a file that vanished since the glob
+    return sizes
diff --git a/cumulus_etl/etl/convert/cli.py b/cumulus_etl/etl/convert/cli.py
index 0a6dd58e..0501aa76 100644
--- a/cumulus_etl/etl/convert/cli.py
+++ b/cumulus_etl/etl/convert/cli.py
@@ -15,17 +15,6 @@
 from cumulus_etl.etl import tasks
 
 
-def make_progress_bar() -> rich.progress.Progress:
-    # The default columns don't change to elapsed time when finished.
-    columns = [
-        rich.progress.TextColumn("[progress.description]{task.description}"),
-        rich.progress.BarColumn(),
-        rich.progress.TaskProgressColumn(),
-        rich.progress.TimeRemainingColumn(elapsed_when_finished=True),
-    ]
-    return rich.progress.Progress(*columns)
-
-
 def convert_task_table(
     task: type[tasks.EtlTask],
     table: tasks.OutputTable,
@@ -86,7 +75,7 @@ def copy_job_configs(input_root: store.Root, output_root: store.Root) -> None:
 def walk_tree(input_root: store.Root, output_root: store.Root, formatter_class: type[formats.Format]) -> None:
     all_tasks = tasks.get_all_tasks()
 
-    with make_progress_bar() as progress:
+    with cli_utils.make_progress_bar() as progress:
         for task in all_tasks:
             for table in task.outputs:
                 convert_task_table(task, table, input_root, output_root, formatter_class, progress)