Merge pull request #275 from smart-on-fhir/mikix/term-exists
feat: add new covid_symptom__nlp_results_term_exists task
mikix authored Sep 29, 2023
2 parents c810cb2 + 62aa293 commit de2bcb9
Showing 41 changed files with 571 additions and 388 deletions.
45 changes: 33 additions & 12 deletions compose.yaml
@@ -13,7 +13,8 @@ services:
- AWS_DEFAULT_PROFILE
- CUMULUS_HUGGING_FACE_URL=http://llama2:8086/
- URL_CTAKES_REST=http://ctakes-covid:8080/ctakes-web-rest/service/analyze
- URL_CNLP_NEGATION=http://cnlp-transformers:8000/negation/process
- URL_CNLP_NEGATION=http://cnlpt-negation:8000/negation/process
- URL_CNLP_TERM_EXISTS=http://cnlpt-term-exists:8000/termexists/process
volumes:
- $HOME/.aws/:/root/.aws/:ro
- ctakes-overrides:/ctakes-overrides
@@ -30,12 +31,13 @@ services:
cumulus-etl-gpu:
extends: cumulus-etl-base
environment:
- URL_CNLP_NEGATION=http://cnlp-transformers-gpu:8000/negation/process
- URL_CNLP_NEGATION=http://cnlpt-negation-gpu:8000/negation/process
- URL_CNLP_TERM_EXISTS=http://cnlpt-term-exists-gpu:8000/termexists/process
profiles:
- etl-gpu

ctakes-covid-base:
image: smartonfhir/ctakes-covid:1.1
image: smartonfhir/ctakes-covid:1.1.0
environment:
- ctakes_umlsuser=umls_api_key
- ctakes_umlspw=$UMLS_API_KEY
@@ -49,20 +51,39 @@ services:
ctakes-covid:
extends: ctakes-covid-base
profiles:
- etl-support
- etl-support-gpu
- covid-symptom
- covid-symptom-gpu

cnlp-transformers:
image: smartonfhir/cnlp-transformers:negation-0.4-cpu
cnlpt-negation:
image: smartonfhir/cnlp-transformers:negation-0.6.1-cpu
profiles:
- etl-support
- covid-symptom
networks:
- cumulus-etl

cnlp-transformers-gpu:
image: smartonfhir/cnlp-transformers:negation-0.4-gpu
cnlpt-negation-gpu:
image: smartonfhir/cnlp-transformers:negation-0.6.1-gpu
profiles:
- etl-support-gpu
- covid-symptom-gpu
networks:
- cumulus-etl
deploy:
resources:
reservations:
devices:
- capabilities: [gpu]

cnlpt-term-exists:
image: smartonfhir/cnlp-transformers:termexists-0.6.1-cpu
profiles:
- covid-symptom
networks:
- cumulus-etl

cnlpt-term-exists-gpu:
image: smartonfhir/cnlp-transformers:termexists-0.6.1-gpu
profiles:
- covid-symptom-gpu
networks:
- cumulus-etl
deploy:
@@ -111,7 +132,7 @@ services:
volumes:
- ./:/cumulus-etl/
working_dir: /cumulus-etl
command:
command:
- /cumulus-etl/tests/data/simple/ndjson-input
- /cumulus-etl/example-output
- /cumulus-etl/example-phi-build
1 change: 1 addition & 0 deletions cumulus_etl/errors.py
@@ -29,6 +29,7 @@
LABEL_STUDIO_CONFIG_INVALID = 30
LABEL_STUDIO_MISSING = 31
FHIR_AUTH_FAILED = 32
SERVICE_MISSING = 33 # generic init-check service is missing


class FatalError(Exception):
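
The new SERVICE_MISSING exit code gives init checks a generic status to fail with when a required external service is unreachable. A minimal sketch of that pattern, assuming a hypothetical wait_for_service helper and URL; only errors.SERVICE_MISSING itself comes from this commit:

    import asyncio

    import httpx

    from cumulus_etl import errors


    async def wait_for_service(url: str, retries: int = 5) -> None:
        """Hypothetical probe: exit with SERVICE_MISSING if a service never answers."""
        async with httpx.AsyncClient() as client:
            for _ in range(retries):
                try:
                    response = await client.get(url)
                    if response.status_code < 500:
                        return  # service answered, nothing more to check
                except httpx.HTTPError:
                    pass  # not up yet, retry below
                await asyncio.sleep(1)
        print(f"A required service at {url} is not available.")
        raise SystemExit(errors.SERVICE_MISSING)
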
10 changes: 5 additions & 5 deletions cumulus_etl/etl/cli.py
@@ -11,7 +11,7 @@
import rich
import rich.table

from cumulus_etl import cli_utils, common, deid, errors, fhir, loaders, nlp, store
from cumulus_etl import cli_utils, common, deid, errors, fhir, loaders, store
from cumulus_etl.etl import context, tasks
from cumulus_etl.etl.config import JobConfig, JobSummary

@@ -68,15 +68,15 @@ def check_mstool() -> None:
raise SystemExit(errors.MSTOOL_MISSING)


def check_requirements() -> None:
async def check_requirements(selected_tasks: Iterable[type[tasks.EtlTask]]) -> None:
"""
Verifies that all external services and programs are ready
May block while waiting a bit for them.
"""
nlp.check_ctakes()
nlp.check_cnlpt()
check_mstool()
for task in selected_tasks:
await task.init_check()


###############################################################################
@@ -203,7 +203,7 @@ async def etl_main(args: argparse.Namespace) -> None:

# Check that cTAKES is running and any other services or binaries we require
if not args.skip_init_checks:
check_requirements()
await check_requirements(selected_tasks)

# Grab a list of all required resource types for the tasks we are running
required_resources = set(t.resource for t in selected_tasks)
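
With check_requirements() now awaiting init_check() on each selected task, service probing moves out of the ETL core and into the tasks that actually need it. A task with no external dependencies can satisfy the contract with a no-op; this sketch is an illustration of the hook, not code from this commit:

    from cumulus_etl.etl import tasks


    class PlainFhirTask(tasks.EtlTask):
        """Hypothetical task that needs no external services before a run."""

        name = "example__plain_fhir"  # hypothetical task name
        resource = "Patient"

        @classmethod
        async def init_check(cls) -> None:
            pass  # nothing to probe, so check_requirements() moves straight on
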
2 changes: 1 addition & 1 deletion cumulus_etl/etl/studies/covid_symptom/__init__.py
@@ -1,3 +1,3 @@
"""The covid_symptom study"""

from .covid_tasks import CovidSymptomNlpResultsTask
from .covid_tasks import CovidSymptomNlpResultsTask, CovidSymptomNlpResultsTermExistsTask
37 changes: 21 additions & 16 deletions cumulus_etl/etl/studies/covid_symptom/covid_ctakes.py
@@ -4,25 +4,28 @@

import ctakesclient
import httpx
from ctakesclient.transformer import TransformerModel

from cumulus_etl import common, fhir, nlp, store


async def covid_symptoms_extract(
client: fhir.FhirClient,
cache: store.Root,
docref: dict,
clinical_note: str,
*,
polarity_model: TransformerModel,
task_version: int,
ctakes_http_client: httpx.AsyncClient = None,
cnlp_http_client: httpx.AsyncClient = None,
) -> list[dict] | None:
"""
Extract a list of Observations from NLP-detected symptoms in clinical notes
:param client: a client ready to talk to a FHIR server
:param cache: Where to cache NLP results
:param docref: Clinical Note
:param docref: DocumentReference resource (scrubbed)
:param clinical_note: the clinical note already extracted from the docref
:param polarity_model: how to test the polarity of cTAKES responses
:param task_version: version of task to inject into results
:param ctakes_http_client: HTTPX client to use for the cTAKES server
:param cnlp_http_client: HTTPX client to use for the cNLP transformer server
@@ -37,22 +40,22 @@ async def covid_symptoms_extract(
return None
_, encounter_id = fhir.unref_resource(encounters[0])

# Find the clinical note among the attachments
try:
clinical_note = await fhir.get_docref_note(client, docref)
except Exception as exc: # pylint: disable=broad-except
logging.warning("Error getting text for docref %s: %s", docref_id, exc)
return None

# cTAKES cache namespace history (and thus, cache invalidation history):
# v1: original cTAKES processing
# v2+: see CovidSymptomNlpResultsTask's version history
ctakes_namespace = f"covid_symptom_v{task_version}"

# cNLP cache namespace history (and thus, cache invalidation history):
# v1: original addition of cNLP filtering
# v2: we started dropping non-covid symptoms, which changes the span ordering
cnlp_namespace = f"{ctakes_namespace}-cnlp_v2"
match polarity_model:
case TransformerModel.NEGATION: # original
# cNLP cache namespace history (and thus, cache invalidation history):
# v1: original addition of cNLP filtering
# v2: we started dropping non-covid symptoms, which changes the span ordering
cnlp_namespace = f"{ctakes_namespace}-cnlp_v2"
case TransformerModel.TERM_EXISTS:
cnlp_namespace = f"{ctakes_namespace}-cnlp_term_exists_v1"
case _:
logging.warning("Unknown polarity method: %s", polarity_model.value)
return None

timestamp = common.datetime_now().isoformat()

@@ -76,9 +79,11 @@ def is_covid_match(m: ctakesclient.typesystem.MatchText):
# there too. We have found this to yield better results than cTAKES alone.
try:
spans = ctakes_json.list_spans(matches)
polarities_cnlp = await nlp.list_polarity(cache, cnlp_namespace, clinical_note, spans, client=cnlp_http_client)
polarities_cnlp = await nlp.list_polarity(
cache, cnlp_namespace, clinical_note, spans, model=polarity_model, client=cnlp_http_client
)
except Exception as exc: # pylint: disable=broad-except
logging.warning("Could not check negation for docref %s (%s): %s", docref_id, type(exc).__name__, exc)
logging.warning("Could not check polarity for docref %s (%s): %s", docref_id, type(exc).__name__, exc)
return None

# Now filter out any non-positive matches
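
covid_symptoms_extract() now takes the already-extracted note text plus an explicit polarity_model, and picks a cNLP cache namespace per model. A hedged usage sketch based on the signature above; the cache path and client wiring here are illustrative, not from this commit:

    import httpx
    from ctakesclient.transformer import TransformerModel

    from cumulus_etl import store
    from cumulus_etl.etl.studies.covid_symptom import covid_ctakes


    async def extract_term_exists_symptoms(client, docref: dict, note_text: str) -> list[dict] | None:
        """Run the covid symptom extraction with the termexists polarity model."""
        cache = store.Root("/tmp/phi-cache", create=True)  # illustrative cache location
        async with httpx.AsyncClient() as http_client:
            return await covid_ctakes.covid_symptoms_extract(
                client,  # a ready fhir.FhirClient
                cache,
                docref,  # scrubbed DocumentReference resource
                note_text,  # clinical note text already pulled from the docref
                polarity_model=TransformerModel.TERM_EXISTS,
                task_version=3,  # matches the bumped task_version below
                ctakes_http_client=http_client,
                cnlp_http_client=http_client,
            )
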
110 changes: 54 additions & 56 deletions cumulus_etl/etl/studies/covid_symptom/covid_tasks.py
@@ -1,14 +1,13 @@
"""Define tasks for the covid_symptom study"""

import copy
import itertools
import os

import ctakesclient
import pyarrow
import rich.progress
from ctakesclient.transformer import TransformerModel

from cumulus_etl import common, formats, nlp, store
from cumulus_etl import formats, nlp, store
from cumulus_etl.etl import tasks
from cumulus_etl.etl.studies.covid_symptom import covid_ctakes

@@ -58,42 +57,38 @@ def is_ed_docref(docref):
return any(is_ed_coding(x) for x in codings)


class CovidSymptomNlpResultsTask(tasks.EtlTask):
"""Covid Symptom study task, to generate symptom lists from ED notes using NLP"""
class BaseCovidSymptomNlpResultsTask(tasks.BaseNlpTask):
"""Covid Symptom study task, to generate symptom lists from ED notes using cTAKES + a polarity check"""

name = "covid_symptom__nlp_results"
resource = "DocumentReference"
tags = {"covid_symptom", "gpu"}
needs_bulk_deid = False
outputs = [tasks.OutputTable(schema=None, group_field="docref_id")]

# Task Version
# The "task_version" field is a simple integer that gets incremented any time an NLP-relevant parameter is changed.
# This is a reference to a bundle of metadata (cTAKES version, cNLP version, ICD10 code list).
# We could combine all that info into a field we save with the results. But it's more human-friendly to have a
# simple version to refer to. So anytime these properties get changed, bump the version and record the old bundle
# of metadata too.
task_version = 2
# Subclasses: set name, tags, and polarity_model yourself
polarity_model = None

# Use a shared task_version for subclasses, to make sharing the ctakes cache folder easier
# (and they use essentially the same services anyway)
task_version = 3
# Task Version History:
# ** 3 (2023-09): Updated to cnlpt version 0.6.1 **
# cTAKES: smartonfhir/ctakes-covid:1.1.0
# cNLP: smartonfhir/cnlp-transformers:negation-0.6.1
# cNLP: smartonfhir/cnlp-transformers:termexists-0.6.1
# ctakesclient: 5.0
#
# ** 2 (2023-08): Corrected the cache location (version 1 results might be using stale cache) **
# cTAKES: smartonfhir/ctakes-covid:1.1
# cNLP: smartonfhir/cnlp-transformers:negation-0.4
# cTAKES: smartonfhir/ctakes-covid:1.1.0
# cNLP: smartonfhir/cnlp-transformers:negation-0.4.0
# ctakesclient: 5.0
#
# ** 1 (2023-08): Updated ICD10 codes from ctakesclient **
# cTAKES: smartonfhir/ctakes-covid:1.1
# cNLP: smartonfhir/cnlp-transformers:negation-0.4
# cTAKES: smartonfhir/ctakes-covid:1.1.0
# cNLP: smartonfhir/cnlp-transformers:negation-0.4.0
# ctakesclient: 5.0
#
# ** null (before we added a task version) **
# cTAKES: smartonfhir/ctakes-covid:1.1
# cNLP: smartonfhir/cnlp-transformers:negation-0.4
# cTAKES: smartonfhir/ctakes-covid:1.1.0
# cNLP: smartonfhir/cnlp-transformers:negation-0.4.0
# ctakesclient: 3.0

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.seen_docrefs = set()
outputs = [tasks.OutputTable(schema=None, group_field="docref_id")]

async def prepare_task(self) -> bool:
bsv_path = ctakesclient.filesystem.covid_symptoms_path()
@@ -103,45 +98,24 @@ async def prepare_task(self) -> bool:
self.summaries[0].had_errors = True
return success

def add_error(self, docref: dict) -> None:
if not self.task_config.dir_errors:
return

error_root = store.Root(os.path.join(self.task_config.dir_errors, self.name), create=True)
error_path = error_root.joinpath("nlp-errors.ndjson")
with common.NdjsonWriter(error_path, "a") as writer:
writer.write(docref)

async def read_entries(self, *, progress: rich.progress.Progress = None) -> tasks.EntryIterator:
"""Passes clinical notes through NLP and returns any symptoms found"""
phi_root = store.Root(self.task_config.dir_phi, create=True)

# one client for both NLP services for now -- no parallel requests yet, so no need to be fancy
http_client = nlp.ctakes_httpx_client()

for docref in self.read_ndjson(progress=progress):
if not nlp.is_docref_valid(docref):
continue

# Check that the note is one of our special allow-listed types (we do this here rather than on the output
# side to save needing to run everything through NLP).
if not is_ed_docref(docref):
continue

orig_docref = copy.deepcopy(docref)
if not self.scrubber.scrub_resource(docref, scrub_attachments=False):
continue

async for orig_docref, docref, clinical_note in self.read_notes(progress=progress, doc_check=is_ed_docref):
symptoms = await covid_ctakes.covid_symptoms_extract(
self.task_config.client,
phi_root,
docref,
clinical_note,
polarity_model=self.polarity_model,
task_version=self.task_version,
ctakes_http_client=http_client,
cnlp_http_client=http_client,
)
if symptoms is None:
self.summaries[0].had_errors = True
self.add_error(orig_docref)
continue

@@ -159,11 +133,6 @@ async def read_entries(self, *, progress: rich.progress.Progress = None) -> tasks.EntryIterator:
# The Format class will replace all existing symptoms from this note at once (because we set group_field).
yield symptoms

def pop_current_group_values(self, table_index: int) -> set[str]:
values = self.seen_docrefs
self.seen_docrefs = set()
return values

@classmethod
def get_schema(cls, formatter: formats.Format, rows: list[dict]) -> pyarrow.Schema:
return pyarrow.schema(
@@ -201,3 +170,32 @@ def get_schema(cls, formatter: formats.Format, rows: list[dict]) -> pyarrow.Schema:
),
]
)


class CovidSymptomNlpResultsTask(BaseCovidSymptomNlpResultsTask):
"""Covid Symptom study task, to generate symptom lists from ED notes using cTAKES and cnlpt negation"""

name = "covid_symptom__nlp_results"
tags = {"covid_symptom", "gpu"}
polarity_model = TransformerModel.NEGATION

@classmethod
async def init_check(cls) -> None:
nlp.check_ctakes()
nlp.check_negation_cnlpt()


class CovidSymptomNlpResultsTermExistsTask(BaseCovidSymptomNlpResultsTask):
"""Covid Symptom study task, to generate symptom lists from ED notes using cTAKES and cnlpt termexists"""

name = "covid_symptom__nlp_results_term_exists"
polarity_model = TransformerModel.TERM_EXISTS

# Explicitly don't use any tags because this is really a "hidden" task that is mostly for comparing
# polarity model performance more than running a study. So we don't want it to be accidentally run.
tags = {}

@classmethod
async def init_check(cls) -> None:
nlp.check_ctakes()
nlp.check_term_exists_cnlpt()
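
Because the shared pipeline now lives in BaseCovidSymptomNlpResultsTask, each concrete task reduces to a name, tags, a polarity_model, and an init_check; the untagged covid_symptom__nlp_results_term_exists task therefore has to be requested by name rather than picked up through tag-based selection. A further variant would follow the same shape; everything named in this sketch is hypothetical except the base class, the nlp checks, and the NEGATION model:

    from ctakesclient.transformer import TransformerModel

    from cumulus_etl import nlp


    class CovidSymptomNlpResultsExampleTask(BaseCovidSymptomNlpResultsTask):
        """Hypothetical extra variant, showing the extension point the base class creates."""

        name = "covid_symptom__nlp_results_example"  # hypothetical task name
        tags = {}  # no tags keeps it out of tag-based runs, like the term_exists task
        polarity_model = TransformerModel.NEGATION  # reusing an existing model for illustration

        @classmethod
        async def init_check(cls) -> None:
            nlp.check_ctakes()
            nlp.check_negation_cnlpt()
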