Skip to content

Commit

Permalink
[performance] BatchImageBuilder: write .class files in batches
Browse files Browse the repository at this point in the history
ProcessTaskManager
* use java.util.concurrent for queue
* signal Cancel/Exception/Stop via the queue
* implements AutoCloseable for try-with-resource
* drain as much Elements as possible from the queue

Improves the performance of "Clean all projects"

For example building platform workspace on Windows
AbstractImageBuilder.compile(): 120 sec -> 91 sec

With this change the Compiler is actually waiting for parsing most time
and not for the write to FileSystem anymore.
  • Loading branch information
EcljpseB0T committed Sep 12, 2024
1 parent f32a077 commit c9812d0
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,6 @@ protected void restoreAptProblems() {

protected void processCompiledUnits(int startingIndex, boolean lastRound) throws java.lang.Error {
CompilationUnitDeclaration unit = null;
ProcessTaskManager processingTask = null;
try {
if (this.useSingleThread) {
// process all units (some more could be injected in the loop by the lookup environment)
Expand Down Expand Up @@ -596,30 +595,37 @@ protected void processCompiledUnits(int startingIndex, boolean lastRound) throws
}));
}
} else {
processingTask = new ProcessTaskManager(this, startingIndex);
int acceptedCount = 0;
// process all units (some more could be injected in the loop by the lookup environment)
// the processTask can continue to process units until its fixed sized cache is full then it must wait
// for this this thread to accept the units as they appear (it only waits if no units are available)
while (true) {
try {
unit = processingTask.removeNextUnit(); // waits if no units are in the processed queue
} catch (Error | RuntimeException e) {
unit = processingTask.unitToProcess;
throw e;
try (ProcessTaskManager processingTask = new ProcessTaskManager(this, startingIndex)){
int acceptedCount = 0;
// process all units (some more could be injected in the loop by the lookup environment)
// the processTask can continue to process units until its fixed sized cache is full then it must wait
// for this this thread to accept the units as they appear (it only waits if no units are available)
while (true) {
try {
Collection<CompilationUnitDeclaration> units;
try {
units = processingTask.removeNextUnits();
} catch (Error | RuntimeException e) {
unit = processingTask.getUnitToProcess();
throw e;
}
if (units == null) {
break;
}
for (CompilationUnitDeclaration u : units) {
unit = u;
reportWorked(1, acceptedCount++);
this.stats.lineCount += unit.compilationResult.lineSeparatorPositions.length;
this.requestor.acceptResult(unit.compilationResult.tagAsAccepted());
if (this.options.verbose)
this.out.println(Messages.bind(Messages.compilation_done,
new String[] { String.valueOf(acceptedCount), String.valueOf(this.totalUnits),
new String(unit.getFileName()) }));
}
} finally {
this.requestor.flush();
}
}
if (unit == null) break;
reportWorked(1, acceptedCount++);
this.stats.lineCount += unit.compilationResult.lineSeparatorPositions.length;
this.requestor.acceptResult(unit.compilationResult.tagAsAccepted());
if (this.options.verbose)
this.out.println(
Messages.bind(Messages.compilation_done,
new String[] {
String.valueOf(acceptedCount),
String.valueOf(this.totalUnits),
new String(unit.getFileName())
}));
}
}
if (!lastRound) {
Expand All @@ -640,10 +646,6 @@ protected void processCompiledUnits(int startingIndex, boolean lastRound) throws
this.handleInternalException(e, unit, null);
throw e; // rethrow
} finally {
if (processingTask != null) {
processingTask.shutdown();
processingTask = null;
}
reset();
this.annotationProcessorStartIndex = 0;
this.stats.endTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,11 @@ public interface ICompilerRequestor {
* Accept a compilation result.
*/
public void acceptResult(CompilationResult result);

/**
* Called to finalize possibly multiple {@link #acceptResult(CompilationResult)}
*/
public default void flush() {
//nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@

package org.eclipse.jdt.internal.compiler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -25,19 +30,19 @@
import org.eclipse.jdt.internal.compiler.problem.AbortCompilation;
import org.eclipse.jdt.internal.compiler.util.Messages;

public class ProcessTaskManager {
public class ProcessTaskManager implements AutoCloseable {

private final Compiler compiler;
private final int startingIndex;
private volatile Future<?> processingTask; // synchronized write, volatile read
CompilationUnitDeclaration unitToProcess;
private Throwable caughtException;
private final Future<?> processingTask; // synchronized write, volatile read
/** synchronized access **/
private CompilationUnitDeclaration lastUnitToProcess;

// queue
volatile int currentIndex, availableIndex, size, sleepCount;
private final CompilationUnitDeclaration[] units;
/** contains CompilationUnitDeclaration or something else as stop signal **/
private final BlockingQueue<Object> units;

public static final int PROCESSED_QUEUE_SIZE = 100;
private static final int PROCESSED_QUEUE_SIZE = 100;
private static final Object STOP_SIGNAL = new Object();

/** Normally a single thread is created an reused on subsequent builds **/
private static final ExecutorService executor = Executors.newCachedThreadPool(r -> {
Expand All @@ -46,159 +51,131 @@ public class ProcessTaskManager {
return t;
});

public ProcessTaskManager(Compiler compiler, int startingIndex) {
this.compiler = compiler;
this.startingIndex = startingIndex;
public ProcessTaskManager(Compiler compiler, int startingIndex) {
this.compiler = compiler;
this.startingIndex = startingIndex;

this.currentIndex = 0;
this.availableIndex = 0;
this.size = PROCESSED_QUEUE_SIZE;
this.sleepCount = 0; // 0 is no one, +1 is the processing thread & -1 is the writing/main thread
this.units = new CompilationUnitDeclaration[this.size];
this.units = new ArrayBlockingQueue<>(PROCESSED_QUEUE_SIZE);

synchronized (this) {
this.processingTask = executor.submit(this::compile);
this.processingTask = executor.submit(this::processing);
}
}

// add unit to the queue - wait if no space is available
private synchronized void addNextUnit(CompilationUnitDeclaration newElement) {
while (this.units[this.availableIndex] != null) {
//System.out.print('a');
//if (this.sleepCount < 0) throw new IllegalStateException(Integer.valueOf(this.sleepCount).toString());
this.sleepCount = 1;
private void addNextUnit(Object newElement) {
try {
wait(250);
} catch (InterruptedException ignore) {
// ignore
this.units.put(newElement);
} catch (InterruptedException interrupt) {
throw new RuntimeException(interrupt);
}
this.sleepCount = 0;
}

this.units[this.availableIndex++] = newElement;
if (this.availableIndex >= this.size)
this.availableIndex = 0;
if (this.sleepCount <= -1)
notify(); // wake up writing thread to accept next unit - could be the last one - must avoid deadlock
}
private Object lastSignal = null;

public CompilationUnitDeclaration removeNextUnit() throws Error {
CompilationUnitDeclaration next = null;
boolean yield = false;
synchronized (this) {
next = this.units[this.currentIndex];
if (next == null || this.caughtException != null) {
do {
if (this.processingTask == null) {
if (this.caughtException != null) {
// rethrow the caught exception from the processingThread in the main compiler thread
if (this.caughtException instanceof Error)
throw (Error) this.caughtException;
throw (RuntimeException) this.caughtException;
}
return null;
/** returns null when no more elements can be expected **/
public Collection<CompilationUnitDeclaration> removeNextUnits() throws Error {
List<CompilationUnitDeclaration> elements = new ArrayList<>();
do {
Object next = this.lastSignal;
this.lastSignal = null;
try {
// wait until at least 1 element is available:
if (next ==null) {
next = this.units.take();
}
//System.out.print('r');
//if (this.sleepCount > 0) throw new IllegalStateException(Integer.valueOf(this.sleepCount).toString());
this.sleepCount = -1;
try {
wait(100);
} catch (InterruptedException ignore) {
// ignore
while (next instanceof CompilationUnitDeclaration cu) {
elements.add(cu);
// optionally read more elements if already available:
next = this.units.poll();
}
this.sleepCount = 0;
next = this.units[this.currentIndex];
} while (next == null);
}

this.units[this.currentIndex++] = null;
if (this.currentIndex >= this.size)
this.currentIndex = 0;
if (this.sleepCount >= 1 && ++this.sleepCount > 4) {
notify(); // wake up processing thread to add next unit but only after removing some elements first
yield = this.sleepCount > 8;
}
if (!elements.isEmpty()) {
if (next !=null) {
// defer any stop signal until all CompilationUnitDeclaration read
this.lastSignal= next;
}
return elements;
}
} catch (InterruptedException interrupt) {
throw new RuntimeException(interrupt);
}
if (next instanceof Error error) {
throw error;
}
if (next instanceof RuntimeException runtimeException) {
throw runtimeException;
}
if (next == STOP_SIGNAL) {
return null;
}
throw new IllegalStateException(String.valueOf(next));
} while (true);
}
if (yield)
Thread.yield();
return next;
}

private void compile() {
int unitIndex = this.startingIndex;
synchronized (this) { // wait until processingTask is assigned
@SuppressWarnings("unused")
Future<?> p = this.processingTask;
}
boolean noAnnotations = this.compiler.annotationProcessorManager == null;
while (this.processingTask != null) {
this.unitToProcess = null;
int index = -1;
boolean cleanup = noAnnotations || this.compiler.shouldCleanup(unitIndex);
private void processing() {
try {
synchronized (this) {
if (this.processingTask == null) return;
int unitIndex = this.startingIndex;
boolean noAnnotations = this.compiler.annotationProcessorManager == null;
while (true) {
CompilationUnitDeclaration unitToProcess;
int index = -1;
boolean cleanup = noAnnotations || this.compiler.shouldCleanup(unitIndex);
try {
synchronized (this) {
unitToProcess = this.compiler.getUnitToProcess(unitIndex);
this.lastUnitToProcess = unitToProcess;
if (unitToProcess == null) {
break;
}
index = unitIndex++;
if (unitToProcess.compilationResult.hasBeenAccepted) {
continue;
}
}

this.unitToProcess = this.compiler.getUnitToProcess(unitIndex);
if (this.unitToProcess == null) {
this.processingTask = null;
return;
}
index = unitIndex++;
if (this.unitToProcess.compilationResult.hasBeenAccepted)
continue;
}
try {
this.compiler.reportProgress(Messages.bind(Messages.compilation_processing,
new String(unitToProcess.getFileName())));
if (this.compiler.options.verbose)
this.compiler.out.println(Messages.bind(Messages.compilation_process,
new String[] { String.valueOf(index + 1), String.valueOf(this.compiler.totalUnits),
new String(unitToProcess.getFileName()) }));
try {
this.compiler.process(unitToProcess, index);
} catch (AbortCompilation abortCompilation) {
unitToProcess.cleanUp();
throw abortCompilation;
} catch (Error | RuntimeException uncheckedThrowable) {
throw new RuntimeException(
"Internal Error compiling " + new String(unitToProcess.getFileName()), //$NON-NLS-1$
uncheckedThrowable);
}
} finally {
// cleanup compilation unit result, but only if not annotation processed.
if (cleanup) {
unitToProcess.cleanUp();
}
}

try {
this.compiler.reportProgress(Messages.bind(Messages.compilation_processing, new String(this.unitToProcess.getFileName())));
if (this.compiler.options.verbose)
this.compiler.out.println(
Messages.bind(Messages.compilation_process,
new String[] {
String.valueOf(index + 1),
String.valueOf(this.compiler.totalUnits),
new String(this.unitToProcess.getFileName())
}));
try {
this.compiler.process(this.unitToProcess, index);
} catch (AbortCompilation keptCancelation) {
throw keptCancelation;
} catch (Error | RuntimeException e) {
throw new RuntimeException("Internal Error compiling " + new String(this.unitToProcess.getFileName()), e); //$NON-NLS-1$
addNextUnit(unitToProcess);
} catch (Error | RuntimeException uncheckedThrowable) {
this.units.clear(); // make sure there is room for a premature stop signal
addNextUnit(uncheckedThrowable);
return;
}
} finally {
// cleanup compilation unit result, but only if not annotation processed.
if (this.unitToProcess != null && cleanup)
this.unitToProcess.cleanUp();
}

addNextUnit(this.unitToProcess);
} catch (Error | RuntimeException e) {
synchronized (this) {
this.processingTask = null;
this.caughtException = e;
}
return;
} finally {
addNextUnit(STOP_SIGNAL);
}
}
}
synchronized CompilationUnitDeclaration getUnitToProcess(){
return this.lastUnitToProcess;
}

public void shutdown() {
try {
Future<?> t = null;
synchronized (this) {
t = this.processingTask;
if (t != null) {
// stop processing on error:
this.processingTask = null;
notifyAll();
}
}
if (t != null) {
t.get(250, TimeUnit.MILLISECONDS); // do not wait forever
@Override
public void close() {
try {
this.processingTask.get(250, TimeUnit.MILLISECONDS); // do not wait forever
} catch (InterruptedException | ExecutionException | TimeoutException ignored) {
// ignore
}
} catch (InterruptedException | ExecutionException | TimeoutException ignored) {
// ignore
}
}
}
Loading

0 comments on commit c9812d0

Please sign in to comment.