Skip to content

Commit

Permalink
Add shutdownAsync (#109)
Browse files Browse the repository at this point in the history
* Add shutdownAsync and tests

* Better async tests

* Actually throw error on double shutdown

* Fix the tests
  • Loading branch information
0xTim authored Jul 31, 2024
1 parent 15b3fb7 commit e048c8e
Show file tree
Hide file tree
Showing 4 changed files with 286 additions and 4 deletions.
40 changes: 40 additions & 0 deletions Sources/AsyncKit/ConnectionPool/EventLoopGroupConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public final class EventLoopGroupConnectionPool<Source> where Source: Connection
///
/// - Warning: This method is soft-deprecated. Use `syncShutdownGracefully()` or
/// `shutdownGracefully()` instead.
@available(*, noasync, message: "This calls wait() and should not be used in an async context", renamed: "shutdownAsync()")
public func shutdown() {
// synchronize access to closing
guard self.lock.withLock({
Expand All @@ -190,6 +191,44 @@ public final class EventLoopGroupConnectionPool<Source> where Source: Connection
}
}

/// Closes the connection pool.
///
/// All available connections will be closed immediately. Any connections still in use will be
/// closed as soon as they are returned to the pool. Once closed, the pool can not be used to
/// create new connections.
///
/// Connection pools must be closed before they deinitialize.
///
/// This method shuts down asynchronously, waiting for all connection closures to complete before
/// returning.
///
/// - Warning: The pool is always fully shut down once this method returns, even if an error is
/// thrown. All errors are purely advisory.
public func shutdownAsync() async throws {
// synchronize access to closing
guard self.lock.withLock({
// check to make sure we aren't double closing
guard !self.didShutdown else {
return false
}
self.didShutdown = true
self.logger.debug("Connection pool shutting down, closing each event loop's storage")
return true
}) else {
self.logger.debug("Cannot shutdown the connection pool more than once")
throw ConnectionPoolError.shutdown
}

// shutdown all pools
for pool in self.storage.values {
do {
try await pool.close().get()
} catch {
self.logger.error("Failed shutting down event loop pool: \(error)")
}
}
}

/// Closes the connection pool.
///
/// All available connections will be closed immediately. Any connections still in use will be
Expand All @@ -203,6 +242,7 @@ public final class EventLoopGroupConnectionPool<Source> where Source: Connection
///
/// - Warning: The pool is always fully shut down once this method returns, even if an error is
/// thrown. All errors are purely advisory.
@available(*, noasync, message: "This calls wait() and should not be used in an async context", renamed: "shutdownAsync()")
public func syncShutdownGracefully() throws {
// - TODO: Does this need to assert "not on any EventLoop", as `EventLoopGroup.syncShutdownGracefully()` does?
var possibleError: Error? = nil
Expand Down
225 changes: 225 additions & 0 deletions Tests/AsyncKitTests/AsyncConnectionPoolTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
import Atomics
import AsyncKit
import XCTest
import NIOConcurrencyHelpers
import Logging
import NIOCore
import NIOEmbedded

final class AsyncConnectionPoolTests: AsyncKitAsyncTestCase {
func testPooling() async throws {
let foo = FooDatabase()
let pool = EventLoopConnectionPool(
source: foo,
maxConnections: 2,
on: self.group.any()
)

// make two connections
let connA = try await pool.requestConnection().get()
XCTAssertEqual(connA.isClosed, false)
let connB = try await pool.requestConnection().get()
XCTAssertEqual(connB.isClosed, false)
XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 2)

// try to make a third, but pool only supports 2
let futureC = pool.requestConnection()
let connC = ManagedAtomic<FooConnection?>(nil)
futureC.whenSuccess { connC.store($0, ordering: .relaxed) }
XCTAssertNil(connC.load(ordering: .relaxed))
XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 2)

// release one of the connections, allowing the third to be made
pool.releaseConnection(connB)
let connCRet = try await futureC.get()
XCTAssertNotNil(connC.load(ordering: .relaxed))
XCTAssert(connC.load(ordering: .relaxed) === connB)
XCTAssert(connCRet === connC.load(ordering: .relaxed))
XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 2)

// try to make a third again, with two active
let futureD = pool.requestConnection()
let connD = ManagedAtomic<FooConnection?>(nil)
futureD.whenSuccess { connD.store($0, ordering: .relaxed) }
XCTAssertNil(connD.load(ordering: .relaxed))
XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 2)

// this time, close the connection before releasing it
try await connCRet.close().get()
pool.releaseConnection(connC.load(ordering: .relaxed)!)
let connDRet = try await futureD.get()
XCTAssert(connD.load(ordering: .relaxed) !== connB)
XCTAssert(connDRet === connD.load(ordering: .relaxed))
XCTAssertEqual(connD.load(ordering: .relaxed)?.isClosed, false)
XCTAssertEqual(foo.connectionsCreated.load(ordering: .relaxed), 3)

try! await pool.close().get()
}

func testFIFOWaiters() async throws {
let foo = FooDatabase()
let pool = EventLoopConnectionPool(
source: foo,
maxConnections: 1,
on: self.group.any()
)

// * User A makes a request for a connection, gets connection number 1.
let a_1 = pool.requestConnection()
let a = try await a_1.get()

// * User B makes a request for a connection, they are exhausted so he gets a promise.
let b_1 = pool.requestConnection()

// * User A makes another request for a connection, they are still exhausted so he gets a promise.
let a_2 = pool.requestConnection()

// * User A returns connection number 1. His previous request is fulfilled with connection number 1.
pool.releaseConnection(a)

// * User B gets his connection
let b = try await b_1.get()
XCTAssert(a === b)

// * User B releases his connection
pool.releaseConnection(b)

// * User A's second connection request is fulfilled
let c = try await a_2.get()
XCTAssert(a === c)

try! await pool.close().get()
}

func testConnectError() async throws {
let db = ErrorDatabase()
let pool = EventLoopConnectionPool(
source: db,
maxConnections: 1,
on: self.group.any()
)

do {
_ = try await pool.requestConnection().get()
XCTFail("should not have created connection")
} catch _ as ErrorDatabase.Error {
// pass
}

// test that we can still make another request even after a failed request
do {
_ = try await pool.requestConnection().get()
XCTFail("should not have created connection")
} catch _ as ErrorDatabase.Error {
// pass
}

try! await pool.close().get()
}

func testPoolClose() async throws {
let foo = FooDatabase()
let pool = EventLoopConnectionPool(
source: foo,
maxConnections: 1,
on: self.group.any()
)
let _ = try await pool.requestConnection().get()
let b = pool.requestConnection()
try await pool.close().get()

let c = pool.requestConnection()

// check that waiters are failed
do {
_ = try await b.get()
XCTFail("should not have created connection")
} catch ConnectionPoolError.shutdown {
// pass
}

// check that new requests fail
do {
_ = try await c.get()
XCTFail("should not have created connection")
} catch ConnectionPoolError.shutdown {
// pass
}
}

func testGracefulShutdownAsync() async throws {
let foo = FooDatabase()
let pool = EventLoopGroupConnectionPool(
source: foo,
maxConnectionsPerEventLoop: 2,
on: self.group
)

try await pool.shutdownAsync()
var errorCaught = false

do {
try await pool.shutdownAsync()
} catch {
errorCaught = true
XCTAssertEqual(error as? ConnectionPoolError, ConnectionPoolError.shutdown)
}
XCTAssertTrue(errorCaught)
}

func testShutdownWithHeldConnection() async throws {
let foo = FooDatabase()
let pool = EventLoopGroupConnectionPool(
source: foo,
maxConnectionsPerEventLoop: 2,
on: self.group
)

let connection = try await pool.requestConnection().get()

try await pool.shutdownAsync()
var errorCaught = false

do {
try await pool.shutdownAsync()
} catch {
errorCaught = true
XCTAssertEqual(error as? ConnectionPoolError, ConnectionPoolError.shutdown)
}
XCTAssertTrue(errorCaught)

let result1 = try await connection.eventLoop.submit { connection.isClosed }.get()
XCTAssertFalse(result1)
pool.releaseConnection(connection)
let result2 = try await connection.eventLoop.submit { connection.isClosed }.get()
XCTAssertTrue(result2)
}

func testEventLoopDelegation() async throws {
let foo = FooDatabase()
let pool = EventLoopGroupConnectionPool(
source: foo,
maxConnectionsPerEventLoop: 1,
on: self.group
)

for _ in 0..<500 {
let eventLoop = self.group.any()
let a = pool.requestConnection(
on: eventLoop
).map { conn in
XCTAssertTrue(eventLoop.inEventLoop)
pool.releaseConnection(conn)
}
let b = pool.requestConnection(
on: eventLoop
).map { conn in
XCTAssertTrue(eventLoop.inEventLoop)
pool.releaseConnection(conn)
}
_ = try await a.and(b).get()
}

try await pool.shutdownAsync()
}
}
17 changes: 17 additions & 0 deletions Tests/AsyncKitTests/AsyncKitTestsCommon.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,20 @@ class AsyncKitTestCase: XCTestCase {
XCTAssertTrue(isLoggingConfigured)
}
}

class AsyncKitAsyncTestCase: XCTestCase {
var group: (any EventLoopGroup)!
var eventLoop: any EventLoop { self.group.any() }

override func setUp() async throws {
try await super.setUp()
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
XCTAssertTrue(isLoggingConfigured)
}

override func tearDown() async throws {
try await self.group.shutdownGracefully()
self.group = nil
try await super.tearDown()
}
}
8 changes: 4 additions & 4 deletions Tests/AsyncKitTests/ConnectionPoolTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ final class ConnectionPoolTests: AsyncKitTestCase {
XCTAssertEqual($0 as? ConnectionPoolError, ConnectionPoolError.shutdown)
}
}

func testGracefulShutdownWithHeldConnection() throws {
let foo = FooDatabase()
let pool = EventLoopGroupConnectionPool(
Expand Down Expand Up @@ -312,7 +312,7 @@ final class ConnectionPoolTests: AsyncKitTestCase {
}
}

private struct ErrorDatabase: ConnectionPoolSource {
struct ErrorDatabase: ConnectionPoolSource {
enum Error: Swift.Error {
case test
}
Expand All @@ -322,7 +322,7 @@ private struct ErrorDatabase: ConnectionPoolSource {
}
}

private final class FooDatabase: ConnectionPoolSource {
final class FooDatabase: ConnectionPoolSource {
var connectionsCreated: ManagedAtomic<Int>

init() {
Expand All @@ -336,7 +336,7 @@ private final class FooDatabase: ConnectionPoolSource {
}
}

private final class FooConnection: ConnectionPoolItem, AtomicReference {
final class FooConnection: ConnectionPoolItem, AtomicReference {
var isClosed: Bool
let eventLoop: EventLoop

Expand Down

0 comments on commit e048c8e

Please sign in to comment.