Skip to content

Commit

Permalink
Fix parallel processing
Browse files Browse the repository at this point in the history
  • Loading branch information
shartte committed Dec 29, 2023
1 parent 2f562fe commit bf9c2e3
Show file tree
Hide file tree
Showing 28 changed files with 517 additions and 343 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package net.neoforged.jst.cli.io;
package net.neoforged.jst.api;

import java.io.IOException;
import java.io.InputStream;
import java.util.Locale;

public interface SourceEntry {
public interface FileEntry {
/**
* @return True for directories.
*/
Expand Down
15 changes: 15 additions & 0 deletions api/src/main/java/net/neoforged/jst/api/FileSink.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package net.neoforged.jst.api;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public interface FileSink extends AutoCloseable {
@Override
default void close() throws IOException {
}

boolean isOrdered();

void put(FileEntry entry, byte[] content) throws IOException;
}
19 changes: 19 additions & 0 deletions api/src/main/java/net/neoforged/jst/api/FileSource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package net.neoforged.jst.api;

import com.intellij.openapi.vfs.VirtualFile;
import com.intellij.openapi.vfs.VirtualFileManager;

import java.io.IOException;
import java.util.stream.Stream;

public interface FileSource extends AutoCloseable {
VirtualFile createSourceRoot(VirtualFileManager vfsManager);

Stream<FileEntry> streamEntries() throws IOException;

boolean isOrdered();

@Override
default void close() throws IOException {
}
}
13 changes: 13 additions & 0 deletions api/src/main/java/net/neoforged/jst/api/IntelliJEnvironment.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package net.neoforged.jst.api;

import com.intellij.core.CoreApplicationEnvironment;
import com.intellij.core.JavaCoreProjectEnvironment;
import com.intellij.psi.PsiManager;

public interface IntelliJEnvironment {
CoreApplicationEnvironment getAppEnv();

JavaCoreProjectEnvironment getProjectEnv();

PsiManager getPsiManager();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import com.intellij.psi.PsiFile;

public interface SourceTransformer {
default void beforeRun() {
default void beforeRun(TransformContext context) {
}

default void afterRun() {
default void afterRun(TransformContext context) {
}

void visitFile(PsiFile psiFile, Replacements replacements);
}

4 changes: 4 additions & 0 deletions api/src/main/java/net/neoforged/jst/api/TransformContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package net.neoforged.jst.api;

public record TransformContext(IntelliJEnvironment environment, FileSource source, FileSink sink) {
}
8 changes: 8 additions & 0 deletions cli/src/main/java/net/neoforged/jst/cli/IoSuppplier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package net.neoforged.jst.cli;

import java.io.IOException;

@FunctionalInterface
public interface IoSuppplier {
byte[] getContent() throws IOException;
}
8 changes: 4 additions & 4 deletions cli/src/main/java/net/neoforged/jst/cli/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import net.neoforged.jst.api.SourceTransformer;
import net.neoforged.jst.api.SourceTransformerPlugin;
import net.neoforged.jst.cli.io.Sink;
import net.neoforged.jst.cli.io.Source;
import net.neoforged.jst.cli.io.FileSinks;
import net.neoforged.jst.cli.io.FileSources;
import org.jetbrains.annotations.VisibleForTesting;
import picocli.CommandLine;

Expand Down Expand Up @@ -53,7 +53,7 @@ public static int innerMain(String... args) {
@Override
public Integer call() throws Exception {

try (var source = new Source(inputPath, inputFormat);
try (var source = FileSources.create(inputPath, inputFormat);
var processor = new SourceFileProcessor()) {

if (librariesList != null) {
Expand All @@ -62,7 +62,7 @@ public Integer call() throws Exception {

var orderedTransformers = new ArrayList<>(enabledTransformers);

try (var sink = new Sink(source, outputPath, outputFormat)) {
try (var sink = FileSinks.create(outputPath, outputFormat, source)) {
processor.process(source, sink, orderedTransformers);
}

Expand Down
108 changes: 108 additions & 0 deletions cli/src/main/java/net/neoforged/jst/cli/OrderedParallelWorkQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package net.neoforged.jst.cli;

import net.neoforged.jst.api.FileEntry;
import net.neoforged.jst.api.FileSink;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;

class OrderedParallelWorkQueue implements AutoCloseable {
private final Deque<Future<List<WorkResult>>> pending;
private final FileSink sink;
private final int maxQueueDepth;

public OrderedParallelWorkQueue(FileSink sink, int maxQueueDepth) {
this.sink = sink;
this.maxQueueDepth = maxQueueDepth;
if (maxQueueDepth < 0) {
throw new IllegalArgumentException("Max queue depth must not be negative");
}
this.pending = new ArrayDeque<>(maxQueueDepth);
}

public void submit(Consumer<FileSink> producer) throws IOException {
if (pending.isEmpty()) {
// Can write directly if nothing else is pending
producer.accept(sink);
} else {
// Needs to be queued behind currently queued async work
submitAsync(producer);
}
}

public void submitAsync(Consumer<FileSink> producer) {
try {
if (maxQueueDepth <= 0) {
// Forced into synchronous mode
submit(producer);
return;
}
drainTo(maxQueueDepth - 1);
pending.add(CompletableFuture.supplyAsync(() -> {
try (var parallelSink = new ParallelSink()) {
producer.accept(parallelSink);
return parallelSink.workResults;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}));
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private static final class ParallelSink implements FileSink {
private final List<WorkResult> workResults = new ArrayList<>();

@Override
public boolean isOrdered() {
return false;
}

@Override
public void put(FileEntry entry, byte[] content) {
workResults.add(new WorkResult(entry, content));
}
}

private void drainTo(int drainTo) throws InterruptedException, IOException {
while (pending.size() > drainTo) {
List<WorkResult> workResults;
try {
workResults = pending.removeFirst().get();
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException ioe) {
throw ioe;
}
throw new RuntimeException(e.getCause());
}
for (var workResult : workResults) {
sink.put(workResult.entry, workResult.content);
}
}
}

@Override
public void close() throws IOException {
try {
drainTo(0);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
sink.close();
}
}

private record WorkResult(FileEntry entry, byte[] content) {
}
}
80 changes: 0 additions & 80 deletions cli/src/main/java/net/neoforged/jst/cli/OrderedWorkQueue.java

This file was deleted.

Loading

0 comments on commit bf9c2e3

Please sign in to comment.