From 5265647590246cb8a3d93a1fea932b780087520a Mon Sep 17 00:00:00 2001 From: Michael Terry Date: Thu, 13 Jul 2023 15:08:22 -0400 Subject: [PATCH] fix: make sure that no inferred schemas ever hit a delta lake This commit fixes a couple oversights with inferred schemas: - The type might clash with the wide schema we have per-spec, even though the types are really compatible (inferred int for a float field does happen) - The wrong type might get through unnoticed if it is a deep field that our per-spec schema didn't catch. Here's a summary of changes to make that happen: - Drop all use of pandas. It's too loose with the types. Instead, switch to pyarrow schemas which were used under the covers of pandas anyway. - Add schema earlier in the process (at Task batching time, not at Formatter writing time). This means that all formatters get to see the same nice schema, though only deltalake uses it. --- cumulus_etl/common.py | 67 ++++- cumulus_etl/etl/cli.py | 2 +- cumulus_etl/etl/convert/cli.py | 8 +- .../etl/studies/covid_symptom/covid_tasks.py | 39 ++- cumulus_etl/etl/tasks/base.py | 84 ++++-- cumulus_etl/fhir/__init__.py | 2 +- cumulus_etl/fhir/fhir_schemas.py | 123 ++++++-- cumulus_etl/formats/__init__.py | 1 + cumulus_etl/formats/base.py | 21 +- cumulus_etl/formats/batch.py | 21 ++ cumulus_etl/formats/batched_files.py | 13 +- cumulus_etl/formats/deltalake.py | 78 ++--- cumulus_etl/formats/ndjson.py | 9 +- cumulus_etl/formats/parquet.py | 8 +- cumulus_etl/loaders/i2b2/extract.py | 30 +- cumulus_etl/loaders/i2b2/loader.py | 16 +- cumulus_etl/store.py | 4 +- pyproject.toml | 1 - tests/convert/test_convert_cli.py | 3 + tests/covid_symptom/test_nlp_results.py | 22 +- .../output/observation/observation.000.ndjson | 16 +- .../medicationrequest.000.ndjson | 2 +- .../output/procedure/procedure.000.ndjson | 4 +- .../servicerequest/servicerequest.000.ndjson | 4 +- tests/etl/test_etl_cli.py | 7 +- tests/etl/test_tasks.py | 202 ++++++++++--- tests/i2b2/test_i2b2_loader.py | 2 +- tests/i2b2/test_i2b2_oracle_extract.py | 2 +- tests/test_deltalake.py | 280 ++++++------------ tests/utils.py | 27 +- 30 files changed, 651 insertions(+), 447 deletions(-) create mode 100644 cumulus_etl/formats/batch.py diff --git a/cumulus_etl/common.py b/cumulus_etl/common.py index 21d08d71..3ddaffbd 100644 --- a/cumulus_etl/common.py +++ b/cumulus_etl/common.py @@ -33,14 +33,18 @@ def ls_resources(root, resource: str) -> list[str]: @contextlib.contextmanager -def _atomic_open(path: str, mode: str) -> TextIO: +def _atomic_open(path: str, mode: str, **kwargs) -> TextIO: """A version of open() that handles atomic file access across many filesystems (like S3)""" root = store.Root(path) - # fsspec is atomic per-transaction -- if an error occurs inside the transaction, partial writes will be discarded - with root.fs.transaction: - with root.fs.open(path, mode=mode, encoding="utf8") as file: - yield file + with contextlib.ExitStack() as stack: + if "w" in mode: + # fsspec is atomic per-transaction. + # If an error occurs inside the transaction, partial writes will be discarded. 
+ # But we only want a transaction if we're writing - read transactions may error out + stack.enter_context(root.fs.transaction) + + yield stack.enter_context(root.fs.open(path, mode=mode, encoding="utf8", **kwargs)) def read_text(path: str) -> str: @@ -94,7 +98,8 @@ def write_json(path: str, data: Any, indent: int = None) -> None: @contextlib.contextmanager def read_csv(path: str) -> csv.DictReader: - with open(path, newline="", encoding="utf8") as csvfile: + # Python docs say to use newline="", to support quoted multi-line fields + with _atomic_open(path, "r", newline="") as csvfile: yield csv.DictReader(csvfile) @@ -115,13 +120,34 @@ def read_resource_ndjson(root, resource: str) -> Iterator[dict]: yield from read_ndjson(filename) +def write_rows_to_ndjson(path: str, rows: list[dict], sparse: bool = False) -> None: + """ + Writes the data out, row by row, to an .ndjson file (non-atomically). + + :param path: where to write the file + :param rows: data to write + :param sparse: if True, None entries are skipped + """ + with NdjsonWriter(path, allow_empty=True) as f: + for row in rows: + if sparse: + row = sparse_dict(row) + f.write(row) + + class NdjsonWriter: - """Convenience context manager to write multiple objects to a local ndjson file.""" + """ + Convenience context manager to write multiple objects to a ndjson file. + + Note that this is not atomic - partial writes will make it to the target file. + """ - def __init__(self, path: str, mode: str = "w"): - self._path = path + def __init__(self, path: str, mode: str = "w", allow_empty: bool = False): + self._root = store.Root(path) self._mode = mode self._file = None + if allow_empty: + self._ensure_file() def __enter__(self): return self @@ -131,15 +157,31 @@ def __exit__(self, exc_type, exc_value, traceback): self._file.close() self._file = None - def write(self, obj: dict) -> None: - # lazily create the file, to avoid 0-line ndjson files + def _ensure_file(self): if not self._file: - self._file = open(self._path, self._mode, encoding="utf8") # pylint: disable=consider-using-with + self._file = self._root.fs.open(self._root.path, self._mode, encoding="utf8") + + def write(self, obj: dict) -> None: + # lazily create the file, to avoid 0-line ndjson files (unless created in __init__) + self._ensure_file() json.dump(obj, self._file) self._file.write("\n") +def sparse_dict(dictionary: dict) -> dict: + """Returns a value of the input dictionary without any keys with None values.""" + + def iteration(item: Any) -> Any: + if isinstance(item, dict): + return {key: iteration(val) for key, val in item.items() if val is not None} + elif isinstance(item, list): + return [iteration(x) for x in item] + return item + + return iteration(dictionary) + + ############################################################################### # # Helper Functions: Logging @@ -162,7 +204,6 @@ def human_file_size(count: int) -> str: Returns a human-readable version of a count of bytes. I couldn't find a version of this that's sitting in a library we use. Very annoying. - Pandas has one, but it's private. 
""" for suffix in ("KB", "MB"): count /= 1024 diff --git a/cumulus_etl/etl/cli.py b/cumulus_etl/etl/cli.py index 675e6906..4efb0220 100644 --- a/cumulus_etl/etl/cli.py +++ b/cumulus_etl/etl/cli.py @@ -231,7 +231,7 @@ async def etl_main(args: argparse.Namespace) -> None: async with client: if args.input_format == "i2b2": - config_loader = loaders.I2b2Loader(root_input, args.batch_size, export_to=args.export_to) + config_loader = loaders.I2b2Loader(root_input, export_to=args.export_to) else: config_loader = loaders.FhirNdjsonLoader( root_input, client=client, export_to=args.export_to, since=args.since, until=args.until diff --git a/cumulus_etl/etl/convert/cli.py b/cumulus_etl/etl/convert/cli.py index 0a6dd58e..115b7e07 100644 --- a/cumulus_etl/etl/convert/cli.py +++ b/cumulus_etl/etl/convert/cli.py @@ -8,7 +8,6 @@ import os import tempfile -import pandas import rich.progress from cumulus_etl import cli_utils, common, errors, formats, store @@ -61,10 +60,9 @@ def convert_task_table( progress_task = progress.add_task(table.get_name(task), total=count) for index, ndjson_path in enumerate(ndjson_paths): - rows = common.read_ndjson(ndjson_path) - df = pandas.DataFrame(rows) - df.drop_duplicates("id", inplace=True) - formatter.write_records(df, index) + rows = list(common.read_ndjson(ndjson_path)) + batch = task.make_batch_from_rows(formatter, rows, index=index) + formatter.write_records(batch) progress.update(progress_task, advance=1) formatter.finalize() diff --git a/cumulus_etl/etl/studies/covid_symptom/covid_tasks.py b/cumulus_etl/etl/studies/covid_symptom/covid_tasks.py index deeb3d3a..11b07fd7 100644 --- a/cumulus_etl/etl/studies/covid_symptom/covid_tasks.py +++ b/cumulus_etl/etl/studies/covid_symptom/covid_tasks.py @@ -5,8 +5,9 @@ import os import ctakesclient +import pyarrow -from cumulus_etl import common, nlp, store +from cumulus_etl import common, formats, nlp, store from cumulus_etl.etl import tasks from cumulus_etl.etl.studies.covid_symptom import covid_ctakes @@ -118,3 +119,39 @@ async def read_entries(self) -> tasks.EntryIterator: # This way we don't need to worry about symptoms from the same note crossing batch boundaries. # The Format class will replace all existing symptoms from this note at once (because we set group_field). 
yield symptoms + + @classmethod + def get_schema(cls, formatter: formats.Format, rows: list[dict]) -> pyarrow.Schema: + return pyarrow.schema( + [ + pyarrow.field("id", pyarrow.string()), + pyarrow.field("docref_id", pyarrow.string()), + pyarrow.field("encounter_id", pyarrow.string()), + pyarrow.field("subject_id", pyarrow.string()), + pyarrow.field( + "match", + pyarrow.struct( + [ + pyarrow.field("begin", pyarrow.int32()), + pyarrow.field("end", pyarrow.int32()), + pyarrow.field("text", pyarrow.string()), + pyarrow.field("polarity", pyarrow.int8()), + pyarrow.field("type", pyarrow.string()), + pyarrow.field( + "conceptAttributes", + pyarrow.list_( + pyarrow.struct( + [ + pyarrow.field("code", pyarrow.string()), + pyarrow.field("codingScheme", pyarrow.string()), + pyarrow.field("cui", pyarrow.string()), + pyarrow.field("tui", pyarrow.string()), + ] + ) + ), + ), + ] + ), + ), + ] + ) diff --git a/cumulus_etl/etl/tasks/base.py b/cumulus_etl/etl/tasks/base.py index 58b70b27..c3d0f8a7 100644 --- a/cumulus_etl/etl/tasks/base.py +++ b/cumulus_etl/etl/tasks/base.py @@ -4,9 +4,9 @@ import os from collections.abc import AsyncIterator, Iterator -import pandas +import pyarrow -from cumulus_etl import common, deid, formats, store +from cumulus_etl import common, deid, fhir, formats, store from cumulus_etl.etl import config from cumulus_etl.etl.tasks import batching @@ -117,6 +117,11 @@ async def run(self) -> list[config.JobSummary]: return self.summaries + @classmethod + def make_batch_from_rows(cls, formatter: formats.Format, rows: list[dict], index: int = 0): + schema = cls.get_schema(formatter, rows) + return formats.Batch(rows, schema=schema, index=index) + ########################################################################################## # # Internal helpers @@ -130,16 +135,16 @@ async def _write_tables_in_batches(self, entries: EntryIterator) -> None: # Batches is a tuple of lists of resources - the tuple almost never matters, but it is there in case the # task is generating multiple types of resources. Like MedicationRequest creating Medications as it goes. # Each tuple of batches collectively adds up to roughly our target batch size. - for table_index, batch in enumerate(batches): - if not batch: + for table_index, rows in enumerate(batches): + if not rows: continue formatter = self._get_formatter(table_index) - batch_len = len(batch) + batch_len = len(rows) summary = self.summaries[table_index] summary.attempt += batch_len - if self._write_one_table_batch(formatter, batch, batch_index): + if self._write_one_table_batch(formatter, rows, batch_index): summary.success += batch_len self.table_batch_cleanup(table_index, batch_index) @@ -173,41 +178,55 @@ def _get_formatter(self, table_index: int) -> formats.Format: return self.formatters[table_index] - def _write_one_table_batch(self, formatter: formats.Format, batch: list[dict], batch_index: int) -> bool: - # Start by stuffing the batch entries into a dataframe - dataframe = pandas.DataFrame(batch) - - # Drop duplicates inside the batch to guarantee to the formatter that the "id" column is unique. - # This does not fix uniqueness across batches, but formatters that care about that can control for it. - # For context: - # - We have seen duplicates inside and across files generated by Cerner bulk exports. So this is a real - # concern found in the wild, and we can't just trust input data to be "clean." 
- # - The deltalake backend in particular would prefer the ID to be at least unique inside a batch, so that - # it can more easily handle merge logic. Duplicate IDs across batches will be naturally overwritten as - # new batches are merged in. - # - Other backends like ndjson can currently just live with duplicates across batches, that's fine. - dataframe.drop_duplicates("id", inplace=True) + def _uniquify_rows(self, rows: list[dict]) -> list[dict]: + """ + Drop duplicates inside the batch to guarantee to the formatter that the "id" column is unique. + + This does not fix uniqueness across batches, but formatters that care about that can control for it. + + For context: + - We have seen duplicates inside and across files generated by Cerner bulk exports. So this is a real + concern found in the wild, and we can't just trust input data to be "clean." + - The deltalake backend in particular would prefer the ID to be at least unique inside a batch, so that + it can more easily handle merge logic. Duplicate IDs across batches will be naturally overwritten as + new batches are merged in. + - Other backends like ndjson can currently just live with duplicates across batches, that's fine. + """ + id_set = set() + + def is_unique(row): + nonlocal id_set + if row["id"] in id_set: + return False + id_set.add(row["id"]) + return True + return [row for row in rows if is_unique(row)] + + def _write_one_table_batch(self, formatter: formats.Format, rows: list[dict], batch_index: int) -> bool: # Checkpoint scrubber data before writing to the store, because if we get interrupted, it's safer to have an # updated codebook with no data than data with an inaccurate codebook. self.scrubber.save() - # Now we write that DataFrame to the target folder, in the requested format (e.g. parquet). - success = formatter.write_records(dataframe, batch_index) + rows = self._uniquify_rows(rows) + batch = self.make_batch_from_rows(formatter, rows, index=batch_index) + + # Now we write that batch to the target folder, in the requested format (e.g. parquet). + success = formatter.write_records(batch) if not success: - # We should write the "bad" dataframe to the error dir, for later review - self._write_errors(dataframe, batch_index) + # We should write the "bad" batch to the error dir, for later review + self._write_errors(batch) return success - def _write_errors(self, df: pandas.DataFrame, index: int) -> None: + def _write_errors(self, batch: formats.Batch) -> None: """Takes the dataframe and writes it to the error dir, if one was provided""" if not self.task_config.dir_errors: return error_root = store.Root(os.path.join(self.task_config.dir_errors, self.name), create=True) - error_path = error_root.joinpath(f"write-error.{index:03}.ndjson") - df.to_json(error_path, orient="records", lines=True, storage_options=error_root.fsspec_options()) + error_path = error_root.joinpath(f"write-error.{batch.index:03}.ndjson") + common.write_rows_to_ndjson(error_path, batch.rows) ########################################################################################## # @@ -253,3 +272,14 @@ async def prepare_task(self) -> bool: :returns: False if this task should be skipped and end immediately """ return True + + @classmethod + def get_schema(cls, formatter: formats.Format, rows: list[dict]) -> pyarrow.Schema | None: + """ + Creates a properly-schema'd Table from the provided batch. + + Can be overridden as needed for non-FHIR outputs. 
+ """ + if formatter.resource_type: + return fhir.pyarrow_schema_from_resource_batch(formatter.resource_type, rows) + return None diff --git a/cumulus_etl/fhir/__init__.py b/cumulus_etl/fhir/__init__.py index 4dafb844..15f4b9f9 100644 --- a/cumulus_etl/fhir/__init__.py +++ b/cumulus_etl/fhir/__init__.py @@ -1,5 +1,5 @@ """Support for talking to FHIR servers & handling the FHIR spec""" from .fhir_client import FhirClient, create_fhir_client_for_cli -from .fhir_schemas import create_spark_schema_for_resource +from .fhir_schemas import pyarrow_schema_from_resource_batch from .fhir_utils import download_reference, get_docref_note, ref_resource, unref_resource diff --git a/cumulus_etl/fhir/fhir_schemas.py b/cumulus_etl/fhir/fhir_schemas.py index ff6a7fd3..1ab9325d 100644 --- a/cumulus_etl/fhir/fhir_schemas.py +++ b/cumulus_etl/fhir/fhir_schemas.py @@ -2,79 +2,144 @@ from collections import namedtuple from functools import partial +from typing import Any +import pyarrow from fhirclient.models import fhirabstractbase, fhirdate, fhirelementfactory -import pyspark FhirProperty = namedtuple("FhirProperty", ["name", "json_name", "pytype", "is_list", "of_many", "required"]) -def create_spark_schema_for_resource(resource_type: str) -> pyspark.sql.types.StructType: +def pyarrow_schema_from_resource_batch(resource_type: str, batch: list[dict]) -> pyarrow.Schema: """ - Creates a Pyspark StructType schema based off the named resource (like 'Observation'). + Creates a PyArrow schema based off the named resource (like 'Observation') and batch contents. Note that this schema will not be deep (fully nested all the way down), it will simply be wide (covering each toplevel field, each likely nullable). + But it *will* at least include every field contained in the batch. The primary goal here is to simplify complexity in the consuming SQL so that it can assume each column is at least defined. """ + # Examine batch to see the full shape of it, in order to detect any deeply nested fields that we want to make sure + # to include in the final schema (normally, we go wide but only as deep as we need to) + batch_shape = _get_shape_of_dicts(None, batch) + + return create_pyarrow_schema_for_resource(resource_type, batch_shape) + + +def _get_shape_of_dicts(total_shape: dict | None, item: Any) -> dict: + """ + Examines `item` and gives a description of its "shape". + + Shape here means a dictionary tree of fields, like {"id": {}, "code": {"text": {}}} + where empty dictionaries indicate no further children. + + This is not a generic concept at all - it's purely to aid with creating a schema for a batch of input rows. + This shape will tell us which FHIR fields to include in our schema. 
+ + Example Input: + {"address": [{"street": "123 Main St", "city": "Springfield"}], "name": "Jane Smith"} + + Example output: + {"address": {"street": {}, "city": {}}, "name": {}} + + :param total_shape: a pre-existing shape that we will merge fields into + :param item: the current item being examined + :returns: a shape for this item and its descendants (will be same dict as total_shape if that was passed in) + """ + total_shape = total_shape or {} + + if isinstance(item, list): + for x in item: + total_shape = _get_shape_of_dicts(total_shape, x) + elif isinstance(item, dict): + for key, val in item.items(): + total_shape[key] = _get_shape_of_dicts(total_shape.get(key), val) + + return total_shape + + +def create_pyarrow_schema_for_resource(resource_type: str, batch_shape: dict) -> pyarrow.Schema: + """ + Creates a PyArrow schema based off the named resource (like 'Observation'). + + This schema will be as wide as the spec is and as deep as the batch_shape is. + + batch_shape is a dictionary tree of fields to include, like {"id": {}, "code": {"text": {}}} + where empty dictionaries indicate no children (but the parent should still be included). + """ instance = fhirelementfactory.FHIRElementFactory.instantiate(resource_type, None) - return fhir_obj_to_pyspark_fields(instance, recurse=True) + # fhirclient doesn't include `resourceType` in the list of properties. So do that manually. + type_field = pyarrow.field("resourceType", pyarrow.string()) -def fhir_obj_to_pyspark_fields( - base_obj: fhirabstractbase.FHIRAbstractBase, *, recurse: bool -) -> pyspark.sql.types.StructType: + return pyarrow.schema([type_field, *fhir_obj_to_pyarrow_fields(instance, batch_shape, level=0)]) + + +def get_all_column_names(resource_type: str) -> list[str]: + """ + Creates a list of all toplevel names for this resource + """ + instance = fhirelementfactory.FHIRElementFactory.instantiate(resource_type, None) + properties = map(FhirProperty._make, instance.elementProperties()) + return [prop.json_name for prop in properties] + + +def fhir_obj_to_pyarrow_fields( + base_obj: fhirabstractbase.FHIRAbstractBase, batch_shape: dict, *, level: int +) -> list[pyarrow.Field]: """Convert a FHIR instance to a Pyspark StructType schema definition""" properties = map(FhirProperty._make, base_obj.elementProperties()) - return pyspark.sql.types.StructType( - list(filter(None, map(partial(fhir_to_spark_property, recurse=recurse), properties))) - ) + return list(filter(None, map(partial(fhir_to_pyarrow_property, batch_shape=batch_shape, level=level), properties))) -def fhir_to_spark_property(prop: FhirProperty, *, recurse: bool) -> pyspark.sql.types.StructField | None: +def fhir_to_pyarrow_property(prop: FhirProperty, *, batch_shape: dict = None, level: int) -> pyarrow.Field | None: """Converts a single FhirProperty to a Pyspark StructField, returning None if this field should be skipped""" - spark_type = fhir_to_spark_type(prop.pytype, recurse=recurse) - if spark_type is None: + if batch_shape is not None: + batch_shape = batch_shape.get(prop.json_name) + if level > 1 and batch_shape is None: + # If we're deep, only include fields we actually see in data + return None + + pyarrow_type = fhir_to_pyarrow_type(prop.pytype, batch_shape, level=level) + if pyarrow_type is None: return None - # Wrap lists in an ArrayType + # Wrap lists in an ListType if prop.is_list: - spark_type = pyspark.sql.types.ArrayType(spark_type) + pyarrow_type = pyarrow.list_(pyarrow_type) # Mark all types as nullable, don't worry about the prop.required field. 
# The ETL itself doesn't need to be in the business of validation, we just want to push the data through. - return pyspark.sql.types.StructField(prop.json_name, spark_type, nullable=True) + return pyarrow.field(prop.json_name, pyarrow_type, nullable=True) -def fhir_to_spark_type(pytype: type, recurse=False) -> pyspark.sql.types.DataType | None: +def fhir_to_pyarrow_type(pytype: type, batch_shape: dict, *, level: int) -> pyarrow.DataType | None: """Converts a basic python type to a Pyspark type, returning None if this element should be skipped""" if pytype is int: - # TODO: investigate if we can reduce this to IntegerType (32-bit) safely. - # The FHIR spec is 32-bit only (but does include some unsigned variants), while LongType is 64-bit. - # I've started this off as LongType because that matches the historical inferred types before we used a - # pre-calculated schema. - return pyspark.sql.types.LongType() + return pyarrow.int32() elif pytype is float: - # TODO: the FHIR spec suggests that double might not even be enough: + # TODO: the FHIR spec suggests that float64 might not even be enough: # From https://www.hl7.org/fhir/R4/datatypes.html: # "In object code, implementations that might meet this constraint are GMP implementations or equivalents # to Java BigDecimal that implement arbitrary precision, or a combination of a (64 bit) floating point # value with a precision field" # But for now, we are matching the inferred types from before we used a pre-calculated schema. # We can presumably up-scale this at some point if we find limitations. - return pyspark.sql.types.DoubleType() + return pyarrow.float64() elif pytype is str: - return pyspark.sql.types.StringType() + return pyarrow.string() elif pytype is bool: - return pyspark.sql.types.BooleanType() + return pyarrow.bool_() elif pytype is fhirdate.FHIRDate: - return pyspark.sql.types.StringType() # just leave it as a string, like it appears in the JSON + return pyarrow.string() # just leave it as a string, like it appears in the JSON elif issubclass(pytype, fhirabstractbase.FHIRAbstractBase): - if recurse: - return fhir_obj_to_pyspark_fields(pytype(), recurse=False) + # If this field is present in the inferred schema, that means some part of the data has this field. + # So we should recurse and extend our normally shallow schema to be deep enough to include this field too. + if level == 0 or batch_shape is not None: + return pyarrow.struct(fhir_obj_to_pyarrow_fields(pytype(), batch_shape, level=level + 1)) # Else skip this element entirely and do not descend, to avoid infinite recursion. 
# Note that in theory this might leave a struct with no child fields (if a struct's only children where also diff --git a/cumulus_etl/formats/__init__.py b/cumulus_etl/formats/__init__.py index d5f6b154..999ef8c5 100644 --- a/cumulus_etl/formats/__init__.py +++ b/cumulus_etl/formats/__init__.py @@ -1,4 +1,5 @@ """Classes that know _how_ to write out results to the target folder""" from .base import Format +from .batch import Batch from .factory import get_format_class diff --git a/cumulus_etl/formats/base.py b/cumulus_etl/formats/base.py index 099b7f96..6d7b9403 100644 --- a/cumulus_etl/formats/base.py +++ b/cumulus_etl/formats/base.py @@ -3,9 +3,8 @@ import abc import logging -import pandas - from cumulus_etl import store +from cumulus_etl.formats.batch import Batch class Format(abc.ABC): @@ -39,31 +38,29 @@ def __init__(self, root: store.Root, dbname: str, group_field: str = None, resou self.group_field = group_field self.resource_type = resource_type - def write_records(self, dataframe: pandas.DataFrame, batch: int) -> bool: + def write_records(self, batch: Batch) -> bool: """ - Writes a single dataframe to the output root. + Writes a single batch of data to the output root. - The dataframe must contain a unique (no duplicates) "id" column. + The batch must contain a unique (no duplicates) "id" column. - :param dataframe: the data records to write - :param batch: the batch number, from zero up + :param batch: the batch of data :returns: whether the batch was successfully written """ try: - self._write_one_batch(dataframe, batch) + self._write_one_batch(batch) return True except Exception: # pylint: disable=broad-except logging.exception("Could not process data records") return False @abc.abstractmethod - def _write_one_batch(self, dataframe: pandas.DataFrame, batch: int) -> None: + def _write_one_batch(self, batch: Batch) -> None: """ - Writes a single dataframe to the output root. + Writes a single batch to the output root. - :param dataframe: the data records to write - :param batch: the batch number, from zero up + :param batch: the batch of data """ def finalize(self) -> None: diff --git a/cumulus_etl/formats/batch.py b/cumulus_etl/formats/batch.py new file mode 100644 index 00000000..be8ea922 --- /dev/null +++ b/cumulus_etl/formats/batch.py @@ -0,0 +1,21 @@ +"""Code to help managing a single batch of data""" + +import pyarrow + + +class Batch: + """ + A single chunk of FHIR data, winding its way through the ETL. + + A batch is: + - Always the same FHIR resource + - Kept to roughly --batch-size in size + - It may be less if we have less data or duplicates were pruned + - It may be more if a group_field wanted to make sure some rows were in the same batch (see OutputTable) + - Written to the target location as one piece (e.g. 
one ndjson file or one Delta Lake update chunk) + """ + + def __init__(self, rows: list[dict], schema: pyarrow.Schema = None, index: int = 0): + self.rows = rows + self.schema = schema + self.index = index diff --git a/cumulus_etl/formats/batched_files.py b/cumulus_etl/formats/batched_files.py index 896872b7..f5cf83b5 100644 --- a/cumulus_etl/formats/batched_files.py +++ b/cumulus_etl/formats/batched_files.py @@ -2,9 +2,8 @@ import abc -import pandas - from cumulus_etl.formats.base import Format +from cumulus_etl.formats.batch import Batch class BatchedFileFormat(Format): @@ -24,9 +23,9 @@ def suffix(self) -> str: """ @abc.abstractmethod - def write_format(self, df: pandas.DataFrame, path: str) -> None: + def write_format(self, batch: Batch, path: str) -> None: """ - Write the data in `df` to the target path file + Write the data in `batch` to the target path file """ ########################################################################################## @@ -48,8 +47,8 @@ def __init__(self, *args, **kwargs) -> None: except FileNotFoundError: pass - def _write_one_batch(self, dataframe: pandas.DataFrame, batch: int) -> None: + def _write_one_batch(self, batch: Batch) -> None: """Writes the whole dataframe to a single file""" self.root.makedirs(self.root.joinpath(self.dbname)) - full_path = self.root.joinpath(f"{self.dbname}/{self.dbname}.{batch:03}.{self.suffix}") - self.write_format(dataframe, full_path) + full_path = self.root.joinpath(f"{self.dbname}/{self.dbname}.{batch.index:03}.{self.suffix}") + self.write_format(batch, full_path) diff --git a/cumulus_etl/formats/deltalake.py b/cumulus_etl/formats/deltalake.py index e8b2887a..354017f3 100644 --- a/cumulus_etl/formats/deltalake.py +++ b/cumulus_etl/formats/deltalake.py @@ -10,18 +10,20 @@ import tempfile import delta -import pandas +import pyarrow +import pyarrow.parquet import pyspark from pyspark.sql.utils import AnalysisException -from cumulus_etl import fhir, store +from cumulus_etl import store from cumulus_etl.formats.base import Format +from cumulus_etl.formats.batch import Batch -# This class would be a lot simpler if we could use fsspec & pandas directly, since that's what the rest of our code +# This class would be a lot simpler if we could use fsspec & pyarrow directly, since that's what the rest of our code # uses and expects (in terms of filesystem writing). # # There is a 1st party Delta Lake implementation (`deltalake`) based off native Rust code and which talks to -# fsspec & pandas by default. But it is missing some critical features as of this writing (mostly merges): +# fsspec & pyarrow by default. 
But it is missing some critical features as of this writing (mostly merges): # - Merge support in deltalake bindings: https://github.com/delta-io/delta-rs/issues/850 @@ -81,14 +83,14 @@ def initialize_class(cls, root: store.Root) -> None: cls.spark.sparkContext.setLogLevel("ERROR") cls._configure_fs(root, cls.spark) - def _write_one_batch(self, dataframe: pandas.DataFrame, batch: int) -> None: + def _write_one_batch(self, batch: Batch) -> None: """Writes the whole dataframe to a delta lake""" - with self.pandas_to_spark_with_schema(dataframe) as updates: + with self.batch_to_spark(batch) as updates: if updates is None: return - table = self.update_delta_table(updates) + delta_table = self.update_delta_table(updates) - table.generate("symlink_format_manifest") + delta_table.generate("symlink_format_manifest") def update_delta_table(self, updates: pyspark.sql.DataFrame) -> delta.DeltaTable: full_path = self._table_path(self.dbname) @@ -161,52 +163,14 @@ def _configure_fs(root: store.Root, spark: pyspark.sql.SparkSession): spark.conf.set("fs.s3a.endpoint.region", region_name) @contextlib.contextmanager - def pandas_to_spark_with_schema(self, dataframe: pandas.DataFrame) -> pyspark.sql.DataFrame | None: - """Transforms a pandas DF to a spark DF with a full FHIR schema included""" - # This method solves two problems: - # 1. Pandas schemas are very loosey-goosey (and don't really take nested data into account), so simply - # calling self.spark.createDataFrame(df) does not give names for nested struct fields. - # 2. We want to provide column info for all valid FHIR fields (at least shallowly) so that the - # downstream SQL can reference all toplevel columns even if the source data doesn't have those fields. - # - # Issue #1 is solved by writing to parquet and reading it back in (a little wonky, but it gives full schemas). - # Issue #2 is solved by merging a computed FHIR schema with the actual data schema. - # - # Some devils-in-the-details: - # - Pyspark does not let us merge schemas in python code, we can only seem to do it while reading in multiple - # dataframes. So we write out the schema as an empty parquet file and read it back in with the data. - # We could write some manual schema-merging code, but I'm leery that we'd get it right or that it's worth - # doing ourselves rather than just writing this weird file and letting pyspark do it for us. - # - Our FHIR schema is incomplete and shallow (it skips all nested structs) to avoid infinite recursion issues, - # and we simply merge this incomplete schema in with the actual data schema, which will have full nested - # inferred schemas for exactly the fields it uses. We always write to the delta lake with autoMerge of - # schemas enabled, so incrementally adding fields to existing lakes will be fine. - # - Delta Lake does not like columns that have null types nor struct types with no children. So we make sure - # that every column has *some* definition, and that structs have content. - - with tempfile.TemporaryDirectory() as parquet_dir: - paths = [] - - # Write the pandas dataframe to parquet to force full nested schemas. - # We also convert dtypes, to get modern nullable pandas types (rather than using its default behavior of - # converting a nullable integer column into a float column). 
- if not dataframe.empty: # delta lake doesn't like empty parquet files - data_path = os.path.join(parquet_dir, "data.parquet") - dataframe.convert_dtypes().to_parquet(data_path, index=False) - del dataframe # allow GC to clean this up - paths.append(data_path) - - # Write the empty schema dataframe, so we can merge it with the above real dataframe - if self.resource_type: - schema_path = os.path.join(parquet_dir, "schema.parquet") - schema = fhir.create_spark_schema_for_resource(self.resource_type) - self.spark.createDataFrame([], schema=schema).write.parquet(schema_path) - paths.append(schema_path) - - if paths: - # Provide the happy merged result, coalesced to one partition (because our schema trick above creates 2) - yield self.spark.read.parquet(*paths, mergeSchema=True).coalesce(1) - else: - # Rare path, but possible - we have one task that uses a non-FHIR output format. - # So if it also has no data and has no schema, we hit this. - yield None + def batch_to_spark(self, batch: Batch) -> pyspark.sql.DataFrame | None: + """Transforms a batch to a spark DF""" + # This is the quick and dirty way - write batch to parquet with pyarrow and read it back. + # But a more direct way would be to convert the pyarrow schema to a pyspark schema and just + # call self.spark.createDataFrame(batch.rows, schema=pyspark_schema). A future improvement. + with tempfile.NamedTemporaryFile() as data_path: + table = pyarrow.Table.from_pylist(batch.rows, schema=batch.schema) + pyarrow.parquet.write_table(table, data_path.name) + del table + + yield self.spark.read.parquet(data_path.name) diff --git a/cumulus_etl/formats/ndjson.py b/cumulus_etl/formats/ndjson.py index 46575b04..147cbbaa 100644 --- a/cumulus_etl/formats/ndjson.py +++ b/cumulus_etl/formats/ndjson.py @@ -1,7 +1,7 @@ """An implementation of Format that writes to a few flat ndjson files""" -import pandas - +from cumulus_etl import common +from cumulus_etl.formats.batch import Batch from cumulus_etl.formats.batched_files import BatchedFileFormat @@ -12,5 +12,6 @@ class NdjsonFormat(BatchedFileFormat): def suffix(self) -> str: return "ndjson" - def write_format(self, df: pandas.DataFrame, path: str) -> None: - df.to_json(path, orient="records", lines=True, storage_options=self.root.fsspec_options()) + def write_format(self, batch: Batch, path: str) -> None: + # This is mostly used in tests and debugging, so we'll write out sparse files (no null columns) + common.write_rows_to_ndjson(path, batch.rows, sparse=True) diff --git a/cumulus_etl/formats/parquet.py b/cumulus_etl/formats/parquet.py index 7245e402..b67bde3a 100644 --- a/cumulus_etl/formats/parquet.py +++ b/cumulus_etl/formats/parquet.py @@ -1,7 +1,8 @@ """An implementation of Format that writes to a few flat parquet files""" -import pandas +import pyarrow.parquet +from cumulus_etl.formats.batch import Batch from cumulus_etl.formats.batched_files import BatchedFileFormat @@ -12,5 +13,6 @@ class ParquetFormat(BatchedFileFormat): def suffix(self) -> str: return "parquet" - def write_format(self, df: pandas.DataFrame, path: str) -> None: - df.to_parquet(path, index=False, storage_options=self.root.fsspec_options()) + def write_format(self, batch: Batch, path: str) -> None: + table = pyarrow.Table.from_pylist(batch.rows, schema=batch.schema) + pyarrow.parquet.write_table(table, path) diff --git a/cumulus_etl/loaders/i2b2/extract.py b/cumulus_etl/loaders/i2b2/extract.py index 2d569850..0fe468d8 100644 --- a/cumulus_etl/loaders/i2b2/extract.py +++ b/cumulus_etl/loaders/i2b2/extract.py @@ -3,59 +3,49 
@@ import logging from collections.abc import Iterator -import pandas - +from cumulus_etl import common from cumulus_etl.loaders.i2b2.schema import ObservationFact, PatientDimension, VisitDimension -def extract_csv(path_csv: str, batch_size: int) -> Iterator[dict]: +def extract_csv(path_csv: str) -> Iterator[dict]: """ :param path_csv: /path/to/i2b2_formatted_file.csv - :param batch_size: how many entries to load into memory at once :return: an iterator over each row from the file """ - count = 0 try: - with pandas.read_csv(path_csv, dtype=str, na_filter=False, chunksize=batch_size) as reader: + with common.read_csv(path_csv) as reader: print(f"Reading csv {path_csv}...") - for chunk in reader: - for _, row in chunk.iterrows(): - yield dict(row) - count += len(chunk) - print(f" Read {count:,} entries...") + yield from reader print(f"Done reading {path_csv}.") except FileNotFoundError: print(f"No {path_csv}, skipping.") -def extract_csv_observation_facts(path_csv: str, batch_size: int) -> Iterator[ObservationFact]: +def extract_csv_observation_facts(path_csv: str) -> Iterator[ObservationFact]: """ :param path_csv: /path/to/file.csv - :param batch_size: how many entries to load into memory at once :return: i2b2 ObservationFact table """ logging.info("Transforming text into List[ObservationFact]") - for row in extract_csv(path_csv, batch_size): + for row in extract_csv(path_csv): yield ObservationFact(row) -def extract_csv_patients(path_csv: str, batch_size: int) -> Iterator[PatientDimension]: +def extract_csv_patients(path_csv: str) -> Iterator[PatientDimension]: """ :param path_csv: /path/to/file.csv - :param batch_size: how many entries to load into memory at once :return: List i2b2 patient dimension table """ logging.info("Transforming text into List[PatientDimension]") - for row in extract_csv(path_csv, batch_size): + for row in extract_csv(path_csv): yield PatientDimension(row) -def extract_csv_visits(path_csv: str, batch_size: int) -> Iterator[VisitDimension]: +def extract_csv_visits(path_csv: str) -> Iterator[VisitDimension]: """ :param path_csv: /path/to/file.csv - :param batch_size: how many entries to load into memory at once :return: List i2b2 visit dimension table """ logging.info("Transforming text into List[VisitDimension]") - for row in extract_csv(path_csv, batch_size): + for row in extract_csv(path_csv): yield VisitDimension(row) diff --git a/cumulus_etl/loaders/i2b2/loader.py b/cumulus_etl/loaders/i2b2/loader.py index 37e2025c..1a062c18 100644 --- a/cumulus_etl/loaders/i2b2/loader.py +++ b/cumulus_etl/loaders/i2b2/loader.py @@ -25,15 +25,13 @@ class I2b2Loader(Loader): Expected format is either a tcp:// URL pointing at an Oracle server or a local folder. 
""" - def __init__(self, root: store.Root, batch_size: int, export_to: str = None): + def __init__(self, root: store.Root, export_to: str = None): """ Initialize a new I2b2Loader class :param root: the base location to read data from - :param batch_size: the most entries to keep in memory at once :param export_to: folder to save the ndjson results of converting i2b2 """ super().__init__(root) - self.batch_size = batch_size self.export_to = export_to async def load_all(self, resources: list[str]) -> Directory: @@ -139,30 +137,24 @@ def _load_all_from_csv(self, resources: list[str]) -> Directory: conditions=partial( extract.extract_csv_observation_facts, os.path.join(path, "observation_fact_diagnosis.csv"), - self.batch_size, ), lab_views=partial( extract.extract_csv_observation_facts, os.path.join(path, "observation_fact_lab_views.csv"), - self.batch_size, ), medicationrequests=partial( extract.extract_csv_observation_facts, os.path.join(path, "observation_fact_medications.csv"), - self.batch_size, ), vitals=partial( extract.extract_csv_observation_facts, os.path.join(path, "observation_fact_vitals.csv"), - self.batch_size, ), documentreferences=partial( - extract.extract_csv_observation_facts, os.path.join(path, "observation_fact_notes.csv"), self.batch_size + extract.extract_csv_observation_facts, os.path.join(path, "observation_fact_notes.csv") ), - patients=partial( - extract.extract_csv_patients, os.path.join(path, "patient_dimension.csv"), self.batch_size - ), - encounters=partial(extract.extract_csv_visits, os.path.join(path, "visit_dimension.csv"), self.batch_size), + patients=partial(extract.extract_csv_patients, os.path.join(path, "patient_dimension.csv")), + encounters=partial(extract.extract_csv_visits, os.path.join(path, "visit_dimension.csv")), ) ################################################################################################################### diff --git a/cumulus_etl/store.py b/cumulus_etl/store.py index 730b4c52..85703ea0 100644 --- a/cumulus_etl/store.py +++ b/cumulus_etl/store.py @@ -15,7 +15,7 @@ def set_user_fs_options(args: dict) -> None: def get_fs_options(protocol: str) -> dict: - """Provides a set of storage option kwargs for fsspec calls or pandas storage_options arguments""" + """Provides a set of storage option kwargs for fsspec calls""" options = {} if protocol == "s3": @@ -130,5 +130,5 @@ def rm(self, path: str, recursive=False) -> None: self.fs.rm(path, recursive=recursive) def fsspec_options(self) -> dict: - """Provides a set of storage option kwargs for fsspec calls or pandas storage_options arguments""" + """Provides a set of storage option kwargs for fsspec calls""" return get_fs_options(self.protocol) diff --git a/pyproject.toml b/pyproject.toml index 5360cf8c..7579ab03 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,6 @@ dependencies = [ "jwcrypto < 2", "label-studio-sdk < 1", "oracledb < 2", - "pandas < 3", "philter-lite < 1", "pyarrow < 13", "rich < 14", diff --git a/tests/convert/test_convert_cli.py b/tests/convert/test_convert_cli.py index 535fdf36..af1821e4 100644 --- a/tests/convert/test_convert_cli.py +++ b/tests/convert/test_convert_cli.py @@ -84,6 +84,9 @@ async def test_happy_path(self): conditions = utils.read_delta_lake(f"{self.target_path}/condition") # and conditions self.assertEqual(2, len(conditions)) self.assertEqual("2010-03-02", conditions[0]["recordedDate"]) + symptoms = utils.read_delta_lake(f"{self.target_path}/covid_symptom__nlp_results") # and covid symptoms + self.assertEqual(4, len(symptoms)) + 
self.assertEqual("for", symptoms[0]["match"]["text"]) # Now make a second small, partial output folder to layer into the existing Delta Lake delta_timestamp = "2023-02-29__19.53.08" diff --git a/tests/covid_symptom/test_nlp_results.py b/tests/covid_symptom/test_nlp_results.py index 846eaa6a..7e6d298d 100644 --- a/tests/covid_symptom/test_nlp_results.py +++ b/tests/covid_symptom/test_nlp_results.py @@ -35,9 +35,9 @@ async def test_unknown_modifier_extensions_skipped_for_nlp_symptoms(self): # Confirm that only symptoms from docref 0 got stored self.assertEqual(1, self.format.write_records.call_count) - df = self.format.write_records.call_args[0][0] + batch = self.format.write_records.call_args[0][0] expected_subject = self.codebook.db.patient("1234") - self.assertEqual({expected_subject}, set(df.subject_id)) + self.assertEqual({expected_subject}, {row["subject_id"] for row in batch.rows}) @ddt.data( # (coding, expected valid note) @@ -66,8 +66,8 @@ async def test_ed_note_filtering_for_nlp(self, codings, expected): await covid_symptom.CovidSymptomNlpResultsTask(self.job_config, self.scrubber).run() self.assertEqual(1, self.format.write_records.call_count) - df = self.format.write_records.call_args[0][0] - self.assertEqual(4 if expected else 0, len(df)) + batch = self.format.write_records.call_args[0][0] + self.assertEqual(4 if expected else 0, len(batch.rows)) async def test_non_ed_visit_is_skipped_for_covid_symptoms(self): """Verify we ignore non ED visits for the covid symptoms NLP""" @@ -82,9 +82,9 @@ async def test_non_ed_visit_is_skipped_for_covid_symptoms(self): # Confirm that only symptoms from docref 'present' got stored self.assertEqual(1, self.format.write_records.call_count) - df = self.format.write_records.call_args[0][0] + batch = self.format.write_records.call_args[0][0] expected_docref = self.codebook.db.resource_hash("present") - self.assertEqual({expected_docref}, set(df.docref_id)) + self.assertEqual({expected_docref}, {row["docref_id"] for row in batch.rows}) @ddt.data( ({"status": "entered-in-error"}, False), @@ -105,8 +105,8 @@ async def test_bad_doc_status_is_skipped_for_covid_symptoms(self, status: dict, await covid_symptom.CovidSymptomNlpResultsTask(self.job_config, self.scrubber).run() self.assertEqual(1, self.format.write_records.call_count) - df = self.format.write_records.call_args[0][0] - self.assertEqual(should_process, not df.empty) + batch = self.format.write_records.call_args[0][0] + self.assertEqual(2 if should_process else 0, len(batch.rows)) @ddt.data( # list of (URL, contentType), expected text @@ -134,11 +134,11 @@ async def test_note_urls_downloaded(self, attachments, expected_text): await covid_symptom.CovidSymptomNlpResultsTask(self.job_config, self.scrubber).run() self.assertEqual(1, self.format.write_records.call_count) - df = self.format.write_records.call_args[0][0] + batch = self.format.write_records.call_args[0][0] if expected_text: - self.assertEqual(expected_text, df.iloc[0].match["text"]) + self.assertEqual(expected_text, batch.rows[0]["match"]["text"]) else: - self.assertTrue(df.empty) + self.assertEqual(0, len(batch.rows)) async def test_nlp_errors_saved(self): docref = i2b2_mock_data.documentreference() diff --git a/tests/data/i2b2/output/observation/observation.000.ndjson b/tests/data/i2b2/output/observation/observation.000.ndjson index 7836f64f..9919f46f 100644 --- a/tests/data/i2b2/output/observation/observation.000.ndjson +++ b/tests/data/i2b2/output/observation/observation.000.ndjson @@ -1,8 +1,8 @@ 
-{"resourceType":"Observation","id":"a63803e71fe4208065298da3e293296c194621cca8c374aea2e4f32ddb44c26f","subject":{"reference":"Patient\/49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7"},"encounter":{"reference":"Encounter\/5c7cf282fcd61e10a4f5184ddb228ba156ebaa067629559458963e217b0ecba1"},"category":[{"coding":[{"code":"laboratory","system":"http:\/\/terminology.hl7.org\/CodeSystem\/observation-category"}]}],"effectiveDateTime":"2020-03-19","status":"unknown","code":{"coding":[{"code":"94500-6","system":"http:\/\/loinc.org"}]},"valueCodeableConcept":{"coding":[{"code":"272519000","system":"http:\/\/snomed.info\/sct","display":"Absent"}]},"valueQuantity":null} -{"resourceType":"Observation","id":"228b982ddae20b8da26a212666995acde914b941a4ff7c314adf89d02c3831f0","subject":{"reference":"Patient\/49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7"},"encounter":{"reference":"Encounter\/fb29ea2a68ca2e1e4bbe22bdeedf021d94ec89f7e3d38ecbe908a8f2b3d89687"},"category":[{"coding":[{"code":"laboratory","system":"http:\/\/terminology.hl7.org\/CodeSystem\/observation-category"}]}],"effectiveDateTime":"2020-03-20","status":"unknown","code":{"coding":[{"code":"LAB:1","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]},"valueCodeableConcept":{"coding":[{"code":"See Image","system":"http:\/\/cumulus.smarthealthit.org\/i2b2","display":"See Image"}]},"valueQuantity":null} -{"resourceType":"Observation","id":"d9b6de700feb3fb10370bb9e8fd5ba14ac7a31610012e6dd5f481429a321aa37","subject":{"reference":"Patient\/49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7"},"encounter":{"reference":"Encounter\/fb29ea2a68ca2e1e4bbe22bdeedf021d94ec89f7e3d38ecbe908a8f2b3d89687"},"category":[{"coding":[{"code":"vital-signs","system":"http:\/\/terminology.hl7.org\/CodeSystem\/observation-category"}]}],"effectiveDateTime":"2020-03-20","status":"unknown","code":{"coding":[{"code":"VITAL:1248","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]},"valueCodeableConcept":{"coding":[{"code":"Left Leg","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]},"valueQuantity":null} -{"resourceType":"Observation","id":"de1452522921e519285b9ce0ea23fd72d2005a98355eed0f1ec8347e7f6c45b4","subject":{"reference":"Patient\/49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7"},"encounter":{"reference":"Encounter\/fb29ea2a68ca2e1e4bbe22bdeedf021d94ec89f7e3d38ecbe908a8f2b3d89687"},"category":[{"coding":[{"code":"vital-signs","system":"http:\/\/terminology.hl7.org\/CodeSystem\/observation-category"}]}],"effectiveDateTime":"2020-03-20","status":"unknown","code":{"coding":[{"code":"VITAL:1248","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]},"valueCodeableConcept":{"coding":[{"code":"Right Leg","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]},"valueQuantity":null} -{"resourceType":"Observation","id":"d4e4830b25758182fba1c5dfc7370668d4abfff13d59913ba7e54692e2a5e190","subject":{"reference":"Patient\/49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7"},"encounter":{"reference":"Encounter\/fb29ea2a68ca2e1e4bbe22bdeedf021d94ec89f7e3d38ecbe908a8f2b3d89687"},"category":[{"coding":[{"code":"vital-signs","system":"http:\/\/terminology.hl7.org\/CodeSystem\/observation-category"}]}],"effectiveDateTime":"2020-03-20","status":"unknown","code":{"coding":[{"code":"VITAL:1248","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]},"valueCodeableConcept":null,"valueQuantity":null} 
-{"resourceType":"Observation","id":"bdaae1b02c3ab4841a2495c3069103ef4c4a6a5639a2c80430f1fb9637c628d7","subject":{"reference":"Patient\/49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7"},"encounter":{"reference":"Encounter\/fb29ea2a68ca2e1e4bbe22bdeedf021d94ec89f7e3d38ecbe908a8f2b3d89687"},"category":[{"coding":[{"code":"vital-signs","system":"http:\/\/terminology.hl7.org\/CodeSystem\/observation-category"}]}],"effectiveDateTime":"2020-03-20","status":"unknown","code":{"coding":[{"code":"VITAL:1248","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]},"valueCodeableConcept":null,"valueQuantity":{"value":10.5,"unit":"lb","system":"http:\/\/unitsofmeasure.org","code":"[lb_av]","comparator":"<="}} -{"resourceType":"Observation","id":"6628cff0b3de8d4c86cc553713fd3605cd9266abb2366a84cef02d1a5508a58d","subject":{"reference":"Patient\/49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7"},"encounter":{"reference":"Encounter\/fb29ea2a68ca2e1e4bbe22bdeedf021d94ec89f7e3d38ecbe908a8f2b3d89687"},"category":[{"coding":[{"code":"vital-signs","system":"http:\/\/terminology.hl7.org\/CodeSystem\/observation-category"}]}],"effectiveDateTime":"2020-03-20","status":"unknown","code":{"coding":[{"code":"VITAL:1248","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]},"valueCodeableConcept":null,"valueQuantity":{"value":1.13,"unit":"NOT DEFINED IN SOURCE"}} -{"resourceType":"Observation","id":"823ce087e9ae385ee4e9626fe3d92b4374580f7b59591639464d6bb7529b920b","subject":{"reference":"Patient\/49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7"},"encounter":{"reference":"Encounter\/fb29ea2a68ca2e1e4bbe22bdeedf021d94ec89f7e3d38ecbe908a8f2b3d89687"},"category":[{"coding":[{"code":"vital-signs","system":"http:\/\/terminology.hl7.org\/CodeSystem\/observation-category"}]}],"effectiveDateTime":"2020-03-20","status":"unknown","code":{"coding":[{"code":"VITAL:1248","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]},"valueCodeableConcept":null,"valueQuantity":{"value":99.9,"unit":"%","system":"http:\/\/unitsofmeasure.org","code":"%","comparator":">"}} +{"resourceType":"Observation","id":"a63803e71fe4208065298da3e293296c194621cca8c374aea2e4f32ddb44c26f","subject":{"reference":"Patient\/49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7"},"encounter":{"reference":"Encounter\/5c7cf282fcd61e10a4f5184ddb228ba156ebaa067629559458963e217b0ecba1"},"category":[{"coding":[{"code":"laboratory","system":"http:\/\/terminology.hl7.org\/CodeSystem\/observation-category"}]}],"effectiveDateTime":"2020-03-19","status":"unknown","code":{"coding":[{"code":"94500-6","system":"http:\/\/loinc.org"}]},"valueCodeableConcept":{"coding":[{"code":"272519000","system":"http:\/\/snomed.info\/sct","display":"Absent"}]}} +{"resourceType":"Observation","id":"228b982ddae20b8da26a212666995acde914b941a4ff7c314adf89d02c3831f0","subject":{"reference":"Patient\/49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7"},"encounter":{"reference":"Encounter\/fb29ea2a68ca2e1e4bbe22bdeedf021d94ec89f7e3d38ecbe908a8f2b3d89687"},"category":[{"coding":[{"code":"laboratory","system":"http:\/\/terminology.hl7.org\/CodeSystem\/observation-category"}]}],"effectiveDateTime":"2020-03-20","status":"unknown","code":{"coding":[{"code":"LAB:1","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]},"valueCodeableConcept":{"coding":[{"code":"See Image","system":"http:\/\/cumulus.smarthealthit.org\/i2b2","display":"See Image"}]}} 
+{"resourceType":"Observation","id":"d9b6de700feb3fb10370bb9e8fd5ba14ac7a31610012e6dd5f481429a321aa37","subject":{"reference":"Patient\/49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7"},"encounter":{"reference":"Encounter\/fb29ea2a68ca2e1e4bbe22bdeedf021d94ec89f7e3d38ecbe908a8f2b3d89687"},"category":[{"coding":[{"code":"vital-signs","system":"http:\/\/terminology.hl7.org\/CodeSystem\/observation-category"}]}],"effectiveDateTime":"2020-03-20","status":"unknown","code":{"coding":[{"code":"VITAL:1248","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]},"valueCodeableConcept":{"coding":[{"code":"Left Leg","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]}} +{"resourceType":"Observation","id":"de1452522921e519285b9ce0ea23fd72d2005a98355eed0f1ec8347e7f6c45b4","subject":{"reference":"Patient\/49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7"},"encounter":{"reference":"Encounter\/fb29ea2a68ca2e1e4bbe22bdeedf021d94ec89f7e3d38ecbe908a8f2b3d89687"},"category":[{"coding":[{"code":"vital-signs","system":"http:\/\/terminology.hl7.org\/CodeSystem\/observation-category"}]}],"effectiveDateTime":"2020-03-20","status":"unknown","code":{"coding":[{"code":"VITAL:1248","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]},"valueCodeableConcept":{"coding":[{"code":"Right Leg","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]}} +{"resourceType":"Observation","id":"d4e4830b25758182fba1c5dfc7370668d4abfff13d59913ba7e54692e2a5e190","subject":{"reference":"Patient\/49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7"},"encounter":{"reference":"Encounter\/fb29ea2a68ca2e1e4bbe22bdeedf021d94ec89f7e3d38ecbe908a8f2b3d89687"},"category":[{"coding":[{"code":"vital-signs","system":"http:\/\/terminology.hl7.org\/CodeSystem\/observation-category"}]}],"effectiveDateTime":"2020-03-20","status":"unknown","code":{"coding":[{"code":"VITAL:1248","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]}} +{"resourceType":"Observation","id":"bdaae1b02c3ab4841a2495c3069103ef4c4a6a5639a2c80430f1fb9637c628d7","subject":{"reference":"Patient\/49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7"},"encounter":{"reference":"Encounter\/fb29ea2a68ca2e1e4bbe22bdeedf021d94ec89f7e3d38ecbe908a8f2b3d89687"},"category":[{"coding":[{"code":"vital-signs","system":"http:\/\/terminology.hl7.org\/CodeSystem\/observation-category"}]}],"effectiveDateTime":"2020-03-20","status":"unknown","code":{"coding":[{"code":"VITAL:1248","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]},"valueQuantity":{"value":10.5,"unit":"lb","system":"http:\/\/unitsofmeasure.org","code":"[lb_av]","comparator":"<="}} +{"resourceType":"Observation","id":"6628cff0b3de8d4c86cc553713fd3605cd9266abb2366a84cef02d1a5508a58d","subject":{"reference":"Patient\/49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7"},"encounter":{"reference":"Encounter\/fb29ea2a68ca2e1e4bbe22bdeedf021d94ec89f7e3d38ecbe908a8f2b3d89687"},"category":[{"coding":[{"code":"vital-signs","system":"http:\/\/terminology.hl7.org\/CodeSystem\/observation-category"}]}],"effectiveDateTime":"2020-03-20","status":"unknown","code":{"coding":[{"code":"VITAL:1248","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]},"valueQuantity":{"value":1.13,"unit":"NOT DEFINED IN SOURCE"}} 
+{"resourceType":"Observation","id":"823ce087e9ae385ee4e9626fe3d92b4374580f7b59591639464d6bb7529b920b","subject":{"reference":"Patient\/49fbb06b4b49eb49a096cf2a96674fb84a4d52ee74ec25c8f6f26023cb4764a7"},"encounter":{"reference":"Encounter\/fb29ea2a68ca2e1e4bbe22bdeedf021d94ec89f7e3d38ecbe908a8f2b3d89687"},"category":[{"coding":[{"code":"vital-signs","system":"http:\/\/terminology.hl7.org\/CodeSystem\/observation-category"}]}],"effectiveDateTime":"2020-03-20","status":"unknown","code":{"coding":[{"code":"VITAL:1248","system":"http:\/\/cumulus.smarthealthit.org\/i2b2"}]},"valueQuantity":{"value":99.9,"unit":"%","system":"http:\/\/unitsofmeasure.org","code":"%","comparator":">"}} diff --git a/tests/data/simple/output/medicationrequest/medicationrequest.000.ndjson b/tests/data/simple/output/medicationrequest/medicationrequest.000.ndjson index af6e45e5..34b42090 100644 --- a/tests/data/simple/output/medicationrequest/medicationrequest.000.ndjson +++ b/tests/data/simple/output/medicationrequest/medicationrequest.000.ndjson @@ -1,2 +1,2 @@ {"resourceType":"MedicationRequest","id":"6f988bf5ccca3551d4a84928e5507e9b31c54865b068187d3a1c8a640a176a1a","meta":{"profile":["http:\/\/hl7.org\/fhir\/us\/core\/StructureDefinition\/us-core-medicationrequest"]},"status":"active","statusReason":{"coding":[{"system":"http:\/\/hl7.org\/fhir\/ValueSet\/medicationrequest-status-reason","code":"altchoice"}]},"intent":"order","category":[{"coding":[{"system":"http:\/\/terminology.hl7.org\/CodeSystem\/medicationrequest-category","code":"community","display":"Community"}],"text":"Community"}],"priority":"routine","doNotPerform":false,"reportedBoolean":false,"medicationCodeableConcept":{"coding":[{"system":"http:\/\/www.nlm.nih.gov\/research\/umls\/rxnorm","code":"849574","display":"Naproxen sodium 220 MG Oral Tablet"}],"text":"Naproxen sodium 220 MG Oral Tablet"},"subject":{"reference":"Patient\/b18572e8ba7bc30b32ff72efa59a36524c59407d31166ce77c8c7351bdc4dc9c"},"encounter":{"reference":"Encounter\/cdb330ffb338c6f94075cd7f38708981dc3434bf12f49252485b687a98c555c7"},"supportingInformation":[{"reference":"DocumentReference\/cdb330ffb338c6f94075cd7f38708981dc3434bf12f49252485b687a98c555c7"}],"authoredOn":"1993-08-18T17:33:24-04:00","requester":{"reference":"Practitioner\/5116db056f7dd2853349497c1a66555950bbf7491a497167aa4c4d30b8b8b0b0"},"performer":{"reference":"Practitioner\/5116db056f7dd2853349497c1a66555950bbf7491a497167aa4c4d30b8b8b0b0"},"performerType":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"8724009"}]},"recorder":{"reference":"Practitioner\/5116db056f7dd2853349497c1a66555950bbf7491a497167aa4c4d30b8b8b0b0"},"reasonCode":[{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"201834006","display":"Localized, primary osteoarthritis of the hand"}],"text":"Localized, primary osteoarthritis of the 
hand"}],"reasonReference":[{"reference":"Condition\/ee1b8555df1476e7512bc31940148a7821edae6e152e92037e6e8d7e948800a4"}],"instantiatesCanonical":["https:\/\/canonical\/"],"instantiatesUri":["https:\/\/uri\/"],"basedOn":[{"reference":"MedicationRequest\/fe735ec8b66c2d55c10f106ecf673db0a542a6930033aa13219a67edd12866fb"}],"courseOfTherapyType":{"coding":[{"system":"http:\/\/terminology.hl7.org\/CodeSystem\/medicationrequest-course-of-therapy","code":"continuous"}]},"insurance":[{"reference":"Coverage\/ee1b8555df1476e7512bc31940148a7821edae6e152e92037e6e8d7e948800a4"}],"dosageInstruction":[{"sequence":3,"additionalInstruction":[{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"418639000"}]}],"timing":{"event":["2018-02-01T17:33:24-04:00"],"repeat":{"boundsDuration":{"value":1},"count":1,"countMax":1,"duration":1,"durationMax":1,"durationUnit":"d","frequency":1,"frequencyMax":1,"period":1,"periodMax":1,"periodUnit":"d","dayOfWeek":["mon"],"timeOfDay":["13:00:00"],"when":["HS"],"offset":1},"code":{"coding":[{"system":"http:\/\/terminology.hl7.org\/CodeSystem\/v3-GTSAbbreviation","code":"QD"}]}},"asNeededBoolean":false,"site":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"251007"}]},"route":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"6064005"}]},"method":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"417924000"}]},"doseAndRate":[{"type":{"coding":[{"system":"http:\/\/terminology.hl7.org\/CodeSystem\/dose-rate-type","code":"calculated"}]},"doseQuantity":{"value":1},"rateRange":{"low":{"value":1},"high":{"value":1}}}],"maxDosePerPeriod":{"numerator":{"value":1},"denominator":{"value":1}},"maxDosePerAdministration":{"value":1},"maxDosePerLifetime":{"value":1}}],"dispenseRequest":{"initialFill":{"quantity":{"value":1},"duration":{"value":1}},"dispenseInterval":{"value":1},"validityPeriod":{"start":"2018-02-01T17:33:24-04:00","end":"2018-02-01T18:33:24-04:00"},"numberOfRepeatsAllowed":3,"quantity":{"value":1},"expectedSupplyDuration":{"value":1},"performer":{"reference":"Organization\/ee1b8555df1476e7512bc31940148a7821edae6e152e92037e6e8d7e948800a4"}},"substitution":{"allowedBoolean":true,"reason":{"coding":[{"system":"http:\/\/terminology.hl7.org\/CodeSystem\/v3-ActReason","code":"CT"}]}},"priorPrescription":{"reference":"MedicationRequest\/fe735ec8b66c2d55c10f106ecf673db0a542a6930033aa13219a67edd12866fb"},"detectedIssue":[{"reference":"DetectedIssue\/ee1b8555df1476e7512bc31940148a7821edae6e152e92037e6e8d7e948800a4"}],"eventHistory":[{"reference":"Provenance\/ee1b8555df1476e7512bc31940148a7821edae6e152e92037e6e8d7e948800a4"}]} -{"resourceType":"MedicationRequest","id":"da57de21394613b2039f39c5643c5a8f42e9e815055634eeb1399d30fc9c3844","meta":{"profile":["http:\/\/hl7.org\/fhir\/us\/core\/StructureDefinition\/us-core-medicationrequest"]},"status":"inactive","statusReason":{"coding":[{"system":"http:\/\/hl7.org\/fhir\/ValueSet\/medicationrequest-status-reason","code":"altchoice"}]},"intent":"order","category":[{"coding":[{"system":"http:\/\/terminology.hl7.org\/CodeSystem\/medicationrequest-category","code":"community","display":"Community"}],"text":"Community"}],"priority":"routine","doNotPerform":false,"reportedBoolean":false,"medicationCodeableConcept":{"coding":[{"system":"http:\/\/www.nlm.nih.gov\/research\/umls\/rxnorm","code":"849574","display":"Naproxen sodium 220 MG Oral Tablet"}],"text":"Naproxen sodium 220 MG Oral 
Tablet"},"subject":{"reference":"Patient\/b18572e8ba7bc30b32ff72efa59a36524c59407d31166ce77c8c7351bdc4dc9c"},"encounter":{"reference":"Encounter\/cdb330ffb338c6f94075cd7f38708981dc3434bf12f49252485b687a98c555c7"},"supportingInformation":[{"reference":"DocumentReference\/cdb330ffb338c6f94075cd7f38708981dc3434bf12f49252485b687a98c555c7"}],"authoredOn":"1993-08-18T17:33:24-04:00","requester":{"reference":"Practitioner\/5116db056f7dd2853349497c1a66555950bbf7491a497167aa4c4d30b8b8b0b0"},"performer":{"reference":"Practitioner\/5116db056f7dd2853349497c1a66555950bbf7491a497167aa4c4d30b8b8b0b0"},"performerType":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"8724009"}]},"recorder":{"reference":"Practitioner\/5116db056f7dd2853349497c1a66555950bbf7491a497167aa4c4d30b8b8b0b0"},"reasonCode":[{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"201834006","display":"Localized, primary osteoarthritis of the hand"}],"text":"Localized, primary osteoarthritis of the hand"}],"reasonReference":[{"reference":"Condition\/ee1b8555df1476e7512bc31940148a7821edae6e152e92037e6e8d7e948800a4"}],"instantiatesCanonical":["https:\/\/canonical\/"],"instantiatesUri":["https:\/\/uri\/"],"basedOn":[{"reference":"MedicationRequest\/fe735ec8b66c2d55c10f106ecf673db0a542a6930033aa13219a67edd12866fb"}],"courseOfTherapyType":{"coding":[{"system":"http:\/\/terminology.hl7.org\/CodeSystem\/medicationrequest-course-of-therapy","code":"continuous"}]},"insurance":[{"reference":"Coverage\/ee1b8555df1476e7512bc31940148a7821edae6e152e92037e6e8d7e948800a4"}],"dosageInstruction":[{"sequence":3,"additionalInstruction":[{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"418639000"}]}],"timing":{"event":["2018-02-01T17:33:24-04:00"],"repeat":{"boundsDuration":{"value":1},"count":1,"countMax":1,"duration":1,"durationMax":1,"durationUnit":"d","frequency":1,"frequencyMax":1,"period":1,"periodMax":1,"periodUnit":"d","dayOfWeek":["mon"],"timeOfDay":["13:00:00"],"when":["HS"],"offset":1},"code":{"coding":[{"system":"http:\/\/terminology.hl7.org\/CodeSystem\/v3-GTSAbbreviation","code":"QD"}]}},"asNeededBoolean":false,"site":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"251007"}]},"route":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"6064005"}]},"method":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"417924000"}]},"doseAndRate":[{"type":{"coding":[{"system":"http:\/\/terminology.hl7.org\/CodeSystem\/dose-rate-type","code":"calculated"}]},"doseQuantity":{"value":1},"rateRange":{"low":{"value":1},"high":{"value":1}}}],"maxDosePerPeriod":{"numerator":{"value":1},"denominator":{"value":1}},"maxDosePerAdministration":{"value":1},"maxDosePerLifetime":{"value":1}}],"dispenseRequest":{"initialFill":{"quantity":{"value":1},"duration":{"value":1}},"dispenseInterval":{"value":1},"validityPeriod":{"start":"2018-02-01T17:33:24-04:00","end":"2018-02-01T18:33:24-04:00"},"numberOfRepeatsAllowed":3,"quantity":{"value":1},"expectedSupplyDuration":{"value":1},"performer":{"reference":"Organization\/ee1b8555df1476e7512bc31940148a7821edae6e152e92037e6e8d7e948800a4"}},"substitution":{"allowedBoolean":true,"reason":{"coding":[{"system":"http:\/\/terminology.hl7.org\/CodeSystem\/v3-ActReason","code":"CT"}]}},"priorPrescription":null,"detectedIssue":null,"eventHistory":null} 
+{"resourceType":"MedicationRequest","id":"da57de21394613b2039f39c5643c5a8f42e9e815055634eeb1399d30fc9c3844","meta":{"profile":["http:\/\/hl7.org\/fhir\/us\/core\/StructureDefinition\/us-core-medicationrequest"]},"status":"inactive","statusReason":{"coding":[{"system":"http:\/\/hl7.org\/fhir\/ValueSet\/medicationrequest-status-reason","code":"altchoice"}]},"intent":"order","category":[{"coding":[{"system":"http:\/\/terminology.hl7.org\/CodeSystem\/medicationrequest-category","code":"community","display":"Community"}],"text":"Community"}],"priority":"routine","doNotPerform":false,"reportedBoolean":false,"medicationCodeableConcept":{"coding":[{"system":"http:\/\/www.nlm.nih.gov\/research\/umls\/rxnorm","code":"849574","display":"Naproxen sodium 220 MG Oral Tablet"}],"text":"Naproxen sodium 220 MG Oral Tablet"},"subject":{"reference":"Patient\/b18572e8ba7bc30b32ff72efa59a36524c59407d31166ce77c8c7351bdc4dc9c"},"encounter":{"reference":"Encounter\/cdb330ffb338c6f94075cd7f38708981dc3434bf12f49252485b687a98c555c7"},"supportingInformation":[{"reference":"DocumentReference\/cdb330ffb338c6f94075cd7f38708981dc3434bf12f49252485b687a98c555c7"}],"authoredOn":"1993-08-18T17:33:24-04:00","requester":{"reference":"Practitioner\/5116db056f7dd2853349497c1a66555950bbf7491a497167aa4c4d30b8b8b0b0"},"performer":{"reference":"Practitioner\/5116db056f7dd2853349497c1a66555950bbf7491a497167aa4c4d30b8b8b0b0"},"performerType":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"8724009"}]},"recorder":{"reference":"Practitioner\/5116db056f7dd2853349497c1a66555950bbf7491a497167aa4c4d30b8b8b0b0"},"reasonCode":[{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"201834006","display":"Localized, primary osteoarthritis of the hand"}],"text":"Localized, primary osteoarthritis of the 
hand"}],"reasonReference":[{"reference":"Condition\/ee1b8555df1476e7512bc31940148a7821edae6e152e92037e6e8d7e948800a4"}],"instantiatesCanonical":["https:\/\/canonical\/"],"instantiatesUri":["https:\/\/uri\/"],"basedOn":[{"reference":"MedicationRequest\/fe735ec8b66c2d55c10f106ecf673db0a542a6930033aa13219a67edd12866fb"}],"courseOfTherapyType":{"coding":[{"system":"http:\/\/terminology.hl7.org\/CodeSystem\/medicationrequest-course-of-therapy","code":"continuous"}]},"insurance":[{"reference":"Coverage\/ee1b8555df1476e7512bc31940148a7821edae6e152e92037e6e8d7e948800a4"}],"dosageInstruction":[{"sequence":3,"additionalInstruction":[{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"418639000"}]}],"timing":{"event":["2018-02-01T17:33:24-04:00"],"repeat":{"boundsDuration":{"value":1},"count":1,"countMax":1,"duration":1,"durationMax":1,"durationUnit":"d","frequency":1,"frequencyMax":1,"period":1,"periodMax":1,"periodUnit":"d","dayOfWeek":["mon"],"timeOfDay":["13:00:00"],"when":["HS"],"offset":1},"code":{"coding":[{"system":"http:\/\/terminology.hl7.org\/CodeSystem\/v3-GTSAbbreviation","code":"QD"}]}},"asNeededBoolean":false,"site":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"251007"}]},"route":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"6064005"}]},"method":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"417924000"}]},"doseAndRate":[{"type":{"coding":[{"system":"http:\/\/terminology.hl7.org\/CodeSystem\/dose-rate-type","code":"calculated"}]},"doseQuantity":{"value":1},"rateRange":{"low":{"value":1},"high":{"value":1}}}],"maxDosePerPeriod":{"numerator":{"value":1},"denominator":{"value":1}},"maxDosePerAdministration":{"value":1},"maxDosePerLifetime":{"value":1}}],"dispenseRequest":{"initialFill":{"quantity":{"value":1},"duration":{"value":1}},"dispenseInterval":{"value":1},"validityPeriod":{"start":"2018-02-01T17:33:24-04:00","end":"2018-02-01T18:33:24-04:00"},"numberOfRepeatsAllowed":3,"quantity":{"value":1},"expectedSupplyDuration":{"value":1},"performer":{"reference":"Organization\/ee1b8555df1476e7512bc31940148a7821edae6e152e92037e6e8d7e948800a4"}},"substitution":{"allowedBoolean":true,"reason":{"coding":[{"system":"http:\/\/terminology.hl7.org\/CodeSystem\/v3-ActReason","code":"CT"}]}}} diff --git a/tests/data/simple/output/procedure/procedure.000.ndjson b/tests/data/simple/output/procedure/procedure.000.ndjson index c657f5f0..b68665f3 100644 --- a/tests/data/simple/output/procedure/procedure.000.ndjson +++ b/tests/data/simple/output/procedure/procedure.000.ndjson @@ -1,2 +1,2 @@ -{"resourceType":"Procedure","id":"b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a","meta":{"versionId":"1"},"status":"completed","code":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"80146002","display":"Appendectomy (Procedure)"}],"text":"Appendectomy"},"subject":{"reference":"Patient\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"},"performedDateTime":"2013-04-05","recorder":{"reference":"Practitioner\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"},"asserter":{"reference":"Practitioner\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"},"performer":[{"actor":{"reference":"Practitioner\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"}}],"reasonCode":[{"text":"Generalized abdominal pain 24 hours. 
Localized in *** with rebound and guarding"}],"followUp":[{"text":"ROS 5 days - ****-**-**"}],"usedReference":[{"reference":"Device\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"}],"category":null,"bodySite":null,"complication":null,"usedCode":null} -{"resourceType":"Procedure","id":"86263a8b3ce0e27e7d18b0f234d957bc268249e1f2e1f29fa4e6b616ceff0eab","meta":null,"status":"completed","code":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"90105005","display":"Biopsy of soft tissue of forearm (Procedure)"}],"text":"Biopsy of suspected melanoma L) arm"},"subject":{"reference":"Patient\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"},"performedDateTime":"2014-02-03","recorder":null,"asserter":null,"performer":[{"actor":{"reference":"Practitioner\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"}}],"reasonCode":[{"text":"**** lesion l) forearm. getting darker last 3 months."}],"followUp":[{"text":"****** ** ******"}],"usedReference":null,"category":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"103693007","display":"Diagnostic procedure (procedure)"}],"text":"Diagnostic procedure"},"bodySite":[{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"368225008","display":"Entire Left Forearm"}],"text":"Left forearm"}],"complication":[{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"67750007","display":"Ineffective airway clearance (finding)"}],"text":"Ineffective airway clearance"}],"usedCode":[{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"79068005","display":"Needle, device (physical object)"}],"text":"30-***** needle"}]} +{"resourceType":"Procedure","id":"b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a","meta":{"versionId":"1"},"status":"completed","code":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"80146002","display":"Appendectomy (Procedure)"}],"text":"Appendectomy"},"subject":{"reference":"Patient\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"},"performedDateTime":"2013-04-05","recorder":{"reference":"Practitioner\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"},"asserter":{"reference":"Practitioner\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"},"performer":[{"actor":{"reference":"Practitioner\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"}}],"reasonCode":[{"text":"Generalized abdominal pain 24 hours. Localized in *** with rebound and guarding"}],"followUp":[{"text":"ROS 5 days - ****-**-**"}],"usedReference":[{"reference":"Device\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"}]} +{"resourceType":"Procedure","id":"86263a8b3ce0e27e7d18b0f234d957bc268249e1f2e1f29fa4e6b616ceff0eab","status":"completed","code":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"90105005","display":"Biopsy of soft tissue of forearm (Procedure)"}],"text":"Biopsy of suspected melanoma L) arm"},"subject":{"reference":"Patient\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"},"performedDateTime":"2014-02-03","performer":[{"actor":{"reference":"Practitioner\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"}}],"reasonCode":[{"text":"**** lesion l) forearm. 
getting darker last 3 months."}],"followUp":[{"text":"****** ** ******"}],"category":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"103693007","display":"Diagnostic procedure (procedure)"}],"text":"Diagnostic procedure"},"bodySite":[{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"368225008","display":"Entire Left Forearm"}],"text":"Left forearm"}],"complication":[{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"67750007","display":"Ineffective airway clearance (finding)"}],"text":"Ineffective airway clearance"}],"usedCode":[{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"79068005","display":"Needle, device (physical object)"}],"text":"30-***** needle"}]} diff --git a/tests/data/simple/output/servicerequest/servicerequest.000.ndjson b/tests/data/simple/output/servicerequest/servicerequest.000.ndjson index ed4e614a..69f8fa34 100644 --- a/tests/data/simple/output/servicerequest/servicerequest.000.ndjson +++ b/tests/data/simple/output/servicerequest/servicerequest.000.ndjson @@ -1,2 +1,2 @@ -{"resourceType":"ServiceRequest","id":"baf86a2020490acb24ffa5c700291b88561c65c17151e2c7849f937be82e27ac","status":"active","intent":"plan","code":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"229115003","display":"***** Press (regime\/therapy)"}]},"subject":{"reference":"Patient\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"},"occurrenceTiming":{"repeat":{"count":20,"countMax":30,"frequency":3,"period":1,"periodUnit":"wk"}},"authoredOn":null,"requester":null,"performer":null} -{"resourceType":"ServiceRequest","id":"39fcb99e46d0e298dd42765e5920b51cc86ff39989e19fcdccb230d0cd1e4043","status":"completed","intent":"order","code":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"76164006","display":"Biopsy of colon (procedure)"}],"text":"Biopsy of colon"},"subject":{"reference":"Patient\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"},"occurrenceTiming":null,"authoredOn":"2017-03-05","requester":{"reference":"Practitioner\/d1736711dbf0c6584d11053912ea1e60984a6996294a91813536f40a729b4f52"},"performer":[{"reference":"Practitioner\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"}]} +{"resourceType":"ServiceRequest","id":"baf86a2020490acb24ffa5c700291b88561c65c17151e2c7849f937be82e27ac","status":"active","intent":"plan","code":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"229115003","display":"***** Press (regime\/therapy)"}]},"subject":{"reference":"Patient\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"},"occurrenceTiming":{"repeat":{"count":20,"countMax":30,"frequency":3,"period":1,"periodUnit":"wk"}}} +{"resourceType":"ServiceRequest","id":"39fcb99e46d0e298dd42765e5920b51cc86ff39989e19fcdccb230d0cd1e4043","status":"completed","intent":"order","code":{"coding":[{"system":"http:\/\/snomed.info\/sct","code":"76164006","display":"Biopsy of colon (procedure)"}],"text":"Biopsy of colon"},"subject":{"reference":"Patient\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"},"authoredOn":"2017-03-05","requester":{"reference":"Practitioner\/d1736711dbf0c6584d11053912ea1e60984a6996294a91813536f40a729b4f52"},"performer":[{"reference":"Practitioner\/b3b95f0f0726e6900745d734f86b008e75e19d02a0962f7e4d25f86b68356f2a"}]} diff --git a/tests/etl/test_etl_cli.py b/tests/etl/test_etl_cli.py index e6a6ae76..77e7ae7e 100644 --- a/tests/etl/test_etl_cli.py +++ b/tests/etl/test_etl_cli.py @@ -12,11 +12,10 @@ from cumulus_etl import cli, common, deid, errors, loaders, store from 
cumulus_etl.etl import context -from cumulus_etl.formats.deltalake import DeltaLakeFormat from tests.ctakesmock import CtakesMixin, fake_ctakes_extract from tests.s3mock import S3Mixin -from tests.utils import FROZEN_TIME_UTC, AsyncTestCase, TreeCompareMixin +from tests.utils import FROZEN_TIME_UTC, AsyncTestCase, TreeCompareMixin, read_delta_lake @pytest.mark.skipif(not shutil.which(deid.MSTOOL_CMD), reason="MS tool not installed") @@ -471,8 +470,8 @@ async def test_group_updates(self): def table_ids(): path = os.path.join(self.output_path, "covid_symptom__nlp_results") - df = DeltaLakeFormat.spark.read.format("delta").load(path).sort("id").toPandas() - return [row["id"] for _, row in df.iterrows()] + rows = read_delta_lake(path) + return [row["id"] for row in rows] # Get a baseline, with two symptoms per document await self.run_etl(output_format="deltalake", tasks=["covid_symptom__nlp_results"]) diff --git a/tests/etl/test_tasks.py b/tests/etl/test_tasks.py index 275e951a..4a8a238d 100644 --- a/tests/etl/test_tasks.py +++ b/tests/etl/test_tasks.py @@ -6,6 +6,7 @@ from unittest import mock import ddt +import pyarrow from cumulus_etl import common, deid, errors, fhir from cumulus_etl.etl import config, tasks @@ -40,9 +41,22 @@ def setUp(self) -> None: dir_errors=self.errors_dir, ) - self.format = mock.MagicMock(dbname="table") - self.format2 = mock.MagicMock(dbname="table2") # for tasks that have multiple output streams - self.create_formatter_mock = mock.MagicMock(side_effect=[self.format, self.format2]) + def make_formatter(dbname: str, group_field: str = None, resource_type: str = None): + formatter = mock.MagicMock(dbname=dbname, group_field=group_field, resource_type=resource_type) + self.format_count += 1 + if self.format_count == 1: + self.format = self.format or formatter + return self.format + elif self.format_count == 2: + self.format2 = self.format2 or formatter + return self.format2 + else: + return formatter # stop keeping track + + self.format = None + self.format2 = None # for tasks that have multiple output streams + self.format_count = 0 + self.create_formatter_mock = mock.MagicMock(side_effect=make_formatter) self.job_config.create_formatter = self.create_formatter_mock self.scrubber = deid.Scrubber() @@ -90,8 +104,8 @@ async def test_unknown_modifier_extensions_skipped_for_patients(self): # Confirm that only patient 0 got stored self.assertEqual(1, self.format.write_records.call_count) - df = self.format.write_records.call_args[0][0] - self.assertEqual([self.codebook.db.patient("0")], list(df.id)) + batch = self.format.write_records.call_args[0][0] + self.assertEqual([self.codebook.db.patient("0")], [row["id"] for row in batch.rows]) def test_unknown_task(self): with self.assertRaises(SystemExit) as cm: @@ -120,15 +134,18 @@ async def test_drop_duplicates(self): # Confirm that only one version of patient A got stored self.assertEqual(1, self.format.write_records.call_count) - df = self.format.write_records.call_args[0][0] - self.assertEqual(2, len(df.id)) - self.assertEqual(sorted([self.codebook.db.patient("A"), self.codebook.db.patient("B")]), sorted(df.id)) + batch = self.format.write_records.call_args[0][0] + self.assertEqual(2, len(batch.rows)) + self.assertEqual( + {self.codebook.db.patient("A"), self.codebook.db.patient("B")}, {row["id"] for row in batch.rows} + ) async def test_batch_write_errors_saved(self): self.make_json("Patient.1", "A") self.make_json("Patient.2", "B") self.make_json("Patient.3", "C") self.job_config.batch_size = 1 + self.format = 
mock.MagicMock(dbname="patient") self.format.write_records.side_effect = [False, True, False] # First and third will fail await basic_tasks.PatientTask(self.job_config, self.scrubber).run() @@ -145,6 +162,122 @@ async def test_batch_write_errors_saved(self): common.read_json(f"{self.errors_dir}/patient/write-error.002.ndjson"), ) + async def test_batch_has_wide_schema(self): + self.make_json("Patient.1", "A") # no interesting fields + + await basic_tasks.PatientTask(self.job_config, self.scrubber).run() + + schema = self.format.write_records.call_args[0][0].schema + self.assertListEqual( + [ + "resourceType", + "id", + "implicitRules", + "language", + "meta", + "contained", + "extension", + "modifierExtension", + "text", + "active", + "address", + "birthDate", + "communication", + "contact", + "deceasedBoolean", + "deceasedDateTime", + "gender", + "generalPractitioner", + "identifier", + "link", + "managingOrganization", + "maritalStatus", + "multipleBirthBoolean", + "multipleBirthInteger", + "name", + "photo", + "telecom", + ], + schema.names, + ) + + # Spot check a few of the types + self.assertEqual(pyarrow.string(), schema.field("id").type) + self.assertEqual(pyarrow.bool_(), schema.field("deceasedBoolean").type) + self.assertEqual(pyarrow.int32(), schema.field("multipleBirthInteger").type) + # Note how struct types only have basic types inside of them - this is intentional, no recursion of structs + # is done by the ETL. + self.assertEqual( + pyarrow.struct({"id": pyarrow.string(), "div": pyarrow.string(), "status": pyarrow.string()}), + schema.field("text").type, + ) + self.assertEqual( + pyarrow.list_(pyarrow.struct({"id": pyarrow.string(), "preferred": pyarrow.bool_()})), + schema.field("communication").type, + ) + + async def test_batch_schema_includes_inferred_fields(self): + """Verify that deep (inferred) fields are also included in the final schema""" + # Make sure that we include different deep fields for each - final schema should be a union + self.make_json("ServiceRequest.1", "A", category=[{"coding": [{"version": "1.0"}]}]) + self.make_json("ServiceRequest.2", "B", asNeededCodeableConcept={"coding": [{"userSelected": True}]}) + + await basic_tasks.ServiceRequestTask(self.job_config, self.scrubber).run() + + schema = self.format.write_records.call_args[0][0].schema + + # Start with simple, non-inferred CodeableConcept -- this should be bare-bones + self.assertEqual( + pyarrow.struct({"id": pyarrow.string(), "text": pyarrow.string()}), schema.field("performerType").type + ) + # Now the two custom/inferred/deep fields + self.assertEqual( + pyarrow.list_( + pyarrow.struct( + { + "id": pyarrow.string(), + "coding": pyarrow.list_( + pyarrow.struct( + { + "version": pyarrow.string(), + } + ) + ), + "text": pyarrow.string(), + } + ) + ), + schema.field("category").type, + ) + self.assertEqual( + pyarrow.struct( + { + "id": pyarrow.string(), + "coding": pyarrow.list_( + pyarrow.struct( + { + "userSelected": pyarrow.bool_(), + } + ) + ), + "text": pyarrow.string(), + } + ), + schema.field("asNeededCodeableConcept").type, + ) + + async def test_batch_schema_types_are_coerced(self): + """Verify that fields with "wrong" input types (like int instead of float) are correct in final schema""" + # Make sure that we include both wide and deep fields - we should coerce both into FHIR spec schema + self.make_json("ServiceRequest.1", "A", quantityQuantity={"value": 1}) # should be floating type + self.make_json("ServiceRequest.2", "B", quantityRange={"low": {"value": 2}}) # should be floating 
type + + await basic_tasks.ServiceRequestTask(self.job_config, self.scrubber).run() + + schema = self.format.write_records.call_args[0][0].schema + self.assertEqual(pyarrow.float64(), schema.field("quantityQuantity").type.field("value").type) + self.assertEqual(pyarrow.float64(), schema.field("quantityRange").type.field("low").type.field("value").type) + @ddt.ddt class TestMedicationRequestTask(TaskTestCase): @@ -158,19 +291,19 @@ async def test_inline_codes(self): await basic_tasks.MedicationRequestTask(self.job_config, self.scrubber).run() self.assertEqual(1, self.format.write_records.call_count) - df1 = self.format.write_records.call_args[0][0] + batch = self.format.write_records.call_args[0][0] self.assertEqual( { self.codebook.db.resource_hash("InlineCode"), self.codebook.db.resource_hash("NoCode"), }, - set(df1.id), + {row["id"] for row in batch.rows}, ) # Confirm we wrote an empty dataframe to the medication table self.assertEqual(1, self.format2.write_records.call_count) - df2 = self.format2.write_records.call_args[0][0] - self.assertTrue(df2.empty) + batch = self.format2.write_records.call_args[0][0] + self.assertEqual([], batch.rows) async def test_contained_medications(self): """Verify that we pass it through and don't blow up""" @@ -180,13 +313,13 @@ async def test_contained_medications(self): # Confirm we wrote the basic MedicationRequest self.assertEqual(1, self.format.write_records.call_count) - df = self.format.write_records.call_args[0][0] - self.assertEqual(f'#{self.codebook.db.resource_hash("123")}', df.iloc[0].medicationReference["reference"]) + batch = self.format.write_records.call_args[0][0] + self.assertEqual(f'#{self.codebook.db.resource_hash("123")}', batch.rows[0]["medicationReference"]["reference"]) # Confirm we wrote an empty dataframe to the medication table self.assertEqual(1, self.format2.write_records.call_count) - df2 = self.format2.write_records.call_args[0][0] - self.assertTrue(df2.empty) + batch = self.format2.write_records.call_args[0][0] + self.assertEqual(0, len(batch.rows)) @mock.patch("cumulus_etl.fhir.download_reference") async def test_external_medications(self, mock_download): @@ -208,16 +341,19 @@ async def test_external_medications(self, mock_download): # Confirm we wrote the basic MedicationRequest self.assertEqual(1, self.format.write_records.call_count) - df = self.format.write_records.call_args[0][0] - self.assertEqual([self.codebook.db.resource_hash("A")], list(df.id)) + batch = self.format.write_records.call_args[0][0] + self.assertEqual([self.codebook.db.resource_hash("A")], [row["id"] for row in batch.rows]) self.assertEqual( - f'Medication/{self.codebook.db.resource_hash("123")}', df.iloc[0].medicationReference["reference"] + f'Medication/{self.codebook.db.resource_hash("123")}', + batch.rows[0]["medicationReference"]["reference"], ) # AND that we wrote the downloaded resource! 
self.assertEqual(1, self.format2.write_records.call_count) - df = self.format2.write_records.call_args[0][0] - self.assertEqual([self.codebook.db.resource_hash("med1")], list(df.id)) # meds should be scrubbed too + batch = self.format2.write_records.call_args[0][0] + self.assertEqual( + [self.codebook.db.resource_hash("med1")], [row["id"] for row in batch.rows] + ) # meds should be scrubbed too @mock.patch("cumulus_etl.fhir.download_reference") async def test_external_medication_scrubbed(self, mock_download): @@ -236,14 +372,14 @@ async def test_external_medication_scrubbed(self, mock_download): # Check result self.assertEqual(1, self.format2.write_records.call_count) - df = self.format2.write_records.call_args[0][0] + batch = self.format2.write_records.call_args[0][0] self.assertEqual( { "resourceType": "Medication", "id": self.codebook.db.resource_hash("med1"), "status": "active", }, - dict(df.iloc[0]), + common.sparse_dict(batch.rows[0]), ) @mock.patch("cumulus_etl.fhir.download_reference") @@ -262,13 +398,13 @@ async def test_external_medications_with_error(self, mock_download): # Confirm we still wrote out all three request resources self.assertEqual(1, self.format.write_records.call_count) - df = self.format.write_records.call_args[0][0] - self.assertEqual(3, len(df.id)) + batch = self.format.write_records.call_args[0][0] + self.assertEqual(3, len(batch.rows)) # Confirm we still wrote out the medication for B self.assertEqual(1, self.format2.write_records.call_count) - df = self.format2.write_records.call_args[0][0] - self.assertEqual([self.codebook.db.resource_hash("medB")], list(df.id)) + batch = self.format2.write_records.call_args[0][0] + self.assertEqual([self.codebook.db.resource_hash("medB")], [row["id"] for row in batch.rows]) # And we saved the error? 
med_error_dir = f"{self.errors_dir}/medicationrequest" @@ -303,10 +439,10 @@ async def test_external_medications_skips_duplicates(self, mock_download): # Confirm we wrote just the downloaded resources, and didn't repeat the dup at all self.assertEqual(2, self.format2.write_records.call_count) - df1 = self.format2.write_records.call_args_list[0][0][0] - self.assertEqual([self.codebook.db.resource_hash("dup")], list(df1.id)) - df2 = self.format2.write_records.call_args_list[1][0][0] - self.assertEqual([self.codebook.db.resource_hash("new")], list(df2.id)) + batch = self.format2.write_records.call_args_list[0][0][0] + self.assertEqual([self.codebook.db.resource_hash("dup")], [row["id"] for row in batch.rows]) + batch = self.format2.write_records.call_args_list[1][0][0] + self.assertEqual([self.codebook.db.resource_hash("new")], [row["id"] for row in batch.rows]) @mock.patch("cumulus_etl.fhir.download_reference") async def test_external_medications_skips_unknown_modifiers(self, mock_download): @@ -328,5 +464,5 @@ async def test_external_medications_skips_unknown_modifiers(self, mock_download) await basic_tasks.MedicationRequestTask(self.job_config, self.scrubber).run() self.assertEqual(1, self.format2.write_records.call_count) - df = self.format2.write_records.call_args[0][0] - self.assertEqual([self.codebook.db.resource_hash("good")], list(df.id)) # no "odd" written + batch = self.format2.write_records.call_args[0][0] + self.assertEqual([self.codebook.db.resource_hash("good")], [row["id"] for row in batch.rows]) # no "odd" diff --git a/tests/i2b2/test_i2b2_loader.py b/tests/i2b2/test_i2b2_loader.py index 3c8cc8b0..bea7023b 100644 --- a/tests/i2b2/test_i2b2_loader.py +++ b/tests/i2b2/test_i2b2_loader.py @@ -16,7 +16,7 @@ async def test_missing_files(self): """Verify that we don't error out if files are missing, we just ignore the ones that are""" with tempfile.TemporaryDirectory() as tmpdir: root = store.Root(tmpdir) - i2b2_loader = loader.I2b2Loader(root, 5) + i2b2_loader = loader.I2b2Loader(root) # Write one file, but not others, just to confirm we do a partial read if possible. vitals = f"{self.datadir}/i2b2/input/observation_fact_vitals.csv" diff --git a/tests/i2b2/test_i2b2_oracle_extract.py b/tests/i2b2/test_i2b2_oracle_extract.py index 2e8fe60a..a80bb675 100644 --- a/tests/i2b2/test_i2b2_oracle_extract.py +++ b/tests/i2b2/test_i2b2_oracle_extract.py @@ -89,7 +89,7 @@ async def test_loader(self, mock_extract): mock_extract.list_visit.return_value = [i2b2_mock_data.encounter_dim()] root = store.Root("tcp://localhost/foo") - oracle_loader = loader.I2b2Loader(root, 5) + oracle_loader = loader.I2b2Loader(root) tmpdir = await oracle_loader.load_all(["Condition", "Encounter", "Patient"]) # Check results diff --git a/tests/test_deltalake.py b/tests/test_deltalake.py index 36a2eae3..afe5ebbb 100644 --- a/tests/test_deltalake.py +++ b/tests/test_deltalake.py @@ -6,15 +6,17 @@ import shutil import tempfile -import pandas -import pytest +import ddt +import pyarrow +import pyspark.sql from pyspark.sql.utils import AnalysisException -from cumulus_etl import store +from cumulus_etl import formats, store from cumulus_etl.formats.deltalake import DeltaLakeFormat from tests import utils +@ddt.ddt class TestDeltaLake(utils.AsyncTestCase): """ Test case for the Delta Lake format writer. 
@@ -39,29 +41,36 @@ def setUp(self): shutil.rmtree(self.output_dir, ignore_errors=True) @staticmethod - def df(**kwargs) -> pandas.DataFrame: + def df(**kwargs) -> list[dict]: """ - Creates a dummy DataFrame with ids & values equal to each kwarg provided. + Creates a dummy Table with ids & values equal to each kwarg provided. """ - rows = [{"id": k, "value": v} for k, v in kwargs.items()] - return pandas.DataFrame(rows) + return [{"id": k, "value": v} for k, v in kwargs.items()] - def store(self, df: pandas.DataFrame, batch: int = 10, group_field: str = None, resource_type: str = None) -> None: + def get_spark_schema(self, df: pyspark.sql.DataFrame) -> str: + # pyspark's printSchema function is much more readable/concise than other schema inspection methods, + # but it doesn't expose the internal calls it makes to produce that - so let's just capture stdout. + with contextlib.redirect_stdout(io.StringIO()) as schema_tree: + df.printSchema() + return schema_tree.getvalue().strip() + + def store( + self, rows: list[dict], batch_index: int = 10, schema: pyarrow.Schema = None, group_field: str = None + ) -> bool: """ Writes a single batch of data to the data lake. - :param df: the data to insert - :param batch: which batch number this is, defaulting to 10 to avoid triggering any first/last batch logic + :param rows: the data to insert + :param batch_index: which batch number this is, defaulting to 10 to avoid triggering any first/last batch logic + :param schema: the batch schema, in pyarrow format :param group_field: a group field name, used to delete non-matching group rows - :param resource_type: the name of the resource being stored """ - deltalake = DeltaLakeFormat(self.root, "patient", group_field=group_field, resource_type=resource_type) - deltalake.write_records(df, batch) + deltalake = DeltaLakeFormat(self.root, "patient", group_field=group_field) + return deltalake.write_records(formats.Batch(rows, index=batch_index, schema=schema)) - def assert_lake_equal(self, df: pandas.DataFrame) -> None: + def assert_lake_equal(self, rows: list[dict]) -> None: table_path = os.path.join(self.output_dir, "patient") - table_records = utils.read_delta_lake(table_path) - self.assertListEqual(df.to_dict(orient="records"), table_records) + self.assertListEqual(rows, utils.read_delta_lake(table_path)) def test_creates_if_empty(self): """Verify that the lake is created when empty""" @@ -98,15 +107,25 @@ def test_missing_field(self): self.store(self.df(b={"one": 1})) self.assert_lake_equal(self.df(a={"one": 1, "two": 2}, b={"one": 1})) - # This currently fails because delta silently drops field data that can't be converted to the correct type. - # Here is a request to change this behavior into an error: https://github.com/delta-io/delta/issues/1551 - # See https://github.com/smart-on-fhir/cumulus-etl/issues/133 for some discussion of this issue. 
- @pytest.mark.xfail def test_altered_field(self): - """Verify that field types cannot be altered.""" - self.store(self.df(a={"one": 1})) - self.store(self.df(b={"one": "string"})) # should error out / not update - self.assert_lake_equal(self.df(a={"one": 1})) + """Verify that field types cannot be altered, as long as we have a schema.""" + schema = pyarrow.schema( + [ + pyarrow.field("id", pyarrow.string()), + pyarrow.field("value", pyarrow.int32()), + ] + ) + self.assertTrue(self.store(self.df(a=1), schema=schema)) + self.assertFalse(self.store(self.df(b="string"), schema=schema)) + self.assert_lake_equal(self.df(a=1)) + + # And just confirm the mildly buggy behavior that Delta Lake will silently ignore + # altered types when we don't force a schema. This is one reason we like to force a schema! + # We don't desire or care about this behavior, but just testing it here as a sort of documentation, + # in case they ever fix that, and then we get to know about it. + # Upstream issue: https://github.com/delta-io/delta/issues/1551 + self.assertTrue(self.store(self.df(b="string"))) + self.assert_lake_equal([{"id": "a", "value": 1}, {"id": "b"}]) def test_schema_has_names(self): """Verify that the lake's schemas has valid nested names, which may not always happen with spark""" @@ -152,27 +171,19 @@ def test_schema_has_names(self): ) def test_merged_schema_for_resource(self): - """Verify that the lake's schemas is derived from the full resource schema and also merged with real data""" - # This dataframe will: - # (A) be missing 99% of the schema - # (B) include a nested FHIR struct that our default (non-recursive) resource schemas will not include - # (C) include whole new non-FHIR columns - # Ideally the final result includes those two new elements and all the normal FHIR schema. + """Verify that the lake's schemas is updated over time as new fields appear""" rows = [ - # We specifically make the new columns int & bool types because pandas can have difficulty with nullable - # versions of those columns -- this will verify that the schema arrives back out correctly. + {"id": "bare-row"}, {"id": "int-row", "contact": [{"name": {"text": "Jane Doe"}}], "newIntColumn": 2000}, - {"id": "bool-row", "contact": [{"name": {"text": "John Doe"}}], "newBoolColumn": True}, + {"id": "bool-row", "contact": [{"name": {"given": ["John"]}}], "newBoolColumn": True}, ] - self.store(pandas.DataFrame(rows), resource_type="Patient") + for row in rows: + self.store([row]) table_path = os.path.join(self.output_dir, "patient") table_df = DeltaLakeFormat.spark.read.format("delta").load(table_path) - # pyspark's printSchema function is much more readable/concise than other schema inspection methods, - # but it doesn't expose the internal calls it makes to produce that - so let's just capture stdout. 
- with contextlib.redirect_stdout(io.StringIO()) as schema_tree: - table_df.printSchema() + schema_tree = self.get_spark_schema(table_df) self.assertEqual( """root @@ -181,157 +192,54 @@ def test_merged_schema_for_resource(self): | |-- element: struct (containsNull = true) | | |-- name: struct (nullable = true) | | | |-- text: string (nullable = true) - | | |-- id: string (nullable = true) - | | |-- gender: string (nullable = true) + | | | |-- given: array (nullable = true) + | | | | |-- element: string (containsNull = true) |-- newIntColumn: long (nullable = true) - |-- newBoolColumn: boolean (nullable = true) - |-- implicitRules: string (nullable = true) - |-- language: string (nullable = true) - |-- meta: struct (nullable = true) - | |-- id: string (nullable = true) - | |-- lastUpdated: string (nullable = true) - | |-- profile: array (nullable = true) - | | |-- element: string (containsNull = true) - | |-- source: string (nullable = true) - | |-- versionId: string (nullable = true) - |-- contained: array (nullable = true) - | |-- element: struct (containsNull = true) - | | |-- id: string (nullable = true) - | | |-- implicitRules: string (nullable = true) - | | |-- language: string (nullable = true) - |-- extension: array (nullable = true) - | |-- element: struct (containsNull = true) - | | |-- id: string (nullable = true) - | | |-- url: string (nullable = true) - | | |-- valueBase64Binary: string (nullable = true) - | | |-- valueBoolean: boolean (nullable = true) - | | |-- valueCanonical: string (nullable = true) - | | |-- valueCode: string (nullable = true) - | | |-- valueDate: string (nullable = true) - | | |-- valueDateTime: string (nullable = true) - | | |-- valueDecimal: double (nullable = true) - | | |-- valueId: string (nullable = true) - | | |-- valueInstant: string (nullable = true) - | | |-- valueInteger: long (nullable = true) - | | |-- valueMarkdown: string (nullable = true) - | | |-- valueOid: string (nullable = true) - | | |-- valuePositiveInt: long (nullable = true) - | | |-- valueString: string (nullable = true) - | | |-- valueTime: string (nullable = true) - | | |-- valueUnsignedInt: long (nullable = true) - | | |-- valueUri: string (nullable = true) - | | |-- valueUrl: string (nullable = true) - | | |-- valueUuid: string (nullable = true) - |-- modifierExtension: array (nullable = true) - | |-- element: struct (containsNull = true) - | | |-- id: string (nullable = true) - | | |-- url: string (nullable = true) - | | |-- valueBase64Binary: string (nullable = true) - | | |-- valueBoolean: boolean (nullable = true) - | | |-- valueCanonical: string (nullable = true) - | | |-- valueCode: string (nullable = true) - | | |-- valueDate: string (nullable = true) - | | |-- valueDateTime: string (nullable = true) - | | |-- valueDecimal: double (nullable = true) - | | |-- valueId: string (nullable = true) - | | |-- valueInstant: string (nullable = true) - | | |-- valueInteger: long (nullable = true) - | | |-- valueMarkdown: string (nullable = true) - | | |-- valueOid: string (nullable = true) - | | |-- valuePositiveInt: long (nullable = true) - | | |-- valueString: string (nullable = true) - | | |-- valueTime: string (nullable = true) - | | |-- valueUnsignedInt: long (nullable = true) - | | |-- valueUri: string (nullable = true) - | | |-- valueUrl: string (nullable = true) - | | |-- valueUuid: string (nullable = true) - |-- text: struct (nullable = true) - | |-- id: string (nullable = true) - | |-- div: string (nullable = true) - | |-- status: string (nullable = true) - |-- active: 
boolean (nullable = true) - |-- address: array (nullable = true) - | |-- element: struct (containsNull = true) - | | |-- id: string (nullable = true) - | | |-- city: string (nullable = true) - | | |-- country: string (nullable = true) - | | |-- district: string (nullable = true) - | | |-- line: array (nullable = true) - | | | |-- element: string (containsNull = true) - | | |-- postalCode: string (nullable = true) - | | |-- state: string (nullable = true) - | | |-- text: string (nullable = true) - | | |-- type: string (nullable = true) - | | |-- use: string (nullable = true) - |-- birthDate: string (nullable = true) - |-- communication: array (nullable = true) - | |-- element: struct (containsNull = true) - | | |-- id: string (nullable = true) - | | |-- preferred: boolean (nullable = true) - |-- deceasedBoolean: boolean (nullable = true) - |-- deceasedDateTime: string (nullable = true) - |-- gender: string (nullable = true) - |-- generalPractitioner: array (nullable = true) - | |-- element: struct (containsNull = true) - | | |-- id: string (nullable = true) - | | |-- display: string (nullable = true) - | | |-- reference: string (nullable = true) - | | |-- type: string (nullable = true) - |-- identifier: array (nullable = true) - | |-- element: struct (containsNull = true) - | | |-- id: string (nullable = true) - | | |-- system: string (nullable = true) - | | |-- use: string (nullable = true) - | | |-- value: string (nullable = true) - |-- link: array (nullable = true) - | |-- element: struct (containsNull = true) - | | |-- id: string (nullable = true) - | | |-- type: string (nullable = true) - |-- managingOrganization: struct (nullable = true) - | |-- id: string (nullable = true) - | |-- display: string (nullable = true) - | |-- reference: string (nullable = true) - | |-- type: string (nullable = true) - |-- maritalStatus: struct (nullable = true) - | |-- id: string (nullable = true) - | |-- text: string (nullable = true) - |-- multipleBirthBoolean: boolean (nullable = true) - |-- multipleBirthInteger: long (nullable = true) - |-- name: array (nullable = true) - | |-- element: struct (containsNull = true) - | | |-- id: string (nullable = true) - | | |-- family: string (nullable = true) - | | |-- given: array (nullable = true) - | | | |-- element: string (containsNull = true) - | | |-- prefix: array (nullable = true) - | | | |-- element: string (containsNull = true) - | | |-- suffix: array (nullable = true) - | | | |-- element: string (containsNull = true) - | | |-- text: string (nullable = true) - | | |-- use: string (nullable = true) - |-- photo: array (nullable = true) - | |-- element: struct (containsNull = true) - | | |-- id: string (nullable = true) - | | |-- contentType: string (nullable = true) - | | |-- creation: string (nullable = true) - | | |-- data: string (nullable = true) - | | |-- hash: string (nullable = true) - | | |-- language: string (nullable = true) - | | |-- size: long (nullable = true) - | | |-- title: string (nullable = true) - | | |-- url: string (nullable = true) - |-- telecom: array (nullable = true) - | |-- element: struct (containsNull = true) - | | |-- id: string (nullable = true) - | | |-- rank: long (nullable = true) - | | |-- system: string (nullable = true) - | | |-- use: string (nullable = true) - | | |-- value: string (nullable = true) - -""", - schema_tree.getvalue(), + |-- newBoolColumn: boolean (nullable = true)""", + schema_tree, + ) + + @ddt.data( + # In general, the first type used wins + (pyarrow.int64(), 2000, pyarrow.int32(), 2000, "long", 2000), 
+ (pyarrow.int32(), 2000, pyarrow.int64(), 2000, "integer", 2000), + (pyarrow.int64(), 3000000000, pyarrow.int32(), 2000, "long", 2000), + # Interestingly, delta lake will silently down-convert for us. + # This is not an expected scenario, but we should beware this gotcha. + (pyarrow.int32(), 2000, pyarrow.int64(), 3000000000, "integer", -1294967296), + ) + @ddt.unpack + def test_column_type_merges(self, type1, val1, type2, val2, expected_type, expected_value): + """Verify that if we write a slightly different, but compatible field to the delta lake, it works""" + schema1 = pyarrow.schema( + [ + pyarrow.field("id", pyarrow.string()), + pyarrow.field("int", type1), + ] + ) + self.store([{"id": "1", "int": val1}], schema=schema1) + + schema2 = pyarrow.schema( + [ + pyarrow.field("id", pyarrow.string()), + pyarrow.field("int", type2), + ] + ) + self.store([{"id": "1", "int": val2}], schema=schema2) + + table_path = os.path.join(self.output_dir, "patient") + table_df = DeltaLakeFormat.spark.read.format("delta").load(table_path) + schema_tree = self.get_spark_schema(table_df) + self.assertEqual( + f"""root + |-- id: string (nullable = true) + |-- int: {expected_type} (nullable = true)""", + schema_tree, ) + values = utils.read_delta_lake(table_path) + self.assertEqual(expected_value, values[0]["int"]) + def test_group_field(self): """Verify that we can safely delete some data from the lake using groups""" self.store( diff --git a/tests/utils.py b/tests/utils.py index 42908298..48621659 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -8,6 +8,7 @@ import json import os import time +import tracemalloc import unittest from unittest import mock @@ -188,7 +189,7 @@ def read_delta_lake(lake_path: str, *, version: int = None) -> list[dict]: DeltaLakeFormat.initialize_class() must have already been called. - Compare the results to a pandas dataframe with df.to_dict(orient="records") or just as a list of dicts. + Compare the results to a pyarrow table with table.to_pylist() or just as a list of dicts. """ # Read spark table reader = DeltaLakeFormat.spark.read @@ -204,10 +205,30 @@ def read_delta_lake(lake_path: str, *, version: int = None) -> list[dict]: @contextlib.contextmanager -def timeit(desc: str = None): +def time_it(desc: str = None): """Tiny little timer context manager that is useful when debugging""" start = time.perf_counter() yield end = time.perf_counter() suffix = f" ({desc})" if desc else "" - print(f"TIMEIT: {end - start:.2f}s{suffix}") + print(f"TIME IT: {end - start:.2f}s{suffix}") + + +@contextlib.contextmanager +def mem_it(desc: str = None): + """Tiny little context manager to measure memory usage""" + start_tracing = not tracemalloc.is_tracing() + if start_tracing: + tracemalloc.start() + + before, before_peak = tracemalloc.get_traced_memory() + yield + after, after_peak = tracemalloc.get_traced_memory() + + if start_tracing: + tracemalloc.stop() + + suffix = f" ({desc})" if desc else "" + if after_peak > before_peak: + suffix = f"{suffix} ({after_peak - before_peak:,} PEAK change)" + print(f"MEM IT: {after - before:,}{suffix}")
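
The int-to-float coercion exercised by test_batch_schema_types_are_coerced relies on building each batch against an explicit pyarrow schema instead of trusting inferred types. A minimal sketch of that idea, assuming only pyarrow and using illustrative field names rather than the ETL's real schema helpers:

    import pyarrow

    # FHIR Quantity.value is a decimal, so the schema forces float64 even when the
    # incoming row carries a bare int, as in the quantityQuantity/quantityRange cases above.
    schema = pyarrow.schema(
        [
            pyarrow.field("id", pyarrow.string()),
            pyarrow.field("value", pyarrow.float64()),
        ]
    )

    table = pyarrow.Table.from_pylist([{"id": "a", "value": 1}], schema=schema)
    assert table.schema.field("value").type == pyarrow.float64()
    assert table.column("value").to_pylist() == [1.0]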
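
The expected value of -1294967296 in the down-conversion case above is simply 3000000000 wrapped into a signed 32-bit integer. A short sketch of that arithmetic in plain Python (illustrative only, not part of the test suite):

    def wrap_to_int32(value: int) -> int:
        """Reduce modulo 2**32, then reinterpret as signed 32-bit (two's complement)."""
        value %= 2**32
        return value - 2**32 if value >= 2**31 else value

    assert wrap_to_int32(2_000) == 2_000                   # fits in 32 bits, unchanged
    assert wrap_to_int32(3_000_000_000) == -1_294_967_296  # overflows and wraps negative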