Skip to content

Commit

Permalink
Merge pull request #248 from smart-on-fhir/mikix/deid-progress
Browse files Browse the repository at this point in the history
feat: show a progress bar during MS de-id
  • Loading branch information
mikix authored Jul 19, 2023
2 parents cf3feb2 + 393b5cb commit 57437ec
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 17 deletions.
13 changes: 13 additions & 0 deletions cumulus_etl/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import time
import urllib.parse

import rich.progress

from cumulus_etl import errors, loaders


Expand Down Expand Up @@ -90,3 +92,14 @@ def is_url_available(url: str, retry: bool = True) -> bool:
time.sleep(3)

return False


def make_progress_bar() -> rich.progress.Progress:
# The default columns don't change to elapsed time when finished.
columns = [
rich.progress.TextColumn("[progress.description]{task.description}"),
rich.progress.BarColumn(),
rich.progress.TaskProgressColumn(),
rich.progress.TimeRemainingColumn(elapsed_when_finished=True),
]
return rich.progress.Progress(*columns)
5 changes: 3 additions & 2 deletions cumulus_etl/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,14 @@ def warn_mode():
_first_header = True


def print_header(name: str) -> None:
def print_header(name: str | None = None) -> None:
"""Prints a section break to the console, with a name for the user"""
global _first_header
if not _first_header:
print("###############################################################")
_first_header = False
print(name)
if name:
print(name)


###############################################################################
Expand Down
50 changes: 47 additions & 3 deletions cumulus_etl/deid/mstool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
"""

import asyncio
import glob
import os
import sys

from cumulus_etl import common, errors
from cumulus_etl import cli_utils, common, errors

MSTOOL_CMD = "Microsoft.Health.Fhir.Anonymizer.R4.CommandLineTool"

Expand All @@ -23,7 +24,7 @@ async def run_mstool(input_dir: str, output_dir: str) -> None:
The input must be in ndjson format. And the output will be as well.
"""
common.print_header("De-identifying data...")
common.print_header()

process = await asyncio.create_subprocess_exec(
MSTOOL_CMD,
Expand All @@ -34,10 +35,53 @@ async def run_mstool(input_dir: str, output_dir: str) -> None:
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE,
)
_, stderr = await process.communicate()

_, stderr = await _wait_for_completion(process, input_dir, output_dir)

if process.returncode != 0:
print(
f"An error occurred while de-identifying the input resources:\n\n{stderr.decode('utf8')}", file=sys.stderr
)
raise SystemExit(errors.MSTOOL_FAILED)


async def _wait_for_completion(process: asyncio.subprocess.Process, input_dir: str, output_dir: str) -> (str, str):
"""Waits for the MS tool to finish, with a nice little progress bar, returns stdout and stderr"""
stdout, stderr = None, None

with cli_utils.make_progress_bar() as progress:
task = progress.add_task("De-identifying data…", total=1)
target = _count_file_sizes(f"{input_dir}/*.ndjson")

while process.returncode is None:
try:
# Wait for completion for a moment
stdout, stderr = await asyncio.wait_for(process.communicate(), 1)
except asyncio.TimeoutError:
# MS tool isn't done yet, let's calculate percentage finished so far,
# by comparing full PHI and de-identified file sizes.
# They won't perfectly match up (de-id should be smaller), but it's something.
current = _count_file_sizes(f"{output_dir}/*")
percentage = _compare_file_sizes(target, current)
progress.update(task, completed=percentage)

progress.update(task, completed=1)

return stdout, stderr


def _compare_file_sizes(target: dict[str, int], current: dict[str, int]) -> float:
"""Gives one percentage for how far toward the target file sizes we are currently"""
total_expected = sum(target.values())
total_current = 0
for filename, size in current.items():
if filename in target:
total_current += target[filename] # use target size, because current (de-identified) files will be smaller
else: # an in-progress file is being written out
total_current += size
return total_current / total_expected


def _count_file_sizes(pattern: str) -> dict[str, int]:
"""Returns all files that match the given pattern and their sizes"""
return {os.path.basename(filename): os.path.getsize(filename) for filename in glob.glob(pattern)}
13 changes: 1 addition & 12 deletions cumulus_etl/etl/convert/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,6 @@
from cumulus_etl.etl import tasks


def make_progress_bar() -> rich.progress.Progress:
# The default columns don't change to elapsed time when finished.
columns = [
rich.progress.TextColumn("[progress.description]{task.description}"),
rich.progress.BarColumn(),
rich.progress.TaskProgressColumn(),
rich.progress.TimeRemainingColumn(elapsed_when_finished=True),
]
return rich.progress.Progress(*columns)


def convert_task_table(
task: type[tasks.EtlTask],
table: tasks.OutputTable,
Expand Down Expand Up @@ -84,7 +73,7 @@ def copy_job_configs(input_root: store.Root, output_root: store.Root) -> None:
def walk_tree(input_root: store.Root, output_root: store.Root, formatter_class: type[formats.Format]) -> None:
all_tasks = tasks.get_all_tasks()

with make_progress_bar() as progress:
with cli_utils.make_progress_bar() as progress:
for task in all_tasks:
for table in task.outputs:
convert_task_table(task, table, input_root, output_root, formatter_class, progress)
Expand Down

0 comments on commit 57437ec

Please sign in to comment.