Skip to content

Commit 9610b41

Browse files
committed
[ECO-5056] use server-provided GC grace period for garbage collection
- Add support for server-configured objectsGCGracePeriod from connection details, with fallback to default 24-hour period. Updates ObjectsPool and all LiveObjects - Implementations to use the server-provided value for tombstone cleanup timing.
1 parent 39c62f6 commit 9610b41

File tree

10 files changed

+77
-25
lines changed

10 files changed

+77
-25
lines changed

lib/src/main/java/io/ably/lib/objects/Adapter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import io.ably.lib.realtime.AblyRealtime;
44
import io.ably.lib.realtime.ChannelBase;
5-
import io.ably.lib.transport.ConnectionManager;
5+
import io.ably.lib.realtime.Connection;
66
import io.ably.lib.types.AblyException;
77
import io.ably.lib.types.ClientOptions;
88
import io.ably.lib.types.ErrorInfo;
@@ -23,8 +23,8 @@ public Adapter(@NotNull AblyRealtime ably) {
2323
}
2424

2525
@Override
26-
public @NotNull ConnectionManager getConnectionManager() {
27-
return ably.connection.connectionManager;
26+
public @NotNull Connection getConnection() {
27+
return ably.connection;
2828
}
2929

3030
@Override

lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.ably.lib.objects;
22

33
import io.ably.lib.realtime.ChannelBase;
4-
import io.ably.lib.transport.ConnectionManager;
4+
import io.ably.lib.realtime.Connection;
55
import io.ably.lib.types.AblyException;
66
import io.ably.lib.types.ClientOptions;
77
import org.jetbrains.annotations.Blocking;
@@ -18,13 +18,13 @@ public interface ObjectsAdapter {
1818
@NotNull ClientOptions getClientOptions();
1919

2020
/**
21-
* Retrieves the connection manager for handling connection state and operations.
21+
* Retrieves the connection instance for handling connection state and operations.
2222
* Used to check connection status, obtain error information, and manage
2323
* message transmission across the Ably connection.
2424
*
25-
* @return the connection manager instance
25+
* @return the connection instance
2626
*/
27-
@NotNull ConnectionManager getConnectionManager();
27+
@NotNull Connection getConnection();
2828

2929
/**
3030
* Retrieves the current time in milliseconds from the Ably server.

lib/src/main/java/io/ably/lib/transport/ConnectionManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ public class ConnectionManager implements ConnectListener {
100100
* This field is initialized only if the LiveObjects plugin is present in the classpath.
101101
*/
102102
private final LiveObjectsPlugin liveObjectsPlugin;
103+
public Long objectsGCGracePeriod = null;
103104

104105
/**
105106
* Methods on the channels map owned by the {@link AblyRealtime} instance
@@ -1297,6 +1298,7 @@ private synchronized void onConnected(ProtocolMessage message) {
12971298
maxIdleInterval = connectionDetails.maxIdleInterval;
12981299
connectionStateTtl = connectionDetails.connectionStateTtl;
12991300
maxMessageSize = connectionDetails.maxMessageSize;
1301+
objectsGCGracePeriod = connectionDetails.objectsGCGracePeriod;
13001302

13011303
/* set the clientId resolved from token, if any */
13021304
String clientId = connectionDetails.clientId;

lib/src/main/java/io/ably/lib/types/ConnectionDetails.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ public class ConnectionDetails {
7474
*/
7575
public Long connectionStateTtl;
7676

77+
/**
78+
* The duration in milliseconds used to retain tombstoned objects at client side.
79+
*/
80+
public Long objectsGCGracePeriod;
81+
7782
ConnectionDetails() {
7883
maxIdleInterval = Defaults.maxIdleInterval;
7984
connectionStateTtl = Defaults.connectionStateTtl;
@@ -114,6 +119,9 @@ ConnectionDetails readMsgpack(MessageUnpacker unpacker) throws IOException {
114119
case "connectionStateTtl":
115120
connectionStateTtl = unpacker.unpackLong();
116121
break;
122+
case "objectsGCGracePeriod":
123+
objectsGCGracePeriod = unpacker.unpackLong();
124+
break;
117125
default:
118126
Log.v(TAG, "Unexpected field: " + fieldName);
119127
unpacker.skipValue();

liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package io.ably.lib.objects
22

33
import io.ably.lib.realtime.ChannelState
44
import io.ably.lib.realtime.CompletionListener
5+
import io.ably.lib.realtime.ConnectionEvent
56
import io.ably.lib.types.ChannelMode
67
import io.ably.lib.types.ErrorInfo
78
import io.ably.lib.types.ProtocolMessage
@@ -15,7 +16,7 @@ import kotlin.coroutines.resumeWithException
1516
*/
1617
internal suspend fun ObjectsAdapter.sendAsync(message: ProtocolMessage) = suspendCancellableCoroutine { continuation ->
1718
try {
18-
connectionManager.send(message, clientOptions.queueMessages, object : CompletionListener {
19+
connection.connectionManager.send(message, clientOptions.queueMessages, object : CompletionListener {
1920
override fun onSuccess() {
2021
continuation.resume(Unit)
2122
}
@@ -45,6 +46,17 @@ internal suspend fun ObjectsAdapter.attachAsync(channelName: String) = suspendCa
4546
}
4647
}
4748

49+
internal fun ObjectsAdapter.retrieveObjectsGCGracePeriod(block : (Long?) -> Unit) {
50+
val connectionManager = connection.connectionManager
51+
if (connectionManager.objectsGCGracePeriod != null) {
52+
block(connectionManager.objectsGCGracePeriod)
53+
return
54+
}
55+
connection.once(ConnectionEvent.connected) {
56+
block(connectionManager.objectsGCGracePeriod)
57+
}
58+
}
59+
4860
/**
4961
* Retrieves the channel modes for a specific channel.
5062
* This method returns the modes that are set for the specified channel.
@@ -76,7 +88,7 @@ internal fun ObjectsAdapter.getChannelModes(channelName: String): Array<ChannelM
7688
* Spec: RTO15d
7789
*/
7890
internal fun ObjectsAdapter.ensureMessageSizeWithinLimit(objectMessages: Array<ObjectMessage>) {
79-
val maximumAllowedSize = connectionManager.maxMessageSize
91+
val maximumAllowedSize = connection.connectionManager.maxMessageSize
8092
val objectsTotalMessageSize = objectMessages.sumOf { it.size() }
8193
if (objectsTotalMessageSize > maximumAllowedSize) {
8294
throw ablyException("ObjectMessages size $objectsTotalMessageSize exceeds maximum allowed size of $maximumAllowedSize bytes",
@@ -131,8 +143,8 @@ internal fun ObjectsAdapter.throwIfInvalidWriteApiConfiguration(channelName: Str
131143
}
132144

133145
internal fun ObjectsAdapter.throwIfUnpublishableState(channelName: String) {
134-
if (!connectionManager.isActive) {
135-
throw ablyException(connectionManager.stateErrorInfo)
146+
if (!connection.connectionManager.isActive) {
147+
throw ablyException(connection.connectionManager.stateErrorInfo)
136148
}
137149
throwIfInChannelState(channelName, arrayOf(ChannelState.failed, ChannelState.suspended))
138150
}

liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ import java.util.concurrent.ConcurrentHashMap
1414
internal object ObjectsPoolDefaults {
1515
const val GC_INTERVAL_MS = 1000L * 60 * 5 // 5 minutes
1616
/**
17+
* The SDK will attempt to use the `objectsGCGracePeriod` value provided by the server in the `connectionDetails`
18+
* object of the `CONNECTED` event.
19+
* If the server does not provide this value, the SDK will fall back to this default value.
1720
* Must be > 2 minutes to ensure we keep tombstones long enough to avoid the possibility of receiving an operation
1821
* with an earlier serial that would not have been applied if the tombstone still existed.
1922
*
@@ -49,10 +52,19 @@ internal class ObjectsPool(
4952
private val gcScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
5053
private var gcJob: Job // Job for the garbage collection coroutine
5154

55+
@Volatile
56+
private var gcGracePeriod = ObjectsPoolDefaults.GC_GRACE_PERIOD_MS
57+
5258
init {
5359
// RTO3b - Initialize pool with root object
5460
pool[ROOT_OBJECT_ID] = DefaultLiveMap.zeroValue(ROOT_OBJECT_ID, realtimeObjects)
55-
// Start garbage collection coroutine
61+
// Start garbage collection coroutine with server-provided grace period if available
62+
realtimeObjects.adapter.retrieveObjectsGCGracePeriod { period ->
63+
period?.let {
64+
gcGracePeriod = it
65+
Log.i(tag, "Using objectsGCGracePeriod from server: $gcGracePeriod ms")
66+
} ?: Log.i(tag, "Server did not provide objectsGCGracePeriod, using default: $gcGracePeriod ms")
67+
}
5668
gcJob = startGCJob()
5769
}
5870

@@ -123,9 +135,9 @@ internal class ObjectsPool(
123135
*/
124136
private fun onGCInterval() {
125137
pool.entries.removeIf { (_, obj) ->
126-
if (obj.isEligibleForGc()) { true } // Remove from pool
138+
if (obj.isEligibleForGc(gcGracePeriod)) { true } // Remove from pool
127139
else {
128-
obj.onGCInterval()
140+
obj.onGCInterval(gcGracePeriod)
129141
false // Keep in pool
130142
}
131143
}

liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package io.ably.lib.objects.type
33
import io.ably.lib.objects.ObjectMessage
44
import io.ably.lib.objects.ObjectOperation
55
import io.ably.lib.objects.ObjectState
6-
import io.ably.lib.objects.ObjectsPoolDefaults
76
import io.ably.lib.objects.objectError
87
import io.ably.lib.objects.type.livecounter.noOpCounterUpdate
98
import io.ably.lib.objects.type.livemap.noOpMapUpdate
@@ -136,10 +135,20 @@ internal abstract class BaseRealtimeObject(
136135

137136
/**
138137
* Checks if the object is eligible for garbage collection.
138+
*
139+
* An object is eligible for garbage collection if it has been tombstoned and
140+
* the time since tombstoning exceeds the specified grace period.
141+
*
142+
* @param gcGracePeriod The grace period in milliseconds that tombstoned objects
143+
* should be kept before being eligible for collection.
144+
* This value is retrieved from the server's connection details
145+
* or defaults to 24 hours if not provided by the server.
146+
* @return true if the object is tombstoned and the grace period has elapsed,
147+
* false otherwise
139148
*/
140-
internal fun isEligibleForGc(): Boolean {
149+
internal fun isEligibleForGc(gcGracePeriod: Long): Boolean {
141150
val currentTime = System.currentTimeMillis()
142-
return isTombstoned && tombstonedAt?.let { currentTime - it >= ObjectsPoolDefaults.GC_GRACE_PERIOD_MS } == true
151+
return isTombstoned && tombstonedAt?.let { currentTime - it >= gcGracePeriod } == true
143152
}
144153

145154
/**
@@ -195,12 +204,22 @@ internal abstract class BaseRealtimeObject(
195204
/**
196205
* Called during garbage collection intervals to clean up expired entries.
197206
*
207+
* This method is invoked periodically (every 5 minutes) by the ObjectsPool
208+
* to perform cleanup of tombstoned data that has exceeded the grace period.
209+
*
198210
* This method should identify and remove entries that:
199211
* - Have been marked as tombstoned
200-
* - Have a tombstone timestamp older than the configured grace period
212+
* - Have a tombstone timestamp older than the specified grace period
213+
*
214+
* @param gcGracePeriod The grace period in milliseconds that tombstoned entries
215+
* should be kept before being eligible for removal.
216+
* This value is retrieved from the server's connection details
217+
* or defaults to 24 hours if not provided by the server.
218+
* Must be greater than 2 minutes to ensure proper operation
219+
* ordering and avoid issues with delayed operations.
201220
*
202221
* Implementations typically use single-pass removal techniques to
203222
* efficiently clean up expired data without creating temporary collections.
204223
*/
205-
abstract fun onGCInterval()
224+
abstract fun onGCInterval(gcGracePeriod: Long)
206225
}

liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ internal class DefaultLiveCounter private constructor(
109109
liveCounterManager.notify(update as LiveCounterUpdate)
110110
}
111111

112-
override fun onGCInterval() {
112+
override fun onGCInterval(gcGracePeriod: Long) {
113113
// Nothing to GC for a counter object
114114
return
115115
}

liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,8 @@ internal class DefaultLiveMap private constructor(
182182
liveMapManager.notify(update as LiveMapUpdate)
183183
}
184184

185-
override fun onGCInterval() {
186-
data.entries.removeIf { (_, entry) -> entry.isEligibleForGc() }
185+
override fun onGCInterval(gcGracePeriod: Long) {
186+
data.entries.removeIf { (_, entry) -> entry.isEligibleForGc(gcGracePeriod) }
187187
}
188188

189189
companion object {

liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapEntry.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package io.ably.lib.objects.type.livemap
33
import io.ably.lib.objects.*
44
import io.ably.lib.objects.ObjectData
55
import io.ably.lib.objects.ObjectsPool
6-
import io.ably.lib.objects.ObjectsPoolDefaults
76
import io.ably.lib.objects.type.BaseRealtimeObject
87
import io.ably.lib.objects.type.ObjectType
98
import io.ably.lib.objects.type.counter.LiveCounter
@@ -61,9 +60,9 @@ internal fun LiveMapEntry.getResolvedValue(objectsPool: ObjectsPool): LiveMapVal
6160
/**
6261
* Extension function to check if a LiveMapEntry is expired and ready for garbage collection
6362
*/
64-
internal fun LiveMapEntry.isEligibleForGc(): Boolean {
63+
internal fun LiveMapEntry.isEligibleForGc(gcGracePeriod: Long): Boolean {
6564
val currentTime = System.currentTimeMillis()
66-
return isTombstoned && tombstonedAt?.let { currentTime - it >= ObjectsPoolDefaults.GC_GRACE_PERIOD_MS } == true
65+
return isTombstoned && tombstonedAt?.let { currentTime - it >= gcGracePeriod } == true
6766
}
6867

6968
private fun fromObjectValue(objValue: ObjectValue): LiveMapValue {

0 commit comments

Comments
 (0)