diff --git a/src/org/jgroups/protocols/UNICAST3.java b/src/org/jgroups/protocols/UNICAST3.java index 94fff39e5f..ce828ddc13 100644 --- a/src/org/jgroups/protocols/UNICAST3.java +++ b/src/org/jgroups/protocols/UNICAST3.java @@ -155,6 +155,8 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler
{ protected final LongAdder loopbed_back_msgs=new LongAdder(); + // Queues messages until a {@link ReceiverEntry} has been created. Queued messages are then removed from + // the cache and added to the ReceiverEntry protected final MessageCache msg_cache=new MessageCache(); protected static final Message DUMMY_OOB_MSG=new EmptyMessage().setFlag(Message.Flag.OOB); @@ -528,7 +530,7 @@ public void up(MessageBatch batch) { if(hdr.first) entry=getReceiverEntry(sender, hdr.seqno(), hdr.first, hdr.connId()); else if(entry == null) { - msg_cache.cache(sender, msg); + msg_cache.add(sender, msg); log.trace("%s: cached %s#%d", local_addr, sender, hdr.seqno()); } } @@ -538,7 +540,7 @@ else if(entry == null) { sendRequestForFirstSeqno(sender); else { if(!msg_cache.isEmpty()) { // quick and dirty check - List queued_msgs=msg_cache.drain(sender); + Collection queued_msgs=msg_cache.drain(sender); if(queued_msgs != null) addQueuedMessages(sender, entry, queued_msgs); } @@ -652,7 +654,7 @@ public Object down(Message msg) { } if(msg.getSrc() == null) - msg.setSrc(local_addr); // this needs to be done so we can check whether the message sender is the local_addr + msg.setSrc(local_addr); // this needs to be done, so we can check whether the message sender is the local_addr if(loopback && Objects.equals(local_addr, dst)) {// https://issues.redhat.com/browse/JGRP-2547 if(msg.isFlagSet(DONT_LOOPBACK)) @@ -792,12 +794,12 @@ public void expired(Address key) { protected void handleDataReceived(final Address sender, long seqno, short conn_id, boolean first, final Message msg) { ReceiverEntry entry=getReceiverEntry(sender, seqno, first, conn_id); if(entry == null) { - msg_cache.cache(sender, msg); + msg_cache.add(sender, msg); log.trace("%s: cached %s#%d", local_addr, sender, seqno); return; } if(!msg_cache.isEmpty()) { // quick and dirty check - List queued_msgs=msg_cache.drain(sender); + Collection queued_msgs=msg_cache.drain(sender); if(queued_msgs != null) addQueuedMessages(sender, entry, queued_msgs); } @@ -822,7 +824,7 @@ protected void addMessage(ReceiverEntry entry, Address sender, long seqno, Messa deliverMessage(msg, sender, seqno); } - protected void addQueuedMessages(final Address sender, final ReceiverEntry entry, List queued_msgs) { + protected void addQueuedMessages(final Address sender, final ReceiverEntry entry, Collection queued_msgs) { for(Message msg: queued_msgs) { UnicastHeader3 hdr=msg.getHeader(this.id); if(hdr.conn_id != entry.conn_id) { @@ -1039,7 +1041,7 @@ protected void handleResendingOfFirstMessage(Address sender, int timestamp) { Message rsp=win.get(win.getLow() +1); if(rsp != null) { // We need to copy the UnicastHeader and put it back into the message because Message.copy() doesn't copy - // the headers and therefore we'd modify the original message in the sender retransmission window + // the headers, and therefore we'd modify the original message in the sender retransmission window // (https://issues.redhat.com/browse/JGRP-965) Message copy=rsp.copy(true, true); UnicastHeader3 hdr=copy.getHeader(this.id); @@ -1482,47 +1484,4 @@ public String toString() { } } - /** - * Used to queue messages until a {@link ReceiverEntry} has been created. Queued messages are then removed from - * the cache and added to the ReceiverEntry - */ - protected class MessageCache { - private final Map> map=new ConcurrentHashMap<>(); - private volatile boolean is_empty=true; - - protected MessageCache cache(Address sender, Message msg) { - List list=map.computeIfAbsent(sender, addr -> new ArrayList<>()); - list.add(msg); - is_empty=false; - return this; - } - - protected List drain(Address sender) { - List list=map.remove(sender); - if(map.isEmpty()) - is_empty=true; - return list; - } - - protected MessageCache clear() { - map.clear(); - is_empty=true; - return this; - } - - /** Returns a count of all messages */ - protected int size() { - return map.values().stream().mapToInt(Collection::size).sum(); - } - - protected boolean isEmpty() { - return is_empty; - } - - public String toString() { - return String.format("%d message(s)", size()); - } - } - - } diff --git a/src/org/jgroups/util/MessageCache.java b/src/org/jgroups/util/MessageCache.java new file mode 100644 index 0000000000..bfed587964 --- /dev/null +++ b/src/org/jgroups/util/MessageCache.java @@ -0,0 +1,55 @@ +package org.jgroups.util; + +import org.jgroups.Address; +import org.jgroups.Message; + +import java.util.Collection; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * A cache associating members and messages + * @author Bela Ban + * @since 5.3.2 + */ +public class MessageCache { + protected final Map> map=new ConcurrentHashMap<>(); + protected volatile boolean is_empty=true; + + public MessageCache add(Address sender, Message msg) { + Queue list=map.computeIfAbsent(sender, addr -> new ConcurrentLinkedQueue<>()); + list.add(msg); + is_empty=false; + return this; + } + + public Collection drain(Address sender) { + if(sender == null) + return null; + Queue queue=map.remove(sender); + if(map.isEmpty()) + is_empty=true; + return queue; + } + + public MessageCache clear() { + map.clear(); + is_empty=true; + return this; + } + + /** Returns a count of all messages */ + public int size() { + return map.values().stream().mapToInt(Collection::size).sum(); + } + + public boolean isEmpty() { + return is_empty; + } + + public String toString() { + return String.format("%d message(s)", size()); + } +} diff --git a/tests/junit-functional/org/jgroups/tests/MessageCacheTest.java b/tests/junit-functional/org/jgroups/tests/MessageCacheTest.java new file mode 100644 index 0000000000..e5357c5a2a --- /dev/null +++ b/tests/junit-functional/org/jgroups/tests/MessageCacheTest.java @@ -0,0 +1,56 @@ +package org.jgroups.tests; + +import org.jgroups.Address; +import org.jgroups.Global; +import org.jgroups.Message; +import org.jgroups.ObjectMessage; +import org.jgroups.util.MessageCache; +import org.jgroups.util.Util; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.Collection; + +/** + * Tests {@link org.jgroups.util.MessageCache} + * @author Bela Ban + * @since 5.3.2 + */ +@Test(groups= Global.FUNCTIONAL,singleThreaded=true) +public class MessageCacheTest { + protected MessageCache cache; + protected static final Address A=Util.createRandomAddress("A"), B=Util.createRandomAddress("B"), + C=Util.createRandomAddress("C"); + + @BeforeMethod protected void setup() { + cache=new MessageCache(); + } + + public void testCreation() { + assert cache.isEmpty(); + } + + public void testAdd() { + for(int i=1; i <= 5; i++) { + cache.add(A, new ObjectMessage(A, i)); + cache.add(B, new ObjectMessage(B, i+10)); + } + assert !cache.isEmpty(); + assert cache.size() == 10; + } + + public void testDrain() { + testAdd(); + Collection list=cache.drain(null); + assert list == null; + list=cache.drain(C); + assert list == null; + list=cache.drain(B); + assert list.size() == 5; + assert cache.size() == 5; + assert !cache.isEmpty(); + list=cache.drain(A); + assert list.size() == 5; + assert cache.isEmpty(); + } +}