Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions core/src/main/java/org/neo4j/importer/v1/graph/Graphs.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -118,6 +119,34 @@ public static <T> List<List<T>> detectCycles(Map<T, Set<T>> graph) {
return cycles;
}

/**
 * Partitions the given directed graph into its weakly connected components, i.e. the groups of nodes
 * that are connected to each other once the direction of the edges is ignored.
 * <p>
 * Every node of the graph ends up in exactly one component, whether it appears as a key or only as a
 * neighbor. A key without any edge (empty neighbor set, never referenced by another node) forms a
 * component of its own.
 *
 * @param graph adjacency map, each key is mapped to the set of nodes it has an edge to
 * @param <T> type of the nodes
 * @return the components, in the order their first node is encountered while iterating over the graph
 */
public static <T> List<Set<T>> findWeaklyConnectedComponents(Map<T, Set<T>> graph) {
    // undirected view of the graph: every edge is recorded in both directions
    var adjacencyList = new LinkedHashMap<T, Set<T>>();
    for (Map.Entry<T, Set<T>> entry : graph.entrySet()) {
        T node = entry.getKey();
        // register the node even when it has no edge, otherwise isolated nodes are silently dropped
        Set<T> nodeNeighbors = adjacencyList.computeIfAbsent(node, key -> new HashSet<>());
        for (T neighbor : entry.getValue()) {
            nodeNeighbors.add(neighbor);
            adjacencyList.computeIfAbsent(neighbor, key -> new HashSet<>()).add(node);
        }
    }

    List<Set<T>> result = new ArrayList<>();
    Set<T> visitedNodes = new HashSet<>();
    for (T start : adjacencyList.keySet()) {
        if (visitedNodes.contains(start)) {
            // already collected as part of a previous component
            continue;
        }
        // iterative depth-first traversal from the first unvisited node of the component
        var component = new HashSet<T>();
        var stack = new ArrayDeque<T>();
        stack.push(start);
        while (!stack.isEmpty()) {
            T current = stack.pop();
            if (visitedNodes.add(current)) {
                component.add(current);
                adjacencyList.get(current).forEach(stack::push);
            }
        }
        result.add(component);
    }
    return result;
}

private static <T> Stream<T> getAllValues(Map<T, Set<T>> graph) {
return graph.entrySet().stream()
.flatMap(entry -> mergeKeyValues(entry).stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
*/
package org.neo4j.importer.v1.pipeline;

import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.neo4j.importer.v1.actions.Action;
import org.neo4j.importer.v1.actions.ActionStage;

public class ActionStep implements ImportStep {

private final Action action;
private final List<ImportStep> dependencies;
private final Set<ImportStep> dependencies;

ActionStep(Action action, List<ImportStep> dependencies) {
ActionStep(Action action, Set<ImportStep> dependencies) {
this.action = action;
this.dependencies = dependencies;
}
Expand All @@ -45,7 +45,7 @@ public Action action() {
}

@Override
public List<ImportStep> dependencies() {
public Set<ImportStep> dependencies() {
return dependencies;
}

Expand All @@ -60,4 +60,9 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(action, dependencies);
}

/**
 * Returns a textual representation of this step, made of its action and its dependencies.
 */
@Override
public String toString() {
    StringBuilder description = new StringBuilder("ActionStep{");
    description.append("action=").append(action);
    description.append(", dependencies=").append(dependencies);
    description.append('}');
    return description.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
*/
package org.neo4j.importer.v1.pipeline;

import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.neo4j.importer.v1.targets.CustomQueryTarget;

public class CustomQueryTargetStep extends TargetStep {

private final CustomQueryTarget target;

CustomQueryTargetStep(CustomQueryTarget target, List<ImportStep> dependencies) {
CustomQueryTargetStep(CustomQueryTarget target, Set<ImportStep> dependencies) {
super(dependencies);
this.target = target;
}
Expand All @@ -46,6 +46,11 @@ public int hashCode() {
return Objects.hash(super.hashCode(), target);
}

/**
 * Returns a textual representation of this step: its target, followed by the representation
 * produced by the parent class.
 */
@Override
public String toString() {
    StringBuilder description = new StringBuilder("CustomQueryTargetStep{");
    description.append("target=").append(target);
    description.append("} ").append(super.toString());
    return description.toString();
}

@Override
protected CustomQueryTarget target() {
return target;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.neo4j.importer.v1.targets.EntityTarget;
import org.neo4j.importer.v1.targets.EntityTargetExtension;
Expand All @@ -26,7 +27,7 @@

public abstract class EntityTargetStep extends TargetStep {

protected EntityTargetStep(List<ImportStep> dependencies) {
protected EntityTargetStep(Set<ImportStep> dependencies) {
super(dependencies);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [https://neo4j.com]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.importer.v1.pipeline;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.neo4j.importer.v1.graph.Graphs;

/**
 * Represents the entire parallelizable execution plan for an import step graph.
 * <p>
 * Steps are partitioned into a list of independent {@link ImportStepGroup}s. Each group can be processed
 * entirely in parallel with the others. Within a group, steps are organized into sequential
 * {@link ImportStepStage}s.
 */
public class ImportExecutionPlan {

    private final List<ImportStepGroup> groups;

    ImportExecutionPlan(List<ImportStepGroup> groups) {
        this.groups = groups;
    }

    /**
     * Builds the execution plan of the given dependency graph.
     * <p>
     * Each weakly connected component of the graph becomes one {@link ImportStepGroup}. The stages of a
     * group are computed by repeatedly peeling off the steps whose dependencies have all been scheduled
     * in earlier stages.
     *
     * @param dependencyGraph maps each step (the dependent) to the steps it depends on (its dependencies)
     * @return the execution plan, with one group per weakly connected component
     */
    static ImportExecutionPlan fromGraph(Map<ImportStep, Set<ImportStep>> dependencyGraph) {
        var components = Graphs.findWeaklyConnectedComponents(dependencyGraph);
        // the reversed graph does not depend on the component being processed: build it only once
        var reverseDependencyGraph = reverseDependencyGraph(dependencyGraph);
        var groups = new ArrayList<ImportStepGroup>(components.size());
        for (Set<ImportStep> component : components) {
            var stages = computeStages(component, dependencyGraph, reverseDependencyGraph);
            groups.add(new ImportStepGroup(stages));
        }
        return new ImportExecutionPlan(groups);
    }

    public List<ImportStepGroup> getGroups() {
        return groups;
    }

    @Override
    public String toString() {
        return "ImportExecutionPlan{" + "groups=" + groups + '}';
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ImportExecutionPlan)) return false;
        ImportExecutionPlan that = (ImportExecutionPlan) o;
        return Objects.equals(groups, that.groups);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(groups);
    }

    /**
     * Computes the sequential stages of a single component: a step lands in the first stage that
     * comes after the stages of all its dependencies.
     */
    private static List<ImportStepStage> computeStages(
            Set<ImportStep> component,
            Map<ImportStep, Set<ImportStep>> dependencyGraph,
            Map<ImportStep, Set<ImportStep>> reverseDependencyGraph) {
        // "out" as in (a:Step)-[:DEPENDS_ON]->(b:Step)
        // a is a "dependent", b is a dependency, b needs to run first
        var outDegrees = new HashMap<ImportStep, Long>();
        component.forEach(dependent -> {
            var dependencyCount = dependencyGraph
                    .getOrDefault(dependent, Collections.emptySet())
                    .size();
            outDegrees.merge(dependent, (long) dependencyCount, Long::sum);
        });
        var stages = new ArrayList<ImportStepStage>();
        while (true) {
            var currentStageSteps = outDegrees.entrySet().stream()
                    // steps without pending dependencies are our current stage's starting points
                    .filter(entry -> entry.getValue() == 0)
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toCollection(LinkedHashSet::new));
            if (currentStageSteps.isEmpty()) {
                // NOTE(review): steps that are part of a dependency cycle never reach 0 and are left out
                // of the plan at this point - presumably cycles are rejected before this runs, confirm
                break;
            }
            stages.add(new ImportStepStage(currentStageSteps));
            currentStageSteps.forEach(step -> {
                outDegrees.remove(step);
                // the step is now scheduled: each of its dependents has one less dependency to wait for
                var dependents = reverseDependencyGraph.getOrDefault(step, Collections.emptySet());
                dependents.forEach(dependent -> outDegrees.compute(dependent, (key, value) -> value - 1));
            });
        }
        return stages;
    }

    /**
     * Inverts the given graph: the result maps each dependency to the set of steps that depend on it.
     * Steps nobody depends on have no entry in the result.
     */
    private static Map<ImportStep, Set<ImportStep>> reverseDependencyGraph(
            Map<ImportStep, Set<ImportStep>> dependencyGraph) {
        var reverseDependencyGraph = new HashMap<ImportStep, Set<ImportStep>>(dependencyGraph.size());
        dependencyGraph.forEach((dependent, dependencies) -> dependencies.forEach(dependency -> reverseDependencyGraph
                .computeIfAbsent(dependency, k -> new LinkedHashSet<>())
                .add(dependent)));
        return reverseDependencyGraph;
    }

    /**
     * Represents an independent group of related tasks (a connected component).
     * The tasks are organized into sequential stages, ImportStepStage, where all tasks in a
     * single stage can be executed in parallel.
     */
    public static class ImportStepGroup {

        private final List<ImportStepStage> stages;

        public ImportStepGroup(List<ImportStepStage> stages) {
            this.stages = stages;
        }

        public List<ImportStepStage> getStages() {
            return stages;
        }

        @Override
        public String toString() {
            // the label matches the field name (it previously read "steps=")
            return "ImportStepGroup{" + "stages=" + stages + '}';
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ImportStepGroup)) return false;
            ImportStepGroup that = (ImportStepGroup) o;
            return Objects.equals(stages, that.stages);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(stages);
        }
    }

    /**
     * Represents a single stage of execution containing a set of tasks
     * that can all run in parallel.
     */
    public static class ImportStepStage {

        private final Set<ImportStep> steps;

        public ImportStepStage(Set<ImportStep> steps) {
            this.steps = steps;
        }

        public Set<ImportStep> getSteps() {
            return steps;
        }

        @Override
        public String toString() {
            return "ImportStepStage{" + "steps=" + steps + '}';
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ImportStepStage)) return false;
            ImportStepStage that = (ImportStepStage) o;
            return Objects.equals(steps, that.steps);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(steps);
        }
    }
}
Loading