Skip to content

Commit

Permalink
Merge pull request #1052 from ably/ECO-5163/fix-duplicate-messages
Browse files Browse the repository at this point in the history
[ECO-5163] fix: duplicated messages because of duplicated attach message
  • Loading branch information
ttypic authored Dec 10, 2024
2 parents aad1c87 + f653d2d commit 6a2ec04
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 3 deletions.
10 changes: 10 additions & 0 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,16 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li
throw AblyException.fromErrorInfo(connectionManager.getStateErrorInfo());
}

// (RTL4i)
if (connectionManager.getConnectionState().state == ConnectionState.connecting
|| connectionManager.getConnectionState().state == ConnectionState.disconnected) {
if (listener != null) {
on(new ChannelStateCompletionListener(listener, ChannelState.attached, ChannelState.failed));
}
setState(ChannelState.attaching, null);
return;
}

/* send attach request and pending state */
Log.v(TAG, "attach(); channel = " + name + "; sending ATTACH request");
ProtocolMessage attachMessage = new ProtocolMessage(Action.attach, this.name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1684,9 +1684,14 @@ private void sendImpl(QueuedMessage msg) throws AblyException {

private void sendQueuedMessages() {
synchronized(this) {
while(queuedMessages.size() > 0) {
while(!queuedMessages.isEmpty()) {
try {
sendImpl(queuedMessages.get(0));
QueuedMessage message = queuedMessages.get(0);
// Do not send attach message from queued messages to prevent duplication
// (we always send attach on connect event)
if (message.msg.action != ProtocolMessage.Action.attach) {
sendImpl(message);
}
} catch (AblyException e) {
Log.e(TAG, "sendQueuedMessages(): Unexpected error sending queued messages", e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public void connect_fail_notfound_error() throws AblyException {
public void connect_fail_authorized_error() throws AblyException {
AblyRealtime ably = null;
try {
ClientOptions opts = createOptions(testVars.appId + ".invalid_key_id:invalid_key_value");
String keyId = testVars.keys[0].keyName.split("\\.")[1];
ClientOptions opts = createOptions(testVars.appId + "." + keyId + ":invalid_key_value");
ably = new AblyRealtime(opts);
ConnectionWaiter connectionWaiter = new ConnectionWaiter(ably.connection);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import io.ably.lib.types.ChannelOptions;
import io.ably.lib.types.MessageAction;
import io.ably.lib.types.MessageExtras;
import io.ably.lib.types.Param;
Expand Down Expand Up @@ -995,6 +997,10 @@ public void should_have_serial_action_createdAt() throws AblyException {
msgComplete.onSuccess();
});

CompletionWaiter attachListener = new CompletionWaiter();
channel.attach(attachListener);
assertNull(attachListener.waitFor(1, 10_000));

/* publish to the channel */
JsonObject chatMessage = new JsonObject();
chatMessage.addProperty("text", "hello world!");
Expand All @@ -1010,4 +1016,44 @@ public void should_have_serial_action_createdAt() throws AblyException {
assertNull(msgComplete.waitFor(1, 10_000));
}
}

@Test
public void should_not_duplicate_messages() throws Exception {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
String testChannelName = "my-channel" + System.currentTimeMillis();
try (AblyRest rest = new AblyRest(opts)) {
final io.ably.lib.rest.Channel channel = rest.channels.get(testChannelName);

Message[] messages = new Message[] {
new Message("name", "message 1"),
new Message("name", "message 2"),
new Message("name", "message 3"),
};

channel.publish(messages);
}

try (AblyRealtime realtime = new AblyRealtime(opts)) {
final ChannelOptions options = new ChannelOptions();
options.params = new HashMap<>();
options.params.put("rewind", "10");
final Channel channel = realtime.channels.get(testChannelName, options);
final CompletionWaiter completionWaiter = new CompletionWaiter();
final AtomicInteger counter = new AtomicInteger();

channel.subscribe(message -> {
int value = counter.incrementAndGet();
if (value == 3) completionWaiter.onSuccess();
});

completionWaiter.waitFor();

assertEquals("Should be exactly 3 messages", 3, counter.get());

Thread.sleep(1500);

assertEquals("Should be exactly 3 messages even after 1.5 sec wait", 3, counter.get());
}
}

}

0 comments on commit 6a2ec04

Please sign in to comment.