From 5e94520957938397bb51c1fd7dfcde27dbf1b178 Mon Sep 17 00:00:00 2001 From: David Cruz Date: Thu, 3 Jul 2025 14:48:48 -0700 Subject: [PATCH] Add AcquireTimeout to Pool Add a timeout for acquiring a free connection from the connection pool. --- pgxpool/pool.go | 11 +++++++++++ pgxpool/pool_test.go | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/pgxpool/pool.go b/pgxpool/pool.go index 3eabafa4a..036aab8be 100644 --- a/pgxpool/pool.go +++ b/pgxpool/pool.go @@ -94,6 +94,7 @@ type Pool struct { maxConnLifetimeJitter time.Duration maxConnIdleTime time.Duration healthCheckPeriod time.Duration + acquireTimeout time.Duration healthCheckChan chan struct{} @@ -170,6 +171,9 @@ type Config struct { // HealthCheckPeriod is the duration between checks of the health of idle connections. HealthCheckPeriod time.Duration + // AcquireTimeout is the timeout for acquiring a connection from the pool + AcquireTimeout time.Duration + createdByParseConfig bool // Used to enforce created by ParseConfig rule. } @@ -225,6 +229,7 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) { maxConnLifetimeJitter: config.MaxConnLifetimeJitter, maxConnIdleTime: config.MaxConnIdleTime, healthCheckPeriod: config.HealthCheckPeriod, + acquireTimeout: config.AcquireTimeout, healthCheckChan: make(chan struct{}, 1), closeChan: make(chan struct{}), } @@ -566,6 +571,12 @@ func (p *Pool) Acquire(ctx context.Context) (c *Conn, err error) { }() } + if p.acquireTimeout > 0 { + var cancel func() + ctx, cancel = context.WithTimeout(ctx, p.acquireTimeout) + defer cancel() + } + // Try to acquire from the connection pool up to maxConns + 1 times, so that // any that fatal errors would empty the pool and still at least try 1 fresh // connection. diff --git a/pgxpool/pool_test.go b/pgxpool/pool_test.go index cb8e68105..fdbaca01e 100644 --- a/pgxpool/pool_test.go +++ b/pgxpool/pool_test.go @@ -693,6 +693,46 @@ func TestPoolBackgroundChecksMinConns(t *testing.T) { require.EqualValues(t, 3, stats.NewConnsCount()) } +func TestPoolAcquireTimeout(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE")) + require.NoError(t, err) + + config.AcquireTimeout = 1 * time.Second + + db, err := pgxpool.NewWithConfig(ctx, config) + require.NoError(t, err) + + // Acquire all available connections to starve the pool + maxConns := int(db.Stat().MaxConns()) + conns := make([]*pgxpool.Conn, 0, maxConns) + for i := 0; i < maxConns; i++ { + conn, err := db.Acquire(ctx) + require.NoError(t, err) + + conns = append(conns, conn) + } + + // If the timeout is not triggered within two seconds (2x the configured timeout), fail the test + timer := time.AfterFunc(2*time.Second, func() { + t.Fatal("Acquire() attempt did not timeout") + }) + + _, err = db.Acquire(ctx) + require.ErrorAs(t, err, &context.DeadlineExceeded) + + timer.Stop() + + for _, conn := range conns { + conn.Release() + } + waitForReleaseToComplete() +} + func TestPoolExec(t *testing.T) { t.Parallel()