Skip to content

Commit

Permalink
chore: amplify network package
Browse files Browse the repository at this point in the history
  • Loading branch information
5d committed Mar 22, 2024
1 parent 014d9b1 commit 8859896
Show file tree
Hide file tree
Showing 29 changed files with 158 additions and 82 deletions.
19 changes: 19 additions & 0 deletions .swiftpm/xcode/xcshareddata/xcschemes/Amplify-Package.xcscheme
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,16 @@
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "AmplifyNetworkUnitTests"
BuildableName = "AmplifyNetworkUnitTests"
BlueprintName = "AmplifyNetworkUnitTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
</Testables>
</TestAction>
<LaunchAction
Expand All @@ -767,6 +777,15 @@
savedToolIdentifier = ""
useCustomWorkingDirectory = "NO"
debugDocumentVersioning = "YES">
<MacroExpansion>
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "Amplify"
BuildableName = "Amplify"
BlueprintName = "Amplify"
ReferencedContainer = "container:">
</BuildableReference>
</MacroExpansion>
</ProfileAction>
<AnalyzeAction
buildConfiguration = "Debug">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import Foundation
import Amplify
@_spi(WebSocket) import AWSPluginsCore
@_spi(WebSocket) @_spi(AppSyncRTC) import AmplifyNetwork

class APIKeyAuthInterceptor {
private let apiKey: String
Expand All @@ -31,7 +31,10 @@ extension APIKeyAuthInterceptor: WebSocketInterceptor {

extension APIKeyAuthInterceptor: AppSyncRequestInterceptor {
func interceptRequest(event: AppSyncRealTimeRequest, url: URL) async -> AppSyncRealTimeRequest {
let host = AppSyncRealTimeClientFactory.appSyncApiEndpoint(url).host!
guard let host = AppSyncRealTimeClientFactory.appSyncApiEndpoint(url).host else {
return event
}

guard case .start(let request) = event else {
return event
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import Foundation
import Amplify
@_spi(WebSocket) import AWSPluginsCore
@_spi(WebSocket) @_spi(AppSyncRTC) import AmplifyNetwork

/// General purpose authenticatication subscriptions interceptor for providers whose only
/// requirement is to provide an authentication token via the "Authorization" header
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
//

import Foundation
@_spi(WebSocket) import AWSPluginsCore
import Amplify
import AWSClientRuntime
import ClientRuntime
import AWSPluginsCore
@_spi(WebSocket) @_spi(AppSyncRTC) import AmplifyNetwork

class IAMAuthInterceptor {

Expand Down Expand Up @@ -114,7 +115,7 @@ extension IAMAuthInterceptor: AppSyncRequestInterceptor {
return .start(.init(
id: request.id,
data: request.data,
auth: authHeader.map { .iam($0) }
auth: authHeader.map { AppSyncRealTimeRequestAuth.iam($0) }
))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import Amplify
import Foundation
import AWSPluginsCore
import Combine
@_spi(AppSyncRTC) import AmplifyNetwork

public class AWSGraphQLSubscriptionTaskRunner<R: Decodable>: InternalTaskRunner, InternalTaskAsyncThrowingSequence, InternalTaskThrowingChannel {
public typealias Request = GraphQLOperationRequest<R>
Expand Down Expand Up @@ -140,8 +141,8 @@ public class AWSGraphQLSubscriptionTaskRunner<R: Decodable>: InternalTaskRunner,
case .unsubscribed:
send(GraphQLSubscriptionEvent<R>.connection(.disconnected))
finish()
case .error(let errors):
fail(toAPIError(errors, type: R.self))
case .error(let payload):
fail(toAPIError(decodeAppSyncRealTimeResponseError(payload), type: R.self))
}
}

Expand Down Expand Up @@ -320,8 +321,8 @@ final public class AWSGraphQLSubscriptionOperation<R: Decodable>: GraphQLSubscri
dispatchInProcess(data: GraphQLSubscriptionEvent<R>.connection(.disconnected))
dispatch(result: .successfulVoid)
finish()
case .error(let errors):
dispatch(result: .failure(toAPIError(errors, type: R.self)))
case .error(let payload):
dispatch(result: .failure(toAPIError(decodeAppSyncRealTimeResponseError(payload), type: R.self)))
finish()
}
}
Expand Down Expand Up @@ -402,5 +403,36 @@ fileprivate func toAPIError<R: Decodable>(_ errors: [Error], type: R.Type) -> AP
errors.first
)
}
}

fileprivate func decodeAppSyncRealTimeResponseError(_ data: JSONValue?) -> [Error] {
let knownAppSyncRealTimeRequestErorrs =
decodeAppSyncRealTimeRequestError(data)
.filter { !$0.isUnknown }
if knownAppSyncRealTimeRequestErorrs.isEmpty {
let graphQLErrors = decodeGraphQLErrors(data)
return graphQLErrors.isEmpty
? [APIError.operationError("Failed to decode AppSync error response", "", nil)]
: graphQLErrors
} else {
return knownAppSyncRealTimeRequestErorrs
}
}

fileprivate func decodeGraphQLErrors(_ data: JSONValue?) -> [GraphQLError] {
do {
return try GraphQLErrorDecoder.decodeAppSyncErrors(data)
} catch {
print("Failed to decode errors: \(error)")
return []
}
}

fileprivate func decodeAppSyncRealTimeRequestError(_ data: JSONValue?) -> [AppSyncRealTimeRequest.Error] {
guard let errorsJson = data?.errors else {
print("No 'errors' field found in response json")
return []
}
let errors = errorsJson.asArray ?? [errorsJson]
return errors.compactMap(AppSyncRealTimeRequest.parseResponseError(error:))
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import Foundation
import Amplify
import Combine
@_spi(WebSocket) import AWSPluginsCore
import AWSPluginsCore
@_spi(WebSocket) @_spi(AppSyncRTC) import AmplifyNetwork


protocol AppSyncRealTimeClientFactoryProtocol {
func getAppSyncRealTimeClient(
Expand All @@ -21,13 +23,6 @@ protocol AppSyncRealTimeClientFactoryProtocol {
) async throws -> AppSyncRealTimeClientProtocol
}

protocol AppSyncRealTimeClientProtocol {
func connect() async throws
func disconnectWhenIdel() async
func disconnect() async
func subscribe(id: String, query: String) async throws -> AnyPublisher<AppSyncSubscriptionEvent, Never>
func unsubscribe(id: String) async throws
}

actor AppSyncRealTimeClientFactory: AppSyncRealTimeClientFactoryProtocol {
struct MapperCacheKey: Hashable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import Amplify
public class AmplifyAWSServiceConfiguration {

/// - Tag: AmplifyAWSServiceConfiguration.amplifyVersion
public static let amplifyVersion = "2.28.0"
public static let amplifyVersion = "2.27.3"

/// - Tag: AmplifyAWSServiceConfiguration.platformName
public static let platformName = "amplify-swift"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import XCTest
import Combine
import Amplify
@testable import AWSAPIPlugin
@_spi(AppSyncRTC) import AmplifyNetwork

class DataStoreLargeNumberModelsSubscriptionTests: SyncEngineIntegrationTestBase {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import Foundation
import Combine
import Amplify

@_spi(AppSyncRTC)
extension AppSyncRealTimeClient {
/**
Submit an AppSync request to real-time server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,24 @@


import Foundation
import Amplify
import Combine
@_spi(WebSocket) import AWSPluginsCore
import Amplify

@_spi(AppSyncRTC)
public protocol AppSyncRealTimeClientProtocol {
func connect() async throws
func disconnectWhenIdel() async
func disconnect() async
func subscribe(id: String, query: String) async throws -> AnyPublisher<AppSyncSubscriptionEvent, Never>
func unsubscribe(id: String) async throws
}

/**
The AppSyncRealTimeClient conforms to the AppSync real-time WebSocket protocol.
ref: https://docs.aws.amazon.com/appsync/latest/devguide/real-time-websocket-client.html
*/
actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
@_spi(AppSyncRTC)
public actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {

static let jsonEncoder = JSONEncoder()
static let jsonDecoder = JSONDecoder()
Expand Down Expand Up @@ -50,7 +59,7 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
/// Writable data stream convert WebSocketEvent to AppSyncRealTimeResponse
internal let subject = PassthroughSubject<AppSyncRealTimeResponse, Never>()

var isConnected: Bool {
public var isConnected: Bool {
self.state.value == .connected
}

Expand All @@ -61,7 +70,7 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
- requestInterceptor: Interceptor for decocating AppSyncRealTimeRequest
- webSocketClient: WebSocketClient for reading/writing to connection
*/
init(
public init(
endpoint: URL,
requestInterceptor: AppSyncRequestInterceptor,
webSocketClient: AppSyncWebSocketClientProtocol
Expand All @@ -84,7 +93,7 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
/**
Connecting to remote AppSync real-time server.
*/
func connect() async throws {
public func connect() async throws {
switch self.state.value {
case .connecting, .connected:
log.debug("[AppSyncRealTimeClient] client is already connecting or connected")
Expand Down Expand Up @@ -116,7 +125,7 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
/**
Disconnect only when there are no subscriptions exist.
*/
func disconnectWhenIdel() async {
public func disconnectWhenIdel() async {
if self.subscriptions.isEmpty {
log.debug("[AppSyncRealTimeClient] no subscription exist, client is trying to disconnect")
await disconnect()
Expand All @@ -128,7 +137,7 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
/**
Disconnect from AppSync real-time server.
*/
func disconnect() async {
public func disconnect() async {
guard self.state.value != .disconnecting else {
log.debug("[AppSyncRealTimeClient] client already disconnecting")
return
Expand All @@ -152,7 +161,7 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
- Returns:
A never fail data stream for AppSyncSubscriptionEvent.
*/
func subscribe(id: String, query: String) async throws -> AnyPublisher<AppSyncSubscriptionEvent, Never> {
public func subscribe(id: String, query: String) async throws -> AnyPublisher<AppSyncSubscriptionEvent, Never> {
log.debug("[AppSyncRealTimeClient] Received subscription request id: \(id), query: \(query)")
let subscription = AppSyncRealTimeSubscription(id: id, query: query, appSyncRealTimeClient: self)
subscriptions[id] = subscription
Expand Down Expand Up @@ -201,7 +210,7 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
- Parameters:
- id: unique identifier of the subscription.
*/
func unsubscribe(id: String) async throws {
public func unsubscribe(id: String) async throws {
defer {
log.debug("[AppSyncRealTimeClient] deleted subscription with id: \(id)")
subscriptions.removeValue(forKey: id)
Expand Down Expand Up @@ -283,7 +292,7 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
.map { response -> AppSyncSubscriptionEvent? in
switch response.type {
case .connectionError, .error:
return .error(Self.decodeAppSyncRealTimeResponseError(response.payload))
return response.payload.map { .error($0) }
case .data:
return response.payload.map { .data($0) }
default:
Expand All @@ -294,38 +303,6 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
.eraseToAnyPublisher()
}

private static func decodeAppSyncRealTimeResponseError(_ data: JSONValue?) -> [Error] {
let knownAppSyncRealTimeRequestErorrs =
Self.decodeAppSyncRealTimeRequestError(data)
.filter { !$0.isUnknown }
if knownAppSyncRealTimeRequestErorrs.isEmpty {
let graphQLErrors = Self.decodeGraphQLErrors(data)
return graphQLErrors.isEmpty
? [APIError.operationError("Failed to decode AppSync error response", "", nil)]
: graphQLErrors
} else {
return knownAppSyncRealTimeRequestErorrs
}
}

private static func decodeGraphQLErrors(_ data: JSONValue?) -> [GraphQLError] {
do {
return try GraphQLErrorDecoder.decodeAppSyncErrors(data)
} catch {
log.debug("[AppSyncRealTimeClient] Failed to decode errors: \(error)")
return []
}
}

private static func decodeAppSyncRealTimeRequestError(_ data: JSONValue?) -> [AppSyncRealTimeRequest.Error] {
guard let errorsJson = data?.errors else {
log.error("[AppSyncRealTimeClient] No 'errors' field found in response json")
return []
}
let errors = errorsJson.asArray ?? [errorsJson]
return errors.compactMap(AppSyncRealTimeRequest.parseResponseError(error:))
}

private func bindCancellableToConnection(_ cancellable: AnyCancellable) {
cancellable.store(in: &cancellablesBindToConnection)
}
Expand Down Expand Up @@ -434,15 +411,15 @@ extension Publisher where Output == AppSyncRealTimeSubscription.State, Failure =
}

extension AppSyncRealTimeClient: DefaultLogger {
static var log: Logger {
public static var log: Logger {
Amplify.Logging.logger(forCategory: CategoryType.api.displayName, forNamespace: String(describing: self))
}

nonisolated var log: Logger { Self.log }
public nonisolated var log: Logger { Self.log }
}

extension AppSyncRealTimeClient: Resettable {
func reset() async {
public func reset() async {
subject.send(completion: .finished)
cancellables = Set()
cancellablesBindToConnection = Set()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,24 @@


import Foundation
import Combine
import Amplify

@_spi(AppSyncRTC)
public enum AppSyncRealTimeRequest {
case connectionInit
case start(StartRequest)
case stop(String)

public struct StartRequest {
let id: String
let data: String
let auth: AppSyncRealTimeRequestAuth?
public let id: String
public let data: String
public let auth: AppSyncRealTimeRequestAuth?

public init(id: String, data: String, auth: AppSyncRealTimeRequestAuth?) {
self.id = id
self.data = data
self.auth = auth
}
}

var id: String? {
Expand Down Expand Up @@ -78,7 +84,7 @@ extension AppSyncRealTimeRequest {
case unauthorized
case unknown(message: String? = nil, causedBy: Swift.Error? = nil, payload: [String: Any]?)

var isUnknown: Bool {
public var isUnknown: Bool {
if case .unknown = self {
return true
}
Expand Down
Loading

0 comments on commit 8859896

Please sign in to comment.