diff --git a/docs/asciidoc/modules/ROOT/pages/overview/apoc.import/apoc.import.arrow.adoc b/docs/asciidoc/modules/ROOT/pages/overview/apoc.import/apoc.import.arrow.adoc new file mode 100644 index 0000000000..a90843e536 --- /dev/null +++ b/docs/asciidoc/modules/ROOT/pages/overview/apoc.import/apoc.import.arrow.adoc @@ -0,0 +1,118 @@ += apoc.import.arrow +:description: This section contains reference documentation for the apoc.import.arrow procedure. + +label:procedure[] label:apoc-extended[] + +[.emphasis] +apoc.import.arrow(input, $config) - Imports arrow from the provided arrow file or byte array + +== Signature + +[source] +---- +apoc.import.arrow(urlOrBinaryFile :: ANY?, config = {} :: MAP?) :: (file :: STRING?, source :: STRING?, format :: STRING?, nodes :: INTEGER?, relationships :: INTEGER?, properties :: INTEGER?, time :: INTEGER?, rows :: INTEGER?, batchSize :: INTEGER?, batches :: INTEGER?, done :: BOOLEAN?, data :: STRING?) +---- + +== Input parameters +[.procedures, opts=header] +|=== +| Name | Type | Default +|urlOrBinaryFile|ANY?|null +|config|MAP?|{} +|=== + +== Config parameters +This procedure supports the following config parameters: + +.Config parameters +[opts=header, cols='1a,1a,1a,3a'] +|=== +| name | type |default | description +| unwindBatchSize | Integer | `2000` | the batch size of the unwind +| mapping | Map | `{}` | see `Mapping config` example below +|=== + +== Output parameters +[.procedures, opts=header] +|=== +| Name | Type +|file|STRING? +|source|STRING? +|format|STRING? +|nodes|INTEGER? +|relationships|INTEGER? +|properties|INTEGER? +|time|INTEGER? +|rows|INTEGER? +|batchSize|INTEGER? +|batches|INTEGER? +|done|BOOLEAN? +|data|STRING? +|=== + +[[usage-apoc.import.arrow]] +== Usage Examples + +The `apoc.import.arrow` procedure can be used to import arrow files created by the `apoc.export.arrow.*` procedures. + + +[source,cypher] +---- +CALL apoc.import.arrow("fileCreatedViaExportProcedures.arrow") +---- + +.Results +[opts=header] +|=== +| file | source | format | nodes | relationships | properties | time | rows | batchSize | batches | done | data +| "fileCreatedViaExportProcedures.arrow" | "file" | "arrow" | 3 | 1 | 15 | 105 | 4 | -1 | 0 | TRUE | NULL +|=== + + +We can also import a file from a binary `byte[]` created by the `apoc.export.arrow.stream.*` procedures. + +[source,cypher] +---- +CALL apoc.import.arrow(``) +---- + +=== Mapping config + +In order to import complex types not supported by Parquet, like Point, Duration, List of Duration, etc.. +we can use the mapping config to convert to the desired data type. +For example, if we have a node `(:MyLabel {durationProp: duration('P5M1.5D')}`, and we export it in a parquet file/binary, +we can import it by explicit a map with key the property key, and value the property type. + +That is in this example, by using the load procedure: +[source,cypher] +---- +CALL apoc.load.arrow(fileOrBinary, {mapping: {durationProp: 'Duration'}}) +---- + +Or with the import procedure: +[source,cypher] +---- +CALL apoc.import.parquet(fileOrBinary, {mapping: {durationProp: 'Duration'}}) +---- + +The mapping value types can be one of the following: + +* `Point` +* `LocalDateTime` +* `LocalTime` +* `DateTime` +* `Time` +* `Date` +* `Duration` +* `Char` +* `Byte` +* `Double` +* `Float` +* `Short` +* `Int` +* `Long` +* `Node` +* `Relationship` +* `BaseType` followed by Array, to map a list of values, where BaseType can be one of the previous type, for example `DurationArray` + + diff --git a/docs/asciidoc/modules/ROOT/pages/overview/apoc.import/index.adoc b/docs/asciidoc/modules/ROOT/pages/overview/apoc.import/index.adoc index 2deaa5088f..efb2e3f8ed 100644 --- a/docs/asciidoc/modules/ROOT/pages/overview/apoc.import/index.adoc +++ b/docs/asciidoc/modules/ROOT/pages/overview/apoc.import/index.adoc @@ -8,6 +8,11 @@ This file is generated by DocsTest, so don't change it! [.procedures, opts=header, cols='5a,1a,1a'] |=== | Qualified Name | Type | Release +|xref::overview/apoc.import/apoc.import.arrow.adoc[apoc.import.arrow icon:book[]] + +apoc.import.arrow(input, $config) - Imports arrow from the provided arrow file or byte array +|label:procedure[] +|label:apoc-full[] |xref::overview/apoc.import/apoc.import.csv.adoc[apoc.import.csv icon:book[]] apoc.import.csv(nodes, relationships, config) - imports nodes and relationships from the provided CSV files with given labels and types diff --git a/docs/asciidoc/modules/ROOT/partials/generated-documentation/documentation.adoc b/docs/asciidoc/modules/ROOT/partials/generated-documentation/documentation.adoc index b2b7b1105d..866e04127d 100644 --- a/docs/asciidoc/modules/ROOT/partials/generated-documentation/documentation.adoc +++ b/docs/asciidoc/modules/ROOT/partials/generated-documentation/documentation.adoc @@ -1578,6 +1578,17 @@ apoc.label.exists(element, label) - returns true or false related to label exist |label:apoc-core[] |=== +== xref::overview/apoc.import/index.adoc[] + +[.procedures, opts=header, cols='5a,1a'] +|=== +| Qualified Name | Type +|xref::overview/apoc.import/apoc.import.arrow.adoc[apoc.import.arrow icon:book[]] + +apoc.import.arrow(input, $config) - Imports arrow from the provided arrow file or byte array +|label:procedure[] +|=== + == xref::overview/apoc.load/index.adoc[] [.procedures, opts=header, cols='5a,1a,1a'] diff --git a/docs/asciidoc/modules/ROOT/partials/generated-documentation/nav.adoc b/docs/asciidoc/modules/ROOT/partials/generated-documentation/nav.adoc index 15ae58b042..a231b5acbd 100644 --- a/docs/asciidoc/modules/ROOT/partials/generated-documentation/nav.adoc +++ b/docs/asciidoc/modules/ROOT/partials/generated-documentation/nav.adoc @@ -290,6 +290,7 @@ This file is generated by DocsTest, so don't change it! *** xref::overview/apoc.hashing/apoc.hashing.fingerprintGraph.adoc[] *** xref::overview/apoc.hashing/apoc.hashing.fingerprinting.adoc[] ** xref::overview/apoc.import/index.adoc[] +*** xref::overview/apoc.import/apoc.import.arrow.adoc[] *** xref::overview/apoc.import/apoc.import.csv.adoc[] *** xref::overview/apoc.import/apoc.import.graphml.adoc[] *** xref::overview/apoc.import/apoc.import.json.adoc[] diff --git a/full/build.gradle b/full/build.gradle index 2f33ad559f..fd7e7e5a76 100644 --- a/full/build.gradle +++ b/full/build.gradle @@ -117,6 +117,11 @@ dependencies { exclude group: 'io.netty' } +// compileOnly group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0' +// compileOnly group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0' +// testImplementation group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0' +// testImplementation group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0' + compileOnly group: 'com.couchbase.client', name: 'java-client', version: '3.3.0', withoutJacksons testImplementation group: 'com.couchbase.client', name: 'java-client', version: '3.3.0', withoutJacksons diff --git a/full/src/main/java/apoc/export/arrow/ImportArrow.java b/full/src/main/java/apoc/export/arrow/ImportArrow.java new file mode 100644 index 0000000000..1b20fad6c3 --- /dev/null +++ b/full/src/main/java/apoc/export/arrow/ImportArrow.java @@ -0,0 +1,385 @@ +package apoc.export.arrow; + +import apoc.Extended; +import apoc.Pools; +import apoc.export.util.BatchTransaction; +import apoc.export.util.ProgressReporter; +import apoc.result.ProgressInfo; +import apoc.util.FileUtils; +import apoc.util.JsonUtil; +import apoc.util.Util; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.json.JsonWriteFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.channels.SeekableByteChannel; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.commons.lang3.StringUtils; +import org.neo4j.driver.exceptions.Neo4jException; +import org.neo4j.graphdb.Entity; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Label; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.RelationshipType; +import org.neo4j.procedure.Context; +import org.neo4j.procedure.Description; +import org.neo4j.procedure.Mode; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.Procedure; +import org.neo4j.values.storable.DateTimeValue; +import org.neo4j.values.storable.DateValue; +import org.neo4j.values.storable.DurationValue; +import org.neo4j.values.storable.LocalDateTimeValue; +import org.neo4j.values.storable.LocalTimeValue; +import org.neo4j.values.storable.PointValue; +import org.neo4j.values.storable.TimeValue; +import org.neo4j.values.storable.Values; + +@Extended +public class ImportArrow { + + // TODO: field similar to the one placed in ArrowUtils (placed in core) + // when the Arrow procedures will be placed in extended remove these lines + // and replace FIELD_ID with FIELD_ID.getName(), FIELD_LABELS with FIELD_LABELS.getName(), etc.. + public static String FIELD_ID = ""; + public static String FIELD_LABELS = "labels"; + public static String FIELD_SOURCE_ID = ""; + public static String FIELD_TARGET_ID = ""; + public static String FIELD_TYPE = ""; + // -- end ArrowUtils fields + + @Context + public Pools pools; + + @Context + public GraphDatabaseService db; + + @Procedure(name = "apoc.import.arrow", mode = Mode.WRITE) + @Description("Imports arrow from the provided arrow file or byte array") + public Stream importFile( + @Name("input") Object input, @Name(value = "config", defaultValue = "{}") Map config) + throws Exception { + + ProgressInfo result = Util.inThread(pools, () -> { + String file = null; + String sourceInfo = "binary"; + if (input instanceof String) { + file = (String) input; + sourceInfo = "file"; + } + + final ArrowConfig conf = new ArrowConfig(config); + + final Map idMapping = new HashMap<>(); + + AtomicInteger counter = new AtomicInteger(); + try (ArrowReader reader = getReader(input); + VectorSchemaRoot schemaRoot = reader.getVectorSchemaRoot()) { + + final ProgressReporter reporter = + new ProgressReporter(null, null, new ProgressInfo(file, sourceInfo, "arrow")); + BatchTransaction btx = new BatchTransaction(db, conf.getBatchSize(), reporter); + try { + while (hasElements(counter, reader, schemaRoot)) { + + final Map row = schemaRoot.getFieldVectors().stream() + .collect( + HashMap::new, + (map, fieldVector) -> { + Object read = read(fieldVector, counter.get(), conf); + if (read == null) { + return; + } + map.put(fieldVector.getName(), read); + }, + HashMap::putAll); + + String relType = (String) row.remove(FIELD_TYPE); + if (relType == null) { + // is node + String[] stringLabels = (String[]) row.remove(FIELD_LABELS); + Label[] labels = Optional.ofNullable(stringLabels) + .map(l -> Arrays.stream(l).map(Label::label).toArray(Label[]::new)) + .orElse(new Label[] {}); + final Node node = btx.getTransaction().createNode(labels); + + long id = (long) row.remove(FIELD_ID); + idMapping.put(id, node.getId()); + + addProps(row, node); + reporter.update(1, 0, row.size()); + } else { + // is relationship + long sourceId = (long) row.remove(FIELD_SOURCE_ID); + Long idSource = idMapping.get(sourceId); + final Node source = btx.getTransaction().getNodeById(idSource); + + long targetId = (long) row.remove(FIELD_TARGET_ID); + Long idTarget = idMapping.get(targetId); + final Node target = btx.getTransaction().getNodeById(idTarget); + + final Relationship rel = + source.createRelationshipTo(target, RelationshipType.withName(relType)); + addProps(row, rel); + reporter.update(0, 1, row.size()); + } + + counter.incrementAndGet(); + btx.increment(); + } + + btx.commit(); + } catch (RuntimeException e) { + btx.rollback(); + throw e; + } finally { + btx.close(); + } + + return reporter.getTotal(); + } + }); + + return Stream.of(result); + } + + private ArrowReader getReader(Object input) throws IOException { + RootAllocator allocator = new RootAllocator(); + if (input instanceof String) { + final SeekableByteChannel channel = + FileUtils.inputStreamFor(input, null, null, null).asChannel(); + return new ArrowFileReader(channel, allocator); + } + ByteArrayInputStream inputStream = new ByteArrayInputStream((byte[]) input); + return new ArrowStreamReader(inputStream, allocator); + } + + private static boolean hasElements(AtomicInteger counter, ArrowReader reader, VectorSchemaRoot schemaRoot) + throws IOException { + if (counter.get() >= schemaRoot.getRowCount()) { + if (reader.loadNextBatch()) { + counter.set(0); + } else { + return false; + } + } + + return true; + } + + private static Object read(FieldVector fieldVector, int index, ArrowConfig conf) { + + if (fieldVector.isNull(index)) { + return null; + } else if (fieldVector instanceof BitVector) { + BitVector fe = (BitVector) fieldVector; + return fe.get(index) == 1; + } else { + Object object = fieldVector.getObject(index); + if (object instanceof Collection && ((Collection) object).isEmpty()) { + return null; + } + return toValidValue(object, fieldVector.getName(), conf.getMapping()); + } + } + + private void addProps(Map row, Entity rel) { + row.forEach(rel::setProperty); + } + + /** + * Analog to ArrowConfig present in APOC Core. + * TODO Merge these 2 classes when arrow procedure will be moved to APOC Extended + */ + public static class ArrowConfig { + + private final int batchSize; + private final Map mapping; + + public ArrowConfig(Map config) { + if (config == null) { + config = Collections.emptyMap(); + } + this.mapping = (Map) config.getOrDefault("mapping", Map.of()); + this.batchSize = Util.toInteger(config.getOrDefault("batchSize", 2000)); + } + + public int getBatchSize() { + return batchSize; + } + + public Map getMapping() { + return mapping; + } + } + + public static Object toValidValue(Object object, String field, Map mapping) { + Object fieldName = mapping.get(field); + if (object != null && fieldName != null) { + return convertValue(object.toString(), fieldName.toString()); + } + + if (object instanceof Collection) { + // if there isn't a mapping config, we convert the list to a String[] + return ((Collection) object) + .stream() + .map(i -> toValidValue(i, field, mapping)) + .collect(Collectors.toList()) + .toArray(new String[0]); + } + if (object instanceof Map) { + return ((Map) object) + .entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, e -> toValidValue(e.getValue(), field, mapping))); + } + try { + // we test if is a valid Neo4j type + Values.of(object); + return object; + } catch (Exception e) { + // otherwise we try to coerce it + return object.toString(); + } + } + + /** + * In case of complex type non-readable from Parquet, i.e. Duration, Point, List of Neo4j Types... + * we can use the `mapping: {keyToConvert: valueTypeName}` config to convert them. + * For example `mapping: {myPropertyKey: "DateArray"}` + */ + private static Object convertValue(String value, String typeName) { + switch (typeName) { + // {"crs":"wgs-84-3d","latitude":13.1,"longitude":33.46789,"height":100.0} + case "Point": + return getPointValue(value); + case "LocalDateTime": + return LocalDateTimeValue.parse(value).asObjectCopy(); + case "LocalTime": + return LocalTimeValue.parse(value).asObjectCopy(); + case "DateTime": + return DateTimeValue.parse(value, () -> ZoneId.of("Z")).asObjectCopy(); + case "Time": + return TimeValue.parse(value, () -> ZoneId.of("Z")).asObjectCopy(); + case "Date": + return DateValue.parse(value).asObjectCopy(); + case "Duration": + return DurationValue.parse(value); + case "Char": + return value.charAt(0); + case "Byte": + return value.getBytes(); + case "Double": + return Double.parseDouble(value); + case "Float": + return Float.parseFloat(value); + case "Short": + return Short.parseShort(value); + case "Int": + return Integer.parseInt(value); + case "Long": + return Long.parseLong(value); + case "Node": + case "Relationship": + return JsonUtil.parse(value, null, Map.class); + case "NO_VALUE": + return null; + default: + // If ends with "Array", for example StringArray + if (typeName.endsWith("Array")) { + value = StringUtils.removeStart(value, "["); + value = StringUtils.removeEnd(value, "]"); + String array = typeName.replace("Array", ""); + + final Object[] prototype = getPrototypeFor(array); + return Arrays.stream(value.split(",")) + .map(item -> convertValue(StringUtils.trim(item), array)) + .collect(Collectors.toList()) + .toArray(prototype); + } + return value; + } + } + + private static PointValue getPointValue(String value) { + try { + return PointValue.parse(value); + } catch (Neo4jException e) { + // fallback in case of double-quotes, e.g. + // {"crs":"wgs-84-3d","latitude":13.1,"longitude":33.46789,"height":100.0} + // we remove the double quotes before parsing the result, e.g. + // {crs:"wgs-84-3d",latitude:13.1,longitude:33.46789,height:100.0} + ObjectMapper objectMapper = new ObjectMapper().disable(JsonWriteFeature.QUOTE_FIELD_NAMES.mappedFeature()); + try { + Map readValue = objectMapper.readValue(value, Map.class); + String stringWithoutKeyQuotes = objectMapper.writeValueAsString(readValue); + return PointValue.parse(stringWithoutKeyQuotes); + } catch (JsonProcessingException ex) { + throw new RuntimeException(ex); + } + } + } + + // similar to CsvPropertyConverter + public static Object[] getPrototypeFor(String type) { + switch (type) { + case "Long": + return new Long[] {}; + case "Integer": + return new Integer[] {}; + case "Double": + return new Double[] {}; + case "Float": + return new Float[] {}; + case "Boolean": + return new Boolean[] {}; + case "Byte": + return new Byte[] {}; + case "Short": + return new Short[] {}; + case "Char": + return new Character[] {}; + case "String": + return new String[] {}; + case "DateTime": + return new ZonedDateTime[] {}; + case "LocalTime": + return new LocalTime[] {}; + case "LocalDateTime": + return new LocalDateTime[] {}; + case "Point": + return new PointValue[] {}; + case "Time": + return new OffsetTime[] {}; + case "Date": + return new LocalDate[] {}; + case "Duration": + return new DurationValue[] {}; + default: + throw new IllegalStateException("Type " + type + " not supported."); + } + } +} diff --git a/full/src/main/resources/extended.txt b/full/src/main/resources/extended.txt index 714ad186ef..5a6a942662 100644 --- a/full/src/main/resources/extended.txt +++ b/full/src/main/resources/extended.txt @@ -71,6 +71,7 @@ apoc.generate.ws apoc.gephi.add apoc.get.nodes apoc.get.rels +apoc.import.arrow apoc.json.validate apoc.load.csv apoc.load.csvParams diff --git a/full/src/test/java/apoc/export/arrow/ImportArrowTest.java b/full/src/test/java/apoc/export/arrow/ImportArrowTest.java new file mode 100644 index 0000000000..59206a37c4 --- /dev/null +++ b/full/src/test/java/apoc/export/arrow/ImportArrowTest.java @@ -0,0 +1,167 @@ +package apoc.export.arrow; + +import static apoc.ApocConfig.APOC_EXPORT_FILE_ENABLED; +import static apoc.ApocConfig.APOC_IMPORT_FILE_ENABLED; +import static apoc.ApocConfig.apocConfig; +import static apoc.util.TestUtil.testCall; +import static apoc.util.TestUtil.testResult; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import apoc.meta.Meta; +import apoc.util.TestUtil; +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.neo4j.configuration.GraphDatabaseSettings; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.ResourceIterator; +import org.neo4j.graphdb.Result; +import org.neo4j.kernel.impl.util.ValueUtils; +import org.neo4j.test.rule.DbmsRule; +import org.neo4j.test.rule.ImpermanentDbmsRule; +import org.neo4j.values.AnyValue; +import org.neo4j.values.storable.DurationValue; +import org.neo4j.values.storable.LocalDateTimeValue; +import org.neo4j.values.storable.PointValue; +import org.neo4j.values.virtual.VirtualValues; + +public class ImportArrowTest { + private static File directory = new File("target/arrowImport"); + + static { //noinspection ResultOfMethodCallIgnored + directory.mkdirs(); + } + + private final Map MAPPING_ALL = Map.of( + "mapping", + Map.of("bffSince", "Duration", "place", "Point", "listInt", "LongArray", "born", "LocalDateTime")); + + @ClassRule + public static DbmsRule db = new ImpermanentDbmsRule() + .withSetting( + GraphDatabaseSettings.load_csv_file_url_root, + directory.toPath().toAbsolutePath()); + + @BeforeClass + public static void beforeClass() { + TestUtil.registerProcedure(db, ExportArrow.class, ImportArrow.class, Meta.class); + } + + @Before + public void before() { + db.executeTransactionally("MATCH (n) DETACH DELETE n"); + + db.executeTransactionally( + "CREATE (f:User {name:'Adam',age:42,male:true,kids:['Sam','Anna','Grace', 'Qwe'], born:localdatetime('2015-05-18T19:32:24.000'), place:point({latitude: 13.1, longitude: 33.46789, height: 100.0})})-[:KNOWS {since: 1993, bffSince: duration('P5M1.5D')}]->(b:User {name:'Jim',age:42})"); + db.executeTransactionally("CREATE (:Another {foo:1, listInt: [1,2]}), (:Another {bar:'Sam'})"); + + apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, true); + apocConfig().setProperty(APOC_EXPORT_FILE_ENABLED, true); + } + + @Test + public void testStreamRoundtripImportArrowAll() { + final byte[] bytes = + db.executeTransactionally("CALL apoc.export.arrow.stream.all", Map.of(), this::extractByteArray); + + testImportCommon(bytes, MAPPING_ALL); + } + + @Test + public void testFileRoundtripImportArrowAll() { + String file = db.executeTransactionally( + "CALL apoc.export.arrow.all('test_all.arrow') YIELD file", Map.of(), this::extractFileName); + + testImportCommon(file, MAPPING_ALL); + } + + @Test + public void testFileRoundtripImportArrowAllWithSmallBatchSize() { + String file = db.executeTransactionally( + "CALL apoc.export.arrow.all('test_all.arrow') YIELD file", Map.of(), this::extractFileName); + + Map config = new HashMap<>(MAPPING_ALL); + config.put("batchSize", 1); + testImportCommon(file, config); + } + + private void testImportCommon(Object file, Map config) { + // then + Map params = Map.of("file", file, "config", config); + + // remove current data + db.executeTransactionally("MATCH (n) DETACH DELETE n"); + + final String query = "CALL apoc.import.arrow($file, $config)"; + testCall(db, query, params, r -> { + assertEquals(4L, r.get("nodes")); + assertEquals(1L, r.get("relationships")); + }); + + testCall(db, "MATCH (start:User)-[rel:KNOWS]->(end:User) RETURN start, rel, end", r -> { + Node start = (Node) r.get("start"); + assertFirstUserNodeProps(start.getAllProperties()); + Node end = (Node) r.get("end"); + assertSecondUserNodeProps(end.getAllProperties()); + Relationship rel = (Relationship) r.get("rel"); + assertRelationshipProps(rel.getAllProperties()); + }); + + testResult(db, "MATCH (m:Another) RETURN m", r -> { + ResourceIterator m = r.columnAs("m"); + Node node = m.next(); + assertFirstAnotherNodeProps(node.getAllProperties()); + node = m.next(); + assertSecondAnotherNodeProps(node.getAllProperties()); + assertFalse(r.hasNext()); + }); + } + + private String extractFileName(Result result) { + return result.columnAs("file").next(); + } + + private byte[] extractByteArray(Result result) { + return result.columnAs("value").next(); + } + + private static void assertFirstUserNodeProps(Map props) { + assertEquals("Adam", props.get("name")); + assertEquals(42L, props.get("age")); + assertEquals(true, props.get("male")); + assertArrayEquals(new String[] {"Sam", "Anna", "Grace", "Qwe"}, (String[]) props.get("kids")); + Map latitude = Map.of("latitude", 13.1D, "longitude", 33.46789D, "height", 100.0D); + assertEquals( + PointValue.fromMap(VirtualValues.map( + latitude.keySet().toArray(new String[0]), + latitude.values().stream().map(ValueUtils::of).toArray(AnyValue[]::new))), + props.get("place")); + assertEquals(LocalDateTimeValue.parse("2015-05-18T19:32:24.000").asObject(), props.get("born")); + } + + private static void assertSecondUserNodeProps(Map props) { + assertEquals("Jim", props.get("name")); + assertEquals(42L, props.get("age")); + } + + private static void assertFirstAnotherNodeProps(Map map) { + assertEquals(1L, map.get("foo")); + assertArrayEquals(new long[] {1L, 2L}, (long[]) map.get("listInt")); + } + + private static void assertSecondAnotherNodeProps(Map map) { + assertEquals("Sam", map.get("bar")); + } + + private static void assertRelationshipProps(Map props) { + assertEquals(DurationValue.parse("P5M1DT12H"), props.get("bffSince")); + assertEquals(1993L, props.get("since")); + } +}