Skip to content

Commit a57930b

Browse files
committed
Simplify custom type autoloading with pgxpool
Provide some backwards-compatible configuration options for pgxpool which streamlines the use of the bulk loading of custom types: - AutoLoadTypes: a list of type (or class) names to automatically load for each connection, automatically also loading any other types these depend on. - ReuseTypeMaps: if enabled, pgxpool will cache the typemap information, avoiding the need to perform any further queries as new connections are created. ReuseTypeMaps is disabled by default as in some situations, a connection string might resolve to a pool of servers which do not share the same type name -> OID mapping.
1 parent 06c0451 commit a57930b

File tree

2 files changed

+88
-9
lines changed

2 files changed

+88
-9
lines changed

pgxpool/pool.go

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@ import (
1212

1313
"github.com/jackc/pgx/v5"
1414
"github.com/jackc/pgx/v5/pgconn"
15+
"github.com/jackc/pgx/v5/pgtype"
1516
"github.com/jackc/puddle/v2"
1617
)
1718

18-
var defaultMaxConns = int32(4)
19-
var defaultMinConns = int32(0)
20-
var defaultMaxConnLifetime = time.Hour
21-
var defaultMaxConnIdleTime = time.Minute * 30
22-
var defaultHealthCheckPeriod = time.Minute
19+
var (
20+
defaultMaxConns = int32(4)
21+
defaultMinConns = int32(0)
22+
defaultMaxConnLifetime = time.Hour
23+
defaultMaxConnIdleTime = time.Minute * 30
24+
defaultHealthCheckPeriod = time.Minute
25+
)
2326

2427
type connResource struct {
2528
conn *pgx.Conn
@@ -100,6 +103,11 @@ type Pool struct {
100103

101104
closeOnce sync.Once
102105
closeChan chan struct{}
106+
107+
autoLoadTypes []string
108+
reuseTypeMap bool
109+
autoLoadMutex *sync.Mutex
110+
autoLoadTypeInfos []*pgtype.DerivedTypeInfo
103111
}
104112

105113
// Config is the configuration struct for creating a pool. It must be created by [ParseConfig] and then it can be
@@ -147,6 +155,20 @@ type Config struct {
147155
// HealthCheckPeriod is the duration between checks of the health of idle connections.
148156
HealthCheckPeriod time.Duration
149157

158+
// AutoLoadTypes is a list of user-defined types which should automatically be loaded
159+
// as each new connection is created. This will also load any derived types, directly
160+
// or indirectly required to handle these types.
161+
// This is equivalent to manually calling pgx.LoadDerivedTypes()
162+
// followed by conn.TypeMap().RegisterDerivedTypes()
163+
AutoLoadTypes []string
164+
165+
// ReuseTypeMaps, if enabled, will reuse the typemap information being used by AutoLoadTypes.
166+
// This removes the need to query the database each time a new connection is created;
167+
// only RegisterDerivedTypes will need to be called for each new connection.
168+
// In some situations, where OID mapping can differ between pg servers in the pool, perhaps due
169+
// to certain replication strategies, this should be left disabled.
170+
ReuseTypeMaps bool
171+
150172
createdByParseConfig bool // Used to enforce created by ParseConfig rule.
151173
}
152174

@@ -185,6 +207,8 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
185207
config: config,
186208
beforeConnect: config.BeforeConnect,
187209
afterConnect: config.AfterConnect,
210+
autoLoadTypes: config.AutoLoadTypes,
211+
reuseTypeMap: config.ReuseTypeMaps,
188212
beforeAcquire: config.BeforeAcquire,
189213
afterRelease: config.AfterRelease,
190214
beforeClose: config.BeforeClose,
@@ -196,6 +220,7 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
196220
healthCheckPeriod: config.HealthCheckPeriod,
197221
healthCheckChan: make(chan struct{}, 1),
198222
closeChan: make(chan struct{}),
223+
autoLoadMutex: new(sync.Mutex),
199224
}
200225

201226
if t, ok := config.ConnConfig.Tracer.(AcquireTracer); ok {
@@ -237,6 +262,18 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
237262
}
238263
}
239264

265+
if p.autoLoadTypes != nil && len(p.autoLoadTypes) > 0 {
266+
types, err := p.loadTypes(ctx, conn, p.autoLoadTypes)
267+
if err != nil {
268+
conn.Close(ctx)
269+
return nil, err
270+
}
271+
if err = conn.TypeMap().RegisterDerivedTypes(types); err != nil {
272+
conn.Close(ctx)
273+
return nil, err
274+
}
275+
}
276+
240277
jitterSecs := rand.Float64() * config.MaxConnLifetimeJitter.Seconds()
241278
maxAgeTime := time.Now().Add(config.MaxConnLifetime).Add(time.Duration(jitterSecs) * time.Second)
242279

@@ -388,6 +425,27 @@ func (p *Pool) Close() {
388425
})
389426
}
390427

428+
// loadTypes is used internally to autoload the custom types for a connection,
429+
// potentially reusing previously-loaded typemap information.
430+
func (p *Pool) loadTypes(ctx context.Context, conn *pgx.Conn, typeNames []string) ([]*pgtype.DerivedTypeInfo, error) {
431+
if p.reuseTypeMap {
432+
p.autoLoadMutex.Lock()
433+
defer p.autoLoadMutex.Unlock()
434+
if p.autoLoadTypeInfos != nil {
435+
return p.autoLoadTypeInfos, nil
436+
}
437+
types, err := pgx.LoadDerivedTypes(ctx, conn, typeNames)
438+
if err != nil {
439+
return nil, err
440+
}
441+
p.autoLoadTypeInfos = types
442+
return types, err
443+
}
444+
// Avoid needing to acquire the mutex and allow connections to initialise in parallel
445+
// if we have chosen to not reuse the type mapping
446+
return pgx.LoadDerivedTypes(ctx, conn, typeNames)
447+
}
448+
391449
func (p *Pool) isExpired(res *puddle.Resource[*connResource]) bool {
392450
return time.Now().After(res.Value().maxAgeTime)
393451
}
@@ -482,7 +540,6 @@ func (p *Pool) checkMinConns() error {
482540
func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error {
483541
ctx, cancel := context.WithCancel(parentCtx)
484542
defer cancel()
485-
486543
errs := make(chan error, targetResources)
487544

488545
for i := 0; i < targetResources; i++ {
@@ -495,7 +552,6 @@ func (p *Pool) createIdleResources(parentCtx context.Context, targetResources in
495552
errs <- err
496553
}()
497554
}
498-
499555
var firstError error
500556
for i := 0; i < targetResources; i++ {
501557
err := <-errs

pgxpool/pool_test.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,31 @@ func TestPoolBeforeConnect(t *testing.T) {
261261
assert.EqualValues(t, "pgx", str)
262262
}
263263

264+
func TestAutoLoadTypes(t *testing.T) {
265+
t.Parallel()
266+
267+
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
268+
defer cancel()
269+
270+
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
271+
require.NoError(t, err)
272+
273+
db1, err := pgxpool.NewWithConfig(ctx, config)
274+
require.NoError(t, err)
275+
defer db1.Close()
276+
db1.Exec(ctx, "DROP DOMAIN IF EXISTS autoload_uint64; CREATE DOMAIN autoload_uint64 as numeric(20,0)")
277+
defer db1.Exec(ctx, "DROP DOMAIN autoload_uint64")
278+
279+
config.AutoLoadTypes = []string{"autoload_uint64"}
280+
db2, err := pgxpool.NewWithConfig(ctx, config)
281+
require.NoError(t, err)
282+
283+
var n uint64
284+
err = db2.QueryRow(ctx, "select 12::autoload_uint64").Scan(&n)
285+
require.NoError(t, err)
286+
assert.EqualValues(t, uint64(12), n)
287+
}
288+
264289
func TestPoolAfterConnect(t *testing.T) {
265290
t.Parallel()
266291

@@ -676,7 +701,6 @@ func TestPoolQuery(t *testing.T) {
676701
stats = pool.Stat()
677702
assert.EqualValues(t, 0, stats.AcquiredConns())
678703
assert.EqualValues(t, 1, stats.TotalConns())
679-
680704
}
681705

682706
func TestPoolQueryRow(t *testing.T) {
@@ -1104,7 +1128,6 @@ func TestConnectEagerlyReachesMinPoolSize(t *testing.T) {
11041128
}
11051129

11061130
t.Fatal("did not reach min pool size")
1107-
11081131
}
11091132

11101133
func TestPoolSendBatchBatchCloseTwice(t *testing.T) {

0 commit comments

Comments
 (0)