diff --git a/Amplify/Core/Support/TaskQueue.swift b/Amplify/Core/Support/TaskQueue.swift index f281c57e1c..a09bfcc4d8 100644 --- a/Amplify/Core/Support/TaskQueue.swift +++ b/Amplify/Core/Support/TaskQueue.swift @@ -8,10 +8,25 @@ import Foundation /// A helper for executing asynchronous work serially. -public actor TaskQueue { - private var previousTask: Task? +public class TaskQueue { + typealias Block = @Sendable () async -> Void + private var streamContinuation: AsyncStream.Continuation! - public init() {} + public init() { + let stream = AsyncStream.init { continuation in + streamContinuation = continuation + } + + Task { + for await block in stream { + _ = await block() + } + } + } + + deinit { + streamContinuation.finish() + } /// Serializes asynchronous requests made from an async context /// @@ -25,17 +40,31 @@ public actor TaskQueue { /// TaskQueue serializes this work so that `doAsync1` is performed before `doAsync2`, /// which is performed before `doAsync3`. public func sync(block: @Sendable @escaping () async throws -> Success) async throws -> Success { - let currentTask: Task = Task { [previousTask] in - _ = await previousTask?.result - return try await block() + try await withCheckedThrowingContinuation { continuation in + streamContinuation.yield { + do { + let value = try await block() + continuation.resume(returning: value) + } catch { + continuation.resume(throwing: error) + } + } } - previousTask = currentTask - return try await currentTask.value } - public nonisolated func async(block: @Sendable @escaping () async throws -> Success) rethrows { - Task { - try await sync(block: block) + public func async(block: @Sendable @escaping () async throws -> Success) { + streamContinuation.yield { + do { + _ = try await block() + } catch { + Self.log.warn("Failed to handle async task in TaskQueue<\(Success.self)> with error: \(error)") + } } } } + +extension TaskQueue { + public static var log: Logger { + Amplify.Logging.logger(forNamespace: String(describing: self)) + } +} diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift index c8bf7efcab..25a695f9b5 100644 --- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift @@ -54,6 +54,10 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol { self.state.value == .connected } + internal var numberOfSubscriptions: Int { + self.subscriptions.count + } + /** Creates a new AppSyncRealTimeClient with endpoint, requestInterceptor and webSocketClient. - Parameters: diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift index c58bc2da90..4ef4928e5f 100644 --- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Operation/AWSGraphQLSubscriptionTaskRunner.swift @@ -44,12 +44,12 @@ public class AWSGraphQLSubscriptionTaskRunner: InternalTaskRunner, self.apiAuthProviderFactory = apiAuthProviderFactory } + /// When the top-level AmplifyThrowingSequence is canceled, this cancel method is invoked. + /// In this situation, we need to send the disconnected event because + /// the top-level AmplifyThrowingSequence is terminated immediately upon cancellation. public func cancel() { self.send(GraphQLSubscriptionEvent.connection(.disconnected)) - Task { [weak self] in - guard let self else { - return - } + Task { guard let appSyncClient = self.appSyncClient else { return } @@ -213,12 +213,7 @@ final public class AWSGraphQLSubscriptionOperation: GraphQLSubscri override public func cancel() { super.cancel() - - Task { [weak self] in - guard let self else { - return - } - + Task { guard let appSyncRealTimeClient = self.appSyncRealTimeClient else { return } @@ -378,6 +373,31 @@ fileprivate func toAPIError(_ errors: [Error], type: R.Type) -> AP (hasAuthorizationError ? ": \(APIError.UnauthorizedMessageString)" : "") } +#if swift(<5.8) + if let errors = errors.cast(to: AppSyncRealTimeRequest.Error.self) { + let hasAuthorizationError = errors.contains(where: { $0 == .unauthorized}) + return APIError.operationError( + errorDescription(hasAuthorizationError), + "", + errors.first + ) + } else if let errors = errors.cast(to: GraphQLError.self) { + let hasAuthorizationError = errors.map(\.extensions) + .compactMap { $0.flatMap { $0["errorType"]?.stringValue } } + .contains(where: { AppSyncErrorType($0) == .unauthorized }) + return APIError.operationError( + errorDescription(hasAuthorizationError), + "", + GraphQLResponseError.error(errors) + ) + } else { + return APIError.operationError( + errorDescription(), + "", + errors.first + ) + } +#else switch errors { case let errors as [AppSyncRealTimeRequest.Error]: let hasAuthorizationError = errors.contains(where: { $0 == .unauthorized}) @@ -402,5 +422,5 @@ fileprivate func toAPIError(_ errors: [Error], type: R.Type) -> AP errors.first ) } - +#endif } diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/Support/Utils/Array+Error+TypeCast.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Support/Utils/Array+Error+TypeCast.swift new file mode 100644 index 0000000000..3592791dc2 --- /dev/null +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/Support/Utils/Array+Error+TypeCast.swift @@ -0,0 +1,21 @@ +// +// Copyright Amazon.com Inc. or its affiliates. +// All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + + +import Foundation + +@_spi(AmplifyAPI) +extension Array where Element == Error { + func cast(to type: T.Type) -> [T]? { + self.reduce([]) { partialResult, ele in + if let partialResult, let ele = ele as? T { + return partialResult + [ele] + } + return nil + } + } +} diff --git a/AmplifyPlugins/API/Tests/APIHostApp/AWSAPIPluginFunctionalTests/GraphQLModelBasedTests.swift b/AmplifyPlugins/API/Tests/APIHostApp/AWSAPIPluginFunctionalTests/GraphQLModelBasedTests.swift index 770f598a7a..c5c6b87cb4 100644 --- a/AmplifyPlugins/API/Tests/APIHostApp/AWSAPIPluginFunctionalTests/GraphQLModelBasedTests.swift +++ b/AmplifyPlugins/API/Tests/APIHostApp/AWSAPIPluginFunctionalTests/GraphQLModelBasedTests.swift @@ -448,6 +448,65 @@ class GraphQLModelBasedTests: XCTestCase { await fulfillment(of: [progressInvoked], timeout: TestCommonConstants.networkTimeout) } + + /// Given: Several subscriptions with Amplify API plugin + /// When: Cancel subscriptions + /// Then: AppSync real time client automatically unsubscribe and remove the subscription + func testCancelledSubscription_automaticallyUnsubscribeAndRemoved() async throws { + let numberOfSubscription = 5 + let allSubscribedExpectation = expectation(description: "All subscriptions are subscribed") + allSubscribedExpectation.expectedFulfillmentCount = numberOfSubscription + + let subscriptions = (0..<5).map { _ in + Amplify.API.subscribe(request: .subscription(of: Comment.self, type: .onCreate)) + } + subscriptions.forEach { subscription in + Task { + do { + for try await subscriptionEvent in subscription { + switch subscriptionEvent { + case .connection(let state): + switch state { + case .connecting: + break + case .connected: + allSubscribedExpectation.fulfill() + case .disconnected: + break + } + case .data(let result): + switch result { + case .success: break + case .failure(let error): + XCTFail("\(error)") + } + } + } + } catch { + XCTFail("Unexpected subscription failure") + } + } + } + + await fulfillment(of: [allSubscribedExpectation], timeout: 3) + if let appSyncRealTimeClientFactory = + getUnderlyingAPIPlugin()?.appSyncRealTimeClientFactory as? AppSyncRealTimeClientFactory, + let appSyncRealTimeClient = + await appSyncRealTimeClientFactory.apiToClientCache.values.first as? AppSyncRealTimeClient + { + var appSyncSubscriptions = await appSyncRealTimeClient.numberOfSubscriptions + XCTAssertEqual(appSyncSubscriptions, numberOfSubscription) + + subscriptions.forEach { $0.cancel() } + try await Task.sleep(seconds: 2) + appSyncSubscriptions = await appSyncRealTimeClient.numberOfSubscriptions + XCTAssertEqual(appSyncSubscriptions, 0) + + } else { + XCTFail("There should be at least one AppSyncRealTimeClient instance") + } + } + // MARK: Helpers func createPost(id: String, title: String) async throws -> Post? { @@ -499,4 +558,8 @@ class GraphQLModelBasedTests: XCTestCase { throw error } } + + func getUnderlyingAPIPlugin() -> AWSAPIPlugin? { + return Amplify.API.plugins["awsAPIPlugin"] as? AWSAPIPlugin + } } diff --git a/AmplifyPlugins/API/Tests/AWSAPIPluginTests/Support/Utils/Array+Error+TypeCastTests.swift b/AmplifyPlugins/API/Tests/AWSAPIPluginTests/Support/Utils/Array+Error+TypeCastTests.swift new file mode 100644 index 0000000000..d1d6861a74 --- /dev/null +++ b/AmplifyPlugins/API/Tests/AWSAPIPluginTests/Support/Utils/Array+Error+TypeCastTests.swift @@ -0,0 +1,62 @@ +// +// Copyright Amazon.com Inc. or its affiliates. +// All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + + +import XCTest +@testable @_spi(AmplifyAPI) import AWSAPIPlugin + +class ArrayWithErrorElementExtensionTests: XCTestCase { + + /** + Given: errors with generic protocol type + When: cast to the correct underlying concrete type + Then: successfully casted to underlying concrete type + */ + func testCast_toCorrectErrorType_returnCastedErrorType() { + let errors: [Error] = [ + Error1(), Error1(), Error1() + ] + + let error1s = errors.cast(to: Error1.self) + XCTAssertNotNil(error1s) + XCTAssertTrue(!error1s!.isEmpty) + XCTAssertEqual(errors.count, error1s!.count) + } + + /** + Given: errors with generic protocol type + When: cast to the wong underlying concrete type + Then: return nil + */ + func testCast_toWrongErrorType_returnNil() { + let errors: [Error] = [ + Error1(), Error1(), Error1() + ] + + let error2s = errors.cast(to: Error2.self) + XCTAssertNil(error2s) + } + + /** + Given: errors with generic protocol type + When: some of the elements failed to cast to the underlying concrete type + Then: return nil + */ + + func testCast_partiallyToWrongErrorType_returnNil() { + let errors: [Error] = [ + Error2(), Error2(), Error1() + ] + + let error2s = errors.cast(to: Error2.self) + XCTAssertNil(error2s) + } + + struct Error1: Error { } + + struct Error2: Error { } +} diff --git a/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/Operations/Helpers/FetchAuthSessionOperationHelper.swift b/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/Operations/Helpers/FetchAuthSessionOperationHelper.swift index 7aa1997b72..d39751956a 100644 --- a/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/Operations/Helpers/FetchAuthSessionOperationHelper.swift +++ b/AmplifyPlugins/Auth/Sources/AWSCognitoAuthPlugin/Operations/Helpers/FetchAuthSessionOperationHelper.swift @@ -157,13 +157,21 @@ class FetchAuthSessionOperationHelper: DefaultLogger { } case .service(let error): - if let authError = (error as? AuthErrorConvertible)?.authError { - let session = AWSAuthCognitoSession(isSignedIn: isSignedIn, - identityIdResult: .failure(authError), - awsCredentialsResult: .failure(authError), - cognitoTokensResult: .failure(authError)) - return session + var authError: AuthError + if let convertedAuthError = (error as? AuthErrorConvertible)?.authError { + authError = convertedAuthError + } else { + authError = AuthError.service( + "Unknown service error occurred", + "See the attached error for more details", + error) } + let session = AWSAuthCognitoSession( + isSignedIn: isSignedIn, + identityIdResult: .failure(authError), + awsCredentialsResult: .failure(authError), + cognitoTokensResult: .failure(authError)) + return session default: break } diff --git a/AmplifyPlugins/Auth/Tests/AWSCognitoAuthPluginUnitTests/TaskTests/AuthorizationTests/AWSAuthFetchSignInSessionOperationTests.swift b/AmplifyPlugins/Auth/Tests/AWSCognitoAuthPluginUnitTests/TaskTests/AuthorizationTests/AWSAuthFetchSignInSessionOperationTests.swift index ed107d34d7..9bee6c05b1 100644 --- a/AmplifyPlugins/Auth/Tests/AWSCognitoAuthPluginUnitTests/TaskTests/AuthorizationTests/AWSAuthFetchSignInSessionOperationTests.swift +++ b/AmplifyPlugins/Auth/Tests/AWSCognitoAuthPluginUnitTests/TaskTests/AuthorizationTests/AWSAuthFetchSignInSessionOperationTests.swift @@ -736,4 +736,61 @@ class AWSAuthFetchSignInSessionOperationTests: BaseAuthorizationTests { let identityId = try? (session as? AuthCognitoIdentityProvider)?.getIdentityId().get() XCTAssertNotNil(identityId) } + + /// Test signedIn session with invalid response for aws credentials + /// + /// - Given: Given an auth plugin with signedIn state + /// - When: + /// - I invoke fetchAuthSession and service throws NSError + /// - Then: + /// - I should get an a valid session with the following details: + /// - isSignedIn = true + /// - aws credentails = service error + /// - identity id = service error + /// - cognito tokens = service error + /// + func testSignInSessionWithNSError() async throws { + let initialState = AuthState.configured( + AuthenticationState.signedIn(.testData), + AuthorizationState.sessionEstablished( + AmplifyCredentials.testDataWithExpiredTokens)) + + let initAuth: MockIdentityProvider.MockInitiateAuthResponse = { _ in + return InitiateAuthOutput(authenticationResult: .init(accessToken: "accessToken", + expiresIn: 1000, + idToken: "idToken", + refreshToken: "refreshToke")) + } + + let awsCredentials: MockIdentity.MockGetCredentialsResponse = { _ in + throw NSError(domain: NSURLErrorDomain, code: 1, userInfo: nil) + } + let plugin = configurePluginWith( + userPool: { MockIdentityProvider(mockInitiateAuthResponse: initAuth) }, + identityPool: { MockIdentity(mockGetCredentialsResponse: awsCredentials) }, + initialState: initialState) + + let session = try await plugin.fetchAuthSession(options: AuthFetchSessionRequest.Options()) + + XCTAssertTrue(session.isSignedIn) + let credentialsResult = (session as? AuthAWSCredentialsProvider)?.getAWSCredentials() + guard case .failure(let error) = credentialsResult, case .service = error else { + XCTFail("Should return service error") + return + } + + let identityIdResult = (session as? AuthCognitoIdentityProvider)?.getIdentityId() + guard case .failure(let identityIdError) = identityIdResult, + case .service = identityIdError else { + XCTFail("Should return service error") + return + } + + let tokensResult = (session as? AuthCognitoTokensProvider)?.getCognitoTokens() + guard case .failure(let tokenError) = tokensResult, + case .service = tokenError else { + XCTFail("Should return service error") + return + } + } } diff --git a/AmplifyPlugins/Core/AWSPluginsCore/ServiceConfiguration/AmplifyAWSServiceConfiguration.swift b/AmplifyPlugins/Core/AWSPluginsCore/ServiceConfiguration/AmplifyAWSServiceConfiguration.swift index ac6db525d2..4982582d64 100644 --- a/AmplifyPlugins/Core/AWSPluginsCore/ServiceConfiguration/AmplifyAWSServiceConfiguration.swift +++ b/AmplifyPlugins/Core/AWSPluginsCore/ServiceConfiguration/AmplifyAWSServiceConfiguration.swift @@ -15,7 +15,7 @@ import Amplify public class AmplifyAWSServiceConfiguration { /// - Tag: AmplifyAWSServiceConfiguration.amplifyVersion - public static let amplifyVersion = "2.28.0" + public static let amplifyVersion = "2.29.3" /// - Tag: AmplifyAWSServiceConfiguration.platformName public static let platformName = "amplify-swift" diff --git a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine.swift b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine.swift index 078bf60624..53c213e4fb 100644 --- a/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine.swift +++ b/AmplifyPlugins/DataStore/Sources/AWSDataStorePlugin/Storage/StorageEngine.swift @@ -206,6 +206,7 @@ final class StorageEngine: StorageEngineBehavior { "Cannot apply a condition on model which does not exist.", "Save the model instance without a condition first.") completion(.failure(causedBy: dataStoreError)) + return } do { diff --git a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Storage/StorageEngineTestsHasOne.swift b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Storage/StorageEngineTestsHasOne.swift index 14133c3701..2627faa3a0 100644 --- a/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Storage/StorageEngineTestsHasOne.swift +++ b/AmplifyPlugins/DataStore/Tests/AWSDataStorePluginTests/Storage/StorageEngineTestsHasOne.swift @@ -70,6 +70,28 @@ class StorageEngineTestsHasOne: StorageEngineTestsBase { } } + /// Given: A model that does not exist + /// When: save is called with a predicate + /// Then: A DataStoreError.invalidCondition error is returned + func testSaveModelWithPredicate_shouldFail() { + let team = Team(name: "Team") + let saveFinished = expectation(description: "Save finished") + storageEngine.save(team, condition: Team.keys.name.beginsWith("T")) { result in + defer { + saveFinished.fulfill() + } + guard case .failure(let error) = result, + case . invalidCondition(let errorDescription, let recoverySuggestion, _) = error else { + XCTFail("Expected failure with .invalidCondition, got \(result)") + return + } + + XCTAssertEqual(errorDescription, "Cannot apply a condition on model which does not exist.") + XCTAssertEqual(recoverySuggestion, "Save the model instance without a condition first.") + } + wait(for: [saveFinished], timeout: defaultTimeout) + } + func testBelongsToRelationshipWithoutOwner() { let teamA = Team(name: "A-Team") let projectA = Project(name: "ProjectA", team: teamA) diff --git a/AmplifyPlugins/Predictions/AWSPredictionsPlugin/Liveness/Events/LivenessEvent.swift b/AmplifyPlugins/Predictions/AWSPredictionsPlugin/Liveness/Events/LivenessEvent.swift index f8ba73d2b9..822373e296 100644 --- a/AmplifyPlugins/Predictions/AWSPredictionsPlugin/Liveness/Events/LivenessEvent.swift +++ b/AmplifyPlugins/Predictions/AWSPredictionsPlugin/Liveness/Events/LivenessEvent.swift @@ -52,6 +52,7 @@ public enum LivenessEventKind { public static let serviceQuotaExceeded = Self(rawValue: "ServiceQuotaExceededException") public static let serviceUnavailable = Self(rawValue: "ServiceUnavailableException") public static let sessionNotFound = Self(rawValue: "SessionNotFoundException") + public static let invalidSignature = Self(rawValue: "InvalidSignatureException") } } diff --git a/AmplifyPlugins/Predictions/AWSPredictionsPlugin/Liveness/SPI/AWSPredictionsPlugin+Liveness.swift b/AmplifyPlugins/Predictions/AWSPredictionsPlugin/Liveness/SPI/AWSPredictionsPlugin+Liveness.swift index f476122ab0..d2eec8d96e 100644 --- a/AmplifyPlugins/Predictions/AWSPredictionsPlugin/Liveness/SPI/AWSPredictionsPlugin+Liveness.swift +++ b/AmplifyPlugins/Predictions/AWSPredictionsPlugin/Liveness/SPI/AWSPredictionsPlugin+Liveness.swift @@ -66,6 +66,7 @@ public struct FaceLivenessSessionError: Swift.Error, Equatable { public static let accessDenied = Self(code: 7) public static let invalidRegion = Self(code: 8) public static let invalidURL = Self(code: 9) + public static let invalidSignature = Self(code: 10) } extension FaceLivenessSessionError { @@ -85,6 +86,8 @@ extension FaceLivenessSessionError { self = .serviceUnavailable case .sessionNotFound: self = .sessionNotFound + case .invalidSignature: + self = .invalidSignature default: self = .unknown } diff --git a/AmplifyTests/CoreTests/AmplifyTaskQueueTests.swift b/AmplifyTests/CoreTests/AmplifyTaskQueueTests.swift index a3c92f8dcc..48c19c9182 100644 --- a/AmplifyTests/CoreTests/AmplifyTaskQueueTests.swift +++ b/AmplifyTests/CoreTests/AmplifyTaskQueueTests.swift @@ -66,4 +66,20 @@ final class AmplifyTaskQueueTests: XCTestCase { await fulfillment(of: [expectation1, expectation2, expectation3], enforceOrder: true) } + func testAsync() async throws { + let taskCount = 1_000 + let expectations: [XCTestExpectation] = (0..() + + for i in 0..