Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 4 additions & 6 deletions Package.resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ let package = Package(
),
],
dependencies: [
// TODO: Unpin before release
.package(
url: "https://github.com/ably/ably-cocoa",
from: "1.2.55",
url: "https://github.com/ably/ably-cocoa.git",
revision: "dbdd4db5c0c64f4330e200ff2ca9bc9528598ff3",
),
// TODO: Unpin before release
.package(
url: "https://github.com/ably/ably-cocoa-plugin-support",
from: "1.0.0",
revision: "242fac1d4a829c8a63f9b3f96a71809e1f6eeffc",
),
.package(
url: "https://github.com/apple/swift-argument-parser",
Expand Down
14 changes: 7 additions & 7 deletions Sources/AblyLiveObjects/Internal/CoreSDK.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import Ably
/// This provides us with a mockable interface to ably-cocoa, and it also allows internal components and their tests not to need to worry about some of the boring details of how we bridge Swift types to `_AblyPluginSupportPrivate`'s Objective-C API (i.e. boxing).
internal protocol CoreSDK: AnyObject, Sendable {
/// Implements the internal `#publish` method of RTO15.
func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void)
func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<PublishResult, ARTErrorInfo>) -> Void)

/// Implements the server time fetch of RTO16, including the storing and usage of the local clock offset.
func nosync_fetchServerTime(callback: @escaping @Sendable (Result<Date, ARTErrorInfo>) -> Void)

/// Replaces the implementation of ``nosync_publish(objectMessages:callback:)``.
///
/// Used by integration tests, for example to disable `ObjectMessage` publishing so that a test can verify that a behaviour is not a side effect of an `ObjectMessage` sent by the SDK.
func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> Void)
func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> PublishResult)

/// Returns the current state of the Realtime channel that this wraps.
var nosync_channelState: _AblyPluginSupportPrivate.RealtimeChannelState { get }
Expand All @@ -34,7 +34,7 @@ internal final class DefaultCoreSDK: CoreSDK {
/// This enables the `testsOnly_overridePublish(with:)` test hook.
///
/// - Note: This should be `throws(ARTErrorInfo)` but that causes a compilation error of "Runtime support for typed throws function types is only available in macOS 15.0.0 or newer".
private nonisolated(unsafe) var overriddenPublishImplementation: (([OutboundObjectMessage]) async throws -> Void)?
private nonisolated(unsafe) var overriddenPublishImplementation: (([OutboundObjectMessage]) async throws -> PublishResult)?

internal init(
channel: _AblyPluginSupportPrivate.RealtimeChannel,
Expand All @@ -50,7 +50,7 @@ internal final class DefaultCoreSDK: CoreSDK {

// MARK: - CoreSDK conformance

internal func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void) {
internal func nosync_publish(objectMessages: [OutboundObjectMessage], callback: @escaping @Sendable (Result<PublishResult, ARTErrorInfo>) -> Void) {
logger.log("nosync_publish(objectMessages: \(LoggingUtilities.formatObjectMessagesForLogging(objectMessages)))", level: .debug)

// Use the overridden implementation if supplied
Expand All @@ -61,8 +61,8 @@ internal final class DefaultCoreSDK: CoreSDK {
let queue = pluginAPI.internalQueue(for: client)
Task {
do {
try await overriddenImplementation(objectMessages)
queue.async { callback(.success(())) }
let publishResult = try await overriddenImplementation(objectMessages)
queue.async { callback(.success(publishResult)) }
} catch {
guard let artErrorInfo = error as? ARTErrorInfo else {
preconditionFailure("Expected ARTErrorInfo, got \(error)")
Expand All @@ -83,7 +83,7 @@ internal final class DefaultCoreSDK: CoreSDK {
)
}

internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> Void) {
internal func testsOnly_overridePublish(with newImplementation: @escaping ([OutboundObjectMessage]) async throws(ARTErrorInfo) -> PublishResult) {
mutex.withLock {
overriddenPublishImplementation = newImplementation
}
Expand Down
47 changes: 39 additions & 8 deletions Sources/AblyLiveObjects/Internal/DefaultInternalPlugin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
/// A class that wraps an object message.
///
/// We need this intermediate type because we want object messages to be structs — because they're nicer to work with internally — but a struct can't conform to the class-bound `_AblyPluginSupportPrivate.ObjectMessageProtocol`.
private final class ObjectMessageBox<T>: _AblyPluginSupportPrivate.ObjectMessageProtocol where T: Sendable {
internal final class ObjectMessageBox<T>: _AblyPluginSupportPrivate.ObjectMessageProtocol where T: Sendable {
internal let objectMessage: T

init(objectMessage: T) {
internal init(objectMessage: T) {
self.objectMessage = objectMessage
}
}
Expand Down Expand Up @@ -143,11 +143,32 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
)
}

internal func nosync_onChannelStateChanged(_ channel: _AblyPluginSupportPrivate.RealtimeChannel, toState state: _AblyPluginSupportPrivate.RealtimeChannelState, reason: (any _AblyPluginSupportPrivate.PublicErrorInfo)?) {
let errorReason = reason.map { ARTErrorInfo.castPluginPublicErrorInfo($0) }
nosync_realtimeObjects(for: channel).nosync_onChannelStateChanged(toState: state, reason: errorReason)
}

internal func nosync_onConnected(withConnectionDetails connectionDetails: (any ConnectionDetailsProtocol)?, channel: any RealtimeChannel) {
let gracePeriod = connectionDetails?.objectsGCGracePeriod?.doubleValue ?? InternalDefaultRealtimeObjects.GarbageCollectionOptions.defaultGracePeriod
let realtimeObjects = nosync_realtimeObjects(for: channel)

let gracePeriod = connectionDetails?.objectsGCGracePeriod?.doubleValue ?? InternalDefaultRealtimeObjects.GarbageCollectionOptions.defaultGracePeriod
// RTO10b
nosync_realtimeObjects(for: channel).nosync_setGarbageCollectionGracePeriod(gracePeriod)
realtimeObjects.nosync_setGarbageCollectionGracePeriod(gracePeriod)

// Push the siteCode from connectionDetails
let siteCode: String? = {
guard let connectionDetails else {
return nil
}

// This is a fallback; our ably-cocoa dependency version should ensure that this is never triggered.
guard (connectionDetails as AnyObject).responds(to: #selector(ConnectionDetailsProtocol.siteCode)) else {
preconditionFailure("ably-cocoa's connectionDetails does not implement siteCode. Please update ably-cocoa to a version that supports apply-on-ACK.")
}

return connectionDetails.siteCode?()
}()
realtimeObjects.nosync_setSiteCode(siteCode)
}

// MARK: - Sending `OBJECT` ProtocolMessage
Expand All @@ -157,21 +178,31 @@ internal final class DefaultInternalPlugin: NSObject, _AblyPluginSupportPrivate.
channel: _AblyPluginSupportPrivate.RealtimeChannel,
client: _AblyPluginSupportPrivate.RealtimeClient,
pluginAPI: PluginAPIProtocol,
callback: @escaping @Sendable (Result<Void, ARTErrorInfo>) -> Void,
callback: @escaping @Sendable (Result<PublishResult, ARTErrorInfo>) -> Void,
) {
let objectMessageBoxes: [ObjectMessageBox<OutboundObjectMessage>] = objectMessages.map { .init(objectMessage: $0) }
let internalQueue = pluginAPI.internalQueue(for: client)

pluginAPI.nosync_sendObject(
// This is a fallback; our ably-cocoa dependency version should ensure that this is never triggered.
guard (pluginAPI as AnyObject).responds(to: #selector(PluginAPIProtocol.nosync_sendObject(withObjectMessages:channel:completionWithResult:))) else {
preconditionFailure("ably-cocoa does not implement nosync_sendObjectWithObjectMessages:channel:completionWithResult:. Please update ably-cocoa to a version that supports apply-on-ACK.")
}

pluginAPI.nosync_sendObject!(
withObjectMessages: objectMessageBoxes,
channel: channel,
) { error in
) { error, pluginPublishResult in
dispatchPrecondition(condition: .onQueue(internalQueue))

if let error {
callback(.failure(ARTErrorInfo.castPluginPublicErrorInfo(error)))
} else {
callback(.success(()))
guard let pluginPublishResult else {
preconditionFailure("Got nil publishResult and nil error")
}

let publishResult = PublishResult(pluginPublishResult: pluginPublishResult)
callback(.success(publishResult))
}
}
}
Expand Down
36 changes: 26 additions & 10 deletions Sources/AblyLiveObjects/Internal/InternalDefaultLiveCounter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ internal final class InternalDefaultLiveCounter: Sendable {
}
}

internal func increment(amount: Double, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
internal func increment(amount: Double, coreSDK: CoreSDK, realtimeObjects: any InternalRealtimeObjectsProtocol) async throws(ARTErrorInfo) {
try await withCheckedContinuation { (continuation: CheckedContinuation<Result<Void, ARTErrorInfo>, _>) in
do throws(ARTErrorInfo) {
try mutableStateMutex.withSync { mutableState throws(ARTErrorInfo) in
Expand Down Expand Up @@ -131,8 +131,8 @@ internal final class InternalDefaultLiveCounter: Sendable {
),
)

// RTLC12f
coreSDK.nosync_publish(objectMessages: [objectMessage]) { result in
// RTLC12g
realtimeObjects.nosync_publishAndApply(objectMessages: [objectMessage], coreSDK: coreSDK) { result in
continuation.resume(returning: result)
}
}
Expand All @@ -142,9 +142,9 @@ internal final class InternalDefaultLiveCounter: Sendable {
}.get()
}

internal func decrement(amount: Double, coreSDK: CoreSDK) async throws(ARTErrorInfo) {
internal func decrement(amount: Double, coreSDK: CoreSDK, realtimeObjects: any InternalRealtimeObjectsProtocol) async throws(ARTErrorInfo) {
// RTLC13b
try await increment(amount: -amount, coreSDK: coreSDK)
try await increment(amount: -amount, coreSDK: coreSDK, realtimeObjects: realtimeObjects)
}

@discardableResult
Expand Down Expand Up @@ -245,16 +245,20 @@ internal final class InternalDefaultLiveCounter: Sendable {
}

/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
///
/// - Returns: `true` if the operation was applied, `false` if it was skipped (RTLC7g).
internal func nosync_apply(
_ operation: ObjectOperation,
source: ObjectsOperationSource,
objectMessageSerial: String?,
objectMessageSiteCode: String?,
objectMessageSerialTimestamp: Date?,
objectsPool: inout ObjectsPool,
) {
) -> Bool {
mutableStateMutex.withoutSync { mutableState in
mutableState.apply(
operation,
source: source,
objectMessageSerial: objectMessageSerial,
objectMessageSiteCode: objectMessageSiteCode,
objectMessageSerialTimestamp: objectMessageSerialTimestamp,
Expand Down Expand Up @@ -379,29 +383,34 @@ internal final class InternalDefaultLiveCounter: Sendable {
}

/// Attempts to apply an operation from an inbound `ObjectMessage`, per RTLC7.
///
/// - Returns: `true` if the operation was applied, `false` if skipped (RTLC7g).
internal mutating func apply(
_ operation: ObjectOperation,
source: ObjectsOperationSource,
objectMessageSerial: String?,
objectMessageSiteCode: String?,
objectMessageSerialTimestamp: Date?,
objectsPool: inout ObjectsPool,
logger: Logger,
clock: SimpleClock,
userCallbackQueue: DispatchQueue,
) {
) -> Bool {
guard let applicableOperation = liveObjectMutableState.canApplyOperation(objectMessageSerial: objectMessageSerial, objectMessageSiteCode: objectMessageSiteCode, logger: logger) else {
// RTLC7b
logger.log("Operation \(operation) (serial: \(String(describing: objectMessageSerial)), siteCode: \(String(describing: objectMessageSiteCode))) should not be applied; discarding", level: .debug)
return
return false
}

// RTLC7c
liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial
if source == .channel {
liveObjectMutableState.siteTimeserials[applicableOperation.objectMessageSiteCode] = applicableOperation.objectMessageSerial
}

// RTLC7e
// TODO: are we still meant to update siteTimeserials? https://github.com/ably/specification/pull/350/files#r2218718854
if liveObjectMutableState.isTombstone {
return
return false
}

switch operation.action {
Expand All @@ -413,11 +422,15 @@ internal final class InternalDefaultLiveCounter: Sendable {
)
// RTLC7d1a
liveObjectMutableState.emit(update, on: userCallbackQueue)
// RTLC7d1b
return true
case .known(.counterInc):
// RTLC7d2
let update = applyCounterIncOperation(operation.counterOp)
// RTLC7d2a
liveObjectMutableState.emit(update, on: userCallbackQueue)
// RTLC7d2b
return true
case .known(.objectDelete):
let dataBeforeApplyingOperation = data

Expand All @@ -431,9 +444,12 @@ internal final class InternalDefaultLiveCounter: Sendable {

// RTLC7d4a
liveObjectMutableState.emit(.update(.init(amount: -dataBeforeApplyingOperation)), on: userCallbackQueue)
// RTLC7d4b
return true
default:
// RTLC7d3
logger.log("Operation \(operation) has unsupported action for LiveCounter; discarding", level: .warn)
return false
}
}

Expand Down
Loading
Loading