|
13 | 13 | import java.util.HashMap;
|
14 | 14 | import java.util.List;
|
15 | 15 | import java.util.Locale;
|
| 16 | +import java.util.Map; |
| 17 | +import java.util.concurrent.atomic.AtomicInteger; |
| 18 | +import java.util.stream.Collectors; |
16 | 19 |
|
17 | 20 | import com.google.gson.Gson;
|
18 | 21 | import com.google.gson.JsonArray;
|
19 | 22 | import com.google.gson.JsonElement;
|
20 | 23 | import com.google.gson.JsonObject;
|
21 | 24 | import com.google.gson.JsonPrimitive;
|
| 25 | +import io.ably.lib.types.ChannelOptions; |
22 | 26 | import io.ably.lib.types.MessageAction;
|
23 | 27 | import io.ably.lib.types.MessageExtras;
|
24 | 28 | import io.ably.lib.types.Param;
|
@@ -1010,4 +1014,42 @@ public void should_have_serial_action_createdAt() throws AblyException {
|
1010 | 1014 | assertNull(msgComplete.waitFor(1, 10_000));
|
1011 | 1015 | }
|
1012 | 1016 | }
|
| 1017 | + |
| 1018 | + @Test |
| 1019 | + public void should_not_duplicate_messages() throws Exception { |
| 1020 | + ClientOptions opts = createOptions(testVars.keys[0].keyStr); |
| 1021 | + opts.logLevel = 2; |
| 1022 | + String testChannelName = "my-channel" + System.currentTimeMillis(); |
| 1023 | + try (AblyRest rest = new AblyRest(opts)) { |
| 1024 | + final io.ably.lib.rest.Channel channel = rest.channels.get(testChannelName); |
| 1025 | + List<String> testMessagesData = List.of("message 1", "message 2", "message 3"); |
| 1026 | + List<Message> messages = testMessagesData |
| 1027 | + .stream() |
| 1028 | + .map(data -> new Message("name", data)) |
| 1029 | + .collect(Collectors.toList()); |
| 1030 | + channel.publish(messages.toArray(Message[]::new)); |
| 1031 | + } |
| 1032 | + |
| 1033 | + try (AblyRealtime realtime = new AblyRealtime(opts)) { |
| 1034 | + final ChannelOptions options = new ChannelOptions(); |
| 1035 | + options.params = Map.of("rewind", "10"); |
| 1036 | + final Channel channel = realtime.channels.get(testChannelName, options); |
| 1037 | + final CompletionWaiter completionWaiter = new CompletionWaiter(); |
| 1038 | + final AtomicInteger counter = new AtomicInteger(); |
| 1039 | + |
| 1040 | + channel.subscribe(message -> { |
| 1041 | + int value = counter.incrementAndGet(); |
| 1042 | + if (value == 3) completionWaiter.onSuccess(); |
| 1043 | + }); |
| 1044 | + |
| 1045 | + completionWaiter.waitFor(); |
| 1046 | + |
| 1047 | + assertEquals("Should be exactly 3 messages",3, counter.get()); |
| 1048 | + |
| 1049 | + Thread.sleep(1500); |
| 1050 | + |
| 1051 | + assertEquals("Should be exactly 3 messages even after 1.5 sec wait",3, counter.get()); |
| 1052 | + } |
| 1053 | + } |
| 1054 | + |
1013 | 1055 | }
|
0 commit comments