Skip to content

Commit

Permalink
[NOID] Backport runMany updates
Browse files Browse the repository at this point in the history
  • Loading branch information
gem-neo4j committed Apr 8, 2024
1 parent a98bc35 commit 24a05ce
Show file tree
Hide file tree
Showing 4 changed files with 1,198 additions and 111 deletions.
4 changes: 2 additions & 2 deletions core/src/main/java/apoc/cypher/Cypher.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ private Object consumeResult(Result result, BlockingQueue<RowResult> queue, bool
}
}

private String removeShellControlCommands(String stmt) {
public static String removeShellControlCommands(String stmt) {
Matcher matcher = shellControl.matcher(stmt.trim());
if (matcher.find()) {
// an empty file gets transformed into ":begin\n:commit" and that statement is not matched by the pattern
Expand All @@ -257,7 +257,7 @@ private boolean isPeriodicOperation(String stmt) {
return stmt.matches("(?is).*using\\s+periodic.*");
}

private Map<String, Object> toMap(QueryStatistics stats, long time, long rows) {
protected static Map<String, Object> toMap(QueryStatistics stats, long time, long rows) {
final Map<String, Object> map = map(
"rows", rows,
"time", time);
Expand Down
237 changes: 128 additions & 109 deletions full/src/main/java/apoc/cypher/CypherExtended.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Spliterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -59,6 +61,7 @@
import org.neo4j.graphdb.QueryStatistics;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.security.AuthorizationViolationException;
import org.neo4j.internal.helpers.collection.Iterators;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Context;
Expand All @@ -76,7 +79,6 @@
@Extended
public class CypherExtended {

public static final String COMPILED_PREFIX = "CYPHER runtime=" + Util.COMPILED;
public static final int PARTITIONS = 100 * Runtime.getRuntime().availableProcessors();
public static final int MAX_BATCH = 10000;

Expand Down Expand Up @@ -134,42 +136,84 @@ private Stream<RowResult> runNonSchemaFiles(
@SuppressWarnings("unchecked")
final Map<String, Object> parameters =
(Map<String, Object>) config.getOrDefault("parameters", Collections.emptyMap());
final boolean schemaOperation = false;
return runFiles(fileNames, config, parameters, schemaOperation, defaultStatistics);
return runFiles(fileNames, config, parameters, defaultStatistics);
}

// This runs the files sequentially
private Stream<RowResult> runFiles(
List<String> fileNames,
Map<String, Object> config,
Map<String, Object> parameters,
boolean schemaOperation,
boolean defaultStatistics) {
List<String> fileNames, Map<String, Object> config, Map<String, Object> params, boolean defaultStatistics) {
boolean reportError = Util.toBoolean(config.get("reportError"));
boolean addStatistics = Util.toBoolean(config.getOrDefault("statistics", defaultStatistics));
int timeout = Util.toInteger(config.getOrDefault("timeout", 10));
int queueCapacity = Util.toInteger(config.getOrDefault("queueCapacity", 100));
var result = fileNames.stream().flatMap(fileName -> {
return fileNames.stream().flatMap(fileName -> {
final Reader reader = readerForFile(fileName);
final Scanner scanner = createScannerFor(reader);
return runManyStatements(
scanner,
parameters,
schemaOperation,
addStatistics,
timeout,
queueCapacity,
reportError,
fileName)
.onClose(() -> Util.close(
scanner,
(e) -> log.info(
"Cannot close the scanner for file " + fileName
+ " because the following exception",
e)));
AtomicBoolean hasFailed = new AtomicBoolean(false);
return Iterators.stream(new Scanner(reader).useDelimiter(";\r?\n"))
.map(Cypher::removeShellControlCommands)
.filter(s -> !s.isBlank())
.flatMap(s -> streamInNewTx(s, params, addStatistics, fileName, reportError, hasFailed));
});
}

/**
 * Runs a single statement of a script file and exposes its rows as a lazy stream.
 *
 * <p>Statements matched by {@code isPeriodicOperation} are delegated to
 * {@code streamInNewExplicitTx}. Every other statement is executed in a transaction
 * newly opened on {@code db}; that transaction is committed when the returned stream
 * is closed.
 *
 * <p>Only failures raised while starting the statement are handled here; errors that
 * surface later, while the returned stream is being consumed, are not caught by this method.
 *
 * @param cypher the statement to execute
 * @param params the query parameters passed to the statement
 * @param stats whether a trailing statistics row is appended after the data rows
 * @param fileName the file name attached to every emitted row
 * @param reportError whether a failure is emitted as a row holding an {@code "error"} entry
 * @param hasFailed flag shared by the statements of one file; once set, statements are skipped
 * @return the rows of the statement, or an empty stream when the statement is skipped,
 *     is rejected with an {@link AuthorizationViolationException}, or fails without
 *     {@code reportError}
 */
private Stream<RowResult> streamInNewTx(
        String cypher,
        Map<String, Object> params,
        boolean stats,
        String fileName,
        boolean reportError,
        AtomicBoolean hasFailed) {
    if (hasFailed.get()) {
        // A previous statement of this file failed: skip the remaining ones.
        return Stream.empty();
    }
    if (isPeriodicOperation(cypher)) {
        return streamInNewExplicitTx(cypher, params, stats, fileName, reportError);
    }
    // Each statement deliberately runs in its own, separate transaction (historical APOC behaviour).
    final var innerTx = db.beginTx();
    try {
        final var results = new RunManyResultSpliterator(innerTx.execute(cypher, params), stats, fileName, tx);
        return StreamSupport.stream(results, false).onClose(results::close).onClose(innerTx::commit);
    } catch (AuthorizationViolationException accessModeException) {
        // Earlier implementations used a regex based Cypher parser to avoid executing
        // schema changing statements altogether. Here the statement is attempted and the
        // authorization failure is dropped after the fact, without failing the file.
        innerTx.close();
        return Stream.empty();
    } catch (Throwable t) {
        innerTx.close();
        hasFailed.set(true);
        if (!reportError) {
            return Stream.empty();
        }
        // Map.of rejects null values, so fall back to toString() for exceptions without a message.
        final String error = t.getMessage() != null ? t.getMessage() : t.toString();
        return Stream.of(new RowResult(-1, Map.of("error", error), fileName));
    }
}

return result;
/**
 * Runs a statement through {@code db.executeTransactionally} instead of a transaction
 * opened by the caller, and exposes its rows as a stream.
 *
 * <p>Called from {@code streamInNewTx} for statements matched by {@code isPeriodicOperation}.
 *
 * @param cypher the statement to execute
 * @param params the query parameters passed to the statement
 * @param stats whether a trailing statistics row is appended after the data rows
 * @param fileName the file name attached to every emitted row
 * @param reportError whether a failure is emitted as a row holding an {@code "error"} entry
 * @return the rows of the statement, or an empty stream when the statement is rejected with an
 *     {@link AuthorizationViolationException} or fails without {@code reportError}
 */
private Stream<RowResult> streamInNewExplicitTx(
        String cypher, Map<String, Object> params, boolean stats, String fileName, boolean reportError) {
    try {
        // NOTE(review): the Result escapes the executeTransactionally call via the identity
        // transformer - verify that it is still readable once that call has returned.
        final var results = new RunManyResultSpliterator(
                db.executeTransactionally(cypher, params, result -> result), stats, fileName, tx);
        return StreamSupport.stream(results, false).onClose(results::close);
    } catch (AuthorizationViolationException accessModeException) {
        // Statements rejected for authorization reasons are dropped silently.
        return Stream.empty();
    } catch (Throwable t) {
        if (!reportError) {
            return Stream.empty();
        }
        // Map.of rejects null values, so fall back to toString() for exceptions without a message.
        final String error = t.getMessage() != null ? t.getMessage() : t.toString();
        return Stream.of(new RowResult(-1, Map.of("error", error), fileName));
    }
}

@Procedure(mode = Mode.SCHEMA)
Expand All @@ -186,34 +230,8 @@ public Stream<RowResult> runSchemaFile(
public Stream<RowResult> runSchemaFiles(
@Name("file") List<String> fileNames,
@Name(value = "config", defaultValue = "{}") Map<String, Object> config) {
final boolean schemaOperation = true;
final Map<String, Object> parameters = Collections.emptyMap();
return runFiles(fileNames, config, parameters, schemaOperation, true);
}

private Stream<RowResult> runManyStatements(
Scanner scanner,
Map<String, Object> params,
boolean schemaOperation,
boolean addStatistics,
int timeout,
int queueCapacity,
boolean reportError,
String fileName) {
BlockingQueue<RowResult> queue = runInSeparateThreadAndSendTombstone(
queueCapacity,
internalQueue -> {
if (schemaOperation) {
runSchemaStatementsInTx(
scanner, internalQueue, params, addStatistics, timeout, reportError, fileName);
} else {
runDataStatementsInTx(
scanner, internalQueue, params, addStatistics, timeout, reportError, fileName);
}
},
RowResult.TOMBSTONE);
return StreamSupport.stream(
new QueueBasedSpliterator<>(queue, RowResult.TOMBSTONE, terminationGuard, Integer.MAX_VALUE), false);
return runFiles(fileNames, config, parameters, true);
}

private <T> BlockingQueue<T> runInSeparateThreadAndSendTombstone(
Expand Down Expand Up @@ -246,7 +264,6 @@ private void runDataStatementsInTx(
BlockingQueue<RowResult> queue,
Map<String, Object> params,
boolean addStatistics,
long timeout,
boolean reportError,
String fileName) {
while (scanner.hasNext()) {
Expand Down Expand Up @@ -294,43 +311,6 @@ private void collectError(BlockingQueue<RowResult> queue, boolean reportError, E
QueueUtil.put(queue, result, 10);
}

/**
 * Builds a {@link Scanner} over the given reader that splits its input into statements,
 * using a semicolon followed by a line break ({@code \n} or {@code \r\n}) as delimiter.
 */
private Scanner createScannerFor(Reader reader) {
    return new Scanner(reader).useDelimiter(";\r?\n");
}

/**
 * Reads statements from the scanner one by one and executes those recognised by
 * {@code isSchemaOperation}, each through {@code Util.inTx}; result rows and errors
 * are pushed onto the queue.
 *
 * <p>Blank statements and statements that are not schema operations are skipped.
 * Processing stops at the first statement for which {@code isSchemaOperation} throws.
 *
 * @param scanner source of the statements
 * @param queue queue receiving result rows and, via {@code collectError}, errors
 * @param params query parameters passed to every statement
 * @param addStatistics forwarded to {@code consumeResult}
 * @param timeout not used by this method
 * @param reportError forwarded to {@code collectError}
 * @param fileName the file name forwarded to {@code consumeResult} and {@code collectError}
 */
private void runSchemaStatementsInTx(
        Scanner scanner,
        BlockingQueue<RowResult> queue,
        Map<String, Object> params,
        boolean addStatistics,
        long timeout,
        boolean reportError,
        String fileName) {
    while (scanner.hasNext()) {
        final String statement = removeShellControlCommands(scanner.next());
        if (statement.trim().isEmpty()) {
            continue;
        }

        boolean isSchemaStatement;
        try {
            isSchemaStatement = isSchemaOperation(statement);
        } catch (Exception e) {
            // Classification failed: report the error and abandon the rest of the file.
            collectError(queue, reportError, e, fileName);
            return;
        }
        if (!isSchemaStatement) {
            continue;
        }

        Util.inTx(db, pools, txInThread -> {
            try (Result statementResult = txInThread.execute(statement, params)) {
                return consumeResult(statementResult, queue, addStatistics, tx, fileName);
            } catch (Exception e) {
                collectError(queue, reportError, e, fileName);
                return null;
            }
        });
    }
}

private static final Pattern shellControl =
Pattern.compile("^:?\\b(begin|commit|rollback)\\b", Pattern.CASE_INSENSITIVE);

Expand Down Expand Up @@ -419,10 +399,6 @@ public static String withParamMapping(String fragment, Collection<String> keys)
return declaration + fragment;
}

/**
 * Prefixes a query fragment with {@code COMPILED_PREFIX} unless the fragment already
 * starts with {@code "cypher"} (compared case-insensitively).
 *
 * @param fragment the query fragment; must not be {@code null}
 * @return the fragment unchanged when it already starts with "cypher", otherwise the
 *     fragment with {@code COMPILED_PREFIX} prepended
 */
public static String compiled(String fragment) {
    // regionMatches yields false for fragments shorter than six characters,
    // where substring(0, 6) would throw StringIndexOutOfBoundsException.
    final boolean startsWithCypher = fragment.regionMatches(true, 0, "cypher", 0, 6);
    return startsWithCypher ? fragment : COMPILED_PREFIX + fragment;
}

@Procedure
@Description(
"apoc.cypher.parallel(fragment, `paramMap`, `keyList`) yield value - executes fragments in parallel through a list defined in `paramMap` with a key `keyList`")
Expand All @@ -445,18 +421,6 @@ public Stream<MapResult> parallel(
parallelParams.replace(key, v);
return tx.execute(statement, parallelParams).stream().map(MapResult::new);
});

/*
params.entrySet().stream()
.filter( e -> asCollection(e.getValue()).size() > 100)
.map( (e) -> (Map.Entry<String,Collection>)(Map.Entry)e )
.max( (max,e) -> e.getValue().size() )
.map( (e) -> e.getValue().parallelStream().map( (v) -> {
Map map = new HashMap<>(params);
map.put(e.getKey(),as)
}));
return db.execute(statement,params).stream().map(MapResult::new);
*/
}

@Procedure
Expand Down Expand Up @@ -590,3 +554,58 @@ private Future<List<Map<String, Object>>> submit(
});
}
}

/**
 * Sequential, ordered {@link Spliterator} over the rows of one Cypher {@link Result}.
 *
 * <p>Each row is passed through {@code EntityUtil.anyRebind} with the supplied transaction
 * and emitted as a {@code RowResult} numbered from zero. When statistics are requested,
 * one extra row with index {@code -1} carrying the query statistics is emitted after the
 * last data row. The underlying result is closed once the spliterator is exhausted, or
 * earlier through {@link #close()}.
 */
class RunManyResultSpliterator implements Spliterator<CypherExtended.RowResult>, AutoCloseable {
    private final Result result;
    private final Transaction transaction;
    private final String fileName;
    private final long start;

    // True while the trailing statistics row still has to be emitted.
    private boolean statisticsPending;
    private int rowCount;

    RunManyResultSpliterator(Result result, boolean statistics, String fileName, Transaction transaction) {
        this.result = result;
        this.start = System.currentTimeMillis();
        this.statisticsPending = statistics;
        this.fileName = fileName;
        this.transaction = transaction;
    }

    @Override
    public boolean tryAdvance(Consumer<? super CypherExtended.RowResult> action) {
        if (!result.hasNext()) {
            if (!statisticsPending) {
                // Nothing left to emit: release the result and signal exhaustion.
                close();
                return false;
            }
            final var summary =
                    CypherExtended.toMap(result.getQueryStatistics(), System.currentTimeMillis() - start, rowCount);
            statisticsPending = false;
            action.accept(new CypherExtended.RowResult(-1, summary, fileName));
            return true;
        }
        final Map<String, Object> row = EntityUtil.anyRebind(transaction, result.next());
        action.accept(new CypherExtended.RowResult(rowCount++, row, fileName));
        return true;
    }

    /** Splitting is not supported; always returns {@code null}. */
    @Override
    public Spliterator<CypherExtended.RowResult> trySplit() {
        return null;
    }

    /** Reports an unknown size while rows remain, otherwise {@code 1}. */
    @Override
    public long estimateSize() {
        if (result.hasNext()) {
            return Long.MAX_VALUE;
        }
        return 1;
    }

    @Override
    public int characteristics() {
        return Spliterator.ORDERED;
    }

    /** Closes the underlying result. */
    @Override
    public void close() {
        result.close();
    }
}
Loading

0 comments on commit 24a05ce

Please sign in to comment.