Skip to content

Commit 3e25109

Browse files
committed
fix: remove cache and use dynamic ledger count query
- Remove singleLedgerOptimization cache structure - Add countLedgersInBucket function to Store - Modify isSingleLedger to query count on each call - Update applyLedgerFilter and getLedgerFilterSQL to accept context - Inject countFunc via factory options in module.go - Remove all UpdateSingleLedgerState calls from driver - Add Ctx field to ResourceQuery for context propagation - Update all call sites to pass context parameter (26 locations)
1 parent d2e281f commit 3e25109

File tree

12 files changed

+85
-79
lines changed

12 files changed

+85
-79
lines changed

internal/storage/common/resource.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ func (r *ResourceRepository[ResourceType, OptionsType]) expand(dataset *bun.Sele
209209
}
210210

211211
func (r *ResourceRepository[ResourceType, OptionsType]) GetOne(ctx context.Context, query ResourceQuery[OptionsType]) (*ResourceType, error) {
212+
query.Ctx = ctx
212213

213214
finalQuery, err := r.buildFilteredDataset(query)
214215
if err != nil {
@@ -235,6 +236,7 @@ func (r *ResourceRepository[ResourceType, OptionsType]) GetOne(ctx context.Conte
235236
}
236237

237238
func (r *ResourceRepository[ResourceType, OptionsType]) Count(ctx context.Context, query ResourceQuery[OptionsType]) (int, error) {
239+
query.Ctx = ctx
238240

239241
finalQuery, err := r.buildFilteredDataset(query)
240242
if err != nil {
@@ -325,6 +327,7 @@ func (r *PaginatedResourceRepository[ResourceType, OptionsType]) Paginate(
325327
default:
326328
panic("should not happen")
327329
}
330+
resourceQuery.Ctx = ctx
328331

329332
finalQuery, err := r.buildFilteredDataset(resourceQuery)
330333
if err != nil {
@@ -406,11 +409,12 @@ func NewPaginatedResourceRepositoryMapper[ToResourceType any, OriginalResourceTy
406409
}
407410

408411
type ResourceQuery[Opts any] struct {
409-
PIT *time.Time `json:"pit"`
410-
OOT *time.Time `json:"oot"`
411-
Builder query.Builder `json:"qb"`
412-
Expand []string `json:"expand,omitempty"`
413-
Opts Opts `json:"opts"`
412+
Ctx context.Context `json:"-"`
413+
PIT *time.Time `json:"pit"`
414+
OOT *time.Time `json:"oot"`
415+
Builder query.Builder `json:"qb"`
416+
Expand []string `json:"expand,omitempty"`
417+
Opts Opts `json:"opts"`
414418
}
415419

416420
func (rq ResourceQuery[Opts]) UsePIT() bool {

internal/storage/driver/driver.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,6 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto
8181
return nil, postgres.ResolveError(err)
8282
}
8383

84-
if err := ret.UpdateSingleLedgerState(ctx, d.systemStoreFactory.Create(d.db).CountLedgersInBucket); err != nil {
85-
logging.FromContext(ctx).Debugf("Failed to update single-ledger state: %v", err)
86-
}
87-
8884
return ret, nil
8985
}
9086

@@ -97,10 +93,6 @@ func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Stor
9793

9894
store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(ret.Bucket), *ret)
9995

100-
if err := store.UpdateSingleLedgerState(ctx, d.systemStoreFactory.Create(d.db).CountLedgersInBucket); err != nil {
101-
logging.FromContext(ctx).Debugf("Failed to update single-ledger state: %v", err)
102-
}
103-
10496
return store, ret, err
10597
}
10698

internal/storage/driver/module.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,18 @@ func NewFXModule() fx.Option {
2929
&ledger.Ledger{},
3030
)
3131
}),
32+
fx.Provide(func(tracerProvider trace.TracerProvider) systemstore.StoreFactory {
33+
return systemstore.NewStoreFactory(systemstore.WithTracer(
34+
tracerProvider.Tracer("SystemStore"),
35+
))
36+
}),
3237
fx.Provide(func(params struct {
3338
fx.In
3439

35-
DB *bun.DB
36-
TracerProvider trace.TracerProvider `optional:"true"`
37-
MeterProvider metric.MeterProvider `optional:"true"`
40+
DB *bun.DB
41+
SystemStoreFactory systemstore.StoreFactory
42+
TracerProvider trace.TracerProvider `optional:"true"`
43+
MeterProvider metric.MeterProvider `optional:"true"`
3844
}) ledgerstore.Factory {
3945
options := make([]ledgerstore.Option, 0)
4046
if params.TracerProvider != nil {
@@ -43,21 +49,25 @@ func NewFXModule() fx.Option {
4349
if params.MeterProvider != nil {
4450
options = append(options, ledgerstore.WithMeter(params.MeterProvider.Meter("store")))
4551
}
52+
options = append(options, ledgerstore.WithCountLedgersInBucketFunc(
53+
func(ctx context.Context, bucketName string) (int, error) {
54+
return params.SystemStoreFactory.Create(params.DB).CountLedgersInBucket(ctx, bucketName)
55+
},
56+
))
4657
return ledgerstore.NewFactory(params.DB, options...)
4758
}),
4859
fx.Provide(func(
4960
db *bun.DB,
5061
bucketFactory bucket.Factory,
5162
ledgerStoreFactory ledgerstore.Factory,
63+
systemStoreFactory systemstore.StoreFactory,
5264
tracerProvider trace.TracerProvider,
5365
) (*Driver, error) {
5466
return New(
5567
db,
5668
ledgerStoreFactory,
5769
bucketFactory,
58-
systemstore.NewStoreFactory(systemstore.WithTracer(
59-
tracerProvider.Tracer("SystemStore"),
60-
)),
70+
systemStoreFactory,
6171
WithTracer(tracerProvider.Tracer("StorageDriver")),
6272
), nil
6373
}),

internal/storage/ledger/accounts.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func (store *Store) DeleteAccountMetadata(ctx context.Context, account, key stri
9191
Set("metadata = metadata - ?", key).
9292
Where("address = ?", account)
9393

94-
if filterSQL, filterArgs := store.getLedgerFilterSQL(); filterSQL != "" {
94+
if filterSQL, filterArgs := store.getLedgerFilterSQL(ctx); filterSQL != "" {
9595
query = query.Where(filterSQL, filterArgs...)
9696
}
9797

internal/storage/ledger/logs.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (store *Store) ReadLogWithIdempotencyKey(ctx context.Context, key string) (
124124
Column("*").
125125
Where("idempotency_key = ?", key).
126126
Limit(1)
127-
query = store.applyLedgerFilter(query, "logs")
127+
query = store.applyLedgerFilter(ctx, query, "logs")
128128
if err := query.Scan(ctx); err != nil {
129129
return nil, postgres.ResolveError(err)
130130
}

internal/storage/ledger/resource_accounts.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func (h accountsResourceHandler) BuildDataset(opts common.RepositoryHandlerBuild
2929
ret := h.store.db.NewSelect().
3030
ModelTableExpr(h.store.GetPrefixedRelationName("accounts")).
3131
Column("address", "address_array", "first_usage", "insertion_date", "updated_at")
32-
ret = h.store.applyLedgerFilter(ret, "accounts")
32+
ret = h.store.applyLedgerFilter(opts.Ctx, ret, "accounts")
3333

3434
if opts.PIT != nil && !opts.PIT.IsZero() {
3535
ret = ret.Where("accounts.first_usage <= ?", opts.PIT)
@@ -42,7 +42,7 @@ func (h accountsResourceHandler) BuildDataset(opts common.RepositoryHandlerBuild
4242
Column("accounts_address").
4343
ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata").
4444
Where("date <= ?", opts.PIT)
45-
selectDistinctAccountMetadataHistories = h.store.applyLedgerFilter(selectDistinctAccountMetadataHistories, "accounts_metadata")
45+
selectDistinctAccountMetadataHistories = h.store.applyLedgerFilter(opts.Ctx, selectDistinctAccountMetadataHistories, "accounts_metadata")
4646

4747
ret = ret.
4848
Join(
@@ -77,12 +77,12 @@ func (h accountsResourceHandler) ResolveFilter(opts common.ResourceQuery[any], o
7777
DistinctOn("asset").
7878
ColumnExpr("first_value((post_commit_effective_volumes).inputs - (post_commit_effective_volumes).outputs) over (partition by (accounts_address, asset) order by effective_date desc, seq desc) as balance").
7979
Where("effective_date <= ?", opts.PIT)
80-
selectBalance = h.store.applyLedgerFilter(selectBalance, "moves")
80+
selectBalance = h.store.applyLedgerFilter(opts.Ctx, selectBalance, "moves")
8181
} else {
8282
selectBalance = selectBalance.
8383
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")).
8484
ColumnExpr("input - output as balance")
85-
selectBalance = h.store.applyLedgerFilter(selectBalance, "accounts_volumes")
85+
selectBalance = h.store.applyLedgerFilter(opts.Ctx, selectBalance, "accounts_volumes")
8686
}
8787

8888
if balanceRegex.MatchString(property) {
@@ -130,7 +130,7 @@ func (h accountsResourceHandler) Expand(opts common.ResourceQuery[any], property
130130
ModelTableExpr(h.store.GetPrefixedRelationName("moves")).
131131
DistinctOn("accounts_address, asset").
132132
Column("accounts_address", "asset")
133-
selectRowsQuery = h.store.applyLedgerFilter(selectRowsQuery, "moves")
133+
selectRowsQuery = h.store.applyLedgerFilter(opts.Ctx, selectRowsQuery, "moves")
134134
if property == "volumes" {
135135
selectRowsQuery = selectRowsQuery.
136136
ColumnExpr("first_value(post_commit_volumes) over (partition by (accounts_address, asset) order by seq desc) as volumes").
@@ -145,7 +145,7 @@ func (h accountsResourceHandler) Expand(opts common.ResourceQuery[any], property
145145
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")).
146146
Column("asset", "accounts_address").
147147
ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes")
148-
selectRowsQuery = h.store.applyLedgerFilter(selectRowsQuery, "accounts_volumes")
148+
selectRowsQuery = h.store.applyLedgerFilter(opts.Ctx, selectRowsQuery, "accounts_volumes")
149149
}
150150

151151
return h.store.db.NewSelect().

internal/storage/ledger/resource_aggregated_balances.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R
2727
ModelTableExpr(h.store.GetPrefixedRelationName("moves")).
2828
DistinctOn("accounts_address, asset").
2929
Column("accounts_address", "asset")
30-
ret = h.store.applyLedgerFilter(ret, "moves")
30+
ret = h.store.applyLedgerFilter(query.Ctx, ret, "moves")
3131
if query.Opts.UseInsertionDate {
3232
if !h.store.ledger.HasFeature(features.FeatureMovesHistory, "ON") {
3333
return nil, NewErrMissingFeature(features.FeatureMovesHistory)
@@ -53,7 +53,7 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R
5353
TableExpr(h.store.GetPrefixedRelationName("accounts")).
5454
Column("address_array").
5555
Where("accounts.address = accounts_address")
56-
subQuery = h.store.applyLedgerFilter(subQuery, "accounts")
56+
subQuery = h.store.applyLedgerFilter(query.Ctx, subQuery, "accounts")
5757

5858
ret = ret.
5959
ColumnExpr("accounts.address_array as accounts_address_array").
@@ -67,7 +67,7 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R
6767
ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata").
6868
Where("accounts_metadata.accounts_address = moves.accounts_address").
6969
Where("date <= ?", query.PIT)
70-
subQuery = h.store.applyLedgerFilter(subQuery, "accounts_metadata")
70+
subQuery = h.store.applyLedgerFilter(query.Ctx, subQuery, "accounts_metadata")
7171

7272
ret = ret.
7373
Join(`left join lateral (?) accounts_metadata on true`, subQuery).
@@ -80,7 +80,7 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R
8080
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")).
8181
Column("asset", "accounts_address").
8282
ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes")
83-
ret = h.store.applyLedgerFilter(ret, "accounts_volumes")
83+
ret = h.store.applyLedgerFilter(query.Ctx, ret, "accounts_volumes")
8484

8585
if query.UseFilter("metadata") || query.UseFilter("address", func(value any) bool {
8686
return isPartialAddress(value.(string))
@@ -89,7 +89,7 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R
8989
TableExpr(h.store.GetPrefixedRelationName("accounts")).
9090
Column("address").
9191
Where("accounts.address = accounts_address")
92-
subQuery = h.store.applyLedgerFilter(subQuery, "accounts")
92+
subQuery = h.store.applyLedgerFilter(query.Ctx, subQuery, "accounts")
9393

9494
if query.UseFilter("address") {
9595
subQuery = subQuery.ColumnExpr("address_array as accounts_address_array")

internal/storage/ledger/resource_logs.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ func (h logsResourceHandler) Schema() common.EntitySchema {
2020
}
2121
}
2222

23-
func (h logsResourceHandler) BuildDataset(_ common.RepositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) {
23+
func (h logsResourceHandler) BuildDataset(opts common.RepositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) {
2424
ret := h.store.db.NewSelect().
2525
ModelTableExpr(h.store.GetPrefixedRelationName("logs")).
2626
ColumnExpr("*")
27-
ret = h.store.applyLedgerFilter(ret, "logs")
27+
ret = h.store.applyLedgerFilter(opts.Ctx, ret, "logs")
2828
return ret, nil
2929
}
3030

internal/storage/ledger/resource_transactions.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (h transactionsResourceHandler) BuildDataset(opts common.RepositoryHandlerB
4545
"sources_arrays",
4646
"destinations_arrays",
4747
)
48-
ret = h.store.applyLedgerFilter(ret, "transactions")
48+
ret = h.store.applyLedgerFilter(opts.Ctx, ret, "transactions")
4949

5050
if slices.Contains(opts.Expand, "volumes") {
5151
ret = ret.Column("post_commit_volumes")
@@ -62,7 +62,7 @@ func (h transactionsResourceHandler) BuildDataset(opts common.RepositoryHandlerB
6262
Column("transactions_id", "metadata").
6363
Order("transactions_id", "revision desc").
6464
Where("date <= ?", opts.PIT)
65-
selectDistinctTransactionMetadataHistories = h.store.applyLedgerFilter(selectDistinctTransactionMetadataHistories, "transactions_metadata")
65+
selectDistinctTransactionMetadataHistories = h.store.applyLedgerFilter(opts.Ctx, selectDistinctTransactionMetadataHistories, "transactions_metadata")
6666

6767
ret = ret.
6868
Join(
@@ -119,7 +119,7 @@ func (h transactionsResourceHandler) Project(_ common.ResourceQuery[any], select
119119
return selectQuery.ColumnExpr("*"), nil
120120
}
121121

122-
func (h transactionsResourceHandler) Expand(_ common.ResourceQuery[any], property string) (*bun.SelectQuery, *common.JoinCondition, error) {
122+
func (h transactionsResourceHandler) Expand(query common.ResourceQuery[any], property string) (*bun.SelectQuery, *common.JoinCondition, error) {
123123
if property != "effectiveVolumes" {
124124
return nil, nil, nil
125125
}
@@ -130,7 +130,7 @@ func (h transactionsResourceHandler) Expand(_ common.ResourceQuery[any], propert
130130
Column("transactions_id", "accounts_address", "asset").
131131
ColumnExpr(`first_value(moves.post_commit_effective_volumes) over (partition by (transactions_id, accounts_address, asset) order by seq desc) as post_commit_effective_volumes`).
132132
Where("transactions_id in (select id from dataset)")
133-
innerMostQuery = h.store.applyLedgerFilter(innerMostQuery, "moves")
133+
innerMostQuery = h.store.applyLedgerFilter(query.Ctx, innerMostQuery, "moves")
134134

135135
ret := h.store.db.NewSelect().
136136
TableExpr(

internal/storage/ledger/resource_volumes.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,14 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild
4040
ColumnExpr("accounts_address as account").
4141
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")).
4242
Order("accounts_address", "asset")
43-
selectVolumes = h.store.applyLedgerFilter(selectVolumes, "accounts_volumes")
43+
selectVolumes = h.store.applyLedgerFilter(query.Ctx, selectVolumes, "accounts_volumes")
4444

4545
if query.UseFilter("metadata") || query.UseFilter("first_usage") || needAddressSegments {
4646
accountsQuery := h.store.db.NewSelect().
4747
TableExpr(h.store.GetPrefixedRelationName("accounts")).
4848
Column("address").
4949
Where("accounts.address = accounts_address")
50-
accountsQuery = h.store.applyLedgerFilter(accountsQuery, "accounts")
50+
accountsQuery = h.store.applyLedgerFilter(query.Ctx, accountsQuery, "accounts")
5151

5252
if needAddressSegments {
5353
accountsQuery = accountsQuery.ColumnExpr("address_array as account_array")
@@ -79,7 +79,7 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild
7979
ModelTableExpr(h.store.GetPrefixedRelationName("moves")).
8080
GroupExpr("accounts_address, asset").
8181
Order("accounts_address", "asset")
82-
selectVolumes = h.store.applyLedgerFilter(selectVolumes, "moves")
82+
selectVolumes = h.store.applyLedgerFilter(query.Ctx, selectVolumes, "moves")
8383

8484
dateFilterColumn := "effective_date"
8585
if query.Opts.UseInsertionDate {
@@ -98,7 +98,7 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild
9898
accountsQuery := h.store.db.NewSelect().
9999
TableExpr(h.store.GetPrefixedRelationName("accounts")).
100100
Where("accounts.address = accounts_address")
101-
accountsQuery = h.store.applyLedgerFilter(accountsQuery, "accounts")
101+
accountsQuery = h.store.applyLedgerFilter(query.Ctx, accountsQuery, "accounts")
102102

103103
if needAddressSegments {
104104
accountsQuery = accountsQuery.ColumnExpr("address_array")
@@ -117,7 +117,7 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild
117117
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_metadata")).
118118
ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata").
119119
Where("accounts_metadata.accounts_address = moves.accounts_address")
120-
subQuery = h.store.applyLedgerFilter(subQuery, "accounts_metadata")
120+
subQuery = h.store.applyLedgerFilter(query.Ctx, subQuery, "accounts_metadata")
121121

122122
selectVolumes = selectVolumes.
123123
Join(`left join lateral (?) accounts_metadata on true`, subQuery).

0 commit comments

Comments
 (0)