Skip to content

Commit

Permalink
OWSChatConnection: Use libsignal ChatConnection, replacing ChatService
Browse files Browse the repository at this point in the history
  • Loading branch information
jrose-signal committed Jan 9, 2025
1 parent 8a05fd0 commit a856278
Showing 1 changed file with 39 additions and 46 deletions.
85 changes: 39 additions & 46 deletions SignalServiceKit/Network/OWSChatConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1400,7 +1400,7 @@ internal class OWSChatConnectionWithLibSignalShadowing: OWSChatConnectionUsingSS
// We don't use the `.open` state here, only `.closed` and `.connecting`.
// This is a bit less efficient (because we keep the whole connection Task around),
// but also easier for state management.
@Atomic private var chatService: OWSChatConnectionUsingLibSignal<UnauthenticatedChatService>.ConnectionState = .closed
@Atomic private var chatService: OWSChatConnectionUsingLibSignal<UnauthenticatedChatConnection>.ConnectionState = .closed

private var _shadowingFrequency: Double
private var shadowingFrequency: Double {
Expand Down Expand Up @@ -1442,10 +1442,6 @@ internal class OWSChatConnectionWithLibSignalShadowing: OWSChatConnectionUsingSS
}
}

private static func makeChatService(libsignalNet: Net) -> UnauthenticatedChatService {
return libsignalNet.createUnauthenticatedChatService()
}

internal init(libsignalNet: Net, type: OWSChatConnectionType, accountManager: TSAccountManager, appExpiry: AppExpiry, appReadiness: AppReadiness, currentCallProvider: any CurrentCallProvider, db: any DB, registrationStateChangeManager: RegistrationStateChangeManager, shadowingFrequency: Double) {
owsPrecondition((0.0...1.0).contains(shadowingFrequency))
self.libsignalNet = libsignalNet
Expand Down Expand Up @@ -1493,9 +1489,9 @@ internal class OWSChatConnectionWithLibSignalShadowing: OWSChatConnectionUsingSS
chatService = .connecting(token: NSObject(), task: Task { [self] in
do {
owsAssertDebug(type == .unidentified)
let service = libsignalNet.createUnauthenticatedChatService()
let debugInfo = try await service.connect()
Logger.verbose("\(logPrefix): libsignal shadowing socket connected: \(debugInfo.connectionInfo)")
let service = try await libsignalNet.connectUnauthenticatedChat()
service.start(listener: self)
Logger.verbose("\(logPrefix): libsignal shadowing socket connected: \(service.info())")
return service

} catch {
Expand Down Expand Up @@ -1579,12 +1575,12 @@ internal class OWSChatConnectionWithLibSignalShadowing: OWSChatConnectionUsingSS
throw SignalError.chatServiceInactive("not connected")
}

let (healthCheckResult, debugInfo) = try await service.sendAndDebug(.init(method: "GET", pathAndQuery: "/v1/keepalive", timeout: 3))
let healthCheckResult = try await service.send(.init(method: "GET", pathAndQuery: "/v1/keepalive", timeout: 3))
let succeeded = (200...299).contains(healthCheckResult.status)
if !succeeded {
Logger.warn("\(logPrefix): [\(originalRequestId)] keepalive via libsignal responded with status [\(healthCheckResult.status)] (\(debugInfo.connectionInfo))")
Logger.warn("\(logPrefix): [\(originalRequestId)] keepalive via libsignal responded with status [\(healthCheckResult.status)] (\(service.info())")
} else {
Logger.verbose("\(logPrefix): [\(originalRequestId)] keepalive via libsignal responded with status [\(healthCheckResult.status)] (\(debugInfo.connectionInfo))")
Logger.verbose("\(logPrefix): [\(originalRequestId)] keepalive via libsignal responded with status [\(healthCheckResult.status)] (\(service.info())")
}

updateStatsAsync { stats in
Expand All @@ -1607,7 +1603,7 @@ internal class OWSChatConnectionWithLibSignalShadowing: OWSChatConnectionUsingSS
}
}

func connectionWasInterrupted(_ service: UnauthenticatedChatService, error: Error?) {
func connectionWasInterrupted(_ service: UnauthenticatedChatConnection, error: Error?) {
// Don't do anything if the shadowing connection gets interrupted.
// Either the main connection will also be interrupted, and they'll reconnect together,
// or requests to the shadowing connection will come back as "chatServiceInactive".
Expand All @@ -1618,13 +1614,13 @@ internal class OWSChatConnectionWithLibSignalShadowing: OWSChatConnectionUsingSS
}
}

internal class OWSChatConnectionUsingLibSignal<Service: ChatService>: OWSChatConnection, ConnectionEventsListener {
internal class OWSChatConnectionUsingLibSignal<Connection: ChatConnection>: OWSChatConnection, ConnectionEventsListener {
fileprivate let libsignalNet: Net

fileprivate enum ConnectionState {
case closed
case connecting(token: NSObject, task: Task<Service?, Never>)
case open(Service)
case connecting(token: NSObject, task: Task<Connection?, Never>)
case open(Connection)

var asExternalState: OWSChatConnectionState {
switch self {
Expand All @@ -1641,18 +1637,18 @@ internal class OWSChatConnectionUsingLibSignal<Service: ChatService>: OWSChatCon
return activeToken === token
}

func isActive(_ service: Service) -> Bool {
guard case .open(let activeService) = self else {
func isActive(_ connection: Connection) -> Bool {
guard case .open(let activeConnection) = self else {
return false
}
return activeService === service
return activeConnection === connection
}

func waitToFinishConnecting() async -> Service? {
func waitToFinishConnecting() async -> Connection? {
switch self {
case .closed: nil
case .connecting(token: _, task: let task): await task.value
case .open(let service): service
case .open(let connection): connection
}
}
}
Expand All @@ -1675,7 +1671,7 @@ internal class OWSChatConnectionUsingLibSignal<Service: ChatService>: OWSChatCon
super.init(type: type, accountManager: accountManager, appExpiry: appExpiry, appReadiness: appReadiness, currentCallProvider: currentCallProvider, db: db, registrationStateChangeManager: registrationStateChangeManager)
}

fileprivate func makeChatService() -> Service {
fileprivate func connectChatService() async throws -> Connection {
fatalError("must be overridden by subclass")
}

Expand Down Expand Up @@ -1713,8 +1709,8 @@ internal class OWSChatConnectionUsingLibSignal<Service: ChatService>: OWSChatCon
func connectionAttemptCompleted(_ state: ConnectionState) {
self.serialQueue.async {
guard self.connection.isCurrentlyConnecting(token) else {
// We finished connecting, but either we've since been asked to disconnect,
// or the chat config has changed (causing chatService to be recreated).
// We finished connecting, but we've since been asked to disconnect
// (either because we should be offline, or because config has changed).
return
}

Expand All @@ -1723,8 +1719,7 @@ internal class OWSChatConnectionUsingLibSignal<Service: ChatService>: OWSChatCon
}

do {
let chatService = self.makeChatService()
try await chatService.connect()
let chatService = try await self.connectChatService()
if type == .identified {
self.didConnectIdentified()
}
Expand Down Expand Up @@ -1769,7 +1764,7 @@ internal class OWSChatConnectionUsingLibSignal<Service: ChatService>: OWSChatCon
}
connection = .closed

// Spin off a background task to disconnect the previous service.
// Spin off a background task to disconnect the previous connection.
_ = Task {
do {
try await previousConnection.waitToFinishConnecting()?.disconnect()
Expand Down Expand Up @@ -1842,22 +1837,22 @@ internal class OWSChatConnectionUsingLibSignal<Service: ChatService>: OWSChatCon
owsAssertDebug(requestUrl.host == nil)
owsAssertDebug(!requestUrl.path.hasPrefix("/"))

let libsignalRequest = ChatService.Request(method: requestInfo.httpMethod, pathAndQuery: "/\(requestUrl.relativeString)", headers: httpHeaders.headers, body: body, timeout: request.timeoutInterval)
let libsignalRequest = ChatConnection.Request(method: requestInfo.httpMethod, pathAndQuery: "/\(requestUrl.relativeString)", headers: httpHeaders.headers, body: body, timeout: request.timeoutInterval)

unsubmittedRequestTokenForEarlyExit = nil
_ = Promise.wrapAsync { [self, connection] in
// LibSignalClient's ChatService doesn't keep track of outstanding requests,
// LibSignalClient's ChatConnection doesn't keep track of outstanding requests,
// so we keep the request token alive until we get the response instead.
defer {
removeUnsubmittedRequestToken(unsubmittedRequestToken)
}
guard let chatService = await connection.waitToFinishConnecting() else {
throw SignalError.chatServiceInactive("no connection to chat server")
}
return try await chatService.sendAndDebug(libsignalRequest)
}.done(on: self.serialQueue) { (response: ChatService.Response, debugInfo: ChatService.DebugInfo) in
return (try await chatService.send(libsignalRequest), chatService.info())
}.done(on: self.serialQueue) { (response, connectionInfo) in
if DebugFlags.internalLogging {
Logger.info("received response for requestId: \(requestId), message: \(response.message), route: \(debugInfo.connectionInfo)")
Logger.info("received response for requestId: \(requestId), message: \(response.message), route: \(connectionInfo)")
}

self.ensureBackgroundKeepAlive(.receiveResponse)
Expand Down Expand Up @@ -1885,7 +1880,7 @@ internal class OWSChatConnectionUsingLibSignal<Service: ChatService>: OWSChatCon
}
}

func connectionWasInterrupted(_ service: Service, error: Error?) {
func connectionWasInterrupted(_ service: Connection, error: Error?) {
self.serialQueue.async { [self] in
guard connection.isActive(service) else {
// Already done with this service.
Expand Down Expand Up @@ -1916,25 +1911,25 @@ internal class OWSChatConnectionUsingLibSignal<Service: ChatService>: OWSChatCon
}
}

internal class OWSUnauthConnectionUsingLibSignal: OWSChatConnectionUsingLibSignal<UnauthenticatedChatService> {
internal class OWSUnauthConnectionUsingLibSignal: OWSChatConnectionUsingLibSignal<UnauthenticatedChatConnection> {
init(libsignalNet: Net, accountManager: TSAccountManager, appExpiry: AppExpiry, appReadiness: AppReadiness, currentCallProvider: any CurrentCallProvider, db: any DB, registrationStateChangeManager: RegistrationStateChangeManager) {
super.init(libsignalNet: libsignalNet, type: .unidentified, accountManager: accountManager, appExpiry: appExpiry, appReadiness: appReadiness, currentCallProvider: currentCallProvider, db: db, registrationStateChangeManager: registrationStateChangeManager)
}

fileprivate override var connection: ConnectionState {
didSet {
if case .open(let service) = connection {
service.setListener(self)
service.start(listener: self)
}
}
}

override func makeChatService() -> UnauthenticatedChatService {
return libsignalNet.createUnauthenticatedChatService()
override func connectChatService() async throws -> UnauthenticatedChatConnection {
return try await libsignalNet.connectUnauthenticatedChat()
}
}

internal class OWSAuthConnectionUsingLibSignal: OWSChatConnectionUsingLibSignal<AuthenticatedChatService>, ChatListener {
internal class OWSAuthConnectionUsingLibSignal: OWSChatConnectionUsingLibSignal<AuthenticatedChatConnection>, ChatConnectionListener {
private let _hasEmptiedInitialQueue = AtomicBool(false, lock: .sharedGlobal)
override var hasEmptiedInitialQueue: Bool {
_hasEmptiedInitialQueue.get()
Expand All @@ -1944,15 +1939,12 @@ internal class OWSAuthConnectionUsingLibSignal: OWSChatConnectionUsingLibSignal<
super.init(libsignalNet: libsignalNet, type: .identified, accountManager: accountManager, appExpiry: appExpiry, appReadiness: appReadiness, currentCallProvider: currentCallProvider, db: db, registrationStateChangeManager: registrationStateChangeManager)
}

fileprivate override func makeChatService() -> AuthenticatedChatService {
fileprivate override func connectChatService() async throws -> AuthenticatedChatConnection {
let (username, password) = db.read { tx in
(accountManager.storedServerUsername(tx: tx), accountManager.storedServerAuthToken(tx: tx))
}
// Note that we still create a service for an unregistered user. Connections will fail, however.
let service = libsignalNet.createAuthenticatedChatService(username: username ?? "", password: password ?? "", receiveStories: StoryManager.areStoriesEnabled)
// We do *not* set the listener until the connection succeeds,
// so that we don't get callbacks until we update our internal state.
return service
// Note that we still try to connect for an unregistered user, so that we get a consistent error thrown.
return try await libsignalNet.connectAuthenticatedChat(username: username ?? "", password: password ?? "", receiveStories: StoryManager.areStoriesEnabled)
}

fileprivate override var connection: ConnectionState {
Expand All @@ -1961,7 +1953,8 @@ internal class OWSAuthConnectionUsingLibSignal: OWSChatConnectionUsingLibSignal<
case .connecting(token: _, task: _):
break
case .open(let service):
service.setListener(self)
// Note that we don't get callbacks until this point.
service.start(listener: self)
if accountManager.registrationStateWithMaybeSneakyTransaction.isDeregistered {
db.write { tx in
registrationStateChangeManager.setIsDeregisteredOrDelinked(false, tx: tx)
Expand All @@ -1979,7 +1972,7 @@ internal class OWSAuthConnectionUsingLibSignal: OWSChatConnectionUsingLibSignal<
}
}

func chatService(_ chat: AuthenticatedChatService, didReceiveIncomingMessage envelope: Data, serverDeliveryTimestamp: UInt64, sendAck: @escaping () async throws -> Void) {
func chatConnection(_ chat: AuthenticatedChatConnection, didReceiveIncomingMessage envelope: Data, serverDeliveryTimestamp: UInt64, sendAck: @escaping () async throws -> Void) {
ensureBackgroundKeepAlive(.receiveMessage)
let backgroundTask = OWSBackgroundTask(label: "handleIncomingMessage")

Expand Down Expand Up @@ -2008,7 +2001,7 @@ internal class OWSAuthConnectionUsingLibSignal: OWSChatConnectionUsingLibSignal<
}
}

func chatServiceDidReceiveQueueEmpty(_ chat: AuthenticatedChatService) {
func chatConnectionDidReceiveQueueEmpty(_ chat: AuthenticatedChatConnection) {
self.serialQueue.async { [self] in
guard self.connection.isActive(chat) else {
// We have since disconnected from the chat service instance that reported the empty queue.
Expand Down

0 comments on commit a856278

Please sign in to comment.