From a70c8276c71c7550d128ac516f5d549c6f2be6ad Mon Sep 17 00:00:00 2001
From: evgeny <khokhlov.e.n@gmail.com>
Date: Mon, 14 Oct 2024 12:10:48 +0100
Subject: [PATCH] [ECO-5033] fix: race condition when
 calling`AblyRealtime#connect()` on terminated state

---
 .../ably/lib/transport/ConnectionManager.java | 30 +++++++++++++++++--
 1 file changed, 27 insertions(+), 3 deletions(-)

diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
index bb2033e42..4a10c7487 100644
--- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
+++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
@@ -71,6 +71,14 @@ public class ConnectionManager implements ConnectListener {
     static ErrorInfo REASON_REFUSED = new ErrorInfo("Access refused", 401, 40100);
     static ErrorInfo REASON_TOO_BIG = new ErrorInfo("Connection closed; message too large", 400, 40000);
 
+    /**
+     * When connection manager entering terminal state {@code currentState.terminal == true} it should clean up
+     * {@link #handlerThread} and invoke {@link #stopConnectivityListener}.
+     * <p>
+     * If this flag is true that means that current state is terminal but cleaning up still in progress
+     */
+    private boolean cleaningUpAfterEnteringTerminalState = false;
+
     /**
      * Methods on the channels map owned by the {@link AblyRealtime} instance
      * which the {@link ConnectionManager} needs access to.
@@ -696,6 +704,8 @@ public void run() {
                             /* indicate that this thread is committed to die */
                             handlerThread = null;
                             stopConnectivityListener();
+                            cleaningUpAfterEnteringTerminalState = false;
+                            ConnectionManager.this.notifyAll();
                             return;
                         }
 
@@ -790,7 +800,13 @@ public synchronized State getConnectionState() {
     public synchronized void connect() {
         /* connect() is the only action that will bring the ConnectionManager out of a terminal currentState */
         if(currentState.terminal || currentState.state == ConnectionState.initialized) {
-            startup();
+            try {
+                startup();
+            } catch(InterruptedException e) {
+                Thread.currentThread().interrupt();
+                Log.e(TAG, "Failed to start up connection", e);
+                return;
+            }
         }
         requestState(ConnectionState.connecting);
     }
@@ -853,6 +869,7 @@ private synchronized ConnectionStateChange setState(ITransport transport, StateI
         Log.v(TAG, "setState(): setting " + newState.state + "; reason " + reason);
         ConnectionStateChange change = new ConnectionStateChange(currentState.state, newConnectionState, newState.timeout, reason);
         currentState = newState;
+        cleaningUpAfterEnteringTerminalState = currentState.terminal;
         stateError = reason;
 
         return change;
@@ -1338,10 +1355,17 @@ private void onHeartbeat(ProtocolMessage message) {
      * ConnectionManager lifecycle
      ******************************/
 
-    private synchronized void startup() {
-        if(handlerThread == null) {
+    private synchronized void startup() throws InterruptedException {
+        while (cleaningUpAfterEnteringTerminalState) {
+            Log.v(TAG, "Waiting for termination action to clean up handler thread");
+            wait();
+        }
+
+        if (handlerThread == null) {
             (handlerThread = new Thread(new ActionHandler())).start();
             startConnectivityListener();
+        } else {
+            Log.v(TAG, "`connect()` has been called twice on uninitialized or terminal state");
         }
     }