feat: use group-name/export-time from log file if present
When loading ndjson from a folder, parse any sibling logs we find
for export info like group/time.

These are the workflow assumptions this change adds:

- There will be a log.ndjson or log.*.ndjson file in the given folder
  (a sample of the expected log contents is sketched after this list).
  - There cannot be multiples (unless log.ndjson exists, in which case
    we always use that).
- That log file will be for a single export.
  - e.g. We will generally grab the last "kickoff" event and ignore
    others.
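
For reference, a minimal sketch of the log contents the new parser looks for (event shapes follow the bulk-data-client log format linked in export_log.py; the group name and timestamp are illustrative):

{"eventId": "kickoff", "eventDetail": {"exportUrl": "https://host/Group/my-group/$export"}}
{"eventId": "status_complete", "eventDetail": {"transactionTime": "2020-10-17T00:00:00+00:00"}}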
mikix committed Apr 12, 2024
1 parent 7cac4db commit 34c652f
Showing 18 changed files with 323 additions and 13 deletions.
9 changes: 8 additions & 1 deletion cumulus_etl/fhir/__init__.py
@@ -1,4 +1,11 @@
"""Support for talking to FHIR servers & handling the FHIR spec"""

from .fhir_client import FhirClient, create_fhir_client_for_cli
from .fhir_utils import download_reference, get_docref_note, parse_datetime, ref_resource, unref_resource
from .fhir_utils import (
download_reference,
get_docref_note,
parse_datetime,
parse_group_from_url,
ref_resource,
unref_resource,
)
24 changes: 24 additions & 0 deletions cumulus_etl/fhir/fhir_utils.py
@@ -4,6 +4,7 @@
import datetime
import email.message
import re
import urllib.parse

import inscriptis

@@ -109,6 +110,29 @@ def parse_datetime(value: str | None) -> datetime.datetime | None:
return None


def parse_group_from_url(url: str) -> str:
"""
Parses the group out of a FHIR URL.
These URLs look something like:
- https://hostname/root/Group/my-group <- group name of `my-group`
- https://hostname/root/Group/my-group/$export <- group name of `my-group`
- https://hostname/root <- no group name
"""
parsed = urllib.parse.urlparse(url)
if not parsed.scheme:
raise ValueError(f"Could not parse URL '{url}'")

pieces = parsed.path.split("/Group/", 2)
match len(pieces):
case 2:
return pieces[1].split("/")[0]
case _:
# Global exports don't seem realistic, but if the user does do them,
# we'll use the empty string as the default group name for that.
return ""


######################################################################################################################
#
# Resource downloading
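
As an aside on the new helper: parse_group_from_url() runs the URL through urllib.parse.urlparse() before searching for "/Group/", so only the path component is considered and query strings are ignored (this is why the "?key=/Group/Testing/" test case below yields no group). A quick sketch with a hypothetical URL:

import urllib.parse

parsed = urllib.parse.urlparse("https://host/root/?key=/Group/Testing/")
# parsed.path == "/root/" and parsed.query == "key=/Group/Testing/",
# so the path contains no "/Group/" and the group name defaults to "".
print(parsed.path.split("/Group/", 2))  # ['/root/']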
12 changes: 1 addition & 11 deletions cumulus_etl/loaders/fhir/bulk_export.py
@@ -57,17 +57,7 @@ def __init__(

# Public properties, to be read after the export:
self.export_datetime = None

# Parse the group out of the URL, which will look something like:
# - https://hostname/root/Group/my-group$export <- group name of `my-group`
# - https://hostname/root$export <- no group name, global export
if "/Group/" in self._url:
latter_half = self._url.split("/Group/", 2)[-1]
self.group_name = latter_half.split("/")[0]
else:
# Global exports don't seem realistic, but the user does do them,
# we'll use the empty string as the default group name for that...
self.group_name = ""
self.group_name = fhir.parse_group_from_url(self._url)

async def export(self) -> None:
"""
99 changes: 99 additions & 0 deletions cumulus_etl/loaders/fhir/export_log.py
@@ -0,0 +1,99 @@
"""
Parsing for bulk export log files
https://github.com/smart-on-fhir/bulk-data-client/wiki/Bulk-Data-Export-Log-Items
"""

import datetime
import json
import os
import re

from cumulus_etl import common, fhir, store


class BulkExportLogParser:
"""
Parses the log file generated by bulk exports.
These are the assumptions we make:
- There will be a log.ndjson or log.*.ndjson file in the given folder.
- There cannot be multiples (unless log.ndjson exists, in which case we always use that)
- That log file will be for a single export.
- e.g. We will generally grab the last "kickoff" event and ignore others.
"""

class LogParsingError(Exception):
pass

class IncompleteLog(LogParsingError):
pass

class MultipleLogs(LogParsingError):
pass

class NoLogs(LogParsingError):
pass

def __init__(self, root: store.Root):
self.group_name: str = None
self.export_datetime: datetime.datetime = None

self._parse(self._find(root))

def _parse(self, path: str) -> None:
# Go through every row, looking for the events we care about.
# Note that we parse every kickoff event we hit, for example.
# So we'll end up with the latest one (which works for single-export
# log files with maybe a false start at the beginning).
try:
for row in common.read_ndjson(path):
match row.get("eventId"):
case "kickoff":
self._parse_kickoff(row)
case "status_complete":
self._parse_status_complete(row)
except (KeyError, json.JSONDecodeError) as exc:
raise self.IncompleteLog(f"Error parsing '{path}'") from exc

if self.group_name is None:
raise self.IncompleteLog(f"No kickoff event found in '{path}'")
if self.export_datetime is None:
raise self.IncompleteLog(f"No status_complete event found in '{path}'")

def _parse_kickoff(self, row: dict) -> None:
details = row["eventDetail"]
self.group_name = fhir.parse_group_from_url(details["exportUrl"])

def _parse_status_complete(self, row: dict) -> None:
details = row["eventDetail"]
self.export_datetime = datetime.datetime.fromisoformat(details["transactionTime"])

def _find(self, root: store.Root) -> str:
"""Finds the log file inside the root"""
try:
paths = root.ls()
except FileNotFoundError as exc:
raise self.NoLogs("Folder does not exist") from exc
filenames = {os.path.basename(p): p for p in paths}

# In the easy case, it's just sitting there at log.ndjson,
# which is the filename that bulk-data-client uses.
# Because this is the standard name, we prefer this and don't
# error out even if there are other log.something.ndjson names in
# the folder (see below). Maybe this is a symlink to the most recent...
if full_path := filenames.get("log.ndjson"):
return full_path

# But possibly the user does some file renaming to manage different
# exports, so allow log.something.ndjson as well. (Much like we do
# for the input ndjson files.)
pattern = re.compile(r"log\..+\.ndjson")
log_files = list(filter(pattern.match, filenames.keys()))
match len(log_files):
case 0:
raise self.NoLogs("No log.ndjson file found")
case 1:
return filenames[log_files[0]]
case _:
raise self.MultipleLogs("Multiple log.*.ndjson files found")
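
For context, a minimal sketch of how this parser is consumed (it mirrors the ndjson_loader change below; the folder path is hypothetical):

from cumulus_etl import store
from cumulus_etl.loaders.fhir.export_log import BulkExportLogParser

root = store.Root("/path/to/export-folder")  # folder holding the export ndjson plus a log.ndjson
try:
    parser = BulkExportLogParser(root)
    print(parser.group_name)       # e.g. "my-group", or "" for a global export
    print(parser.export_datetime)  # parsed from the status_complete transactionTime
except BulkExportLogParser.LogParsingError:
    pass  # no usable log found; for now callers ignore this (see ndjson_loader.py below)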
11 changes: 11 additions & 0 deletions cumulus_etl/loaders/fhir/ndjson_loader.py
@@ -6,6 +6,7 @@
from cumulus_etl import cli_utils, common, errors, fhir, store
from cumulus_etl.loaders import base
from cumulus_etl.loaders.fhir.bulk_export import BulkExporter
from cumulus_etl.loaders.fhir.export_log import BulkExportLogParser


class FhirNdjsonLoader(base.Loader):
@@ -47,6 +48,16 @@ async def load_all(self, resources: list[str]) -> common.Directory:
"You provided FHIR bulk export parameters but did not provide a FHIR server", errors.ARGS_CONFLICT
)

# Parse logs for export information
try:
parser = BulkExportLogParser(self.root)
self.group_name = parser.group_name
self.export_datetime = parser.export_datetime
except BulkExportLogParser.LogParsingError:
# Once we require group name & export datetime, we should warn about this.
# For now, just ignore any errors.
pass

# Copy the resources we need from the remote directory (like S3 buckets) to a local one.
#
# We do this even if the files are local, because the next step in our pipeline is the MS deid tool,
26 changes: 26 additions & 0 deletions tests/fhir/test_fhir_utils.py
@@ -72,6 +72,32 @@ def test_parse_datetime(self, input_value, expected_value):
self.assertEqual(expected_value, parsed)


@ddt.ddt
class TestUrlParsing(utils.AsyncTestCase):
"""Tests for URL parsing"""

@ddt.data(
("//host", ValueError),
("https://host", ""),
("https://host/root", ""),
("https://Group/MyGroup", ""), # Group is hostname here
("https://host/root/?key=/Group/Testing/", ""),
("https://host/root/Group/MyGroup", "MyGroup"),
("https://host/root/Group/MyGroup/", "MyGroup"),
("https://host/Group/MyGroup/$export", "MyGroup"),
("https://host/Group/MyGroup?key=value", "MyGroup"),
("https://host/root/Group/Group/", "Group"),
)
@ddt.unpack
def test_parse_group_from_url(self, url, expected_group):
if isinstance(expected_group, str):
group = fhir.parse_group_from_url(url)
assert expected_group == group
else:
with self.assertRaises(expected_group):
fhir.parse_group_from_url(url)


@ddt.ddt
class TestDocrefNotesUtils(utils.AsyncTestCase):
"""Tests for the utility methods dealing with document reference clinical notes"""
File renamed without changes.
Empty file added tests/loaders/i2b2/__init__.py
Empty file.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Empty file.
File renamed without changes.
106 changes: 106 additions & 0 deletions tests/loaders/ndjson/test_log_parser.py
@@ -0,0 +1,106 @@
"""Tests for bulk export log parsing"""

import datetime
import tempfile

import ddt

from cumulus_etl import common, store
from cumulus_etl.loaders.fhir.export_log import BulkExportLogParser
from tests.utils import AsyncTestCase


def kickoff(group: str) -> dict:
url = f"https://host/Group/{group}" if group else "https://host/"
return {
"eventId": "kickoff",
"eventDetail": {
"exportUrl": url,
},
}


def status_complete(timestamp: str) -> dict:
return {
"eventId": "status_complete",
"eventDetail": {
"transactionTime": timestamp,
},
}


@ddt.ddt
class TestBulkExportLogParser(AsyncTestCase):
"""Test case for parsing bulk export logs."""

def _assert_results(self, path, expected_result) -> None:
if isinstance(expected_result, tuple):
parser = BulkExportLogParser(store.Root(path))
expected_group = expected_result[0]
expected_datetime = datetime.datetime.fromisoformat(expected_result[1])
self.assertEqual(expected_group, parser.group_name)
self.assertEqual(expected_datetime, parser.export_datetime)
else:
with self.assertRaises(expected_result):
BulkExportLogParser(store.Root(path))

@ddt.data(
# Happy cases:
(["log.ndjson"], None),
(["log.blarg.ndjson"], None),
(["log.0001.ndjson"], None),
(["log.ndjson", "log.1.ndjson"], None),
# Error cases:
([], BulkExportLogParser.NoLogs),
(["log.1.ndjson", "log.2.ndjson"], BulkExportLogParser.MultipleLogs),
)
@ddt.unpack
def test_finding_the_log(self, files, error):
with tempfile.TemporaryDirectory() as tmpdir:
common.write_text(f"{tmpdir}/distraction.txt", "hello")
common.write_text(f"{tmpdir}/log.ndjson.bak", "bye")
for file in files:
with common.NdjsonWriter(f"{tmpdir}/{file}") as writer:
writer.write(kickoff("G"))
writer.write(status_complete("2020-10-17"))

error = error or ("G", "2020-10-17")
self._assert_results(tmpdir, error)

def test_no_dir(self):
self._assert_results("/path/does/not/exist", BulkExportLogParser.NoLogs)

@ddt.data(
# Happy cases:
( # basic simple case
[kickoff("G"), status_complete("2020-10-17")],
("G", "2020-10-17"),
),
( # multiple rows - we should pick last of each
[
kickoff("1st"),
kickoff("2nd"),
status_complete("2001-01-01"),
status_complete("2002-02-02"),
],
("2nd", "2002-02-02"),
),
([kickoff(""), status_complete("2020-10-17")], ("", "2020-10-17")), # global export group
# Error cases:
([status_complete("2010-03-09")], BulkExportLogParser.IncompleteLog), # missing group
([kickoff("G")], BulkExportLogParser.IncompleteLog), # missing time
([], BulkExportLogParser.IncompleteLog), # missing all
([{"eventId": "kickoff"}], BulkExportLogParser.IncompleteLog), # missing eventDetail
( # missing transactionTime
[{"eventId": "status_complete", "eventDetail": {}}],
BulkExportLogParser.IncompleteLog,
),
)
@ddt.unpack
def test_parsing(self, rows, expected_result):
with tempfile.TemporaryDirectory() as tmpdir:
with common.NdjsonWriter(f"{tmpdir}/log.ndjson", allow_empty=True) as writer:
for row in rows:
writer.write(row)

self._assert_results(tmpdir, expected_result)