Skip to content

Commit ca70d8c

Browse files
authored
Use ConnectionLease to return connection (#571)
1 parent c7e2cda commit ca70d8c

File tree

10 files changed

+124
-105
lines changed

10 files changed

+124
-105
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
public struct ConnectionLease<Connection: PooledConnection>: Sendable {
2+
public var connection: Connection
3+
4+
@usableFromInline
5+
let _release: @Sendable (Connection) -> ()
6+
7+
@inlinable
8+
public init(connection: Connection, release: @escaping @Sendable (Connection) -> Void) {
9+
self.connection = connection
10+
self._release = release
11+
}
12+
13+
@inlinable
14+
public func release() {
15+
self._release(self.connection)
16+
}
17+
}

Sources/ConnectionPoolModule/ConnectionPool.swift

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public protocol ConnectionRequestProtocol: Sendable {
8888

8989
/// A function that is called with a connection or a
9090
/// `PoolError`.
91-
func complete(with: Result<Connection, ConnectionPoolError>)
91+
func complete(with: Result<ConnectionLease<Connection>, ConnectionPoolError>)
9292
}
9393

9494
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
@@ -402,8 +402,11 @@ public final class ConnectionPool<
402402
/*private*/ func runRequestAction(_ action: StateMachine.RequestAction) {
403403
switch action {
404404
case .leaseConnection(let requests, let connection):
405+
let lease = ConnectionLease(connection: connection) { connection in
406+
self.releaseConnection(connection)
407+
}
405408
for request in requests {
406-
request.complete(with: .success(connection))
409+
request.complete(with: .success(lease))
407410
}
408411

409412
case .failRequest(let request, let error):

Sources/ConnectionPoolModule/ConnectionRequest.swift

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,18 @@ public struct ConnectionRequest<Connection: PooledConnection>: ConnectionRequest
55
public var id: ID
66

77
@usableFromInline
8-
private(set) var continuation: CheckedContinuation<Connection, any Error>
8+
private(set) var continuation: CheckedContinuation<ConnectionLease<Connection>, any Error>
99

1010
@inlinable
1111
init(
1212
id: Int,
13-
continuation: CheckedContinuation<Connection, any Error>
13+
continuation: CheckedContinuation<ConnectionLease<Connection>, any Error>
1414
) {
1515
self.id = id
1616
self.continuation = continuation
1717
}
1818

19-
public func complete(with result: Result<Connection, ConnectionPoolError>) {
19+
public func complete(with result: Result<ConnectionLease<Connection>, ConnectionPoolError>) {
2020
self.continuation.resume(with: result)
2121
}
2222
}
@@ -46,15 +46,15 @@ extension ConnectionPool where Request == ConnectionRequest<Connection> {
4646
}
4747

4848
@inlinable
49-
public func leaseConnection() async throws -> Connection {
49+
public func leaseConnection() async throws -> ConnectionLease<Connection> {
5050
let requestID = requestIDGenerator.next()
5151

5252
let connection = try await withTaskCancellationHandler {
5353
if Task.isCancelled {
5454
throw CancellationError()
5555
}
5656

57-
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Connection, Error>) in
57+
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<ConnectionLease<Connection>, Error>) in
5858
let request = Request(
5959
id: requestID,
6060
continuation: continuation
@@ -71,8 +71,8 @@ extension ConnectionPool where Request == ConnectionRequest<Connection> {
7171

7272
@inlinable
7373
public func withConnection<Result>(_ closure: (Connection) async throws -> Result) async throws -> Result {
74-
let connection = try await self.leaseConnection()
75-
defer { self.releaseConnection(connection) }
76-
return try await closure(connection)
74+
let lease = try await self.leaseConnection()
75+
defer { lease.release() }
76+
return try await closure(lease.connection)
7777
}
7878
}
Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
import _ConnectionPoolModule
22

3-
public final class MockRequest: ConnectionRequestProtocol, Hashable, Sendable {
4-
public typealias Connection = MockConnection
5-
3+
public final class MockRequest<Connection: PooledConnection>: ConnectionRequestProtocol, Hashable, Sendable {
64
public struct ID: Hashable, Sendable {
75
var objectID: ObjectIdentifier
86

@@ -11,7 +9,7 @@ public final class MockRequest: ConnectionRequestProtocol, Hashable, Sendable {
119
}
1210
}
1311

14-
public init() {}
12+
public init(connectionType: Connection.Type = Connection.self) {}
1513

1614
public var id: ID { ID(self) }
1715

@@ -23,7 +21,7 @@ public final class MockRequest: ConnectionRequestProtocol, Hashable, Sendable {
2321
hasher.combine(self.id)
2422
}
2523

26-
public func complete(with: Result<Connection, ConnectionPoolError>) {
24+
public func complete(with: Result<ConnectionLease<Connection>, ConnectionPoolError>) {
2725

2826
}
2927
}

Sources/PostgresNIO/Pool/PostgresClient.swift

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -301,11 +301,11 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
301301
/// - Returns: The closure's return value.
302302
@_disfavoredOverload
303303
public func withConnection<Result>(_ closure: (PostgresConnection) async throws -> Result) async throws -> Result {
304-
let connection = try await self.leaseConnection()
304+
let lease = try await self.leaseConnection()
305305

306-
defer { self.pool.releaseConnection(connection) }
306+
defer { lease.release() }
307307

308-
return try await closure(connection)
308+
return try await closure(lease.connection)
309309
}
310310

311311
#if compiler(>=6.0)
@@ -319,11 +319,11 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
319319
// DO NOT FIX THE WHITESPACE IN THE NEXT LINE UNTIL 5.10 IS UNSUPPORTED
320320
// https://github.com/swiftlang/swift/issues/79285
321321
_ closure: (PostgresConnection) async throws -> sending Result) async throws -> sending Result {
322-
let connection = try await self.leaseConnection()
322+
let lease = try await self.leaseConnection()
323323

324-
defer { self.pool.releaseConnection(connection) }
324+
defer { lease.release() }
325325

326-
return try await closure(connection)
326+
return try await closure(lease.connection)
327327
}
328328

329329
/// Lease a connection, which is in an open transaction state, for the provided `closure`'s lifetime.
@@ -404,7 +404,8 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
404404
throw PSQLError(code: .tooManyParameters, query: query, file: file, line: line)
405405
}
406406

407-
let connection = try await self.leaseConnection()
407+
let lease = try await self.leaseConnection()
408+
let connection = lease.connection
408409

409410
var logger = logger
410411
logger[postgresMetadataKey: .connectionID] = "\(connection.id)"
@@ -419,12 +420,12 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
419420
connection.channel.write(HandlerTask.extendedQuery(context), promise: nil)
420421

421422
promise.futureResult.whenFailure { _ in
422-
self.pool.releaseConnection(connection)
423+
lease.release()
423424
}
424425

425426
return try await promise.futureResult.map {
426427
$0.asyncSequence(onFinish: {
427-
self.pool.releaseConnection(connection)
428+
lease.release()
428429
})
429430
}.get()
430431
} catch var error as PSQLError {
@@ -446,7 +447,8 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
446447
let logger = logger ?? Self.loggingDisabled
447448

448449
do {
449-
let connection = try await self.leaseConnection()
450+
let lease = try await self.leaseConnection()
451+
let connection = lease.connection
450452

451453
let promise = connection.channel.eventLoop.makePromise(of: PSQLRowStream.self)
452454
let task = HandlerTask.executePreparedStatement(.init(
@@ -460,11 +462,11 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
460462
connection.channel.write(task, promise: nil)
461463

462464
promise.futureResult.whenFailure { _ in
463-
self.pool.releaseConnection(connection)
465+
lease.release()
464466
}
465467

466468
return try await promise.futureResult
467-
.map { $0.asyncSequence(onFinish: { self.pool.releaseConnection(connection) }) }
469+
.map { $0.asyncSequence(onFinish: { lease.release() }) }
468470
.get()
469471
.map { try preparedStatement.decodeRow($0) }
470472
} catch var error as PSQLError {
@@ -504,7 +506,7 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
504506

505507
// MARK: - Private Methods -
506508

507-
private func leaseConnection() async throws -> PostgresConnection {
509+
private func leaseConnection() async throws -> ConnectionLease<PostgresConnection> {
508510
if !self.runningAtomic.load(ordering: .relaxed) {
509511
self.backgroundLogger.warning("Trying to lease connection from `PostgresClient`, but `PostgresClient.run()` hasn't been called yet.")
510512
}

0 commit comments

Comments
 (0)