Skip to content

Commit

Permalink
feat: skip MS tool de-id for solo NLP task runs
Browse files Browse the repository at this point in the history
If an ETL run is only for an NLP task, the MS tool de-identification
is unnecessary. The NLP task only pulls out certain bits of the
incoming resources and still does the standard ID scrubbing itself.

So it's just wasted time, especially since NLP tasks are frequently
run by themselves as a separate GPU run.
  • Loading branch information
mikix committed Aug 15, 2023
1 parent ec534c7 commit a6cc336
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 19 deletions.
29 changes: 10 additions & 19 deletions cumulus_etl/etl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import os
import shutil
import sys
import tempfile
from collections.abc import Iterable

import rich
Expand All @@ -24,21 +23,6 @@
###############################################################################


async def load_and_deidentify(loader: loaders.Loader, resources: Iterable[str]) -> tempfile.TemporaryDirectory:
    """
    Pulls the input data down and runs a first-pass de-identification over it.

    Nothing outside this method should ever touch the raw (pre-scrub) input files.

    :param loader: the loader responsible for fetching/normalizing the input
    :param resources: the FHIR resource types to load
    :returns: a temporary directory holding the de-identified files in FHIR ndjson format
    """
    # Step one: materialize all the input as local ndjson
    raw_dir = await loader.load_all(list(resources))

    # Step two: bulk de-identify everything we just loaded
    return await deid.Scrubber.scrub_bulk_data(raw_dir.name)


async def etl_job(
config: JobConfig, selected_tasks: list[type[tasks.EtlTask]], use_philter: bool = False
) -> list[JobSummary]:
Expand Down Expand Up @@ -238,13 +222,20 @@ async def etl_main(args: argparse.Namespace) -> None:
root_input, client=client, export_to=args.export_to, since=args.since, until=args.until
)

# Pull down resources and run the MS tool on them
deid_dir = await load_and_deidentify(config_loader, required_resources)
# Pull down resources from any remote location (like s3), convert from i2b2, or do a bulk export
loaded_dir = await config_loader.load_all(list(required_resources))

# If *any* of our tasks need bulk MS de-identification, run it
if any(t.needs_bulk_deid for t in selected_tasks):
loaded_dir = await deid.Scrubber.scrub_bulk_data(loaded_dir.name)
else:
print("Skipping bulk de-identification.")
print("These selected tasks will de-identify resources as they are processed.")

# Prepare config for jobs
config = JobConfig(
args.dir_input,
deid_dir.name,
loaded_dir.name,
args.dir_output,
args.dir_phi,
args.input_format,
Expand Down
1 change: 1 addition & 0 deletions cumulus_etl/etl/studies/covid_symptom/covid_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class CovidSymptomNlpResultsTask(tasks.EtlTask):
name = "covid_symptom__nlp_results"
resource = "DocumentReference"
tags = {"covid_symptom", "gpu"}
needs_bulk_deid = False
outputs = [tasks.OutputTable(schema=None, group_field="docref_id")]

def __init__(self, *args, **kwargs):
Expand Down
1 change: 1 addition & 0 deletions cumulus_etl/etl/studies/hftest/hf_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class HuggingFaceTestTask(tasks.EtlTask):

name = "hftest__summary"
resource = "DocumentReference"
needs_bulk_deid = False
outputs = [tasks.OutputTable(schema=None)]

# Task Version
Expand Down
1 change: 1 addition & 0 deletions cumulus_etl/etl/tasks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class EtlTask:
name: str = None # task & table name
resource: str = None # incoming resource that this task operates on (will be included in bulk exports etc)
tags: set[str] = []
needs_bulk_deid = True # whether this task needs bulk MS tool de-id run on its inputs (NLP tasks usually don't)

outputs: list[OutputTable] = [OutputTable()]

Expand Down
15 changes: 15 additions & 0 deletions tests/etl/test_etl_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import tempfile
from unittest import mock

import ddt
import pytest
from ctakesclient.typesystem import Polarity

Expand Down Expand Up @@ -88,13 +89,27 @@ def assert_output_equal(self, folder: str):
self.assert_etl_output_equal(os.path.join(self.data_dir, folder), self.output_path)


@ddt.ddt
class TestEtlJobFlow(BaseEtlSimple):
"""Test case for the sequence of data through the system"""

    async def test_batched_output(self):
        """Verify that an ETL run with batch_size=1 writes output in per-row batches matching the stored fixture."""
        # Run the full pipeline with the smallest possible batch size,
        # then compare against the pre-recorded "batched-output" data folder.
        await self.run_etl(batch_size=1)
        self.assert_output_equal("batched-output")

@ddt.data(
(["covid_symptom__nlp_results"], False),
(["patient"], True),
(["covid_symptom__nlp_results", "patient"], True),
)
@ddt.unpack
async def test_ms_deid_skipped_if_not_needed(self, tasks: list[str], expected_ms_deid: bool):
with self.assertRaises(SystemExit):
with mock.patch("cumulus_etl.deid.Scrubber.scrub_bulk_data") as mock_deid:
with mock.patch("cumulus_etl.etl.cli.etl_job", side_effect=SystemExit):
await self.run_etl(tasks=tasks)
self.assertEqual(1 if expected_ms_deid else 0, mock_deid.call_count)

async def test_downloaded_phi_is_not_kept(self):
"""Verify we remove all downloaded PHI even if interrupted"""
internal_phi_dir = None
Expand Down

0 comments on commit a6cc336

Please sign in to comment.