Add Parquet and Delta import #2062

Draft
wants to merge 11 commits into base: main
@@ -29,6 +29,7 @@
import au.csiro.pathling.io.FileSystemPersistence;
import au.csiro.pathling.io.ImportMode;
import ca.uhn.fhir.rest.annotation.ResourceParam;
import io.delta.tables.DeltaTable;
import jakarta.annotation.Nonnull;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
@@ -39,6 +40,7 @@
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.hl7.fhir.instance.model.api.IBaseResource;
@@ -50,6 +52,7 @@
import org.hl7.fhir.r4.model.OperationOutcome.OperationOutcomeIssueComponent;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Parameters.ParametersParameterComponent;
import org.hl7.fhir.r4.model.StringType;
import org.hl7.fhir.r4.model.UrlType;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
@@ -137,6 +140,13 @@ public OperationOutcome execute(@Nonnull @ResourceParam final Parameters inParam
.map(param -> ImportMode.fromCode(
((CodeType) param.getValue()).asStringValue()))
.orElse(ImportMode.OVERWRITE);

// Get the format of the source data from the source parameter.
final String format = sourceParam.getPart().stream()
.filter(param -> "format".equals(param.getName()))
.findFirst()
.map(param -> ((StringType) param.getValue()).getValueAsString()).orElse(null);

final String resourceCode = ((CodeType) resourceTypeParam.getValue()).getCode();
final ResourceType resourceType = ResourceType.fromCode(resourceCode);

@@ -149,17 +159,13 @@ public OperationOutcome execute(@Nonnull @ResourceParam final Parameters inParam
}

// Read the resources from the source URL into a dataset of strings.
final Dataset<String> jsonStrings = readStringsFromUrl(urlParam);

// Parse each line into a HAPI FHIR object, then encode to a Spark dataset.
final Dataset<IBaseResource> resources = jsonStrings.map(jsonToResourceConverter(),
fhirEncoder);
final Dataset<Row> rows = readRowsFromUrl(urlParam, format, fhirEncoder);

log.info("Importing {} resources (mode: {})", resourceType.toCode(), importMode.getCode());
if (importMode == ImportMode.OVERWRITE) {
database.overwrite(resourceType, resources.toDF());
database.overwrite(resourceType, rows);
} else {
database.merge(resourceType, resources.toDF());
database.merge(resourceType, rows);
}
}

@@ -177,22 +183,36 @@ public OperationOutcome execute(@Nonnull @ResourceParam final Parameters inParam
}

@Nonnull
private Dataset<String> readStringsFromUrl(@Nonnull final ParametersParameterComponent urlParam) {
private Dataset<Row> readRowsFromUrl(@Nonnull final ParametersParameterComponent urlParam,
final String serializationMode, final ExpressionEncoder<IBaseResource> fhirEncoder) {
final String url = ((UrlType) urlParam.getValue()).getValueAsString();
final String decodedUrl = URLDecoder.decode(url, StandardCharsets.UTF_8);
final String convertedUrl = FileSystemPersistence.convertS3ToS3aUrl(decodedUrl);
final Dataset<String> jsonStrings;
final Dataset<Row> rowDataset;
try {
// Check that the user is authorized to execute the operation.
accessRules.ifPresent(ar -> ar.checkCanImportFrom(convertedUrl));
final FilterFunction<String> nonBlanks = s -> !s.isBlank();
jsonStrings = spark.read().textFile(convertedUrl).filter(nonBlanks);
if (serializationMode == null || "ndjson".equals(serializationMode)) {
// Parse each line into a HAPI FHIR object, then encode to a Spark dataset.
rowDataset = spark.read().textFile(convertedUrl).filter(nonBlanks)
.map(jsonToResourceConverter(),
fhirEncoder).toDF();
} else if ("parquet".equals(serializationMode)) {
// Use the Spark Parquet reader.
rowDataset = spark.read().parquet(convertedUrl);
} else if ("delta".equals(serializationMode)) {
// Use the Delta Lake reader.
rowDataset = DeltaTable.forPath(spark, convertedUrl).toDF();
} else {
throw new InvalidUserInputError("Unsupported format: " + serializationMode);
}
} catch (final InvalidUserInputError e) {
// Re-throw user input errors, such as an unsupported format, without wrapping them.
throw e;
} catch (final SecurityError e) {
throw new InvalidUserInputError("Not allowed to import from URL: " + convertedUrl, e);
} catch (final Exception e) {
throw new InvalidUserInputError("Error reading from URL: " + convertedUrl, e);
}
return jsonStrings;
return rowDataset;
}

@Nonnull
@@ -47,6 +47,14 @@
"max": "1",
"documentation": "A value of 'overwrite' will cause all existing resources of the specified type to be deleted and replaced with the contents of the source file. A value of 'merge' will match existing resources with updated resources in the source file based on their ID, and either update the existing resources or add new resources as appropriate. The default value is 'overwrite'.",
"type": "code"
},
{
"name": "format",
"use": "in",
"min": 0,
"max": "1",
"documentation": "Indicates the format of the source file. Possible values are 'ndjson', 'parquet' and 'delta'. The default value is 'ndjson'.",
"type": "code"
}
]
}
@@ -72,20 +72,33 @@ class ImportTest extends ModificationTest {

@SuppressWarnings("SameParameterValue")
@Nonnull
Parameters buildImportParameters(@Nonnull final URL jsonURL,
Parameters buildImportParameters(@Nonnull final URL url,
@Nonnull final ResourceType resourceType) {
final Parameters parameters = new Parameters();
final ParametersParameterComponent sourceParam = parameters.addParameter().setName("source");
sourceParam.addPart().setName("resourceType").setValue(new CodeType(resourceType.toCode()));
sourceParam.addPart().setName("url").setValue(new UrlType(jsonURL.toExternalForm()));
sourceParam.addPart().setName("url").setValue(new UrlType(url.toExternalForm()));
return parameters;
}

@SuppressWarnings("SameParameterValue")
@Nonnull
Parameters buildImportParameters(@Nonnull final URL jsonURL,
@Nonnull final ResourceType resourceType, @Nonnull final ImportMode mode) {
final Parameters parameters = buildImportParameters(jsonURL, resourceType);
Parameters buildImportParameters(@Nonnull final URL url,
@Nonnull final ResourceType resourceType, @Nonnull final String format) {
final Parameters parameters = new Parameters();
final ParametersParameterComponent sourceParam = parameters.addParameter().setName("source");
sourceParam.addPart().setName("resourceType").setValue(new CodeType(resourceType.toCode()));
sourceParam.addPart().setName("url").setValue(new UrlType(url.toExternalForm()));
sourceParam.addPart().setName("format").setValue(new CodeType(format));
return parameters;
}

@SuppressWarnings("SameParameterValue")
@Nonnull
Parameters buildImportParameters(@Nonnull final URL url,
@Nonnull final ResourceType resourceType, @Nonnull final String format,
@Nonnull final ImportMode mode) {
final Parameters parameters = buildImportParameters(url, resourceType, format);
final ParametersParameterComponent sourceParam = parameters.getParameter().stream()
.filter(p -> p.getName().equals("source")).findFirst()
.orElseThrow();
@@ -96,30 +109,26 @@ Parameters buildImportParameters(@Nonnull final URL jsonURL,
@Test
void importJsonFile() {
final URL jsonURL = getResourceAsUrl("import/Patient.ndjson");
importExecutor.execute(buildImportParameters(jsonURL, ResourceType.PATIENT));
importExecutor.execute(buildImportParameters(jsonURL, ResourceType.PATIENT, "ndjson"));

final Dataset<Row> result = database.read(ResourceType.PATIENT);
final Dataset<Row> expected = new DatasetBuilder(spark)
.withIdColumn()
.withRow("121503c8-9564-4b48-9086-a22df717948e")
.withRow("2b36c1e2-bbe1-45ae-8124-4adad2677702")
.withRow("7001ad9c-34d2-4eb5-8165-5fdc2147f469")
.withRow("8ee183e2-b3c0-4151-be94-b945d6aa8c6d")
.withRow("9360820c-8602-4335-8b50-c88d627a0c20")
.withRow("a7eb2ce7-1075-426c-addd-957b861b0e55")
.withRow("bbd33563-70d9-4f6d-a79a-dd1fc55f5ad9")
.withRow("beff242e-580b-47c0-9844-c1a68c36c5bf")
.withRow("e62e52ae-2d75-4070-a0ae-3cc78d35ed08")
.build();
assertPatientDatasetMatches(result);
}

DatasetAssert.of(result.select("id")).hasRows(expected);
@Test
void importJsonFileUsingDefault() {
final URL jsonURL = getResourceAsUrl("import/Patient.ndjson");
importExecutor.execute(buildImportParameters(jsonURL, ResourceType.PATIENT));

final Dataset<Row> result = database.read(ResourceType.PATIENT);
assertPatientDatasetMatches(result);
}

@Test
void mergeJsonFile() {
final URL jsonURL = getResourceAsUrl("import/Patient_updates.ndjson");
importExecutor.execute(
buildImportParameters(jsonURL, ResourceType.PATIENT, ImportMode.MERGE));
buildImportParameters(jsonURL, ResourceType.PATIENT, "ndjson", ImportMode.MERGE));

final Dataset<Row> result = database.read(ResourceType.PATIENT);
final Dataset<Row> expected = new DatasetBuilder(spark)
Expand All @@ -143,14 +152,14 @@ void mergeJsonFile() {
@Test
void importJsonFileWithBlankLines() {
final URL jsonURL = getResourceAsUrl("import/Patient_with_eol.ndjson");
importExecutor.execute(buildImportParameters(jsonURL, ResourceType.PATIENT));
importExecutor.execute(buildImportParameters(jsonURL, ResourceType.PATIENT, "ndjson"));
assertEquals(9, database.read(ResourceType.PATIENT).count());
}

@Test
void importJsonFileWithRecursiveDatatype() {
final URL jsonURL = getResourceAsUrl("import/Questionnaire.ndjson");
importExecutor.execute(buildImportParameters(jsonURL, ResourceType.QUESTIONNAIRE));
importExecutor.execute(buildImportParameters(jsonURL, ResourceType.QUESTIONNAIRE, "ndjson"));
final Dataset<Row> questionnaireDataset = database.read(ResourceType.QUESTIONNAIRE);
assertEquals(1, questionnaireDataset.count());

@@ -182,6 +191,24 @@ void importJsonFileWithRecursiveDatatype() {
DatasetAssert.of(expandedItemsDataset).hasRows(expectedDataset);
}

@Test
void importParquetFile() {
final URL parquetURL = getResourceAsUrl("import/Patient.parquet");
importExecutor.execute(buildImportParameters(parquetURL, ResourceType.PATIENT, "parquet"));

final Dataset<Row> result = database.read(ResourceType.PATIENT);
assertPatientDatasetMatches(result);
}

@Test
void importDeltaFile() {
final URL deltaURL = getResourceAsUrl("import/Patient.delta");
importExecutor.execute(buildImportParameters(deltaURL, ResourceType.PATIENT, "delta"));

final Dataset<Row> result = database.read(ResourceType.PATIENT);
assertPatientDatasetMatches(result);
}

@Test
void throwsOnUnsupportedResourceType() {
final List<ResourceType> resourceTypes = Arrays.asList(ResourceType.PARAMETERS,
@@ -190,8 +217,8 @@ void throwsOnUnsupportedResourceType() {
for (final ResourceType resourceType : resourceTypes) {
final InvalidUserInputError error = assertThrows(InvalidUserInputError.class,
() -> importExecutor.execute(
buildImportParameters(new URL("file://some/url"),
resourceType)), "Unsupported resource type: " + resourceType.toCode());
buildImportParameters(getResourceAsUrl("import/Patient.ndjson"),
resourceType, "ndjson")), "Unsupported resource type: " + resourceType.toCode());
assertEquals("Unsupported resource type: " + resourceType.toCode(), error.getMessage());
}
}
@@ -200,11 +227,37 @@
void throwsOnMissingId() {
final URL jsonURL = getResourceAsUrl("import/Patient_missing_id.ndjson");
final Exception error = assertThrows(Exception.class,
() -> importExecutor.execute(buildImportParameters(jsonURL, ResourceType.PATIENT)));
() -> importExecutor.execute(
buildImportParameters(jsonURL, ResourceType.PATIENT, "ndjson")));
final BaseServerResponseException convertedError =
ErrorHandlingInterceptor.convertError(error);
assertInstanceOf(InvalidRequestException.class, convertedError);
assertEquals("Encountered a resource with no ID", convertedError.getMessage());
}

@Test
void throwsOnUnsupportedFormat() {
assertThrows(InvalidUserInputError.class,
() -> importExecutor.execute(
buildImportParameters(getResourceAsUrl("import/Patient.ndjson"),
ResourceType.PATIENT, "foo")), "Unsupported format: foo");
}

private void assertPatientDatasetMatches(@Nonnull final Dataset<Row> result) {
final Dataset<Row> expected = new DatasetBuilder(spark)
.withIdColumn()
.withRow("121503c8-9564-4b48-9086-a22df717948e")
.withRow("2b36c1e2-bbe1-45ae-8124-4adad2677702")
.withRow("7001ad9c-34d2-4eb5-8165-5fdc2147f469")
.withRow("8ee183e2-b3c0-4151-be94-b945d6aa8c6d")
.withRow("9360820c-8602-4335-8b50-c88d627a0c20")
.withRow("a7eb2ce7-1075-426c-addd-957b861b0e55")
.withRow("bbd33563-70d9-4f6d-a79a-dd1fc55f5ad9")
.withRow("beff242e-580b-47c0-9844-c1a68c36c5bf")
.withRow("e62e52ae-2d75-4070-a0ae-3cc78d35ed08")
.build();

DatasetAssert.of(result.select("id")).hasRows(expected);
}

}
(Binary Parquet and Delta test fixture files were added for the new import tests; their contents are not rendered here.)

@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1736902211723,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"9","numOutputBytes":"131068"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"35a6f069-9b81-4eba-84ca-49caf72a4fab"}}
{"add":{"path":"part-00000-672fe062-8b0c-442b-868c-89803ac2dc1b-c000.snappy.parquet","partitionValues":{},"size":131068,"modificationTime":1736902211147,"dataChange":true,"stats":"{\"numRecords\":9,\"minValues\":{\"id\":\"121503c8-9564-4b48-9086-a22df717\",\"id_versioned\":\"Patient/121503c8-9564-4b48-9086-\",\"meta\":{},\"text\":{\"status\":\"generated\",\"div\":\"<div xmlns=\\\"http://www.w3.org/19\",\"_fid\":189809872},\"gender\":\"female\",\"birthDate\":\"1957-06-06\",\"deceasedDateTime\":\"1967-06-08T08:27:40+00:00\",\"maritalStatus\":{\"text\":\"M\",\"_fid\":93571182}},\"maxValues\":{\"id\":\"e62e52ae-2d75-4070-a0ae-3cc78d35�\",\"id_versioned\":\"Patient/e62e52ae-2d75-4070-a0ae-�\",\"meta\":{},\"text\":{\"status\":\"generated\",\"div\":\"<div xmlns=\\\"http://www.w3.org/19�\",\"_fid\":2104429834},\"gender\":\"male\",\"birthDate\":\"1998-12-26\",\"deceasedDateTime\":\"2018-02-17T09:11:55+00:00\",\"maritalStatus\":{\"text\":\"S\",\"_fid\":1857434360}},\"nullCount\":{\"id\":0,\"id_versioned\":0,\"meta\":{\"id\":9,\"versionId\":9,\"versionId_versioned\":9,\"lastUpdated\":9,\"source\":9,\"profile\":9,\"security\":9,\"tag\":9,\"_fid\":9},\"implicitRules\":9,\"language\":9,\"text\":{\"id\":9,\"status\":0,\"div\":0,\"_fid\":0},\"identifier\":0,\"active\":9,\"name\":0,\"telecom\":0,\"gender\":0,\"birthDate\":0,\"deceasedBoolean\":9,\"deceasedDateTime\":5,\"address\":0,\"maritalStatus\":{\"id\":9,\"coding\":0,\"text\":0,\"_fid\":0},\"multipleBirthBoolean\":0,\"multipleBirthInteger\":9}}"}}
{"remove":{"path":"part-00000-b3c5e8be-3b6c-4e9c-8d65-eb2ef157cd95-c000.snappy.parquet","deletionTimestamp":1736902211707,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":132841}}
18 changes: 14 additions & 4 deletions site/docs/server/operations/import.md
@@ -9,12 +9,19 @@ description: The import operation allows FHIR data to be imported into the serve

This operation allows FHIR R4 data to be imported into the server, making it
available for query via other operations such
as [search](./search), [aggregate](./aggregate) and [extract](./extract). This
operation accepts the [NDJSON](https://hl7.org/fhir/R4/nd-json.html) format, and
links to retrieve that data are provided rather that sending the data inline
within the request itself. This is to allow for large data sets to be imported
as [search](./search), [aggregate](./aggregate) and [extract](./extract). Links
to retrieve that data are provided rather than sending the data inline within
the request itself. This is to allow for large data sets to be imported
efficiently.

Source formats currently supported are:

* [NDJSON](https://hl7.org/fhir/R4/nd-json.html) format
* [Parquet](https://parquet.apache.org/) conforming to
the [Pathling schema](../../libraries/encoders/schema)
* [Delta Lake](https://delta.io/) conforming to
the [Pathling schema](../../libraries/encoders/schema)
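
For the Parquet and Delta formats, the source must use the same schema that the Pathling encoders produce. This pull request does not include a writer for such sources; the sketch below shows one way they might be produced from NDJSON, assuming the `FhirEncoders` API from the Pathling libraries and a local Spark session. The input and output paths are placeholders, and the exact column requirements should be checked against the schema documentation linked above.

```java
import au.csiro.pathling.encoders.FhirEncoders;
import ca.uhn.fhir.context.FhirContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.hl7.fhir.r4.model.Patient;

public class WriteParquetSource {

  public static void main(final String[] args) {
    final SparkSession spark = SparkSession.builder()
        .appName("write-parquet-source")
        .master("local[*]")
        .getOrCreate();
    final FhirEncoders encoders = FhirEncoders.forR4().getOrCreate();

    // Parse each non-blank NDJSON line into a HAPI Patient, then encode it with
    // the Pathling encoder so that the resulting rows use the encoder schema.
    // (A real implementation would reuse a single parser rather than creating a
    // FhirContext per record.)
    final FilterFunction<String> nonBlanks = s -> !s.isBlank();
    final Dataset<Patient> patients = spark.read()
        .textFile("/tmp/Patient.ndjson")
        .filter(nonBlanks)
        .map((MapFunction<String, Patient>) json -> FhirContext.forR4()
                .newJsonParser()
                .parseResource(Patient.class, json),
            encoders.of(Patient.class));

    // Write the encoded rows as Parquet; the resulting directory can then be
    // referenced as the 'url' of an import request with format 'parquet'.
    patients.toDF().write().parquet("/tmp/Patient.parquet");

    spark.stop();
  }
}
```

A Delta source could be produced in the same way by writing with `format("delta")` instead of `parquet(...)`, provided the Spark session has the Delta Lake extensions enabled.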

Currently Pathling supports retrieval of NDJSON files from
[Amazon S3](https://aws.amazon.com/s3/) (`s3://`),
[HDFS](https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html) (`hdfs://`) and
@@ -61,6 +68,9 @@ following parameters:
resources with updated resources in the source file based on their ID, and
either update the existing resources or add new resources as appropriate.
The default value is `overwrite`.
- `format [0..1] (code)` - Indicates the format of the source file.
Possible values are `ndjson`, `parquet` and `delta`. The default value
is `ndjson`.
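
As an example, an import request for a Parquet source can be assembled with the HAPI FHIR model classes, mirroring the test helper added in this pull request. This is a sketch only; the source URL below is a placeholder.

```java
import org.hl7.fhir.r4.model.CodeType;
import org.hl7.fhir.r4.model.Parameters;
import org.hl7.fhir.r4.model.Parameters.ParametersParameterComponent;
import org.hl7.fhir.r4.model.UrlType;

public class BuildImportRequest {

  public static Parameters parquetImportRequest() {
    final Parameters parameters = new Parameters();
    final ParametersParameterComponent source = parameters.addParameter().setName("source");
    source.addPart().setName("resourceType").setValue(new CodeType("Patient"));
    source.addPart().setName("url").setValue(new UrlType("s3://my-bucket/patient-parquet"));
    source.addPart().setName("format").setValue(new CodeType("parquet"));
    source.addPart().setName("mode").setValue(new CodeType("merge"));
    return parameters;
  }
}
```

The resulting `Parameters` resource is then sent in the body of a POST to the server's `$import` endpoint.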

## Response
