-
Notifications
You must be signed in to change notification settings - Fork 200
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(datastore): refactor datastore category to use APICategoryGraphQ…
…LBehavior (#3666) * WIP * DataStore compiles without SDK dependency * refactor(datastore-v2): use api plugin with async sequences * change to use Publisher operators for auth type streams * add nondeterminsitc operation for better testability * fix unit test cases * fix broken unit test cases of AWSAPIPlugin * fix broken AWSDataStorePlugin unit test cases * fix OutgoingMutationQueue test case * remove unused methods * fix broken test cases of SyncMutationToCloudOperationTests * fix broken unit test cases of API and DataStore * resolve plugins build issues (#3654) * remove lock from SyncMutationToCloudOperation * remove test case of retryable for signOut error * resolve comments * fix(datastore): propagate remote mutationEvents to Hub for sync received (#3697) * rename the package to InternalAmplifyCredentials * rewrite NondeterminsticOperation constructor with makeStream * resolve broken test case after merging latest orgin/main * feat(amplify): make GraphQLOperationType extends from String (#3719) * refactor(datastore): new enum to represent inferred and designated authType (#3694) * refactor(datastore): new enum to represent inferred and designated auth type * resolve failed multi auth integ tests * resolve comments * fix(datastore): use error description to produce clearer error info (#3733) --------- Co-authored-by: Michael Law <1365977+lawmicha@users.noreply.github.com>
- Loading branch information
Showing
124 changed files
with
1,818 additions
and
1,492 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
100 changes: 100 additions & 0 deletions
100
Amplify/Categories/API/Operation/NondeterminsticOperation.swift
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
// | ||
// Copyright Amazon.com Inc. or its affiliates. | ||
// All Rights Reserved. | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// | ||
|
||
|
||
import Combine | ||
/** | ||
A non-deterministic operation offers multiple paths to accomplish its task. | ||
It attempts the next path if all preceding paths have failed with an error that allows for continuation. | ||
*/ | ||
enum NondeterminsticOperationError: Error { | ||
case totalFailure | ||
case cancelled | ||
} | ||
|
||
final class NondeterminsticOperation<T> { | ||
/// operation that to be eval | ||
typealias Operation = () async throws -> T | ||
typealias OnError = (Error) -> Bool | ||
|
||
private let operations: AsyncStream<Operation> | ||
private var shouldTryNextOnError: OnError = { _ in true } | ||
private var cancellables = Set<AnyCancellable>() | ||
private var task: Task<Void, Never>? | ||
|
||
deinit { | ||
cancel() | ||
} | ||
|
||
init(operations: AsyncStream<Operation>, shouldTryNextOnError: OnError? = nil) { | ||
self.operations = operations | ||
if let shouldTryNextOnError { | ||
self.shouldTryNextOnError = shouldTryNextOnError | ||
} | ||
} | ||
|
||
convenience init( | ||
operationStream: AnyPublisher<Operation, Never>, | ||
shouldTryNextOnError: OnError? = nil | ||
) { | ||
var cancellables = Set<AnyCancellable>() | ||
let (asyncStream, continuation) = AsyncStream.makeStream(of: Operation.self) | ||
operationStream.sink { _ in | ||
continuation.finish() | ||
} receiveValue: { | ||
continuation.yield($0) | ||
}.store(in: &cancellables) | ||
|
||
self.init( | ||
operations: asyncStream, | ||
shouldTryNextOnError: shouldTryNextOnError | ||
) | ||
self.cancellables = cancellables | ||
} | ||
|
||
/// Synchronous version of executing the operations | ||
func execute() -> Future<T, Error> { | ||
Future { [weak self] promise in | ||
self?.task = Task { [weak self] in | ||
do { | ||
if let self { | ||
promise(.success(try await self.run())) | ||
} else { | ||
promise(.failure(NondeterminsticOperationError.cancelled)) | ||
} | ||
} catch { | ||
promise(.failure(error)) | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// Asynchronous version of executing the operations | ||
func run() async throws -> T { | ||
for await operation in operations { | ||
if Task.isCancelled { | ||
throw NondeterminsticOperationError.cancelled | ||
} | ||
do { | ||
return try await operation() | ||
} catch { | ||
if shouldTryNextOnError(error) { | ||
continue | ||
} else { | ||
throw error | ||
} | ||
} | ||
} | ||
throw NondeterminsticOperationError.totalFailure | ||
} | ||
|
||
/// Cancel the operation | ||
func cancel() { | ||
task?.cancel() | ||
cancellables = Set<AnyCancellable>() | ||
} | ||
} |
Oops, something went wrong.