Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions internal/storage/driver/buckets_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions internal/storage/driver/ledger_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 16 additions & 6 deletions internal/storage/driver/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,18 @@ func NewFXModule() fx.Option {
&ledger.Ledger{},
)
}),
fx.Provide(func(tracerProvider trace.TracerProvider) systemstore.StoreFactory {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to recreate the system store.
The system store is already existing in the *Driver service.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Justification :

  • The SystemStoreFactory must be injected into the ledgerstore.Factory to provide the CountLedgersInBucket function
  • The Create(params.DB) call in the closure does not really "recreate" a heavy store - it is just a wrapper struct around the DB
  • The context needed for CountLedgersInBucket is only available at runtime, so the closure is necessary

return systemstore.NewStoreFactory(systemstore.WithTracer(
tracerProvider.Tracer("SystemStore"),
))
}),
fx.Provide(func(params struct {
fx.In

DB *bun.DB
TracerProvider trace.TracerProvider `optional:"true"`
MeterProvider metric.MeterProvider `optional:"true"`
DB *bun.DB
SystemStoreFactory systemstore.StoreFactory
TracerProvider trace.TracerProvider `optional:"true"`
MeterProvider metric.MeterProvider `optional:"true"`
}) ledgerstore.Factory {
options := make([]ledgerstore.Option, 0)
if params.TracerProvider != nil {
Expand All @@ -45,21 +51,25 @@ func NewFXModule() fx.Option {
if params.MeterProvider != nil {
options = append(options, ledgerstore.WithMeter(params.MeterProvider.Meter("store")))
}
options = append(options, ledgerstore.WithCountLedgersInBucketFunc(
func(ctx context.Context, bucketName string) (int, error) {
return params.SystemStoreFactory.Create(params.DB).CountLedgersInBucket(ctx, bucketName)
},
))
return ledgerstore.NewFactory(params.DB, options...)
}),
fx.Provide(func(
db *bun.DB,
bucketFactory bucket.Factory,
ledgerStoreFactory ledgerstore.Factory,
systemStoreFactory systemstore.StoreFactory,
tracerProvider trace.TracerProvider,
) (*Driver, error) {
return New(
db,
ledgerStoreFactory,
bucketFactory,
systemstore.NewStoreFactory(systemstore.WithTracer(
tracerProvider.Tracer("SystemStore"),
)),
systemStoreFactory,
WithTracer(tracerProvider.Tracer("StorageDriver")),
), nil
}),
Expand Down
18 changes: 18 additions & 0 deletions internal/storage/driver/system_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 11 additions & 4 deletions internal/storage/ledger/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
. "github.com/formancehq/go-libs/v2/collectionutils"
"github.com/formancehq/ledger/internal/tracing"
"github.com/uptrace/bun"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"regexp"
Expand Down Expand Up @@ -77,12 +78,18 @@ func (store *Store) DeleteAccountMetadata(ctx context.Context, account, key stri
store.tracer,
store.deleteAccountMetadataHistogram,
tracing.NoResult(func(ctx context.Context) error {
_, err := store.db.NewUpdate().
query := store.db.NewUpdate().
ModelTableExpr(store.GetPrefixedRelationName("accounts")).
Set("metadata = metadata - ?", key).
Where("address = ?", account).
Where("ledger = ?", store.ledger.Name).
Exec(ctx)
Where("address = ?", account)
query = query.WhereGroup(" AND ", func(q *bun.UpdateQuery) *bun.UpdateQuery {
ledgerFilter, ledgerArgs := store.getLedgerFilterSQL(ctx)
if ledgerFilter != "" {
return q.Where(ledgerFilter, ledgerArgs...)
}
return q
})
_, err := query.Exec(ctx)
return postgres.ResolveError(err)
}),
)
Expand Down
8 changes: 4 additions & 4 deletions internal/storage/ledger/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ func (store *Store) ReadLogWithIdempotencyKey(ctx context.Context, key string) (
store.readLogWithIdempotencyKeyHistogram,
func(ctx context.Context) (*ledger.Log, error) {
ret := &Log{}
if err := store.db.NewSelect().
query := store.db.NewSelect().
Model(ret).
ModelTableExpr(store.GetPrefixedRelationName("logs")).
Column("*").
Where("idempotency_key = ?", key).
Where("ledger = ?", store.ledger.Name).
Limit(1).
Scan(ctx); err != nil {
Limit(1)
query = store.applyLedgerFilter(ctx, query, "logs")
if err := query.Scan(ctx); err != nil {
return nil, postgres.ResolveError(err)
}

Expand Down
30 changes: 15 additions & 15 deletions internal/storage/ledger/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ func (ctx repositoryHandlerBuildContext[Opts]) useFilter(v string, matchers ...f

type repositoryHandler[Opts any] interface {
filters() []filter
buildDataset(store *Store, query repositoryHandlerBuildContext[Opts]) (*bun.SelectQuery, error)
resolveFilter(store *Store, query ledgercontroller.ResourceQuery[Opts], operator, property string, value any) (string, []any, error)
project(store *Store, query ledgercontroller.ResourceQuery[Opts], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error)
expand(store *Store, query ledgercontroller.ResourceQuery[Opts], property string) (*bun.SelectQuery, *joinCondition, error)
buildDataset(ctx context.Context, store *Store, query repositoryHandlerBuildContext[Opts]) (*bun.SelectQuery, error)
resolveFilter(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[Opts], operator, property string, value any) (string, []any, error)
project(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[Opts], selectQuery *bun.SelectQuery) (*bun.SelectQuery, error)
expand(ctx context.Context, store *Store, query ledgercontroller.ResourceQuery[Opts], property string) (*bun.SelectQuery, *joinCondition, error)
}

type resourceRepository[ResourceType, OptionsType any] struct {
Expand Down Expand Up @@ -151,14 +151,14 @@ func (r *resourceRepository[ResourceType, OptionsType]) validateFilters(builder
return ret, nil
}

func (r *resourceRepository[ResourceType, OptionsType]) buildFilteredDataset(q ledgercontroller.ResourceQuery[OptionsType]) (*bun.SelectQuery, error) {
func (r *resourceRepository[ResourceType, OptionsType]) buildFilteredDataset(ctx context.Context, q ledgercontroller.ResourceQuery[OptionsType]) (*bun.SelectQuery, error) {

filters, err := r.validateFilters(q.Builder)
if err != nil {
return nil, err
}

dataset, err := r.resourceHandler.buildDataset(r.store, repositoryHandlerBuildContext[OptionsType]{
dataset, err := r.resourceHandler.buildDataset(ctx, r.store, repositoryHandlerBuildContext[OptionsType]{
ResourceQuery: q,
filters: filters,
})
Expand All @@ -172,7 +172,7 @@ func (r *resourceRepository[ResourceType, OptionsType]) buildFilteredDataset(q l
if q.Builder != nil {
// Convert filters to where clause
where, args, err := q.Builder.Build(query.ContextFn(func(key, operator string, value any) (string, []any, error) {
return r.resourceHandler.resolveFilter(r.store, q, operator, key, value)
return r.resourceHandler.resolveFilter(ctx, r.store, q, operator, key, value)
}))
if err != nil {
return nil, err
Expand All @@ -184,10 +184,10 @@ func (r *resourceRepository[ResourceType, OptionsType]) buildFilteredDataset(q l
}
}

return r.resourceHandler.project(r.store, q, dataset)
return r.resourceHandler.project(ctx, r.store, q, dataset)
}

func (r *resourceRepository[ResourceType, OptionsType]) expand(dataset *bun.SelectQuery, q ledgercontroller.ResourceQuery[OptionsType]) (*bun.SelectQuery, error) {
func (r *resourceRepository[ResourceType, OptionsType]) expand(ctx context.Context, dataset *bun.SelectQuery, q ledgercontroller.ResourceQuery[OptionsType]) (*bun.SelectQuery, error) {
dataset = r.store.db.NewSelect().
With("dataset", dataset).
ModelTableExpr("dataset").
Expand All @@ -196,7 +196,7 @@ func (r *resourceRepository[ResourceType, OptionsType]) expand(dataset *bun.Sele
slices.Sort(q.Expand)

for i, expand := range q.Expand {
selectQuery, joinCondition, err := r.resourceHandler.expand(r.store, q, expand)
selectQuery, joinCondition, err := r.resourceHandler.expand(ctx, r.store, q, expand)
if err != nil {
return nil, err
}
Expand All @@ -222,12 +222,12 @@ func (r *resourceRepository[ResourceType, OptionsType]) expand(dataset *bun.Sele

func (r *resourceRepository[ResourceType, OptionsType]) GetOne(ctx context.Context, query ledgercontroller.ResourceQuery[OptionsType]) (*ResourceType, error) {

finalQuery, err := r.buildFilteredDataset(query)
finalQuery, err := r.buildFilteredDataset(ctx, query)
if err != nil {
return nil, err
}

finalQuery, err = r.expand(finalQuery, query)
finalQuery, err = r.expand(ctx, finalQuery, query)
if err != nil {
return nil, err
}
Expand All @@ -248,7 +248,7 @@ func (r *resourceRepository[ResourceType, OptionsType]) GetOne(ctx context.Conte

func (r *resourceRepository[ResourceType, OptionsType]) Count(ctx context.Context, query ledgercontroller.ResourceQuery[OptionsType]) (int, error) {

finalQuery, err := r.buildFilteredDataset(query)
finalQuery, err := r.buildFilteredDataset(ctx, query)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -289,7 +289,7 @@ func (r *paginatedResourceRepository[ResourceType, OptionsType, PaginationQueryT
panic("should not happen")
}

finalQuery, err := r.buildFilteredDataset(resourceQuery)
finalQuery, err := r.buildFilteredDataset(ctx, resourceQuery)
if err != nil {
return nil, fmt.Errorf("building filtered dataset: %w", err)
}
Expand All @@ -299,7 +299,7 @@ func (r *paginatedResourceRepository[ResourceType, OptionsType, PaginationQueryT
return nil, fmt.Errorf("paginating request: %w", err)
}

finalQuery, err = r.expand(finalQuery, resourceQuery)
finalQuery, err = r.expand(ctx, finalQuery, resourceQuery)
if err != nil {
return nil, fmt.Errorf("expanding results: %w", err)
}
Expand Down
Loading
Loading