Skip to content

Commit 00ee72f

Browse files
authored
Merge pull request #983 from ably/feature/no-connection-serial
Feature/no connection serial
2 parents accf93b + 7e7485e commit 00ee72f

27 files changed

+986
-854
lines changed

lib/src/main/java/io/ably/lib/http/HttpCore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ <T> T httpExecute(HttpURLConnection conn, String method, Param[] headers, Reques
209209
if(!acceptSet) { conn.setRequestProperty(HttpConstants.Headers.ACCEPT, HttpConstants.ContentTypes.JSON); }
210210

211211
/* pass required headers */
212-
conn.setRequestProperty(Defaults.ABLY_VERSION_HEADER, Defaults.ABLY_VERSION);
212+
conn.setRequestProperty(Defaults.ABLY_PROTOCOL_VERSION_HEADER, Defaults.ABLY_PROTOCOL_VERSION); // RSC7a
213213
conn.setRequestProperty(Defaults.ABLY_AGENT_HEADER, AgentHeaderCreator.create(options.agents, platformAgentProvider));
214214

215215
/* prepare request body */

lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import java.util.ArrayList;
44
import java.util.HashMap;
5-
import java.util.Iterator;
65
import java.util.List;
76
import java.util.Map;
87

@@ -15,8 +14,10 @@
1514
import io.ably.lib.types.ErrorInfo;
1615
import io.ably.lib.types.ProtocolMessage;
1716
import io.ably.lib.types.ReadOnlyMap;
17+
import io.ably.lib.types.RecoveryKeyContext;
1818
import io.ably.lib.util.InternalMap;
1919
import io.ably.lib.util.Log;
20+
import io.ably.lib.util.StringUtils;
2021

2122
/**
2223
* A client that extends the functionality of the {@link AblyRest} and provides additional realtime-specific features.
@@ -71,6 +72,14 @@ public void onConnectionStateChanged(ConnectionStateListener.ConnectionStateChan
7172
}
7273
});
7374

75+
if (!StringUtils.isNullOrEmpty(options.recover)) {
76+
RecoveryKeyContext recoveryKeyContext = RecoveryKeyContext.decode(options.recover);
77+
if (recoveryKeyContext != null) {
78+
setChannelSerialsFromRecoverOption(recoveryKeyContext.getChannelSerials()); // RTN16j
79+
connection.connectionManager.msgSerial = recoveryKeyContext.getMsgSerial(); //RTN16f
80+
}
81+
}
82+
7483
if(options.autoConnect) connection.connect();
7584
}
7685

@@ -233,9 +242,8 @@ public void onMessage(ProtocolMessage msg) {
233242

234243
@Override
235244
public void suspendAll(ErrorInfo error, boolean notifyStateChange) {
236-
for(Iterator<Map.Entry<String, Channel>> it = map.entrySet().iterator(); it.hasNext(); ) {
237-
Map.Entry<String, Channel> entry = it.next();
238-
entry.getValue().setSuspended(error, notifyStateChange);
245+
for (Channel channel : map.values()) {
246+
channel.setSuspended(error, notifyStateChange);
239247
}
240248
}
241249

@@ -245,7 +253,7 @@ public void suspendAll(ErrorInfo error, boolean notifyStateChange) {
245253
* @param queuedMessages Queued messages transferred from ConnectionManager
246254
*/
247255
@Override
248-
public void transferToChannels(List<ConnectionManager.QueuedMessage> queuedMessages) {
256+
public void transferToChannelQueue(List<ConnectionManager.QueuedMessage> queuedMessages) {
249257
final Map<String, List<ConnectionManager.QueuedMessage>> channelQueueMap = new HashMap<>();
250258
for (ConnectionManager.QueuedMessage queuedMessage : queuedMessages) {
251259
final String channelName = queuedMessage.msg.channel;
@@ -255,16 +263,10 @@ public void transferToChannels(List<ConnectionManager.QueuedMessage> queuedMessa
255263
channelQueueMap.get(channelName).add(queuedMessage);
256264
}
257265

258-
for (Map.Entry<String, Channel> channelEntry : map.entrySet()) {
259-
Channel channel = channelEntry.getValue();
266+
for (Channel channel : map.values()) {
260267
if (channel.state.isReattachable()) {
261268
Log.d(TAG, "reAttach(); channel = " + channel.name);
262-
263-
if (channelQueueMap.containsKey(channel.name)){
264-
channel.transferQueuedPresenceMessages(channelQueueMap.get(channel.name));
265-
}else {
266-
channel.transferQueuedPresenceMessages(null);
267-
}
269+
channel.transferQueuedPresenceMessages(channelQueueMap.getOrDefault(channel.name, null));
268270
}
269271
}
270272
}
@@ -274,6 +276,27 @@ private void clear() {
274276
}
275277
}
276278

279+
protected void setChannelSerialsFromRecoverOption(Map<String, String> serials) {
280+
for (Map.Entry<String, String> entry : serials.entrySet()) {
281+
String channelName = entry.getKey();
282+
String channelSerial = entry.getValue();
283+
Channel channel = this.channels.get(channelName);
284+
if (channel != null) {
285+
channel.properties.channelSerial = channelSerial;
286+
}
287+
}
288+
}
289+
290+
protected Map<String, String> getChannelSerials() {
291+
Map<String, String> channelSerials = new HashMap<>();
292+
for (Channel channel : this.channels.values()) {
293+
if (channel.state == ChannelState.attached) {
294+
channelSerials.put(channel.name, channel.properties.channelSerial);
295+
}
296+
}
297+
return channelSerials;
298+
}
299+
277300
/********************
278301
* internal
279302
********************/

0 commit comments

Comments
 (0)