Skip to content

Commit

Permalink
feat(API): implement AppSyncRealTimeClient and WebSocketClient with U…
Browse files Browse the repository at this point in the history
…RLSession (#3575)

* feat(API): implement AppSyncRealTimeClient and WebSocketClient with URLSession (#3527)

* feat(datastore): replace appSync realtime client with URLSession

* Add timeout for appsync realtime request

* adding unit test cases

* move websocket components to core

* added more interfaces for unit test

* add unit test cases for websocket client

* fix broken integration test cases

* change appsync client state to a subject

* Add resettable for appsync and websocket clients

* add integration test cases for appsync client

* Add subscription actor

* fix subscription life cycle issue

* add integration test cases for datastore with large number of models

* add integration test case for max subscription reached

* fix auth error handling for AppSyncRealTimeRequest errors

* add doc comments for WebSocketClient and AppSyncRealTimeClient

* add integration test case for retry on maxSubscriptionReached

* move sendRequest to appSyncRealTimeClient

* Add more doc comments

* bind tasks cancellables to connection life cycle

* update websocket spi name

* resolve comments

* fix(Logging): Updating the required reason API usage (#3570)

* chore: release 2.27.3 [skip ci]

* chore: finalize release 2.27.3 [skip ci]

* feat(Auth): Adding forceAliasCreation option during confirmSignUp (#3382)

* feat(Auth): Adding forceAliasCreation option during confirmSignUp

* update pinpoint unit tests

* Revert "update pinpoint unit tests"

This reverts commit 0f804d8.

* rename CognitoAuth to AuthToken

---------

Co-authored-by: Harsh <6162866+harsh62@users.noreply.github.com>
Co-authored-by: Abhash Kumar Singh <thisisabhash@gmail.com>
Co-authored-by: aws-amplify-ops <aws-amplify-ops@amazon.com>

* add a high level readme file for AppSyncRealTimeClient module

* add a high level readme file for WebSocketClient module

* update push notification test scheme

* revert changes made in push notification category

---------

Co-authored-by: Harsh <6162866+harsh62@users.noreply.github.com>
Co-authored-by: Abhash Kumar Singh <thisisabhash@gmail.com>
Co-authored-by: aws-amplify-ops <aws-amplify-ops@amazon.com>
  • Loading branch information
4 people authored Mar 22, 2024
1 parent c2fb1c0 commit 014d9b1
Show file tree
Hide file tree
Showing 68 changed files with 4,340 additions and 1,130 deletions.
4 changes: 4 additions & 0 deletions Amplify/Core/Support/JSONValue+Subscript.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,8 @@ public extension JSONValue {
return nil
}
}

subscript(dynamicMember member: String) -> JSONValue? {
self[member]
}
}
60 changes: 60 additions & 0 deletions Amplify/Core/Support/JSONValue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import Foundation

/// A utility type that allows us to represent an arbitrary JSON structure
@dynamicMemberLookup
public enum JSONValue {
case array([JSONValue])
case boolean(Bool)
Expand Down Expand Up @@ -105,3 +106,62 @@ extension JSONValue: ExpressibleByStringLiteral {
self = .string(value)
}
}

extension JSONValue {

public var asObject: [String: JSONValue]? {
if case .object(let object) = self {
return object
}

return nil
}

public var asArray: [JSONValue]? {
if case .array(let array) = self {
return array
}

return nil
}

public var stringValue: String? {
if case .string(let string) = self {
return string
}

return nil
}

public var intValue: Int? {
if case .number(let double) = self,
double < Double(Int.max) && double >= Double(Int.min) {
return Int(double)
}
return nil
}

public var doubleValue: Double? {
if case .number(let double) = self {
return double
}

return nil
}

public var booleanValue: Bool? {
if case .boolean(let bool) = self {
return bool
}

return nil
}

public var isNull: Bool {
if case .null = self {
return true
}

return false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
//

import Amplify
import AppSyncRealTimeClient

extension APIError {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import Amplify
import AWSPluginsCore
import AppSyncRealTimeClient
import AwsCommonRuntimeKit

public extension AWSAPIPlugin {
Expand Down Expand Up @@ -53,14 +52,14 @@ extension AWSAPIPlugin {
struct ConfigurationDependencies {
let authService: AWSAuthServiceBehavior
let pluginConfig: AWSAPICategoryPluginConfiguration
let subscriptionConnectionFactory: SubscriptionConnectionFactory
let appSyncRealTimeClientFactory: AppSyncRealTimeClientFactoryProtocol
let logLevel: Amplify.LogLevel

init(
configurationValues: JSONValue,
apiAuthProviderFactory: APIAuthProviderFactory,
authService: AWSAuthServiceBehavior? = nil,
subscriptionConnectionFactory: SubscriptionConnectionFactory? = nil,
appSyncRealTimeClientFactory: AppSyncRealTimeClientFactoryProtocol? = nil,
logLevel: Amplify.LogLevel? = nil
) throws {
let authService = authService
Expand All @@ -72,28 +71,26 @@ extension AWSAPIPlugin {
authService: authService
)

let subscriptionConnectionFactory = subscriptionConnectionFactory
?? AWSSubscriptionConnectionFactory()

let logLevel = logLevel ?? Amplify.Logging.logLevel

self.init(
pluginConfig: pluginConfig,
authService: authService,
subscriptionConnectionFactory: subscriptionConnectionFactory,
appSyncRealTimeClientFactory: appSyncRealTimeClientFactory
?? AppSyncRealTimeClientFactory(),
logLevel: logLevel
)
}

init(
pluginConfig: AWSAPICategoryPluginConfiguration,
authService: AWSAuthServiceBehavior,
subscriptionConnectionFactory: SubscriptionConnectionFactory,
appSyncRealTimeClientFactory: AppSyncRealTimeClientFactoryProtocol,
logLevel: Amplify.LogLevel
) {
self.pluginConfig = pluginConfig
self.authService = authService
self.subscriptionConnectionFactory = subscriptionConnectionFactory
self.appSyncRealTimeClientFactory = appSyncRealTimeClientFactory
self.logLevel = logLevel
}

Expand All @@ -108,8 +105,6 @@ extension AWSAPIPlugin {
func configure(using dependencies: ConfigurationDependencies) {
authService = dependencies.authService
pluginConfig = dependencies.pluginConfig
subscriptionConnectionFactory = dependencies.subscriptionConnectionFactory
AppSyncRealTimeClient.logLevel = AppSyncRealTimeClient.LogLevel(
rawValue: dependencies.logLevel.rawValue) ?? .error
appSyncRealTimeClientFactory = dependencies.appSyncRealTimeClientFactory
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public extension AWSAPIPlugin {
let operation = AWSGraphQLSubscriptionOperation(
request: request.toOperationRequest(operationType: .subscription),
pluginConfig: pluginConfig,
subscriptionConnectionFactory: subscriptionConnectionFactory,
appSyncRealTimeClientFactory: appSyncRealTimeClientFactory,
authService: authService,
apiAuthProviderFactory: authProviderFactory,
inProcessListener: valueListener,
Expand All @@ -74,7 +74,7 @@ public extension AWSAPIPlugin {
let request = request.toOperationRequest(operationType: .subscription)
let runner = AWSGraphQLSubscriptionTaskRunner(request: request,
pluginConfig: pluginConfig,
subscriptionConnectionFactory: subscriptionConnectionFactory,
appSyncClientFactory: appSyncRealTimeClientFactory,
authService: authService,
apiAuthProviderFactory: authProviderFactory)
return runner.sequence
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
//

import Amplify
import AppSyncRealTimeClient

extension AWSAPIPlugin {
var log: Logger {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ import Foundation
extension AWSAPIPlugin: Resettable {

public func reset() async {
if let resettableAppSyncRealClientFactory = appSyncRealTimeClientFactory as? Resettable {
await resettableAppSyncRealClientFactory.reset()
}
appSyncRealTimeClientFactory = nil

mapper.reset()

await session.cancelAndReset()
Expand All @@ -24,8 +29,6 @@ extension AWSAPIPlugin: Resettable {
reachabilityMapLock.execute {
reachabilityMap.removeAll()
}

subscriptionConnectionFactory = nil
}

}
2 changes: 1 addition & 1 deletion AmplifyPlugins/API/Sources/AWSAPIPlugin/AWSAPIPlugin.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ final public class AWSAPIPlugin: NSObject, APICategoryPlugin, APICategoryGraphQL

/// Creating and retrieving connections for subscriptions. This will be instantiated during the configuration phase,
/// and is clearable by `reset()`. This is implicitly unwrapped to be destroyed when resetting.
var subscriptionConnectionFactory: SubscriptionConnectionFactory!
var appSyncRealTimeClientFactory: AppSyncRealTimeClientFactoryProtocol!

var authProviderFactory: APIAuthProviderFactory

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//


import Foundation
import Combine
import Amplify

extension AppSyncRealTimeClient {
/**
Submit an AppSync request to real-time server.
- Returns:
Void indicates request is finished successfully
- Throws:
Error is throwed when request is failed
*/
func sendRequest(
_ request: AppSyncRealTimeRequest,
timeout: TimeInterval = 5
) async throws {
var responseSubscriptions = Set<AnyCancellable>()
try await withCheckedThrowingContinuation { [weak self] (continuation: CheckedContinuation<Void, Swift.Error>) in
guard let self else {
Self.log.debug("[AppSyncRealTimeClient] client has already been disposed")
continuation.resume(returning: ())
return
}

// listen to response
self.subject
.setFailureType(to: AppSyncRealTimeRequest.Error.self)
.flatMap { Self.filterResponse(request: request, response: $0) }
.timeout(.seconds(timeout), scheduler: DispatchQueue.global(qos: .userInitiated), customError: { .timeout })
.first()
.sink(receiveCompletion: { completion in
switch completion {
case .finished:
continuation.resume(returning: ())
case .failure(let error):
continuation.resume(throwing: error)
}
}, receiveValue: { _ in })
.store(in: &responseSubscriptions)

// sending request; error is discarded and will be classified as timeout
Task {
do {
let decoratedRequest = await self.requestInterceptor.interceptRequest(
event: request,
url: self.endpoint
)
let requestJSON = String(data: try Self.jsonEncoder.encode(decoratedRequest), encoding: .utf8)

try await self.webSocketClient.write(message: requestJSON!)
} catch {
Self.log.debug("[AppSyncRealTimeClient]Failed to send AppSync request \(request), error: \(error)")
}
}
}
}

private static func filterResponse(
request: AppSyncRealTimeRequest,
response: AppSyncRealTimeResponse
) -> AnyPublisher<AppSyncRealTimeResponse, AppSyncRealTimeRequest.Error> {
let justTheResponse = Just(response)
.setFailureType(to: AppSyncRealTimeRequest.Error.self)
.eraseToAnyPublisher()

switch (request, response.type) {
case (.connectionInit, .connectionAck):
return justTheResponse

case (.start(let startRequest), .startAck) where startRequest.id == response.id:
return justTheResponse

case (.stop(let id), .stopAck) where id == response.id:
return justTheResponse

case (_, .error)
where request.id != nil
&& request.id == response.id
&& response.payload?.errors != nil:
let errorsJson: JSONValue = (response.payload?.errors)!
let errors = errorsJson.asArray ?? [errorsJson]
let reqeustErrors = errors.compactMap(AppSyncRealTimeRequest.parseResponseError(error:))
if reqeustErrors.isEmpty {
return Empty(
outputType: AppSyncRealTimeResponse.self,
failureType: AppSyncRealTimeRequest.Error.self
).eraseToAnyPublisher()
} else {
return Fail(
outputType: AppSyncRealTimeResponse.self,
failure: reqeustErrors.first!
).eraseToAnyPublisher()
}

default:
return Empty(
outputType: AppSyncRealTimeResponse.self,
failureType: AppSyncRealTimeRequest.Error.self
).eraseToAnyPublisher()

}
}
}
Loading

0 comments on commit 014d9b1

Please sign in to comment.