feat(chart-review): group notes by encounter, show real & anon IDs
This adds a couple quality-of-life improvements to chart-review mode:
- Notes are now grouped by encounter, with each note clearly demarcated
  inside a larger text document.
- Notes now have a header with their note type label (e.g. "Admission
  MD")
- The encounter ID (real+anon) plus all the contained DocRef IDs
  (real+anon) are also pushed as Label Studio metadata, which will show
  up in exports if folks want to do some post-processing (see the sketch below).
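
For example, each pushed task's `data` payload ends up shaped roughly like the sketch below
(a minimal illustration with made-up IDs; the field names come from `_format_task_for_note`
in labelstudio.py further down):

```python
# Sketch of one Label Studio task, using hypothetical ID values
task = {
    "data": {
        "text": "<grouped note text, with a title banner per note>",
        "enc_id": "enc-real-123",   # real Encounter ID
        "anon_id": "enc-anon-456",  # anonymized Encounter ID
        "docref_mappings": {        # real -> anonymized DocumentReference IDs
            "doc-real-789": "doc-anon-012",
        },
    },
}
```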
mikix committed Sep 25, 2023
1 parent 9224baa commit 9f02038
Showing 8 changed files with 173 additions and 50 deletions.
84 changes: 74 additions & 10 deletions cumulus_etl/chart_review/cli.py
@@ -29,7 +29,7 @@ def init_checks(args: argparse.Namespace):


async def gather_docrefs(
- client: fhir.FhirClient, root_input: store.Root, root_phi: store.Root, args: argparse.Namespace
+ client: fhir.FhirClient, root_input: store.Root, codebook: deid.Codebook, args: argparse.Namespace
) -> common.Directory:
"""Selects and downloads just the docrefs we need to an export folder."""
common.print_header("Gathering documents...")
@@ -41,26 +41,47 @@

if root_input.protocol == "https": # is this a FHIR server?
return await downloader.download_docrefs_from_fhir_server(
- client, root_input, root_phi, docrefs=args.docrefs, anon_docrefs=args.anon_docrefs, export_to=args.export_to
+ client, root_input, codebook, docrefs=args.docrefs, anon_docrefs=args.anon_docrefs, export_to=args.export_to
)
else:
return selector.select_docrefs_from_files(
- root_input, root_phi, docrefs=args.docrefs, anon_docrefs=args.anon_docrefs, export_to=args.export_to
+ root_input, codebook, docrefs=args.docrefs, anon_docrefs=args.anon_docrefs, export_to=args.export_to
)


- async def read_notes_from_ndjson(client: fhir.FhirClient, dirname: str) -> list[LabelStudioNote]:
+ async def read_notes_from_ndjson(
+ client: fhir.FhirClient, dirname: str, codebook: deid.Codebook
+ ) -> list[LabelStudioNote]:
common.print_header("Downloading note text...")
- docref_ids = []
+
+ # Download all the doc notes (and save some metadata about each)
+ encounter_ids = []
+ docrefs = []
coroutines = []
for docref in common.read_resource_ndjson(store.Root(dirname), "DocumentReference"):
- docref_ids.append(docref["id"])
+ encounter_refs = docref.get("context", {}).get("encounter", [])
+ if not encounter_refs:
+ # If a note doesn't have an encounter - we're in big trouble
+ raise SystemExit(f"DocumentReference {docref['id']} is missing encounter information.")
+
+ encounter_ids.append(fhir.unref_resource(encounter_refs[0])[1])  # just use first encounter
+ docrefs.append(docref)
coroutines.append(fhir.get_docref_note(client, docref))
note_texts = await asyncio.gather(*coroutines)

+ # Now bundle each note together with some metadata and ID mappings
notes = []
for i, text in enumerate(note_texts):
- notes.append(LabelStudioNote(docref_ids[i], text))
+ default_title = "Document"
+ codings = docrefs[i].get("type", {}).get("coding", [])
+ title = codings[0].get("display", default_title) if codings else default_title
+
+ enc_id = encounter_ids[i]
+ anon_enc_id = codebook.fake_id("Encounter", enc_id)
+ doc_id = docrefs[i]["id"]
+ doc_mappings = {doc_id: codebook.fake_id("DocumentReference", doc_id)}
+
+ notes.append(LabelStudioNote(enc_id, anon_enc_id, doc_mappings, title, text))

return notes

@@ -94,6 +115,48 @@ def philter_notes(notes: Collection[LabelStudioNote], args: argparse.Namespace)
note.text = philter.scrub_text(note.text)


+ def group_notes_by_encounter(notes: Collection[LabelStudioNote]) -> list[LabelStudioNote]:
+ """
+ Gather all notes with the same encounter ID together into one note.
+ Reviewers seem to prefer that.
+ """
+ grouped_notes = []
+
+ # Group up docs & notes by encounter
+ by_encounter_id = {}
+ for note in notes:
+ by_encounter_id.setdefault(note.enc_id, []).append(note)
+
+ # Group up the text into one big note
+ for enc_id, enc_notes in by_encounter_id.items():
+ grouped_text = ""
+ grouped_matches = []
+ grouped_doc_mappings = {}
+
+ for note in enc_notes:
+ grouped_doc_mappings.update(note.doc_mappings)
+
+ if grouped_text:
+ grouped_text += "\n\n\n"
+ grouped_text += "########################################\n########################################\n"
+ grouped_text += f"{note.title}\n"
+ grouped_text += "########################################\n########################################\n\n\n"
+ offset = len(grouped_text)
+ grouped_text += note.text
+
+ for match in note.matches:
+ match.begin += offset
+ match.end += offset
+ grouped_matches.append(match)
+
+ grouped_note = LabelStudioNote(enc_id, enc_notes[0].anon_id, grouped_doc_mappings, "", grouped_text)
+ grouped_note.matches = grouped_matches
+ grouped_notes.append(grouped_note)
+
+ return grouped_notes


def push_to_label_studio(
notes: Collection[LabelStudioNote], access_token: str, labels: dict, args: argparse.Namespace
) -> None:
@@ -161,19 +224,20 @@ async def chart_review_main(args: argparse.Namespace) -> None:

store.set_user_fs_options(vars(args)) # record filesystem options like --s3-region before creating Roots
root_input = store.Root(args.dir_input)
- root_phi = store.Root(args.dir_phi, create=True)
+ codebook = deid.Codebook(args.dir_phi)

# Auth & read files early for quick error feedback
client = fhir.create_fhir_client_for_cli(args, root_input, ["DocumentReference"])
access_token = common.read_text(args.ls_token).strip()
labels = ctakesclient.filesystem.map_cui_pref(args.symptoms_bsv)

async with client:
- ndjson_folder = await gather_docrefs(client, root_input, root_phi, args)
- notes = await read_notes_from_ndjson(client, ndjson_folder.name)
+ ndjson_folder = await gather_docrefs(client, root_input, codebook, args)
+ notes = await read_notes_from_ndjson(client, ndjson_folder.name, codebook)

await run_nlp(notes, args)
philter_notes(notes, args) # safe to do after NLP because philter does not change character counts
+ notes = group_notes_by_encounter(notes)
push_to_label_studio(notes, access_token, labels, args)


7 changes: 3 additions & 4 deletions cumulus_etl/chart_review/downloader.py
@@ -13,15 +13,15 @@
async def download_docrefs_from_fhir_server(
client: fhir.FhirClient,
root_input: store.Root,
- root_phi: store.Root,
+ codebook: deid.Codebook,
docrefs: str = None,
anon_docrefs: str = None,
export_to: str = None,
):
if docrefs:
return await _download_docrefs_from_real_ids(client, docrefs, export_to=export_to)
elif anon_docrefs:
- return await _download_docrefs_from_fake_ids(client, root_phi.path, anon_docrefs, export_to=export_to)
+ return await _download_docrefs_from_fake_ids(client, codebook, anon_docrefs, export_to=export_to)
else:
# else we'll download the entire target path as a bulk export (presumably the user has scoped a Group)
ndjson_loader = loaders.FhirNdjsonLoader(root_input, client, export_to=export_to)
@@ -30,7 +30,7 @@ async def download_docrefs_from_fhir_server(

async def _download_docrefs_from_fake_ids(
client: fhir.FhirClient,
- dir_phi: str,
+ codebook: deid.Codebook,
docref_csv: str,
export_to: str = None,
) -> common.Directory:
@@ -44,7 +44,6 @@ async def _download_docrefs_from_fake_ids(
fake_patient_ids = {row["patient_id"] for row in rows}

# We know how to reverse-map the patient identifiers, so do that up front
- codebook = deid.Codebook(dir_phi)
patient_ids = codebook.real_ids("Patient", fake_patient_ids)

# Kick off a bunch of requests to the FHIR server for any documents for these patients
24 changes: 14 additions & 10 deletions cumulus_etl/chart_review/labelstudio.py
@@ -18,8 +18,11 @@


class LabelStudioNote:
- def __init__(self, ref_id: str, text: str):
- self.ref_id = ref_id
+ def __init__(self, enc_id: str, anon_id: str, doc_mappings: dict[str, str], title: str, text: str):
+ self.enc_id = enc_id
+ self.anon_id = anon_id
+ self.doc_mappings = doc_mappings
+ self.title = title
self.text = text
self.matches: list[ctakesclient.typesystem.MatchText] = []

@@ -36,12 +39,12 @@ def __init__(self, url: str, api_key: str, project_id: int, cui_labels: dict[str

def push_tasks(self, notes: Collection[LabelStudioNote], *, overwrite: bool = False) -> None:
# Get any existing tasks that we might be updating
- ref_ids = [note.ref_id for note in notes]
- ref_id_filter = lsdm.Filters.create(
+ enc_ids = [note.enc_id for note in notes]
+ enc_id_filter = lsdm.Filters.create(
lsdm.Filters.AND,
- [lsdm.Filters.item(lsdm.Column.data("ref_id"), lsdm.Operator.IN_LIST, lsdm.Type.List, ref_ids)],
+ [lsdm.Filters.item(lsdm.Column.data("enc_id"), lsdm.Operator.IN_LIST, lsdm.Type.List, enc_ids)],
)
- existing_tasks = self._project.get_tasks(filters=ref_id_filter)
+ existing_tasks = self._project.get_tasks(filters=enc_id_filter)
new_task_count = len(notes) - len(existing_tasks)

# Should we delete existing entries?
@@ -51,8 +54,8 @@ def push_tasks(self, notes: Collection[LabelStudioNote], *, overwrite: bool = Fa
self._project.delete_tasks([t["id"] for t in existing_tasks])
else:
print(f" Skipping {len(existing_tasks)} existing tasks")
- existing_ref_ids = {t["data"]["ref_id"] for t in existing_tasks}
- notes = [note for note in notes if note.ref_id not in existing_ref_ids]
+ existing_enc_ids = {t["data"]["enc_id"] for t in existing_tasks}
+ notes = [note for note in notes if note.enc_id not in existing_enc_ids]

# OK, import away!
if notes:
Expand All @@ -78,7 +81,9 @@ def _format_task_for_note(self, note: LabelStudioNote) -> dict:
task = {
"data": {
"text": note.text,
"ref_id": note.ref_id,
"enc_id": note.enc_id,
"anon_id": note.anon_id,
"docref_mappings": note.doc_mappings,
},
}

@@ -111,7 +116,6 @@ def _format_prediction(self, task: dict, note: LabelStudioNote) -> None:
# actually kept in the config, so we have to make some assumptions about how the user set up their project.
#
# The rule that Cumulus uses is that the value= variable must equal the name= of the <Labels> element.
- # TODO: add this quirk to our eventual documentation for this feature.
task["data"][self._labels_name] = [{"value": x} for x in sorted(used_labels)]

task["predictions"] = [prediction]
16 changes: 9 additions & 7 deletions cumulus_etl/chart_review/selector.py
@@ -8,11 +8,15 @@


def select_docrefs_from_files(
- root_input: store.Root, root_phi: store.Root, docrefs: str = None, anon_docrefs: str = None, export_to: str = None
+ root_input: store.Root,
+ codebook: deid.Codebook,
+ docrefs: str = None,
+ anon_docrefs: str = None,
+ export_to: str = None,
) -> common.Directory:
"""Takes an input folder of ndjson and exports just the chosen docrefs to a new ndjson folder"""
# Get an appropriate filter method, for the given docrefs
- docref_filter = _create_docref_filter(root_phi, docrefs, anon_docrefs)
+ docref_filter = _create_docref_filter(codebook, docrefs, anon_docrefs)

# Set up export folder
output_folder = cli_utils.make_export_dir(export_to=export_to)
@@ -28,14 +32,14 @@


def _create_docref_filter(
- root_phi: store.Root, docrefs: str = None, anon_docrefs: str = None
+ codebook: deid.Codebook, docrefs: str = None, anon_docrefs: str = None
) -> Callable[[Iterable[dict]], Iterator[dict]]:
"""This returns a method that will can an iterator of docrefs and returns an iterator of fewer docrefs"""
# Decide how we're filtering the input files (by real or fake ID, or no filtering at all!)
if docrefs:
return functools.partial(_filter_real_docrefs, docrefs)
elif anon_docrefs:
- return functools.partial(_filter_fake_docrefs, root_phi, anon_docrefs)
+ return functools.partial(_filter_fake_docrefs, codebook, anon_docrefs)
else:
# Just accept everything (we still want to read them though, to copy them to a possible export folder).
# So this lambda just returns an iterator over its input.
@@ -56,13 +60,11 @@ def _filter_real_docrefs(docrefs_csv: str, docrefs: Iterable[dict]) -> Iterator[
break


- def _filter_fake_docrefs(root_phi: store.Root, anon_docrefs_csv: str, docrefs: Iterable[dict]) -> Iterator[dict]:
+ def _filter_fake_docrefs(codebook: deid.Codebook, anon_docrefs_csv: str, docrefs: Iterable[dict]) -> Iterator[dict]:
"""Calculates the fake ID for all docrefs found, and keeps any that match the csv list"""
with common.read_csv(anon_docrefs_csv) as reader:
fake_docref_ids = {row["docref_id"] for row in reader} # ignore the patient_id column, not needed

- codebook = deid.Codebook(root_phi.path)

for docref in docrefs:
fake_id = codebook.fake_id("DocumentReference", docref["id"], caching_allowed=False)
if fake_id in fake_docref_ids:
12 changes: 11 additions & 1 deletion docs/chart-review.md
@@ -63,6 +63,15 @@ mark the notes with the default NLP dictionary,
anonymize the notes with `philter`,
and then push the results to your Label Studio project number `3`.

+ ### Grouping by Encounter
+
+ Chart review mode will group all notes by encounter and present them together as a single
+ Label Studio artifact.
+
+ Each clinical note will have a little header describing what type of note it is ("Admission MD"),
+ as well as its real & anonymized DocumentReference identifiers,
+ to make it easier to reference back to your EHR or Athena data.
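
For a concrete picture, a grouped document ends up looking roughly like the sketch below
(hypothetical note text and titles; the banner format comes from group_notes_by_encounter
in cli.py above):

```
########################################
########################################
Admission MD
########################################
########################################

<first note's text>


########################################
########################################
<second note's title>
########################################
########################################

<second note's text>
```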

## Bulk Export Options

You can point chart review mode at either a folder with DocumentReference ndjson files
@@ -240,7 +249,8 @@ But just briefly, a setup like this with hard-coded labels will work:
</View>
```

- Or you can use dynamic labels, and chart review mode will define them from your symptoms file:
+ Or you can use dynamic labels, and chart review mode will define them from your symptoms file.
+ Note that the `value` argument must match the `name` argument in your config, like so:
```
<View>
<Labels name="label" toName="text" value="$label" />
2 changes: 1 addition & 1 deletion tests/fhir/test_fhir_client.py
@@ -227,7 +227,7 @@ async def test_bad_smart_config(self, bad_config_override):
{},
(
"Client error '400 Bad Request' for url 'https://auth.example.com/token'\n"
"For more information check: https://httpstatuses.com/400"
"For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/400"
),
),
)