diff --git a/compose.yaml b/compose.yaml index 72018470..819f678e 100644 --- a/compose.yaml +++ b/compose.yaml @@ -13,7 +13,8 @@ services: - AWS_DEFAULT_PROFILE - CUMULUS_HUGGING_FACE_URL=http://llama2:8086/ - URL_CTAKES_REST=http://ctakes-covid:8080/ctakes-web-rest/service/analyze - - URL_CNLP_NEGATION=http://cnlp-transformers:8000/negation/process + - URL_CNLP_NEGATION=http://cnlpt-negation:8000/negation/process + - URL_CNLP_TERM_EXISTS=http://cnlpt-term-exists:8000/termexists/process volumes: - $HOME/.aws/:/root/.aws/:ro - ctakes-overrides:/ctakes-overrides @@ -30,12 +31,13 @@ services: cumulus-etl-gpu: extends: cumulus-etl-base environment: - - URL_CNLP_NEGATION=http://cnlp-transformers-gpu:8000/negation/process + - URL_CNLP_NEGATION=http://cnlpt-negation-gpu:8000/negation/process + - URL_CNLP_TERM_EXISTS=http://cnlpt-term-exists-gpu:8000/termexists/process profiles: - etl-gpu ctakes-covid-base: - image: smartonfhir/ctakes-covid:1.1 + image: smartonfhir/ctakes-covid:1.1.0 environment: - ctakes_umlsuser=umls_api_key - ctakes_umlspw=$UMLS_API_KEY @@ -49,20 +51,39 @@ services: ctakes-covid: extends: ctakes-covid-base profiles: - - etl-support - - etl-support-gpu + - covid-symptom + - covid-symptom-gpu - cnlp-transformers: - image: smartonfhir/cnlp-transformers:negation-0.4-cpu + cnlpt-negation: + image: smartonfhir/cnlp-transformers:negation-0.6.1-cpu profiles: - - etl-support + - covid-symptom networks: - cumulus-etl - cnlp-transformers-gpu: - image: smartonfhir/cnlp-transformers:negation-0.4-gpu + cnlpt-negation-gpu: + image: smartonfhir/cnlp-transformers:negation-0.6.1-gpu profiles: - - etl-support-gpu + - covid-symptom-gpu + networks: + - cumulus-etl + deploy: + resources: + reservations: + devices: + - capabilities: [gpu] + + cnlpt-term-exists: + image: smartonfhir/cnlp-transformers:termexists-0.6.1-cpu + profiles: + - covid-symptom + networks: + - cumulus-etl + + cnlpt-term-exists-gpu: + image: smartonfhir/cnlp-transformers:termexists-0.6.1-gpu + profiles: + - covid-symptom-gpu networks: - cumulus-etl deploy: @@ -111,7 +132,7 @@ services: volumes: - ./:/cumulus-etl/ working_dir: /cumulus-etl - command: + command: - /cumulus-etl/tests/data/simple/ndjson-input - /cumulus-etl/example-output - /cumulus-etl/example-phi-build diff --git a/cumulus_etl/etl/studies/covid_symptom/__init__.py b/cumulus_etl/etl/studies/covid_symptom/__init__.py index 6e5b7948..ec307d2c 100644 --- a/cumulus_etl/etl/studies/covid_symptom/__init__.py +++ b/cumulus_etl/etl/studies/covid_symptom/__init__.py @@ -1,3 +1,3 @@ """The covid_symptom study""" -from .covid_tasks import CovidSymptomNlpResultsTask +from .covid_tasks import CovidSymptomNlpResultsTask, CovidSymptomNlpResultsTermExistsTask diff --git a/cumulus_etl/etl/studies/covid_symptom/covid_ctakes.py b/cumulus_etl/etl/studies/covid_symptom/covid_ctakes.py index 377ef088..d21ff2b9 100644 --- a/cumulus_etl/etl/studies/covid_symptom/covid_ctakes.py +++ b/cumulus_etl/etl/studies/covid_symptom/covid_ctakes.py @@ -4,15 +4,17 @@ import ctakesclient import httpx +from ctakesclient.transformer import TransformerModel from cumulus_etl import common, fhir, nlp, store async def covid_symptoms_extract( - client: fhir.FhirClient, cache: store.Root, docref: dict, + clinical_note: str, *, + polarity_model: TransformerModel, task_version: int, ctakes_http_client: httpx.AsyncClient = None, cnlp_http_client: httpx.AsyncClient = None, @@ -20,9 +22,10 @@ async def covid_symptoms_extract( """ Extract a list of Observations from NLP-detected symptoms in clinical notes - :param client: a client ready to talk to a FHIR server :param cache: Where to cache NLP results - :param docref: Clinical Note + :param docref: DocumentReference resource (scrubbed) + :param clinical_note: the clinical note already extracted from the docref + :param polarity_model: how to test the polarity of cTAKES responses :param task_version: version of task to inject into results :param ctakes_http_client: HTTPX client to use for the cTAKES server :param cnlp_http_client: HTTPX client to use for the cNLP transformer server @@ -37,22 +40,22 @@ async def covid_symptoms_extract( return None _, encounter_id = fhir.unref_resource(encounters[0]) - # Find the clinical note among the attachments - try: - clinical_note = await fhir.get_docref_note(client, docref) - except Exception as exc: # pylint: disable=broad-except - logging.warning("Error getting text for docref %s: %s", docref_id, exc) - return None - # cTAKES cache namespace history (and thus, cache invalidation history): # v1: original cTAKES processing # v2+: see CovidSymptomNlpResultsTask's version history ctakes_namespace = f"covid_symptom_v{task_version}" - # cNLP cache namespace history (and thus, cache invalidation history): - # v1: original addition of cNLP filtering - # v2: we started dropping non-covid symptoms, which changes the span ordering - cnlp_namespace = f"{ctakes_namespace}-cnlp_v2" + match polarity_model: + case TransformerModel.NEGATION: # original + # cNLP cache namespace history (and thus, cache invalidation history): + # v1: original addition of cNLP filtering + # v2: we started dropping non-covid symptoms, which changes the span ordering + cnlp_namespace = f"{ctakes_namespace}-cnlp_v2" + case TransformerModel.TERM_EXISTS: + cnlp_namespace = f"{ctakes_namespace}-cnlp_term_exists_v1" + case _: + logging.warning("Unknown polarity method: %s", polarity_model.value) + return None timestamp = common.datetime_now().isoformat() @@ -76,9 +79,11 @@ def is_covid_match(m: ctakesclient.typesystem.MatchText): # there too. We have found this to yield better results than cTAKES alone. try: spans = ctakes_json.list_spans(matches) - polarities_cnlp = await nlp.list_polarity(cache, cnlp_namespace, clinical_note, spans, client=cnlp_http_client) + polarities_cnlp = await nlp.list_polarity( + cache, cnlp_namespace, clinical_note, spans, model=polarity_model, client=cnlp_http_client + ) except Exception as exc: # pylint: disable=broad-except - logging.warning("Could not check negation for docref %s (%s): %s", docref_id, type(exc).__name__, exc) + logging.warning("Could not check polarity for docref %s (%s): %s", docref_id, type(exc).__name__, exc) return None # Now filter out any non-positive matches diff --git a/cumulus_etl/etl/studies/covid_symptom/covid_tasks.py b/cumulus_etl/etl/studies/covid_symptom/covid_tasks.py index 95db4791..b4081934 100644 --- a/cumulus_etl/etl/studies/covid_symptom/covid_tasks.py +++ b/cumulus_etl/etl/studies/covid_symptom/covid_tasks.py @@ -1,14 +1,13 @@ """Define tasks for the covid_symptom study""" -import copy import itertools -import os import ctakesclient import pyarrow import rich.progress +from ctakesclient.transformer import TransformerModel -from cumulus_etl import common, formats, nlp, store +from cumulus_etl import formats, nlp, store from cumulus_etl.etl import tasks from cumulus_etl.etl.studies.covid_symptom import covid_ctakes @@ -58,42 +57,13 @@ def is_ed_docref(docref): return any(is_ed_coding(x) for x in codings) -class CovidSymptomNlpResultsTask(tasks.EtlTask): - """Covid Symptom study task, to generate symptom lists from ED notes using NLP""" +class BaseCovidSymptomNlpResultsTask(tasks.BaseNlpTask): + """Covid Symptom study task, to generate symptom lists from ED notes using cTAKES + a polarity check""" - name = "covid_symptom__nlp_results" - resource = "DocumentReference" - tags = {"covid_symptom", "gpu"} - needs_bulk_deid = False - outputs = [tasks.OutputTable(schema=None, group_field="docref_id")] - - # Task Version - # The "task_version" field is a simple integer that gets incremented any time an NLP-relevant parameter is changed. - # This is a reference to a bundle of metadata (cTAKES version, cNLP version, ICD10 code list). - # We could combine all that info into a field we save with the results. But it's more human-friendly to have a - # simple version to refer to. So anytime these properties get changed, bump the version and record the old bundle - # of metadata too. - task_version = 2 - - # Task Version History: - # ** 2 (2023-08): Corrected the cache location (version 1 results might be using stale cache) ** - # cTAKES: smartonfhir/ctakes-covid:1.1 - # cNLP: smartonfhir/cnlp-transformers:negation-0.4 - # ctakesclient: 5.0 - # - # ** 1 (2023-08): Updated ICD10 codes from ctakesclient ** - # cTAKES: smartonfhir/ctakes-covid:1.1 - # cNLP: smartonfhir/cnlp-transformers:negation-0.4 - # ctakesclient: 5.0 - # - # ** null (before we added a task version) ** - # cTAKES: smartonfhir/ctakes-covid:1.1 - # cNLP: smartonfhir/cnlp-transformers:negation-0.4 - # ctakesclient: 3.0 + # Subclasses: set name, tags, task_version, and polarity_model yourself + polarity_model = None - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.seen_docrefs = set() + outputs = [tasks.OutputTable(schema=None, group_field="docref_id")] async def prepare_task(self) -> bool: bsv_path = ctakesclient.filesystem.covid_symptoms_path() @@ -103,15 +73,6 @@ async def prepare_task(self) -> bool: self.summaries[0].had_errors = True return success - def add_error(self, docref: dict) -> None: - if not self.task_config.dir_errors: - return - - error_root = store.Root(os.path.join(self.task_config.dir_errors, self.name), create=True) - error_path = error_root.joinpath("nlp-errors.ndjson") - with common.NdjsonWriter(error_path, "a") as writer: - writer.write(docref) - async def read_entries(self, *, progress: rich.progress.Progress = None) -> tasks.EntryIterator: """Passes clinical notes through NLP and returns any symptoms found""" phi_root = store.Root(self.task_config.dir_phi, create=True) @@ -119,29 +80,17 @@ async def read_entries(self, *, progress: rich.progress.Progress = None) -> task # one client for both NLP services for now -- no parallel requests yet, so no need to be fancy http_client = nlp.ctakes_httpx_client() - for docref in self.read_ndjson(progress=progress): - if not nlp.is_docref_valid(docref): - continue - - # Check that the note is one of our special allow-listed types (we do this here rather than on the output - # side to save needing to run everything through NLP). - if not is_ed_docref(docref): - continue - - orig_docref = copy.deepcopy(docref) - if not self.scrubber.scrub_resource(docref, scrub_attachments=False): - continue - + async for orig_docref, docref, clinical_note in self.read_notes(progress=progress, doc_check=is_ed_docref): symptoms = await covid_ctakes.covid_symptoms_extract( - self.task_config.client, phi_root, docref, + clinical_note, + polarity_model=self.polarity_model, task_version=self.task_version, ctakes_http_client=http_client, cnlp_http_client=http_client, ) if symptoms is None: - self.summaries[0].had_errors = True self.add_error(orig_docref) continue @@ -159,11 +108,6 @@ async def read_entries(self, *, progress: rich.progress.Progress = None) -> task # The Format class will replace all existing symptoms from this note at once (because we set group_field). yield symptoms - def pop_current_group_values(self, table_index: int) -> set[str]: - values = self.seen_docrefs - self.seen_docrefs = set() - return values - @classmethod def get_schema(cls, formatter: formats.Format, rows: list[dict]) -> pyarrow.Schema: return pyarrow.schema( @@ -201,3 +145,51 @@ def get_schema(cls, formatter: formats.Format, rows: list[dict]) -> pyarrow.Sche ), ] ) + + +class CovidSymptomNlpResultsTask(BaseCovidSymptomNlpResultsTask): + """Covid Symptom study task, to generate symptom lists from ED notes using cTAKES and cnlpt negation""" + + name = "covid_symptom__nlp_results" + tags = {"covid_symptom", "gpu"} + polarity_model = TransformerModel.NEGATION + + task_version = 3 + # Task Version History: + # ** 3 (2023-09): Updated to cnlpt version 0.6.1 ** + # cTAKES: smartonfhir/ctakes-covid:1.1.0 + # cNLP: smartonfhir/cnlp-transformers:negation-0.6.1 + # ctakesclient: 5.0 + # + # ** 2 (2023-08): Corrected the cache location (version 1 results might be using stale cache) ** + # cTAKES: smartonfhir/ctakes-covid:1.1.0 + # cNLP: smartonfhir/cnlp-transformers:negation-0.4.0 + # ctakesclient: 5.0 + # + # ** 1 (2023-08): Updated ICD10 codes from ctakesclient ** + # cTAKES: smartonfhir/ctakes-covid:1.1.0 + # cNLP: smartonfhir/cnlp-transformers:negation-0.4.0 + # ctakesclient: 5.0 + # + # ** null (before we added a task version) ** + # cTAKES: smartonfhir/ctakes-covid:1.1.0 + # cNLP: smartonfhir/cnlp-transformers:negation-0.4.0 + # ctakesclient: 3.0 + + +class CovidSymptomNlpResultsTermExistsTask(BaseCovidSymptomNlpResultsTask): + """Covid Symptom study task, to generate symptom lists from ED notes using cTAKES and cnlpt termexists""" + + name = "covid_symptom__nlp_results_term_exists" + polarity_model = TransformerModel.TERM_EXISTS + + # Explicitly don't use any tags because this is really a "hidden" task that is mostly for comparing + # polarity model performance more than running a study. So we don't want it to be accidentally run. + tags = {} + + task_version = 1 + # Task Version History: + # ** 1 (2023-09): First version ** + # cTAKES: smartonfhir/ctakes-covid:1.1.0 + # cNLP: smartonfhir/cnlp-transformers:termexists-0.6.1 + # ctakesclient: 5.0 diff --git a/cumulus_etl/etl/studies/hftest/hf_tasks.py b/cumulus_etl/etl/studies/hftest/hf_tasks.py index a64f1386..588879f0 100644 --- a/cumulus_etl/etl/studies/hftest/hf_tasks.py +++ b/cumulus_etl/etl/studies/hftest/hf_tasks.py @@ -6,26 +6,16 @@ import pyarrow import rich.progress -from cumulus_etl import common, fhir, formats, nlp +from cumulus_etl import common, formats, nlp from cumulus_etl.etl import tasks -class HuggingFaceTestTask(tasks.EtlTask): +class HuggingFaceTestTask(tasks.BaseNlpTask): """Hugging Face Test study task, to generate a summary from text""" name = "hftest__summary" - resource = "DocumentReference" - needs_bulk_deid = False - outputs = [tasks.OutputTable(schema=None)] - - # Task Version - # The "task_version" field is a simple integer that gets incremented any time an NLP-relevant parameter is changed. - # This is a reference to a bundle of metadata (model revision, container revision, prompt string). - # We could combine all that info into a field we save with the results. But it's more human-friendly to have a - # simple version to refer to. So anytime these properties get changed, bump the version and record the old bundle - # of metadata too. Also update the safety checks in prepare_task() - task_version = 0 + task_version = 0 # Task Version History: # ** 0 ** # This is fluid until we actually promote this to a real task - feel free to update without bumping the version. @@ -66,18 +56,7 @@ async def read_entries(self, *, progress: rich.progress.Progress = None) -> task """Passes clinical notes through HF and returns any symptoms found""" http_client = httpx.AsyncClient(timeout=300) - for docref in self.read_ndjson(progress=progress): - can_process = nlp.is_docref_valid(docref) and self.scrubber.scrub_resource(docref, scrub_attachments=False) - if not can_process: - continue - - try: - clinical_note = await fhir.get_docref_note(self.task_config.client, docref) - except Exception as exc: # pylint: disable=broad-except - logging.warning("Error getting text for docref %s: %s", docref["id"], exc) - self.summaries[0].had_errors = True - continue - + async for _, docref, clinical_note in self.read_notes(progress=progress): timestamp = common.datetime_now().isoformat() # If you change this prompt, consider updating task_version. diff --git a/cumulus_etl/etl/tasks/__init__.py b/cumulus_etl/etl/tasks/__init__.py index acbb894f..601a39fa 100644 --- a/cumulus_etl/etl/tasks/__init__.py +++ b/cumulus_etl/etl/tasks/__init__.py @@ -1,4 +1,7 @@ """Task support for the ETL workflow""" from .base import EntryIterator, EtlTask, OutputTable +from .nlp_task import BaseNlpTask + +# Import this last because importing specific tasks will want the above classes to be available from .factory import get_all_tasks, get_default_tasks, get_selected_tasks diff --git a/cumulus_etl/etl/tasks/factory.py b/cumulus_etl/etl/tasks/factory.py index 13cab199..97d28a43 100644 --- a/cumulus_etl/etl/tasks/factory.py +++ b/cumulus_etl/etl/tasks/factory.py @@ -29,6 +29,8 @@ def get_all_tasks() -> list[type[AnyTask]]: # Right now, just hard-code these. One day we might allow plugins or something similarly dynamic. # Note: tasks will be run in the order listed here. return get_default_tasks() + [ + covid_symptom.CovidSymptomNlpResultsTask, + covid_symptom.CovidSymptomNlpResultsTermExistsTask, hftest.HuggingFaceTestTask, ] @@ -52,7 +54,6 @@ def get_default_tasks() -> list[type[AnyTask]]: ObservationTask, ProcedureTask, ServiceRequestTask, - covid_symptom.CovidSymptomNlpResultsTask, # TODO: remove from default list at some point ] diff --git a/cumulus_etl/etl/tasks/nlp_task.py b/cumulus_etl/etl/tasks/nlp_task.py new file mode 100644 index 00000000..f340db05 --- /dev/null +++ b/cumulus_etl/etl/tasks/nlp_task.py @@ -0,0 +1,83 @@ +"""Base NLP task support""" + +import copy +import logging +import os +from typing import Callable + +import rich.progress + +from cumulus_etl import common, fhir, nlp, store +from cumulus_etl.etl.tasks.base import EtlTask, OutputTable + + +class BaseNlpTask(EtlTask): + """Base class for any clinical-notes-based NLP task.""" + + resource = "DocumentReference" + needs_bulk_deid = False + + # You may want to override these in your subclass + outputs = [OutputTable(schema=None)] # maybe a group_field? (remember to call self.seen_docrefs.add() if so) + tags = {"gpu"} # maybe a study identifier? + + # Task Version + # The "task_version" field is a simple integer that gets incremented any time an NLP-relevant parameter is changed. + # This is a reference to a bundle of metadata (model revision, container revision, prompt string). + # We could combine all that info into a field we save with the results. But it's more human-friendly to have a + # simple version to refer to. + # + # CONSIDERATIONS WHEN CHANGING THIS: + # - Record the new bundle of metadata in your class documentation + # - Update any safety checks in prepare_task() or elsewhere that check the NLP versioning + # - Be aware that your caching will be reset + task_version = 1 + # Task Version History: + # ** 1 (20xx-xx): First version ** + # CHANGE ME + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.seen_docrefs = set() + + def pop_current_group_values(self, table_index: int) -> set[str]: + values = self.seen_docrefs + self.seen_docrefs = set() + return values + + def add_error(self, docref: dict) -> None: + self.summaries[0].had_errors = True + + if not self.task_config.dir_errors: + return + error_root = store.Root(os.path.join(self.task_config.dir_errors, self.name), create=True) + error_path = error_root.joinpath("nlp-errors.ndjson") + with common.NdjsonWriter(error_path, "a") as writer: + writer.write(docref) + + async def read_notes( + self, *, doc_check: Callable[[dict], bool] = None, progress: rich.progress.Progress = None + ) -> (dict, dict, str): + """ + Iterate through clinical notes. + + :returns: a tuple of original-docref, scrubbed-docref, and clinical note + """ + for docref in self.read_ndjson(progress=progress): + orig_docref = copy.deepcopy(docref) + can_process = ( + nlp.is_docref_valid(docref) + and (doc_check is None or doc_check(docref)) + and self.scrubber.scrub_resource(docref, scrub_attachments=False) + ) + if not can_process: + continue + + try: + clinical_note = await fhir.get_docref_note(self.task_config.client, docref) + except Exception as exc: # pylint: disable=broad-except + logging.warning("Error getting text for docref %s: %s", docref["id"], exc) + self.add_error(orig_docref) + continue + + yield orig_docref, docref, clinical_note diff --git a/cumulus_etl/nlp/extract.py b/cumulus_etl/nlp/extract.py index e267af72..08ae9ab8 100644 --- a/cumulus_etl/nlp/extract.py +++ b/cumulus_etl/nlp/extract.py @@ -5,6 +5,7 @@ import ctakesclient import httpx +from ctakesclient.transformer import TransformerModel from cumulus_etl import common, store @@ -42,6 +43,7 @@ async def list_polarity( sentence: str, spans: list[tuple], client: httpx.AsyncClient = None, + model: TransformerModel = TransformerModel.NEGATION, ) -> list[ctakesclient.typesystem.Polarity]: """ This is a version of ctakesclient.transformer.list_polarity() that also uses a cache @@ -57,7 +59,7 @@ async def list_polarity( try: result = [ctakesclient.typesystem.Polarity(x) for x in common.read_json(full_path)] except Exception: # pylint: disable=broad-except - result = await ctakesclient.transformer.list_polarity(sentence, spans, client=client) + result = await ctakesclient.transformer.list_polarity(sentence, spans, client=client, model=model) cache.makedirs(os.path.dirname(full_path)) common.write_json(full_path, [x.value for x in result]) diff --git a/docs/performance.md b/docs/performance.md index 6fe422b0..ce5dd05d 100644 --- a/docs/performance.md +++ b/docs/performance.md @@ -91,14 +91,14 @@ Cumulus ETL is deployed with Docker images. And because of the way Docker interacts with the GPU, we define a whole second set of profiles for GPU usage. Normally, you specify profile & image names in a couple places: -1. When starting the support tools (cTAKES etc):
-`docker compose --profile etl-support up` +1. When starting a study's support tools (cTAKES etc):
+`docker compose --profile covid-symptom up` 1. When running the ETL tool:
`docker compose run cumulus-etl` To work with the GPU version of Cumulus ETL, just add `-gpu` to each of those names wherever they appear in [instructions](sample-runs.md): -1. docker compose --profile etl-support-gpu up +1. docker compose --profile covid-symptom-gpu up 1. docker compose run cumulus-etl-gpu ### Cloud Access to a GPU diff --git a/docs/setup/sample-runs.md b/docs/setup/sample-runs.md index 2915a46a..67dcfdbd 100644 --- a/docs/setup/sample-runs.md +++ b/docs/setup/sample-runs.md @@ -112,7 +112,6 @@ Once you've done that, you'll need the UMLS key mentioned at the top of this doc to start the network (here we're setting the UMLS_API_KEY, which cTAKES requires): ```sh export UMLS_API_KEY=your-umls-api-key -docker compose -f $CUMULUS_REPO_PATH/compose.yaml --profile etl-support up -d ``` The compose file will handle the environment variable mapping and volume mounts for you. diff --git a/docs/studies/covid-symptom.md b/docs/studies/covid-symptom.md new file mode 100644 index 00000000..3de8d0c2 --- /dev/null +++ b/docs/studies/covid-symptom.md @@ -0,0 +1,50 @@ +--- +title: Covid Symptom +parent: Studies +grand_parent: ETL +nav_order: 1 +# audience: engineer familiar with the project +# type: howto +--- + +# The Covid Symptom Study + +This study uses NLP to identify symptoms of COVID-19 in clinical notes. +Specifically, it uses [cTAKES](https://ctakes.apache.org/) and +[cNLP transformers](https://github.com/Machine-Learning-for-Medical-Language/cnlp_transformers) +to identify clinical terms. + +## Preparation + +Because it uses external services like cTAKES, you will want to make sure those are ready. +From your git clone of the `cumulus-etl` repo, you can run the following to run those services: +```shell +docker compose --profile covid-symptom-gpu up -d +``` + +You'll notice the `-gpu` suffix there. +Running NLP is much, much faster with access to a GPU, +so we strongly recommend you run this on GPU-enabled hardware. + +And since we _are_ running the GPU profile, when you do run the ETL, +you'll want to launch the GPU mode instead of the default `cumulus-etl` CPU mode: +```shell +docker compose run cumulus-etl-gpu … +``` + +But if you can't use a GPU or you just want to test things out, +you can use `--profile covid-symptom` above and the normal `cumulus-etl` run line to use the CPU. + +## Tasks + +There is one main task, run with `--task covid_symptom__nlp_results`. + +This will need access to clinical notes, +which are pulled fresh from your EHR (since the ETL doesn't store clinical notes). +This means you will likely have to provide some other FHIR authentication arguments like +`--smart-client-id` and `--fhir-url`. +See `--help` for more authentication options. + +There is also a second optional task `--task covid_symptom__nlp_results_term_exists`, +which just uses a different polarity cNLP transformer (`termexists` rather than `negation`). +You likely don't need both, but they may be interesting to compare. diff --git a/docs/studies/index.md b/docs/studies/index.md new file mode 100644 index 00000000..c0126875 --- /dev/null +++ b/docs/studies/index.md @@ -0,0 +1,21 @@ +--- +title: Studies +parent: ETL +nav_order: 3 +has_children: true +# audience: engineer familiar with the project +# type: howto +--- + +# Study-specific Tasks + +In addition to the default basic-FHIR-oriented Cumulus ETL tasks like `condition`, +which simply strip identifying information and largely leaves the FHIR alone, +there are also more interesting study-oriented tasks. + +These tend to be NLP tasks that extract information from clinical notes. + +They aren't run by default, +but you can provide the ones you are interested in with the `--task` parameter. + +In this folder, you can read further explanations of how to run each built-in study. diff --git a/pyproject.toml b/pyproject.toml index 6294fe0c..827f44c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ requires-python = ">= 3.10" # open-pinned dependencies so that we're more likely to notice new releases (we'll still have time # to fix any breakages since users won't immediately see the problem). dependencies = [ - "ctakesclient >= 5, < 6", + "ctakesclient >= 5.1, < 6", "delta-spark >= 2.3, < 3", "fhirclient < 5", "httpx < 1", @@ -20,7 +20,7 @@ dependencies = [ "label-studio-sdk < 1", "oracledb < 2", "philter-lite < 1", - "pyarrow < 13", + "pyarrow < 14", "rich < 14", "s3fs", ] diff --git a/tests/convert/test_convert_cli.py b/tests/convert/test_convert_cli.py index b00f20e2..a55f1f20 100644 --- a/tests/convert/test_convert_cli.py +++ b/tests/convert/test_convert_cli.py @@ -29,8 +29,9 @@ def setUp(self): def prepare_original_dir(self) -> str: """Returns the job timestamp used, for easier inspection""" - # Fill in original dir + # Fill in original dir, including a non-default output folder shutil.copytree(f"{self.datadir}/simple/output", self.original_path) + shutil.copytree(f"{self.datadir}/covid/term-exists", self.original_path, dirs_exist_ok=True) os.makedirs(f"{self.original_path}/ignored") # just to confirm we only copy what we understand job_timestamp = "2023-02-28__19.53.08" @@ -73,6 +74,7 @@ async def test_happy_path(self): # Test first conversion results expected_tables = {output.get_name(t) for t in tasks.get_default_tasks() for output in t.outputs} + expected_tables.add("covid_symptom__nlp_results_term_exists") # this was our non-default added table self.assertEqual(expected_tables | {"JobConfig"}, set(os.listdir(self.target_path))) self.assertEqual( {"test": True}, common.read_json(f"{self.target_path}/JobConfig/{job_timestamp}/job_config.json") @@ -85,8 +87,8 @@ async def test_happy_path(self): conditions = utils.read_delta_lake(f"{self.target_path}/condition") # and conditions self.assertEqual(2, len(conditions)) self.assertEqual("2010-03-02", conditions[0]["recordedDate"]) - symptoms = utils.read_delta_lake(f"{self.target_path}/covid_symptom__nlp_results") # and covid symptoms - self.assertEqual(4, len(symptoms)) + symptoms = utils.read_delta_lake(f"{self.target_path}/covid_symptom__nlp_results_term_exists") # and covid + self.assertEqual(2, len(symptoms)) self.assertEqual("for", symptoms[0]["match"]["text"]) # Now make a second small, partial output folder to layer into the existing Delta Lake @@ -123,14 +125,21 @@ async def test_batch_metadata(self, mock_write): f"{self.datadir}/simple/output/patient", f"{self.original_path}/patient", ) - shutil.copytree( # Then, one that does (from batched-output, to confirm we read each batch in turn) - f"{self.datadir}/simple/batched-output/covid_symptom__nlp_results", + shutil.copytree( # Then, one that does + f"{self.datadir}/covid/output/covid_symptom__nlp_results", f"{self.original_path}/covid_symptom__nlp_results", ) - common.write_json( # change metadata to reference nonexistent group, to confirm we do read from this file + # And make a second batch, to confirm we read each meta file + common.write_json( f"{self.original_path}/covid_symptom__nlp_results/covid_symptom__nlp_results.001.meta", + # Reference a group that doesn't exist to prove we are reading this file and not just pooling group_fields + # that we see in the data. {"groups": ["nonexistent"]}, ) + common.write_json( + f"{self.original_path}/covid_symptom__nlp_results/covid_symptom__nlp_results.001.ndjson", + {"id": "D1.0", "docref_id": "D1"}, + ) os.makedirs(f"{self.original_path}/JobConfig") # Run conversion @@ -138,11 +147,12 @@ async def test_batch_metadata(self, mock_write): # Test results self.assertEqual(3, mock_write.call_count) - self.assertEqual(set(), mock_write.call_args_list[0][0][0].groups) + self.assertEqual(set(), mock_write.call_args_list[0][0][0].groups) # patients self.assertEqual( { - "f29736c29af5b962b3947fd40bed6b8c3e97c642b72aaa08e082fec05148e7dd", + "c31a3dbf188ed241b2c06b2475cd56159017fa1df1ea882d3fc4beab860fc24d", + "eb30741bbb9395fc3da72d02fd29b96e2e4c0c2592c3ae997d80bf522c80070e", }, - mock_write.call_args_list[1][0][0].groups, + mock_write.call_args_list[1][0][0].groups, # first (actual) covid batch ) - self.assertEqual({"nonexistent"}, mock_write.call_args_list[2][0][0].groups) + self.assertEqual({"nonexistent"}, mock_write.call_args_list[2][0][0].groups) # second (faked) covid batch diff --git a/tests/covid_symptom/test_nlp_results.py b/tests/covid_symptom/test_nlp_results.py index 309c7af0..2aecef57 100644 --- a/tests/covid_symptom/test_nlp_results.py +++ b/tests/covid_symptom/test_nlp_results.py @@ -10,7 +10,7 @@ from tests.ctakesmock import CtakesMixin from tests import i2b2_mock_data -from tests.etl.test_tasks import TaskTestCase +from tests.etl import BaseEtlSimple, TaskTestCase @ddt.ddt @@ -186,3 +186,18 @@ async def test_group_values_noted(self): self.codebook.db.resource_hash("zero-symptoms"), # even without rows, it shows up in group list } self.assertEqual(expected_groups, second_batch.groups) + + +class TestCovidSymptomEtl(BaseEtlSimple): + """Tests the end-to-end ETL of covid symptom tasks.""" + + DATA_ROOT = "covid" + + async def test_basic_run(self): + await self.run_etl(tags=["covid_symptom"]) + self.assert_output_equal() + + async def test_term_exists_task(self): + # This one isn't even tagged for the study - we only want this upon request + await self.run_etl(tasks=["covid_symptom__nlp_results_term_exists"]) + self.assert_output_equal("term-exists") diff --git a/tests/ctakesmock.py b/tests/ctakesmock.py index e830a60b..b68cdbb5 100644 --- a/tests/ctakesmock.py +++ b/tests/ctakesmock.py @@ -12,6 +12,7 @@ from unittest import mock from ctakesclient import typesystem +from ctakesclient.transformer import TransformerModel class CtakesMixin(unittest.TestCase): @@ -233,7 +234,15 @@ def fake_ctakes_extract(sentence: str) -> typesystem.CtakesJSON: return typesystem.CtakesJSON(response) -async def fake_transformer_list_polarity(sentence: str, spans: list[tuple], client=None) -> list[typesystem.Polarity]: +async def fake_transformer_list_polarity( + sentence: str, spans: list[tuple], client=None, model=TransformerModel.NEGATION +) -> list[typesystem.Polarity]: """Simple always-positive fake response from cNLP.""" del sentence, client - return [typesystem.Polarity.pos] * len(spans) + + # To better detect which model is in use, ensure a small difference between them + if model == TransformerModel.TERM_EXISTS: + # First span is negative + return [typesystem.Polarity.neg] + [typesystem.Polarity.pos] * (len(spans) - 1) + else: + return [typesystem.Polarity.pos] * len(spans) diff --git a/tests/data/covid/codebook.json b/tests/data/covid/codebook.json new file mode 100644 index 00000000..92b91a01 --- /dev/null +++ b/tests/data/covid/codebook.json @@ -0,0 +1 @@ +{"version": 1, "id_salt": "4688a4853dafc6a3d6934f0dd02205be0700d2ca64b636127a4436494dcaf88e"} \ No newline at end of file diff --git a/tests/data/covid/input/DocumentReference.ndjson b/tests/data/covid/input/DocumentReference.ndjson new file mode 100644 index 00000000..2fbbb29a --- /dev/null +++ b/tests/data/covid/input/DocumentReference.ndjson @@ -0,0 +1,2 @@ +{"id":"43","content":[{"attachment":{"contentType":"text\/plain","data":"Tm90ZXMgZm9yIGZldmVy"}}],"context":{"encounter":[{"reference":"Encounter\/23"}],"period":{"end":"2021-06-24","start":"2021-06-23"}},"status":"current","subject":{"reference":"Patient\/334567"},"type":{"coding":[{"code":"NOTE:149798455","display":"Admission MD","system":"http://cumulus.smarthealthit.org/i2b2"}]},"resourceType":"DocumentReference"} +{"id":"44","content":[{"attachment":{"contentType":"text\/plain","data":"Tm90ZXMhIGZvciBmZXZlcg=="}}],"context":{"encounter":[{"reference":"Encounter\/25"}],"period":{"end":"2021-06-25","start":"2021-06-24"}},"status":"current","subject":{"reference":"Patient\/323456"},"type":{"coding":[{"code":"NOTE:149798455","display":"Admission MD","system":"http://cumulus.smarthealthit.org/i2b2"}]},"resourceType":"DocumentReference"} diff --git a/tests/data/covid/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.meta b/tests/data/covid/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.meta new file mode 100644 index 00000000..7d1797dd --- /dev/null +++ b/tests/data/covid/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.meta @@ -0,0 +1,6 @@ +{ + "groups": [ + "c31a3dbf188ed241b2c06b2475cd56159017fa1df1ea882d3fc4beab860fc24d", + "eb30741bbb9395fc3da72d02fd29b96e2e4c0c2592c3ae997d80bf522c80070e" + ] +} \ No newline at end of file diff --git a/tests/data/covid/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.ndjson b/tests/data/covid/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.ndjson new file mode 100644 index 00000000..e2f5598b --- /dev/null +++ b/tests/data/covid/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.ndjson @@ -0,0 +1,4 @@ +{"id": "c31a3dbf188ed241b2c06b2475cd56159017fa1df1ea882d3fc4beab860fc24d.0", "docref_id": "c31a3dbf188ed241b2c06b2475cd56159017fa1df1ea882d3fc4beab860fc24d", "encounter_id": "b3d0707624491d8b71a808bd20b63625981af48f526b95214146de2a15f7dd43", "subject_id": "00680c7c0e2e1712e9c4a01eb5c6dfb8949871faef6337c5db204d19e1d9ca58", "generated_on": "2021-09-14T21:23:45+00:00", "task_version": 3, "match": {"begin": 6, "end": 9, "text": "for", "polarity": 0, "conceptAttributes": [{"code": "386661006", "cui": "C0015967", "codingScheme": "SNOMEDCT_US", "tui": "T184"}, {"code": "50177009", "cui": "C0015967", "codingScheme": "SNOMEDCT_US", "tui": "T184"}], "type": "SignSymptomMention"}} +{"id": "c31a3dbf188ed241b2c06b2475cd56159017fa1df1ea882d3fc4beab860fc24d.1", "docref_id": "c31a3dbf188ed241b2c06b2475cd56159017fa1df1ea882d3fc4beab860fc24d", "encounter_id": "b3d0707624491d8b71a808bd20b63625981af48f526b95214146de2a15f7dd43", "subject_id": "00680c7c0e2e1712e9c4a01eb5c6dfb8949871faef6337c5db204d19e1d9ca58", "generated_on": "2021-09-14T21:23:45+00:00", "task_version": 3, "match": {"begin": 6, "end": 9, "text": "for", "polarity": 0, "conceptAttributes": [{"code": "422587007", "cui": "C0027497", "codingScheme": "SNOMEDCT_US", "tui": "T184"}], "type": "SignSymptomMention"}} +{"id": "eb30741bbb9395fc3da72d02fd29b96e2e4c0c2592c3ae997d80bf522c80070e.0", "docref_id": "eb30741bbb9395fc3da72d02fd29b96e2e4c0c2592c3ae997d80bf522c80070e", "encounter_id": "58a65c6cc5693a507af44f25f062171898aa6bc469766956b2c802d39fc6d4a7", "subject_id": "84cc1e7381070fda74a80df28a29323101be3b2c26b4d604abf43946ab1759f6", "generated_on": "2021-09-14T21:23:45+00:00", "task_version": 3, "match": {"begin": 7, "end": 10, "text": "for", "polarity": 0, "conceptAttributes": [{"code": "386661006", "cui": "C0015967", "codingScheme": "SNOMEDCT_US", "tui": "T184"}, {"code": "50177009", "cui": "C0015967", "codingScheme": "SNOMEDCT_US", "tui": "T184"}], "type": "SignSymptomMention"}} +{"id": "eb30741bbb9395fc3da72d02fd29b96e2e4c0c2592c3ae997d80bf522c80070e.1", "docref_id": "eb30741bbb9395fc3da72d02fd29b96e2e4c0c2592c3ae997d80bf522c80070e", "encounter_id": "58a65c6cc5693a507af44f25f062171898aa6bc469766956b2c802d39fc6d4a7", "subject_id": "84cc1e7381070fda74a80df28a29323101be3b2c26b4d604abf43946ab1759f6", "generated_on": "2021-09-14T21:23:45+00:00", "task_version": 3, "match": {"begin": 7, "end": 10, "text": "for", "polarity": 0, "conceptAttributes": [{"code": "422587007", "cui": "C0027497", "codingScheme": "SNOMEDCT_US", "tui": "T184"}], "type": "SignSymptomMention"}} diff --git a/tests/data/covid/term-exists/covid_symptom__nlp_results_term_exists/covid_symptom__nlp_results_term_exists.000.meta b/tests/data/covid/term-exists/covid_symptom__nlp_results_term_exists/covid_symptom__nlp_results_term_exists.000.meta new file mode 100644 index 00000000..7d1797dd --- /dev/null +++ b/tests/data/covid/term-exists/covid_symptom__nlp_results_term_exists/covid_symptom__nlp_results_term_exists.000.meta @@ -0,0 +1,6 @@ +{ + "groups": [ + "c31a3dbf188ed241b2c06b2475cd56159017fa1df1ea882d3fc4beab860fc24d", + "eb30741bbb9395fc3da72d02fd29b96e2e4c0c2592c3ae997d80bf522c80070e" + ] +} \ No newline at end of file diff --git a/tests/data/covid/term-exists/covid_symptom__nlp_results_term_exists/covid_symptom__nlp_results_term_exists.000.ndjson b/tests/data/covid/term-exists/covid_symptom__nlp_results_term_exists/covid_symptom__nlp_results_term_exists.000.ndjson new file mode 100644 index 00000000..08e63d32 --- /dev/null +++ b/tests/data/covid/term-exists/covid_symptom__nlp_results_term_exists/covid_symptom__nlp_results_term_exists.000.ndjson @@ -0,0 +1,2 @@ +{"id": "c31a3dbf188ed241b2c06b2475cd56159017fa1df1ea882d3fc4beab860fc24d.1", "docref_id": "c31a3dbf188ed241b2c06b2475cd56159017fa1df1ea882d3fc4beab860fc24d", "encounter_id": "b3d0707624491d8b71a808bd20b63625981af48f526b95214146de2a15f7dd43", "subject_id": "00680c7c0e2e1712e9c4a01eb5c6dfb8949871faef6337c5db204d19e1d9ca58", "generated_on": "2021-09-14T21:23:45+00:00", "task_version": 1, "match": {"begin": 6, "end": 9, "text": "for", "polarity": 0, "conceptAttributes": [{"code": "422587007", "cui": "C0027497", "codingScheme": "SNOMEDCT_US", "tui": "T184"}], "type": "SignSymptomMention"}} +{"id": "eb30741bbb9395fc3da72d02fd29b96e2e4c0c2592c3ae997d80bf522c80070e.1", "docref_id": "eb30741bbb9395fc3da72d02fd29b96e2e4c0c2592c3ae997d80bf522c80070e", "encounter_id": "58a65c6cc5693a507af44f25f062171898aa6bc469766956b2c802d39fc6d4a7", "subject_id": "84cc1e7381070fda74a80df28a29323101be3b2c26b4d604abf43946ab1759f6", "generated_on": "2021-09-14T21:23:45+00:00", "task_version": 1, "match": {"begin": 7, "end": 10, "text": "for", "polarity": 0, "conceptAttributes": [{"code": "422587007", "cui": "C0027497", "codingScheme": "SNOMEDCT_US", "tui": "T184"}], "type": "SignSymptomMention"}} diff --git a/tests/data/i2b2/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.meta b/tests/data/i2b2/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.meta deleted file mode 100644 index 7c452c8e..00000000 --- a/tests/data/i2b2/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.meta +++ /dev/null @@ -1,6 +0,0 @@ -{ - "groups": [ - "228b982ddae20b8da26a212666995acde914b941a4ff7c314adf89d02c3831f0", - "dfc45702900136d5fb09b8737853f5c727132882bd6ba0871942685c0b1df588" - ] -} diff --git a/tests/data/i2b2/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.ndjson b/tests/data/i2b2/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.ndjson deleted file mode 100644 index 289d2a0c..00000000 --- a/tests/data/i2b2/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.ndjson +++ /dev/null @@ -1,4 +0,0 @@ -{"id":"228b982ddae20b8da26a212666995acde914b941a4ff7c314adf89d02c3831f0.0","docref_id":"228b982ddae20b8da26a212666995acde914b941a4ff7c314adf89d02c3831f0","encounter_id":"5388b42b262276bfbcb659b1ff937b0e3e5b0ec8901ed3ad53fa387fd6f2589f","subject_id":"26f4d6d38eaa3347b8bd22bb4bc66ecbff5384926152738d282e841a247bfefb","generated_on":"2021-09-14T21:23:45+00:00","task_version":2,"match":{"begin":6,"end":9,"text":"for","polarity":0,"conceptAttributes":[{"code":"386661006","cui":"C0015967","codingScheme":"SNOMEDCT_US","tui":"T184"},{"code":"50177009","cui":"C0015967","codingScheme":"SNOMEDCT_US","tui":"T184"}],"type":"SignSymptomMention"}} -{"id":"228b982ddae20b8da26a212666995acde914b941a4ff7c314adf89d02c3831f0.1","docref_id":"228b982ddae20b8da26a212666995acde914b941a4ff7c314adf89d02c3831f0","encounter_id":"5388b42b262276bfbcb659b1ff937b0e3e5b0ec8901ed3ad53fa387fd6f2589f","subject_id":"26f4d6d38eaa3347b8bd22bb4bc66ecbff5384926152738d282e841a247bfefb","generated_on":"2021-09-14T21:23:45+00:00","task_version":2,"match":{"begin":6,"end":9,"text":"for","polarity":0,"conceptAttributes":[{"code":"422587007","cui":"C0027497","codingScheme":"SNOMEDCT_US","tui":"T184"}],"type":"SignSymptomMention"}} -{"id":"dfc45702900136d5fb09b8737853f5c727132882bd6ba0871942685c0b1df588.0","docref_id":"dfc45702900136d5fb09b8737853f5c727132882bd6ba0871942685c0b1df588","encounter_id":"fb29ea2a68ca2e1e4bbe22bdeedf021d94ec89f7e3d38ecbe908a8f2b3d89687","subject_id":"49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7","generated_on":"2021-09-14T21:23:45+00:00","task_version":2,"match":{"begin":7,"end":10,"text":"for","polarity":0,"conceptAttributes":[{"code":"386661006","cui":"C0015967","codingScheme":"SNOMEDCT_US","tui":"T184"},{"code":"50177009","cui":"C0015967","codingScheme":"SNOMEDCT_US","tui":"T184"}],"type":"SignSymptomMention"}} -{"id":"dfc45702900136d5fb09b8737853f5c727132882bd6ba0871942685c0b1df588.1","docref_id":"dfc45702900136d5fb09b8737853f5c727132882bd6ba0871942685c0b1df588","encounter_id":"fb29ea2a68ca2e1e4bbe22bdeedf021d94ec89f7e3d38ecbe908a8f2b3d89687","subject_id":"49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7","generated_on":"2021-09-14T21:23:45+00:00","task_version":2,"match":{"begin":7,"end":10,"text":"for","polarity":0,"conceptAttributes":[{"code":"422587007","cui":"C0027497","codingScheme":"SNOMEDCT_US","tui":"T184"}],"type":"SignSymptomMention"}} diff --git a/tests/data/simple/batched-output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.meta b/tests/data/simple/batched-output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.meta deleted file mode 100644 index beafb7b5..00000000 --- a/tests/data/simple/batched-output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.meta +++ /dev/null @@ -1,5 +0,0 @@ -{ - "groups": [ - "f29736c29af5b962b3947fd40bed6b8c3e97c642b72aaa08e082fec05148e7dd" - ] -} diff --git a/tests/data/simple/batched-output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.ndjson b/tests/data/simple/batched-output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.ndjson deleted file mode 100644 index ce565cd8..00000000 --- a/tests/data/simple/batched-output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.ndjson +++ /dev/null @@ -1,2 +0,0 @@ -{"id":"f29736c29af5b962b3947fd40bed6b8c3e97c642b72aaa08e082fec05148e7dd.0","docref_id":"f29736c29af5b962b3947fd40bed6b8c3e97c642b72aaa08e082fec05148e7dd","encounter_id":"d30aad4b-4503-8e22-0bc4-621b94398520","subject_id":"118dc10e-7745-20d7-e98d-7c358a84c15c","generated_on":"2021-09-14T21:23:45+00:00","task_version":2,"match":{"begin":6,"end":9,"text":"for","polarity":0,"conceptAttributes":[{"code":"386661006","cui":"C0015967","codingScheme":"SNOMEDCT_US","tui":"T184"},{"code":"50177009","cui":"C0015967","codingScheme":"SNOMEDCT_US","tui":"T184"}],"type":"SignSymptomMention"}} -{"id":"f29736c29af5b962b3947fd40bed6b8c3e97c642b72aaa08e082fec05148e7dd.1","docref_id":"f29736c29af5b962b3947fd40bed6b8c3e97c642b72aaa08e082fec05148e7dd","encounter_id":"d30aad4b-4503-8e22-0bc4-621b94398520","subject_id":"118dc10e-7745-20d7-e98d-7c358a84c15c","generated_on":"2021-09-14T21:23:45+00:00","task_version":2,"match":{"begin":6,"end":9,"text":"for","polarity":0,"conceptAttributes":[{"code":"422587007","cui":"C0027497","codingScheme":"SNOMEDCT_US","tui":"T184"}],"type":"SignSymptomMention"}} diff --git a/tests/data/simple/batched-output/covid_symptom__nlp_results/covid_symptom__nlp_results.001.meta b/tests/data/simple/batched-output/covid_symptom__nlp_results/covid_symptom__nlp_results.001.meta deleted file mode 100644 index 7a55e17a..00000000 --- a/tests/data/simple/batched-output/covid_symptom__nlp_results/covid_symptom__nlp_results.001.meta +++ /dev/null @@ -1,5 +0,0 @@ -{ - "groups": [ - "c601849ceffe49dba22ee952533ac87928cd7a472dee6d0390d53c9130519971" - ] -} diff --git a/tests/data/simple/batched-output/covid_symptom__nlp_results/covid_symptom__nlp_results.001.ndjson b/tests/data/simple/batched-output/covid_symptom__nlp_results/covid_symptom__nlp_results.001.ndjson deleted file mode 100644 index 3b014925..00000000 --- a/tests/data/simple/batched-output/covid_symptom__nlp_results/covid_symptom__nlp_results.001.ndjson +++ /dev/null @@ -1,2 +0,0 @@ -{"id":"c601849ceffe49dba22ee952533ac87928cd7a472dee6d0390d53c9130519971.0","docref_id":"c601849ceffe49dba22ee952533ac87928cd7a472dee6d0390d53c9130519971","encounter_id":"af1e6186-3f9a-1fa9-3c73-cfa56c84a056","subject_id":"1de9ea66-70d3-da1f-c735-df5ef7697fb9","generated_on":"2021-09-14T21:23:45+00:00","task_version":2,"match":{"begin":7,"end":10,"text":"for","polarity":0,"conceptAttributes":[{"code":"386661006","cui":"C0015967","codingScheme":"SNOMEDCT_US","tui":"T184"},{"code":"50177009","cui":"C0015967","codingScheme":"SNOMEDCT_US","tui":"T184"}],"type":"SignSymptomMention"}} -{"id":"c601849ceffe49dba22ee952533ac87928cd7a472dee6d0390d53c9130519971.1","docref_id":"c601849ceffe49dba22ee952533ac87928cd7a472dee6d0390d53c9130519971","encounter_id":"af1e6186-3f9a-1fa9-3c73-cfa56c84a056","subject_id":"1de9ea66-70d3-da1f-c735-df5ef7697fb9","generated_on":"2021-09-14T21:23:45+00:00","task_version":2,"match":{"begin":7,"end":10,"text":"for","polarity":0,"conceptAttributes":[{"code":"422587007","cui":"C0027497","codingScheme":"SNOMEDCT_US","tui":"T184"}],"type":"SignSymptomMention"}} diff --git a/tests/data/simple/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.meta b/tests/data/simple/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.meta deleted file mode 100644 index bb7a31b0..00000000 --- a/tests/data/simple/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.meta +++ /dev/null @@ -1,6 +0,0 @@ -{ - "groups": [ - "c601849ceffe49dba22ee952533ac87928cd7a472dee6d0390d53c9130519971", - "f29736c29af5b962b3947fd40bed6b8c3e97c642b72aaa08e082fec05148e7dd" - ] -} diff --git a/tests/data/simple/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.ndjson b/tests/data/simple/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.ndjson deleted file mode 100644 index 0d9514a0..00000000 --- a/tests/data/simple/output/covid_symptom__nlp_results/covid_symptom__nlp_results.000.ndjson +++ /dev/null @@ -1,4 +0,0 @@ -{"id":"f29736c29af5b962b3947fd40bed6b8c3e97c642b72aaa08e082fec05148e7dd.0","docref_id":"f29736c29af5b962b3947fd40bed6b8c3e97c642b72aaa08e082fec05148e7dd","encounter_id":"d30aad4b-4503-8e22-0bc4-621b94398520","subject_id":"118dc10e-7745-20d7-e98d-7c358a84c15c","generated_on":"2021-09-14T21:23:45+00:00","task_version":2,"match":{"begin":6,"end":9,"text":"for","polarity":0,"conceptAttributes":[{"code":"386661006","cui":"C0015967","codingScheme":"SNOMEDCT_US","tui":"T184"},{"code":"50177009","cui":"C0015967","codingScheme":"SNOMEDCT_US","tui":"T184"}],"type":"SignSymptomMention"}} -{"id":"f29736c29af5b962b3947fd40bed6b8c3e97c642b72aaa08e082fec05148e7dd.1","docref_id":"f29736c29af5b962b3947fd40bed6b8c3e97c642b72aaa08e082fec05148e7dd","encounter_id":"d30aad4b-4503-8e22-0bc4-621b94398520","subject_id":"118dc10e-7745-20d7-e98d-7c358a84c15c","generated_on":"2021-09-14T21:23:45+00:00","task_version":2,"match":{"begin":6,"end":9,"text":"for","polarity":0,"conceptAttributes":[{"code":"422587007","cui":"C0027497","codingScheme":"SNOMEDCT_US","tui":"T184"}],"type":"SignSymptomMention"}} -{"id":"c601849ceffe49dba22ee952533ac87928cd7a472dee6d0390d53c9130519971.0","docref_id":"c601849ceffe49dba22ee952533ac87928cd7a472dee6d0390d53c9130519971","encounter_id":"af1e6186-3f9a-1fa9-3c73-cfa56c84a056","subject_id":"1de9ea66-70d3-da1f-c735-df5ef7697fb9","generated_on":"2021-09-14T21:23:45+00:00","task_version":2,"match":{"begin":7,"end":10,"text":"for","polarity":0,"conceptAttributes":[{"code":"386661006","cui":"C0015967","codingScheme":"SNOMEDCT_US","tui":"T184"},{"code":"50177009","cui":"C0015967","codingScheme":"SNOMEDCT_US","tui":"T184"}],"type":"SignSymptomMention"}} -{"id":"c601849ceffe49dba22ee952533ac87928cd7a472dee6d0390d53c9130519971.1","docref_id":"c601849ceffe49dba22ee952533ac87928cd7a472dee6d0390d53c9130519971","encounter_id":"af1e6186-3f9a-1fa9-3c73-cfa56c84a056","subject_id":"1de9ea66-70d3-da1f-c735-df5ef7697fb9","generated_on":"2021-09-14T21:23:45+00:00","task_version":2,"match":{"begin":7,"end":10,"text":"for","polarity":0,"conceptAttributes":[{"code":"422587007","cui":"C0027497","codingScheme":"SNOMEDCT_US","tui":"T184"}],"type":"SignSymptomMention"}} diff --git a/tests/etl/__init__.py b/tests/etl/__init__.py index e69de29b..e75f0d26 100644 --- a/tests/etl/__init__.py +++ b/tests/etl/__init__.py @@ -0,0 +1,3 @@ +"""Support code for ETL-based test cases.""" + +from .base import BaseEtlSimple, TaskTestCase diff --git a/tests/etl/base.py b/tests/etl/base.py new file mode 100644 index 00000000..71171f88 --- /dev/null +++ b/tests/etl/base.py @@ -0,0 +1,154 @@ +"""Base classes for ETL-oriented tests""" + +import os +import shutil +import tempfile +from unittest import mock + +import pytest + +from cumulus_etl import cli, common, deid, fhir +from cumulus_etl.etl.config import JobConfig +from tests import ctakesmock, utils + + +@pytest.mark.skipif(not shutil.which(deid.MSTOOL_CMD), reason="MS tool not installed") +class BaseEtlSimple(ctakesmock.CtakesMixin, utils.TreeCompareMixin, utils.AsyncTestCase): + """ + Base test case for basic runs of etl methods + + Subclasses may want to override self.input_path to point at their own input data. + + Don't put actual tests in here, but rather in subclasses below. + """ + + # Subclasses may want to override this with a folder that has input/, output/, and a codebook.json + DATA_ROOT = "simple" + + def setUp(self): + super().setUp() + + self.root_path = os.path.join(self.datadir, self.DATA_ROOT) + self.input_path = os.path.join(self.root_path, "input") + + tmpdir = tempfile.mkdtemp() + # Comment out this next line when debugging, to persist directory + self.addCleanup(shutil.rmtree, tmpdir) + + self.output_path = os.path.join(tmpdir, "output") + self.phi_path = os.path.join(tmpdir, "phi") + + self.enforce_consistent_uuids() + + async def run_etl( + self, + input_path=None, + output_path=None, + phi_path=None, + output_format: str | None = "ndjson", + comment=None, + batch_size=None, + tasks=None, + tags: list[str] = None, + philter=True, + errors_to=None, + export_to: str = None, + input_format: str = "ndjson", + ) -> None: + args = [ + input_path or self.input_path, + output_path or self.output_path, + phi_path or self.phi_path, + "--skip-init-checks", + f"--input-format={input_format}", + f"--ctakes-overrides={self.ctakes_overrides.name}", + ] + if output_format: + args.append(f"--output-format={output_format}") + if comment: + args.append(f"--comment={comment}") + if batch_size: + args.append(f"--batch-size={batch_size}") + if tasks: + args.append(f'--task={",".join(tasks)}') + if tags: + args.append(f'--task-filter={",".join(tags)}') + if philter: + args.append("--philter") + if export_to: + args.append(f"--export-to={export_to}") + if errors_to: + args.append(f"--errors-to={errors_to}") + await cli.main(args) + + def enforce_consistent_uuids(self): + """Make sure that UUIDs will be the same from run to run""" + # First, copy codebook over. This will help ensure that the order of + # calls doesn't matter as much. If *every* UUID were recorded in the + # codebook, this is all we'd need to do. + os.makedirs(self.phi_path) + shutil.copy(os.path.join(self.root_path, "codebook.json"), self.phi_path) + + def assert_output_equal(self, folder: str = "output"): + """Compares the etl output with the expected json structure""" + self.assert_etl_output_equal(os.path.join(self.root_path, folder), self.output_path) + + +class TaskTestCase(utils.AsyncTestCase): + """Base class for task-focused test suites""" + + def setUp(self) -> None: + super().setUp() + + client = fhir.FhirClient("http://localhost/", []) + self.tmpdir = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with + self.input_dir = os.path.join(self.tmpdir.name, "input") + self.phi_dir = os.path.join(self.tmpdir.name, "phi") + self.errors_dir = os.path.join(self.tmpdir.name, "errors") + os.makedirs(self.input_dir) + os.makedirs(self.phi_dir) + + self.job_config = JobConfig( + self.input_dir, + self.input_dir, + self.tmpdir.name, + self.phi_dir, + "ndjson", + "ndjson", + client, + batch_size=5, + dir_errors=self.errors_dir, + ) + + def make_formatter(dbname: str, group_field: str = None, resource_type: str = None): + formatter = mock.MagicMock(dbname=dbname, group_field=group_field, resource_type=resource_type) + self.format_count += 1 + if self.format_count == 1: + self.format = self.format or formatter + return self.format + elif self.format_count == 2: + self.format2 = self.format2 or formatter + return self.format2 + else: + return formatter # stop keeping track + + self.format = None + self.format2 = None # for tasks that have multiple output streams + self.format_count = 0 + self.create_formatter_mock = mock.MagicMock(side_effect=make_formatter) + self.job_config.create_formatter = self.create_formatter_mock + + self.scrubber = deid.Scrubber() + self.codebook = self.scrubber.codebook + + # Keeps consistent IDs + shutil.copy(os.path.join(self.datadir, "simple/codebook.json"), self.phi_dir) + + def tearDown(self) -> None: + super().tearDown() + self.tmpdir = None + + def make_json(self, filename, resource_id, **kwargs): + common.write_json( + os.path.join(self.input_dir, f"{filename}.ndjson"), {"resourceType": "Test", **kwargs, "id": resource_id} + ) diff --git a/tests/etl/test_etl_cli.py b/tests/etl/test_etl_cli.py index ef601330..8946461e 100644 --- a/tests/etl/test_etl_cli.py +++ b/tests/etl/test_etl_cli.py @@ -4,89 +4,18 @@ import json import os import shutil -import tempfile from unittest import mock import ddt -import pytest from ctakesclient.typesystem import Polarity -from cumulus_etl import cli, common, deid, errors, loaders, store +from cumulus_etl import common, errors, loaders, store from cumulus_etl.etl import context -from tests.ctakesmock import CtakesMixin, fake_ctakes_extract +from tests.ctakesmock import fake_ctakes_extract +from tests.etl import BaseEtlSimple from tests.s3mock import S3Mixin -from tests.utils import FROZEN_TIME_UTC, AsyncTestCase, TreeCompareMixin, read_delta_lake - - -@pytest.mark.skipif(not shutil.which(deid.MSTOOL_CMD), reason="MS tool not installed") -class BaseEtlSimple(CtakesMixin, TreeCompareMixin, AsyncTestCase): - """ - Base test case for basic runs of etl methods - - Don't put actual tests in here, but rather in subclasses below. - """ - - def setUp(self): - super().setUp() - - self.data_dir = os.path.join(self.datadir, "simple") - self.input_path = os.path.join(self.data_dir, "input") - - tmpdir = tempfile.mkdtemp() - # Comment out this next line when debugging, to persist directory - self.addCleanup(shutil.rmtree, tmpdir) - - self.output_path = os.path.join(tmpdir, "output") - self.phi_path = os.path.join(tmpdir, "phi") - - self.enforce_consistent_uuids() - - async def run_etl( - self, - input_path=None, - output_path=None, - phi_path=None, - output_format: str | None = "ndjson", - comment=None, - batch_size=None, - tasks=None, - philter=True, - errors_to=None, - ) -> None: - args = [ - input_path or self.input_path, - output_path or self.output_path, - phi_path or self.phi_path, - "--skip-init-checks", - "--input-format=ndjson", - f"--ctakes-overrides={self.ctakes_overrides.name}", - ] - if output_format: - args.append(f"--output-format={output_format}") - if comment: - args.append(f"--comment={comment}") - if batch_size: - args.append(f"--batch-size={batch_size}") - if tasks: - args.append(f'--task={",".join(tasks)}') - if philter: - args.append("--philter") - if errors_to: - args.append(f"--errors-to={errors_to}") - await cli.main(args) - - def enforce_consistent_uuids(self): - """Make sure that UUIDs will be the same from run to run""" - # First, copy codebook over. This will help ensure that the order of - # calls doesn't matter as much. If *every* UUID were recorded in the - # codebook, this is all we'd need to do. - os.makedirs(self.phi_path) - shutil.copy(os.path.join(self.data_dir, "codebook.json"), self.phi_path) - - def assert_output_equal(self, folder: str): - """Compares the etl output with the expected json structure""" - self.assert_etl_output_equal(os.path.join(self.data_dir, folder), self.output_path) +from tests.utils import FROZEN_TIME_UTC, read_delta_lake @ddt.ddt @@ -274,7 +203,7 @@ class TestEtlFormats(BaseEtlSimple): async def test_etl_job_ndjson(self): await self.run_etl() - self.assert_output_equal("output") + self.assert_output_equal() async def test_etl_job_deltalake(self): await self.run_etl(output_format=None) # deltalake should be default output format @@ -335,8 +264,6 @@ async def test_etl_job_s3(self): "mockbucket/root/patient/patient.000.ndjson", "mockbucket/root/procedure/procedure.000.ndjson", "mockbucket/root/servicerequest/servicerequest.000.ndjson", - "mockbucket/root/covid_symptom__nlp_results/covid_symptom__nlp_results.000.ndjson", - "mockbucket/root/covid_symptom__nlp_results/covid_symptom__nlp_results.000.meta", }, all_files, ) @@ -349,6 +276,8 @@ async def test_etl_job_s3(self): class TestEtlNlp(BaseEtlSimple): """Test case for the cTAKES/cNLP responses""" + CACHE_FOLDER = "covid_symptom_v3" + def setUp(self): super().setUp() # sha256 checksums of the two test patient notes @@ -379,13 +308,13 @@ async def test_stores_cached_json(self): for index, checksum in enumerate(self.expected_checksums): ner = fake_ctakes_extract(facts[index]) - self.assertEqual(ner.as_json(), common.read_json(self.path_for_checksum("covid_symptom_v2", checksum))) - self.assertEqual([0, 0], common.read_json(self.path_for_checksum("covid_symptom_v2-cnlp_v2", checksum))) + self.assertEqual(ner.as_json(), common.read_json(self.path_for_checksum(self.CACHE_FOLDER, checksum))) + self.assertEqual([0, 0], common.read_json(self.path_for_checksum(f"{self.CACHE_FOLDER}-cnlp_v2", checksum))) async def test_does_not_hit_server_if_cache_exists(self): for index, checksum in enumerate(self.expected_checksums): # Write out some fake results to the cache location - filename = self.path_for_checksum("covid_symptom_v2", checksum) + filename = self.path_for_checksum(self.CACHE_FOLDER, checksum) os.makedirs(os.path.dirname(filename)) common.write_json( filename, @@ -405,7 +334,7 @@ async def test_does_not_hit_server_if_cache_exists(self): }, ) - cnlp_filename = self.path_for_checksum("covid_symptom_v2-cnlp_v2", checksum) + cnlp_filename = self.path_for_checksum(f"{self.CACHE_FOLDER}-cnlp_v2", checksum) os.makedirs(os.path.dirname(cnlp_filename)) common.write_json(cnlp_filename, [0]) diff --git a/tests/etl/test_tasks.py b/tests/etl/test_tasks.py index 75700f06..aed3f9a7 100644 --- a/tests/etl/test_tasks.py +++ b/tests/etl/test_tasks.py @@ -1,78 +1,15 @@ """Tests for etl/tasks/""" import os -import shutil -import tempfile from unittest import mock import ddt import pyarrow -from cumulus_etl import common, deid, errors, fhir -from cumulus_etl.etl import config, tasks +from cumulus_etl import common, errors +from cumulus_etl.etl import tasks from cumulus_etl.etl.tasks import basic_tasks - -from tests.utils import AsyncTestCase - - -class TaskTestCase(AsyncTestCase): - """Base class for task-focused test suites""" - - def setUp(self) -> None: - super().setUp() - - client = fhir.FhirClient("http://localhost/", []) - self.tmpdir = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with - self.input_dir = os.path.join(self.tmpdir.name, "input") - self.phi_dir = os.path.join(self.tmpdir.name, "phi") - self.errors_dir = os.path.join(self.tmpdir.name, "errors") - os.makedirs(self.input_dir) - os.makedirs(self.phi_dir) - - self.job_config = config.JobConfig( - self.input_dir, - self.input_dir, - self.tmpdir.name, - self.phi_dir, - "ndjson", - "ndjson", - client, - batch_size=5, - dir_errors=self.errors_dir, - ) - - def make_formatter(dbname: str, group_field: str = None, resource_type: str = None): - formatter = mock.MagicMock(dbname=dbname, group_field=group_field, resource_type=resource_type) - self.format_count += 1 - if self.format_count == 1: - self.format = self.format or formatter - return self.format - elif self.format_count == 2: - self.format2 = self.format2 or formatter - return self.format2 - else: - return formatter # stop keeping track - - self.format = None - self.format2 = None # for tasks that have multiple output streams - self.format_count = 0 - self.create_formatter_mock = mock.MagicMock(side_effect=make_formatter) - self.job_config.create_formatter = self.create_formatter_mock - - self.scrubber = deid.Scrubber() - self.codebook = self.scrubber.codebook - - # Keeps consistent IDs - shutil.copy(os.path.join(self.datadir, "simple/codebook.json"), self.phi_dir) - - def tearDown(self) -> None: - super().tearDown() - self.tmpdir = None - - def make_json(self, filename, resource_id, **kwargs): - common.write_json( - os.path.join(self.input_dir, f"{filename}.ndjson"), {"resourceType": "Test", **kwargs, "id": resource_id} - ) +from tests.etl import TaskTestCase @ddt.ddt diff --git a/tests/i2b2/test_i2b2_etl.py b/tests/i2b2/test_i2b2_etl.py index fdd85b12..49b9710e 100644 --- a/tests/i2b2/test_i2b2_etl.py +++ b/tests/i2b2/test_i2b2_etl.py @@ -2,70 +2,27 @@ import filecmp import os -import shutil import tempfile -import pytest +from tests.etl import BaseEtlSimple -from cumulus_etl import cli, deid -from tests.ctakesmock import CtakesMixin -from tests.utils import AsyncTestCase, TreeCompareMixin - - -@pytest.mark.skipif(not shutil.which(deid.MSTOOL_CMD), reason="MS tool not installed") -class TestI2b2Etl(CtakesMixin, TreeCompareMixin, AsyncTestCase): +class TestI2b2Etl(BaseEtlSimple): """ Base test case for basic runs of etl methods against i2b2 data """ - def setUp(self): - super().setUp() - - i2b2_dir = os.path.join(self.datadir, "i2b2") - self.input_path = os.path.join(i2b2_dir, "input") - self.expected_output_path = os.path.join(i2b2_dir, "output") - self.expected_export_path = os.path.join(i2b2_dir, "export") - - tmpdir = tempfile.mkdtemp() - # Comment out this next line when debugging, to persist directory - self.addCleanup(shutil.rmtree, tmpdir) - - self.output_path = os.path.join(tmpdir, "output") - self.phi_path = os.path.join(tmpdir, "phi") - - # Copy the codebook over, to guarantee the same ID mappings run-to-run - os.makedirs(self.phi_path) - shutil.copy(os.path.join(i2b2_dir, "codebook.json"), self.phi_path) + DATA_ROOT = "i2b2" async def test_full_etl(self): - await cli.main( - [ - self.input_path, - self.output_path, - self.phi_path, - "--skip-init-checks", - "--input-format=i2b2", - "--output-format=ndjson", - f"--ctakes-overrides={self.ctakes_overrides.name}", - ] - ) - self.assert_etl_output_equal(self.expected_output_path, self.output_path) + await self.run_etl(input_format="i2b2", philter=False) + self.assert_output_equal() async def test_export(self): with tempfile.TemporaryDirectory() as export_path: - await cli.main( - [ - self.input_path, - self.output_path, - self.phi_path, - "--skip-init-checks", - "--input-format=i2b2", - "--output-format=ndjson", - f"--export-to={export_path}", - "--task=patient", # just to make the test faster and confirm we don't export unnecessary files - ] - ) + # Only run patient task to make the test faster and confirm we don't export unnecessary files + await self.run_etl(input_format="i2b2", export_to=export_path, tasks=["patient"], philter=False) - dircmp = filecmp.dircmp(export_path, self.expected_export_path) + expected_export_path = os.path.join(self.datadir, self.DATA_ROOT, "export") + dircmp = filecmp.dircmp(export_path, expected_export_path) self.assert_file_tree_equal(dircmp)