Skip to content

Commit ca704e0

Browse files
committed
Progress Monitor now runs async
1 parent 2b94874 commit ca704e0

File tree

1 file changed

+76
-61
lines changed

1 file changed

+76
-61
lines changed

tdb2-maven-plugin/src/main/java/org/aksw/maven/plugin/jena/Tdb2MojoShared.java

+76-61
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,12 @@
2222
import java.util.Map;
2323
import java.util.Map.Entry;
2424
import java.util.Set;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.ScheduledExecutorService;
27+
import java.util.concurrent.ScheduledFuture;
28+
import java.util.concurrent.TimeUnit;
2529
import java.util.function.Consumer;
30+
import java.util.function.LongSupplier;
2631

2732
import org.aksw.commons.io.util.FileUtils;
2833
import org.aksw.commons.io.util.FileUtils.OverwritePolicy;
@@ -64,6 +69,8 @@
6469
import org.eclipse.aether.util.artifact.JavaScopes;
6570
import org.eclipse.aether.util.filter.DependencyFilterUtils;
6671

72+
import com.google.common.io.CountingInputStream;
73+
6774
@Mojo(name = "load", defaultPhase = LifecyclePhase.PACKAGE)
6875
public class Tdb2MojoShared extends AbstractMojo {
6976

@@ -374,13 +381,17 @@ private static class Tracker {
374381
int maxDataPoints = 10;
375382
Deque<Entry<Long, Long>> timeAndTotalProgress = new ArrayDeque<>(maxDataPoints);
376383

384+
377385
long totalSize = -1;
378386
int fileCount = -1;
379-
long totalProgress = 0;
387+
long currentFileStartProgress = 0; // Progress before the current file
388+
LongSupplier currentFileProgress;
389+
// totalProgress = currentFileStartProgress + currentFileProgress.getAsLong()
390+
380391
String currentFileName = null;
381392
int currentFileIdx = 0;
382393
long currentFileSize = -1;
383-
long currentFileProgress = -1;
394+
// long currentFileProgress = -1;
384395
}
385396

386397
public static void packageTdb2(Consumer<String> fileCallback, Path fileToWrite, Map<String, Path> fileSet, Path basePath) throws IOException {
@@ -396,71 +407,75 @@ public static void packageTdb2(Consumer<String> fileCallback, Path fileToWrite,
396407
tracker.fileCount = fileSet.size();
397408

398409
// long startTime = System.currentTimeMillis();
410+
Runnable monitorProgress = () -> {
411+
long elapsedTime = System.currentTimeMillis();
412+
long currentFileProgress = tracker.currentFileProgress.getAsLong();
413+
long totalProgress = tracker.currentFileStartProgress + currentFileProgress;
414+
415+
float fileRatio = tracker.currentFileSize == 0
416+
? 1.0f
417+
: currentFileProgress / (float)tracker.currentFileSize;
418+
419+
float totalRatio = tracker.totalSize == 0
420+
? 1.0f
421+
: totalProgress / (float)tracker.totalSize;
422+
423+
Deque<Entry<Long, Long>> points = tracker.timeAndTotalProgress;
424+
if (points.size() >= tracker.maxDataPoints) {
425+
points.removeFirst();
426+
}
427+
Entry<Long, Long> newEntry = Map.entry(elapsedTime, totalProgress);
428+
points.addLast(newEntry);
429+
Entry<Long, Long> oldEntry = points.getFirst();
430+
431+
float relDuration = (newEntry.getKey() - oldEntry.getKey()) * 0.001f; // ms to seconds
432+
long relAmount = newEntry.getValue() - oldEntry.getValue();
433+
float throughput = relDuration < 0.001f ? 0f : relAmount / relDuration;
434+
435+
long remaining = tracker.totalSize - totalProgress;
436+
long etaInSeconds = throughput < 0.001f ? Long.MAX_VALUE : (long)(remaining / throughput);
437+
if (etaInSeconds == 0 && remaining > 0) {
438+
etaInSeconds = 1;
439+
}
440+
String etaStr = etaInSeconds == Long.MAX_VALUE
441+
? "infinite"
442+
: toString(Duration.ofSeconds(etaInSeconds));
399443

400-
TimeOutDeferredAction scheduler = TimeOutDeferredAction.of(
401-
Duration.ofSeconds(10).toMillis(),
402-
() -> {
403-
long elapsedTime = System.currentTimeMillis();
404-
long totalProgress = tracker.totalProgress;
444+
String msg = String.format("Adding file %d/%d %s %.2f%% - Total %.2f%% - ETA %s",
445+
tracker.currentFileIdx, tracker.fileCount, tracker.currentFileName,
446+
fileRatio * 100.0f, totalRatio * 100.0f, etaStr);
405447

406-
float fileRatio = tracker.currentFileSize == 0
407-
? 1.0f
408-
: tracker.currentFileProgress / (float)tracker.currentFileSize;
448+
fileCallback.accept(msg);
449+
};
409450

410-
float totalRatio = tracker.totalSize == 0
411-
? 1.0f
412-
: totalProgress / (float)tracker.totalSize;
451+
ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
452+
try {
453+
for (Entry<String, Path> e : fileSet.entrySet()) {
454+
String relPathStr = e.getKey();
455+
Path file = e.getValue();
413456

414-
Deque<Entry<Long, Long>> points = tracker.timeAndTotalProgress;
415-
if (points.size() >= tracker.maxDataPoints) {
416-
points.removeFirst();
417-
}
418-
Entry<Long, Long> newEntry = Map.entry(elapsedTime, totalProgress);
419-
points.addLast(newEntry);
420-
Entry<Long, Long> oldEntry = points.getFirst();
421-
422-
float relDuration = (newEntry.getKey() - oldEntry.getKey()) * 0.001f; // ms to seconds
423-
long relAmount = newEntry.getValue() - oldEntry.getValue();
424-
float throughput = relDuration < 0.001f ? 0f : relAmount / relDuration;
425-
426-
long remaining = tracker.totalSize - totalProgress;
427-
long etaInSeconds = throughput < 0.001f ? Long.MAX_VALUE : (long)(remaining / throughput);
428-
if (etaInSeconds == 0 && remaining > 0) {
429-
etaInSeconds = 1;
457+
Path displayPath = basePath == null ? file : basePath.relativize(file);
458+
459+
++tracker.currentFileIdx;
460+
tracker.currentFileName = displayPath.toString();
461+
tracker.currentFileSize = Files.size(file);
462+
463+
TarArchiveEntry tarEntry = new TarArchiveEntry(file, relPathStr);
464+
tOut.putArchiveEntry(tarEntry);
465+
466+
try (CountingInputStream cin = new CountingInputStream(Files.newInputStream(file))) {
467+
tracker.currentFileProgress = () -> cin.getCount();
468+
ScheduledFuture<?> future = ses.scheduleAtFixedRate(monitorProgress, 1, 10, TimeUnit.SECONDS);
469+
cin.transferTo(tOut);
470+
future.cancel(false);
430471
}
431-
String etaStr = etaInSeconds == Long.MAX_VALUE
432-
? "infinite"
433-
: toString(Duration.ofSeconds(etaInSeconds));
434-
435-
String msg = String.format("Adding file %d/%d %s %.2f%% - Total %.2f%% - ETA %s",
436-
tracker.currentFileIdx, tracker.fileCount, tracker.currentFileName,
437-
fileRatio * 100.0f, totalRatio * 100.0f, etaStr);
438-
439-
fileCallback.accept(msg);
440-
});
441-
442-
for (Entry<String, Path> e : fileSet.entrySet()) {
443-
String relPathStr = e.getKey();
444-
Path file = e.getValue();
445-
446-
Path displayPath = basePath == null ? file : basePath.relativize(file);
447-
448-
++tracker.currentFileIdx;
449-
tracker.currentFileProgress = 0;
450-
tracker.currentFileName = displayPath.toString();
451-
tracker.currentFileSize = Files.size(file);
452-
453-
TarArchiveEntry tarEntry = new TarArchiveEntry(file, relPathStr);
454-
tOut.putArchiveEntry(tarEntry);
455-
FileUtils.copy(file, tOut, contribBytes -> {
456-
tracker.currentFileProgress += contribBytes;
457-
tracker.totalProgress += contribBytes;
458-
scheduler.tick();
459-
});
460-
scheduler.forceTick();
461-
tOut.closeArchiveEntry();
472+
monitorProgress.run();
473+
tracker.currentFileStartProgress += tracker.currentFileProgress.getAsLong();
474+
tOut.closeArchiveEntry();
475+
}
476+
} finally {
477+
ses.shutdown();
462478
}
463-
464479
tOut.finish();
465480
}
466481
}

0 commit comments

Comments
 (0)