diff --git a/internal/storage/driver/module.go b/internal/storage/driver/module.go index f0be31b16..66e7fed8b 100644 --- a/internal/storage/driver/module.go +++ b/internal/storage/driver/module.go @@ -31,12 +31,20 @@ func NewFXModule() fx.Option { &ledger.Ledger{}, ) }), + // SystemStoreFactory is provided separately to be used both by the Driver + // and by the ledger store factory for counting ledgers in buckets + fx.Provide(func(tracerProvider trace.TracerProvider) systemstore.StoreFactory { + return systemstore.NewStoreFactory(systemstore.WithTracer( + tracerProvider.Tracer("SystemStore"), + )) + }), fx.Provide(func(params struct { fx.In - DB *bun.DB - TracerProvider trace.TracerProvider `optional:"true"` - MeterProvider metric.MeterProvider `optional:"true"` + DB *bun.DB + SystemStoreFactory systemstore.StoreFactory + TracerProvider trace.TracerProvider `optional:"true"` + MeterProvider metric.MeterProvider `optional:"true"` }) ledgerstore.Factory { options := make([]ledgerstore.Option, 0) if params.TracerProvider != nil { @@ -45,21 +53,25 @@ func NewFXModule() fx.Option { if params.MeterProvider != nil { options = append(options, ledgerstore.WithMeter(params.MeterProvider.Meter("store"))) } + options = append(options, ledgerstore.WithCountLedgersInBucketFunc( + func(ctx context.Context, bucketName string) (int, error) { + return params.SystemStoreFactory.Create(params.DB).CountLedgersInBucket(ctx, bucketName) + }, + )) return ledgerstore.NewFactory(params.DB, options...) }), fx.Provide(func( db *bun.DB, bucketFactory bucket.Factory, ledgerStoreFactory ledgerstore.Factory, + systemStoreFactory systemstore.StoreFactory, tracerProvider trace.TracerProvider, ) (*Driver, error) { return New( db, ledgerStoreFactory, bucketFactory, - systemstore.NewStoreFactory(systemstore.WithTracer( - tracerProvider.Tracer("SystemStore"), - )), + systemStoreFactory, WithTracer(tracerProvider.Tracer("StorageDriver")), ), nil }), diff --git a/internal/storage/driver/system_generated_test.go b/internal/storage/driver/system_generated_test.go index d6afce573..1f79df3e9 100644 --- a/internal/storage/driver/system_generated_test.go +++ b/internal/storage/driver/system_generated_test.go @@ -40,6 +40,21 @@ func (m *SystemStore) EXPECT() *SystemStoreMockRecorder { return m.recorder } +// CountLedgersInBucket mocks base method. +func (m *SystemStore) CountLedgersInBucket(ctx context.Context, bucketName string) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CountLedgersInBucket", ctx, bucketName) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CountLedgersInBucket indicates an expected call of CountLedgersInBucket. +func (mr *SystemStoreMockRecorder) CountLedgersInBucket(ctx, bucketName any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountLedgersInBucket", reflect.TypeOf((*SystemStore)(nil).CountLedgersInBucket), ctx, bucketName) +} + // CreateLedger mocks base method. func (m *SystemStore) CreateLedger(ctx context.Context, l *ledger.Ledger) error { m.ctrl.T.Helper() diff --git a/internal/storage/ledger/accounts.go b/internal/storage/ledger/accounts.go index 47ed62503..209ad93b2 100644 --- a/internal/storage/ledger/accounts.go +++ b/internal/storage/ledger/accounts.go @@ -5,6 +5,7 @@ import ( "fmt" . "github.com/formancehq/go-libs/v2/collectionutils" "github.com/formancehq/ledger/internal/tracing" + "github.com/uptrace/bun" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "regexp" @@ -77,12 +78,18 @@ func (store *Store) DeleteAccountMetadata(ctx context.Context, account, key stri store.tracer, store.deleteAccountMetadataHistogram, tracing.NoResult(func(ctx context.Context) error { - _, err := store.db.NewUpdate(). + query := store.db.NewUpdate(). ModelTableExpr(store.GetPrefixedRelationName("accounts")). Set("metadata = metadata - ?", key). - Where("address = ?", account). - Where("ledger = ?", store.ledger.Name). - Exec(ctx) + Where("address = ?", account) + query = query.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery { + ledgerFilter, ledgerArgs := store.getLedgerFilterSQL(ctx) + if ledgerFilter != "" { + return q.Where(ledgerFilter, ledgerArgs...) + } + return q + }) + _, err := query.Exec(ctx) return postgres.ResolveError(err) }), ) diff --git a/internal/storage/ledger/logs.go b/internal/storage/ledger/logs.go index 38fd2670c..1268c732e 100644 --- a/internal/storage/ledger/logs.go +++ b/internal/storage/ledger/logs.go @@ -119,14 +119,14 @@ func (store *Store) ReadLogWithIdempotencyKey(ctx context.Context, key string) ( store.readLogWithIdempotencyKeyHistogram, func(ctx context.Context) (*ledger.Log, error) { ret := &Log{} - if err := store.db.NewSelect(). + query := store.db.NewSelect(). Model(ret). ModelTableExpr(store.GetPrefixedRelationName("logs")). Column("*"). Where("idempotency_key = ?", key). - Where("ledger = ?", store.ledger.Name). - Limit(1). - Scan(ctx); err != nil { + Limit(1) + query = store.applyLedgerFilter(ctx, query, "logs") + if err := query.Scan(ctx); err != nil { return nil, postgres.ResolveError(err) } diff --git a/internal/storage/ledger/resource.go b/internal/storage/ledger/resource.go index 6e734472a..6b36035ce 100644 --- a/internal/storage/ledger/resource.go +++ b/internal/storage/ledger/resource.go @@ -87,10 +87,10 @@ func (ctx repositoryHandlerBuildContext[Opts]) useFilter(v string, matchers ...f type repositoryHandler[Opts any] interface { filters() []filter - buildDataset(store *Store, query repositoryHandlerBuildContext[Opts]) (*bun.SelectQuery, error) - resolveFilter(store *Store, query ledgercontroller.ResourceQuery[Opts], operator, property string, value any) (string, []any, error) - project(store *Store, query ledgercontroller.ResourceQuery[Opts], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) - expand(store *Store, query ledgercontroller.ResourceQuery[Opts], property string) (*bun.SelectQuery, *joinCondition, error) + buildDataset(ctx context.Context, store *Store, query repositoryHandlerBuildContext[Opts]) (*bun.SelectQuery, error) + resolveFilter(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[Opts], operator, property string, value any) (string, []any, error) + project(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[Opts], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) + expand(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[Opts], property string) (*bun.SelectQuery, *joinCondition, error) } type resourceRepository[ResourceType, OptionsType any] struct { @@ -151,14 +151,14 @@ func (r *resourceRepository[ResourceType, OptionsType]) validateFilters(builder return ret, nil } -func (r *resourceRepository[ResourceType, OptionsType]) buildFilteredDataset(q ledgercontroller.ResourceQuery[OptionsType]) (*bun.SelectQuery, error) { +func (r *resourceRepository[ResourceType, OptionsType]) buildFilteredDataset(ctx context.Context, q ledgercontroller.ResourceQuery[OptionsType]) (*bun.SelectQuery, error) { filters, err := r.validateFilters(q.Builder) if err != nil { return nil, err } - dataset, err := r.resourceHandler.buildDataset(r.store, repositoryHandlerBuildContext[OptionsType]{ + dataset, err := r.resourceHandler.buildDataset(ctx, r.store, repositoryHandlerBuildContext[OptionsType]{ ResourceQuery: q, filters: filters, }) @@ -172,7 +172,7 @@ func (r *resourceRepository[ResourceType, OptionsType]) buildFilteredDataset(q l if q.Builder != nil { // Convert filters to where clause where, args, err := q.Builder.Build(query.ContextFn(func(key, operator string, value any) (string, []any, error) { - return r.resourceHandler.resolveFilter(r.store, q, operator, key, value) + return r.resourceHandler.resolveFilter(ctx, r.store, q, operator, key, value) })) if err != nil { return nil, err @@ -184,10 +184,10 @@ func (r *resourceRepository[ResourceType, OptionsType]) buildFilteredDataset(q l } } - return r.resourceHandler.project(r.store, q, dataset) + return r.resourceHandler.project(ctx, r.store, q, dataset) } -func (r *resourceRepository[ResourceType, OptionsType]) expand(dataset *bun.SelectQuery, q ledgercontroller.ResourceQuery[OptionsType]) (*bun.SelectQuery, error) { +func (r *resourceRepository[ResourceType, OptionsType]) expand(ctx context.Context, dataset *bun.SelectQuery, q ledgercontroller.ResourceQuery[OptionsType]) (*bun.SelectQuery, error) { dataset = r.store.db.NewSelect(). With("dataset", dataset). ModelTableExpr("dataset"). @@ -196,7 +196,7 @@ func (r *resourceRepository[ResourceType, OptionsType]) expand(dataset *bun.Sele slices.Sort(q.Expand) for i, expand := range q.Expand { - selectQuery, joinCondition, err := r.resourceHandler.expand(r.store, q, expand) + selectQuery, joinCondition, err := r.resourceHandler.expand(ctx, r.store, q, expand) if err != nil { return nil, err } @@ -222,12 +222,12 @@ func (r *resourceRepository[ResourceType, OptionsType]) expand(dataset *bun.Sele func (r *resourceRepository[ResourceType, OptionsType]) GetOne(ctx context.Context, query ledgercontroller.ResourceQuery[OptionsType]) (*ResourceType, error) { - finalQuery, err := r.buildFilteredDataset(query) + finalQuery, err := r.buildFilteredDataset(ctx, query) if err != nil { return nil, err } - finalQuery, err = r.expand(finalQuery, query) + finalQuery, err = r.expand(ctx, finalQuery, query) if err != nil { return nil, err } @@ -248,7 +248,7 @@ func (r *resourceRepository[ResourceType, OptionsType]) GetOne(ctx context.Conte func (r *resourceRepository[ResourceType, OptionsType]) Count(ctx context.Context, query ledgercontroller.ResourceQuery[OptionsType]) (int, error) { - finalQuery, err := r.buildFilteredDataset(query) + finalQuery, err := r.buildFilteredDataset(ctx, query) if err != nil { return 0, err } @@ -289,7 +289,7 @@ func (r *paginatedResourceRepository[ResourceType, OptionsType, PaginationQueryT panic("should not happen") } - finalQuery, err := r.buildFilteredDataset(resourceQuery) + finalQuery, err := r.buildFilteredDataset(ctx, resourceQuery) if err != nil { return nil, fmt.Errorf("building filtered dataset: %w", err) } @@ -299,7 +299,7 @@ func (r *paginatedResourceRepository[ResourceType, OptionsType, PaginationQueryT return nil, fmt.Errorf("paginating request: %w", err) } - finalQuery, err = r.expand(finalQuery, resourceQuery) + finalQuery, err = r.expand(ctx, finalQuery, resourceQuery) if err != nil { return nil, fmt.Errorf("expanding results: %w", err) } diff --git a/internal/storage/ledger/resource_accounts.go b/internal/storage/ledger/resource_accounts.go index 3721a2b6a..3c88683e3 100644 --- a/internal/storage/ledger/resource_accounts.go +++ b/internal/storage/ledger/resource_accounts.go @@ -1,6 +1,7 @@ package ledger import ( + "context" "fmt" ledger "github.com/formancehq/ledger/internal" ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" @@ -48,14 +49,14 @@ func (h accountsResourceHandler) filters() []filter { } } -func (h accountsResourceHandler) buildDataset(store *Store, opts repositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) { +func (h accountsResourceHandler) buildDataset(ctx context.Context, store *Store, opts repositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) { ret := store.db.NewSelect() // Build the query ret = ret. ModelTableExpr(store.GetPrefixedRelationName("accounts")). - Column("address", "address_array", "first_usage", "insertion_date", "updated_at"). - Where("ledger = ?", store.ledger.Name) + Column("address", "address_array", "first_usage", "insertion_date", "updated_at") + ret = store.applyLedgerFilter(ctx, ret, "accounts") if opts.PIT != nil && !opts.PIT.IsZero() { ret = ret.Where("accounts.first_usage <= ?", opts.PIT) @@ -65,10 +66,10 @@ func (h accountsResourceHandler) buildDataset(store *Store, opts repositoryHandl selectDistinctAccountMetadataHistories := store.db.NewSelect(). DistinctOn("accounts_address"). ModelTableExpr(store.GetPrefixedRelationName("accounts_metadata")). - Where("ledger = ?", store.ledger.Name). Column("accounts_address"). ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata"). Where("date <= ?", opts.PIT) + selectDistinctAccountMetadataHistories = store.applyLedgerFilter(ctx, selectDistinctAccountMetadataHistories, "accounts_metadata") ret = ret. Join( @@ -83,7 +84,7 @@ func (h accountsResourceHandler) buildDataset(store *Store, opts repositoryHandl return ret, nil } -func (h accountsResourceHandler) resolveFilter(store *Store, opts ledgercontroller.ResourceQuery[any], operator, property string, value any) (string, []any, error) { +func (h accountsResourceHandler) resolveFilter(ctx context.Context, store *Store, opts ledgercontroller.ResourceQuery[any], operator, property string, value any) (string, []any, error) { switch { case property == "address": return filterAccountAddress(value.(string), "address"), nil, nil @@ -92,8 +93,7 @@ func (h accountsResourceHandler) resolveFilter(store *Store, opts ledgercontroll case balanceRegex.MatchString(property) || property == "balance": selectBalance := store.db.NewSelect(). - Where("accounts_address = dataset.address"). - Where("ledger = ?", store.ledger.Name) + Where("accounts_address = dataset.address") if opts.PIT != nil && !opts.PIT.IsZero() { if !store.ledger.HasFeature(features.FeatureMovesHistory, "ON") { @@ -104,10 +104,12 @@ func (h accountsResourceHandler) resolveFilter(store *Store, opts ledgercontroll DistinctOn("asset"). ColumnExpr("first_value((post_commit_effective_volumes).inputs - (post_commit_effective_volumes).outputs) over (partition by (accounts_address, asset) order by effective_date desc, seq desc) as balance"). Where("effective_date <= ?", opts.PIT) + selectBalance = store.applyLedgerFilter(ctx, selectBalance, "moves") } else { selectBalance = selectBalance. ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). ColumnExpr("input - output as balance") + selectBalance = store.applyLedgerFilter(ctx, selectBalance, "accounts_volumes") } if balanceRegex.MatchString(property) { @@ -132,11 +134,11 @@ func (h accountsResourceHandler) resolveFilter(store *Store, opts ledgercontroll } } -func (h accountsResourceHandler) project(store *Store, query ledgercontroller.ResourceQuery[any], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) { +func (h accountsResourceHandler) project(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[any], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) { return selectQuery.ColumnExpr("*"), nil } -func (h accountsResourceHandler) expand(store *Store, opts ledgercontroller.ResourceQuery[any], property string) (*bun.SelectQuery, *joinCondition, error) { +func (h accountsResourceHandler) expand(ctx context.Context, store *Store, opts ledgercontroller.ResourceQuery[any], property string) (*bun.SelectQuery, *joinCondition, error) { switch property { case "volumes": if !store.ledger.HasFeature(features.FeatureMovesHistory, "ON") { @@ -154,8 +156,8 @@ func (h accountsResourceHandler) expand(store *Store, opts ledgercontroller.Reso selectRowsQuery = selectRowsQuery. ModelTableExpr(store.GetPrefixedRelationName("moves")). DistinctOn("accounts_address, asset"). - Column("accounts_address", "asset"). - Where("ledger = ?", store.ledger.Name) + Column("accounts_address", "asset") + selectRowsQuery = store.applyLedgerFilter(ctx, selectRowsQuery, "moves") if property == "volumes" { selectRowsQuery = selectRowsQuery. ColumnExpr("first_value(post_commit_volumes) over (partition by (accounts_address, asset) order by seq desc) as volumes"). @@ -169,8 +171,8 @@ func (h accountsResourceHandler) expand(store *Store, opts ledgercontroller.Reso selectRowsQuery = selectRowsQuery. ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). Column("asset", "accounts_address"). - ColumnExpr("(input, output)::"+store.GetPrefixedRelationName("volumes")+" as volumes"). - Where("ledger = ?", store.ledger.Name) + ColumnExpr("(input, output)::"+store.GetPrefixedRelationName("volumes")+" as volumes") + selectRowsQuery = store.applyLedgerFilter(ctx, selectRowsQuery, "accounts_volumes") } return store.db.NewSelect(). diff --git a/internal/storage/ledger/resource_aggregated_balances.go b/internal/storage/ledger/resource_aggregated_balances.go index f1cbe204f..df3baed02 100644 --- a/internal/storage/ledger/resource_aggregated_balances.go +++ b/internal/storage/ledger/resource_aggregated_balances.go @@ -1,6 +1,7 @@ package ledger import ( + "context" "errors" "fmt" ledger "github.com/formancehq/ledger/internal" @@ -46,14 +47,14 @@ func (h aggregatedBalancesResourceRepositoryHandler) filters() []filter { } } -func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(store *Store, query repositoryHandlerBuildContext[ledgercontroller.GetAggregatedVolumesOptions]) (*bun.SelectQuery, error) { +func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(ctx context.Context, store *Store, query repositoryHandlerBuildContext[ledgercontroller.GetAggregatedVolumesOptions]) (*bun.SelectQuery, error) { if query.UsePIT() { ret := store.db.NewSelect(). ModelTableExpr(store.GetPrefixedRelationName("moves")). DistinctOn("accounts_address, asset"). - Column("accounts_address", "asset"). - Where("ledger = ?", store.ledger.Name) + Column("accounts_address", "asset") + ret = store.applyLedgerFilter(ctx, ret, "moves") if query.Opts.UseInsertionDate { if !store.ledger.HasFeature(features.FeatureMovesHistory, "ON") { return nil, ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistory) @@ -76,8 +77,8 @@ func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(store *Store, subQuery := store.db.NewSelect(). TableExpr(store.GetPrefixedRelationName("accounts")). Column("address_array"). - Where("accounts.address = accounts_address"). - Where("ledger = ?", store.ledger.Name) + Where("accounts.address = accounts_address") + subQuery = store.applyLedgerFilter(ctx, subQuery, "accounts") ret = ret. ColumnExpr("accounts.address_array as accounts_address_array"). @@ -89,9 +90,9 @@ func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(store *Store, DistinctOn("accounts_address"). ModelTableExpr(store.GetPrefixedRelationName("accounts_metadata")). ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata"). - Where("ledger = ?", store.ledger.Name). Where("accounts_metadata.accounts_address = moves.accounts_address"). Where("date <= ?", query.PIT) + subQuery = store.applyLedgerFilter(ctx, subQuery, "accounts_metadata") ret = ret. Join(`left join lateral (?) accounts_metadata on true`, subQuery). @@ -103,15 +104,15 @@ func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(store *Store, ret := store.db.NewSelect(). ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). Column("asset", "accounts_address"). - ColumnExpr("(input, output)::"+store.GetPrefixedRelationName("volumes")+" as volumes"). - Where("ledger = ?", store.ledger.Name) + ColumnExpr("(input, output)::"+store.GetPrefixedRelationName("volumes")+" as volumes") + ret = store.applyLedgerFilter(ctx, ret, "accounts_volumes") if query.useFilter("metadata") || query.useFilter("address", isPartialAddress) { subQuery := store.db.NewSelect(). TableExpr(store.GetPrefixedRelationName("accounts")). Column("address"). - Where("ledger = ?", store.ledger.Name). Where("accounts.address = accounts_address") + subQuery = store.applyLedgerFilter(ctx, subQuery, "accounts") if query.useFilter("address") { subQuery = subQuery.ColumnExpr("address_array as accounts_address_array") @@ -130,7 +131,7 @@ func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(store *Store, } } -func (h aggregatedBalancesResourceRepositoryHandler) resolveFilter(store *Store, query ledgercontroller.ResourceQuery[ledgercontroller.GetAggregatedVolumesOptions], operator, property string, value any) (string, []any, error) { +func (h aggregatedBalancesResourceRepositoryHandler) resolveFilter(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[ledgercontroller.GetAggregatedVolumesOptions], operator, property string, value any) (string, []any, error) { switch { case property == "address": return filterAccountAddress(value.(string), "accounts_address"), nil, nil @@ -149,11 +150,12 @@ func (h aggregatedBalancesResourceRepositoryHandler) resolveFilter(store *Store, } } -func (h aggregatedBalancesResourceRepositoryHandler) expand(_ *Store, _ ledgercontroller.ResourceQuery[ledgercontroller.GetAggregatedVolumesOptions], property string) (*bun.SelectQuery, *joinCondition, error) { +func (h aggregatedBalancesResourceRepositoryHandler) expand(_ context.Context, _ *Store, _ ledgercontroller.ResourceQuery[ledgercontroller.GetAggregatedVolumesOptions], property string) (*bun.SelectQuery, *joinCondition, error) { return nil, nil, errors.New("no expand available for aggregated balances") } func (h aggregatedBalancesResourceRepositoryHandler) project( + ctx context.Context, store *Store, _ ledgercontroller.ResourceQuery[ledgercontroller.GetAggregatedVolumesOptions], selectQuery *bun.SelectQuery, diff --git a/internal/storage/ledger/resource_logs.go b/internal/storage/ledger/resource_logs.go index 544912a22..9abcfce7e 100644 --- a/internal/storage/ledger/resource_logs.go +++ b/internal/storage/ledger/resource_logs.go @@ -1,6 +1,7 @@ package ledger import ( + "context" "errors" "fmt" ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" @@ -21,14 +22,15 @@ func (h logsResourceHandler) filters() []filter { } } -func (h logsResourceHandler) buildDataset(store *Store, _ repositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) { - return store.db.NewSelect(). +func (h logsResourceHandler) buildDataset(ctx context.Context, store *Store, _ repositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) { + ret := store.db.NewSelect(). ModelTableExpr(store.GetPrefixedRelationName("logs")). - ColumnExpr("*"). - Where("ledger = ?", store.ledger.Name), nil + ColumnExpr("*") + ret = store.applyLedgerFilter(ctx, ret, "logs") + return ret, nil } -func (h logsResourceHandler) resolveFilter(_ *Store, _ ledgercontroller.ResourceQuery[any], operator, property string, value any) (string, []any, error) { +func (h logsResourceHandler) resolveFilter(_ context.Context, _ *Store, _ ledgercontroller.ResourceQuery[any], operator, property string, value any) (string, []any, error) { switch { case property == "date" || property == "id": return fmt.Sprintf("%s %s ?", property, convertOperatorToSQL(operator)), []any{value}, nil @@ -37,11 +39,11 @@ func (h logsResourceHandler) resolveFilter(_ *Store, _ ledgercontroller.Resource } } -func (h logsResourceHandler) expand(_ *Store, _ ledgercontroller.ResourceQuery[any], _ string) (*bun.SelectQuery, *joinCondition, error) { +func (h logsResourceHandler) expand(_ context.Context, _ *Store, _ ledgercontroller.ResourceQuery[any], _ string) (*bun.SelectQuery, *joinCondition, error) { return nil, nil, errors.New("no expand supported") } -func (h logsResourceHandler) project(store *Store, query ledgercontroller.ResourceQuery[any], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) { +func (h logsResourceHandler) project(_ context.Context, store *Store, query ledgercontroller.ResourceQuery[any], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) { return selectQuery.ColumnExpr("*"), nil } diff --git a/internal/storage/ledger/resource_transactions.go b/internal/storage/ledger/resource_transactions.go index 6b9ea4a86..d1f2d70e6 100644 --- a/internal/storage/ledger/resource_transactions.go +++ b/internal/storage/ledger/resource_transactions.go @@ -1,6 +1,7 @@ package ledger import ( + "context" "fmt" ledger "github.com/formancehq/ledger/internal" ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" @@ -68,7 +69,7 @@ func (h transactionsResourceHandler) filters() []filter { } } -func (h transactionsResourceHandler) buildDataset(store *Store, opts repositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) { +func (h transactionsResourceHandler) buildDataset(ctx context.Context, store *Store, opts repositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) { ret := store.db.NewSelect(). ModelTableExpr(store.GetPrefixedRelationName("transactions")). Column( @@ -83,8 +84,8 @@ func (h transactionsResourceHandler) buildDataset(store *Store, opts repositoryH "destinations", "sources_arrays", "destinations_arrays", - ). - Where("ledger = ?", store.ledger.Name) + ) + ret = store.applyLedgerFilter(ctx, ret, "transactions") if slices.Contains(opts.Expand, "volumes") { ret = ret.Column("post_commit_volumes") @@ -98,10 +99,10 @@ func (h transactionsResourceHandler) buildDataset(store *Store, opts repositoryH selectDistinctTransactionMetadataHistories := store.db.NewSelect(). DistinctOn("transactions_id"). ModelTableExpr(store.GetPrefixedRelationName("transactions_metadata")). - Where("ledger = ?", store.ledger.Name). Column("transactions_id", "metadata"). Order("transactions_id", "revision desc"). Where("date <= ?", opts.PIT) + selectDistinctTransactionMetadataHistories = store.applyLedgerFilter(ctx, selectDistinctTransactionMetadataHistories, "transactions_metadata") ret = ret. Join( @@ -122,7 +123,7 @@ func (h transactionsResourceHandler) buildDataset(store *Store, opts repositoryH return ret, nil } -func (h transactionsResourceHandler) resolveFilter(store *Store, opts ledgercontroller.ResourceQuery[any], operator, property string, value any) (string, []any, error) { +func (h transactionsResourceHandler) resolveFilter(ctx context.Context, store *Store, opts ledgercontroller.ResourceQuery[any], operator, property string, value any) (string, []any, error) { switch { case property == "id": return fmt.Sprintf("id %s ?", convertOperatorToSQL(operator)), []any{value}, nil @@ -154,28 +155,30 @@ func (h transactionsResourceHandler) resolveFilter(store *Store, opts ledgercont } } -func (h transactionsResourceHandler) project(store *Store, query ledgercontroller.ResourceQuery[any], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) { +func (h transactionsResourceHandler) project(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[any], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error) { return selectQuery.ColumnExpr("*"), nil } -func (h transactionsResourceHandler) expand(store *Store, opts ledgercontroller.ResourceQuery[any], property string) (*bun.SelectQuery, *joinCondition, error) { +func (h transactionsResourceHandler) expand(ctx context.Context, store *Store, opts ledgercontroller.ResourceQuery[any], property string) (*bun.SelectQuery, *joinCondition, error) { if property != "effectiveVolumes" { return nil, nil, nil } + movesSubquery := store.db.NewSelect(). + DistinctOn("transactions_id, accounts_address, asset"). + ModelTableExpr(store.GetPrefixedRelationName("moves")). + Column("transactions_id", "accounts_address", "asset"). + ColumnExpr(`first_value(moves.post_commit_effective_volumes) over (partition by (transactions_id, accounts_address, asset) order by seq desc) as post_commit_effective_volumes`). + Where("transactions_id in (select id from dataset)") + movesSubquery = store.applyLedgerFilter(ctx, movesSubquery, "moves") + ret := store.db.NewSelect(). TableExpr( "(?) data", store.db.NewSelect(). TableExpr( "(?) moves", - store.db.NewSelect(). - DistinctOn("transactions_id, accounts_address, asset"). - ModelTableExpr(store.GetPrefixedRelationName("moves")). - Column("transactions_id", "accounts_address", "asset"). - ColumnExpr(`first_value(moves.post_commit_effective_volumes) over (partition by (transactions_id, accounts_address, asset) order by seq desc) as post_commit_effective_volumes`). - Where("ledger = ?", store.ledger.Name). - Where("transactions_id in (select id from dataset)"), + movesSubquery, ). Column("transactions_id", "accounts_address"). ColumnExpr(`public.aggregate_objects(json_build_object(moves.asset, json_build_object('input', (moves.post_commit_effective_volumes).inputs, 'output', (moves.post_commit_effective_volumes).outputs))::jsonb) AS post_commit_effective_volumes`). diff --git a/internal/storage/ledger/resource_volumes.go b/internal/storage/ledger/resource_volumes.go index 475ac6533..706f73dee 100644 --- a/internal/storage/ledger/resource_volumes.go +++ b/internal/storage/ledger/resource_volumes.go @@ -1,6 +1,7 @@ package ledger import ( + "context" "errors" "fmt" ledger "github.com/formancehq/ledger/internal" @@ -54,7 +55,7 @@ func (h volumesResourceHandler) filters() []filter { } } -func (h volumesResourceHandler) buildDataset(store *Store, query repositoryHandlerBuildContext[ledgercontroller.GetVolumesOptions]) (*bun.SelectQuery, error) { +func (h volumesResourceHandler) buildDataset(ctx context.Context, store *Store, query repositoryHandlerBuildContext[ledgercontroller.GetVolumesOptions]) (*bun.SelectQuery, error) { var selectVolumes *bun.SelectQuery @@ -65,15 +66,15 @@ func (h volumesResourceHandler) buildDataset(store *Store, query repositoryHandl ColumnExpr("input - output as balance"). ColumnExpr("accounts_address as account"). ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). - Where("ledger = ?", store.ledger.Name). Order("accounts_address", "asset") + selectVolumes = store.applyLedgerFilter(ctx, selectVolumes, "accounts_volumes") if query.useFilter("metadata") || needAddressSegments { subQuery := store.db.NewSelect(). TableExpr(store.GetPrefixedRelationName("accounts")). Column("address"). - Where("ledger = ?", store.ledger.Name). Where("accounts.address = accounts_address") + subQuery = store.applyLedgerFilter(ctx, subQuery, "accounts") if needAddressSegments { subQuery = subQuery.ColumnExpr("address_array as account_array") @@ -99,9 +100,9 @@ func (h volumesResourceHandler) buildDataset(store *Store, query repositoryHandl ColumnExpr("sum(case when is_source then amount else 0 end) as output"). ColumnExpr("sum(case when not is_source then amount else -amount end) as balance"). ModelTableExpr(store.GetPrefixedRelationName("moves")). - Where("ledger = ?", store.ledger.Name). GroupExpr("accounts_address, asset"). Order("accounts_address", "asset") + selectVolumes = store.applyLedgerFilter(ctx, selectVolumes, "moves") dateFilterColumn := "effective_date" if query.Opts.UseInsertionDate { @@ -120,8 +121,8 @@ func (h volumesResourceHandler) buildDataset(store *Store, query repositoryHandl subQuery := store.db.NewSelect(). TableExpr(store.GetPrefixedRelationName("accounts")). Column("address_array"). - Where("accounts.address = accounts_address"). - Where("ledger = ?", store.ledger.Name) + Where("accounts.address = accounts_address") + subQuery = store.applyLedgerFilter(ctx, subQuery, "accounts") selectVolumes. ColumnExpr("(array_agg(accounts.address_array))[1] as account_array"). @@ -133,8 +134,8 @@ func (h volumesResourceHandler) buildDataset(store *Store, query repositoryHandl DistinctOn("accounts_address"). ModelTableExpr(store.GetPrefixedRelationName("accounts_metadata")). ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata"). - Where("ledger = ?", store.ledger.Name). Where("accounts_metadata.accounts_address = moves.accounts_address") + subQuery = store.applyLedgerFilter(ctx, subQuery, "accounts_metadata") selectVolumes = selectVolumes. Join(`left join lateral (?) accounts_metadata on true`, subQuery). @@ -146,6 +147,7 @@ func (h volumesResourceHandler) buildDataset(store *Store, query repositoryHandl } func (h volumesResourceHandler) resolveFilter( + ctx context.Context, store *Store, opts ledgercontroller.ResourceQuery[ledgercontroller.GetVolumesOptions], operator, property string, @@ -184,6 +186,7 @@ func (h volumesResourceHandler) resolveFilter( } func (h volumesResourceHandler) project( + ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[ledgercontroller.GetVolumesOptions], selectQuery *bun.SelectQuery, @@ -208,7 +211,7 @@ func (h volumesResourceHandler) project( GroupExpr("account, asset"), nil } -func (h volumesResourceHandler) expand(_ *Store, _ ledgercontroller.ResourceQuery[ledgercontroller.GetVolumesOptions], property string) (*bun.SelectQuery, *joinCondition, error) { +func (h volumesResourceHandler) expand(_ context.Context, _ *Store, _ ledgercontroller.ResourceQuery[ledgercontroller.GetVolumesOptions], property string) (*bun.SelectQuery, *joinCondition, error) { return nil, nil, errors.New("no expansion available") } diff --git a/internal/storage/ledger/store.go b/internal/storage/ledger/store.go index 0c323da5a..37f147b42 100644 --- a/internal/storage/ledger/store.go +++ b/internal/storage/ledger/store.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "github.com/formancehq/go-libs/v2/bun/bunpaginate" "github.com/formancehq/go-libs/v2/migrations" "github.com/formancehq/go-libs/v2/platform/postgres" @@ -25,6 +26,8 @@ type Store struct { bucket bucket.Bucket ledger ledger.Ledger + countLedgersInBucket func(ctx context.Context, bucketName string) (int, error) + tracer trace.Tracer meter metric.Meter checkBucketSchemaHistogram metric.Int64Histogram @@ -135,6 +138,53 @@ func (store *Store) GetPrefixedRelationName(v string) string { return fmt.Sprintf(`"%s".%s`, store.ledger.Bucket, v) } +// isSingleLedger returns true if the bucket optimization is enabled for single-ledger scenarios. +// This allows queries to skip the WHERE ledger = ? clause when there's only one ledger in the bucket. +// isSingleLedger checks in real-time if the bucket contains only one ledger. +// This query is fast since the ledgers table has very few rows. +func (store *Store) isSingleLedger(ctx context.Context) (bool, error) { + if store.countLedgersInBucket == nil { + return false, nil + } + count, err := store.countLedgersInBucket(ctx, store.ledger.Bucket) + if err != nil { + return false, fmt.Errorf("failed to count ledgers in bucket: %w", err) + } + return count == 1, nil +} + +// applyLedgerFilter conditionally applies the WHERE ledger = ? clause to a query. +// If the bucket contains only one ledger, the filter is skipped for performance optimization. +// On error, conservatively applies the filter. +func (store *Store) applyLedgerFilter(ctx context.Context, query *bun.SelectQuery, tableAlias string) *bun.SelectQuery { + singleLedger, err := store.isSingleLedger(ctx) + if err != nil { + // Log error but continue with conservative behavior (apply filter) + trace.SpanFromContext(ctx).RecordError(err) + return query.Where(fmt.Sprintf("%s.ledger = ?", tableAlias), store.ledger.Name) + } + if singleLedger { + return query + } + return query.Where(fmt.Sprintf("%s.ledger = ?", tableAlias), store.ledger.Name) +} + +// getLedgerFilterSQL returns the SQL condition (without conjunction) and arguments for ledger filtering. +// Returns empty string and nil args if single-ledger optimization is enabled. +// On error, conservatively returns the filter. +func (store *Store) getLedgerFilterSQL(ctx context.Context) (string, []any) { + singleLedger, err := store.isSingleLedger(ctx) + if err != nil { + // Log error but continue with conservative behavior (return filter) + trace.SpanFromContext(ctx).RecordError(err) + return "ledger = ?", []any{store.ledger.Name} + } + if singleLedger { + return "", nil + } + return "ledger = ?", []any{store.ledger.Name} +} + func validateAddressFilter(ledger ledger.Ledger, operator string, value any) error { if operator != "$match" { return fmt.Errorf("'address' column can only be used with $match, operator used is: %s", operator) @@ -304,6 +354,12 @@ func WithTracer(tracer trace.Tracer) Option { } } +func WithCountLedgersInBucketFunc(countFunc func(ctx context.Context, bucketName string) (int, error)) Option { + return func(s *Store) { + s.countLedgersInBucket = countFunc + } +} + var defaultOptions = []Option{ WithMeter(noopmetrics.Meter{}), WithTracer(nooptracer.Tracer{}), diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index 9069de2dc..c1ada8c14 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -176,11 +176,14 @@ func (store *Store) updateTxWithRetrieve(ctx context.Context, id int, query *bun ColumnExpr("upd.*, true as modified"). ModelTableExpr("upd"). UnionAll( - store.db.NewSelect(). - ModelTableExpr(store.GetPrefixedRelationName("transactions")). - ColumnExpr("*, false as modified"). - Where("id = ? and ledger = ?", id, store.ledger.Name). - Limit(1), + func() *bun.SelectQuery { + query := store.db.NewSelect(). + ModelTableExpr(store.GetPrefixedRelationName("transactions")). + ColumnExpr("*, false as modified"). + Where("id = ?", id). + Limit(1) + return store.applyLedgerFilter(ctx, query, "transactions") + }(), ), ). Model(me). @@ -203,8 +206,14 @@ func (store *Store) RevertTransaction(ctx context.Context, id int, at time.Time) ModelTableExpr(store.GetPrefixedRelationName("transactions")). Where("id = ?", id). Where("reverted_at is null"). - Where("ledger = ?", store.ledger.Name). Returning("*") + query = query.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery { + ledgerFilter, ledgerArgs := store.getLedgerFilterSQL(ctx) + if ledgerFilter != "" { + return q.Where(ledgerFilter, ledgerArgs...) + } + return q + }) if at.IsZero() { query = query. Set("reverted_at = (now() at time zone 'utc')"). @@ -234,10 +243,16 @@ func (store *Store) UpdateTransactionMetadata(ctx context.Context, id int, m met Model(&ledger.Transaction{}). ModelTableExpr(store.GetPrefixedRelationName("transactions")). Where("id = ?", id). - Where("ledger = ?", store.ledger.Name). Set("metadata = metadata || ?", m). Where("not (metadata @> ?)", m). Returning("*") + updateQuery = updateQuery.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery { + ledgerFilter, ledgerArgs := store.getLedgerFilterSQL(ctx) + if ledgerFilter != "" { + return q.Where(ledgerFilter, ledgerArgs...) + } + return q + }) if at.IsZero() { updateQuery = updateQuery.Set("updated_at = " + store.GetPrefixedRelationName("transaction_date") + "()") } else { @@ -264,9 +279,15 @@ func (store *Store) DeleteTransactionMetadata(ctx context.Context, id int, key s ModelTableExpr(store.GetPrefixedRelationName("transactions")). Set("metadata = metadata - ?", key). Where("id = ?", id). - Where("ledger = ?", store.ledger.Name). Where("metadata -> ? is not null", key). Returning("*") + updateQuery = updateQuery.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery { + ledgerFilter, ledgerArgs := store.getLedgerFilterSQL(ctx) + if ledgerFilter != "" { + return q.Where(ledgerFilter, ledgerArgs...) + } + return q + }) if at.IsZero() { updateQuery = updateQuery.Set("updated_at = " + store.GetPrefixedRelationName("transaction_date") + "()") } else { diff --git a/internal/storage/system/store.go b/internal/storage/system/store.go index 5184b9d4b..1ac335f21 100644 --- a/internal/storage/system/store.go +++ b/internal/storage/system/store.go @@ -24,6 +24,7 @@ type Store interface { ListLedgers(ctx context.Context, q ledgercontroller.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error) GetLedger(ctx context.Context, name string) (*ledger.Ledger, error) GetDistinctBuckets(ctx context.Context) ([]string, error) + CountLedgersInBucket(ctx context.Context, bucketName string) (int, error) Migrate(ctx context.Context, options ...migrations.Option) error GetMigrator(options ...migrations.Option) *migrations.Migrator @@ -57,6 +58,21 @@ func (d *DefaultStore) GetDistinctBuckets(ctx context.Context) ([]string, error) return buckets, nil } +// CountLedgersInBucket returns the number of ledgers in a specific bucket. +// This is useful for determining if single-ledger optimizations can be applied. +func (d *DefaultStore) CountLedgersInBucket(ctx context.Context, bucketName string) (int, error) { + count, err := d.db.NewSelect(). + Model((*ledger.Ledger)(nil)). + Where("bucket = ?", bucketName). + Count(ctx) + + if err != nil { + return 0, postgres.ResolveError(err) + } + + return count, nil +} + func (d *DefaultStore) CreateLedger(ctx context.Context, l *ledger.Ledger) error { if l.Metadata == nil {