From c7c388c7e765289d4be6e1169c3da1522fc7d5dc Mon Sep 17 00:00:00 2001 From: David Date: Sat, 24 May 2025 08:43:36 -0500 Subject: [PATCH] Collect aggregate results into ConcurrentLinkedQueue --- .../aggregation/CollectedResultsResolver.java | 54 +++++++------------ 1 file changed, 20 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/namehillsoftware/handoff/promises/aggregation/CollectedResultsResolver.java b/src/main/java/com/namehillsoftware/handoff/promises/aggregation/CollectedResultsResolver.java index 81291a3..ab5cc80 100644 --- a/src/main/java/com/namehillsoftware/handoff/promises/aggregation/CollectedResultsResolver.java +++ b/src/main/java/com/namehillsoftware/handoff/promises/aggregation/CollectedResultsResolver.java @@ -4,23 +4,23 @@ import com.namehillsoftware.handoff.promises.Promise; import com.namehillsoftware.handoff.promises.response.ImmediateResponse; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.LinkedList; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; public class CollectedResultsResolver implements ImmediateResponse { - private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private final LinkedList collectedResults = new LinkedList<>(); - private final int expectedResultSize; + private final ConcurrentLinkedQueue> collectedResults = new ConcurrentLinkedQueue<>(); + private final int expectedSize; + private final AtomicInteger remainingResults; private final Messenger> collectionMessenger; public CollectedResultsResolver(Messenger> collectionMessenger, Collection> promises) { this.collectionMessenger = collectionMessenger; - expectedResultSize = promises.size(); + expectedSize = promises.size(); + remainingResults = new AtomicInteger(expectedSize); for (Promise promise : promises) promise.then(this); @@ -30,40 +30,26 @@ public CollectedResultsResolver(Messenger> collectionMesseng @Override public Void respond(TResult result) { - final Lock lock = readWriteLock.writeLock(); - lock.lock(); - try { - collectedResults.add(result); - } finally { - lock.unlock(); - } + collectedResults.add(new ResultBox<>(result)); - attemptResolve(); + if (remainingResults.decrementAndGet() <= 0 && collectionMessenger != null) + collectionMessenger.sendResolution(getResults()); return null; } - private void attemptResolve() { - if (collectionMessenger == null) return; - - final Lock lock = readWriteLock.readLock(); - lock.lock(); - try { - if (collectedResults.size() < expectedResultSize) return; - - collectionMessenger.sendResolution(collectedResults); - } finally { - lock.unlock(); + public final Collection getResults() { + final ArrayList results = new ArrayList<>(expectedSize); + for (ResultBox box : collectedResults) { + results.add(box.result); } + return results; } - public final Collection getResults() { - final Lock lock = readWriteLock.readLock(); - lock.lock(); - try { - return collectedResults; - } finally { - lock.unlock(); + private static class ResultBox { + final TResult result; + ResultBox(TResult result) { + this.result = result; } } }