feat(chart-review): group notes by encounter, show real & anon IDs #274

Merged (1 commit, Sep 25, 2023)
84 changes: 74 additions & 10 deletions cumulus_etl/chart_review/cli.py
@@ -29,7 +29,7 @@ def init_checks(args: argparse.Namespace):


async def gather_docrefs(
client: fhir.FhirClient, root_input: store.Root, root_phi: store.Root, args: argparse.Namespace
client: fhir.FhirClient, root_input: store.Root, codebook: deid.Codebook, args: argparse.Namespace
) -> common.Directory:
"""Selects and downloads just the docrefs we need to an export folder."""
common.print_header("Gathering documents...")
@@ -41,26 +41,47 @@ async def gather_docrefs(

if root_input.protocol == "https": # is this a FHIR server?
return await downloader.download_docrefs_from_fhir_server(
client, root_input, root_phi, docrefs=args.docrefs, anon_docrefs=args.anon_docrefs, export_to=args.export_to
client, root_input, codebook, docrefs=args.docrefs, anon_docrefs=args.anon_docrefs, export_to=args.export_to
)
else:
return selector.select_docrefs_from_files(
root_input, root_phi, docrefs=args.docrefs, anon_docrefs=args.anon_docrefs, export_to=args.export_to
root_input, codebook, docrefs=args.docrefs, anon_docrefs=args.anon_docrefs, export_to=args.export_to
)


async def read_notes_from_ndjson(client: fhir.FhirClient, dirname: str) -> list[LabelStudioNote]:
async def read_notes_from_ndjson(
client: fhir.FhirClient, dirname: str, codebook: deid.Codebook
) -> list[LabelStudioNote]:
common.print_header("Downloading note text...")
docref_ids = []

# Download all the doc notes (and save some metadata about each)
encounter_ids = []
docrefs = []
coroutines = []
for docref in common.read_resource_ndjson(store.Root(dirname), "DocumentReference"):
docref_ids.append(docref["id"])
encounter_refs = docref.get("context", {}).get("encounter", [])
if not encounter_refs:
# If a note doesn't have an encounter - we're in big trouble
raise SystemExit(f"DocumentReference {docref['id']} is missing encounter information.")

encounter_ids.append(fhir.unref_resource(encounter_refs[0])[1]) # just use first encounter
docrefs.append(docref)
coroutines.append(fhir.get_docref_note(client, docref))
note_texts = await asyncio.gather(*coroutines)

# Now bundle each note together with some metadata and ID mappings
notes = []
for i, text in enumerate(note_texts):
notes.append(LabelStudioNote(docref_ids[i], text))
default_title = "Document"
codings = docrefs[i].get("type", {}).get("coding", [])
title = codings[0].get("display", default_title) if codings else default_title

enc_id = encounter_ids[i]
anon_enc_id = codebook.fake_id("Encounter", enc_id)
doc_id = docrefs[i]["id"]
doc_mappings = {doc_id: codebook.fake_id("DocumentReference", doc_id)}

notes.append(LabelStudioNote(enc_id, anon_enc_id, doc_mappings, title, text))

return notes

@@ -94,6 +115,48 @@ def philter_notes(notes: Collection[LabelStudioNote], args: argparse.Namespace)
note.text = philter.scrub_text(note.text)


def group_notes_by_encounter(notes: Collection[LabelStudioNote]) -> list[LabelStudioNote]:
"""
Gather all notes with the same encounter ID together into one note.
Reviewers seem to prefer that.
"""
grouped_notes = []

# Group up docs & notes by encounter
by_encounter_id = {}
for note in notes:
by_encounter_id.setdefault(note.enc_id, []).append(note)

# Group up the text into one big note
for enc_id, enc_notes in by_encounter_id.items():
grouped_text = ""
grouped_matches = []
grouped_doc_mappings = {}

for note in enc_notes:
grouped_doc_mappings.update(note.doc_mappings)

if grouped_text:
grouped_text += "\n\n\n"
grouped_text += "########################################\n########################################\n"
grouped_text += f"{note.title}\n"
grouped_text += "########################################\n########################################\n\n\n"
offset = len(grouped_text)
grouped_text += note.text

for match in note.matches:
match.begin += offset
match.end += offset
grouped_matches.append(match)

grouped_note = LabelStudioNote(enc_id, enc_notes[0].anon_id, grouped_doc_mappings, "", grouped_text)
grouped_note.matches = grouped_matches
grouped_notes.append(grouped_note)

return grouped_notes


def push_to_label_studio(
notes: Collection[LabelStudioNote], access_token: str, labels: dict, args: argparse.Namespace
) -> None:
@@ -161,19 +224,20 @@ async def chart_review_main(args: argparse.Namespace) -> None:

store.set_user_fs_options(vars(args)) # record filesystem options like --s3-region before creating Roots
root_input = store.Root(args.dir_input)
root_phi = store.Root(args.dir_phi, create=True)
codebook = deid.Codebook(args.dir_phi)

# Auth & read files early for quick error feedback
client = fhir.create_fhir_client_for_cli(args, root_input, ["DocumentReference"])
access_token = common.read_text(args.ls_token).strip()
labels = ctakesclient.filesystem.map_cui_pref(args.symptoms_bsv)

async with client:
ndjson_folder = await gather_docrefs(client, root_input, root_phi, args)
notes = await read_notes_from_ndjson(client, ndjson_folder.name)
ndjson_folder = await gather_docrefs(client, root_input, codebook, args)
notes = await read_notes_from_ndjson(client, ndjson_folder.name, codebook)

await run_nlp(notes, args)
philter_notes(notes, args) # safe to do after NLP because philter does not change character counts
notes = group_notes_by_encounter(notes)
push_to_label_studio(notes, access_token, labels, args)


7 changes: 3 additions & 4 deletions cumulus_etl/chart_review/downloader.py
@@ -13,15 +13,15 @@
async def download_docrefs_from_fhir_server(
client: fhir.FhirClient,
root_input: store.Root,
root_phi: store.Root,
codebook: deid.Codebook,
docrefs: str = None,
anon_docrefs: str = None,
export_to: str = None,
):
if docrefs:
return await _download_docrefs_from_real_ids(client, docrefs, export_to=export_to)
elif anon_docrefs:
return await _download_docrefs_from_fake_ids(client, root_phi.path, anon_docrefs, export_to=export_to)
return await _download_docrefs_from_fake_ids(client, codebook, anon_docrefs, export_to=export_to)
else:
# else we'll download the entire target path as a bulk export (presumably the user has scoped a Group)
ndjson_loader = loaders.FhirNdjsonLoader(root_input, client, export_to=export_to)
@@ -30,7 +30,7 @@ async def download_docrefs_from_fhir_server(

async def _download_docrefs_from_fake_ids(
client: fhir.FhirClient,
dir_phi: str,
codebook: deid.Codebook,
docref_csv: str,
export_to: str = None,
) -> common.Directory:
@@ -44,7 +44,6 @@ async def _download_docrefs_from_fake_ids(
fake_patient_ids = {row["patient_id"] for row in rows}

# We know how to reverse-map the patient identifiers, so do that up front
codebook = deid.Codebook(dir_phi)
patient_ids = codebook.real_ids("Patient", fake_patient_ids)

# Kick off a bunch of requests to the FHIR server for any documents for these patients
24 changes: 14 additions & 10 deletions cumulus_etl/chart_review/labelstudio.py
@@ -18,8 +18,11 @@


class LabelStudioNote:
def __init__(self, ref_id: str, text: str):
self.ref_id = ref_id
def __init__(self, enc_id: str, anon_id: str, doc_mappings: dict[str, str], title: str, text: str):
self.enc_id = enc_id
self.anon_id = anon_id
self.doc_mappings = doc_mappings
self.title = title
self.text = text
self.matches: list[ctakesclient.typesystem.MatchText] = []

@@ -36,12 +39,12 @@ def __init__(self, url: str, api_key: str, project_id: int, cui_labels: dict[str

def push_tasks(self, notes: Collection[LabelStudioNote], *, overwrite: bool = False) -> None:
# Get any existing tasks that we might be updating
ref_ids = [note.ref_id for note in notes]
ref_id_filter = lsdm.Filters.create(
enc_ids = [note.enc_id for note in notes]
enc_id_filter = lsdm.Filters.create(
lsdm.Filters.AND,
[lsdm.Filters.item(lsdm.Column.data("ref_id"), lsdm.Operator.IN_LIST, lsdm.Type.List, ref_ids)],
[lsdm.Filters.item(lsdm.Column.data("enc_id"), lsdm.Operator.IN_LIST, lsdm.Type.List, enc_ids)],
)
existing_tasks = self._project.get_tasks(filters=ref_id_filter)
existing_tasks = self._project.get_tasks(filters=enc_id_filter)
new_task_count = len(notes) - len(existing_tasks)

# Should we delete existing entries?
@@ -51,8 +54,8 @@ def push_tasks(self, notes: Collection[LabelStudioNote], *, overwrite: bool = Fa
self._project.delete_tasks([t["id"] for t in existing_tasks])
else:
print(f" Skipping {len(existing_tasks)} existing tasks")
existing_ref_ids = {t["data"]["ref_id"] for t in existing_tasks}
notes = [note for note in notes if note.ref_id not in existing_ref_ids]
existing_enc_ids = {t["data"]["enc_id"] for t in existing_tasks}
notes = [note for note in notes if note.enc_id not in existing_enc_ids]

# OK, import away!
if notes:
@@ -78,7 +81,9 @@ def _format_task_for_note(self, note: LabelStudioNote) -> dict:
task = {
"data": {
"text": note.text,
"ref_id": note.ref_id,
"enc_id": note.enc_id,
"anon_id": note.anon_id,
"docref_mappings": note.doc_mappings,
},
}

@@ -111,7 +116,6 @@ def _format_prediction(self, task: dict, note: LabelStudioNote) -> None:
# actually kept in the config, so we have to make some assumptions about how the user set up their project.
#
# The rule that Cumulus uses is that the value= variable must equal the name= of the <Labels> element.
# TODO: add this quirk to our eventual documentation for this feature.
task["data"][self._labels_name] = [{"value": x} for x in sorted(used_labels)]

task["predictions"] = [prediction]
16 changes: 9 additions & 7 deletions cumulus_etl/chart_review/selector.py
@@ -8,11 +8,15 @@


def select_docrefs_from_files(
root_input: store.Root, root_phi: store.Root, docrefs: str = None, anon_docrefs: str = None, export_to: str = None
root_input: store.Root,
codebook: deid.Codebook,
docrefs: str = None,
anon_docrefs: str = None,
export_to: str = None,
) -> common.Directory:
"""Takes an input folder of ndjson and exports just the chosen docrefs to a new ndjson folder"""
# Get an appropriate filter method, for the given docrefs
docref_filter = _create_docref_filter(root_phi, docrefs, anon_docrefs)
docref_filter = _create_docref_filter(codebook, docrefs, anon_docrefs)

# Set up export folder
output_folder = cli_utils.make_export_dir(export_to=export_to)
@@ -28,14 +32,14 @@


def _create_docref_filter(
root_phi: store.Root, docrefs: str = None, anon_docrefs: str = None
codebook: deid.Codebook, docrefs: str = None, anon_docrefs: str = None
) -> Callable[[Iterable[dict]], Iterator[dict]]:
"""This returns a method that will can an iterator of docrefs and returns an iterator of fewer docrefs"""
# Decide how we're filtering the input files (by real or fake ID, or no filtering at all!)
if docrefs:
return functools.partial(_filter_real_docrefs, docrefs)
elif anon_docrefs:
return functools.partial(_filter_fake_docrefs, root_phi, anon_docrefs)
return functools.partial(_filter_fake_docrefs, codebook, anon_docrefs)
else:
# Just accept everything (we still want to read them though, to copy them to a possible export folder).
# So this lambda just returns an iterator over its input.
@@ -56,13 +60,11 @@ def _filter_real_docrefs(docrefs_csv: str, docrefs: Iterable[dict]) -> Iterator[
break


def _filter_fake_docrefs(root_phi: store.Root, anon_docrefs_csv: str, docrefs: Iterable[dict]) -> Iterator[dict]:
def _filter_fake_docrefs(codebook: deid.Codebook, anon_docrefs_csv: str, docrefs: Iterable[dict]) -> Iterator[dict]:
"""Calculates the fake ID for all docrefs found, and keeps any that match the csv list"""
with common.read_csv(anon_docrefs_csv) as reader:
fake_docref_ids = {row["docref_id"] for row in reader} # ignore the patient_id column, not needed

codebook = deid.Codebook(root_phi.path)

for docref in docrefs:
fake_id = codebook.fake_id("DocumentReference", docref["id"], caching_allowed=False)
if fake_id in fake_docref_ids:
12 changes: 11 additions & 1 deletion docs/chart-review.md
@@ -63,6 +63,15 @@ mark the notes with the default NLP dictionary,
anonymize the notes with `philter`,
and then push the results to your Label Studio project number `3`.

### Grouping by Encounter

Chart review mode will group all notes by encounter and present them together as a single
Label Studio artifact.

Each clinical note will have a little header describing what type of note it is ("Admission MD"),
as well as its real & anonymized DocumentReference identifiers,
to make it easier to reference back to your EHR or Athena data.
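
For reference, each grouped note is pushed as one Label Studio task whose data payload carries those identifiers. The field names below come from this change; the ID values (and the exact header formatting) are invented for illustration:

```
# Sketch of one grouped task's "data" payload (field names from this change, ID values invented):
{
    "text": "####\nAdmission MD\n####\n\n\nNote text for the first document...",
    "enc_id": "encounter-1234",    # real Encounter ID, for looking the chart up in your EHR
    "anon_id": "a1b2c3d4e5",       # anonymized Encounter ID, for matching your Athena data
    "docref_mappings": {           # real DocumentReference ID -> anonymized ID
        "docref-5678": "f6a7b8c9d0",
    },
}
```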

## Bulk Export Options

You can point chart review mode at either a folder with DocumentReference ndjson files
@@ -240,7 +249,8 @@ But just briefly, a setup like this with hard-coded labels will work:
</View>
```

Or you can use dynamic labels, and chart review mode will define them from your symptoms file:
Or you can use dynamic labels, and chart review mode will define them from your symptoms file.
Note that the `value` argument must match the `name` argument in your config, like so:
```
<View>
<Labels name="label" toName="text" value="$label" />
Expand Down
2 changes: 1 addition & 1 deletion tests/fhir/test_fhir_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ async def test_bad_smart_config(self, bad_config_override):
{},
(
"Client error '400 Bad Request' for url 'https://auth.example.com/token'\n"
"For more information check: https://httpstatuses.com/400"
"For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/400"
Contributor (author) commented:
Unrelated, but httpx changed its output format. Maybe we should be more resilient to that, but part of me likes the strictness and doesn't mind updating once in a blue moon.

Contributor commented:
honestly i don't hate it either, and that's probably a better source for that info anyway.
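
If that looser matching ever seems worthwhile, a minimal sketch could compare only the stable first line of httpx's error text. The helper below is hypothetical and not part of this PR; it assumes the expected message stays a plain string:

```
# Hypothetical helper: assert only the stable first line of the httpx error text,
# so changes to the trailing "For more information check: ..." hint don't break the test.
def assert_httpx_status_line(error_text: str, expected_first_line: str) -> None:
    assert error_text.splitlines()[0] == expected_first_line
```

The parametrized expectations could then drop the hint line entirely, at the cost of the strictness noted above.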

),
),
)