Skip to content

Commit

Permalink
feat: add cross-task temporary dir to share downloaded notes
Browse files Browse the repository at this point in the history
This commit adds a global temporary directory that can be used to share
data between disparate bits of code/tasks.

Specifically, the first user of this is fhir_utils.get_docref_note
which now caches any downloaded notes in this temporary directory.

This will make it much more performant to add a second NLP task (it
won't have to re-download the notes).

Like all temporary directories managed via context manager, this
directory will be cleaned up upon ETL finishing or being interrupted
by an error.

It is also created with mode 0700, so only the ETL user can read the
files.
  • Loading branch information
mikix committed Aug 2, 2023
1 parent 49bd85d commit 8dce1ee
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 17 deletions.
13 changes: 9 additions & 4 deletions cumulus_etl/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
import enum
import logging
import sys
import tempfile

import rich.logging

from cumulus_etl import chart_review, etl
from cumulus_etl import chart_review, common, etl
from cumulus_etl.etl import convert


Expand Down Expand Up @@ -56,16 +57,20 @@ async def main(argv: list[str]) -> None:
parser = argparse.ArgumentParser(prog=prog)

if subcommand == Command.CHART_REVIEW.value:
await chart_review.run_chart_review(parser, argv)
run_method = chart_review.run_chart_review
elif subcommand == Command.CONVERT.value:
await convert.run_convert(parser, argv)
run_method = convert.run_convert
else:
parser.description = "Extract, transform, and load FHIR data."
if not subcommand:
# Add a note about other subcommands we offer, and tell argparse not to wrap our formatting
parser.formatter_class = argparse.RawDescriptionHelpFormatter
parser.description += "\n\n" "other commands available:\n" " chart-review\n" " convert"
await etl.run_etl(parser, argv)
run_method = etl.run_etl

with tempfile.TemporaryDirectory() as tempdir:
common.set_global_temp_dir(tempdir)
await run_method(parser, argv)


def main_cli():
Expand Down
34 changes: 34 additions & 0 deletions cumulus_etl/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import itertools
import json
import logging
import os
import re
from collections.abc import Iterator
from typing import Any, Protocol, TextIO
Expand Down Expand Up @@ -33,6 +34,39 @@ def __init__(self, path: str):
self.name = path


###############################################################################
#
# Helper Functions: temporary dir handling
#
###############################################################################

_temp_dir: str | None = None


def set_global_temp_dir(path: str) -> None:
global _temp_dir
_temp_dir = path


def get_temp_dir(subdir: str) -> str:
"""
Use to access a specific subdir of the ETL temporary directory.
This directory is guaranteed to exist and to be removed when the ETL stops running.
Use this for sensitive content that you want to share among tasks, like downloaded clinical notes.
:returns: an absolute path to a temporary folder
"""
if not _temp_dir:
raise ValueError("No temporary directory was created yet")

full_path = os.path.join(_temp_dir, subdir)
os.makedirs(full_path, mode=0o700, exist_ok=True)

return full_path


###############################################################################
#
# Helper Functions: listing files
Expand Down
33 changes: 31 additions & 2 deletions cumulus_etl/fhir/fhir_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import inscriptis

from cumulus_etl import fhir
from cumulus_etl import common, fhir

# A relative reference is something like Patient/123 or Patient?identifier=http://hl7.org/fhir/sid/us-npi|9999999299
# (vs a contained reference that starts with # or an absolute URL reference like http://example.org/Patient/123)
Expand Down Expand Up @@ -152,7 +152,35 @@ async def _get_docref_note_from_attachment(client: fhir.FhirClient, attachment:
raise ValueError("No data or url field present")


def _get_cached_docref_note_path(docref: dict) -> str:
    """Returns where the given docref's note lives in the cache (the file itself may not exist yet)."""
    # Cached notes sit in the "notes" temp subdir, one text file per docref, named after the docref's id.
    notes_dir = common.get_temp_dir("notes")
    filename = f"{docref['id']}.txt"
    return f"{notes_dir}/{filename}"


def _get_cached_docref_note(docref: dict) -> str | None:
    """
    Looks up a previously cached note for the given docref.

    :returns: the cached note text (which may be an empty string), or None if nothing is cached
    """
    cache_path = _get_cached_docref_note_path(docref)
    try:
        cached_note = common.read_text(cache_path)
    except FileNotFoundError:
        # A missing file just means this note has not been cached yet
        cached_note = None
    return cached_note


def _save_cached_docref_note(docref: dict, note: str) -> None:
    """Writes the given note text into the cache file for the given docref."""
    common.write_text(_get_cached_docref_note_path(docref), note)


async def get_docref_note(client: fhir.FhirClient, docref: dict) -> str:
"""
Returns the clinical note contained in or referenced by the given docref.
It will try to find the simplest version (plain text) or convert html to plain text if needed.
This also caches the note for the duration of the ETL, to avoid redundant downloads.
"""
note = _get_cached_docref_note(docref)
if note is not None:
return note

attachments = [content["attachment"] for content in docref["content"]]

# Find the best attachment to use, based on mimetype.
Expand Down Expand Up @@ -190,6 +218,7 @@ async def get_docref_note(client: fhir.FhirClient, docref: dict) -> str:

# Strip this "line feed" character that often shows up in notes and is confusing for NLP.
# Hopefully not many notes are using actual Spanish.
note = note and note.replace("¿", " ")
note = note.replace("¿", " ")

_save_cached_docref_note(docref, note)
return note
15 changes: 15 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Global test fixtures and setup"""

import tempfile

import pytest

from cumulus_etl import common


@pytest.fixture(autouse=True)
def isolated_temp_dir():
    """
    Points the global temp dir at a fresh folder for every test.

    The folder is registered before the test body runs and deleted once the test is done.
    """
    with tempfile.TemporaryDirectory() as scratch_dir:
        common.set_global_temp_dir(scratch_dir)
        yield
55 changes: 44 additions & 11 deletions tests/fhir/test_fhir_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""Tests for fhir_utils.py"""
import base64
import shutil
from unittest import mock

import ddt

from cumulus_etl import fhir
from cumulus_etl import common, fhir
from tests import utils


Expand Down Expand Up @@ -43,6 +44,24 @@ def test_unref_failures(self, reference):
def test_ref_resource(self, resource_type, resource_id, expected):
self.assertEqual({"reference": expected}, fhir.ref_resource(resource_type, resource_id))


@ddt.ddt
class TestDocrefNotesUtils(utils.AsyncTestCase):
"""Tests for the utility methods dealing with document reference clinical notes"""

def make_docref(self, docref_id: str, mimetype: str, note: str) -> dict:
    """Builds a minimal docref dict with one inline attachment, holding the given note as base64 data."""
    encoded_note = base64.standard_b64encode(note.encode("utf8")).decode("ascii")
    attachment = {
        "contentType": mimetype,
        "data": encoded_note,
    }
    return {
        "id": docref_id,
        "content": [{"attachment": attachment}],
    }

@ddt.data(
("text/html", "<html><body>He<b>llooooo</b></html>", "Hellooooo"), # strips html
( # strips xhtml
Expand All @@ -54,19 +73,33 @@ def test_ref_resource(self, resource_type, resource_id, expected):
)
@ddt.unpack
async def test_docref_note_conversions(self, mimetype, incoming_note, expected_note):
docref = {
"content": [
{
"attachment": {
"contentType": mimetype,
"data": base64.standard_b64encode(incoming_note.encode("utf8")).decode("ascii"),
},
},
],
}
docref = self.make_docref("1", mimetype, incoming_note)
resulting_note = await fhir.get_docref_note(None, docref)
self.assertEqual(resulting_note, expected_note)

async def test_docref_note_caches_results(self):
    """Verify that get_docref_note has internal caching"""

    async def check_note(docref_id, incoming_text, expected_text):
        # Build a brand new docref each time, so any stale text can only come from the cache
        docref = self.make_docref(docref_id, "text/plain", incoming_text)
        self.assertEqual(expected_text, await fhir.get_docref_note(None, docref))

    # Confirm that we cache: a second request for the same id returns the first text
    await check_note("same", "hello", "hello")
    await check_note("same", "goodbye", "hello")

    # Confirm that we cache empty string correctly (i.e. empty string is not handled same as None)
    await check_note("empty", "", "")
    await check_note("empty", "not empty", "")

    # Confirm that a new id is not cached
    await check_note("new", "fresh", "fresh")

    # Sanity-check that if we blow away the cache, we get new text
    notes_dir = common.get_temp_dir("notes")
    shutil.rmtree(notes_dir)
    await check_note("same", "goodbye", "goodbye")

@ddt.data(
(None, None),
("", None),
Expand Down

0 comments on commit 8dce1ee

Please sign in to comment.