Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions lib/src/main/java/io/ably/lib/objects/Adapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.realtime.ChannelBase;
import io.ably.lib.transport.ConnectionManager;
import io.ably.lib.realtime.Connection;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ClientOptions;
import io.ably.lib.types.ErrorInfo;
Expand All @@ -23,8 +23,8 @@ public Adapter(@NotNull AblyRealtime ably) {
}

@Override
public @NotNull ConnectionManager getConnectionManager() {
return ably.connection.connectionManager;
public @NotNull Connection getConnection() {
return ably.connection;
}

@Override
Expand Down
8 changes: 4 additions & 4 deletions lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.ably.lib.objects;

import io.ably.lib.realtime.ChannelBase;
import io.ably.lib.transport.ConnectionManager;
import io.ably.lib.realtime.Connection;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ClientOptions;
import org.jetbrains.annotations.Blocking;
Expand All @@ -18,13 +18,13 @@ public interface ObjectsAdapter {
@NotNull ClientOptions getClientOptions();

/**
* Retrieves the connection manager for handling connection state and operations.
* Retrieves the connection instance for handling connection state and operations.
* Used to check connection status, obtain error information, and manage
* message transmission across the Ably connection.
*
* @return the connection manager instance
* @return the connection instance
*/
@NotNull ConnectionManager getConnectionManager();
@NotNull Connection getConnection();

/**
* Retrieves the current time in milliseconds from the Ably server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class ConnectionManager implements ConnectListener {
* This field is initialized only if the LiveObjects plugin is present in the classpath.
*/
private final LiveObjectsPlugin liveObjectsPlugin;
public Long objectsGCGracePeriod = null;

/**
* Methods on the channels map owned by the {@link AblyRealtime} instance
Expand Down Expand Up @@ -1297,6 +1298,7 @@ private synchronized void onConnected(ProtocolMessage message) {
maxIdleInterval = connectionDetails.maxIdleInterval;
connectionStateTtl = connectionDetails.connectionStateTtl;
maxMessageSize = connectionDetails.maxMessageSize;
objectsGCGracePeriod = connectionDetails.objectsGCGracePeriod;

/* set the clientId resolved from token, if any */
String clientId = connectionDetails.clientId;
Expand Down
8 changes: 8 additions & 0 deletions lib/src/main/java/io/ably/lib/types/ConnectionDetails.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public class ConnectionDetails {
*/
public Long connectionStateTtl;

/**
* The duration in milliseconds used to retain tombstoned objects at client side.
*/
public Long objectsGCGracePeriod;

ConnectionDetails() {
maxIdleInterval = Defaults.maxIdleInterval;
connectionStateTtl = Defaults.connectionStateTtl;
Expand Down Expand Up @@ -114,6 +119,9 @@ ConnectionDetails readMsgpack(MessageUnpacker unpacker) throws IOException {
case "connectionStateTtl":
connectionStateTtl = unpacker.unpackLong();
break;
case "objectsGCGracePeriod":
objectsGCGracePeriod = unpacker.unpackLong();
break;
default:
Log.v(TAG, "Unexpected field: " + fieldName);
unpacker.skipValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -2578,6 +2579,29 @@ public void connect_should_not_rewrite_immediate_attach() throws AblyException {
}
}

@Test
public void channel_get_objects_throws_exception() throws AblyException {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
try (AblyRealtime ably = new AblyRealtime(opts)) {

/* wait until connected */
new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected);
assertEquals("Verify connected state reached", ably.connection.state, ConnectionState.connected);

/* create a channel and attach */
final Channel channel = ably.channels.get("channel");
channel.attach();
new ChannelWaiter(channel).waitFor(ChannelState.attached);
assertEquals("Verify attached state reached", channel.state, ChannelState.attached);

AblyException exception = assertThrows(AblyException.class, channel::getObjects);
assertNotNull(exception);
assertEquals(40019, exception.errorInfo.code);
assertEquals(400, exception.errorInfo.statusCode);
assertTrue(exception.errorInfo.message.contains("LiveObjects plugin hasn't been installed"));
}
}

static class DetachingProtocolListener implements DebugOptions.RawProtocolListener {

public Channel theChannel;
Expand Down
14 changes: 14 additions & 0 deletions liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package io.ably.lib.objects

import io.ably.lib.realtime.ChannelState
import io.ably.lib.realtime.CompletionListener
import io.ably.lib.realtime.ConnectionEvent
import io.ably.lib.realtime.ConnectionStateListener
import io.ably.lib.types.ChannelMode
import io.ably.lib.types.ErrorInfo
import io.ably.lib.types.ProtocolMessage
Expand All @@ -10,6 +12,8 @@ import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

internal val ObjectsAdapter.connectionManager get() = connection.connectionManager

/**
* Spec: RTO15g
*/
Expand Down Expand Up @@ -45,6 +49,16 @@ internal suspend fun ObjectsAdapter.attachAsync(channelName: String) = suspendCa
}
}

internal fun ObjectsAdapter.onGCGracePeriodUpdated(block : (Long?) -> Unit) : ObjectsSubscription {
connectionManager.objectsGCGracePeriod?.let { block(it) }
// Return new objectsGCGracePeriod whenever connection state changes to connected
val listener: (_: ConnectionStateListener.ConnectionStateChange) -> Unit = {
block(connectionManager.objectsGCGracePeriod)
}
connection.on(ConnectionEvent.connected, listener)
return ObjectsSubscription { connection.off(listener) }
}

/**
* Retrieves the channel modes for a specific channel.
* This method returns the modes that are set for the specified channel.
Expand Down
19 changes: 16 additions & 3 deletions liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import java.util.concurrent.ConcurrentHashMap
internal object ObjectsPoolDefaults {
const val GC_INTERVAL_MS = 1000L * 60 * 5 // 5 minutes
/**
* The SDK will attempt to use the `objectsGCGracePeriod` value provided by the server in the `connectionDetails`
* object of the `CONNECTED` event.
* If the server does not provide this value, the SDK will fall back to this default value.
* Must be > 2 minutes to ensure we keep tombstones long enough to avoid the possibility of receiving an operation
* with an earlier serial that would not have been applied if the tombstone still existed.
*
Expand Down Expand Up @@ -49,10 +52,19 @@ internal class ObjectsPool(
private val gcScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
private var gcJob: Job // Job for the garbage collection coroutine

@Volatile private var gcGracePeriod = ObjectsPoolDefaults.GC_GRACE_PERIOD_MS
private var gcPeriodSubscription: ObjectsSubscription

init {
// RTO3b - Initialize pool with root object
pool[ROOT_OBJECT_ID] = DefaultLiveMap.zeroValue(ROOT_OBJECT_ID, realtimeObjects)
// Start garbage collection coroutine
// Start garbage collection coroutine with server-provided grace period if available
gcPeriodSubscription = realtimeObjects.adapter.onGCGracePeriodUpdated { period ->
period?.let {
gcGracePeriod = it
Log.i(tag, "Using objectsGCGracePeriod from server: $gcGracePeriod ms")
} ?: Log.i(tag, "Server did not provide objectsGCGracePeriod, using default: $gcGracePeriod ms")
}
Comment on lines +62 to +67
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reset the grace period to the documented default when the server omits it

When a later onGCGracePeriodUpdated callback receives null, we continue using whatever server value we last saw. That contradicts the documented behavior (“fall back to this default value”) and can leave us running with a stale (possibly much shorter) grace period after reconnects, risking premature tombstone eviction. The log message also becomes misleading in that case. Please restore gcGracePeriod to the default before logging.

     gcPeriodSubscription = realtimeObjects.adapter.onGCGracePeriodUpdated { period ->
       period?.let {
         gcGracePeriod = it
         Log.i(tag, "Using objectsGCGracePeriod from server: $gcGracePeriod ms")
-      } ?: Log.i(tag, "Server did not provide objectsGCGracePeriod, using default: $gcGracePeriod ms")
+      } ?: run {
+        gcGracePeriod = ObjectsPoolDefaults.GC_GRACE_PERIOD_MS
+        Log.i(tag, "Server did not provide objectsGCGracePeriod, using default: $gcGracePeriod ms")
+      }
     }
🤖 Prompt for AI Agents
In liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt around lines
62-67, the onGCGracePeriodUpdated null branch must reset gcGracePeriod to the
documented default before logging; change the null path to assign the default
grace period (use the existing default constant or the module's documented
default value) to gcGracePeriod and then log that the default is being used so
we don't retain a stale server value or emit a misleading message.

gcJob = startGCJob()
}

Expand Down Expand Up @@ -123,9 +135,9 @@ internal class ObjectsPool(
*/
private fun onGCInterval() {
pool.entries.removeIf { (_, obj) ->
if (obj.isEligibleForGc()) { true } // Remove from pool
if (obj.isEligibleForGc(gcGracePeriod)) { true } // Remove from pool
else {
obj.onGCInterval()
obj.onGCInterval(gcGracePeriod)
false // Keep in pool
}
}
Expand All @@ -152,6 +164,7 @@ internal class ObjectsPool(
* Should be called when the pool is no longer needed.
*/
fun dispose() {
gcPeriodSubscription.unsubscribe()
gcJob.cancel()
gcScope.cancel()
pool.clear()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package io.ably.lib.objects.type
import io.ably.lib.objects.ObjectMessage
import io.ably.lib.objects.ObjectOperation
import io.ably.lib.objects.ObjectState
import io.ably.lib.objects.ObjectsPoolDefaults
import io.ably.lib.objects.objectError
import io.ably.lib.objects.type.livecounter.noOpCounterUpdate
import io.ably.lib.objects.type.livemap.noOpMapUpdate
Expand Down Expand Up @@ -136,10 +135,20 @@ internal abstract class BaseRealtimeObject(

/**
* Checks if the object is eligible for garbage collection.
*
* An object is eligible for garbage collection if it has been tombstoned and
* the time since tombstoning exceeds the specified grace period.
*
* @param gcGracePeriod The grace period in milliseconds that tombstoned objects
* should be kept before being eligible for collection.
* This value is retrieved from the server's connection details
* or defaults to 24 hours if not provided by the server.
* @return true if the object is tombstoned and the grace period has elapsed,
* false otherwise
*/
internal fun isEligibleForGc(): Boolean {
internal fun isEligibleForGc(gcGracePeriod: Long): Boolean {
val currentTime = System.currentTimeMillis()
return isTombstoned && tombstonedAt?.let { currentTime - it >= ObjectsPoolDefaults.GC_GRACE_PERIOD_MS } == true
return isTombstoned && tombstonedAt?.let { currentTime - it >= gcGracePeriod } == true
}

/**
Expand Down Expand Up @@ -195,12 +204,22 @@ internal abstract class BaseRealtimeObject(
/**
* Called during garbage collection intervals to clean up expired entries.
*
* This method is invoked periodically (every 5 minutes) by the ObjectsPool
* to perform cleanup of tombstoned data that has exceeded the grace period.
*
* This method should identify and remove entries that:
* - Have been marked as tombstoned
* - Have a tombstone timestamp older than the configured grace period
* - Have a tombstone timestamp older than the specified grace period
*
* @param gcGracePeriod The grace period in milliseconds that tombstoned entries
* should be kept before being eligible for removal.
* This value is retrieved from the server's connection details
* or defaults to 24 hours if not provided by the server.
* Must be greater than 2 minutes to ensure proper operation
* ordering and avoid issues with delayed operations.
*
* Implementations typically use single-pass removal techniques to
* efficiently clean up expired data without creating temporary collections.
*/
abstract fun onGCInterval()
abstract fun onGCInterval(gcGracePeriod: Long)
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ internal class DefaultLiveCounter private constructor(
liveCounterManager.notify(update as LiveCounterUpdate)
}

override fun onGCInterval() {
override fun onGCInterval(gcGracePeriod: Long) {
// Nothing to GC for a counter object
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ internal class DefaultLiveMap private constructor(
liveMapManager.notify(update as LiveMapUpdate)
}

override fun onGCInterval() {
data.entries.removeIf { (_, entry) -> entry.isEligibleForGc() }
override fun onGCInterval(gcGracePeriod: Long) {
data.entries.removeIf { (_, entry) -> entry.isEligibleForGc(gcGracePeriod) }
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package io.ably.lib.objects.type.livemap
import io.ably.lib.objects.*
import io.ably.lib.objects.ObjectData
import io.ably.lib.objects.ObjectsPool
import io.ably.lib.objects.ObjectsPoolDefaults
import io.ably.lib.objects.type.BaseRealtimeObject
import io.ably.lib.objects.type.ObjectType
import io.ably.lib.objects.type.counter.LiveCounter
Expand Down Expand Up @@ -61,9 +60,9 @@ internal fun LiveMapEntry.getResolvedValue(objectsPool: ObjectsPool): LiveMapVal
/**
* Extension function to check if a LiveMapEntry is expired and ready for garbage collection
*/
internal fun LiveMapEntry.isEligibleForGc(): Boolean {
internal fun LiveMapEntry.isEligibleForGc(gcGracePeriod: Long): Boolean {
val currentTime = System.currentTimeMillis()
return isTombstoned && tombstonedAt?.let { currentTime - it >= ObjectsPoolDefaults.GC_GRACE_PERIOD_MS } == true
return isTombstoned && tombstonedAt?.let { currentTime - it >= gcGracePeriod } == true
}

private fun fromObjectValue(objValue: ObjectValue): LiveMapValue {
Expand Down
Loading
Loading