Skip to content

Commit

Permalink
feat: add new covid_symptom__nlp_results_term_exists task
Browse files Browse the repository at this point in the history
- Adds a new covid_symptom__nlp_results_term_exists task, which uses
  the "termexists" model for polarity checking cTAKES rather than the
  previous "negation" model. This task will largely be used to compare
  the performance of the two models.
- Rename some docker compose targets, like the etl-support profile
  into the covid-symptom profile (the thinking is that we'll have
  study-specific sets of services that you might want on or off,
  depending on what you're doing)
- Refactors some NLP code to use shared base classes.
- Remove covid_symptom__nlp_results from the default set of tasks.
  Study-specific tasks should have to be requested.
  • Loading branch information
mikix committed Sep 29, 2023
1 parent c810cb2 commit 61e9761
Show file tree
Hide file tree
Showing 36 changed files with 532 additions and 375 deletions.
45 changes: 33 additions & 12 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ services:
- AWS_DEFAULT_PROFILE
- CUMULUS_HUGGING_FACE_URL=http://llama2:8086/
- URL_CTAKES_REST=http://ctakes-covid:8080/ctakes-web-rest/service/analyze
- URL_CNLP_NEGATION=http://cnlp-transformers:8000/negation/process
- URL_CNLP_NEGATION=http://cnlpt-negation:8000/negation/process
- URL_CNLP_TERM_EXISTS=http://cnlpt-term-exists:8000/termexists/process
volumes:
- $HOME/.aws/:/root/.aws/:ro
- ctakes-overrides:/ctakes-overrides
Expand All @@ -30,12 +31,13 @@ services:
cumulus-etl-gpu:
extends: cumulus-etl-base
environment:
- URL_CNLP_NEGATION=http://cnlp-transformers-gpu:8000/negation/process
- URL_CNLP_NEGATION=http://cnlpt-negation-gpu:8000/negation/process
- URL_CNLP_TERM_EXISTS=http://cnlpt-term-exists-gpu:8000/termexists/process
profiles:
- etl-gpu

ctakes-covid-base:
image: smartonfhir/ctakes-covid:1.1
image: smartonfhir/ctakes-covid:1.1.0
environment:
- ctakes_umlsuser=umls_api_key
- ctakes_umlspw=$UMLS_API_KEY
Expand All @@ -49,20 +51,39 @@ services:
ctakes-covid:
extends: ctakes-covid-base
profiles:
- etl-support
- etl-support-gpu
- covid-symptom
- covid-symptom-gpu

cnlp-transformers:
image: smartonfhir/cnlp-transformers:negation-0.4-cpu
cnlpt-negation:
image: smartonfhir/cnlp-transformers:negation-0.6.1-cpu
profiles:
- etl-support
- covid-symptom
networks:
- cumulus-etl

cnlp-transformers-gpu:
image: smartonfhir/cnlp-transformers:negation-0.4-gpu
cnlpt-negation-gpu:
image: smartonfhir/cnlp-transformers:negation-0.6.1-gpu
profiles:
- etl-support-gpu
- covid-symptom-gpu
networks:
- cumulus-etl
deploy:
resources:
reservations:
devices:
- capabilities: [gpu]

cnlpt-term-exists:
image: smartonfhir/cnlp-transformers:termexists-0.6.1-cpu
profiles:
- covid-symptom
networks:
- cumulus-etl

cnlpt-term-exists-gpu:
image: smartonfhir/cnlp-transformers:termexists-0.6.1-gpu
profiles:
- covid-symptom-gpu
networks:
- cumulus-etl
deploy:
Expand Down Expand Up @@ -111,7 +132,7 @@ services:
volumes:
- ./:/cumulus-etl/
working_dir: /cumulus-etl
command:
command:
- /cumulus-etl/tests/data/simple/ndjson-input
- /cumulus-etl/example-output
- /cumulus-etl/example-phi-build
Expand Down
2 changes: 1 addition & 1 deletion cumulus_etl/etl/studies/covid_symptom/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""The covid_symptom study"""

from .covid_tasks import CovidSymptomNlpResultsTask
from .covid_tasks import CovidSymptomNlpResultsTask, CovidSymptomNlpResultsTermExistsTask
37 changes: 21 additions & 16 deletions cumulus_etl/etl/studies/covid_symptom/covid_ctakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,28 @@

import ctakesclient
import httpx
from ctakesclient.transformer import TransformerModel

from cumulus_etl import common, fhir, nlp, store


async def covid_symptoms_extract(
client: fhir.FhirClient,
cache: store.Root,
docref: dict,
clinical_note: str,
*,
polarity_model: TransformerModel,
task_version: int,
ctakes_http_client: httpx.AsyncClient = None,
cnlp_http_client: httpx.AsyncClient = None,
) -> list[dict] | None:
"""
Extract a list of Observations from NLP-detected symptoms in clinical notes
:param client: a client ready to talk to a FHIR server
:param cache: Where to cache NLP results
:param docref: Clinical Note
:param docref: DocumentReference resource (scrubbed)
:param clinical_note: the clinical note already extracted from the docref
:param polarity_model: how to test the polarity of cTAKES responses
:param task_version: version of task to inject into results
:param ctakes_http_client: HTTPX client to use for the cTAKES server
:param cnlp_http_client: HTTPX client to use for the cNLP transformer server
Expand All @@ -37,22 +40,22 @@ async def covid_symptoms_extract(
return None
_, encounter_id = fhir.unref_resource(encounters[0])

# Find the clinical note among the attachments
try:
clinical_note = await fhir.get_docref_note(client, docref)
except Exception as exc: # pylint: disable=broad-except
logging.warning("Error getting text for docref %s: %s", docref_id, exc)
return None

# cTAKES cache namespace history (and thus, cache invalidation history):
# v1: original cTAKES processing
# v2+: see CovidSymptomNlpResultsTask's version history
ctakes_namespace = f"covid_symptom_v{task_version}"

# cNLP cache namespace history (and thus, cache invalidation history):
# v1: original addition of cNLP filtering
# v2: we started dropping non-covid symptoms, which changes the span ordering
cnlp_namespace = f"{ctakes_namespace}-cnlp_v2"
match polarity_model:
case TransformerModel.NEGATION: # original
# cNLP cache namespace history (and thus, cache invalidation history):
# v1: original addition of cNLP filtering
# v2: we started dropping non-covid symptoms, which changes the span ordering
cnlp_namespace = f"{ctakes_namespace}-cnlp_v2"
case TransformerModel.TERM_EXISTS:
cnlp_namespace = f"{ctakes_namespace}-cnlp_term_exists_v1"
case _:
logging.warning("Unknown polarity method: %s", polarity_model.value)
return None

timestamp = common.datetime_now().isoformat()

Expand All @@ -76,9 +79,11 @@ def is_covid_match(m: ctakesclient.typesystem.MatchText):
# there too. We have found this to yield better results than cTAKES alone.
try:
spans = ctakes_json.list_spans(matches)
polarities_cnlp = await nlp.list_polarity(cache, cnlp_namespace, clinical_note, spans, client=cnlp_http_client)
polarities_cnlp = await nlp.list_polarity(
cache, cnlp_namespace, clinical_note, spans, model=polarity_model, client=cnlp_http_client
)
except Exception as exc: # pylint: disable=broad-except
logging.warning("Could not check negation for docref %s (%s): %s", docref_id, type(exc).__name__, exc)
logging.warning("Could not check polarity for docref %s (%s): %s", docref_id, type(exc).__name__, exc)
return None

# Now filter out any non-positive matches
Expand Down
124 changes: 58 additions & 66 deletions cumulus_etl/etl/studies/covid_symptom/covid_tasks.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
"""Define tasks for the covid_symptom study"""

import copy
import itertools
import os

import ctakesclient
import pyarrow
import rich.progress
from ctakesclient.transformer import TransformerModel

from cumulus_etl import common, formats, nlp, store
from cumulus_etl import formats, nlp, store
from cumulus_etl.etl import tasks
from cumulus_etl.etl.studies.covid_symptom import covid_ctakes

Expand Down Expand Up @@ -58,42 +57,13 @@ def is_ed_docref(docref):
return any(is_ed_coding(x) for x in codings)


class CovidSymptomNlpResultsTask(tasks.EtlTask):
"""Covid Symptom study task, to generate symptom lists from ED notes using NLP"""
class BaseCovidSymptomNlpResultsTask(tasks.BaseNlpTask):
"""Covid Symptom study task, to generate symptom lists from ED notes using cTAKES + a polarity check"""

name = "covid_symptom__nlp_results"
resource = "DocumentReference"
tags = {"covid_symptom", "gpu"}
needs_bulk_deid = False
outputs = [tasks.OutputTable(schema=None, group_field="docref_id")]

# Task Version
# The "task_version" field is a simple integer that gets incremented any time an NLP-relevant parameter is changed.
# This is a reference to a bundle of metadata (cTAKES version, cNLP version, ICD10 code list).
# We could combine all that info into a field we save with the results. But it's more human-friendly to have a
# simple version to refer to. So anytime these properties get changed, bump the version and record the old bundle
# of metadata too.
task_version = 2

# Task Version History:
# ** 2 (2023-08): Corrected the cache location (version 1 results might be using stale cache) **
# cTAKES: smartonfhir/ctakes-covid:1.1
# cNLP: smartonfhir/cnlp-transformers:negation-0.4
# ctakesclient: 5.0
#
# ** 1 (2023-08): Updated ICD10 codes from ctakesclient **
# cTAKES: smartonfhir/ctakes-covid:1.1
# cNLP: smartonfhir/cnlp-transformers:negation-0.4
# ctakesclient: 5.0
#
# ** null (before we added a task version) **
# cTAKES: smartonfhir/ctakes-covid:1.1
# cNLP: smartonfhir/cnlp-transformers:negation-0.4
# ctakesclient: 3.0
# Subclasses: set name, tags, task_version, and polarity_model yourself
polarity_model = None

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.seen_docrefs = set()
outputs = [tasks.OutputTable(schema=None, group_field="docref_id")]

async def prepare_task(self) -> bool:
bsv_path = ctakesclient.filesystem.covid_symptoms_path()
Expand All @@ -103,45 +73,24 @@ async def prepare_task(self) -> bool:
self.summaries[0].had_errors = True
return success

def add_error(self, docref: dict) -> None:
if not self.task_config.dir_errors:
return

error_root = store.Root(os.path.join(self.task_config.dir_errors, self.name), create=True)
error_path = error_root.joinpath("nlp-errors.ndjson")
with common.NdjsonWriter(error_path, "a") as writer:
writer.write(docref)

async def read_entries(self, *, progress: rich.progress.Progress = None) -> tasks.EntryIterator:
"""Passes clinical notes through NLP and returns any symptoms found"""
phi_root = store.Root(self.task_config.dir_phi, create=True)

# one client for both NLP services for now -- no parallel requests yet, so no need to be fancy
http_client = nlp.ctakes_httpx_client()

for docref in self.read_ndjson(progress=progress):
if not nlp.is_docref_valid(docref):
continue

# Check that the note is one of our special allow-listed types (we do this here rather than on the output
# side to save needing to run everything through NLP).
if not is_ed_docref(docref):
continue

orig_docref = copy.deepcopy(docref)
if not self.scrubber.scrub_resource(docref, scrub_attachments=False):
continue

async for orig_docref, docref, clinical_note in self.read_notes(progress=progress, doc_check=is_ed_docref):
symptoms = await covid_ctakes.covid_symptoms_extract(
self.task_config.client,
phi_root,
docref,
clinical_note,
polarity_model=self.polarity_model,
task_version=self.task_version,
ctakes_http_client=http_client,
cnlp_http_client=http_client,
)
if symptoms is None:
self.summaries[0].had_errors = True
self.add_error(orig_docref)
continue

Expand All @@ -159,11 +108,6 @@ async def read_entries(self, *, progress: rich.progress.Progress = None) -> task
# The Format class will replace all existing symptoms from this note at once (because we set group_field).
yield symptoms

def pop_current_group_values(self, table_index: int) -> set[str]:
values = self.seen_docrefs
self.seen_docrefs = set()
return values

@classmethod
def get_schema(cls, formatter: formats.Format, rows: list[dict]) -> pyarrow.Schema:
return pyarrow.schema(
Expand Down Expand Up @@ -201,3 +145,51 @@ def get_schema(cls, formatter: formats.Format, rows: list[dict]) -> pyarrow.Sche
),
]
)


class CovidSymptomNlpResultsTask(BaseCovidSymptomNlpResultsTask):
"""Covid Symptom study task, to generate symptom lists from ED notes using cTAKES and cnlpt negation"""

name = "covid_symptom__nlp_results"
tags = {"covid_symptom", "gpu"}
polarity_model = TransformerModel.NEGATION

task_version = 3
# Task Version History:
# ** 3 (2023-09): Updated to cnlpt version 0.6.1 **
# cTAKES: smartonfhir/ctakes-covid:1.1.0
# cNLP: smartonfhir/cnlp-transformers:negation-0.6.1
# ctakesclient: 5.0
#
# ** 2 (2023-08): Corrected the cache location (version 1 results might be using stale cache) **
# cTAKES: smartonfhir/ctakes-covid:1.1.0
# cNLP: smartonfhir/cnlp-transformers:negation-0.4.0
# ctakesclient: 5.0
#
# ** 1 (2023-08): Updated ICD10 codes from ctakesclient **
# cTAKES: smartonfhir/ctakes-covid:1.1.0
# cNLP: smartonfhir/cnlp-transformers:negation-0.4.0
# ctakesclient: 5.0
#
# ** null (before we added a task version) **
# cTAKES: smartonfhir/ctakes-covid:1.1.0
# cNLP: smartonfhir/cnlp-transformers:negation-0.4.0
# ctakesclient: 3.0


class CovidSymptomNlpResultsTermExistsTask(BaseCovidSymptomNlpResultsTask):
"""Covid Symptom study task, to generate symptom lists from ED notes using cTAKES and cnlpt termexists"""

name = "covid_symptom__nlp_results_term_exists"
polarity_model = TransformerModel.TERM_EXISTS

# Explicitly don't use any tags because this is really a "hidden" task that is mostly for comparing
# polarity model performance more than running a study. So we don't want it to be accidentally run.
tags = {}

task_version = 1
# Task Version History:
# ** 1 (2023-09): First version **
# cTAKES: smartonfhir/ctakes-covid:1.1.0
# cNLP: smartonfhir/cnlp-transformers:termexists-0.6.1
# ctakesclient: 5.0
29 changes: 4 additions & 25 deletions cumulus_etl/etl/studies/hftest/hf_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,16 @@
import pyarrow
import rich.progress

from cumulus_etl import common, fhir, formats, nlp
from cumulus_etl import common, formats, nlp
from cumulus_etl.etl import tasks


class HuggingFaceTestTask(tasks.EtlTask):
class HuggingFaceTestTask(tasks.BaseNlpTask):
"""Hugging Face Test study task, to generate a summary from text"""

name = "hftest__summary"
resource = "DocumentReference"
needs_bulk_deid = False
outputs = [tasks.OutputTable(schema=None)]

# Task Version
# The "task_version" field is a simple integer that gets incremented any time an NLP-relevant parameter is changed.
# This is a reference to a bundle of metadata (model revision, container revision, prompt string).
# We could combine all that info into a field we save with the results. But it's more human-friendly to have a
# simple version to refer to. So anytime these properties get changed, bump the version and record the old bundle
# of metadata too. Also update the safety checks in prepare_task()
task_version = 0

task_version = 0
# Task Version History:
# ** 0 **
# This is fluid until we actually promote this to a real task - feel free to update without bumping the version.
Expand Down Expand Up @@ -66,18 +56,7 @@ async def read_entries(self, *, progress: rich.progress.Progress = None) -> task
"""Passes clinical notes through HF and returns any symptoms found"""
http_client = httpx.AsyncClient(timeout=300)

for docref in self.read_ndjson(progress=progress):
can_process = nlp.is_docref_valid(docref) and self.scrubber.scrub_resource(docref, scrub_attachments=False)
if not can_process:
continue

try:
clinical_note = await fhir.get_docref_note(self.task_config.client, docref)
except Exception as exc: # pylint: disable=broad-except
logging.warning("Error getting text for docref %s: %s", docref["id"], exc)
self.summaries[0].had_errors = True
continue

async for _, docref, clinical_note in self.read_notes(progress=progress):
timestamp = common.datetime_now().isoformat()

# If you change this prompt, consider updating task_version.
Expand Down
Loading

0 comments on commit 61e9761

Please sign in to comment.