Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
internal var id: String

/// The `ObjectMessage`s gathered during this sync sequence.
internal var syncObjectsPool: [SyncObjectsPoolEntry]
internal var syncObjectsPool: SyncObjectsPool
}

internal init(
Expand Down Expand Up @@ -790,16 +790,8 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
}
}

let syncObjectsPoolEntries = objectMessages.compactMap { objectMessage in
if let object = objectMessage.object {
SyncObjectsPoolEntry(state: object, objectMessageSerialTimestamp: objectMessage.serialTimestamp)
} else {
nil
}
}

// If populated, this contains a full set of sync data for the channel, and should be applied to the ObjectsPool.
let completedSyncObjectsPool: [SyncObjectsPoolEntry]?
let completedSyncObjectsPool: SyncObjectsPool?
// The SyncSequence, if any, to store in the SYNCING state that results from this OBJECT_SYNC.
let syncSequenceForSyncingState: SyncSequence?

Expand All @@ -809,15 +801,17 @@ internal final class InternalDefaultRealtimeObjects: Sendable, InternalRealtimeO
} else {
nil
}
var updatedSyncSequence = syncSequenceToContinue ?? .init(id: syncCursor.sequenceID, syncObjectsPool: [])
// RTO5b
updatedSyncSequence.syncObjectsPool.append(contentsOf: syncObjectsPoolEntries)
var updatedSyncSequence = syncSequenceToContinue ?? .init(id: syncCursor.sequenceID, syncObjectsPool: .init())
// RTO5f
updatedSyncSequence.syncObjectsPool.accumulate(objectMessages, logger: logger)
syncSequenceForSyncingState = updatedSyncSequence

completedSyncObjectsPool = syncCursor.isEndOfSequence ? updatedSyncSequence.syncObjectsPool : nil
} else {
// RTO5a5: The sync data is contained entirely within this single OBJECT_SYNC
completedSyncObjectsPool = syncObjectsPoolEntries
var pool = SyncObjectsPool()
pool.accumulate(objectMessages, logger: logger)
completedSyncObjectsPool = pool
syncSequenceForSyncingState = nil
}

Expand Down
49 changes: 25 additions & 24 deletions Sources/AblyLiveObjects/Internal/ObjectsPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ internal struct ObjectsPool {

/// Applies the objects gathered during an `OBJECT_SYNC` to this `ObjectsPool`, per RTO5c1 and RTO5c2.
internal mutating func nosync_applySyncObjectsPool(
_ syncObjectsPool: [SyncObjectsPoolEntry],
_ syncObjectsPool: SyncObjectsPool,
logger: Logger,
internalQueue: DispatchQueue,
userCallbackQueue: DispatchQueue,
Expand All @@ -293,72 +293,73 @@ internal struct ObjectsPool {
var updatesToExistingObjects: [ObjectsPool.Entry.DeferredUpdate] = []

// RTO5c1: For each ObjectState member in the SyncObjectsPool list
for syncObjectsPoolEntry in syncObjectsPool {
receivedObjectIds.insert(syncObjectsPoolEntry.state.objectId)
for objectMessage in syncObjectsPool {
// Every message yielded by SyncObjectsPool is guaranteed to have a non-nil `.object` with `.map` or `.counter`.
guard let state = objectMessage.object else {
preconditionFailure("SyncObjectsPool yielded a message with nil object")
}
receivedObjectIds.insert(state.objectId)

// RTO5c1a: If an object with ObjectState.objectId exists in the internal ObjectsPool
if let existingEntry = entries[syncObjectsPoolEntry.state.objectId] {
logger.log("Updating existing object with ID: \(syncObjectsPoolEntry.state.objectId)", level: .debug)
if let existingEntry = entries[state.objectId] {
logger.log("Updating existing object with ID: \(state.objectId)", level: .debug)

// RTO5c1a1: Override the internal data for the object as per RTLC6, RTLM6
let deferredUpdate = existingEntry.nosync_replaceData(
using: syncObjectsPoolEntry.state,
objectMessageSerialTimestamp: syncObjectsPoolEntry.objectMessageSerialTimestamp,
using: state,
objectMessageSerialTimestamp: objectMessage.serialTimestamp,
objectsPool: &self,
userCallbackQueue: userCallbackQueue,
)
// RTO5c1a2: Store this update to emit at end
updatesToExistingObjects.append(deferredUpdate)
} else {
// RTO5c1b: If an object with ObjectState.objectId does not exist in the internal ObjectsPool
logger.log("Creating new object with ID: \(syncObjectsPoolEntry.state.objectId)", level: .debug)
logger.log("Creating new object with ID: \(state.objectId)", level: .debug)

// RTO5c1b1: Create a new LiveObject using the data from ObjectState and add it to the internal ObjectsPool:
let newEntry: Entry?
let newEntry: Entry

if syncObjectsPoolEntry.state.counter != nil {
if state.counter != nil {
// RTO5c1b1a: If ObjectState.counter is present, create a zero-value LiveCounter,
// set its private objectId equal to ObjectState.objectId and override its internal data per RTLC6
let counter = InternalDefaultLiveCounter.createZeroValued(
objectID: syncObjectsPoolEntry.state.objectId,
objectID: state.objectId,
logger: logger,
internalQueue: internalQueue,
userCallbackQueue: userCallbackQueue,
clock: clock,
)
_ = counter.nosync_replaceData(
using: syncObjectsPoolEntry.state,
objectMessageSerialTimestamp: syncObjectsPoolEntry.objectMessageSerialTimestamp,
using: state,
objectMessageSerialTimestamp: objectMessage.serialTimestamp,
)
newEntry = .counter(counter)
} else if let objectsMap = syncObjectsPoolEntry.state.map {
} else if let objectsMap = state.map {
// RTO5c1b1b: If ObjectState.map is present, create a zero-value LiveMap,
// set its private objectId equal to ObjectState.objectId, set its private semantics
// equal to ObjectState.map.semantics and override its internal data per RTLM6
let map = InternalDefaultLiveMap.createZeroValued(
objectID: syncObjectsPoolEntry.state.objectId,
objectID: state.objectId,
semantics: objectsMap.semantics,
logger: logger,
internalQueue: internalQueue,
userCallbackQueue: userCallbackQueue,
clock: clock,
)
_ = map.nosync_replaceData(
using: syncObjectsPoolEntry.state,
objectMessageSerialTimestamp: syncObjectsPoolEntry.objectMessageSerialTimestamp,
using: state,
objectMessageSerialTimestamp: objectMessage.serialTimestamp,
objectsPool: &self,
)
newEntry = .map(map)
} else {
// RTO5c1b1c: Otherwise, log a warning that an unsupported object state message has been received, and discard the current ObjectState without taking any action
logger.log("Unsupported object state message received for objectId: \(syncObjectsPoolEntry.state.objectId)", level: .warn)
newEntry = nil
// SyncObjectsPool guarantees every yielded message has `.map` or `.counter`.
preconditionFailure("SyncObjectsPool entry for objectId \(state.objectId) has neither counter nor map")
}

if let newEntry {
// Note that we will never replace the root object here, and thus never break the RTO3b invariant that the root object is always a map. This is because the pool always contains a root object and thus we always go through the RTO5c1a branch of the `if` above.
entries[syncObjectsPoolEntry.state.objectId] = newEntry
}
// Note that we will never replace the root object here, and thus never break the RTO3b invariant that the root object is always a map. This is because the pool always contains a root object and thus we always go through the RTO5c1a branch of the `if` above.
entries[state.objectId] = newEntry
}
}

Expand Down
86 changes: 86 additions & 0 deletions Sources/AblyLiveObjects/Internal/SyncObjectsPool.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import Foundation

/// The RTO5f collection of objects gathered during an `OBJECT_SYNC` sequence, ready to be applied to the `ObjectsPool`.
///
/// Every stored message is guaranteed to have a non-nil `.object` with either `.map` or `.counter` populated.
internal struct SyncObjectsPool: Collection {
/// Keyed by `objectId`. Every value has a non-nil `.object` with either `.map` or `.counter` populated; the
/// `accumulate` method enforces this invariant.
private var objectMessages: [String: InboundObjectMessage]

/// Creates an empty pool.
internal init() {
objectMessages = [:]
}

/// Accumulates object messages into the pool per RTO5f.
internal mutating func accumulate(
_ objectMessages: [InboundObjectMessage],
logger: Logger,
) {
for objectMessage in objectMessages {
accumulate(objectMessage, logger: logger)
}
}

/// Accumulates a single `ObjectMessage` into the pool per RTO5f.
private mutating func accumulate(
_ objectMessage: InboundObjectMessage,
logger: Logger,
) {
// RTO5f3: Reject unsupported object types before pool lookup. Only messages whose `.object` has `.map` or `.counter`
// are stored, which callers of the iteration can rely on.
guard let object = objectMessage.object, object.map != nil || object.counter != nil else {
logger.log("Skipping unsupported object type during sync for objectId \(objectMessage.object?.objectId ?? "unknown")", level: .warn)
return
}

let objectId = object.objectId

if let existing = objectMessages[objectId] {
// RTO5f2: An entry already exists for this objectId (partial object state).
if object.map != nil {
// RTO5f2a: Incoming message has a map.
if object.tombstone {
// RTO5f2a1: Incoming tombstone is true — replace the entire entry.
objectMessages[objectId] = objectMessage
} else {
// RTO5f2a2: Merge map entries into the existing message.
var merged = existing
if let incomingEntries = object.map?.entries {
var mergedObject = merged.object!
guard var mergedMap = mergedObject.map else {
// Not a specified scenario — the server won't send a map and a non-map for the same
// objectId in practice. Guard defensively rather than force-unwrapping.
logger.log("Existing entry for objectId \(objectId) is not a map; replacing with incoming message", level: .error)
objectMessages[objectId] = objectMessage
return
}
var mergedEntries = mergedMap.entries ?? [:]
mergedEntries.merge(incomingEntries) { _, new in new }
mergedMap.entries = mergedEntries
mergedObject.map = mergedMap
merged.object = mergedObject
}
objectMessages[objectId] = merged
}
} else {
// RTO5f2b: Incoming message has a counter — log error, skip.
logger.log("Received partial counter sync for objectId \(objectId); skipping", level: .error)
}
} else {
// RTO5f1: No entry exists for this objectId — store the message.
objectMessages[objectId] = objectMessage
}
}

// MARK: - Collection conformance

internal typealias Index = Dictionary<String, InboundObjectMessage>.Values.Index
internal typealias Element = InboundObjectMessage

internal var startIndex: Index { objectMessages.values.startIndex }
internal var endIndex: Index { objectMessages.values.endIndex }
internal func index(after i: Index) -> Index { objectMessages.values.index(after: i) }
internal subscript(position: Index) -> Element { objectMessages.values[position] }
}
15 changes: 0 additions & 15 deletions Sources/AblyLiveObjects/Internal/SyncObjectsPoolEntry.swift

This file was deleted.

2 changes: 1 addition & 1 deletion Sources/AblyLiveObjects/Protocol/ObjectMessage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import Foundation
// This file contains the ObjectMessage types that we use within the codebase. We convert them to and from the corresponding wire types (e.g. `InboundWireObjectMessage`) for sending and receiving over the wire.

/// An `ObjectMessage` received in the `state` property of an `OBJECT` or `OBJECT_SYNC` `ProtocolMessage`.
internal struct InboundObjectMessage {
internal struct InboundObjectMessage: Equatable {
internal var id: String? // OM2a
internal var clientId: String? // OM2b
internal var connectionId: String? // OM2c
Expand Down
2 changes: 1 addition & 1 deletion Sources/AblyLiveObjects/Protocol/WireObjectMessage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ extension WireObjectData: WireObjectCodable {
/// A type that can be either a string or binary data.
///
/// Used to represent the values that `WireObjectData.bytes` might hold, after being encoded per OD4 or before being decoded per OD5.
internal enum StringOrData: WireCodable {
internal enum StringOrData: Equatable, WireCodable {
case string(String)
case data(Data)

Expand Down
2 changes: 2 additions & 0 deletions Tests/AblyLiveObjectsTests/Helpers/TestFactories.swift
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ struct TestFactories {
object: ObjectState? = nil,
serial: String? = nil,
siteCode: String? = nil,
serialTimestamp: Date? = nil,
) -> InboundObjectMessage {
InboundObjectMessage(
id: id,
Expand All @@ -279,6 +280,7 @@ struct TestFactories {
object: object,
serial: serial,
siteCode: siteCode,
serialTimestamp: serialTimestamp,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ struct InternalDefaultRealtimeObjectsTests {
// @spec RTO5a1
// @spec RTO5a3
// @spec RTO5a4
// @spec RTO5b
// @spec RTO5f
// @spec RTO5c3
// @spec RTO5c4
// @spec RTO5c5
Expand Down
Loading
Loading