Skip to content

Commit 8257830

Browse files
committed
refactor(pool): use a unique ID per connection
1 parent 5784733 commit 8257830

File tree

3 files changed

+110
-67
lines changed

3 files changed

+110
-67
lines changed

sqlx-core/src/pool/connect.rs

+73-47
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::rt::JoinHandle;
77
use crate::Error;
88
use ease_off::EaseOff;
99
use event_listener::{Event, EventListener};
10+
use std::fmt::{Display, Formatter};
1011
use std::future::Future;
1112
use std::pin::Pin;
1213
use std::ptr;
@@ -246,20 +247,24 @@ impl<DB: Database> PoolConnector<DB> for DefaultConnector<DB> {
246247

247248
/// Metadata passed to [`PoolConnector::connect()`] for every connection attempt.
248249
#[derive(Debug)]
250+
#[non_exhaustive]
249251
pub struct PoolConnectMetadata {
250252
/// The instant at which the current connection task was started, including all attempts.
251253
///
252254
/// May be used for reporting purposes, or to implement a custom backoff.
253255
pub start: Instant,
254256
/// The number of attempts that have occurred so far.
255257
pub num_attempts: usize,
258+
/// The current size of the pool.
256259
pub pool_size: usize,
260+
/// The ID of the connection, unique for the pool.
261+
pub connection_id: ConnectionId,
257262
}
258263

259264
pub struct DynConnector<DB: Database> {
260265
// We want to spawn the connection attempt as a task anyway
261266
connect: Box<
262-
dyn Fn(ConnectPermit<DB>, usize) -> JoinHandle<crate::Result<PoolConnection<DB>>>
267+
dyn Fn(ConnectionId, ConnectPermit<DB>) -> JoinHandle<crate::Result<PoolConnection<DB>>>
263268
+ Send
264269
+ Sync
265270
+ 'static,
@@ -271,53 +276,92 @@ impl<DB: Database> DynConnector<DB> {
271276
let connector = Arc::new(connector);
272277

273278
Self {
274-
connect: Box::new(move |permit, size| {
275-
crate::rt::spawn(connect_with_backoff(permit, connector.clone(), size))
279+
connect: Box::new(move |id, permit| {
280+
crate::rt::spawn(connect_with_backoff(id, permit, connector.clone()))
276281
}),
277282
}
278283
}
279284

280285
pub fn connect(
281286
&self,
287+
id: ConnectionId,
282288
permit: ConnectPermit<DB>,
283-
size: usize,
284289
) -> JoinHandle<crate::Result<PoolConnection<DB>>> {
285-
(self.connect)(permit, size)
290+
(self.connect)(id, permit)
286291
}
287292
}
288293

289294
pub struct ConnectionCounter {
290-
connections: AtomicUsize,
295+
count: AtomicUsize,
296+
next_id: AtomicUsize,
291297
connect_available: Event,
292298
}
293299

300+
/// An opaque connection ID, unique for every connection attempt with the same pool.
301+
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
302+
pub struct ConnectionId(usize);
303+
294304
impl ConnectionCounter {
295305
pub fn new() -> Self {
296306
Self {
297-
connections: AtomicUsize::new(0),
307+
count: AtomicUsize::new(0),
308+
next_id: AtomicUsize::new(1),
298309
connect_available: Event::new(),
299310
}
300311
}
301312

302313
pub fn connections(&self) -> usize {
303-
self.connections.load(Ordering::Acquire)
314+
self.count.load(Ordering::Acquire)
304315
}
305316

306317
pub async fn drain(&self) {
307-
while self.connections.load(Ordering::Acquire) > 0 {
318+
while self.count.load(Ordering::Acquire) > 0 {
308319
self.connect_available.listen().await;
309320
}
310321
}
311322

323+
/// Attempt to acquire a permit from both this instance, and the parent pool, if applicable.
324+
///
325+
/// Returns the permit, and the ID of the new connection.
326+
pub fn try_acquire_permit<DB: Database>(
327+
&self,
328+
pool: &Arc<PoolInner<DB>>,
329+
) -> Option<(ConnectionId, ConnectPermit<DB>)> {
330+
debug_assert!(ptr::addr_eq(self, &pool.counter));
331+
332+
// Don't skip the queue.
333+
if pool.options.fair && self.connect_available.total_listeners() > 0 {
334+
return None;
335+
}
336+
337+
let prev_size = self
338+
.count
339+
.fetch_update(Ordering::Release, Ordering::Acquire, |connections| {
340+
(connections < pool.options.max_connections).then_some(connections + 1)
341+
})
342+
.ok()?;
343+
344+
let size = prev_size + 1;
345+
346+
tracing::trace!(target: "sqlx::pool::connect", size, "increased size");
347+
348+
Some((
349+
ConnectionId(self.next_id.fetch_add(1, Ordering::SeqCst)),
350+
ConnectPermit {
351+
pool: Some(Arc::clone(pool)),
352+
},
353+
))
354+
}
355+
312356
/// Attempt to acquire a permit from both this instance, and the parent pool, if applicable.
313357
///
314358
/// Returns the permit, and the current size of the pool.
315359
pub async fn acquire_permit<DB: Database>(
316360
&self,
317361
pool: &Arc<PoolInner<DB>>,
318-
) -> (usize, ConnectPermit<DB>) {
362+
) -> (ConnectionId, ConnectPermit<DB>) {
319363
// Check that `self` can increase size first before we check the parent.
320-
let (size, permit) = self.acquire_permit_self(pool).await;
364+
let acquired = self.acquire_permit_self(pool).await;
321365

322366
if let Some(parent) = &pool.options.parent_pool {
323367
let (_, permit) = parent.0.counter.acquire_permit_self(&parent.0).await;
@@ -326,46 +370,21 @@ impl ConnectionCounter {
326370
permit.consume();
327371
}
328372

329-
(size, permit)
373+
acquired
330374
}
331375

332376
// Separate method because `async fn`s cannot be recursive.
333377
/// Attempt to acquire a [`ConnectPermit`] from this instance and this instance only.
334378
async fn acquire_permit_self<DB: Database>(
335379
&self,
336380
pool: &Arc<PoolInner<DB>>,
337-
) -> (usize, ConnectPermit<DB>) {
338-
debug_assert!(ptr::addr_eq(self, &pool.counter));
339-
340-
let mut should_wait = pool.options.fair && self.connect_available.total_listeners() > 0;
341-
381+
) -> (ConnectionId, ConnectPermit<DB>) {
342382
for attempt in 1usize.. {
343-
if should_wait {
344-
self.connect_available.listen().await;
383+
if let Some(acquired) = self.try_acquire_permit(pool) {
384+
return acquired;
345385
}
346386

347-
let res = self.connections.fetch_update(
348-
Ordering::Release,
349-
Ordering::Acquire,
350-
|connections| {
351-
(connections < pool.options.max_connections).then_some(connections + 1)
352-
},
353-
);
354-
355-
if let Ok(prev_size) = res {
356-
let size = prev_size + 1;
357-
358-
tracing::trace!(target: "sqlx::pool::connect", size, "increased size");
359-
360-
return (
361-
prev_size + 1,
362-
ConnectPermit {
363-
pool: Some(Arc::clone(pool)),
364-
},
365-
);
366-
}
367-
368-
should_wait = true;
387+
self.connect_available.listen().await;
369388

370389
if attempt == 2 {
371390
tracing::warn!(
@@ -380,7 +399,7 @@ impl ConnectionCounter {
380399
pub fn release_permit<DB: Database>(&self, pool: &PoolInner<DB>) {
381400
debug_assert!(ptr::addr_eq(self, &pool.counter));
382401

383-
self.connections.fetch_sub(1, Ordering::Release);
402+
self.count.fetch_sub(1, Ordering::Release);
384403
self.connect_available.notify(1usize);
385404

386405
if let Some(parent) = &pool.options.parent_pool {
@@ -415,16 +434,22 @@ impl<DB: Database> Drop for ConnectPermit<DB> {
415434
}
416435
}
417436

437+
impl Display for ConnectionId {
438+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
439+
Display::fmt(&self.0, f)
440+
}
441+
}
442+
418443
#[tracing::instrument(
419444
target = "sqlx::pool::connect",
420-
skip_all,
421-
fields(connection = size),
445+
skip_all,
446+
fields(%connection_id),
422447
err
423448
)]
424449
async fn connect_with_backoff<DB: Database>(
450+
connection_id: ConnectionId,
425451
permit: ConnectPermit<DB>,
426452
connector: Arc<impl PoolConnector<DB>>,
427-
size: usize,
428453
) -> crate::Result<PoolConnection<DB>> {
429454
if permit.pool().is_closed() {
430455
return Err(Error::PoolClosed);
@@ -436,7 +461,8 @@ async fn connect_with_backoff<DB: Database>(
436461
let meta = PoolConnectMetadata {
437462
start: ease_off.started_at(),
438463
num_attempts: attempt,
439-
pool_size: size,
464+
pool_size: permit.pool().size(),
465+
connection_id,
440466
};
441467

442468
let conn = ease_off
@@ -445,7 +471,7 @@ async fn connect_with_backoff<DB: Database>(
445471
.or_retry_if(|e| can_retry_error(e.inner()))?;
446472

447473
if let Some(conn) = conn {
448-
return Ok(Floating::new_live(conn, permit).reattach());
474+
return Ok(Floating::new_live(conn, connection_id, permit).reattach());
449475
}
450476
}
451477

sqlx-core/src/pool/connection.rs

+21-7
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::database::Database;
1010
use crate::error::Error;
1111

1212
use super::inner::{is_beyond_max_lifetime, PoolInner};
13-
use crate::pool::connect::ConnectPermit;
13+
use crate::pool::connect::{ConnectPermit, ConnectionId};
1414
use crate::pool::options::PoolConnectionMetadata;
1515
use std::future::Future;
1616

@@ -27,6 +27,7 @@ pub struct PoolConnection<DB: Database> {
2727

2828
pub(super) struct Live<DB: Database> {
2929
pub(super) raw: DB::Connection,
30+
pub(super) id: ConnectionId,
3031
pub(super) created_at: Instant,
3132
}
3233

@@ -247,10 +248,11 @@ impl<DB: Database> DerefMut for Idle<DB> {
247248
}
248249

249250
impl<DB: Database> Floating<DB, Live<DB>> {
250-
pub fn new_live(conn: DB::Connection, permit: ConnectPermit<DB>) -> Self {
251+
pub fn new_live(conn: DB::Connection, id: ConnectionId, permit: ConnectPermit<DB>) -> Self {
251252
Self {
252253
inner: Live {
253254
raw: conn,
255+
id,
254256
created_at: Instant::now(),
255257
},
256258
permit,
@@ -381,17 +383,29 @@ impl<DB: Database> Floating<DB, Idle<DB>> {
381383
}
382384
}
383385

384-
pub async fn close(self) -> ConnectPermit<DB> {
386+
pub async fn close(self) -> (ConnectionId, ConnectPermit<DB>) {
387+
let connection_id = self.inner.live.id;
388+
389+
tracing::debug!(%connection_id, "closing connection (gracefully)");
390+
385391
if let Err(error) = self.inner.live.raw.close().await {
386-
tracing::debug!(%error, "error occurred while closing the pool connection");
392+
tracing::debug!(
393+
%connection_id,
394+
%error,
395+
"error occurred while closing the pool connection"
396+
);
387397
}
388-
self.permit
398+
(connection_id, self.permit)
389399
}
390400

391-
pub async fn close_hard(self) -> ConnectPermit<DB> {
401+
pub async fn close_hard(self) -> (ConnectionId, ConnectPermit<DB>) {
402+
let connection_id = self.inner.live.id;
403+
404+
tracing::debug!(%connection_id, "closing connection (hard)");
405+
392406
let _ = self.inner.live.raw.close_hard().await;
393407

394-
self.permit
408+
(connection_id, self.permit)
395409
}
396410

397411
pub fn metadata(&self) -> PoolConnectionMetadata {

sqlx-core/src/pool/inner.rs

+16-13
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::sync::Arc;
1111
use std::task::ready;
1212

1313
use crate::logger::private_level_filter_to_trace_level;
14-
use crate::pool::connect::{ConnectPermit, ConnectionCounter, DynConnector};
14+
use crate::pool::connect::{ConnectPermit, ConnectionCounter, ConnectionId, DynConnector};
1515
use crate::pool::idle::IdleQueue;
1616
use crate::rt::JoinHandle;
1717
use crate::{private_tracing_dynamic_event, rt};
@@ -165,7 +165,7 @@ impl<DB: Database> PoolInner<DB> {
165165
// Poll the task returned by `finish_acquire`
166166
match ready!(before_acquire.poll_unpin(cx)) {
167167
Some(Ok(conn)) => return Ready(Ok(conn)),
168-
Some(Err(permit)) => {
168+
Some(Err((id, permit))) => {
169169
// We don't strictly need to poll `connect` here; all we really want to do
170170
// is to check if it is `None`. But since currently there's no getter for that,
171171
// it doesn't really hurt to just poll it here.
@@ -174,7 +174,7 @@ impl<DB: Database> PoolInner<DB> {
174174
// If we're not already attempting to connect,
175175
// take the permit returned from closing the connection and
176176
// attempt to open a new one.
177-
connect = Some(self.connector.connect(permit, self.size())).into();
177+
connect = Some(self.connector.connect(id, permit)).into();
178178
}
179179
// `permit` is dropped in these branches, allowing another task to use it
180180
Ready(Some(res)) => return Ready(res),
@@ -189,8 +189,8 @@ impl<DB: Database> PoolInner<DB> {
189189
None => (),
190190
}
191191

192-
if let Ready(Some((size, permit))) = acquire_connect_permit.poll_unpin(cx) {
193-
connect = Some(self.connector.connect(permit, size)).into();
192+
if let Ready(Some((id, permit))) = acquire_connect_permit.poll_unpin(cx) {
193+
connect = Some(self.connector.connect(id, permit)).into();
194194
}
195195

196196
if let Ready(Some(res)) = connect.poll_unpin(cx) {
@@ -236,11 +236,11 @@ impl<DB: Database> PoolInner<DB> {
236236
//
237237
// If no extra permits are available then we shouldn't be trying to spin up
238238
// connections anyway.
239-
let Some((size, permit)) = self.counter.acquire_permit(self).now_or_never() else {
239+
let Some((id, permit)) = self.counter.acquire_permit(self).now_or_never() else {
240240
return Ok(());
241241
};
242242

243-
let conn = self.connector.connect(permit, size).await?;
243+
let conn = self.connector.connect(id, permit).await?;
244244

245245
// We skip `after_release` since the connection was never provided to user code
246246
// besides inside `PollConnector::connect()`, if they override it.
@@ -296,13 +296,16 @@ fn is_beyond_idle_timeout<DB: Database>(idle: &Idle<DB>, options: &PoolOptions<D
296296
}
297297

298298
/// Execute `test_before_acquire` and/or `before_acquire` in a background task, if applicable.
299-
///
299+
///
300300
/// Otherwise, immediately returns the connection.
301301
fn finish_acquire<DB: Database>(
302-
mut conn: Floating<DB, Idle<DB>>
303-
) -> Either<JoinHandle<Result<PoolConnection<DB>, ConnectPermit<DB>>>, PoolConnection<DB>> {
302+
mut conn: Floating<DB, Idle<DB>>,
303+
) -> Either<
304+
JoinHandle<Result<PoolConnection<DB>, (ConnectionId, ConnectPermit<DB>)>>,
305+
PoolConnection<DB>,
306+
> {
304307
let pool = conn.permit.pool();
305-
308+
306309
if pool.options.test_before_acquire || pool.options.before_acquire.is_some() {
307310
// Spawn a task so the call may complete even if `acquire()` is cancelled.
308311
return Either::Left(rt::spawn(async move {
@@ -333,11 +336,11 @@ fn finish_acquire<DB: Database>(
333336
Ok(true) => {}
334337
}
335338
}
336-
339+
337340
Ok(conn.into_live().reattach())
338341
}));
339342
}
340-
343+
341344
// No checks are configured, return immediately.
342345
Either::Right(conn.into_live().reattach())
343346
}

0 commit comments

Comments
 (0)