Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 75 additions & 1 deletion Sources/AblyLiveObjects/Internal/InternalDefaultLiveMap.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ internal final class InternalDefaultLiveMap: Sendable {
}
}

internal var testsOnly_clearTimeserial: String? {
mutableStateMutex.withSync { mutableState in
mutableState.clearTimeserial
}
}

private let logger: Logger
private let userCallbackQueue: DispatchQueue
private let clock: SimpleClock
Expand Down Expand Up @@ -396,6 +402,15 @@ internal final class InternalDefaultLiveMap: Sendable {
}
}

/// Test-only method to apply a MAP_CLEAR operation, per RTLM24.
internal func testsOnly_applyMapClearOperation(serial: String?) -> LiveObjectUpdate<DefaultLiveMapUpdate> {
mutableStateMutex.withSync { mutableState in
mutableState.applyMapClearOperation(
serial: serial,
)
}
}

/// Resets the map's data, per RTO4b2. This is to be used when an `ATTACHED` ProtocolMessage indicates that the only object in a channel is an empty root map.
internal func nosync_resetData() {
mutableStateMutex.withoutSync { mutableState in
Expand Down Expand Up @@ -452,6 +467,9 @@ internal final class InternalDefaultLiveMap: Sendable {
/// The "private `semantics` field" of RTO5c1b1b.
internal var semantics: WireEnum<ObjectsMapSemantics>?

/// RTLM25
internal var clearTimeserial: String?

/// Replaces the internal data of this map with the provided ObjectState, per RTLM6.
///
/// - Parameters:
Expand Down Expand Up @@ -492,6 +510,9 @@ internal final class InternalDefaultLiveMap: Sendable {
// RTLM6g: Store the current data value as previousData for use in RTLM6h
let previousData = data

// RTLM6i
clearTimeserial = state.map?.clearTimeserial

// RTLM6b: Set the private flag createOperationIsMerged to false
liveObjectMutableState.createOperationIsMerged = false

Expand Down Expand Up @@ -702,6 +723,15 @@ internal final class InternalDefaultLiveMap: Sendable {
liveObjectMutableState.emit(.update(.init(update: dataBeforeApplyingOperation.mapValues { _ in .removed })), on: userCallbackQueue)
// RTLM15d5b
return true
case .known(.mapClear):
// RTLM15d8
let update = applyMapClearOperation(
serial: applicableOperation.objectMessageSerial,
)
// RTLM15d8a
liveObjectMutableState.emit(update, on: userCallbackQueue)
// RTLM15d8b
return true
default:
// RTLM15d4
logger.log("Operation \(operation) has unsupported action for LiveMap; discarding", level: .warn)
Expand All @@ -720,6 +750,11 @@ internal final class InternalDefaultLiveMap: Sendable {
userCallbackQueue: DispatchQueue,
clock: SimpleClock,
) -> LiveObjectUpdate<DefaultLiveMapUpdate> {
// RTLM7h
if let clearTimeserial, operationTimeserial.map({ $0 <= clearTimeserial }) ?? true {
return .noop
}

// RTLM7a: If an entry exists in the private data for the specified key
if let existingEntry = data[key] {
// RTLM7a1: If the operation cannot be applied as per RTLM9, discard the operation
Expand Down Expand Up @@ -762,6 +797,11 @@ internal final class InternalDefaultLiveMap: Sendable {
internal mutating func applyMapRemoveOperation(key: String, operationTimeserial: String?, operationSerialTimestamp: Date?, logger: Logger, clock: SimpleClock) -> LiveObjectUpdate<DefaultLiveMapUpdate> {
// (Note that, where the spec tells us to set ObjectsMapEntry.data to nil, we actually set it to an empty ObjectData, which is equivalent, since it contains no data)

// RTLM8g
if let clearTimeserial, operationTimeserial.map({ $0 <= clearTimeserial }) ?? true {
return .noop
}

// Calculate the tombstonedAt for the new or updated entry per RTLM8f
let tombstonedAt: Date?
if let operationSerialTimestamp {
Expand Down Expand Up @@ -869,11 +909,44 @@ internal final class InternalDefaultLiveMap: Sendable {
)
}

/// Applies a `MAP_CLEAR` operation, per RTLM24.
internal mutating func applyMapClearOperation(
serial: String?,
) -> LiveObjectUpdate<DefaultLiveMapUpdate> {
guard let serial else {
return .noop
}

// RTLM24c
if let clearTimeserial, serial <= clearTimeserial {
return .noop
}

// RTLM24d
clearTimeserial = serial

// RTLM24e, RTLM24e1: entry timeserial is nil, or serial > entry timeserial
let keysToRemove = data.filter { _, entry in
guard let entryTimeserial = entry.timeserial else {
return true
}
return serial > entryTimeserial
}.keys

for key in keysToRemove {
data.removeValue(forKey: key)
}

// RTLM24e1b, RTLM24f
let removedKeys = Dictionary(uniqueKeysWithValues: keysToRemove.map { ($0, LiveMapUpdateAction.removed) })
return .update(DefaultLiveMapUpdate(update: removedKeys))
}

/// Resets the map's data and emits a `removed` event for the existing keys, per RTO4b2 and RTO4b2a. This is to be used when an `ATTACHED` ProtocolMessage indicates that the only object in a channel is an empty root map.
internal mutating func resetData(userCallbackQueue: DispatchQueue) {
// RTO4b2
let previousData = data
data = [:]
resetDataToZeroValued()

// RTO4b2a
let mapUpdate = DefaultLiveMapUpdate(update: previousData.mapValues { _ in .removed })
Expand All @@ -884,6 +957,7 @@ internal final class InternalDefaultLiveMap: Sendable {
mutating func resetDataToZeroValued() {
// RTLM4
data = [:]
clearTimeserial = nil
}

/// Releases entries that were tombstoned more than `gracePeriod` ago, per RTLM19.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
switch operation.action {
case let .known(action):
switch action {
case .mapCreate, .mapSet, .mapRemove, .counterCreate, .counterInc, .objectDelete:
case .mapCreate, .mapSet, .mapRemove, .counterCreate, .counterInc, .objectDelete, .mapClear:
// RTO9a2a3
let applied = entry.nosync_apply(
operation,
Expand Down
8 changes: 8 additions & 0 deletions Sources/AblyLiveObjects/Protocol/ObjectMessage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ internal struct ObjectOperation: Equatable {
internal var objectDelete: WireObjectDelete? // OOP3o
internal var mapCreateWithObjectId: MapCreateWithObjectId? // OOP3p
internal var counterCreateWithObjectId: CounterCreateWithObjectId? // OOP3q
internal var mapClear: WireMapClear? // OOP3r
}

internal struct ObjectData: Equatable {
Expand Down Expand Up @@ -96,6 +97,7 @@ internal struct ObjectsMapEntry: Equatable {
internal struct ObjectsMap: Equatable {
internal var semantics: WireEnum<ObjectsMapSemantics> // OMP3a
internal var entries: [String: ObjectsMapEntry]? // OMP3b
internal var clearTimeserial: String? // OMP3c
}

internal struct ObjectState: Equatable {
Expand Down Expand Up @@ -178,6 +180,7 @@ internal extension ObjectOperation {
counterCreate = wireObjectOperation.counterCreate
counterInc = wireObjectOperation.counterInc
objectDelete = wireObjectOperation.objectDelete
mapClear = wireObjectOperation.mapClear
// Outbound-only — do not access on inbound data
mapCreateWithObjectId = nil
counterCreateWithObjectId = nil
Expand All @@ -199,6 +202,7 @@ internal extension ObjectOperation {
objectDelete: objectDelete,
mapCreateWithObjectId: mapCreateWithObjectId?.toWire(),
counterCreateWithObjectId: counterCreateWithObjectId?.toWire(),
mapClear: mapClear,
)
}
}
Expand Down Expand Up @@ -414,6 +418,7 @@ internal extension ObjectsMap {
entries = try wireObjectsMap.entries?.ablyLiveObjects_mapValuesWithTypedThrow { wireMapEntry throws(ARTErrorInfo) in
try .init(wireObjectsMapEntry: wireMapEntry, format: format)
}
clearTimeserial = wireObjectsMap.clearTimeserial
}

/// Converts this `ObjectsMap` to a `WireObjectsMap`, applying the data encoding rules of OD4.
Expand All @@ -424,6 +429,7 @@ internal extension ObjectsMap {
.init(
semantics: semantics,
entries: entries?.mapValues { $0.toWire(format: format) },
clearTimeserial: clearTimeserial,
)
}
}
Expand Down Expand Up @@ -520,6 +526,7 @@ extension ObjectOperation: CustomDebugStringConvertible {
if let objectDelete { parts.append("objectDelete: \(objectDelete)") }
if let mapCreateWithObjectId { parts.append("mapCreateWithObjectId: \(mapCreateWithObjectId)") }
if let counterCreateWithObjectId { parts.append("counterCreateWithObjectId: \(counterCreateWithObjectId)") }
if let mapClear { parts.append("mapClear: \(mapClear)") }

return "{ " + parts.joined(separator: ", ") + " }"
}
Expand Down Expand Up @@ -553,6 +560,7 @@ extension ObjectsMap: CustomDebugStringConvertible {
.joined(separator: ", ")
parts.append("entries: { \(formattedEntries) }")
}
if let clearTimeserial { parts.append("clearTimeserial: \(clearTimeserial)") }

return "{ " + parts.joined(separator: ", ") + " }"
}
Expand Down
27 changes: 27 additions & 0 deletions Sources/AblyLiveObjects/Protocol/WireObjectMessage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ internal enum ObjectOperationAction: Int {
case counterCreate = 3
case counterInc = 4
case objectDelete = 5
case mapClear = 6
}

// OMP2
Expand All @@ -169,6 +170,7 @@ internal struct WireObjectOperation {
internal var objectDelete: WireObjectDelete? // OOP3o
internal var mapCreateWithObjectId: WireMapCreateWithObjectId? // OOP3p
internal var counterCreateWithObjectId: WireCounterCreateWithObjectId? // OOP3q
internal var mapClear: WireMapClear? // OOP3r
}

extension WireObjectOperation: WireObjectCodable {
Expand All @@ -183,6 +185,7 @@ extension WireObjectOperation: WireObjectCodable {
case objectDelete
case mapCreateWithObjectId
case counterCreateWithObjectId
case mapClear
}

internal init(wireObject: [String: WireValue]) throws(ARTErrorInfo) {
Expand All @@ -195,6 +198,7 @@ extension WireObjectOperation: WireObjectCodable {
counterCreate = try wireObject.optionalDecodableValueForKey(WireKey.counterCreate.rawValue)
counterInc = try wireObject.optionalDecodableValueForKey(WireKey.counterInc.rawValue)
objectDelete = try wireObject.optionalDecodableValueForKey(WireKey.objectDelete.rawValue)
mapClear = try wireObject.optionalDecodableValueForKey(WireKey.mapClear.rawValue)
// Outbound-only — do not access on inbound data
mapCreateWithObjectId = nil
counterCreateWithObjectId = nil
Expand Down Expand Up @@ -230,6 +234,9 @@ extension WireObjectOperation: WireObjectCodable {
if let counterCreateWithObjectId {
result[WireKey.counterCreateWithObjectId.rawValue] = .object(counterCreateWithObjectId.toWireObject)
}
if let mapClear {
result[WireKey.mapClear.rawValue] = .object(mapClear.toWireObject)
}

return result
}
Expand Down Expand Up @@ -292,12 +299,14 @@ extension WireObjectState: WireObjectCodable {
internal struct WireObjectsMap {
internal var semantics: WireEnum<ObjectsMapSemantics> // OMP3a
internal var entries: [String: WireObjectsMapEntry]? // OMP3b
internal var clearTimeserial: String? // OMP3c
}

extension WireObjectsMap: WireObjectCodable {
internal enum WireKey: String {
case semantics
case entries
case clearTimeserial
}

internal init(wireObject: [String: WireValue]) throws(ARTErrorInfo) {
Expand All @@ -308,6 +317,7 @@ extension WireObjectsMap: WireObjectCodable {
}
return try WireObjectsMapEntry(wireObject: object)
}
clearTimeserial = try wireObject.optionalStringValueForKey(WireKey.clearTimeserial.rawValue)
}

internal var toWireObject: [String: WireValue] {
Expand All @@ -318,6 +328,9 @@ extension WireObjectsMap: WireObjectCodable {
if let entries {
result[WireKey.entries.rawValue] = .object(entries.mapValues { .object($0.toWireObject) })
}
if let clearTimeserial {
result[WireKey.clearTimeserial.rawValue] = .string(clearTimeserial)
}

return result
}
Expand Down Expand Up @@ -484,6 +497,20 @@ extension WireObjectDelete: WireObjectCodable {
}
}

internal struct WireMapClear: Equatable {
// Empty struct
}

extension WireMapClear: WireObjectCodable {
internal init(wireObject _: [String: WireValue]) throws(ARTErrorInfo) {
// No fields to decode
}

internal var toWireObject: [String: WireValue] {
[:]
}
}

internal struct WireMapCreateWithObjectId: Equatable {
internal var initialValue: String // MCRO2a
internal var nonce: String // MCRO2b
Expand Down
21 changes: 21 additions & 0 deletions Tests/AblyLiveObjectsTests/Helpers/TestFactories.swift
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ struct TestFactories {
objectDelete: WireObjectDelete? = nil,
mapCreateWithObjectId: MapCreateWithObjectId? = nil,
counterCreateWithObjectId: CounterCreateWithObjectId? = nil,
mapClear: WireMapClear? = nil,
) -> ObjectOperation {
ObjectOperation(
action: action,
Expand All @@ -358,6 +359,7 @@ struct TestFactories {
objectDelete: objectDelete,
mapCreateWithObjectId: mapCreateWithObjectId,
counterCreateWithObjectId: counterCreateWithObjectId,
mapClear: mapClear,
)
}

Expand Down Expand Up @@ -533,10 +535,12 @@ struct TestFactories {
static func objectsMap(
semantics: WireEnum<ObjectsMapSemantics> = .known(.lww),
entries: [String: ObjectsMapEntry]? = nil,
clearTimeserial: String? = nil,
) -> ObjectsMap {
ObjectsMap(
semantics: semantics,
entries: entries,
clearTimeserial: clearTimeserial,
)
}

Expand Down Expand Up @@ -599,6 +603,23 @@ struct TestFactories {
)
}

/// Creates an InboundObjectMessage with a MAP_CLEAR operation
static func mapClearOperationMessage(
objectId: String = "map:test@123",
serial: String = "ts1",
siteCode: String = "site1",
) -> InboundObjectMessage {
inboundObjectMessage(
operation: objectOperation(
action: .known(.mapClear),
objectId: objectId,
mapClear: WireMapClear(),
),
serial: serial,
siteCode: siteCode,
)
}

/// Creates an InboundObjectMessage with a MAP_CREATE operation
static func mapCreateOperationMessage(
objectId: String = "map:test@123",
Expand Down
Loading