From cdb7ed2075952d7a797ea9fe150c857f49046100 Mon Sep 17 00:00:00 2001 From: Di Wu Date: Fri, 24 May 2024 11:53:08 -0700 Subject: [PATCH] fix(datastore): change OutgoingMutationQueue use TaskQueue for state transition --- .../OutgoingMutationQueue/OutgoingMutationQueue.swift | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift index 26cde77852..1ca931a66d 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Sync/MutationSync/OutgoingMutationQueue/OutgoingMutationQueue.swift @@ -27,10 +27,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { private let operationQueue: OperationQueue /// A DispatchQueue for synchronizing state on the mutation queue - private let mutationDispatchQueue = DispatchQueue( - label: "com.amazonaws.OutgoingMutationQueue", - target: DispatchQueue.global() - ) + private let mutationDispatchQueue = TaskQueue() private weak var api: APICategoryGraphQLBehaviorExtended? private weak var reconciliationQueue: IncomingEventReconciliationQueue? @@ -55,7 +52,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { let operationQueue = OperationQueue() operationQueue.name = "com.amazonaws.OutgoingMutationOperationQueue" - operationQueue.underlyingQueue = mutationDispatchQueue + operationQueue.qualityOfService = .default operationQueue.maxConcurrentOperationCount = 1 operationQueue.isSuspended = true @@ -139,6 +136,10 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { queryMutationEventsFromStorage { [weak self] in guard let self = self else { return } + guard case .starting = self.stateMachine.state else { + self.log.debug("Failed to continue doStart operation, current state(\(self.stateMachine.state)) is not starting") + return + } self.operationQueue.isSuspended = false // State machine notification to ".receivedSubscription" will be handled in `receive(subscription:)`