Skip to content

Commit e18caa6

Browse files
5dharsh62thisisabhashaws-amplify-ops
authored
feat(API): implement AppSyncRealTimeClient and WebSocketClient with URLSession (#3527)
* feat(datastore): replace appSync realtime client with URLSession * Add timeout for appsync realtime request * adding unit test cases * move websocket components to core * added more interfaces for unit test * add unit test cases for websocket client * fix broken integration test cases * change appsync client state to a subject * Add resettable for appsync and websocket clients * add integration test cases for appsync client * Add subscription actor * fix subscription life cycle issue * add integration test cases for datastore with large number of models * add integration test case for max subscription reached * fix auth error handling for AppSyncRealTimeRequest errors * add doc comments for WebSocketClient and AppSyncRealTimeClient * add integration test case for retry on maxSubscriptionReached * move sendRequest to appSyncRealTimeClient * Add more doc comments * bind tasks cancellables to connection life cycle * update websocket spi name * resolve comments * fix(Logging): Updating the required reason API usage (#3570) * chore: release 2.27.3 [skip ci] * chore: finalize release 2.27.3 [skip ci] * feat(Auth): Adding forceAliasCreation option during confirmSignUp (#3382) * feat(Auth): Adding forceAliasCreation option during confirmSignUp * update pinpoint unit tests * Revert "update pinpoint unit tests" This reverts commit 0f804d8. * rename CognitoAuth to AuthToken --------- Co-authored-by: Harsh <6162866+harsh62@users.noreply.github.com> Co-authored-by: Abhash Kumar Singh <thisisabhash@gmail.com> Co-authored-by: aws-amplify-ops <aws-amplify-ops@amazon.com>
1 parent 7d9640a commit e18caa6

File tree

77 files changed

+4569
-1162
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

77 files changed

+4569
-1162
lines changed

Amplify/Core/Support/JSONValue+Subscript.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,8 @@ public extension JSONValue {
2626
return nil
2727
}
2828
}
29+
30+
subscript(dynamicMember member: String) -> JSONValue? {
31+
self[member]
32+
}
2933
}

Amplify/Core/Support/JSONValue.swift

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import Foundation
99

1010
/// A utility type that allows us to represent an arbitrary JSON structure
11+
@dynamicMemberLookup
1112
public enum JSONValue {
1213
case array([JSONValue])
1314
case boolean(Bool)
@@ -105,3 +106,62 @@ extension JSONValue: ExpressibleByStringLiteral {
105106
self = .string(value)
106107
}
107108
}
109+
110+
extension JSONValue {
111+
112+
public var asObject: [String: JSONValue]? {
113+
if case .object(let object) = self {
114+
return object
115+
}
116+
117+
return nil
118+
}
119+
120+
public var asArray: [JSONValue]? {
121+
if case .array(let array) = self {
122+
return array
123+
}
124+
125+
return nil
126+
}
127+
128+
public var stringValue: String? {
129+
if case .string(let string) = self {
130+
return string
131+
}
132+
133+
return nil
134+
}
135+
136+
public var intValue: Int? {
137+
if case .number(let double) = self,
138+
double < Double(Int.max) && double >= Double(Int.min) {
139+
return Int(double)
140+
}
141+
return nil
142+
}
143+
144+
public var doubleValue: Double? {
145+
if case .number(let double) = self {
146+
return double
147+
}
148+
149+
return nil
150+
}
151+
152+
public var booleanValue: Bool? {
153+
if case .boolean(let bool) = self {
154+
return bool
155+
}
156+
157+
return nil
158+
}
159+
160+
public var isNull: Bool {
161+
if case .null = self {
162+
return true
163+
}
164+
165+
return false
166+
}
167+
}

AmplifyPlugins/API/Sources/AWSAPIPlugin/APIError+Unauthorized.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
//
77

88
import Amplify
9-
import AppSyncRealTimeClient
109

1110
extension APIError {
1211

AmplifyPlugins/API/Sources/AWSAPIPlugin/AWSAPIPlugin+Configure.swift

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import Amplify
99
import AWSPluginsCore
10-
import AppSyncRealTimeClient
1110
import AwsCommonRuntimeKit
1211

1312
public extension AWSAPIPlugin {
@@ -53,14 +52,14 @@ extension AWSAPIPlugin {
5352
struct ConfigurationDependencies {
5453
let authService: AWSAuthServiceBehavior
5554
let pluginConfig: AWSAPICategoryPluginConfiguration
56-
let subscriptionConnectionFactory: SubscriptionConnectionFactory
55+
let appSyncRealTimeClientFactory: AppSyncRealTimeClientFactoryProtocol
5756
let logLevel: Amplify.LogLevel
5857

5958
init(
6059
configurationValues: JSONValue,
6160
apiAuthProviderFactory: APIAuthProviderFactory,
6261
authService: AWSAuthServiceBehavior? = nil,
63-
subscriptionConnectionFactory: SubscriptionConnectionFactory? = nil,
62+
appSyncRealTimeClientFactory: AppSyncRealTimeClientFactoryProtocol? = nil,
6463
logLevel: Amplify.LogLevel? = nil
6564
) throws {
6665
let authService = authService
@@ -72,28 +71,26 @@ extension AWSAPIPlugin {
7271
authService: authService
7372
)
7473

75-
let subscriptionConnectionFactory = subscriptionConnectionFactory
76-
?? AWSSubscriptionConnectionFactory()
77-
7874
let logLevel = logLevel ?? Amplify.Logging.logLevel
7975

8076
self.init(
8177
pluginConfig: pluginConfig,
8278
authService: authService,
83-
subscriptionConnectionFactory: subscriptionConnectionFactory,
79+
appSyncRealTimeClientFactory: appSyncRealTimeClientFactory
80+
?? AppSyncRealTimeClientFactory(),
8481
logLevel: logLevel
8582
)
8683
}
8784

8885
init(
8986
pluginConfig: AWSAPICategoryPluginConfiguration,
9087
authService: AWSAuthServiceBehavior,
91-
subscriptionConnectionFactory: SubscriptionConnectionFactory,
88+
appSyncRealTimeClientFactory: AppSyncRealTimeClientFactoryProtocol,
9289
logLevel: Amplify.LogLevel
9390
) {
9491
self.pluginConfig = pluginConfig
9592
self.authService = authService
96-
self.subscriptionConnectionFactory = subscriptionConnectionFactory
93+
self.appSyncRealTimeClientFactory = appSyncRealTimeClientFactory
9794
self.logLevel = logLevel
9895
}
9996

@@ -108,8 +105,6 @@ extension AWSAPIPlugin {
108105
func configure(using dependencies: ConfigurationDependencies) {
109106
authService = dependencies.authService
110107
pluginConfig = dependencies.pluginConfig
111-
subscriptionConnectionFactory = dependencies.subscriptionConnectionFactory
112-
AppSyncRealTimeClient.logLevel = AppSyncRealTimeClient.LogLevel(
113-
rawValue: dependencies.logLevel.rawValue) ?? .error
108+
appSyncRealTimeClientFactory = dependencies.appSyncRealTimeClientFactory
114109
}
115110
}

AmplifyPlugins/API/Sources/AWSAPIPlugin/AWSAPIPlugin+GraphQLBehavior.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public extension AWSAPIPlugin {
6161
let operation = AWSGraphQLSubscriptionOperation(
6262
request: request.toOperationRequest(operationType: .subscription),
6363
pluginConfig: pluginConfig,
64-
subscriptionConnectionFactory: subscriptionConnectionFactory,
64+
appSyncRealTimeClientFactory: appSyncRealTimeClientFactory,
6565
authService: authService,
6666
apiAuthProviderFactory: authProviderFactory,
6767
inProcessListener: valueListener,
@@ -74,7 +74,7 @@ public extension AWSAPIPlugin {
7474
let request = request.toOperationRequest(operationType: .subscription)
7575
let runner = AWSGraphQLSubscriptionTaskRunner(request: request,
7676
pluginConfig: pluginConfig,
77-
subscriptionConnectionFactory: subscriptionConnectionFactory,
77+
appSyncClientFactory: appSyncRealTimeClientFactory,
7878
authService: authService,
7979
apiAuthProviderFactory: authProviderFactory)
8080
return runner.sequence

AmplifyPlugins/API/Sources/AWSAPIPlugin/AWSAPIPlugin+Log.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
//
77

88
import Amplify
9-
import AppSyncRealTimeClient
109

1110
extension AWSAPIPlugin {
1211
var log: Logger {

AmplifyPlugins/API/Sources/AWSAPIPlugin/AWSAPIPlugin+Resettable.swift

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ import Foundation
1111
extension AWSAPIPlugin: Resettable {
1212

1313
public func reset() async {
14+
if let resettableAppSyncRealClientFactory = appSyncRealTimeClientFactory as? Resettable {
15+
await resettableAppSyncRealClientFactory.reset()
16+
}
17+
appSyncRealTimeClientFactory = nil
18+
1419
mapper.reset()
1520

1621
await session.cancelAndReset()
@@ -24,8 +29,6 @@ extension AWSAPIPlugin: Resettable {
2429
reachabilityMapLock.execute {
2530
reachabilityMap.removeAll()
2631
}
27-
28-
subscriptionConnectionFactory = nil
2932
}
3033

3134
}

AmplifyPlugins/API/Sources/AWSAPIPlugin/AWSAPIPlugin.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ final public class AWSAPIPlugin: NSObject, APICategoryPlugin, APICategoryGraphQL
3636

3737
/// Creating and retrieving connections for subscriptions. This will be instantiated during the configuration phase,
3838
/// and is clearable by `reset()`. This is implicitly unwrapped to be destroyed when resetting.
39-
var subscriptionConnectionFactory: SubscriptionConnectionFactory!
39+
var appSyncRealTimeClientFactory: AppSyncRealTimeClientFactoryProtocol!
4040

4141
var authProviderFactory: APIAuthProviderFactory
4242

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
//
2+
// Copyright Amazon.com Inc. or its affiliates.
3+
// All Rights Reserved.
4+
//
5+
// SPDX-License-Identifier: Apache-2.0
6+
//
7+
8+
9+
import Foundation
10+
import Combine
11+
import Amplify
12+
13+
extension AppSyncRealTimeClient {
14+
/**
15+
Submit an AppSync request to real-time server.
16+
- Returns:
17+
Void indicates request is finished successfully
18+
- Throws:
19+
Error is throwed when request is failed
20+
*/
21+
func sendRequest(
22+
_ request: AppSyncRealTimeRequest,
23+
timeout: TimeInterval = 5
24+
) async throws {
25+
var responseSubscriptions = Set<AnyCancellable>()
26+
try await withCheckedThrowingContinuation { [weak self] (continuation: CheckedContinuation<Void, Swift.Error>) in
27+
guard let self else {
28+
Self.log.debug("[AppSyncRealTimeClient] client has already been disposed")
29+
continuation.resume(returning: ())
30+
return
31+
}
32+
33+
// listen to response
34+
self.subject
35+
.setFailureType(to: AppSyncRealTimeRequest.Error.self)
36+
.flatMap { Self.filterResponse(request: request, response: $0) }
37+
.timeout(.seconds(timeout), scheduler: DispatchQueue.global(qos: .userInitiated), customError: { .timeout })
38+
.first()
39+
.sink(receiveCompletion: { completion in
40+
switch completion {
41+
case .finished:
42+
continuation.resume(returning: ())
43+
case .failure(let error):
44+
continuation.resume(throwing: error)
45+
}
46+
}, receiveValue: { _ in })
47+
.store(in: &responseSubscriptions)
48+
49+
// sending request; error is discarded and will be classified as timeout
50+
Task {
51+
do {
52+
let decoratedRequest = await self.requestInterceptor.interceptRequest(
53+
event: request,
54+
url: self.endpoint
55+
)
56+
let requestJSON = String(data: try Self.jsonEncoder.encode(decoratedRequest), encoding: .utf8)
57+
58+
try await self.webSocketClient.write(message: requestJSON!)
59+
} catch {
60+
Self.log.debug("[AppSyncRealTimeClient]Failed to send AppSync request \(request), error: \(error)")
61+
}
62+
}
63+
}
64+
}
65+
66+
private static func filterResponse(
67+
request: AppSyncRealTimeRequest,
68+
response: AppSyncRealTimeResponse
69+
) -> AnyPublisher<AppSyncRealTimeResponse, AppSyncRealTimeRequest.Error> {
70+
let justTheResponse = Just(response)
71+
.setFailureType(to: AppSyncRealTimeRequest.Error.self)
72+
.eraseToAnyPublisher()
73+
74+
switch (request, response.type) {
75+
case (.connectionInit, .connectionAck):
76+
return justTheResponse
77+
78+
case (.start(let startRequest), .startAck) where startRequest.id == response.id:
79+
return justTheResponse
80+
81+
case (.stop(let id), .stopAck) where id == response.id:
82+
return justTheResponse
83+
84+
case (_, .error)
85+
where request.id != nil
86+
&& request.id == response.id
87+
&& response.payload?.errors != nil:
88+
let errorsJson: JSONValue = (response.payload?.errors)!
89+
let errors = errorsJson.asArray ?? [errorsJson]
90+
let reqeustErrors = errors.compactMap(AppSyncRealTimeRequest.parseResponseError(error:))
91+
if reqeustErrors.isEmpty {
92+
return Empty(
93+
outputType: AppSyncRealTimeResponse.self,
94+
failureType: AppSyncRealTimeRequest.Error.self
95+
).eraseToAnyPublisher()
96+
} else {
97+
return Fail(
98+
outputType: AppSyncRealTimeResponse.self,
99+
failure: reqeustErrors.first!
100+
).eraseToAnyPublisher()
101+
}
102+
103+
default:
104+
return Empty(
105+
outputType: AppSyncRealTimeResponse.self,
106+
failureType: AppSyncRealTimeRequest.Error.self
107+
).eraseToAnyPublisher()
108+
109+
}
110+
}
111+
}

0 commit comments

Comments
 (0)