Skip to content

Commit d2e281f

Browse files
committed
perf: skip ledger filter in queries for single-ledger buckets
Optimize query performance for single-ledger buckets by conditionally skipping the WHERE ledger = ? clause when a bucket contains only one ledger. This reduces unnecessary filtering and can provide 5-15% performance improvement in single-ledger deployments. Implementation: - Add singleLedgerOptimization cache to ledger Store - Add CountLedgersInBucket to system store - Detect single-ledger state on CreateLedger and OpenLedger - Refactor all query builders to use conditional filtering Changes: - internal/storage/ledger/store.go: Add cache and helper methods - internal/storage/system/store.go: Add CountLedgersInBucket - internal/storage/driver/driver.go: Detect single-ledger state - internal/storage/ledger/resource_*.go: Apply conditional filtering - internal/storage/ledger/{accounts,logs,transactions}.go: Apply conditional filtering
1 parent 1d90239 commit d2e281f

File tree

11 files changed

+137
-57
lines changed

11 files changed

+137
-57
lines changed

internal/storage/driver/driver.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto
8181
return nil, postgres.ResolveError(err)
8282
}
8383

84+
if err := ret.UpdateSingleLedgerState(ctx, d.systemStoreFactory.Create(d.db).CountLedgersInBucket); err != nil {
85+
logging.FromContext(ctx).Debugf("Failed to update single-ledger state: %v", err)
86+
}
87+
8488
return ret, nil
8589
}
8690

@@ -93,6 +97,10 @@ func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Stor
9397

9498
store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(ret.Bucket), *ret)
9599

100+
if err := store.UpdateSingleLedgerState(ctx, d.systemStoreFactory.Create(d.db).CountLedgersInBucket); err != nil {
101+
logging.FromContext(ctx).Debugf("Failed to update single-ledger state: %v", err)
102+
}
103+
96104
return store, ret, err
97105
}
98106

internal/storage/ledger/accounts.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,16 @@ func (store *Store) DeleteAccountMetadata(ctx context.Context, account, key stri
8686
store.tracer,
8787
store.deleteAccountMetadataHistogram,
8888
tracing.NoResult(func(ctx context.Context) error {
89-
_, err := store.db.NewUpdate().
89+
query := store.db.NewUpdate().
9090
ModelTableExpr(store.GetPrefixedRelationName("accounts")).
9191
Set("metadata = metadata - ?", key).
92-
Where("address = ?", account).
93-
Where("ledger = ?", store.ledger.Name).
94-
Exec(ctx)
92+
Where("address = ?", account)
93+
94+
if filterSQL, filterArgs := store.getLedgerFilterSQL(); filterSQL != "" {
95+
query = query.Where(filterSQL, filterArgs...)
96+
}
97+
98+
_, err := query.Exec(ctx)
9599
return postgres.ResolveError(err)
96100
}),
97101
)

internal/storage/ledger/logs.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,14 @@ func (store *Store) ReadLogWithIdempotencyKey(ctx context.Context, key string) (
118118
store.readLogWithIdempotencyKeyHistogram,
119119
func(ctx context.Context) (*ledger.Log, error) {
120120
ret := &Log{}
121-
if err := store.db.NewSelect().
121+
query := store.db.NewSelect().
122122
Model(ret).
123123
ModelTableExpr(store.GetPrefixedRelationName("logs")).
124124
Column("*").
125125
Where("idempotency_key = ?", key).
126-
Where("ledger = ?", store.ledger.Name).
127-
Limit(1).
128-
Scan(ctx); err != nil {
126+
Limit(1)
127+
query = store.applyLedgerFilter(query, "logs")
128+
if err := query.Scan(ctx); err != nil {
129129
return nil, postgres.ResolveError(err)
130130
}
131131

internal/storage/ledger/resource_accounts.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ func (h accountsResourceHandler) Schema() common.EntitySchema {
2828
func (h accountsResourceHandler) BuildDataset(opts common.RepositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) {
2929
ret := h.store.db.NewSelect().
3030
ModelTableExpr(h.store.GetPrefixedRelationName("accounts")).
31-
Column("address", "address_array", "first_usage", "insertion_date", "updated_at").
32-
Where("ledger = ?", h.store.ledger.Name)
31+
Column("address", "address_array", "first_usage", "insertion_date", "updated_at")
32+
ret = h.store.applyLedgerFilter(ret, "accounts")
3333

3434
if opts.PIT != nil && !opts.PIT.IsZero() {
3535
ret = ret.Where("accounts.first_usage <= ?", opts.PIT)
@@ -39,10 +39,10 @@ func (h accountsResourceHandler) BuildDataset(opts common.RepositoryHandlerBuild
3939
selectDistinctAccountMetadataHistories := h.store.db.NewSelect().
4040
DistinctOn("accounts_address").
4141
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_metadata")).
42-
Where("ledger = ?", h.store.ledger.Name).
4342
Column("accounts_address").
4443
ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata").
4544
Where("date <= ?", opts.PIT)
45+
selectDistinctAccountMetadataHistories = h.store.applyLedgerFilter(selectDistinctAccountMetadataHistories, "accounts_metadata")
4646

4747
ret = ret.
4848
Join(
@@ -66,8 +66,7 @@ func (h accountsResourceHandler) ResolveFilter(opts common.ResourceQuery[any], o
6666
case balanceRegex.MatchString(property) || property == "balance":
6767

6868
selectBalance := h.store.db.NewSelect().
69-
Where("accounts_address = dataset.address").
70-
Where("ledger = ?", h.store.ledger.Name)
69+
Where("accounts_address = dataset.address")
7170

7271
if opts.PIT != nil && !opts.PIT.IsZero() {
7372
if !h.store.ledger.HasFeature(features.FeatureMovesHistory, "ON") {
@@ -78,10 +77,12 @@ func (h accountsResourceHandler) ResolveFilter(opts common.ResourceQuery[any], o
7877
DistinctOn("asset").
7978
ColumnExpr("first_value((post_commit_effective_volumes).inputs - (post_commit_effective_volumes).outputs) over (partition by (accounts_address, asset) order by effective_date desc, seq desc) as balance").
8079
Where("effective_date <= ?", opts.PIT)
80+
selectBalance = h.store.applyLedgerFilter(selectBalance, "moves")
8181
} else {
8282
selectBalance = selectBalance.
8383
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")).
8484
ColumnExpr("input - output as balance")
85+
selectBalance = h.store.applyLedgerFilter(selectBalance, "accounts_volumes")
8586
}
8687

8788
if balanceRegex.MatchString(property) {
@@ -128,8 +129,8 @@ func (h accountsResourceHandler) Expand(opts common.ResourceQuery[any], property
128129
selectRowsQuery = selectRowsQuery.
129130
ModelTableExpr(h.store.GetPrefixedRelationName("moves")).
130131
DistinctOn("accounts_address, asset").
131-
Column("accounts_address", "asset").
132-
Where("ledger = ?", h.store.ledger.Name)
132+
Column("accounts_address", "asset")
133+
selectRowsQuery = h.store.applyLedgerFilter(selectRowsQuery, "moves")
133134
if property == "volumes" {
134135
selectRowsQuery = selectRowsQuery.
135136
ColumnExpr("first_value(post_commit_volumes) over (partition by (accounts_address, asset) order by seq desc) as volumes").
@@ -143,8 +144,8 @@ func (h accountsResourceHandler) Expand(opts common.ResourceQuery[any], property
143144
selectRowsQuery = selectRowsQuery.
144145
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")).
145146
Column("asset", "accounts_address").
146-
ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes").
147-
Where("ledger = ?", h.store.ledger.Name)
147+
ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes")
148+
selectRowsQuery = h.store.applyLedgerFilter(selectRowsQuery, "accounts_volumes")
148149
}
149150

150151
return h.store.db.NewSelect().

internal/storage/ledger/resource_aggregated_balances.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R
2626
ret := h.store.db.NewSelect().
2727
ModelTableExpr(h.store.GetPrefixedRelationName("moves")).
2828
DistinctOn("accounts_address, asset").
29-
Column("accounts_address", "asset").
30-
Where("ledger = ?", h.store.ledger.Name)
29+
Column("accounts_address", "asset")
30+
ret = h.store.applyLedgerFilter(ret, "moves")
3131
if query.Opts.UseInsertionDate {
3232
if !h.store.ledger.HasFeature(features.FeatureMovesHistory, "ON") {
3333
return nil, NewErrMissingFeature(features.FeatureMovesHistory)
@@ -52,8 +52,8 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R
5252
subQuery := h.store.db.NewSelect().
5353
TableExpr(h.store.GetPrefixedRelationName("accounts")).
5454
Column("address_array").
55-
Where("accounts.address = accounts_address").
56-
Where("ledger = ?", h.store.ledger.Name)
55+
Where("accounts.address = accounts_address")
56+
subQuery = h.store.applyLedgerFilter(subQuery, "accounts")
5757

5858
ret = ret.
5959
ColumnExpr("accounts.address_array as accounts_address_array").
@@ -65,9 +65,9 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R
6565
DistinctOn("accounts_address").
6666
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_metadata")).
6767
ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata").
68-
Where("ledger = ?", h.store.ledger.Name).
6968
Where("accounts_metadata.accounts_address = moves.accounts_address").
7069
Where("date <= ?", query.PIT)
70+
subQuery = h.store.applyLedgerFilter(subQuery, "accounts_metadata")
7171

7272
ret = ret.
7373
Join(`left join lateral (?) accounts_metadata on true`, subQuery).
@@ -79,17 +79,17 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R
7979
ret := h.store.db.NewSelect().
8080
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")).
8181
Column("asset", "accounts_address").
82-
ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes").
83-
Where("ledger = ?", h.store.ledger.Name)
82+
ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes")
83+
ret = h.store.applyLedgerFilter(ret, "accounts_volumes")
8484

8585
if query.UseFilter("metadata") || query.UseFilter("address", func(value any) bool {
8686
return isPartialAddress(value.(string))
8787
}) {
8888
subQuery := h.store.db.NewSelect().
8989
TableExpr(h.store.GetPrefixedRelationName("accounts")).
9090
Column("address").
91-
Where("ledger = ?", h.store.ledger.Name).
9291
Where("accounts.address = accounts_address")
92+
subQuery = h.store.applyLedgerFilter(subQuery, "accounts")
9393

9494
if query.UseFilter("address") {
9595
subQuery = subQuery.ColumnExpr("address_array as accounts_address_array")

internal/storage/ledger/resource_logs.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ func (h logsResourceHandler) Schema() common.EntitySchema {
2121
}
2222

2323
func (h logsResourceHandler) BuildDataset(_ common.RepositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) {
24-
return h.store.db.NewSelect().
24+
ret := h.store.db.NewSelect().
2525
ModelTableExpr(h.store.GetPrefixedRelationName("logs")).
26-
ColumnExpr("*").
27-
Where("ledger = ?", h.store.ledger.Name), nil
26+
ColumnExpr("*")
27+
ret = h.store.applyLedgerFilter(ret, "logs")
28+
return ret, nil
2829
}
2930

3031
func (h logsResourceHandler) ResolveFilter(_ common.ResourceQuery[any], operator, property string, value any) (string, []any, error) {

internal/storage/ledger/resource_transactions.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ func (h transactionsResourceHandler) BuildDataset(opts common.RepositoryHandlerB
4444
"destinations",
4545
"sources_arrays",
4646
"destinations_arrays",
47-
).
48-
Where("ledger = ?", h.store.ledger.Name)
47+
)
48+
ret = h.store.applyLedgerFilter(ret, "transactions")
4949

5050
if slices.Contains(opts.Expand, "volumes") {
5151
ret = ret.Column("post_commit_volumes")
@@ -59,10 +59,10 @@ func (h transactionsResourceHandler) BuildDataset(opts common.RepositoryHandlerB
5959
selectDistinctTransactionMetadataHistories := h.store.db.NewSelect().
6060
DistinctOn("transactions_id").
6161
ModelTableExpr(h.store.GetPrefixedRelationName("transactions_metadata")).
62-
Where("ledger = ?", h.store.ledger.Name).
6362
Column("transactions_id", "metadata").
6463
Order("transactions_id", "revision desc").
6564
Where("date <= ?", opts.PIT)
65+
selectDistinctTransactionMetadataHistories = h.store.applyLedgerFilter(selectDistinctTransactionMetadataHistories, "transactions_metadata")
6666

6767
ret = ret.
6868
Join(
@@ -124,19 +124,21 @@ func (h transactionsResourceHandler) Expand(_ common.ResourceQuery[any], propert
124124
return nil, nil, nil
125125
}
126126

127+
innerMostQuery := h.store.db.NewSelect().
128+
DistinctOn("transactions_id, accounts_address, asset").
129+
ModelTableExpr(h.store.GetPrefixedRelationName("moves")).
130+
Column("transactions_id", "accounts_address", "asset").
131+
ColumnExpr(`first_value(moves.post_commit_effective_volumes) over (partition by (transactions_id, accounts_address, asset) order by seq desc) as post_commit_effective_volumes`).
132+
Where("transactions_id in (select id from dataset)")
133+
innerMostQuery = h.store.applyLedgerFilter(innerMostQuery, "moves")
134+
127135
ret := h.store.db.NewSelect().
128136
TableExpr(
129137
"(?) data",
130138
h.store.db.NewSelect().
131139
TableExpr(
132140
"(?) moves",
133-
h.store.db.NewSelect().
134-
DistinctOn("transactions_id, accounts_address, asset").
135-
ModelTableExpr(h.store.GetPrefixedRelationName("moves")).
136-
Column("transactions_id", "accounts_address", "asset").
137-
ColumnExpr(`first_value(moves.post_commit_effective_volumes) over (partition by (transactions_id, accounts_address, asset) order by seq desc) as post_commit_effective_volumes`).
138-
Where("ledger = ?", h.store.ledger.Name).
139-
Where("transactions_id in (select id from dataset)"),
141+
innerMostQuery,
140142
).
141143
Column("transactions_id", "accounts_address").
142144
ColumnExpr(`public.aggregate_objects(json_build_object(moves.asset, json_build_object('input', (moves.post_commit_effective_volumes).inputs, 'output', (moves.post_commit_effective_volumes).outputs))::jsonb) AS post_commit_effective_volumes`).

internal/storage/ledger/resource_volumes.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,15 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild
3939
ColumnExpr("input - output as balance").
4040
ColumnExpr("accounts_address as account").
4141
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")).
42-
Where("ledger = ?", h.store.ledger.Name).
4342
Order("accounts_address", "asset")
43+
selectVolumes = h.store.applyLedgerFilter(selectVolumes, "accounts_volumes")
4444

4545
if query.UseFilter("metadata") || query.UseFilter("first_usage") || needAddressSegments {
4646
accountsQuery := h.store.db.NewSelect().
4747
TableExpr(h.store.GetPrefixedRelationName("accounts")).
4848
Column("address").
49-
Where("ledger = ?", h.store.ledger.Name).
5049
Where("accounts.address = accounts_address")
50+
accountsQuery = h.store.applyLedgerFilter(accountsQuery, "accounts")
5151

5252
if needAddressSegments {
5353
accountsQuery = accountsQuery.ColumnExpr("address_array as account_array")
@@ -77,9 +77,9 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild
7777
ColumnExpr("sum(case when is_source then amount else 0 end) as output").
7878
ColumnExpr("sum(case when not is_source then amount else -amount end) as balance").
7979
ModelTableExpr(h.store.GetPrefixedRelationName("moves")).
80-
Where("ledger = ?", h.store.ledger.Name).
8180
GroupExpr("accounts_address, asset").
8281
Order("accounts_address", "asset")
82+
selectVolumes = h.store.applyLedgerFilter(selectVolumes, "moves")
8383

8484
dateFilterColumn := "effective_date"
8585
if query.Opts.UseInsertionDate {
@@ -97,8 +97,8 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild
9797
if needAddressSegments || query.UseFilter("first_usage") {
9898
accountsQuery := h.store.db.NewSelect().
9999
TableExpr(h.store.GetPrefixedRelationName("accounts")).
100-
Where("accounts.address = accounts_address").
101-
Where("ledger = ?", h.store.ledger.Name)
100+
Where("accounts.address = accounts_address")
101+
accountsQuery = h.store.applyLedgerFilter(accountsQuery, "accounts")
102102

103103
if needAddressSegments {
104104
accountsQuery = accountsQuery.ColumnExpr("address_array")
@@ -116,8 +116,8 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild
116116
DistinctOn("accounts_address").
117117
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_metadata")).
118118
ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata").
119-
Where("ledger = ?", h.store.ledger.Name).
120119
Where("accounts_metadata.accounts_address = moves.accounts_address")
120+
subQuery = h.store.applyLedgerFilter(subQuery, "accounts_metadata")
121121

122122
selectVolumes = selectVolumes.
123123
Join(`left join lateral (?) accounts_metadata on true`, subQuery).

internal/storage/ledger/store.go

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"database/sql"
66
"fmt"
7+
"sync"
78
"github.com/formancehq/go-libs/v3/bun/bunpaginate"
89
"github.com/formancehq/go-libs/v3/migrations"
910
"github.com/formancehq/go-libs/v3/platform/postgres"
@@ -19,11 +20,18 @@ import (
1920
"github.com/uptrace/bun"
2021
)
2122

23+
type singleLedgerOptimization struct {
24+
mu sync.RWMutex
25+
enabled bool
26+
}
27+
2228
type Store struct {
2329
db bun.IDB
2430
bucket bucket.Bucket
2531
ledger ledger.Ledger
2632

33+
singleLedgerCache *singleLedgerOptimization
34+
2735
tracer trace.Tracer
2836
meter metric.Meter
2937
checkBucketSchemaHistogram metric.Int64Histogram
@@ -165,9 +173,10 @@ func (store *Store) LockLedger(ctx context.Context) (*Store, bun.IDB, func() err
165173

166174
func New(db bun.IDB, bucket bucket.Bucket, l ledger.Ledger, opts ...Option) *Store {
167175
ret := &Store{
168-
db: db,
169-
ledger: l,
170-
bucket: bucket,
176+
db: db,
177+
ledger: l,
178+
bucket: bucket,
179+
singleLedgerCache: &singleLedgerOptimization{enabled: false},
171180
}
172181
for _, opt := range append(defaultOptions, opts...) {
173182
opt(ret)
@@ -267,6 +276,39 @@ func (store *Store) WithDB(db bun.IDB) *Store {
267276
return &ret
268277
}
269278

279+
func (store *Store) isSingleLedger() bool {
280+
store.singleLedgerCache.mu.RLock()
281+
defer store.singleLedgerCache.mu.RUnlock()
282+
return store.singleLedgerCache.enabled
283+
}
284+
285+
func (store *Store) applyLedgerFilter(query *bun.SelectQuery, tableAlias string) *bun.SelectQuery {
286+
if store.isSingleLedger() {
287+
return query
288+
}
289+
return query.Where(tableAlias+".ledger = ?", store.ledger.Name)
290+
}
291+
292+
func (store *Store) getLedgerFilterSQL() (string, []any) {
293+
if store.isSingleLedger() {
294+
return "", nil
295+
}
296+
return "ledger = ?", []any{store.ledger.Name}
297+
}
298+
299+
func (store *Store) UpdateSingleLedgerState(ctx context.Context, countFunc func(ctx context.Context, bucketName string) (int, error)) error {
300+
count, err := countFunc(ctx, store.ledger.Bucket)
301+
if err != nil {
302+
return fmt.Errorf("failed to count ledgers in bucket: %w", err)
303+
}
304+
305+
store.singleLedgerCache.mu.Lock()
306+
defer store.singleLedgerCache.mu.Unlock()
307+
store.singleLedgerCache.enabled = (count == 1)
308+
309+
return nil
310+
}
311+
270312
type Option func(s *Store)
271313

272314
func WithMeter(meter metric.Meter) Option {

0 commit comments

Comments
 (0)