feat: skip MS tool de-id for solo NLP task runs #264

Merged 1 commit on Aug 15, 2023
29 changes: 10 additions & 19 deletions cumulus_etl/etl/cli.py

@@ -6,7 +6,6 @@
 import os
 import shutil
 import sys
-import tempfile
 from collections.abc import Iterable
 
 import rich
@@ -24,21 +23,6 @@
 ###############################################################################
 
 
-async def load_and_deidentify(loader: loaders.Loader, resources: Iterable[str]) -> tempfile.TemporaryDirectory:
-    """
-    Loads the input directory and does a first-pass de-identification
-
-    Code outside this method should never see the original input files.
-
-    :returns: a temporary directory holding the de-identified files in FHIR ndjson format
-    """
-    # First step is loading all the data into a local ndjson format
-    loaded_dir = await loader.load_all(list(resources))
-
-    # Second step is de-identifying that data (at a bulk level)
-    return await deid.Scrubber.scrub_bulk_data(loaded_dir.name)
-
-
 async def etl_job(
     config: JobConfig, selected_tasks: list[type[tasks.EtlTask]], use_philter: bool = False
 ) -> list[JobSummary]:
@@ -238,13 +222,20 @@ async def etl_main(args: argparse.Namespace) -> None:
         root_input, client=client, export_to=args.export_to, since=args.since, until=args.until
     )
 
-    # Pull down resources and run the MS tool on them
-    deid_dir = await load_and_deidentify(config_loader, required_resources)
+    # Pull down resources from any remote location (like s3), convert from i2b2, or do a bulk export
+    loaded_dir = await config_loader.load_all(list(required_resources))
+
+    # If *any* of our tasks need bulk MS de-identification, run it
+    if any(t.needs_bulk_deid for t in selected_tasks):
+        loaded_dir = await deid.Scrubber.scrub_bulk_data(loaded_dir.name)
+    else:
+        print("Skipping bulk de-identification.")
+        print("These selected tasks will de-identify resources as they are processed.")
 
     # Prepare config for jobs
     config = JobConfig(
         args.dir_input,
-        deid_dir.name,
+        loaded_dir.name,
         args.dir_output,
         args.dir_phi,
         args.input_format,
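
The change above boils down to a small gating pattern: the bulk MS de-identification pass now runs only when at least one selected task still needs it. A minimal standalone sketch of that check (NlpTask and PatientTask are hypothetical stand-ins, not classes from the codebase):

class NlpTask:
    needs_bulk_deid = False  # NLP tasks de-identify each document as it is processed

class PatientTask:
    needs_bulk_deid = True  # structured resources still get the bulk MS tool pass

def needs_bulk_pass(selected_tasks) -> bool:
    # Mirrors the any() check added to etl_main() above
    return any(t.needs_bulk_deid for t in selected_tasks)

assert not needs_bulk_pass([NlpTask])
assert needs_bulk_pass([NlpTask, PatientTask])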
1 change: 1 addition & 0 deletions cumulus_etl/etl/studies/covid_symptom/covid_tasks.py

@@ -64,6 +64,7 @@ class CovidSymptomNlpResultsTask(tasks.EtlTask):
     name = "covid_symptom__nlp_results"
     resource = "DocumentReference"
     tags = {"covid_symptom", "gpu"}
+    needs_bulk_deid = False
     outputs = [tasks.OutputTable(schema=None, group_field="docref_id")]
 
     def __init__(self, *args, **kwargs):
1 change: 1 addition & 0 deletions cumulus_etl/etl/studies/hftest/hf_tasks.py

@@ -15,6 +15,7 @@ class HuggingFaceTestTask(tasks.EtlTask):
 
     name = "hftest__summary"
     resource = "DocumentReference"
+    needs_bulk_deid = False
     outputs = [tasks.OutputTable(schema=None)]
 
     # Task Version
1 change: 1 addition & 0 deletions cumulus_etl/etl/tasks/base.py

@@ -75,6 +75,7 @@ class EtlTask:
     name: str = None  # task & table name
     resource: str = None  # incoming resource that this task operates on (will be included in bulk exports etc)
     tags: set[str] = []
+    needs_bulk_deid = True  # whether this task needs bulk MS tool de-id run on its inputs (NLP tasks usually don't)
 
     outputs: list[OutputTable] = [OutputTable()]
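
Since the base class defaults to True, a study task opts out of the bulk pass with a single class attribute, just as the covid_symptom and hftest tasks do above. A sketch with a hypothetical task, assuming the `from cumulus_etl.etl import tasks` import style used by the study modules:

from cumulus_etl.etl import tasks  # assumed import path, matching the study modules

class MyNlpTask(tasks.EtlTask):  # hypothetical example task, not in the codebase
    name = "my_study__nlp_results"
    resource = "DocumentReference"
    needs_bulk_deid = False  # skip the bulk MS tool pass; de-identify per document instead
    outputs = [tasks.OutputTable(schema=None)]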
15 changes: 15 additions & 0 deletions tests/etl/test_etl_cli.py

@@ -7,6 +7,7 @@
 import tempfile
 from unittest import mock
 
+import ddt
 import pytest
 from ctakesclient.typesystem import Polarity
 
@@ -88,13 +89,27 @@ def assert_output_equal(self, folder: str):
         self.assert_etl_output_equal(os.path.join(self.data_dir, folder), self.output_path)
 
 
+@ddt.ddt
 class TestEtlJobFlow(BaseEtlSimple):
     """Test case for the sequence of data through the system"""
 
     async def test_batched_output(self):
         await self.run_etl(batch_size=1)
         self.assert_output_equal("batched-output")
 
+    @ddt.data(
+        (["covid_symptom__nlp_results"], False),
+        (["patient"], True),
+        (["covid_symptom__nlp_results", "patient"], True),
+    )
+    @ddt.unpack
+    async def test_ms_deid_skipped_if_not_needed(self, tasks: list[str], expected_ms_deid: bool):
+        with self.assertRaises(SystemExit):
+            with mock.patch("cumulus_etl.deid.Scrubber.scrub_bulk_data") as mock_deid:
+                with mock.patch("cumulus_etl.etl.cli.etl_job", side_effect=SystemExit):
+                    await self.run_etl(tasks=tasks)
+        self.assertEqual(1 if expected_ms_deid else 0, mock_deid.call_count)
+
     async def test_downloaded_phi_is_not_kept(self):
         """Verify we remove all downloaded PHI even if interrupted"""
         internal_phi_dir = None
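
The new test relies on the ddt library for parametrization: @ddt.data supplies one value per generated test case, and @ddt.unpack splits a tuple value into separate arguments. A minimal self-contained illustration of the same pattern:

import unittest

import ddt


@ddt.ddt
class ExampleTest(unittest.TestCase):
    @ddt.data(  # each tuple becomes its own generated test case
        (["nlp_task"], False),
        (["patient"], True),
    )
    @ddt.unpack  # split the tuple into (task_names, expected) arguments
    def test_flag(self, task_names, expected):
        self.assertIsInstance(task_names, list)
        self.assertIsInstance(expected, bool)


if __name__ == "__main__":
    unittest.main()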