Have loaders return a results object with bundled data
- It used to return common.Directory (which could be a TempDir)
- It now includes a common.Directory plus completion tracking info
  like group name and export timestamp.
- It will in the future include metadata like a list of deleted IDs
mikix committed Aug 30, 2024
1 parent 7396061 commit ce692de
Showing 9 changed files with 94 additions and 62 deletions.
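
A minimal sketch (not part of the commit) of how a caller might consume the new LoaderResults bundle; the loader class and result attributes are taken from the diff below, while the local input folder is a made-up placeholder.

# Illustrative sketch only: reading the LoaderResults returned by load_all().
# "/tmp/fhir-export" is a hypothetical folder of already-downloaded ndjson.
import asyncio

from cumulus_etl import loaders, store


async def show_results() -> None:
    loader = loaders.FhirNdjsonLoader(store.Root("/tmp/fhir-export"))
    results = await loader.load_all(["Patient", "Observation"])

    print(results.path)             # where the loaded ndjson files live (results.directory.name)
    print(results.group_name)       # export group name, if a bulk-export log was found
    print(results.export_datetime)  # export "transactionTime", if known


asyncio.run(show_results())
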
14 changes: 7 additions & 7 deletions cumulus_etl/etl/cli.py
@@ -192,15 +192,15 @@ def print_config(


def handle_completion_args(
args: argparse.Namespace, loader: loaders.Loader
args: argparse.Namespace, loader_results: loaders.LoaderResults
) -> (str, datetime.datetime):
"""Returns (group_name, datetime)"""
# Grab completion options from CLI or loader
export_group_name = args.export_group or loader.group_name
export_group_name = args.export_group or loader_results.group_name
export_datetime = (
datetime.datetime.fromisoformat(args.export_timestamp)
if args.export_timestamp
else loader.export_datetime
else loader_results.export_datetime
)

# Disable entirely if asked to
@@ -267,22 +267,22 @@ async def etl_main(args: argparse.Namespace) -> None:
)

# Pull down resources from any remote location (like s3), convert from i2b2, or do a bulk export
loaded_dir = await config_loader.load_all(list(required_resources))
loader_results = await config_loader.load_all(list(required_resources))

# Establish the group name and datetime of the loaded dataset (from CLI args or Loader)
export_group_name, export_datetime = handle_completion_args(args, config_loader)
export_group_name, export_datetime = handle_completion_args(args, loader_results)

# If *any* of our tasks need bulk MS de-identification, run it
if any(t.needs_bulk_deid for t in selected_tasks):
loaded_dir = await deid.Scrubber.scrub_bulk_data(loaded_dir.name)
loader_results.directory = await deid.Scrubber.scrub_bulk_data(loader_results.path)
else:
print("Skipping bulk de-identification.")
print("These selected tasks will de-identify resources as they are processed.")

# Prepare config for jobs
config = JobConfig(
args.dir_input,
loaded_dir.name,
loader_results.path,
args.dir_output,
args.dir_phi,
args.input_format,
2 changes: 1 addition & 1 deletion cumulus_etl/loaders/__init__.py
@@ -1,5 +1,5 @@
"""Public API for loaders"""

from .base import Loader
from .base import Loader, LoaderResults
from .fhir.ndjson_loader import FhirNdjsonLoader
from .i2b2.loader import I2b2Loader
26 changes: 21 additions & 5 deletions cumulus_etl/loaders/base.py
@@ -1,10 +1,30 @@
"""Base abstract loader"""

import abc
import dataclasses
import datetime
from collections.abc import Iterable

Check failure on line 6 in cumulus_etl/loaders/base.py (GitHub Actions / lint): Ruff F401, `collections.abc.Iterable` imported but unused

from cumulus_etl import common, store


@dataclasses.dataclass(kw_only=True)
class LoaderResults:
"""Bundles results of a load request"""

# Where loaded files reside on disk (use .path for convenience)
directory: common.Directory

@property
def path(self) -> str:
return self.directory.name

# Completion tracking values - noting an export group name for this bundle of data
# and the time when it was exported ("transactionTime" in bulk-export terms).
group_name: str | None = None
export_datetime: datetime.datetime | None = None


class Loader(abc.ABC):
"""
An abstraction for how to load FHIR input
@@ -21,12 +41,8 @@ def __init__(self, root: store.Root):
"""
self.root = root

# Public properties (potentially set when loading) for reporting back to caller
self.group_name = None
self.export_datetime = None

@abc.abstractmethod
async def load_all(self, resources: list[str]) -> common.Directory:
async def load_all(self, resources: list[str]) -> LoaderResults:
"""
Loads the listed remote resources and places them into a local folder as FHIR ndjson
27 changes: 15 additions & 12 deletions cumulus_etl/loaders/fhir/ndjson_loader.py
@@ -37,25 +37,26 @@ def __init__(
self.until = until
self.resume = resume

async def load_all(self, resources: list[str]) -> common.Directory:
async def load_all(self, resources: list[str]) -> base.LoaderResults:
# Are we doing a bulk FHIR export from a server?
if self.root.protocol in ["http", "https"]:
loaded_dir = await self.load_from_bulk_export(resources)
input_root = store.Root(loaded_dir.name)
results = await self.load_from_bulk_export(resources)
input_root = store.Root(results.path)
else:
if self.export_to or self.since or self.until or self.resume:
errors.fatal(
"You provided FHIR bulk export parameters but did not provide a FHIR server",
errors.ARGS_CONFLICT,
)

results = base.LoaderResults(directory=self.root.path)
input_root = self.root

# Parse logs for export information
try:
parser = BulkExportLogParser(input_root)
self.group_name = parser.group_name
self.export_datetime = parser.export_datetime
results.group_name = parser.group_name
results.export_datetime = parser.export_datetime
except BulkExportLogParser.LogParsingError:
# Once we require group name & export datetime, we should warn about this.
# For now, just ignore any errors.
@@ -75,11 +76,13 @@ async def load_all(self, resources: list[str]) -> common.Directory:
filenames = common.ls_resources(input_root, set(resources), warn_if_empty=True)
for filename in filenames:
input_root.get(filename, f"{tmpdir.name}/")
return tmpdir
results.directory = tmpdir

return results

async def load_from_bulk_export(
self, resources: list[str], prefer_url_resources: bool = False
) -> common.Directory:
) -> base.LoaderResults:
"""
Performs a bulk export and drops the results in an export dir.
@@ -101,11 +104,11 @@ async def load_from_bulk_export(
)
await bulk_exporter.export()

# Copy back these settings from the export
self.group_name = bulk_exporter.group_name
self.export_datetime = bulk_exporter.export_datetime

except errors.FatalError as exc:
errors.fatal(str(exc), errors.BULK_EXPORT_FAILED)

return target_dir
return base.LoaderResults(
directory=target_dir,
group_name=bulk_exporter.group_name,
export_datetime=bulk_exporter.export_datetime,
)
13 changes: 7 additions & 6 deletions cumulus_etl/loaders/i2b2/loader.py
@@ -8,7 +8,7 @@
from typing import TypeVar

from cumulus_etl import cli_utils, common, store
from cumulus_etl.loaders.base import Loader
from cumulus_etl.loaders import base
from cumulus_etl.loaders.i2b2 import extract, schema, transform
from cumulus_etl.loaders.i2b2.oracle import extract as oracle_extract

@@ -18,7 +18,7 @@
I2b2ToFhirCallable = Callable[[AnyDimension], dict]


class I2b2Loader(Loader):
class I2b2Loader(base.Loader):
"""
Loader for i2b2 data.
@@ -34,11 +34,12 @@ def __init__(self, root: store.Root, export_to: str | None = None):
super().__init__(root)
self.export_to = export_to

async def load_all(self, resources: list[str]) -> common.Directory:
async def load_all(self, resources: list[str]) -> base.LoaderResults:
if self.root.protocol in ["tcp"]:
return self._load_all_from_oracle(resources)

return self._load_all_from_csv(resources)
directory = self._load_all_from_oracle(resources)
else:
directory = self._load_all_from_csv(resources)
return base.LoaderResults(directory=directory)

def _load_all_with_extractors(
self,
12 changes: 4 additions & 8 deletions tests/etl/test_etl_cli.py
@@ -267,20 +267,16 @@ async def test_task_init_checks(self, mock_check):
async def test_completion_args(self, etl_args, loader_vals, expected_vals):
"""Verify that we parse completion args with the correct fallbacks and checks."""
# Grab all observations before we mock anything
observations = loaders.FhirNdjsonLoader(store.Root(self.input_path)).load_all(
observations = await loaders.FhirNdjsonLoader(store.Root(self.input_path)).load_all(
["Observation"]
)

def fake_load_all(internal_self, resources):
del resources
internal_self.group_name = loader_vals[0]
internal_self.export_datetime = loader_vals[1]
return observations
observations.group_name = loader_vals[0]
observations.export_datetime = loader_vals[1]

with (
self.assertRaises(SystemExit) as cm,
mock.patch("cumulus_etl.etl.cli.etl_job", side_effect=SystemExit) as mock_etl_job,
mock.patch.object(loaders.FhirNdjsonLoader, "load_all", new=fake_load_all),
mock.patch.object(loaders.FhirNdjsonLoader, "load_all", return_value=observations),
):
await self.run_etl(tasks=["observation"], **etl_args)

22 changes: 19 additions & 3 deletions tests/loaders/i2b2/test_i2b2_loader.py
@@ -4,7 +4,7 @@
import shutil
import tempfile

from cumulus_etl import store
from cumulus_etl import common, store
from cumulus_etl.loaders.i2b2 import loader
from tests.utils import AsyncTestCase

@@ -22,6 +22,22 @@ async def test_missing_files(self):
vitals = f"{self.datadir}/i2b2/input/observation_fact_vitals.csv"
shutil.copy(vitals, tmpdir)

loaded_dir = await i2b2_loader.load_all(["Observation", "Patient"])
results = await i2b2_loader.load_all(["Observation", "Patient"])

self.assertEqual(["Observation.1.ndjson"], os.listdir(loaded_dir.name))
self.assertEqual(["Observation.1.ndjson"], os.listdir(results.path))

async def test_duplicate_ids(self):
"""Verify that we ignore duplicate IDs"""
with tempfile.TemporaryDirectory() as tmpdir:
root = store.Root(tmpdir)
i2b2_loader = loader.I2b2Loader(root)

common.write_text(
f"{tmpdir}/patient_dimension.csv",
"PATIENT_NUM,BIRTH_DATE\n" "123,1982-10-16\n" "123,1983-11-17\n" "456,2000-01-13\n",
)

results = await i2b2_loader.load_all(["Patient"])
rows = common.read_resource_ndjson(store.Root(results.path), "Patient")
values = [(r["id"], r["birthDate"]) for r in rows]
self.assertEqual(values, [("123", "1982-10-16"), ("456", "2000-01-13")])
10 changes: 5 additions & 5 deletions tests/loaders/i2b2/test_i2b2_oracle_extract.py
@@ -93,7 +93,7 @@ async def test_loader(self, mock_extract):

root = store.Root("tcp://localhost/foo")
oracle_loader = loader.I2b2Loader(root)
tmpdir = await oracle_loader.load_all(["Condition", "Encounter", "Patient"])
results = await oracle_loader.load_all(["Condition", "Encounter", "Patient"])

# Check results
self.assertEqual(
@@ -102,17 +102,17 @@
"Encounter.ndjson",
"Patient.ndjson",
},
set(os.listdir(tmpdir.name)),
set(os.listdir(results.path)),
)

self.assertEqual(
i2b2_mock_data.condition(),
common.read_json(os.path.join(tmpdir.name, "Condition.ndjson")),
common.read_json(os.path.join(results.path, "Condition.ndjson")),
)
self.assertEqual(
i2b2_mock_data.encounter(),
common.read_json(os.path.join(tmpdir.name, "Encounter.ndjson")),
common.read_json(os.path.join(results.path, "Encounter.ndjson")),
)
self.assertEqual(
i2b2_mock_data.patient(), common.read_json(os.path.join(tmpdir.name, "Patient.ndjson"))
i2b2_mock_data.patient(), common.read_json(os.path.join(results.path, "Patient.ndjson"))
)
30 changes: 15 additions & 15 deletions tests/loaders/ndjson/test_ndjson_loader.py
@@ -62,13 +62,13 @@ async def test_local_happy_path(self):
writer.write(patient)

loader = loaders.FhirNdjsonLoader(store.Root(tmpdir))
loaded_dir = await loader.load_all(["Patient"])
results = await loader.load_all(["Patient"])

self.assertEqual(["Patient.ndjson"], os.listdir(loaded_dir.name))
self.assertEqual(patient, common.read_json(f"{loaded_dir.name}/Patient.ndjson"))
self.assertEqual("G", loader.group_name)
self.assertEqual(["Patient.ndjson"], os.listdir(results.path))
self.assertEqual(patient, common.read_json(f"{results.path}/Patient.ndjson"))
self.assertEqual("G", results.group_name)
self.assertEqual(
datetime.datetime.fromisoformat("1999-03-14T14:12:10"), loader.export_datetime
datetime.datetime.fromisoformat("1999-03-14T14:12:10"), results.export_datetime
)

# At some point, we do want to make this fatal.
@@ -80,11 +80,11 @@ async def test_log_parsing_is_non_fatal(self):
self._write_log_file(f"{tmpdir}/log.2.ndjson", "G2", "2002-02-02")

loader = loaders.FhirNdjsonLoader(store.Root(tmpdir))
await loader.load_all([])
results = await loader.load_all([])

# We used neither log and didn't error out.
self.assertIsNone(loader.group_name)
self.assertIsNone(loader.export_datetime)
self.assertIsNone(results.group_name)
self.assertIsNone(results.export_datetime)

@mock.patch("cumulus_etl.fhir.fhir_client.FhirClient")
@mock.patch("cumulus_etl.etl.cli.loaders.FhirNdjsonLoader")
@@ -299,7 +299,7 @@ async def fake_export() -> None:
loader = loaders.FhirNdjsonLoader(
store.Root("http://localhost:9999"), mock.AsyncMock(), export_to=target
)
folder = await loader.load_all(["Patient"])
results = await loader.load_all(["Patient"])

# Confirm export folder still has the data (and log) we created above in the mock
self.assertTrue(os.path.isdir(target))
@@ -309,9 +309,9 @@ async def fake_export() -> None:
self.assertEqual({"eventId": "kickoff"}, common.read_json(f"{target}/log.ndjson"))

# Confirm the returned dir has only the data (we don't want to confuse MS tool with logs)
self.assertNotEqual(folder.name, target)
self.assertEqual({"Patient.ndjson"}, set(os.listdir(folder.name)))
self.assertEqual(patient, common.read_json(f"{folder.name}/Patient.ndjson"))
self.assertNotEqual(results.path, target)
self.assertEqual({"Patient.ndjson"}, set(os.listdir(results.path)))
self.assertEqual(patient, common.read_json(f"{results.path}/Patient.ndjson"))

async def test_export_internal_folder_happy_path(self):
"""Test that we can also safely export without an export-to folder involved"""
@@ -325,11 +325,11 @@ async def fake_export() -> None:
self.mock_exporter.export.side_effect = fake_export

loader = loaders.FhirNdjsonLoader(store.Root("http://localhost:9999"), mock.AsyncMock())
folder = await loader.load_all(["Patient"])
results = await loader.load_all(["Patient"])

# Confirm the returned dir has only the data (we don't want to confuse MS tool with logs)
self.assertEqual({"Patient.ndjson"}, set(os.listdir(folder.name)))
self.assertEqual(patient, common.read_json(f"{folder.name}/Patient.ndjson"))
self.assertEqual({"Patient.ndjson"}, set(os.listdir(results.path)))
self.assertEqual(patient, common.read_json(f"{results.path}/Patient.ndjson"))

async def test_export_to_folder_has_contents(self):
"""Verify we fail if an export folder already has contents"""