Skip to content

Commit

Permalink
refactor(datastore-v2): use api plugin with async sequences
Browse files Browse the repository at this point in the history
  • Loading branch information
5d committed Apr 9, 2024
1 parent 946600a commit 6e29d88
Show file tree
Hide file tree
Showing 32 changed files with 423 additions and 410 deletions.
266 changes: 121 additions & 145 deletions Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,183 +6,159 @@
//

import Foundation
import Combine

/// Convenience protocol to handle any kind of GraphQLOperation
public protocol AnyGraphQLOperation {
associatedtype Success
associatedtype Failure: Error
typealias ResultListener = (Result<Success, Failure>) -> Void
}

/// Abastraction for a retryable GraphQLOperation.
public protocol RetryableGraphQLOperationBehavior: Operation, DefaultLogger {
associatedtype Payload: Decodable
// MARK: - RetryableGraphQLOperation
public final class RetryableGraphQLOperation<Payload: Decodable> {
public typealias Payload = Payload

/// GraphQLOperation concrete type
associatedtype OperationType: AnyGraphQLOperation
public let requestFactory: AsyncStream<() -> GraphQLRequest<Payload>>
public weak var api: APICategoryGraphQLBehavior?
private var task: Task<Void, Never>?

typealias RequestFactory = () async -> GraphQLRequest<Payload>
typealias OperationFactory = (GraphQLRequest<Payload>, @escaping OperationResultListener) -> OperationType
typealias OperationResultListener = OperationType.ResultListener
public init<T: AsyncSequence>(
requestFactory: T,
api: APICategoryGraphQLBehavior
) where T.Element == () -> GraphQLRequest<Payload> {
self.requestFactory = requestFactory.asyncStream
self.api = api
}

/// Operation unique identifier
var id: UUID { get }
deinit {
cancel()
}

/// Number of attempts (min 1)
var attempts: Int { get set }
public func execute(
_ operationType: GraphQLOperationType
) -> Future<GraphQLTask<Payload>.Success, APIError> {
Future() { promise in
self.task = Task { promise(await self.run(operationType)) }
}
}

/// Underlying GraphQL operation instantiated by `operationFactory`
var underlyingOperation: AtomicValue<OperationType?> { get set }
public func run(_ operationType: GraphQLOperationType) async -> Result<GraphQLTask<Payload>.Success, APIError> {
for await request in requestFactory {
do {
try Task.checkCancellation()
switch (self.api, operationType) {
case (.some(let api), .query):
return .success(try await api.query(request: request()))
case (.some(let api), .mutation):
return .success(try await api.mutate(request: request()))
default:
return .failure(.operationError("Unable to run GraphQL operation with type \(operationType)", ""))
}

/// Maximum number of allowed retries
var maxRetries: Int { get }
} catch is CancellationError {
return .failure(.operationError("GraphQL operation cancelled", ""))
} catch {
guard let error = error as? APIError,
let authError = error.underlyingError as? AuthError
else {
return .failure(.operationError("Failed to send \(operationType) GraphQL request", "", error))
}

/// GraphQLRequest factory, invoked to create a new operation
var requestFactory: RequestFactory { get }
switch authError {
case .signedOut, .notAuthorized: break;

Check failure on line 63 in Amplify/Categories/API/Operation/RetryableGraphQLOperation.swift

View workflow job for this annotation

GitHub Actions / run-swiftlint

Lines should not have trailing semicolons (trailing_semicolon)
default: return .failure(error)
}
}
}
return .failure(APIError.operationError("Failed to execute GraphQL operation \(operationType)", "", nil))
}

/// GraphQL operation factory, invoked with a newly created GraphQL request
/// and a wrapped result listener.
var operationFactory: OperationFactory { get }
public func cancel() {
task?.cancel()
}

var resultListener: OperationResultListener { get }
}

init(requestFactory: @escaping RequestFactory,
maxRetries: Int,
resultListener: @escaping OperationResultListener,
_ operationFactory: @escaping OperationFactory)
public final class RetryableGraphQLSubscriptionOperation<Payload: Decodable> {

func start(request: GraphQLRequest<Payload>)
public typealias Payload = Payload

func shouldRetry(error: APIError?) -> Bool
}
public let requestFactory: AsyncStream<() async -> GraphQLRequest<Payload>>
public weak var api: APICategoryGraphQLBehavior?
private var task: Task<Void, Error>?

extension RetryableGraphQLOperationBehavior {
public static var log: Logger {
Amplify.Logging.logger(forCategory: CategoryType.api.displayName, forNamespace: String(describing: self))
public init<T: AsyncSequence>(
requestFactory: T,
api: APICategoryGraphQLBehavior
) where T.Element == () async -> GraphQLRequest<Payload> {
self.requestFactory = requestFactory.asyncStream
self.api = api
}
public var log: Logger {
Self.log

deinit {
cancel()
}
}

// MARK: RetryableGraphQLOperationBehavior + default implementation
extension RetryableGraphQLOperationBehavior {
public func start(request: GraphQLRequest<Payload>) {
attempts += 1
log.debug("[\(id)] - Try [\(attempts)/\(maxRetries)]")
let wrappedResultListener: OperationResultListener = { result in
if case let .failure(error) = result, self.shouldRetry(error: error as? APIError) {
self.log.debug("\(error)")
Task {
self.start(request: await self.requestFactory())
}
return
}
public func subscribe() -> AnyPublisher<GraphQLSubscriptionEvent<Payload>, APIError> {
let subject = PassthroughSubject<GraphQLSubscriptionEvent<Payload>, APIError>()
self.task = Task { await self.trySubscribe(subject) }
return subject.eraseToAnyPublisher()
}

if case let .failure(error) = result {
self.log.debug("\(error)")
self.log.debug("[\(self.id)] - Failed")
private func trySubscribe(_ subject: PassthroughSubject<GraphQLSubscriptionEvent<Payload>, APIError>) async {
var apiError: APIError?
for await request in requestFactory {
guard let sequence = self.api?.subscribe(request: await request()) else {
continue
}
do {
try Task.checkCancellation()

if case .success = result {
self.log.debug("[Operation \(self.id)] - Success")
for try await event in sequence {
try Task.checkCancellation()
Self.log.debug("Subscribe event \(event)")
subject.send(event)
}
} catch is CancellationError {
subject.send(completion: .finished)
} catch {
if let error = error as? APIError {
apiError = error
}
Self.log.debug("Failed with subscription request: \(error)")
}
self.resultListener(result)
sequence.cancel()
}
underlyingOperation.set(operationFactory(request, wrappedResultListener))
}
}

// MARK: - RetryableGraphQLOperation
public final class RetryableGraphQLOperation<Payload: Decodable>: Operation, RetryableGraphQLOperationBehavior {
public typealias Payload = Payload
public typealias OperationType = GraphQLOperation<Payload>

public var id: UUID
public var maxRetries: Int
public var attempts: Int = 0
public var requestFactory: RequestFactory
public var underlyingOperation: AtomicValue<GraphQLOperation<Payload>?> = AtomicValue(initialValue: nil)
public var resultListener: OperationResultListener
public var operationFactory: OperationFactory

public init(requestFactory: @escaping RequestFactory,
maxRetries: Int,
resultListener: @escaping OperationResultListener,
_ operationFactory: @escaping OperationFactory) {
self.id = UUID()
self.maxRetries = max(1, maxRetries)
self.requestFactory = requestFactory
self.operationFactory = operationFactory
self.resultListener = resultListener
}

public override func main() {
Task {
start(request: await requestFactory())
if apiError != nil {
subject.send(completion: .failure(apiError!))
} else {
subject.send(completion: .finished)
}
}

override public func cancel() {
self.underlyingOperation.get()?.cancel()
}

public func shouldRetry(error: APIError?) -> Bool {
guard case let .operationError(_, _, underlyingError) = error,
let authError = underlyingError as? AuthError else {
return false
}

switch authError {
case .signedOut, .notAuthorized:
return attempts < maxRetries
default:
return false
}
public func cancel() {
self.task?.cancel()
}
}

// MARK: - RetryableGraphQLSubscriptionOperation
public final class RetryableGraphQLSubscriptionOperation<Payload: Decodable>: Operation,
RetryableGraphQLOperationBehavior {
public typealias OperationType = GraphQLSubscriptionOperation<Payload>

public typealias Payload = Payload

public var id: UUID
public var maxRetries: Int
public var attempts: Int = 0
public var underlyingOperation: AtomicValue<GraphQLSubscriptionOperation<Payload>?> = AtomicValue(initialValue: nil)
public var requestFactory: RequestFactory
public var resultListener: OperationResultListener
public var operationFactory: OperationFactory

public init(requestFactory: @escaping RequestFactory,
maxRetries: Int,
resultListener: @escaping OperationResultListener,
_ operationFactory: @escaping OperationFactory) {
self.id = UUID()
self.maxRetries = max(1, maxRetries)
self.requestFactory = requestFactory
self.operationFactory = operationFactory
self.resultListener = resultListener
}
public override func main() {
Task {
start(request: await requestFactory())
extension AsyncSequence {
fileprivate var asyncStream: AsyncStream<Self.Element> {
AsyncStream { continuation in
Task {
var it = self.makeAsyncIterator()
do {
while let ele = try await it.next() {
continuation.yield(ele)
}
continuation.finish()
} catch {
continuation.finish()
}
}
}
}
}

public override func cancel() {
self.underlyingOperation.get()?.cancel()
extension RetryableGraphQLSubscriptionOperation {
public static var log: Logger {
Amplify.Logging.logger(forCategory: CategoryType.api.displayName, forNamespace: String(describing: self))
}

public func shouldRetry(error: APIError?) -> Bool {
return attempts < maxRetries
public var log: Logger {
Self.log
}

}

// MARK: GraphQLOperation - GraphQLSubscriptionOperation + AnyGraphQLOperation
extension GraphQLOperation: AnyGraphQLOperation {}
extension GraphQLSubscriptionOperation: AnyGraphQLOperation {}
2 changes: 2 additions & 0 deletions Amplify/Core/Support/AmplifyAsyncThrowingSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//

import Foundation
import Combine

public typealias WeakAmplifyAsyncThrowingSequenceRef<Element> = WeakRef<AmplifyAsyncThrowingSequence<Element>>

Expand Down Expand Up @@ -49,4 +50,5 @@ public class AmplifyAsyncThrowingSequence<Element: Sendable>: AsyncSequence, Can
parent?.cancel()
finish()
}

}
4 changes: 3 additions & 1 deletion Amplify/Core/Support/AmplifyTask+OperationTaskAdapters.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ public class AmplifyOperationTaskAdapter<Request: AmplifyOperationRequest,
public init(operation: AmplifyOperation<Request, Success, Failure>) {
self.operation = operation
self.childTask = ChildTask(parent: operation)
resultToken = operation.subscribe(resultListener: resultListener)
resultToken = operation.subscribe { [weak self] in
self?.resultListener($0)
}
}

deinit {
Expand Down
2 changes: 1 addition & 1 deletion AmplifyPlugins/API/Sources/AWSAPIPlugin/AWSAPIPlugin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import Amplify
import AWSPluginsCore
import Foundation

final public class AWSAPIPlugin: NSObject, APICategoryPlugin, APICategoryGraphQLBehaviorExtended, AWSAPIAuthInformation {
final public class AWSAPIPlugin: NSObject, APICategoryPlugin, AWSAPIAuthInformation {
/// The unique key of the plugin within the API category.
public var key: PluginKey {
return "awsAPIPlugin"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ public class AWSGraphQLSubscriptionTaskRunner<R: Decodable>: InternalTaskRunner,
self.subscription = try await appSyncClient?.subscribe(
id: subscriptionId,
query: encodeRequest(query: request.document, variables: request.variables)
).sink(receiveValue: { [weak self] event in
self?.onAsyncSubscriptionEvent(event: event)
).sink(receiveValue: { event in
self.onAsyncSubscriptionEvent(event: event)
})
} catch {
let error = APIError.operationError("Unable to get connection for api \(endpointConfig.name)", "", error)
Expand Down

This file was deleted.

Loading

0 comments on commit 6e29d88

Please sign in to comment.