Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: show a progress bar during MS de-id #248

Merged
merged 3 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cumulus_etl/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import time
import urllib.parse

import rich.progress

from cumulus_etl import errors, loaders


Expand Down Expand Up @@ -90,3 +92,14 @@ def is_url_available(url: str, retry: bool = True) -> bool:
time.sleep(3)

return False


def make_progress_bar() -> rich.progress.Progress:
# The default columns don't change to elapsed time when finished.
columns = [
rich.progress.TextColumn("[progress.description]{task.description}"),
rich.progress.BarColumn(),
rich.progress.TaskProgressColumn(),
rich.progress.TimeRemainingColumn(elapsed_when_finished=True),
]
return rich.progress.Progress(*columns)
5 changes: 3 additions & 2 deletions cumulus_etl/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,14 @@ def warn_mode():
_first_header = True


def print_header(name: str) -> None:
def print_header(name: str | None = None) -> None:
"""Prints a section break to the console, with a name for the user"""
global _first_header
if not _first_header:
print("###############################################################")
_first_header = False
print(name)
if name:
print(name)


###############################################################################
Expand Down
50 changes: 47 additions & 3 deletions cumulus_etl/deid/mstool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
"""

import asyncio
import glob
import os
import sys

from cumulus_etl import common, errors
from cumulus_etl import cli_utils, common, errors

MSTOOL_CMD = "Microsoft.Health.Fhir.Anonymizer.R4.CommandLineTool"

Expand All @@ -23,7 +24,7 @@ async def run_mstool(input_dir: str, output_dir: str) -> None:

The input must be in ndjson format. And the output will be as well.
"""
common.print_header("De-identifying data...")
common.print_header()

process = await asyncio.create_subprocess_exec(
MSTOOL_CMD,
Expand All @@ -34,10 +35,53 @@ async def run_mstool(input_dir: str, output_dir: str) -> None:
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE,
)
_, stderr = await process.communicate()

_, stderr = await _wait_for_completion(process, input_dir, output_dir)

if process.returncode != 0:
print(
f"An error occurred while de-identifying the input resources:\n\n{stderr.decode('utf8')}", file=sys.stderr
)
raise SystemExit(errors.MSTOOL_FAILED)


async def _wait_for_completion(process: asyncio.subprocess.Process, input_dir: str, output_dir: str) -> (str, str):
"""Waits for the MS tool to finish, with a nice little progress bar, returns stdout and stderr"""
stdout, stderr = None, None

with cli_utils.make_progress_bar() as progress:
task = progress.add_task("De-identifying data…", total=1)
target = _count_file_sizes(f"{input_dir}/*.ndjson")

while process.returncode is None:
try:
# Wait for completion for a moment
stdout, stderr = await asyncio.wait_for(process.communicate(), 1)
except asyncio.TimeoutError:
# MS tool isn't done yet, let's calculate percentage finished so far,
# by comparing full PHI and de-identified file sizes.
# They won't perfectly match up (de-id should be smaller), but it's something.
current = _count_file_sizes(f"{output_dir}/*")
percentage = _compare_file_sizes(target, current)
progress.update(task, completed=percentage)

progress.update(task, completed=1)

return stdout, stderr


def _compare_file_sizes(target: dict[str, int], current: dict[str, int]) -> float:
"""Gives one percentage for how far toward the target file sizes we are currently"""
total_expected = sum(target.values())
total_current = 0
for filename, size in current.items():
if filename in target:
total_current += target[filename] # use target size, because current (de-identified) files will be smaller
else: # an in-progress file is being written out
total_current += size
return total_current / total_expected


def _count_file_sizes(pattern: str) -> dict[str, int]:
"""Returns all files that match the given pattern and their sizes"""
return {os.path.basename(filename): os.path.getsize(filename) for filename in glob.glob(pattern)}
13 changes: 1 addition & 12 deletions cumulus_etl/etl/convert/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,6 @@
from cumulus_etl.etl import tasks


def make_progress_bar() -> rich.progress.Progress:
# The default columns don't change to elapsed time when finished.
columns = [
rich.progress.TextColumn("[progress.description]{task.description}"),
rich.progress.BarColumn(),
rich.progress.TaskProgressColumn(),
rich.progress.TimeRemainingColumn(elapsed_when_finished=True),
]
return rich.progress.Progress(*columns)


def convert_task_table(
task: type[tasks.EtlTask],
table: tasks.OutputTable,
Expand Down Expand Up @@ -84,7 +73,7 @@ def copy_job_configs(input_root: store.Root, output_root: store.Root) -> None:
def walk_tree(input_root: store.Root, output_root: store.Root, formatter_class: type[formats.Format]) -> None:
all_tasks = tasks.get_all_tasks()

with make_progress_bar() as progress:
with cli_utils.make_progress_bar() as progress:
for task in all_tasks:
for table in task.outputs:
convert_task_table(task, table, input_root, output_root, formatter_class, progress)
Expand Down