Skip to content

Commit

Permalink
using onFinished with LocalJobExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
musketyr committed Nov 28, 2024
1 parent 93f7f7f commit f6a47fd
Showing 1 changed file with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,18 @@ public <R> Publisher<R> executeConcurrently(JobRunContext context, int concurren
return Mono.fromCallable(() -> {
int increasedCount = counts.computeIfAbsent(context.getStatus().getName(), s -> new AtomicInteger(0)).incrementAndGet();
LOGGER.trace("Increased count for job {} limited to {}: {}", context.getStatus().getName(), concurrency, increasedCount);

if (increasedCount > concurrency) {
counts.get(context.getStatus().getName()).decrementAndGet();
return null;
}

R result = supplier.call();
int decreasedCount = counts.get(context.getStatus().getName()).decrementAndGet();
LOGGER.trace("Decreased count for job {} limited to {}: {}", context.getStatus().getName(), concurrency, decreasedCount);
return result;
context.onFinished(s -> {
int decreasedCount = counts.get(context.getStatus().getName()).decrementAndGet();
LOGGER.trace("Decreased count for job {} limited to {}: {}", context.getStatus().getName(), concurrency, decreasedCount);
});

return supplier.call();
}).subscribeOn(Schedulers.fromExecutor(executorService)).flux();
}

Expand Down

0 comments on commit f6a47fd

Please sign in to comment.