From 1eb2d3f984cbda66749e20e728a6c30bb0693166 Mon Sep 17 00:00:00 2001
From: Giuseppe Villani
Date: Thu, 14 Mar 2024 18:02:05 +0100
Subject: [PATCH 1/3] [NOID] Fixes #3426: add apache arrow import procedure to
 extended (#3978)

---
 .../apoc.import/apoc.import.arrow.adoc        | 118 ++++++
 .../pages/overview/apoc.import/index.adoc     |   5 +
 .../documentation.adoc                        |  11 +
 .../partials/generated-documentation/nav.adoc |   1 +
 full/build.gradle                             |   5 +
 .../java/apoc/export/arrow/ImportArrow.java   | 384 ++++++++++++++++++
 full/src/main/resources/extended.txt          |   1 +
 .../apoc/export/arrow/ImportArrowTest.java    | 168 ++++++++
 8 files changed, 693 insertions(+)
 create mode 100644 docs/asciidoc/modules/ROOT/pages/overview/apoc.import/apoc.import.arrow.adoc
 create mode 100644 full/src/main/java/apoc/export/arrow/ImportArrow.java
 create mode 100644 full/src/test/java/apoc/export/arrow/ImportArrowTest.java

diff --git a/docs/asciidoc/modules/ROOT/pages/overview/apoc.import/apoc.import.arrow.adoc b/docs/asciidoc/modules/ROOT/pages/overview/apoc.import/apoc.import.arrow.adoc
new file mode 100644
index 0000000000..a90843e536
--- /dev/null
+++ b/docs/asciidoc/modules/ROOT/pages/overview/apoc.import/apoc.import.arrow.adoc
@@ -0,0 +1,118 @@
+= apoc.import.arrow
+:description: This section contains reference documentation for the apoc.import.arrow procedure.
+
+label:procedure[] label:apoc-extended[]
+
+[.emphasis]
+apoc.import.arrow(input, $config) - Imports arrow from the provided arrow file or byte array
+
+== Signature
+
+[source]
+----
+apoc.import.arrow(urlOrBinaryFile :: ANY?, config = {} :: MAP?) :: (file :: STRING?, source :: STRING?, format :: STRING?, nodes :: INTEGER?, relationships :: INTEGER?, properties :: INTEGER?, time :: INTEGER?, rows :: INTEGER?, batchSize :: INTEGER?, batches :: INTEGER?, done :: BOOLEAN?, data :: STRING?)
+----
+
+== Input parameters
+[.procedures, opts=header]
+|===
+| Name | Type | Default
+|urlOrBinaryFile|ANY?|null
+|config|MAP?|{}
+|===
+
+== Config parameters
+This procedure supports the following config parameters:
+
+.Config parameters
+[opts=header, cols='1a,1a,1a,3a']
+|===
+| name | type | default | description
+| batchSize | Integer | `2000` | the maximum number of rows imported per transaction batch
+| mapping | Map | `{}` | see the `Mapping config` example below
+|===
+
+== Output parameters
+[.procedures, opts=header]
+|===
+| Name | Type
+|file|STRING?
+|source|STRING?
+|format|STRING?
+|nodes|INTEGER?
+|relationships|INTEGER?
+|properties|INTEGER?
+|time|INTEGER?
+|rows|INTEGER?
+|batchSize|INTEGER?
+|batches|INTEGER?
+|done|BOOLEAN?
+|data|STRING?
+|===
+
+[[usage-apoc.import.arrow]]
+== Usage Examples
+
+The `apoc.import.arrow` procedure can be used to import arrow files created by the `apoc.export.arrow.*` procedures.
+
+[source,cypher]
+----
+CALL apoc.import.arrow("fileCreatedViaExportProcedures.arrow")
+----
+
+.Results
+[opts=header]
+|===
+| file | source | format | nodes | relationships | properties | time | rows | batchSize | batches | done | data
+| "fileCreatedViaExportProcedures.arrow" | "file" | "arrow" | 3 | 1 | 15 | 105 | 4 | -1 | 0 | TRUE | NULL
+|===
+
+We can also import from a binary `byte[]` created by the `apoc.export.arrow.stream.*` procedures, for example passed in as a parameter:
+
+[source,cypher]
+----
+CALL apoc.import.arrow($binary)
+----
+
+=== Mapping config
+
+To import complex types not natively supported by Arrow, like Point, Duration, list of Duration, etc.,
+we can use the mapping config to convert them to the desired data type.
+For example, if we have a node `(:MyLabel {durationProp: duration('P5M1.5D')})` and we export it to an arrow file/binary,
+we can import it back by explicitly passing a map whose keys are the property keys and whose values are the property types.
+
+In this example, using the load procedure:
+[source,cypher]
+----
+CALL apoc.load.arrow(fileOrBinary, {mapping: {durationProp: 'Duration'}})
+----
+
+Or with the import procedure:
+[source,cypher]
+----
+CALL apoc.import.arrow(fileOrBinary, {mapping: {durationProp: 'Duration'}})
+----
+
+The mapping value types can be one of the following:
+
+* `Point`
+* `LocalDateTime`
+* `LocalTime`
+* `DateTime`
+* `Time`
+* `Date`
+* `Duration`
+* `Char`
+* `Byte`
+* `Double`
+* `Float`
+* `Short`
+* `Int`
+* `Long`
+* `Node`
+* `Relationship`
+* `BaseType` followed by `Array`, to map a list of values, where `BaseType` can be one of the previous types, for example `DurationArray`
diff --git a/docs/asciidoc/modules/ROOT/pages/overview/apoc.import/index.adoc b/docs/asciidoc/modules/ROOT/pages/overview/apoc.import/index.adoc
index 2deaa5088f..efb2e3f8ed 100644
--- a/docs/asciidoc/modules/ROOT/pages/overview/apoc.import/index.adoc
+++ b/docs/asciidoc/modules/ROOT/pages/overview/apoc.import/index.adoc
@@ -8,6 +8,11 @@ This file is generated by DocsTest, so don't change it!
 [.procedures, opts=header, cols='5a,1a,1a']
 |===
 | Qualified Name | Type | Release
+|xref::overview/apoc.import/apoc.import.arrow.adoc[apoc.import.arrow icon:book[]]
+
+apoc.import.arrow(input, $config) - Imports arrow from the provided arrow file or byte array
+|label:procedure[]
+|label:apoc-full[]
 |xref::overview/apoc.import/apoc.import.csv.adoc[apoc.import.csv icon:book[]]
 
 apoc.import.csv(nodes, relationships, config) - imports nodes and relationships from the provided CSV files with given labels and types
diff --git a/docs/asciidoc/modules/ROOT/partials/generated-documentation/documentation.adoc b/docs/asciidoc/modules/ROOT/partials/generated-documentation/documentation.adoc
index b2b7b1105d..866e04127d 100644
--- a/docs/asciidoc/modules/ROOT/partials/generated-documentation/documentation.adoc
+++ b/docs/asciidoc/modules/ROOT/partials/generated-documentation/documentation.adoc
@@ -1578,6 +1578,17 @@ apoc.label.exists(element, label) - returns true or false related to label exist
 |label:apoc-core[]
 |===
 
+== xref::overview/apoc.import/index.adoc[]
+
+[.procedures, opts=header, cols='5a,1a']
+|===
+| Qualified Name | Type
+|xref::overview/apoc.import/apoc.import.arrow.adoc[apoc.import.arrow icon:book[]]
+
+apoc.import.arrow(input, $config) - Imports arrow from the provided arrow file or byte array
+|label:procedure[]
+|===
+
 == xref::overview/apoc.load/index.adoc[]
 
 [.procedures, opts=header, cols='5a,1a,1a']
diff --git a/docs/asciidoc/modules/ROOT/partials/generated-documentation/nav.adoc b/docs/asciidoc/modules/ROOT/partials/generated-documentation/nav.adoc
index 15ae58b042..a231b5acbd 100644
--- a/docs/asciidoc/modules/ROOT/partials/generated-documentation/nav.adoc
+++ b/docs/asciidoc/modules/ROOT/partials/generated-documentation/nav.adoc
@@ -290,6 +290,7 @@ This file is generated by DocsTest, so don't change it!
*** xref::overview/apoc.hashing/apoc.hashing.fingerprintGraph.adoc[] *** xref::overview/apoc.hashing/apoc.hashing.fingerprinting.adoc[] ** xref::overview/apoc.import/index.adoc[] +*** xref::overview/apoc.import/apoc.import.arrow.adoc[] *** xref::overview/apoc.import/apoc.import.csv.adoc[] *** xref::overview/apoc.import/apoc.import.graphml.adoc[] *** xref::overview/apoc.import/apoc.import.json.adoc[] diff --git a/full/build.gradle b/full/build.gradle index 2f33ad559f..f4d9b7de70 100644 --- a/full/build.gradle +++ b/full/build.gradle @@ -117,6 +117,11 @@ dependencies { exclude group: 'io.netty' } + compileOnly group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0' + compileOnly group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0' + testImplementation group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0' + testImplementation group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0' + compileOnly group: 'com.couchbase.client', name: 'java-client', version: '3.3.0', withoutJacksons testImplementation group: 'com.couchbase.client', name: 'java-client', version: '3.3.0', withoutJacksons diff --git a/full/src/main/java/apoc/export/arrow/ImportArrow.java b/full/src/main/java/apoc/export/arrow/ImportArrow.java new file mode 100644 index 0000000000..6792dc001f --- /dev/null +++ b/full/src/main/java/apoc/export/arrow/ImportArrow.java @@ -0,0 +1,384 @@ +package apoc.export.arrow; + +import apoc.Extended; +import apoc.Pools; +import apoc.export.util.BatchTransaction; +import apoc.export.util.ProgressReporter; +import apoc.result.ProgressInfo; +import apoc.util.FileUtils; +import apoc.util.JsonUtil; +import apoc.util.Util; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.json.JsonWriteFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowFileReader; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.commons.lang3.StringUtils; +import org.neo4j.driver.exceptions.Neo4jException; +import org.neo4j.graphdb.Entity; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.Label; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.RelationshipType; +import org.neo4j.procedure.Context; +import org.neo4j.procedure.Description; +import org.neo4j.procedure.Mode; +import org.neo4j.procedure.Name; +import org.neo4j.procedure.Procedure; +import org.neo4j.values.storable.DateTimeValue; +import org.neo4j.values.storable.DateValue; +import org.neo4j.values.storable.DurationValue; +import org.neo4j.values.storable.LocalDateTimeValue; +import org.neo4j.values.storable.LocalTimeValue; +import org.neo4j.values.storable.PointValue; +import org.neo4j.values.storable.TimeValue; +import org.neo4j.values.storable.Values; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.channels.SeekableByteChannel; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import 
java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+
+@Extended
+public class ImportArrow {
+
+    // TODO: field similar to the one placed in ArrowUtils (placed in core)
+    //  when the Arrow procedures will be placed in extended remove these lines
+    //  and replace FIELD_ID with FIELD_ID.getName(), FIELD_LABELS with FIELD_LABELS.getName(), etc..
+    public static String FIELD_ID = "<id>";
+    public static String FIELD_LABELS = "labels";
+    public static String FIELD_SOURCE_ID = "<source.id>";
+    public static String FIELD_TARGET_ID = "<target.id>";
+    public static String FIELD_TYPE = "<type>";
+    // -- end ArrowUtils fields
+
+    @Context
+    public Pools pools;
+
+    @Context
+    public GraphDatabaseService db;
+
+
+    @Procedure(name = "apoc.import.arrow", mode = Mode.WRITE)
+    @Description("Imports arrow from the provided arrow file or byte array")
+    public Stream<ProgressInfo> importFile(@Name("input") Object input, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) throws Exception {
+
+        ProgressInfo result =
+                Util.inThread(pools, () -> {
+                    String file = null;
+                    String sourceInfo = "binary";
+                    if (input instanceof String) {
+                        file = (String) input;
+                        sourceInfo = "file";
+                    }
+
+                    final ArrowConfig conf = new ArrowConfig(config);
+
+                    final Map<Long, Long> idMapping = new HashMap<>();
+
+                    AtomicInteger counter = new AtomicInteger();
+                    try (ArrowReader reader = getReader(input);
+                         VectorSchemaRoot schemaRoot = reader.getVectorSchemaRoot()) {
+
+                        final ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(file, sourceInfo, "arrow"));
+                        BatchTransaction btx = new BatchTransaction(db, conf.getBatchSize(), reporter);
+                        try {
+                            while (hasElements(counter, reader, schemaRoot)) {
+
+                                final Map<String, Object> row = schemaRoot.getFieldVectors()
+                                        .stream()
+                                        .collect(
+                                                HashMap::new,
+                                                (map, fieldVector) -> {
+                                                    Object read = read(fieldVector, counter.get(), conf);
+                                                    if (read == null) {
+                                                        return;
+                                                    }
+                                                    map.put(fieldVector.getName(), read);
+                                                },
+                                                HashMap::putAll);
+
+                                String relType = (String) row.remove(FIELD_TYPE);
+                                if (relType == null) {
+                                    // is node
+                                    String[] stringLabels = (String[]) row.remove(FIELD_LABELS);
+                                    Label[] labels = Optional.ofNullable(stringLabels)
+                                            .map(l -> Arrays.stream(l).map(Label::label).toArray(Label[]::new))
+                                            .orElse(new Label[]{});
+                                    final Node node = btx.getTransaction().createNode(labels);
+
+                                    long id = (long) row.remove(FIELD_ID);
+                                    idMapping.put(id, node.getId());
+
+                                    addProps(row, node);
+                                    reporter.update(1, 0, row.size());
+                                } else {
+                                    // is relationship
+                                    long sourceId = (long) row.remove(FIELD_SOURCE_ID);
+                                    Long idSource = idMapping.get(sourceId);
+                                    final Node source = btx.getTransaction().getNodeById(idSource);
+
+                                    long targetId = (long) row.remove(FIELD_TARGET_ID);
+                                    Long idTarget = idMapping.get(targetId);
+                                    final Node target = btx.getTransaction().getNodeById(idTarget);
+
+                                    final Relationship rel = source.createRelationshipTo(target, RelationshipType.withName(relType));
+                                    addProps(row, rel);
+                                    reporter.update(0, 1, row.size());
+                                }
+
+                                counter.incrementAndGet();
+                                btx.increment();
+                            }
+
+                            btx.commit();
+                        } catch (RuntimeException e) {
+                            btx.rollback();
+                            throw e;
+                        } finally {
+                            btx.close();
+                        }
+
+                        return reporter.getTotal();
+                    }
+                });
+
+        return Stream.of(result);
+    }
+
+
+    private ArrowReader getReader(Object input) throws IOException {
+        RootAllocator allocator = new RootAllocator();
+        if (input instanceof String) {
+            final SeekableByteChannel channel = FileUtils.inputStreamFor(input, null, null, null)
+                    .asChannel();
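+            // a String input has been resolved to a file channel above, so it is read with the
+            // random-access Arrow file reader; any other input is treated as a byte[] produced by
+            // the apoc.export.arrow.stream.* procedures and is read with the streaming reader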
+            return new ArrowFileReader(channel, allocator);
+        }
+        ByteArrayInputStream inputStream = new ByteArrayInputStream((byte[]) input);
+        return new ArrowStreamReader(inputStream, allocator);
+    }
+
+
+    private static boolean hasElements(AtomicInteger counter, ArrowReader reader, VectorSchemaRoot schemaRoot) throws IOException {
+        if (counter.get() >= schemaRoot.getRowCount()) {
+            if (reader.loadNextBatch()) {
+                counter.set(0);
+            } else {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private static Object read(FieldVector fieldVector, int index, ArrowConfig conf) {
+
+        if (fieldVector.isNull(index)) {
+            return null;
+        } else if (fieldVector instanceof BitVector) {
+            BitVector fe = (BitVector) fieldVector;
+            return fe.get(index) == 1;
+        } else {
+            Object object = fieldVector.getObject(index);
+            if (object instanceof Collection && ((Collection) object).isEmpty()) {
+                return null;
+            }
+            return toValidValue(object, fieldVector.getName(), conf.getMapping());
+        }
+    }
+
+    private void addProps(Map<String, Object> row, Entity rel) {
+        row.forEach(rel::setProperty);
+    }
+
+    /**
+     * Analog to the ArrowConfig present in APOC Core.
+     * TODO Merge these 2 classes when the arrow procedures are moved to APOC Extended
+     */
+    public static class ArrowConfig {
+
+        private final int batchSize;
+        private final Map<String, Object> mapping;
+
+        public ArrowConfig(Map<String, Object> config) {
+            if (config == null) {
+                config = Collections.emptyMap();
+            }
+            this.mapping = (Map<String, Object>) config.getOrDefault("mapping", Map.of());
+            this.batchSize = Util.toInteger(config.getOrDefault("batchSize", 2000));
+        }
+
+        public int getBatchSize() {
+            return batchSize;
+        }
+
+        public Map<String, Object> getMapping() {
+            return mapping;
+        }
+    }
+
+    public static Object toValidValue(Object object, String field, Map<String, Object> mapping) {
+        Object fieldName = mapping.get(field);
+        if (object != null && fieldName != null) {
+            return convertValue(object.toString(), fieldName.toString());
+        }
+
+        if (object instanceof Collection) {
+            // if there isn't a mapping config, we convert the list to a String[]
+            return ((Collection<Object>) object).stream()
+                    .map(i -> toValidValue(i, field, mapping))
+                    .collect(Collectors.toList())
+                    .toArray(new String[0]);
+        }
+        if (object instanceof Map) {
+            return ((Map<String, Object>) object).entrySet().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, e -> toValidValue(e.getValue(), field, mapping)));
+        }
+        try {
+            // we test if it is a valid Neo4j type
+            Values.of(object);
+            return object;
+        } catch (Exception e) {
+            // otherwise we try to coerce it
+            return object.toString();
+        }
+    }
+
+    /**
+     * In case of complex types non-readable from Arrow as-is, i.e. Duration, Point, List of Neo4j Types...
+     * we can use the `mapping: {keyToConvert: valueTypeName}` config to convert them.
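+     * The raw value is read back as a string and parsed into the requested type by this method.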
+ * For example `mapping: {myPropertyKey: "DateArray"}` + */ + private static Object convertValue(String value, String typeName) { + switch (typeName) { + // {"crs":"wgs-84-3d","latitude":13.1,"longitude":33.46789,"height":100.0} + case "Point": + return getPointValue(value); + case "LocalDateTime": + return LocalDateTimeValue.parse(value).asObjectCopy(); + case "LocalTime": + return LocalTimeValue.parse(value).asObjectCopy(); + case "DateTime": + return DateTimeValue.parse(value, () -> ZoneId.of("Z")).asObjectCopy(); + case "Time": + return TimeValue.parse(value, () -> ZoneId.of("Z")).asObjectCopy(); + case "Date": + return DateValue.parse(value).asObjectCopy(); + case "Duration": + return DurationValue.parse(value); + case "Char": + return value.charAt(0); + case "Byte": + return value.getBytes(); + case "Double": + return Double.parseDouble(value); + case "Float": + return Float.parseFloat(value); + case "Short": + return Short.parseShort(value); + case "Int": + return Integer.parseInt(value); + case "Long": + return Long.parseLong(value); + case "Node": + case "Relationship": + return JsonUtil.parse(value, null, Map.class); + case "NO_VALUE": + return null; + default: + // If ends with "Array", for example StringArray + if (typeName.endsWith("Array")) { + value = StringUtils.removeStart(value, "["); + value = StringUtils.removeEnd(value, "]"); + String array = typeName.replace("Array", ""); + + final Object[] prototype = getPrototypeFor(array); + return Arrays.stream(value.split(",")) + .map(item -> convertValue(StringUtils.trim(item), array)) + .collect(Collectors.toList()) + .toArray(prototype); + } + return value; + } + } + + private static PointValue getPointValue(String value) { + try { + return PointValue.parse(value); + } catch (Neo4jException e) { + // fallback in case of double-quotes, e.g. {"crs":"wgs-84-3d","latitude":13.1,"longitude":33.46789,"height":100.0} + // we remove the double quotes before parsing the result, e.g. 
{crs:"wgs-84-3d",latitude:13.1,longitude:33.46789,height:100.0} + ObjectMapper objectMapper = new ObjectMapper() + .disable(JsonWriteFeature.QUOTE_FIELD_NAMES.mappedFeature()); + try { + Map readValue = objectMapper.readValue(value, Map.class); + String stringWithoutKeyQuotes = objectMapper.writeValueAsString(readValue); + return PointValue.parse(stringWithoutKeyQuotes); + } catch (JsonProcessingException ex) { + throw new RuntimeException(ex); + } + } + } + + // similar to CsvPropertyConverter + public static Object[] getPrototypeFor(String type) { + switch (type) { + case "Long": + return new Long[]{}; + case "Integer": + return new Integer[]{}; + case "Double": + return new Double[]{}; + case "Float": + return new Float[]{}; + case "Boolean": + return new Boolean[]{}; + case "Byte": + return new Byte[]{}; + case "Short": + return new Short[]{}; + case "Char": + return new Character[]{}; + case "String": + return new String[]{}; + case "DateTime": + return new ZonedDateTime[]{}; + case "LocalTime": + return new LocalTime[]{}; + case "LocalDateTime": + return new LocalDateTime[]{}; + case "Point": + return new PointValue[]{}; + case "Time": + return new OffsetTime[]{}; + case "Date": + return new LocalDate[]{}; + case "Duration": + return new DurationValue[]{}; + default: + throw new IllegalStateException("Type " + type + " not supported."); + } + } + +} diff --git a/full/src/main/resources/extended.txt b/full/src/main/resources/extended.txt index 714ad186ef..5a6a942662 100644 --- a/full/src/main/resources/extended.txt +++ b/full/src/main/resources/extended.txt @@ -71,6 +71,7 @@ apoc.generate.ws apoc.gephi.add apoc.get.nodes apoc.get.rels +apoc.import.arrow apoc.json.validate apoc.load.csv apoc.load.csvParams diff --git a/full/src/test/java/apoc/export/arrow/ImportArrowTest.java b/full/src/test/java/apoc/export/arrow/ImportArrowTest.java new file mode 100644 index 0000000000..d5a44b24b2 --- /dev/null +++ b/full/src/test/java/apoc/export/arrow/ImportArrowTest.java @@ -0,0 +1,168 @@ +package apoc.export.arrow; + +import apoc.meta.Meta; +import apoc.util.TestUtil; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.neo4j.configuration.GraphDatabaseSettings; +import org.neo4j.graphdb.Node; +import org.neo4j.graphdb.Relationship; +import org.neo4j.graphdb.ResourceIterator; +import org.neo4j.graphdb.Result; +import org.neo4j.kernel.impl.util.ValueUtils; +import org.neo4j.test.rule.DbmsRule; +import org.neo4j.test.rule.ImpermanentDbmsRule; +import org.neo4j.values.AnyValue; +import org.neo4j.values.storable.DurationValue; +import org.neo4j.values.storable.LocalDateTimeValue; +import org.neo4j.values.storable.PointValue; +import org.neo4j.values.virtual.VirtualValues; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import static apoc.ApocConfig.APOC_EXPORT_FILE_ENABLED; +import static apoc.ApocConfig.APOC_IMPORT_FILE_ENABLED; +import static apoc.ApocConfig.apocConfig; +import static apoc.util.TestUtil.testCall; +import static apoc.util.TestUtil.testResult; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class ImportArrowTest { + private static File directory = new File("target/arrowImport"); + static { //noinspection ResultOfMethodCallIgnored + directory.mkdirs(); + } + + private final Map MAPPING_ALL = Map.of("mapping", + Map.of("bffSince", "Duration", "place", "Point", "listInt", "LongArray", 
"born", "LocalDateTime") + ); + + @ClassRule + public static DbmsRule db = new ImpermanentDbmsRule() + .withSetting(GraphDatabaseSettings.load_csv_file_url_root, directory.toPath().toAbsolutePath()); + + + + @BeforeClass + public static void beforeClass() { + TestUtil.registerProcedure(db, ExportArrow.class, ImportArrow.class, Meta.class); + } + + @Before + public void before() { + db.executeTransactionally("MATCH (n) DETACH DELETE n"); + + db.executeTransactionally("CREATE (f:User {name:'Adam',age:42,male:true,kids:['Sam','Anna','Grace', 'Qwe'], born:localdatetime('2015-05-18T19:32:24.000'), place:point({latitude: 13.1, longitude: 33.46789, height: 100.0})})-[:KNOWS {since: 1993, bffSince: duration('P5M1.5D')}]->(b:User {name:'Jim',age:42})"); + db.executeTransactionally("CREATE (:Another {foo:1, listInt: [1,2]}), (:Another {bar:'Sam'})"); + + apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, true); + apocConfig().setProperty(APOC_EXPORT_FILE_ENABLED, true); + } + + @Test + public void testStreamRoundtripImportArrowAll() { + final byte[] bytes = db.executeTransactionally("CALL apoc.export.arrow.stream.all", + Map.of(), + this::extractByteArray); + + testImportCommon(bytes, MAPPING_ALL); + } + + @Test + public void testFileRoundtripImportArrowAll() { + String file = db.executeTransactionally("CALL apoc.export.arrow.all('test_all.arrow') YIELD file", + Map.of(), + this::extractFileName); + + testImportCommon(file, MAPPING_ALL); + } + + @Test + public void testFileRoundtripImportArrowAllWithSmallBatchSize() { + String file = db.executeTransactionally("CALL apoc.export.arrow.all('test_all.arrow') YIELD file", + Map.of(), + this::extractFileName); + + Map config = new HashMap<>(MAPPING_ALL); + config.put("batchSize", 1); + testImportCommon(file, config); + } + + private void testImportCommon(Object file, Map config) { + // then + Map params = Map.of("file", file, "config", config); + + // remove current data + db.executeTransactionally("MATCH (n) DETACH DELETE n"); + + final String query = "CALL apoc.import.arrow($file, $config)"; + testCall(db, query, params, + r -> { + assertEquals(4L, r.get("nodes")); + assertEquals(1L, r.get("relationships")); + }); + + testCall(db, "MATCH (start:User)-[rel:KNOWS]->(end:User) RETURN start, rel, end", r -> { + Node start = (Node) r.get("start"); + assertFirstUserNodeProps(start.getAllProperties()); + Node end = (Node) r.get("end"); + assertSecondUserNodeProps(end.getAllProperties()); + Relationship rel = (Relationship) r.get("rel"); + assertRelationshipProps(rel.getAllProperties()); + }); + + testResult(db, "MATCH (m:Another) RETURN m", r -> { + ResourceIterator m = r.columnAs("m"); + Node node = m.next(); + assertFirstAnotherNodeProps(node.getAllProperties()); + node = m.next(); + assertSecondAnotherNodeProps(node.getAllProperties()); + assertFalse(r.hasNext()); + }); + } + + + private String extractFileName(Result result) { + return result.columnAs("file").next(); + } + + private byte[] extractByteArray(Result result) { + return result.columnAs("value").next(); + } + + private static void assertFirstUserNodeProps(Map props) { + assertEquals("Adam", props.get("name")); + assertEquals(42L, props.get("age")); + assertEquals( true, props.get("male")); + assertArrayEquals(new String[] { "Sam", "Anna", "Grace", "Qwe" }, (String[]) props.get("kids")); + Map latitude = Map.of("latitude", 13.1D, "longitude", 33.46789D, "height", 100.0D); + assertEquals(PointValue.fromMap(VirtualValues.map(latitude.keySet().toArray(new String[0]), 
+                props.get("place"));
+        assertEquals(LocalDateTimeValue.parse("2015-05-18T19:32:24.000").asObject(), props.get("born"));
+    }
+
+    private static void assertSecondUserNodeProps(Map<String, Object> props) {
+        assertEquals( "Jim", props.get("name"));
+        assertEquals(42L, props.get("age"));
+    }
+
+    private static void assertFirstAnotherNodeProps(Map<String, Object> map) {
+        assertEquals(1L, map.get("foo"));
+        assertArrayEquals(new long[] {1L, 2L}, (long[]) map.get("listInt"));
+    }
+
+    private static void assertSecondAnotherNodeProps(Map<String, Object> map) {
+        assertEquals("Sam", map.get("bar"));
+    }
+
+    private static void assertRelationshipProps(Map<String, Object> props) {
+        assertEquals(DurationValue.parse("P5M1DT12H"), props.get("bffSince"));
+        assertEquals(1993L, props.get("since"));
+    }
+}

From f4172303e8414e726dffe648a77d0720d0905286 Mon Sep 17 00:00:00 2001
From: vga91
Date: Tue, 10 Sep 2024 14:40:08 +0200
Subject: [PATCH 2/3] [NOID] java 11 changes

---
 .../java/apoc/export/arrow/ImportArrow.java   | 275 +++++++++---------
 .../apoc/export/arrow/ImportArrowTest.java    |  89 +++---
 2 files changed, 182 insertions(+), 182 deletions(-)

diff --git a/full/src/main/java/apoc/export/arrow/ImportArrow.java b/full/src/main/java/apoc/export/arrow/ImportArrow.java
index 6792dc001f..1b20fad6c3 100644
--- a/full/src/main/java/apoc/export/arrow/ImportArrow.java
+++ b/full/src/main/java/apoc/export/arrow/ImportArrow.java
@@ -11,6 +11,24 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.json.JsonWriteFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.channels.SeekableByteChannel;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.BitVector;
 import org.apache.arrow.vector.FieldVector;
@@ -40,31 +58,11 @@
 import org.neo4j.values.storable.TimeValue;
 import org.neo4j.values.storable.Values;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.nio.channels.SeekableByteChannel;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetTime;
-import java.time.ZoneId;
-import java.time.ZonedDateTime;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-
 @Extended
 public class ImportArrow {
 
     // TODO: field similar to the one placed in ArrowUtils (placed in core)
-    //  when the Arrow procedures will be placed in extended remove these lines
+    //  when the Arrow procedures will be placed in extended remove these lines
     //  and replace FIELD_ID with FIELD_ID.getName(), FIELD_LABELS with FIELD_LABELS.getName(), etc..
 public static String FIELD_ID = "<id>";
 public static String FIELD_LABELS = "labels";
@@ -72,115 +70,115 @@ public class ImportArrow {
     public static String FIELD_SOURCE_ID = "<source.id>";
     public static String FIELD_TARGET_ID = "<target.id>";
     public static String FIELD_TYPE = "<type>";
     // -- end ArrowUtils fields
-
+
     @Context
     public Pools pools;
 
     @Context
     public GraphDatabaseService db;
-
-
+
     @Procedure(name = "apoc.import.arrow", mode = Mode.WRITE)
     @Description("Imports arrow from the provided arrow file or byte array")
-    public Stream<ProgressInfo> importFile(@Name("input") Object input, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) throws Exception {
-
-        ProgressInfo result =
-                Util.inThread(pools, () -> {
-                    String file = null;
-                    String sourceInfo = "binary";
-                    if (input instanceof String) {
-                        file = (String) input;
-                        sourceInfo = "file";
-                    }
-
-                    final ArrowConfig conf = new ArrowConfig(config);
-
-                    final Map<Long, Long> idMapping = new HashMap<>();
-
-                    AtomicInteger counter = new AtomicInteger();
-                    try (ArrowReader reader = getReader(input);
-                         VectorSchemaRoot schemaRoot = reader.getVectorSchemaRoot()) {
-
-                        final ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(file, sourceInfo, "arrow"));
-                        BatchTransaction btx = new BatchTransaction(db, conf.getBatchSize(), reporter);
-                        try {
-                            while (hasElements(counter, reader, schemaRoot)) {
-
-                                final Map<String, Object> row = schemaRoot.getFieldVectors()
-                                        .stream()
-                                        .collect(
-                                                HashMap::new,
-                                                (map, fieldVector) -> {
-                                                    Object read = read(fieldVector, counter.get(), conf);
-                                                    if (read == null) {
-                                                        return;
-                                                    }
-                                                    map.put(fieldVector.getName(), read);
-                                                },
-                                                HashMap::putAll);
-
-                                String relType = (String) row.remove(FIELD_TYPE);
-                                if (relType == null) {
-                                    // is node
-                                    String[] stringLabels = (String[]) row.remove(FIELD_LABELS);
-                                    Label[] labels = Optional.ofNullable(stringLabels)
-                                            .map(l -> Arrays.stream(l).map(Label::label).toArray(Label[]::new))
-                                            .orElse(new Label[]{});
-                                    final Node node = btx.getTransaction().createNode(labels);
-
-                                    long id = (long) row.remove(FIELD_ID);
-                                    idMapping.put(id, node.getId());
-
-                                    addProps(row, node);
-                                    reporter.update(1, 0, row.size());
-                                } else {
-                                    // is relationship
-                                    long sourceId = (long) row.remove(FIELD_SOURCE_ID);
-                                    Long idSource = idMapping.get(sourceId);
-                                    final Node source = btx.getTransaction().getNodeById(idSource);
-
-                                    long targetId = (long) row.remove(FIELD_TARGET_ID);
-                                    Long idTarget = idMapping.get(targetId);
-                                    final Node target = btx.getTransaction().getNodeById(idTarget);
-
-                                    final Relationship rel = source.createRelationshipTo(target, RelationshipType.withName(relType));
-                                    addProps(row, rel);
-                                    reporter.update(0, 1, row.size());
-                                }
-
-                                counter.incrementAndGet();
-                                btx.increment();
-                            }
-
-                            btx.commit();
-                        } catch (RuntimeException e) {
-                            btx.rollback();
-                            throw e;
-                        } finally {
-                            btx.close();
+    public Stream<ProgressInfo> importFile(
+            @Name("input") Object input, @Name(value = "config", defaultValue = "{}") Map<String, Object> config)
+            throws Exception {
+
+        ProgressInfo result = Util.inThread(pools, () -> {
+            String file = null;
+            String sourceInfo = "binary";
+            if (input instanceof String) {
+                file = (String) input;
+                sourceInfo = "file";
+            }
+
+            final ArrowConfig conf = new ArrowConfig(config);
+
+            final Map<Long, Long> idMapping = new HashMap<>();
+
+            AtomicInteger counter = new AtomicInteger();
+            try (ArrowReader reader = getReader(input);
+                    VectorSchemaRoot schemaRoot = reader.getVectorSchemaRoot()) {
+
+                final ProgressReporter reporter =
+                        new ProgressReporter(null, null, new ProgressInfo(file, sourceInfo, "arrow"));
+                BatchTransaction btx = new BatchTransaction(db, conf.getBatchSize(), reporter);
+                try {
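+                    // consume the export row by row: hasElements() loads the next Arrow record batch
+                    // (and resets the row counter) once all rows of the current batch have been read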
+                    while (hasElements(counter, reader, schemaRoot)) {
+
+                        final Map<String, Object> row = schemaRoot.getFieldVectors().stream()
+                                .collect(
+                                        HashMap::new,
+                                        (map, fieldVector) -> {
+                                            Object read = read(fieldVector, counter.get(), conf);
+                                            if (read == null) {
+                                                return;
+                                            }
+                                            map.put(fieldVector.getName(), read);
+                                        },
+                                        HashMap::putAll);
+
+                        String relType = (String) row.remove(FIELD_TYPE);
+                        if (relType == null) {
+                            // is node
+                            String[] stringLabels = (String[]) row.remove(FIELD_LABELS);
+                            Label[] labels = Optional.ofNullable(stringLabels)
+                                    .map(l -> Arrays.stream(l).map(Label::label).toArray(Label[]::new))
+                                    .orElse(new Label[] {});
+                            final Node node = btx.getTransaction().createNode(labels);
+
+                            long id = (long) row.remove(FIELD_ID);
+                            idMapping.put(id, node.getId());
+
+                            addProps(row, node);
+                            reporter.update(1, 0, row.size());
+                        } else {
+                            // is relationship
+                            long sourceId = (long) row.remove(FIELD_SOURCE_ID);
+                            Long idSource = idMapping.get(sourceId);
+                            final Node source = btx.getTransaction().getNodeById(idSource);
+
+                            long targetId = (long) row.remove(FIELD_TARGET_ID);
+                            Long idTarget = idMapping.get(targetId);
+                            final Node target = btx.getTransaction().getNodeById(idTarget);
+
+                            final Relationship rel =
+                                    source.createRelationshipTo(target, RelationshipType.withName(relType));
+                            addProps(row, rel);
+                            reporter.update(0, 1, row.size());
                         }
-                        return reporter.getTotal();
+                        counter.incrementAndGet();
+                        btx.increment();
                     }
-                });
+
+                    btx.commit();
+                } catch (RuntimeException e) {
+                    btx.rollback();
+                    throw e;
+                } finally {
+                    btx.close();
+                }
+
+                return reporter.getTotal();
+            }
+        });
 
         return Stream.of(result);
     }
 
-
     private ArrowReader getReader(Object input) throws IOException {
         RootAllocator allocator = new RootAllocator();
         if (input instanceof String) {
-            final SeekableByteChannel channel = FileUtils.inputStreamFor(input, null, null, null)
-                    .asChannel();
+            final SeekableByteChannel channel =
+                    FileUtils.inputStreamFor(input, null, null, null).asChannel();
             return new ArrowFileReader(channel, allocator);
         }
         ByteArrayInputStream inputStream = new ByteArrayInputStream((byte[]) input);
         return new ArrowStreamReader(inputStream, allocator);
     }
 
-
-    private static boolean hasElements(AtomicInteger counter, ArrowReader reader, VectorSchemaRoot schemaRoot) throws IOException {
+    private static boolean hasElements(AtomicInteger counter, ArrowReader reader, VectorSchemaRoot schemaRoot)
+            throws IOException {
         if (counter.get() >= schemaRoot.getRowCount()) {
             if (reader.loadNextBatch()) {
                 counter.set(0);
@@ -188,12 +186,12 @@ private static boolean hasElements(AtomicInteger counter, ArrowReader reader, Ve
                 return false;
             }
         }
-
+
         return true;
     }
 
     private static Object read(FieldVector fieldVector, int index, ArrowConfig conf) {
-
+
         if (fieldVector.isNull(index)) {
             return null;
         } else if (fieldVector instanceof BitVector) {
             BitVector fe = (BitVector) fieldVector;
             return fe.get(index) == 1;
@@ -246,14 +244,17 @@ public static Object toValidValue(Object object, String field, Map<String, Obje
         if (object instanceof Collection) {
             // if there isn't a mapping config, we convert the list to a String[]
-            return ((Collection<Object>) object).stream()
-                    .map(i -> toValidValue(i, field, mapping))
-                    .collect(Collectors.toList())
-                    .toArray(new String[0]);
+            return ((Collection<Object>) object)
+                    .stream()
+                    .map(i -> toValidValue(i, field, mapping))
+                    .collect(Collectors.toList())
+                    .toArray(new String[0]);
         }
         if (object instanceof Map) {
-            return ((Map<String, Object>) object).entrySet().stream()
-                    .collect(Collectors.toMap(Map.Entry::getKey, e -> toValidValue(e.getValue(), field, mapping)));
+            return ((Map<String, Object>) object)
+                    .entrySet().stream()
+                    .collect(Collectors.toMap(
+                            Map.Entry::getKey, e -> toValidValue(e.getValue(), field, mapping)));
         }
         try {
             // we test if it is a valid Neo4j
type @@ -272,7 +273,7 @@ public static Object toValidValue(Object object, String field, Map MAPPING_ALL = Map.of("mapping", - Map.of("bffSince", "Duration", "place", "Point", "listInt", "LongArray", "born", "LocalDateTime") - ); - - @ClassRule - public static DbmsRule db = new ImpermanentDbmsRule() - .withSetting(GraphDatabaseSettings.load_csv_file_url_root, directory.toPath().toAbsolutePath()); - + private final Map MAPPING_ALL = Map.of( + "mapping", + Map.of("bffSince", "Duration", "place", "Point", "listInt", "LongArray", "born", "LocalDateTime")); + @ClassRule + public static DbmsRule db = new ImpermanentDbmsRule() + .withSetting( + GraphDatabaseSettings.load_csv_file_url_root, + directory.toPath().toAbsolutePath()); @BeforeClass public static void beforeClass() { @@ -58,7 +58,8 @@ public static void beforeClass() { public void before() { db.executeTransactionally("MATCH (n) DETACH DELETE n"); - db.executeTransactionally("CREATE (f:User {name:'Adam',age:42,male:true,kids:['Sam','Anna','Grace', 'Qwe'], born:localdatetime('2015-05-18T19:32:24.000'), place:point({latitude: 13.1, longitude: 33.46789, height: 100.0})})-[:KNOWS {since: 1993, bffSince: duration('P5M1.5D')}]->(b:User {name:'Jim',age:42})"); + db.executeTransactionally( + "CREATE (f:User {name:'Adam',age:42,male:true,kids:['Sam','Anna','Grace', 'Qwe'], born:localdatetime('2015-05-18T19:32:24.000'), place:point({latitude: 13.1, longitude: 33.46789, height: 100.0})})-[:KNOWS {since: 1993, bffSince: duration('P5M1.5D')}]->(b:User {name:'Jim',age:42})"); db.executeTransactionally("CREATE (:Another {foo:1, listInt: [1,2]}), (:Another {bar:'Sam'})"); apocConfig().setProperty(APOC_IMPORT_FILE_ENABLED, true); @@ -67,27 +68,24 @@ public void before() { @Test public void testStreamRoundtripImportArrowAll() { - final byte[] bytes = db.executeTransactionally("CALL apoc.export.arrow.stream.all", - Map.of(), - this::extractByteArray); + final byte[] bytes = + db.executeTransactionally("CALL apoc.export.arrow.stream.all", Map.of(), this::extractByteArray); testImportCommon(bytes, MAPPING_ALL); } - + @Test public void testFileRoundtripImportArrowAll() { - String file = db.executeTransactionally("CALL apoc.export.arrow.all('test_all.arrow') YIELD file", - Map.of(), - this::extractFileName); - + String file = db.executeTransactionally( + "CALL apoc.export.arrow.all('test_all.arrow') YIELD file", Map.of(), this::extractFileName); + testImportCommon(file, MAPPING_ALL); } - + @Test public void testFileRoundtripImportArrowAllWithSmallBatchSize() { - String file = db.executeTransactionally("CALL apoc.export.arrow.all('test_all.arrow') YIELD file", - Map.of(), - this::extractFileName); + String file = db.executeTransactionally( + "CALL apoc.export.arrow.all('test_all.arrow') YIELD file", Map.of(), this::extractFileName); Map config = new HashMap<>(MAPPING_ALL); config.put("batchSize", 1); @@ -102,11 +100,10 @@ private void testImportCommon(Object file, Map config) { db.executeTransactionally("MATCH (n) DETACH DELETE n"); final String query = "CALL apoc.import.arrow($file, $config)"; - testCall(db, query, params, - r -> { - assertEquals(4L, r.get("nodes")); - assertEquals(1L, r.get("relationships")); - }); + testCall(db, query, params, r -> { + assertEquals(4L, r.get("nodes")); + assertEquals(1L, r.get("relationships")); + }); testCall(db, "MATCH (start:User)-[rel:KNOWS]->(end:User) RETURN start, rel, end", r -> { Node start = (Node) r.get("start"); @@ -127,11 +124,10 @@ private void testImportCommon(Object file, Map config) { }); } - private String 
extractFileName(Result result) {
         return result.<String>columnAs("file").next();
     }
-
+
     private byte[] extractByteArray(Result result) {
         return result.<byte[]>columnAs("value").next();
     }
@@ -139,16 +135,19 @@ private byte[] extractByteArray(Result result) {
     private static void assertFirstUserNodeProps(Map<String, Object> props) {
         assertEquals("Adam", props.get("name"));
         assertEquals(42L, props.get("age"));
-        assertEquals( true, props.get("male"));
-        assertArrayEquals(new String[] { "Sam", "Anna", "Grace", "Qwe" }, (String[]) props.get("kids"));
+        assertEquals(true, props.get("male"));
+        assertArrayEquals(new String[] {"Sam", "Anna", "Grace", "Qwe"}, (String[]) props.get("kids"));
         Map<String, Object> latitude = Map.of("latitude", 13.1D, "longitude", 33.46789D, "height", 100.0D);
-        assertEquals(PointValue.fromMap(VirtualValues.map(latitude.keySet().toArray(new String[0]), latitude.values().stream().map(ValueUtils::of).toArray(AnyValue[]::new))),
+        assertEquals(
+                PointValue.fromMap(VirtualValues.map(
+                        latitude.keySet().toArray(new String[0]),
+                        latitude.values().stream().map(ValueUtils::of).toArray(AnyValue[]::new))),
                 props.get("place"));
         assertEquals(LocalDateTimeValue.parse("2015-05-18T19:32:24.000").asObject(), props.get("born"));
     }
 
     private static void assertSecondUserNodeProps(Map<String, Object> props) {
-        assertEquals( "Jim", props.get("name"));
+        assertEquals("Jim", props.get("name"));
         assertEquals(42L, props.get("age"));
     }

From 91d3accdbbb465ee768e45d55754eba3efa61404 Mon Sep 17 00:00:00 2001
From: vga91
Date: Tue, 10 Sep 2024 15:20:34 +0200
Subject: [PATCH 3/3] [NOID] try removing gradle deps

---
 full/build.gradle | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/full/build.gradle b/full/build.gradle
index f4d9b7de70..fd7e7e5a76 100644
--- a/full/build.gradle
+++ b/full/build.gradle
@@ -117,10 +117,10 @@ dependencies {
         exclude group: 'io.netty'
     }
 
-    compileOnly group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
-    compileOnly group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'
-    testImplementation group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
-    testImplementation group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'
+//    compileOnly group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
+//    compileOnly group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'
+//    testImplementation group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
+//    testImplementation group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'
 
     compileOnly group: 'com.couchbase.client', name: 'java-client', version: '3.3.0', withoutJacksons
     testImplementation group: 'com.couchbase.client', name: 'java-client', version: '3.3.0', withoutJacksons
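
As a quick end-to-end check, the roundtrip covered by `ImportArrowTest` can also be reproduced from Cypher. This is only a sketch: the file name and the `mapping` entries below simply mirror the test fixture above.

[source,cypher]
----
CALL apoc.export.arrow.all('test_all.arrow');
CALL apoc.import.arrow('test_all.arrow',
  {mapping: {bffSince: 'Duration', place: 'Point', listInt: 'LongArray', born: 'LocalDateTime'}});
----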