diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index b84ba7dc0..9e0c9974c 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -243,6 +243,16 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li throw AblyException.fromErrorInfo(connectionManager.getStateErrorInfo()); } + // (RTL4i) + if (connectionManager.getConnectionState().state == ConnectionState.connecting + || connectionManager.getConnectionState().state == ConnectionState.disconnected) { + if (listener != null) { + on(new ChannelStateCompletionListener(listener, ChannelState.attached, ChannelState.failed)); + } + setState(ChannelState.attaching, null); + return; + } + /* send attach request and pending state */ Log.v(TAG, "attach(); channel = " + name + "; sending ATTACH request"); ProtocolMessage attachMessage = new ProtocolMessage(Action.attach, this.name); diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java index 2d00524f1..600f5fa7e 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java @@ -13,12 +13,16 @@ import java.util.HashMap; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; +import io.ably.lib.types.ChannelOptions; import io.ably.lib.types.MessageAction; import io.ably.lib.types.MessageExtras; import io.ably.lib.types.Param; @@ -1010,4 +1014,41 @@ public void should_have_serial_action_createdAt() throws AblyException { assertNull(msgComplete.waitFor(1, 10_000)); } } + + @Test + public void should_not_duplicate_messages() throws Exception { + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + String testChannelName = "my-channel" + System.currentTimeMillis(); + try (AblyRest rest = new AblyRest(opts)) { + final io.ably.lib.rest.Channel channel = rest.channels.get(testChannelName); + List testMessagesData = List.of("message 1", "message 2", "message 3"); + List messages = testMessagesData + .stream() + .map(data -> new Message("name", data)) + .collect(Collectors.toList()); + channel.publish(messages.toArray(Message[]::new)); + } + + try (AblyRealtime realtime = new AblyRealtime(opts)) { + final ChannelOptions options = new ChannelOptions(); + options.params = Map.of("rewind", "10"); + final Channel channel = realtime.channels.get(testChannelName, options); + final CompletionWaiter completionWaiter = new CompletionWaiter(); + final AtomicInteger counter = new AtomicInteger(); + + channel.subscribe(message -> { + int value = counter.incrementAndGet(); + if (value == 3) completionWaiter.onSuccess(); + }); + + completionWaiter.waitFor(); + + assertEquals("Should be exactly 3 messages", 3, counter.get()); + + Thread.sleep(1500); + + assertEquals("Should be exactly 3 messages even after 1.5 sec wait", 3, counter.get()); + } + } + }