Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4813] fix: race condition in pending message processing #1010

Merged
merged 1 commit into from
Jun 3, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 17 additions & 24 deletions lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -1268,18 +1268,16 @@ private synchronized List<QueuedMessage> extractConnectionQueuePresenceMessages(
*/
private void addPendingMessagesToQueuedMessages(boolean resetMessageSerial) {
synchronized (this) {
// Add messages from pending messages to front of queuedMessages in order to retry them
queuedMessages.addAll(0, pendingMessages.queue);
List<QueuedMessage> allPendingMessages = pendingMessages.popAll();

if (resetMessageSerial){ // failed resume, so all new published messages start with msgSerial = 0
msgSerial = 0; //msgSerial will increase in sendImpl when messages are sent, RTN15c7
pendingMessages.resetStartSerial(0);
} else if(!pendingMessages.queue.isEmpty()) { // pendingMessages needs to expect next msgSerial to be the earliest previously unacknowledged message
msgSerial = pendingMessages.queue.get(0).msg.msgSerial;
pendingMessages.resetStartSerial((int) (msgSerial));
} else if (!allPendingMessages.isEmpty()) { // pendingMessages needs to expect next msgSerial to be the earliest previously unacknowledged message
msgSerial = allPendingMessages.get(0).msg.msgSerial;
}

pendingMessages.queue.clear();
// Add messages from pending messages to front of queuedMessages in order to retry them
queuedMessages.addAll(0, allPendingMessages);
}
}

Expand Down Expand Up @@ -1671,9 +1669,8 @@ private void failQueuedMessages(ErrorInfo reason) {
/**
* A class containing a queue of messages awaiting acknowledgement
*/
private class PendingMessageQueue {
private long startSerial = 0L;
private ArrayList<QueuedMessage> queue = new ArrayList<QueuedMessage>();
private static class PendingMessageQueue {
private final List<QueuedMessage> queue = new ArrayList<>();

public synchronized void push(QueuedMessage msg) {
queue.add(msg);
Expand All @@ -1682,6 +1679,8 @@ public synchronized void push(QueuedMessage msg) {
public void ack(long msgSerial, int count, ErrorInfo reason) {
QueuedMessage[] ackMessages = null, nackMessages = null;
synchronized(this) {
if (queue.isEmpty()) return;
long startSerial = queue.get(0).msg.msgSerial;
if(msgSerial < startSerial) {
/* this is an error condition and shouldn't happen but
* we can handle it gracefully by only processing the
Expand All @@ -1704,7 +1703,6 @@ public void ack(long msgSerial, int count, ErrorInfo reason) {
List<QueuedMessage> ackList = queue.subList(0, count);
ackMessages = ackList.toArray(new QueuedMessage[count]);
ackList.clear();
startSerial += count;
}
}
if(nackMessages != null) {
Expand Down Expand Up @@ -1734,6 +1732,8 @@ public void ack(long msgSerial, int count, ErrorInfo reason) {
public synchronized void nack(long serial, int count, ErrorInfo reason) {
QueuedMessage[] nackMessages = null;
synchronized(this) {
if (queue.isEmpty()) return;
long startSerial = queue.get(0).msg.msgSerial;
if(serial != startSerial) {
/* this is an error condition and shouldn't happen but
* we can handle it gracefully by only processing the
Expand Down Expand Up @@ -1761,22 +1761,15 @@ public synchronized void nack(long serial, int count, ErrorInfo reason) {
}

/**
* reset the pending message queue, failing any currently pending messages.
* Used when a resume fails and we get a different connection id.
* @param oldMsgSerial the next message serial number for the old
* connection, and thus one more than the highest message serial
* in the queue.
* @return all pending queued messages and clear the queue
*/
public synchronized void reset(long oldMsgSerial, ErrorInfo err) {
nack(startSerial, (int)(oldMsgSerial - startSerial), err);
startSerial = 0;
}

public void resetStartSerial(int from) {
startSerial = from;
synchronized List<QueuedMessage> popAll() {
List<QueuedMessage> allPendingMessages = new ArrayList<>(queue);
queue.clear();
return allPendingMessages;
}

//fail all pending queued emssages
//fail all pending queued messages
synchronized void fail(ErrorInfo reason) {
for (QueuedMessage queuedMessage: queue){
if (queuedMessage.listener != null) {
Expand Down
Loading