From 6e29d884d508804f195ef08e47dfa64f1a779a44 Mon Sep 17 00:00:00 2001 From: Di Wu Date: Thu, 4 Apr 2024 11:53:05 -0700 Subject: [PATCH] refactor(datastore-v2): use api plugin with async sequences --- .../Operation/RetryableGraphQLOperation.swift | 266 ++++++++---------- .../AmplifyAsyncThrowingSequence.swift | 2 + .../AmplifyTask+OperationTaskAdapters.swift | 4 +- .../Sources/AWSAPIPlugin/AWSAPIPlugin.swift | 2 +- .../AWSGraphQLSubscriptionTaskRunner.swift | 4 +- .../APICategoryGraphQLBehaviorExtended.swift | 41 --- .../Auth/AWSAuthModeStrategy.swift | 30 ++ .../StorageEngine+SyncRequirement.swift | 2 +- .../InitialSync/InitialSyncOperation.swift | 73 ++--- .../InitialSync/InitialSyncOrchestrator.swift | 6 +- .../OutgoingMutationQueue+Action.swift | 2 +- .../OutgoingMutationQueue+State.swift | 2 +- .../OutgoingMutationQueue.swift | 12 +- ...ocessMutationErrorFromCloudOperation.swift | 62 ++-- .../SyncMutationToCloudOperation.swift | 85 +++--- .../Sync/RemoteSyncEngine+Action.swift | 4 +- .../Sync/RemoteSyncEngine+State.swift | 4 +- .../Sync/RemoteSyncEngine.swift | 8 +- .../Sync/RemoteSyncEngineBehavior.swift | 2 +- .../AWSIncomingEventReconciliationQueue.swift | 4 +- ...WSIncomingSubscriptionEventPublisher.swift | 2 +- ...omingAsyncSubscriptionEventPublisher.swift | 174 ++++++------ .../AWSModelReconciliationQueue.swift | 4 +- .../Sync/Support/AsyncStream+Extensions.swift | 21 ++ .../Mocks/MockOutgoingMutationQueue.swift | 2 +- .../Mocks/MockRemoteSyncEngine.swift | 2 +- .../TestSupport/Mocks/NoOpMutationQueue.swift | 2 +- .../DataStoreConnectionScenario1Tests.swift | 2 +- .../DataStoreHubEventsTests.swift | 1 - ...reLargeNumberModelsSubscriptionTests.swift | 2 +- .../HubEventsIntegrationTestBase.swift | 4 +- .../Mocks/MockAPICategoryPlugin.swift | 2 +- 32 files changed, 423 insertions(+), 410 deletions(-) delete mode 100644 AmplifyPlugins/Core/AWSPluginsCore/API/APICategoryGraphQLBehaviorExtended.swift create mode 100644 AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/AsyncStream+Extensions.swift diff --git a/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift b/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift index ed2a6e2753..86281585b0 100644 --- a/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift +++ b/Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift @@ -6,183 +6,159 @@ // import Foundation +import Combine -/// Convenience protocol to handle any kind of GraphQLOperation -public protocol AnyGraphQLOperation { - associatedtype Success - associatedtype Failure: Error - typealias ResultListener = (Result) -> Void -} -/// Abastraction for a retryable GraphQLOperation. -public protocol RetryableGraphQLOperationBehavior: Operation, DefaultLogger { - associatedtype Payload: Decodable +// MARK: - RetryableGraphQLOperation +public final class RetryableGraphQLOperation { + public typealias Payload = Payload - /// GraphQLOperation concrete type - associatedtype OperationType: AnyGraphQLOperation + public let requestFactory: AsyncStream<() -> GraphQLRequest> + public weak var api: APICategoryGraphQLBehavior? + private var task: Task? - typealias RequestFactory = () async -> GraphQLRequest - typealias OperationFactory = (GraphQLRequest, @escaping OperationResultListener) -> OperationType - typealias OperationResultListener = OperationType.ResultListener + public init( + requestFactory: T, + api: APICategoryGraphQLBehavior + ) where T.Element == () -> GraphQLRequest { + self.requestFactory = requestFactory.asyncStream + self.api = api + } - /// Operation unique identifier - var id: UUID { get } + deinit { + cancel() + } - /// Number of attempts (min 1) - var attempts: Int { get set } + public func execute( + _ operationType: GraphQLOperationType + ) -> Future.Success, APIError> { + Future() { promise in + self.task = Task { promise(await self.run(operationType)) } + } + } - /// Underlying GraphQL operation instantiated by `operationFactory` - var underlyingOperation: AtomicValue { get set } + public func run(_ operationType: GraphQLOperationType) async -> Result.Success, APIError> { + for await request in requestFactory { + do { + try Task.checkCancellation() + switch (self.api, operationType) { + case (.some(let api), .query): + return .success(try await api.query(request: request())) + case (.some(let api), .mutation): + return .success(try await api.mutate(request: request())) + default: + return .failure(.operationError("Unable to run GraphQL operation with type \(operationType)", "")) + } - /// Maximum number of allowed retries - var maxRetries: Int { get } + } catch is CancellationError { + return .failure(.operationError("GraphQL operation cancelled", "")) + } catch { + guard let error = error as? APIError, + let authError = error.underlyingError as? AuthError + else { + return .failure(.operationError("Failed to send \(operationType) GraphQL request", "", error)) + } - /// GraphQLRequest factory, invoked to create a new operation - var requestFactory: RequestFactory { get } + switch authError { + case .signedOut, .notAuthorized: break; + default: return .failure(error) + } + } + } + return .failure(APIError.operationError("Failed to execute GraphQL operation \(operationType)", "", nil)) + } - /// GraphQL operation factory, invoked with a newly created GraphQL request - /// and a wrapped result listener. - var operationFactory: OperationFactory { get } + public func cancel() { + task?.cancel() + } - var resultListener: OperationResultListener { get } +} - init(requestFactory: @escaping RequestFactory, - maxRetries: Int, - resultListener: @escaping OperationResultListener, - _ operationFactory: @escaping OperationFactory) +public final class RetryableGraphQLSubscriptionOperation { - func start(request: GraphQLRequest) + public typealias Payload = Payload - func shouldRetry(error: APIError?) -> Bool -} + public let requestFactory: AsyncStream<() async -> GraphQLRequest> + public weak var api: APICategoryGraphQLBehavior? + private var task: Task? -extension RetryableGraphQLOperationBehavior { - public static var log: Logger { - Amplify.Logging.logger(forCategory: CategoryType.api.displayName, forNamespace: String(describing: self)) + public init( + requestFactory: T, + api: APICategoryGraphQLBehavior + ) where T.Element == () async -> GraphQLRequest { + self.requestFactory = requestFactory.asyncStream + self.api = api } - public var log: Logger { - Self.log + + deinit { + cancel() } -} -// MARK: RetryableGraphQLOperationBehavior + default implementation -extension RetryableGraphQLOperationBehavior { - public func start(request: GraphQLRequest) { - attempts += 1 - log.debug("[\(id)] - Try [\(attempts)/\(maxRetries)]") - let wrappedResultListener: OperationResultListener = { result in - if case let .failure(error) = result, self.shouldRetry(error: error as? APIError) { - self.log.debug("\(error)") - Task { - self.start(request: await self.requestFactory()) - } - return - } + public func subscribe() -> AnyPublisher, APIError> { + let subject = PassthroughSubject, APIError>() + self.task = Task { await self.trySubscribe(subject) } + return subject.eraseToAnyPublisher() + } - if case let .failure(error) = result { - self.log.debug("\(error)") - self.log.debug("[\(self.id)] - Failed") + private func trySubscribe(_ subject: PassthroughSubject, APIError>) async { + var apiError: APIError? + for await request in requestFactory { + guard let sequence = self.api?.subscribe(request: await request()) else { + continue } + do { + try Task.checkCancellation() - if case .success = result { - self.log.debug("[Operation \(self.id)] - Success") + for try await event in sequence { + try Task.checkCancellation() + Self.log.debug("Subscribe event \(event)") + subject.send(event) + } + } catch is CancellationError { + subject.send(completion: .finished) + } catch { + if let error = error as? APIError { + apiError = error + } + Self.log.debug("Failed with subscription request: \(error)") } - self.resultListener(result) + sequence.cancel() } - underlyingOperation.set(operationFactory(request, wrappedResultListener)) - } -} - -// MARK: - RetryableGraphQLOperation -public final class RetryableGraphQLOperation: Operation, RetryableGraphQLOperationBehavior { - public typealias Payload = Payload - public typealias OperationType = GraphQLOperation - - public var id: UUID - public var maxRetries: Int - public var attempts: Int = 0 - public var requestFactory: RequestFactory - public var underlyingOperation: AtomicValue?> = AtomicValue(initialValue: nil) - public var resultListener: OperationResultListener - public var operationFactory: OperationFactory - - public init(requestFactory: @escaping RequestFactory, - maxRetries: Int, - resultListener: @escaping OperationResultListener, - _ operationFactory: @escaping OperationFactory) { - self.id = UUID() - self.maxRetries = max(1, maxRetries) - self.requestFactory = requestFactory - self.operationFactory = operationFactory - self.resultListener = resultListener - } - - public override func main() { - Task { - start(request: await requestFactory()) + if apiError != nil { + subject.send(completion: .failure(apiError!)) + } else { + subject.send(completion: .finished) } } - override public func cancel() { - self.underlyingOperation.get()?.cancel() - } - - public func shouldRetry(error: APIError?) -> Bool { - guard case let .operationError(_, _, underlyingError) = error, - let authError = underlyingError as? AuthError else { - return false - } - - switch authError { - case .signedOut, .notAuthorized: - return attempts < maxRetries - default: - return false - } + public func cancel() { + self.task?.cancel() } } -// MARK: - RetryableGraphQLSubscriptionOperation -public final class RetryableGraphQLSubscriptionOperation: Operation, - RetryableGraphQLOperationBehavior { - public typealias OperationType = GraphQLSubscriptionOperation - - public typealias Payload = Payload - - public var id: UUID - public var maxRetries: Int - public var attempts: Int = 0 - public var underlyingOperation: AtomicValue?> = AtomicValue(initialValue: nil) - public var requestFactory: RequestFactory - public var resultListener: OperationResultListener - public var operationFactory: OperationFactory - - public init(requestFactory: @escaping RequestFactory, - maxRetries: Int, - resultListener: @escaping OperationResultListener, - _ operationFactory: @escaping OperationFactory) { - self.id = UUID() - self.maxRetries = max(1, maxRetries) - self.requestFactory = requestFactory - self.operationFactory = operationFactory - self.resultListener = resultListener - } - public override func main() { - Task { - start(request: await requestFactory()) +extension AsyncSequence { + fileprivate var asyncStream: AsyncStream { + AsyncStream { continuation in + Task { + var it = self.makeAsyncIterator() + do { + while let ele = try await it.next() { + continuation.yield(ele) + } + continuation.finish() + } catch { + continuation.finish() + } + } } } +} - public override func cancel() { - self.underlyingOperation.get()?.cancel() +extension RetryableGraphQLSubscriptionOperation { + public static var log: Logger { + Amplify.Logging.logger(forCategory: CategoryType.api.displayName, forNamespace: String(describing: self)) } - - public func shouldRetry(error: APIError?) -> Bool { - return attempts < maxRetries + public var log: Logger { + Self.log } - } - -// MARK: GraphQLOperation - GraphQLSubscriptionOperation + AnyGraphQLOperation -extension GraphQLOperation: AnyGraphQLOperation {} -extension GraphQLSubscriptionOperation: AnyGraphQLOperation {} diff --git a/Amplify/Core/Support/AmplifyAsyncThrowingSequence.swift b/Amplify/Core/Support/AmplifyAsyncThrowingSequence.swift index 38772392da..6a4841f13b 100644 --- a/Amplify/Core/Support/AmplifyAsyncThrowingSequence.swift +++ b/Amplify/Core/Support/AmplifyAsyncThrowingSequence.swift @@ -6,6 +6,7 @@ // import Foundation +import Combine public typealias WeakAmplifyAsyncThrowingSequenceRef = WeakRef> @@ -49,4 +50,5 @@ public class AmplifyAsyncThrowingSequence: AsyncSequence, Can parent?.cancel() finish() } + } diff --git a/Amplify/Core/Support/AmplifyTask+OperationTaskAdapters.swift b/Amplify/Core/Support/AmplifyTask+OperationTaskAdapters.swift index fb56c6df18..50e505bce9 100644 --- a/Amplify/Core/Support/AmplifyTask+OperationTaskAdapters.swift +++ b/Amplify/Core/Support/AmplifyTask+OperationTaskAdapters.swift @@ -20,7 +20,9 @@ public class AmplifyOperationTaskAdapter) { self.operation = operation self.childTask = ChildTask(parent: operation) - resultToken = operation.subscribe(resultListener: resultListener) + resultToken = operation.subscribe { [weak self] in + self?.resultListener($0) + } } deinit { diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/AWSAPIPlugin.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/AWSAPIPlugin.swift index ce124f1f54..9d3117dd17 100644 --- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/AWSAPIPlugin.swift +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/AWSAPIPlugin.swift @@ -9,7 +9,7 @@ import Amplify import AWSPluginsCore import Foundation -final public class AWSAPIPlugin: NSObject, APICategoryPlugin, APICategoryGraphQLBehaviorExtended, AWSAPIAuthInformation { +final public class AWSAPIPlugin: NSObject, APICategoryPlugin, AWSAPIAuthInformation { /// The unique key of the plugin within the API category. public var key: PluginKey { return "awsAPIPlugin" diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift index 3e70654298..c58bc2da90 100644 --- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift @@ -106,8 +106,8 @@ public class AWSGraphQLSubscriptionTaskRunner: InternalTaskRunner, self.subscription = try await appSyncClient?.subscribe( id: subscriptionId, query: encodeRequest(query: request.document, variables: request.variables) - ).sink(receiveValue: { [weak self] event in - self?.onAsyncSubscriptionEvent(event: event) + ).sink(receiveValue: { event in + self.onAsyncSubscriptionEvent(event: event) }) } catch { let error = APIError.operationError("Unable to get connection for api \(endpointConfig.name)", "", error) diff --git a/AmplifyPlugins/Core/AWSPluginsCore/API/APICategoryGraphQLBehaviorExtended.swift b/AmplifyPlugins/Core/AWSPluginsCore/API/APICategoryGraphQLBehaviorExtended.swift deleted file mode 100644 index 91d56f9763..0000000000 --- a/AmplifyPlugins/Core/AWSPluginsCore/API/APICategoryGraphQLBehaviorExtended.swift +++ /dev/null @@ -1,41 +0,0 @@ -// -// Copyright Amazon.com Inc. or its affiliates. -// All Rights Reserved. -// -// SPDX-License-Identifier: Apache-2.0 -// - -import Foundation -import Amplify - -/// Extending the existing `APICategoryGraphQLBehavior` to include callback based APIs. -/// -/// This exists to allow DataStore to continue to use the `APICategoryGraphQLCallbackBehavior` APIs without exposing -/// them publicly from Amplify in `APICategoryGraphQLBehavior`. Eventually, the goal is for DataStore to use the -/// Async APIs, at which point, this protocol can be completely removed. Introducing this protocol allows Amplify to -/// to fully deprecate the callback based APIs, while allowing DataStore a gradual migration path forward in moving -/// away from APIPlugin's callback APIs to the Async APIs. -/// See https://github.com/aws-amplify/amplify-ios/issues/2252 for more details -/// -/// - Warning: Although this has `public` access, it is intended for internal use and should not be used directly -/// by host applications. The behavior of this may change without warning. -public protocol APICategoryGraphQLBehaviorExtended: - APICategoryGraphQLCallbackBehavior, APICategoryGraphQLBehavior, AnyObject { } - -/// Listener callback based APIs -/// -/// - Warning: Although this has `public` access, it is intended for internal use and should not be used directly -/// by host applications. The behavior of this may change without warning. -public protocol APICategoryGraphQLCallbackBehavior { - @discardableResult - func query(request: GraphQLRequest, - listener: GraphQLOperation.ResultListener?) -> GraphQLOperation - @discardableResult - func mutate(request: GraphQLRequest, - listener: GraphQLOperation.ResultListener?) -> GraphQLOperation - - func subscribe(request: GraphQLRequest, - valueListener: GraphQLSubscriptionOperation.InProcessListener?, - completionListener: GraphQLSubscriptionOperation.ResultListener?) - -> GraphQLSubscriptionOperation -} diff --git a/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthModeStrategy.swift b/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthModeStrategy.swift index 020dc68e60..eeeea09ff1 100644 --- a/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthModeStrategy.swift +++ b/AmplifyPlugins/Core/AWSPluginsCore/Auth/AWSAuthModeStrategy.swift @@ -6,6 +6,7 @@ // import Foundation +import Combine import Amplify /// Represents different auth strategies supported by a client @@ -95,6 +96,35 @@ public struct AWSAuthorizationTypeIterator: AuthorizationTypeIterator { } } +extension AuthorizationTypeIterator { + public var asyncStream: AsyncStream { + var it = self + return AsyncStream { continuation in + while let authType = it.next() { + continuation.yield(authType) + } + continuation.finish() + } + } + + public var optionalAsyncStream: AsyncStream { + var it = self + if it.hasNext { + return AsyncStream { continuation in + while let authType = it.next() { + continuation.yield(authType) + } + continuation.finish() + } + } else { + return AsyncStream { continuation in + continuation.yield(nil) + continuation.finish() + } + } + } +} + // MARK: - AWSDefaultAuthModeStrategy /// AWS default auth mode strategy. diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine+SyncRequirement.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine+SyncRequirement.swift index 0b99894ea6..b6a8aac20c 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine+SyncRequirement.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine+SyncRequirement.swift @@ -25,7 +25,7 @@ extension StorageEngine { )) } - guard let apiGraphQL = api as? APICategoryGraphQLBehaviorExtended else { + guard let apiGraphQL = api as? APICategoryGraphQLBehavior else { log.info("Unable to find GraphQL API plugin for syncEngine. syncEngine will not be started") return .failure(.configuration( "Unable to find suitable GraphQL API plugin for syncEngine. syncEngine will not be started", diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift index e01f235b88..80b29e2245 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOperation.swift @@ -13,7 +13,7 @@ import Foundation final class InitialSyncOperation: AsynchronousOperation { typealias SyncQueryResult = PaginatedList - private weak var api: APICategoryGraphQLBehaviorExtended? + private weak var api: APICategoryGraphQLBehavior? private weak var reconciliationQueue: IncomingEventReconciliationQueue? private weak var storageAdapter: StorageEngineAdapter? private let dataStoreConfiguration: DataStoreConfiguration @@ -22,6 +22,7 @@ final class InitialSyncOperation: AsynchronousOperation { private let modelSchema: ModelSchema private var recordsReceived: UInt + private var queryTask: Task? private var syncMaxRecords: UInt { return dataStoreConfiguration.syncMaxRecords @@ -61,7 +62,7 @@ final class InitialSyncOperation: AsynchronousOperation { } init(modelSchema: ModelSchema, - api: APICategoryGraphQLBehaviorExtended?, + api: APICategoryGraphQLBehavior?, reconciliationQueue: IncomingEventReconciliationQueue?, storageAdapter: StorageEngineAdapter?, dataStoreConfiguration: DataStoreConfiguration, @@ -86,7 +87,7 @@ final class InitialSyncOperation: AsynchronousOperation { log.info("Beginning sync for \(modelSchema.name)") let lastSyncMetadata = getLastSyncMetadata() let lastSyncTime = getLastSyncTime(lastSyncMetadata) - Task { + self.queryTask = Task { await query(lastSyncTime: lastSyncTime) } } @@ -168,42 +169,41 @@ final class InitialSyncOperation: AsynchronousOperation { } let minSyncPageSize = Int(min(syncMaxRecords - recordsReceived, syncPageSize)) let limit = minSyncPageSize < 0 ? Int(syncPageSize) : minSyncPageSize - let completionListener: GraphQLOperation.ResultListener = { result in - switch result { - case .failure(let apiError): - if self.isAuthSignedOutError(apiError: apiError) { - self.log.error("Sync for \(self.modelSchema.name) failed due to signed out error \(apiError.errorDescription)") - } - - // TODO: Retry query on error - let error = DataStoreError.api(apiError) - self.dataStoreConfiguration.errorHandler(error) - self.finish(result: .failure(error)) - case .success(let graphQLResult): - self.handleQueryResults(lastSyncTime: lastSyncTime, graphQLResult: graphQLResult) + let authTypes = await authModeStrategy.authTypesFor(schema: modelSchema, operation: .read) + .optionalAsyncStream.map { authType in { + GraphQLRequest.syncQuery(modelSchema: self.modelSchema, + where: self.syncPredicate, + limit: limit, + nextToken: nextToken, + lastSync: lastSyncTime, + authType: authType + ) + }} + + + let result: Result, APIError> = await RetryableGraphQLOperation( + requestFactory: authTypes, + api: api + ).run(.query) + + switch result { + case .success(let graphQLResult): + await handleQueryResults(lastSyncTime: lastSyncTime, graphQLResult: graphQLResult) + case .failure(let apiError): + if self.isAuthSignedOutError(apiError: apiError) { + self.log.error("Sync for \(self.modelSchema.name) failed due to signed out error \(apiError.errorDescription)") } + self.dataStoreConfiguration.errorHandler(DataStoreError.api(apiError)) + self.finish(result: .failure(.api(apiError))) } - - var authTypes = await authModeStrategy.authTypesFor(schema: modelSchema, operation: .read) - - RetryableGraphQLOperation(requestFactory: { - GraphQLRequest.syncQuery(modelSchema: self.modelSchema, - where: self.syncPredicate, - limit: limit, - nextToken: nextToken, - lastSync: lastSyncTime, - authType: authTypes.next()) - }, - maxRetries: authTypes.count, - resultListener: completionListener) { nextRequest, wrappedCompletionListener in - api.query(request: nextRequest, listener: wrappedCompletionListener) - }.main() } /// Disposes of the query results: Stops if error, reconciles results if success, and kick off a new query if there /// is a next token - private func handleQueryResults(lastSyncTime: Int64?, - graphQLResult: Result>) { + private func handleQueryResults( + lastSyncTime: Int64?, + graphQLResult: Result> + ) async { guard !isCancelled else { finish(result: .successfulVoid) return @@ -238,9 +238,7 @@ final class InitialSyncOperation: AsynchronousOperation { } if let nextToken = syncQueryResult.nextToken, recordsReceived < syncMaxRecords { - Task { - await self.query(lastSyncTime: lastSyncTime, nextToken: nextToken) - } + await self.query(lastSyncTime: lastSyncTime, nextToken: nextToken) } else { updateModelSyncMetadata(lastSyncTime: syncQueryResult.startedAt) } @@ -292,6 +290,9 @@ final class InitialSyncOperation: AsynchronousOperation { super.finish() } + override func cancel() { + self.queryTask?.cancel() + } } extension InitialSyncOperation: DefaultLogger { diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOrchestrator.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOrchestrator.swift index dbfe953ab1..806b19a240 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOrchestrator.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/InitialSync/InitialSyncOrchestrator.swift @@ -19,7 +19,7 @@ protocol InitialSyncOrchestrator { typealias InitialSyncOrchestratorFactory = (DataStoreConfiguration, AuthModeStrategy, - APICategoryGraphQLBehaviorExtended?, + APICategoryGraphQLBehavior?, IncomingEventReconciliationQueue?, StorageEngineAdapter?) -> InitialSyncOrchestrator @@ -30,7 +30,7 @@ final class AWSInitialSyncOrchestrator: InitialSyncOrchestrator { private var initialSyncOperationSinks: [String: AnyCancellable] private let dataStoreConfiguration: DataStoreConfiguration - private weak var api: APICategoryGraphQLBehaviorExtended? + private weak var api: APICategoryGraphQLBehavior? private weak var reconciliationQueue: IncomingEventReconciliationQueue? private weak var storageAdapter: StorageEngineAdapter? private let authModeStrategy: AuthModeStrategy @@ -52,7 +52,7 @@ final class AWSInitialSyncOrchestrator: InitialSyncOrchestrator { init(dataStoreConfiguration: DataStoreConfiguration, authModeStrategy: AuthModeStrategy, - api: APICategoryGraphQLBehaviorExtended?, + api: APICategoryGraphQLBehavior?, reconciliationQueue: IncomingEventReconciliationQueue?, storageAdapter: StorageEngineAdapter?) { self.initialSyncOperationSinks = [:] diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+Action.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+Action.swift index a9c8309ad6..f042cfab00 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+Action.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+Action.swift @@ -15,7 +15,7 @@ extension OutgoingMutationQueue { enum Action { // Startup/config actions case initialized - case receivedStart(APICategoryGraphQLBehaviorExtended, MutationEventPublisher, IncomingEventReconciliationQueue?) + case receivedStart(APICategoryGraphQLBehavior, MutationEventPublisher, IncomingEventReconciliationQueue?) case receivedSubscription // Event loop diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+State.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+State.swift index f7c59eb8ea..b8839a4b3e 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+State.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue+State.swift @@ -16,7 +16,7 @@ extension OutgoingMutationQueue { // Startup/config states case notInitialized case stopped - case starting(APICategoryGraphQLBehaviorExtended, MutationEventPublisher, IncomingEventReconciliationQueue?) + case starting(APICategoryGraphQLBehavior, MutationEventPublisher, IncomingEventReconciliationQueue?) // Event loop case requestingEvent diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift index 26cde77852..98ce9a3ce2 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift @@ -13,7 +13,7 @@ import AWSPluginsCore /// Submits outgoing mutation events to the provisioned API protocol OutgoingMutationQueueBehavior: AnyObject { func stopSyncingToCloud(_ completion: @escaping BasicClosure) - func startSyncingToCloud(api: APICategoryGraphQLBehaviorExtended, + func startSyncingToCloud(api: APICategoryGraphQLBehavior, mutationEventPublisher: MutationEventPublisher, reconciliationQueue: IncomingEventReconciliationQueue?) var publisher: AnyPublisher { get } @@ -32,7 +32,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { target: DispatchQueue.global() ) - private weak var api: APICategoryGraphQLBehaviorExtended? + private weak var api: APICategoryGraphQLBehavior? private weak var reconciliationQueue: IncomingEventReconciliationQueue? private var subscription: Subscription? @@ -84,7 +84,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { // MARK: - Public API - func startSyncingToCloud(api: APICategoryGraphQLBehaviorExtended, + func startSyncingToCloud(api: APICategoryGraphQLBehavior, mutationEventPublisher: MutationEventPublisher, reconciliationQueue: IncomingEventReconciliationQueue?) { log.verbose(#function) @@ -130,7 +130,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { /// Responder method for `starting`. Starts the operation queue and subscribes to /// the publisher. After subscribing to the publisher, return actions: /// - receivedSubscription - private func doStart(api: APICategoryGraphQLBehaviorExtended, + private func doStart(api: APICategoryGraphQLBehavior, mutationEventPublisher: MutationEventPublisher, reconciliationQueue: IncomingEventReconciliationQueue?) { log.verbose(#function) @@ -222,7 +222,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { private func processSyncMutationToCloudResult(_ result: GraphQLOperation>.OperationResult, mutationEvent: MutationEvent, - api: APICategoryGraphQLBehaviorExtended) { + api: APICategoryGraphQLBehavior) { if case let .success(graphQLResponse) = result { if case let .success(graphQLResult) = graphQLResponse { processSuccessEvent(mutationEvent, @@ -271,7 +271,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { } private func processMutationErrorFromCloud(mutationEvent: MutationEvent, - api: APICategoryGraphQLBehaviorExtended, + api: APICategoryGraphQLBehavior, apiError: APIError?, graphQLResponseError: GraphQLResponseError>?) { if let apiError = apiError, apiError.isOperationCancelledError { diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/ProcessMutationErrorFromCloudOperation.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/ProcessMutationErrorFromCloudOperation.swift index bbc4ec0895..c334745fb7 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/ProcessMutationErrorFromCloudOperation.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/ProcessMutationErrorFromCloudOperation.swift @@ -27,12 +27,12 @@ class ProcessMutationErrorFromCloudOperation: AsynchronousOperation { private let apiError: APIError? private let completion: (Result) -> Void private var mutationOperation: AtomicValue>?> - private weak var api: APICategoryGraphQLBehaviorExtended? + private weak var api: APICategoryGraphQLBehavior? private weak var reconciliationQueue: IncomingEventReconciliationQueue? init(dataStoreConfiguration: DataStoreConfiguration, mutationEvent: MutationEvent, - api: APICategoryGraphQLBehaviorExtended, + api: APICategoryGraphQLBehavior, storageAdapter: StorageEngineAdapter, graphQLResponseError: GraphQLResponseError>? = nil, apiError: APIError? = nil, @@ -296,44 +296,44 @@ class ProcessMutationErrorFromCloudOperation: AsynchronousOperation { } log.verbose("\(#function) sending mutation with data: \(apiRequest)") - let graphQLOperation = api.mutate(request: apiRequest) { [weak self] result in - guard let self = self, !self.isCancelled else { - return - } + Task { [weak self] in + do { + let result = try await api.mutate(request: apiRequest) + guard let self = self, !self.isCancelled else { + self?.finish(result: .failure(APIError.operationError("Mutation operation cancelled", ""))) + return + } - self.log.verbose("sendMutationToCloud received asyncEvent: \(result)") - self.validate(cloudResult: result, request: apiRequest) + self.log.verbose("sendMutationToCloud received asyncEvent: \(result)") + self.validate(cloudResult: result, request: apiRequest) + } catch { + self?.finish(result: .failure(APIError.operationError("Failed to do mutation", "", error))) + } } - mutationOperation.set(graphQLOperation) } - private func validate(cloudResult: MutationSyncCloudResult, request: MutationSyncAPIRequest) { + private func validate(cloudResult: GraphQLResponse, request: MutationSyncAPIRequest) { guard !isCancelled else { return } - if case .failure(let error) = cloudResult { - dataStoreConfiguration.errorHandler(error) - } - - if case let .success(graphQLResponse) = cloudResult { - if case .failure(let error) = graphQLResponse { - dataStoreConfiguration.errorHandler(error) - } else if case let .success(graphQLResult) = graphQLResponse { - guard let reconciliationQueue = reconciliationQueue else { - let dataStoreError = DataStoreError.configuration( - "reconciliationQueue is unexpectedly nil", - """ - The reference to reconciliationQueue has been released while an ongoing mutation was being processed. - \(AmplifyErrorMessages.reportBugToAWS()) - """ - ) - finish(result: .failure(dataStoreError)) - return - } - - reconciliationQueue.offer([graphQLResult], modelName: mutationEvent.modelName) + switch cloudResult { + case .success(let mutationSyncResult): + guard let reconciliationQueue = reconciliationQueue else { + let dataStoreError = DataStoreError.configuration( + "reconciliationQueue is unexpectedly nil", + """ + The reference to reconciliationQueue has been released while an ongoing mutation was being processed. + \(AmplifyErrorMessages.reportBugToAWS()) + """ + ) + finish(result: .failure(dataStoreError)) + return } + + reconciliationQueue.offer([mutationSyncResult], modelName: mutationEvent.modelName) + case .failure(let graphQLResponseError): + dataStoreConfiguration.errorHandler(graphQLResponseError) } finish(result: .success(nil)) diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift index 3732bafb4a..db45220167 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/SyncMutationToCloudOperation.swift @@ -17,23 +17,23 @@ class SyncMutationToCloudOperation: AsynchronousOperation { typealias MutationSyncCloudResult = GraphQLOperation>.OperationResult - private weak var api: APICategoryGraphQLBehaviorExtended? + private weak var api: APICategoryGraphQLBehavior? private let mutationEvent: MutationEvent private let getLatestSyncMetadata: () -> MutationSyncMetadata? private let completion: GraphQLOperation>.ResultListener private let requestRetryablePolicy: RequestRetryablePolicy - private let lock: NSRecursiveLock +// private let lock: NSRecursiveLock private var networkReachabilityPublisher: AnyPublisher? - private var mutationOperation: GraphQLOperation>? + private var mutationOperation: Task? private var mutationRetryNotifier: MutationRetryNotifier? private var currentAttemptNumber: Int private var authTypesIterator: AWSAuthorizationTypeIterator? init(mutationEvent: MutationEvent, getLatestSyncMetadata: @escaping () -> MutationSyncMetadata?, - api: APICategoryGraphQLBehaviorExtended, + api: APICategoryGraphQLBehavior, authModeStrategy: AuthModeStrategy, networkReachabilityPublisher: AnyPublisher? = nil, currentAttemptNumber: Int = 1, @@ -46,7 +46,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation { self.completion = completion self.currentAttemptNumber = currentAttemptNumber self.requestRetryablePolicy = requestRetryablePolicy ?? RequestRetryablePolicy() - self.lock = NSRecursiveLock() +// self.lock = NSRecursiveLock() if let modelSchema = ModelRegistry.modelSchema(from: mutationEvent.modelName), let mutationType = GraphQLMutationType(rawValue: mutationEvent.mutationType) { @@ -66,11 +66,11 @@ class SyncMutationToCloudOperation: AsynchronousOperation { override func cancel() { log.verbose(#function) - lock.execute { - mutationOperation?.cancel() - mutationRetryNotifier?.cancel() - mutationRetryNotifier = nil - } +// lock.execute { + mutationOperation?.cancel() + mutationRetryNotifier?.cancel() + mutationRetryNotifier = nil +// } let apiError = APIError(error: OperationCancelledError()) finish(result: .failure(apiError)) @@ -209,41 +209,56 @@ class SyncMutationToCloudOperation: AsynchronousOperation { return } log.verbose("\(#function) sending mutation with sync data: \(apiRequest)") - lock.execute { - mutationOperation = api.mutate(request: apiRequest) { [weak self] result in - self?.respond(toCloudResult: result, withAPIRequest: apiRequest) + + mutationOperation = Task { [weak self] in + let result: GraphQLResponse> + do { + result = try await api.mutate(request: apiRequest) + } catch { + result = .failure(.unknown("Failed to send sync mutation request", "", error)) } + +// self?.lock.execute { [weak self] in + self?.respond( + toCloudResult: result, + withAPIRequest: apiRequest + ) +// } } + } /// Initiates a locking context private func respond( - toCloudResult result: GraphQLOperation>.OperationResult, + toCloudResult result: GraphQLResponse>, withAPIRequest apiRequest: GraphQLRequest> ) { - lock.execute { - guard !self.isCancelled else { - Amplify.log.debug("SyncMutationToCloudOperation cancelled, aborting") - return - } - - log.verbose("GraphQL mutation operation received result: \(result)") - validate(cloudResult: result, request: apiRequest) +// lock.execute { + guard !self.isCancelled else { + Amplify.log.debug("SyncMutationToCloudOperation cancelled, aborting") + return } + + log.verbose("GraphQL mutation operation received result: \(result)") + validate(cloudResult: result, request: apiRequest) +// } } /// - Warning: Must be invoked from a locking context - private func validate(cloudResult: MutationSyncCloudResult, - request: GraphQLRequest>) { - guard !isCancelled else { + private func validate( + cloudResult: GraphQLResponse>, + request: GraphQLRequest> + ) { + guard !isCancelled, let mutationOperation, !mutationOperation.isCancelled else { return } - if case .failure(let error) = cloudResult { - let advice = getRetryAdviceIfRetryable(error: error) + if case .failure(let error) = cloudResult, + let apiError = error.underlyingError as? APIError { + let advice = getRetryAdviceIfRetryable(error: apiError) guard advice.shouldRetry else { - finish(result: .failure(error)) + finish(result: .failure(apiError)) return } @@ -257,7 +272,7 @@ class SyncMutationToCloudOperation: AsynchronousOperation { return } - finish(result: cloudResult) + finish(result: .success(cloudResult)) } /// - Warning: Must be invoked from a locking context @@ -341,20 +356,20 @@ class SyncMutationToCloudOperation: AsynchronousOperation { /// Initiates a locking context private func respondToMutationNotifierTriggered(withAuthType authType: AWSAuthorizationType?) { log.verbose("\(#function) mutationRetryNotifier triggered") - lock.execute { +// lock.execute { sendMutationToCloud(withAuthType: authType) mutationRetryNotifier = nil - } +// } } /// Cleans up operation resources, finalizes AsynchronousOperation states, and invokes `completion` with `result` /// - Parameter result: The MutationSyncCloudResult to pass to `completion` private func finish(result: MutationSyncCloudResult) { log.verbose(#function) - lock.execute { - mutationOperation?.removeResultListener() - mutationOperation = nil - } +// lock.execute { + mutationOperation?.cancel() + mutationOperation = nil +// } DispatchQueue.global().async { self.completion(result) diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine+Action.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine+Action.swift index 7421637a54..7ace6cd086 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine+Action.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine+Action.swift @@ -18,10 +18,10 @@ extension RemoteSyncEngine { case pausedSubscriptions case pausedMutationQueue(StorageEngineAdapter) - case clearedStateOutgoingMutations(APICategoryGraphQLBehaviorExtended, StorageEngineAdapter) + case clearedStateOutgoingMutations(APICategoryGraphQLBehavior, StorageEngineAdapter) case initializedSubscriptions case performedInitialSync - case activatedCloudSubscriptions(APICategoryGraphQLBehaviorExtended, MutationEventPublisher, IncomingEventReconciliationQueue?) + case activatedCloudSubscriptions(APICategoryGraphQLBehavior, MutationEventPublisher, IncomingEventReconciliationQueue?) case activatedMutationQueue case notifiedSyncStarted case cleanedUp(AmplifyError) diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine+State.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine+State.swift index d55c3fe5c3..a1ecebfbbb 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine+State.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine+State.swift @@ -18,10 +18,10 @@ extension RemoteSyncEngine { case pausingSubscriptions case pausingMutationQueue case clearingStateOutgoingMutations(StorageEngineAdapter) - case initializingSubscriptions(APICategoryGraphQLBehaviorExtended, StorageEngineAdapter) + case initializingSubscriptions(APICategoryGraphQLBehavior, StorageEngineAdapter) case performingInitialSync case activatingCloudSubscriptions - case activatingMutationQueue(APICategoryGraphQLBehaviorExtended, MutationEventPublisher, IncomingEventReconciliationQueue?) + case activatingMutationQueue(APICategoryGraphQLBehavior, MutationEventPublisher, IncomingEventReconciliationQueue?) case notifyingSyncStarted case syncEngineActive diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine.swift index 26bb453571..fd30c9ecae 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngine.swift @@ -21,7 +21,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior { private var authModeStrategy: AuthModeStrategy // Assigned at `start` - weak var api: APICategoryGraphQLBehaviorExtended? + weak var api: APICategoryGraphQLBehavior? weak var auth: AuthCategoryBehavior? // Assigned and released inside `performInitialQueries`, but we maintain a reference so we can `reset` @@ -197,7 +197,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior { } // swiftlint:enable cyclomatic_complexity - func start(api: APICategoryGraphQLBehaviorExtended, auth: AuthCategoryBehavior?) { + func start(api: APICategoryGraphQLBehavior, auth: AuthCategoryBehavior?) { guard storageAdapter != nil else { log.error(error: DataStoreError.nilStorageAdapter()) remoteSyncTopicPublisher.send(completion: .failure(DataStoreError.nilStorageAdapter())) @@ -280,7 +280,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior { } } - private func initializeSubscriptions(api: APICategoryGraphQLBehaviorExtended, + private func initializeSubscriptions(api: APICategoryGraphQLBehavior, storageAdapter: StorageEngineAdapter) async { log.debug("[InitializeSubscription] \(#function)") let syncableModelSchemas = ModelRegistry.modelSchemas.filter { $0.isSyncable } @@ -363,7 +363,7 @@ class RemoteSyncEngine: RemoteSyncEngineBehavior { reconciliationQueue.start() } - private func startMutationQueue(api: APICategoryGraphQLBehaviorExtended, + private func startMutationQueue(api: APICategoryGraphQLBehavior, mutationEventPublisher: MutationEventPublisher, reconciliationQueue: IncomingEventReconciliationQueue?) { log.debug(#function) diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngineBehavior.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngineBehavior.swift index ee5710ff21..765d72473f 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngineBehavior.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/RemoteSyncEngineBehavior.swift @@ -41,7 +41,7 @@ protocol RemoteSyncEngineBehavior: AnyObject { /// the updates in the Datastore /// 1. Mutation processor drains messages off the queue in serial and sends to the service, invoking /// any local callbacks on error if necessary - func start(api: APICategoryGraphQLBehaviorExtended, auth: AuthCategoryBehavior?) + func start(api: APICategoryGraphQLBehavior, auth: AuthCategoryBehavior?) func stop(completion: @escaping DataStoreCallback) diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift index 7705120b4b..4a6d765aca 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingEventReconciliationQueue.swift @@ -15,7 +15,7 @@ typealias DisableSubscriptions = () -> Bool // Used for testing: typealias IncomingEventReconciliationQueueFactory = ([ModelSchema], - APICategoryGraphQLBehaviorExtended, + APICategoryGraphQLBehavior, StorageEngineAdapter, [DataStoreSyncExpression], AuthCategoryBehavior?, @@ -46,7 +46,7 @@ final class AWSIncomingEventReconciliationQueue: IncomingEventReconciliationQueu private let modelSchemasCount: Int init(modelSchemas: [ModelSchema], - api: APICategoryGraphQLBehaviorExtended, + api: APICategoryGraphQLBehavior, storageAdapter: StorageEngineAdapter, syncExpressions: [DataStoreSyncExpression], auth: AuthCategoryBehavior? = nil, diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingSubscriptionEventPublisher.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingSubscriptionEventPublisher.swift index 32365419fe..76c4b7dc9a 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingSubscriptionEventPublisher.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/AWSIncomingSubscriptionEventPublisher.swift @@ -23,7 +23,7 @@ final class AWSIncomingSubscriptionEventPublisher: IncomingSubscriptionEventPubl } init(modelSchema: ModelSchema, - api: APICategoryGraphQLBehaviorExtended, + api: APICategoryGraphQLBehavior, modelPredicate: QueryPredicate?, auth: AuthCategoryBehavior?, authModeStrategy: AuthModeStrategy) async { diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift index d5dae69b37..b125e239a1 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/IncomingAsyncSubscriptionEventPublisher.swift @@ -6,7 +6,7 @@ // import Amplify -import AWSPluginsCore +@_spi(WebSocket) import AWSPluginsCore import Combine import Foundation @@ -39,7 +39,8 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { return onCreateConnected && onUpdateConnected && onDeleteConnected } - private let incomingSubscriptionEvents: PassthroughSubject + private let incomingSubscriptionEvents = PassthroughSubject() + private var cancelables = Set() private let awsAuthService: AWSAuthServiceBehavior private let consistencyQueue: DispatchQueue @@ -47,7 +48,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { private let modelName: ModelName init(modelSchema: ModelSchema, - api: APICategoryGraphQLBehaviorExtended, + api: APICategoryGraphQLBehavior, modelPredicate: QueryPredicate?, auth: AuthCategoryBehavior?, authModeStrategy: AuthModeStrategy, @@ -67,72 +68,74 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { connectionStatusQueue.maxConcurrentOperationCount = 1 connectionStatusQueue.isSuspended = false - let incomingSubscriptionEvents = PassthroughSubject() - self.incomingSubscriptionEvents = incomingSubscriptionEvents self.awsAuthService = awsAuthService ?? AWSAuthService() // onCreate operation - let onCreateValueListener = onCreateValueListenerHandler(event:) - let onCreateAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema, - operations: [.create, .read]) - self.onCreateValueListener = onCreateValueListener - self.onCreateOperation = RetryableGraphQLSubscriptionOperation( - requestFactory: IncomingAsyncSubscriptionEventPublisher.apiRequestFactoryFor( - for: modelSchema, - subscriptionType: .onCreate, - api: api, - auth: auth, - awsAuthService: self.awsAuthService, - authTypeProvider: onCreateAuthTypeProvider), - maxRetries: onCreateAuthTypeProvider.count, - resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in - api.subscribe(request: nextRequest, - valueListener: onCreateValueListener, - completionListener: wrappedCompletion) - } - onCreateOperation?.main() + self.onCreateValueListener = onCreateValueListenerHandler(event:) + self.onCreateOperation = await retryableOperation( + subscriptionType: .create, + modelSchema: modelSchema, + authModeStrategy: authModeStrategy, + auth: auth, + api: api + ) + onCreateOperation?.subscribe() + .sink(receiveCompletion: genericCompletionListenerHandler(result:), receiveValue: onCreateValueListener!) + .store(in: &cancelables) // onUpdate operation - let onUpdateValueListener = onUpdateValueListenerHandler(event:) - let onUpdateAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema, - operations: [.update, .read]) - self.onUpdateValueListener = onUpdateValueListener - self.onUpdateOperation = RetryableGraphQLSubscriptionOperation( - requestFactory: IncomingAsyncSubscriptionEventPublisher.apiRequestFactoryFor( - for: modelSchema, - subscriptionType: .onUpdate, - api: api, - auth: auth, - awsAuthService: self.awsAuthService, - authTypeProvider: onUpdateAuthTypeProvider), - maxRetries: onUpdateAuthTypeProvider.count, - resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in - api.subscribe(request: nextRequest, - valueListener: onUpdateValueListener, - completionListener: wrappedCompletion) - } - onUpdateOperation?.main() + self.onUpdateValueListener = onUpdateValueListenerHandler(event:) + self.onUpdateOperation = await retryableOperation( + subscriptionType: .update, + modelSchema: modelSchema, + authModeStrategy: authModeStrategy, + auth: auth, + api: api + ) + onUpdateOperation?.subscribe() + .sink(receiveCompletion: genericCompletionListenerHandler(result:), receiveValue: onUpdateValueListener!) + .store(in: &cancelables) // onDelete operation - let onDeleteValueListener = onDeleteValueListenerHandler(event:) - let onDeleteAuthTypeProvider = await authModeStrategy.authTypesFor(schema: modelSchema, - operations: [.delete, .read]) - self.onDeleteValueListener = onDeleteValueListener - self.onDeleteOperation = RetryableGraphQLSubscriptionOperation( - requestFactory: IncomingAsyncSubscriptionEventPublisher.apiRequestFactoryFor( - for: modelSchema, - subscriptionType: .onDelete, - api: api, - auth: auth, - awsAuthService: self.awsAuthService, - authTypeProvider: onDeleteAuthTypeProvider), - maxRetries: onUpdateAuthTypeProvider.count, - resultListener: genericCompletionListenerHandler) { nextRequest, wrappedCompletion in - api.subscribe(request: nextRequest, - valueListener: onDeleteValueListener, - completionListener: wrappedCompletion) - } - onDeleteOperation?.main() + self.onDeleteValueListener = onDeleteValueListenerHandler(event:) + self.onDeleteOperation = await retryableOperation( + subscriptionType: .delete, + modelSchema: modelSchema, + authModeStrategy: authModeStrategy, + auth: auth, + api: api + ) + onDeleteOperation?.subscribe() + .sink(receiveCompletion: genericCompletionListenerHandler(result:), receiveValue: onDeleteValueListener!) + .store(in: &cancelables) + } + + + func retryableOperation( + subscriptionType: IncomingAsyncSubscriptionType, + modelSchema: ModelSchema, + authModeStrategy: AuthModeStrategy, + auth: AuthCategoryBehavior?, + api: APICategoryGraphQLBehavior + ) async -> RetryableGraphQLSubscriptionOperation { + let authTypeProvider = await authModeStrategy.authTypesFor( + schema: modelSchema, + operations: subscriptionType.operations + ) + + return RetryableGraphQLSubscriptionOperation( + requestFactory: authTypeProvider.optionalAsyncStream.map { authType in { + await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest( + for: modelSchema, + subscriptionType: subscriptionType.subscriptionType, + api: api, + auth: auth, + authType: authType, + awsAuthService: self.awsAuthService + ) + }}, + api: api + ) } func onCreateValueListenerHandler(event: Event) { @@ -183,9 +186,9 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { } } - func genericCompletionListenerHandler(result: Result) { + func genericCompletionListenerHandler(result: Subscribers.Completion) { switch result { - case .success: + case .finished: send(completion: .finished) case .failure(let apiError): log.verbose("[InitializeSubscription.1] API.subscribe failed for `\(modelName)` error: \(apiError.errorDescription)") @@ -196,7 +199,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { static func makeAPIRequest(for modelSchema: ModelSchema, subscriptionType: GraphQLSubscriptionType, - api: APICategoryGraphQLBehaviorExtended, + api: APICategoryGraphQLBehavior, auth: AuthCategoryBehavior?, authType: AWSAuthorizationType?, awsAuthService: AWSAuthServiceBehavior) async -> GraphQLRequest { @@ -226,7 +229,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { return request } - static func hasOIDCAuthProviderAvailable(api: APICategoryGraphQLBehaviorExtended) -> AmplifyOIDCAuthProvider? { + static func hasOIDCAuthProviderAvailable(api: APICategoryGraphQLBehavior) -> AmplifyOIDCAuthProvider? { if let apiPlugin = api as? APICategoryAuthProviderFactoryBehavior, let oidcAuthProvider = apiPlugin.apiAuthProviderFactory().oidcAuthProvider() { return oidcAuthProvider @@ -254,7 +257,7 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { func cancel() { consistencyQueue.sync { - genericCompletionListenerHandler(result: .successfulVoid) + genericCompletionListenerHandler(result: .finished) onCreateOperation?.cancel() onCreateOperation = nil @@ -287,30 +290,33 @@ final class IncomingAsyncSubscriptionEventPublisher: AmplifyCancellable { onDeleteOperation = nil onDeleteValueListener?(.connection(.disconnected)) - genericCompletionListenerHandler(result: .successfulVoid) + genericCompletionListenerHandler(result: .finished) } } } -// MARK: - IncomingAsyncSubscriptionEventPublisher + API request factory -extension IncomingAsyncSubscriptionEventPublisher { - static func apiRequestFactoryFor(for modelSchema: ModelSchema, - subscriptionType: GraphQLSubscriptionType, - api: APICategoryGraphQLBehaviorExtended, - auth: AuthCategoryBehavior?, - awsAuthService: AWSAuthServiceBehavior, - authTypeProvider: AWSAuthorizationTypeIterator) -> RetryableGraphQLOperation.RequestFactory { - var authTypes = authTypeProvider - return { - return await IncomingAsyncSubscriptionEventPublisher.makeAPIRequest(for: modelSchema, - subscriptionType: subscriptionType, - api: api, - auth: auth, - authType: authTypes.next(), - awsAuthService: awsAuthService) +enum IncomingAsyncSubscriptionType { + case create + case delete + case update + + var operations: [ModelOperation] { + switch self { + case .create: return [.create, .read] + case .delete: return [.delete, .read] + case .update: return [.update, .read] } } + + var subscriptionType: GraphQLSubscriptionType { + switch self { + case .create: return .onCreate + case .delete: return .onDelete + case .update: return .onUpdate + } + } + } extension IncomingAsyncSubscriptionEventPublisher: DefaultLogger { diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift index 7eacedb029..03074d82e3 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/SubscriptionSync/ReconcileAndLocalSave/AWSModelReconciliationQueue.swift @@ -14,7 +14,7 @@ import Foundation typealias ModelReconciliationQueueFactory = ( ModelSchema, StorageEngineAdapter, - APICategoryGraphQLBehaviorExtended, + APICategoryGraphQLBehavior, ReconcileAndSaveOperationQueue, QueryPredicate?, AuthCategoryBehavior?, @@ -78,7 +78,7 @@ final class AWSModelReconciliationQueue: ModelReconciliationQueue { init(modelSchema: ModelSchema, storageAdapter: StorageEngineAdapter?, - api: APICategoryGraphQLBehaviorExtended, + api: APICategoryGraphQLBehavior, reconcileAndSaveQueue: ReconcileAndSaveOperationQueue, modelPredicate: QueryPredicate?, auth: AuthCategoryBehavior?, diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/AsyncStream+Extensions.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/AsyncStream+Extensions.swift new file mode 100644 index 0000000000..0dee36be66 --- /dev/null +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/Support/AsyncStream+Extensions.swift @@ -0,0 +1,21 @@ +// +// Copyright Amazon.com Inc. or its affiliates. +// All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + + +import Foundation +import Combine + +extension AsyncStream { + static func from(seq: any Sequence) -> AsyncStream { + AsyncStream { continuation in + for ele in seq { + continuation.yield(ele) + } + continuation.finish() + } + } +} diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockOutgoingMutationQueue.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockOutgoingMutationQueue.swift index 3eec2bce57..e0223bac2b 100644 --- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockOutgoingMutationQueue.swift +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockOutgoingMutationQueue.swift @@ -17,7 +17,7 @@ class MockOutgoingMutationQueue: OutgoingMutationQueueBehavior { completion() } - func startSyncingToCloud(api: APICategoryGraphQLBehaviorExtended, + func startSyncingToCloud(api: APICategoryGraphQLBehavior, mutationEventPublisher: MutationEventPublisher, reconciliationQueue: IncomingEventReconciliationQueue?) { // no-op diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockRemoteSyncEngine.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockRemoteSyncEngine.swift index 105ab41ebf..1f2036784e 100644 --- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockRemoteSyncEngine.swift +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/MockRemoteSyncEngine.swift @@ -37,7 +37,7 @@ class MockRemoteSyncEngine: RemoteSyncEngineBehavior { init() { self.remoteSyncTopicPublisher = PassthroughSubject() } - func start(api: APICategoryGraphQLBehaviorExtended, auth: AuthCategoryBehavior?) { + func start(api: APICategoryGraphQLBehavior, auth: AuthCategoryBehavior?) { syncing = true } diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/NoOpMutationQueue.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/NoOpMutationQueue.swift index 56a65bc96b..82c9b031af 100644 --- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/NoOpMutationQueue.swift +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/TestSupport/Mocks/NoOpMutationQueue.swift @@ -17,7 +17,7 @@ class NoOpMutationQueue: OutgoingMutationQueueBehavior { completion() } - func startSyncingToCloud(api: APICategoryGraphQLBehaviorExtended, + func startSyncingToCloud(api: APICategoryGraphQLBehavior, mutationEventPublisher: MutationEventPublisher, reconciliationQueue: IncomingEventReconciliationQueue?) { // do nothing diff --git a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/Connection/DataStoreConnectionScenario1Tests.swift b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/Connection/DataStoreConnectionScenario1Tests.swift index ae26e441e7..49c5fb097e 100644 --- a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/Connection/DataStoreConnectionScenario1Tests.swift +++ b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/Connection/DataStoreConnectionScenario1Tests.swift @@ -228,7 +228,7 @@ class DataStoreConnectionScenario1Tests: SyncEngineIntegrationTestBase { } func testDeleteWithInvalidCondition() async throws { - await setUp(withModels: TestModelRegistration()) + await setUp(withModels: TestModelRegistration(), logLevel: .verbose) try await startAmplifyAndWaitForSync() let team = Team1(name: "name") let project = Project1(team: team) diff --git a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreHubEventsTests.swift b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreHubEventsTests.swift index 916a4d26ea..a5fe3115f4 100644 --- a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreHubEventsTests.swift +++ b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreHubEventsTests.swift @@ -37,7 +37,6 @@ class DataStoreHubEventTests: HubEventsIntegrationTestBase { /// {modelName: "Some Model name", isFullSync: true/false, isDeltaSync: false/true, createCount: #, updateCount: #, deleteCount: #} /// - syncQueriesReady received, payload should be nil func testDataStoreConfiguredDispatchesHubEvents() async throws { - Amplify.Logging.logLevel = .verbose try configureAmplify(withModels: TestModelRegistration()) try await Amplify.DataStore.clear() await Amplify.reset() diff --git a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreLargeNumberModelsSubscriptionTests.swift b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreLargeNumberModelsSubscriptionTests.swift index cc09f0f40d..4121f562bd 100644 --- a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreLargeNumberModelsSubscriptionTests.swift +++ b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/DataStoreLargeNumberModelsSubscriptionTests.swift @@ -46,7 +46,7 @@ class DataStoreLargeNumberModelsSubscriptionTests: SyncEngineIntegrationTestBase } func testDataStoreStop_subscriptionsShouldAllUnsubscribed() async throws { - await setUp(withModels: TestModelRegistration()) + await setUp(withModels: TestModelRegistration(), logLevel: .verbose) try await startAmplifyAndWaitForSync() try await stopDataStoreAndVerifyAppSyncClientDisconnected() diff --git a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/TestSupport/HubEventsIntegrationTestBase.swift b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/TestSupport/HubEventsIntegrationTestBase.swift index 9bb4167312..e480689b9e 100644 --- a/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/TestSupport/HubEventsIntegrationTestBase.swift +++ b/AmplifyPlugins/DataStore/Tests/DataStoreHostApp/AWSDataStorePluginIntegrationTests/TestSupport/HubEventsIntegrationTestBase.swift @@ -52,7 +52,9 @@ class HubEventsIntegrationTestBase: XCTestCase { #if os(watchOS) try Amplify.add(plugin: AWSDataStorePlugin(modelRegistration: models, configuration: .subscriptionsDisabled)) #else - try Amplify.add(plugin: AWSDataStorePlugin(modelRegistration: models)) + try Amplify.add(plugin: AWSDataStorePlugin( + modelRegistration: models + )) #endif try Amplify.add(plugin: AWSAPIPlugin( modelRegistration: models, diff --git a/AmplifyTestCommon/Mocks/MockAPICategoryPlugin.swift b/AmplifyTestCommon/Mocks/MockAPICategoryPlugin.swift index 2b65491429..31ac246db1 100644 --- a/AmplifyTestCommon/Mocks/MockAPICategoryPlugin.swift +++ b/AmplifyTestCommon/Mocks/MockAPICategoryPlugin.swift @@ -13,7 +13,7 @@ import Foundation class MockAPICategoryPlugin: MessageReporter, APICategoryPlugin, APICategoryReachabilityBehavior, - APICategoryGraphQLBehaviorExtended { + APICategoryGraphQLBehavior { var authProviderFactory: APIAuthProviderFactory?