Skip to content

Commit 0ea8f4c

Browse files
committed
Fixed possible race condition with progress monitor
1 parent ca704e0 commit 0ea8f4c

File tree

1 file changed

+28
-30
lines changed

1 file changed

+28
-30
lines changed

tdb2-maven-plugin/src/main/java/org/aksw/maven/plugin/jena/Tdb2MojoShared.java

+28-30
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,6 @@ public class Tdb2MojoShared extends AbstractMojo {
110110
@Parameter(defaultValue = "nt,ttl,nq,trig,owl,nt.gz,ttl.gz,nq.gz,trig.gz,owl.gz,nt.bz2,ttl.bz2,nq.bz2,trig.bz2,owl.bz2")
111111
private String includeTypes;
112112

113-
114-
// protected boolean metaDataGraph;
115-
116-
117113
@Parameter(defaultValue = "${project.build.directory}/tdb2")
118114
private File outputFolder;
119115

@@ -253,13 +249,8 @@ public void executeActual() throws Exception {
253249
Node destNode = update.getDest();
254250

255251
String destNodeLabel = getGraphLabel(destNode);
256-
257-
// logger.info("Preparing TDB2 workload: " + update.getSource() + " -> " + update.getDest());
258-
259252
boolean isAlreadyLoaded = loadState.getFileStates().containsKey(source);
260-
// boolean isAlreadyLoaded = destNode == null
261-
// ? false
262-
// : Txn.calculateRead(dg, () -> dg.containsGraph(destNode));
253+
263254
if (isAlreadyLoaded) {
264255
logger.info("Skipping TDB2 workload (already loaded): " + source + " -> " + destNodeLabel);
265256
} else {
@@ -448,33 +439,40 @@ public static void packageTdb2(Consumer<String> fileCallback, Path fileToWrite,
448439
fileCallback.accept(msg);
449440
};
450441

451-
ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
452-
try {
453-
for (Entry<String, Path> e : fileSet.entrySet()) {
454-
String relPathStr = e.getKey();
455-
Path file = e.getValue();
456-
457-
Path displayPath = basePath == null ? file : basePath.relativize(file);
442+
for (Entry<String, Path> e : fileSet.entrySet()) {
443+
String relPathStr = e.getKey();
444+
Path file = e.getValue();
445+
Path displayPath = basePath == null ? file : basePath.relativize(file);
458446

459-
++tracker.currentFileIdx;
460-
tracker.currentFileName = displayPath.toString();
461-
tracker.currentFileSize = Files.size(file);
447+
++tracker.currentFileIdx;
448+
tracker.currentFileName = displayPath.toString();
449+
tracker.currentFileSize = Files.size(file);
462450

463-
TarArchiveEntry tarEntry = new TarArchiveEntry(file, relPathStr);
464-
tOut.putArchiveEntry(tarEntry);
451+
TarArchiveEntry tarEntry = new TarArchiveEntry(file, relPathStr);
452+
tOut.putArchiveEntry(tarEntry);
465453

466-
try (CountingInputStream cin = new CountingInputStream(Files.newInputStream(file))) {
467-
tracker.currentFileProgress = () -> cin.getCount();
468-
ScheduledFuture<?> future = ses.scheduleAtFixedRate(monitorProgress, 1, 10, TimeUnit.SECONDS);
454+
ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
455+
try (CountingInputStream cin = new CountingInputStream(Files.newInputStream(file))) {
456+
tracker.currentFileProgress = () -> cin.getCount();
457+
ScheduledFuture<?> future = ses.scheduleAtFixedRate(monitorProgress, 1, 10, TimeUnit.SECONDS);
458+
try {
469459
cin.transferTo(tOut);
460+
} finally {
470461
future.cancel(false);
471462
}
472-
monitorProgress.run();
473-
tracker.currentFileStartProgress += tracker.currentFileProgress.getAsLong();
474-
tOut.closeArchiveEntry();
463+
} finally {
464+
ses.shutdown();
465+
try {
466+
if (!ses.awaitTermination(5, TimeUnit.SECONDS)) {
467+
throw new RuntimeException("Progress monitor: Failed to stop.");
468+
}
469+
} catch (InterruptedException e1) {
470+
throw new RuntimeException("Progress monitor: Unexpected interruption", e1);
471+
}
475472
}
476-
} finally {
477-
ses.shutdown();
473+
monitorProgress.run();
474+
tracker.currentFileStartProgress += tracker.currentFileProgress.getAsLong();
475+
tOut.closeArchiveEntry();
478476
}
479477
tOut.finish();
480478
}

0 commit comments

Comments
 (0)