diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine+SyncRequirement.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine+SyncRequirement.swift index 040dfb7f4a..711ad7ab57 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine+SyncRequirement.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine+SyncRequirement.swift @@ -74,6 +74,12 @@ extension StorageEngine { } } + /// Expresses whether the `StorageEngine` syncs from a remote source + /// based on whether the `AWSAPIPlugin` is present. + var syncsFromRemote: Bool { + tryGetAPIPlugin() != nil + } + private func tryGetAPIPlugin() -> APICategoryPlugin? { do { return try Amplify.API.getPlugin(for: validAPIPluginKey) diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngineBehavior.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngineBehavior.swift index d17a12f38c..43c8878703 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngineBehavior.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngineBehavior.swift @@ -31,4 +31,7 @@ protocol StorageEngineBehavior: AnyObject, ModelStorageBehavior { func startSync() -> Result func stopSync(completion: @escaping DataStoreCallback) func clear(completion: @escaping DataStoreCallback) + + /// expresses whether the conforming type is syncing from a remote source. + var syncsFromRemote: Bool { get } } diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Subscribe/DataStoreObserveQueryOperation.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Subscribe/DataStoreObserveQueryOperation.swift index dada158b38..775b576da5 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Subscribe/DataStoreObserveQueryOperation.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Subscribe/DataStoreObserveQueryOperation.swift @@ -222,12 +222,26 @@ class ObserveQueryTaskRunner: InternalTaskRunner, InternalTaskAsyncThr func subscribeToItemChanges() { serialQueue.async { [weak self] in guard let self = self else { return } + self.batchItemsChangedSink = self.dataStorePublisher.publisher .filter { _ in !self.dispatchedModelSyncedEvent.get() } .filter(self.filterByModelName(mutationEvent:)) .filter(self.filterByPredicateMatch(mutationEvent:)) .handleEvents(receiveOutput: self.onItemChangeDuringSync(mutationEvent:) ) - .collect(.byTimeOrCount(self.serialQueue, self.itemsChangedPeriodicPublishTimeInSeconds, self.itemsChangedMaxSize)) + .collect( + .byTimeOrCount( + // on queue + self.serialQueue, + // collect over this timeframe + self.itemsChangedPeriodicPublishTimeInSeconds, + // If the `storageEngine` does sync from remote, the initial batch should + // collect snapshots based on time / snapshots received. + // If it doesn't, it should publish each snapshot without waiting. + self.storageEngine.syncsFromRemote + ? self.itemsChangedMaxSize + : 1 + ) + ) .sink(receiveCompletion: self.onReceiveCompletion(completed:), receiveValue: self.onItemsChangeDuringSync(mutationEvents:)) diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/Support/MockSQLiteStorageEngineAdapter.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/Support/MockSQLiteStorageEngineAdapter.swift index 8dfed11a58..5b5fb2d46a 100644 --- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/Support/MockSQLiteStorageEngineAdapter.swift +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Sync/SubscriptionSync/Support/MockSQLiteStorageEngineAdapter.swift @@ -301,6 +301,8 @@ class MockStorageEngineBehavior: StorageEngineBehavior { } + var syncsFromRemote: Bool { true } + var mockSyncEnginePublisher: PassthroughSubject! var mockSyncEngineSubscription: AnyCancellable! { willSet {