Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion Sources/Valkey/Node/ValkeyNodeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ package final class ValkeyNodeClient: Sendable {
ValkeyClientMetrics,
ContinuousClock
>
@usableFromInline
typealias ConnectionStateMachine =
SubscriptionConnectionStateMachine<
ValkeyConnection,
CheckedContinuation<ValkeyConnection, Error>,
CheckedContinuation<Void, Never>
>
/// Server address
public let serverAddress: ValkeyServerAddress
/// Connection pool
Expand All @@ -47,6 +54,17 @@ package final class ValkeyNodeClient: Sendable {
public let eventLoopGroup: any EventLoopGroup
/// Logger
public let logger: Logger
/// subscription connection state
@usableFromInline
let subscriptionConnectionStateMachine: Mutex<ConnectionStateMachine>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comment: Should be this called subscriptionConnectionStateMachineMutex or subscriptionConnectionStateMachineLock as it is a mutex around ConnectionStateMachine

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy with its name. It is a bit more than a mutex as it manages the leasing of the connection as well

@usableFromInline
let subscriptionConnectionIDGenerator: ConnectionIDGenerator
/// Actions that can be run on a node
enum RunAction: Sendable {
case leaseSubscriptionConnection(leaseID: Int)
}
let actionStream: AsyncStream<RunAction>
let actionStreamContinuation: AsyncStream<RunAction>.Continuation

package init(
_ address: ValkeyServerAddress,
Expand Down Expand Up @@ -85,6 +103,9 @@ package final class ValkeyNodeClient: Sendable {
self.connectionFactory = connectionFactory
self.eventLoopGroup = eventLoopGroup
self.logger = logger
self.subscriptionConnectionStateMachine = .init(.init())
self.subscriptionConnectionIDGenerator = .init()
(self.actionStream, self.actionStreamContinuation) = AsyncStream.makeStream(of: RunAction.self)
}
}

Expand All @@ -93,7 +114,18 @@ extension ValkeyNodeClient {
/// Run ValkeyNode connection pool
@usableFromInline
package func run() async {
await self.connectionPool.run()
/// Run discarding task group running actions
await withDiscardingTaskGroup { group in
group.addTask {
await self.connectionPool.run()
self.shutdownSubscriptionConnection()
}
for await action in self.actionStream {
group.addTask {
await self.runAction(action)
}
}
}
}

func triggerForceShutdown() {
Expand Down Expand Up @@ -213,3 +245,25 @@ extension ValkeyNodeClient: ValkeyNodeConnectionPool {
self.triggerForceShutdown()
}
}

@available(valkeySwift 1.0, *)
extension ValkeyNodeClient {
func queueAction(_ action: RunAction) {
self.actionStreamContinuation.yield(action)
}

private func runAction(_ action: RunAction) async {
switch action {
case .leaseSubscriptionConnection(let leaseID):
do {
try await self.withConnection { connection in
await withCheckedContinuation { (cont: CheckedContinuation<Void, Never>) in
self.acquiredSubscriptionConnection(leaseID: leaseID, connection: connection, releaseContinuation: cont)
}
}
} catch {
self.errorAcquiringSubscriptionConnection(leaseID: leaseID, error: error)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import Synchronization
import _ValkeyConnectionPool

@available(valkeySwift 1.0, *)
extension ValkeyClient {
extension ValkeyNodeClient {
@usableFromInline
func leaseSubscriptionConnection(id: Int, request: CheckedContinuation<ValkeyConnection, Error>) {
self.logger.trace("Get subscription connection", metadata: ["valkey_subscription_connection_id": .stringConvertible(id)])
Expand Down Expand Up @@ -102,6 +102,25 @@ extension ValkeyClient {
break
}
}

@usableFromInline
func shutdownSubscriptionConnection() {
self.logger.trace("Shutdown subscription connection")
let action = self.subscriptionConnectionStateMachine.withLock { stateMachine in
stateMachine.shutdown()
}
switch action {
case .yield(let continuations):
for cont in continuations {
cont.resume(throwing: ValkeyClientError(.connectionClosing))
}
case .release(let continuation):
continuation.resume()
self.logger.trace("Released connection for subscriptions")
case .doNothing:
break
}
}
}

/// StateMachine for acquiring Subscription Connection.
Expand All @@ -114,6 +133,8 @@ struct SubscriptionConnectionStateMachine<Value, Request, ReleaseRequest>: ~Copy
case acquiring(leaseID: Int, waiters: [Int: Request])
/// We have a connection
case acquired(AcquiredState)
/// Connection is shutdown
case shutdown

struct AcquiredState {
var leaseID: Int
Expand Down Expand Up @@ -151,6 +172,8 @@ struct SubscriptionConnectionStateMachine<Value, Request, ReleaseRequest>: ~Copy
state.requestIDs.insert(id)
self = .acquired(state)
return .completeRequest(state.value)
case .shutdown:
preconditionFailure("Cannot get subscription connection when shutdown")
}
}

Expand Down Expand Up @@ -185,6 +208,9 @@ struct SubscriptionConnectionStateMachine<Value, Request, ReleaseRequest>: ~Copy
self = .acquired(state)
return .doNothing
}
case .shutdown:
self = .shutdown
return .doNothing
}
}

Expand Down Expand Up @@ -213,6 +239,9 @@ struct SubscriptionConnectionStateMachine<Value, Request, ReleaseRequest>: ~Copy
} else {
preconditionFailure("Acquired connection twice")
}
case .shutdown:
self = .shutdown
return .release
}
}

Expand Down Expand Up @@ -241,6 +270,9 @@ struct SubscriptionConnectionStateMachine<Value, Request, ReleaseRequest>: ~Copy
} else {
preconditionFailure("Error acquiring connection we already have")
}
case .shutdown:
self = .shutdown
return .doNothing
}
}

Expand All @@ -264,6 +296,32 @@ struct SubscriptionConnectionStateMachine<Value, Request, ReleaseRequest>: ~Copy
self = .acquired(state)
return .doNothing
}
case .shutdown:
self = .shutdown
return .doNothing
}
}

enum ShutdownAction {
case yield([Request])
case release(ReleaseRequest)
case doNothing
}

mutating func shutdown() -> ShutdownAction {
switch consume self.state {
case .uninitialized(let leaseID):
self = .uninitialized(nextLeaseID: leaseID)
return .doNothing
case .acquiring(let storedLeaseID, let waiters):
self = .uninitialized(nextLeaseID: storedLeaseID + 1)
return .yield(.init(waiters.values))
case .acquired(let state):
self = .uninitialized(nextLeaseID: state.leaseID + 1)
return .release(state.releaseRequest)
case .shutdown:
self = .shutdown
return .doNothing
}
}

Expand All @@ -272,10 +330,12 @@ struct SubscriptionConnectionStateMachine<Value, Request, ReleaseRequest>: ~Copy
case .uninitialized: true
case .acquiring: false
case .acquired: false
case .shutdown: true
}
}

static private func uninitialized(nextLeaseID: Int) -> Self { .init(state: .uninitialized(nextLeaseID: nextLeaseID)) }
static private func acquiring(leaseID: Int, waiters: [Int: Request]) -> Self { .init(state: .acquiring(leaseID: leaseID, waiters: waiters)) }
static private func acquired(_ state: State.AcquiredState) -> Self { .init(state: .acquired(state)) }
static private var shutdown: Self { .init(state: .shutdown) }
}
9 changes: 5 additions & 4 deletions Sources/Valkey/Subscriptions/ValkeyClient+subscribe.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,19 @@ extension ValkeyClient {
isolation: isolated (any Actor)? = #isolation,
_ operation: (ValkeyConnection) async throws -> sending Value
) async throws -> sending Value {
let id = self.subscriptionConnectionIDGenerator.next()
let node = self.node
let id = node.subscriptionConnectionIDGenerator.next()

let connection = try await withTaskCancellationHandler {
try await withCheckedThrowingContinuation { (cont: CheckedContinuation<ValkeyConnection, Error>) in
self.leaseSubscriptionConnection(id: id, request: cont)
node.leaseSubscriptionConnection(id: id, request: cont)
}
} onCancel: {
self.cancelSubscriptionConnection(id: id)
node.cancelSubscriptionConnection(id: id)
}

defer {
self.releaseSubscriptionConnection(id: id)
node.releaseSubscriptionConnection(id: id)
}
return try await operation(connection)
}
Expand Down
Loading