diff --git a/internal/storage/driver/driver.go b/internal/storage/driver/driver.go index 1210c1053..9ec2a0d40 100644 --- a/internal/storage/driver/driver.go +++ b/internal/storage/driver/driver.go @@ -81,6 +81,10 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto return nil, postgres.ResolveError(err) } + if err := ret.UpdateSingleLedgerState(ctx, d.systemStoreFactory.Create(d.db).CountLedgersInBucket); err != nil { + logging.FromContext(ctx).Debugf("Failed to update single-ledger state: %v", err) + } + return ret, nil } @@ -93,6 +97,10 @@ func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Stor store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(ret.Bucket), *ret) + if err := store.UpdateSingleLedgerState(ctx, d.systemStoreFactory.Create(d.db).CountLedgersInBucket); err != nil { + logging.FromContext(ctx).Debugf("Failed to update single-ledger state: %v", err) + } + return store, ret, err } diff --git a/internal/storage/ledger/accounts.go b/internal/storage/ledger/accounts.go index 553e0df1b..5a30b77f4 100644 --- a/internal/storage/ledger/accounts.go +++ b/internal/storage/ledger/accounts.go @@ -86,12 +86,16 @@ func (store *Store) DeleteAccountMetadata(ctx context.Context, account, key stri store.tracer, store.deleteAccountMetadataHistogram, tracing.NoResult(func(ctx context.Context) error { - _, err := store.db.NewUpdate(). + query := store.db.NewUpdate(). ModelTableExpr(store.GetPrefixedRelationName("accounts")). Set("metadata = metadata - ?", key). - Where("address = ?", account). - Where("ledger = ?", store.ledger.Name). - Exec(ctx) + Where("address = ?", account) + + if filterSQL, filterArgs := store.getLedgerFilterSQL(); filterSQL != "" { + query = query.Where(filterSQL, filterArgs...) + } + + _, err := query.Exec(ctx) return postgres.ResolveError(err) }), ) diff --git a/internal/storage/ledger/logs.go b/internal/storage/ledger/logs.go index 471dfa02d..1e9b51fc5 100644 --- a/internal/storage/ledger/logs.go +++ b/internal/storage/ledger/logs.go @@ -118,14 +118,14 @@ func (store *Store) ReadLogWithIdempotencyKey(ctx context.Context, key string) ( store.readLogWithIdempotencyKeyHistogram, func(ctx context.Context) (*ledger.Log, error) { ret := &Log{} - if err := store.db.NewSelect(). + query := store.db.NewSelect(). Model(ret). ModelTableExpr(store.GetPrefixedRelationName("logs")). Column("*"). Where("idempotency_key = ?", key). - Where("ledger = ?", store.ledger.Name). - Limit(1). - Scan(ctx); err != nil { + Limit(1) + query = store.applyLedgerFilter(query, "logs") + if err := query.Scan(ctx); err != nil { return nil, postgres.ResolveError(err) } diff --git a/internal/storage/ledger/resource_accounts.go b/internal/storage/ledger/resource_accounts.go index 6ea7fa8e8..f9a6d9cb3 100644 --- a/internal/storage/ledger/resource_accounts.go +++ b/internal/storage/ledger/resource_accounts.go @@ -28,8 +28,8 @@ func (h accountsResourceHandler) Schema() common.EntitySchema { func (h accountsResourceHandler) BuildDataset(opts common.RepositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) { ret := h.store.db.NewSelect(). ModelTableExpr(h.store.GetPrefixedRelationName("accounts")). - Column("address", "address_array", "first_usage", "insertion_date", "updated_at"). - Where("ledger = ?", h.store.ledger.Name) + Column("address", "address_array", "first_usage", "insertion_date", "updated_at") + ret = h.store.applyLedgerFilter(ret, "accounts") if opts.PIT != nil && !opts.PIT.IsZero() { ret = ret.Where("accounts.first_usage <= ?", opts.PIT) @@ -39,10 +39,10 @@ func (h accountsResourceHandler) BuildDataset(opts common.RepositoryHandlerBuild selectDistinctAccountMetadataHistories := h.store.db.NewSelect(). DistinctOn("accounts_address"). ModelTableExpr(h.store.GetPrefixedRelationName("accounts_metadata")). - Where("ledger = ?", h.store.ledger.Name). Column("accounts_address"). ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata"). Where("date <= ?", opts.PIT) + selectDistinctAccountMetadataHistories = h.store.applyLedgerFilter(selectDistinctAccountMetadataHistories, "accounts_metadata") ret = ret. Join( @@ -66,8 +66,7 @@ func (h accountsResourceHandler) ResolveFilter(opts common.ResourceQuery[any], o case balanceRegex.MatchString(property) || property == "balance": selectBalance := h.store.db.NewSelect(). - Where("accounts_address = dataset.address"). - Where("ledger = ?", h.store.ledger.Name) + Where("accounts_address = dataset.address") if opts.PIT != nil && !opts.PIT.IsZero() { if !h.store.ledger.HasFeature(features.FeatureMovesHistory, "ON") { @@ -78,10 +77,12 @@ func (h accountsResourceHandler) ResolveFilter(opts common.ResourceQuery[any], o DistinctOn("asset"). ColumnExpr("first_value((post_commit_effective_volumes).inputs - (post_commit_effective_volumes).outputs) over (partition by (accounts_address, asset) order by effective_date desc, seq desc) as balance"). Where("effective_date <= ?", opts.PIT) + selectBalance = h.store.applyLedgerFilter(selectBalance, "moves") } else { selectBalance = selectBalance. ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")). ColumnExpr("input - output as balance") + selectBalance = h.store.applyLedgerFilter(selectBalance, "accounts_volumes") } if balanceRegex.MatchString(property) { @@ -128,8 +129,8 @@ func (h accountsResourceHandler) Expand(opts common.ResourceQuery[any], property selectRowsQuery = selectRowsQuery. ModelTableExpr(h.store.GetPrefixedRelationName("moves")). DistinctOn("accounts_address, asset"). - Column("accounts_address", "asset"). - Where("ledger = ?", h.store.ledger.Name) + Column("accounts_address", "asset") + selectRowsQuery = h.store.applyLedgerFilter(selectRowsQuery, "moves") if property == "volumes" { selectRowsQuery = selectRowsQuery. ColumnExpr("first_value(post_commit_volumes) over (partition by (accounts_address, asset) order by seq desc) as volumes"). @@ -143,8 +144,8 @@ func (h accountsResourceHandler) Expand(opts common.ResourceQuery[any], property selectRowsQuery = selectRowsQuery. ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")). Column("asset", "accounts_address"). - ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes"). - Where("ledger = ?", h.store.ledger.Name) + ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes") + selectRowsQuery = h.store.applyLedgerFilter(selectRowsQuery, "accounts_volumes") } return h.store.db.NewSelect(). diff --git a/internal/storage/ledger/resource_aggregated_balances.go b/internal/storage/ledger/resource_aggregated_balances.go index 275a26e0e..4d3c28534 100644 --- a/internal/storage/ledger/resource_aggregated_balances.go +++ b/internal/storage/ledger/resource_aggregated_balances.go @@ -26,8 +26,8 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R ret := h.store.db.NewSelect(). ModelTableExpr(h.store.GetPrefixedRelationName("moves")). DistinctOn("accounts_address, asset"). - Column("accounts_address", "asset"). - Where("ledger = ?", h.store.ledger.Name) + Column("accounts_address", "asset") + ret = h.store.applyLedgerFilter(ret, "moves") if query.Opts.UseInsertionDate { if !h.store.ledger.HasFeature(features.FeatureMovesHistory, "ON") { return nil, NewErrMissingFeature(features.FeatureMovesHistory) @@ -52,8 +52,8 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R subQuery := h.store.db.NewSelect(). TableExpr(h.store.GetPrefixedRelationName("accounts")). Column("address_array"). - Where("accounts.address = accounts_address"). - Where("ledger = ?", h.store.ledger.Name) + Where("accounts.address = accounts_address") + subQuery = h.store.applyLedgerFilter(subQuery, "accounts") ret = ret. ColumnExpr("accounts.address_array as accounts_address_array"). @@ -65,9 +65,9 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R DistinctOn("accounts_address"). ModelTableExpr(h.store.GetPrefixedRelationName("accounts_metadata")). ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata"). - Where("ledger = ?", h.store.ledger.Name). Where("accounts_metadata.accounts_address = moves.accounts_address"). Where("date <= ?", query.PIT) + subQuery = h.store.applyLedgerFilter(subQuery, "accounts_metadata") ret = ret. Join(`left join lateral (?) accounts_metadata on true`, subQuery). @@ -79,8 +79,8 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R ret := h.store.db.NewSelect(). ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")). Column("asset", "accounts_address"). - ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes"). - Where("ledger = ?", h.store.ledger.Name) + ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes") + ret = h.store.applyLedgerFilter(ret, "accounts_volumes") if query.UseFilter("metadata") || query.UseFilter("address", func(value any) bool { return isPartialAddress(value.(string)) @@ -88,8 +88,8 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R subQuery := h.store.db.NewSelect(). TableExpr(h.store.GetPrefixedRelationName("accounts")). Column("address"). - Where("ledger = ?", h.store.ledger.Name). Where("accounts.address = accounts_address") + subQuery = h.store.applyLedgerFilter(subQuery, "accounts") if query.UseFilter("address") { subQuery = subQuery.ColumnExpr("address_array as accounts_address_array") diff --git a/internal/storage/ledger/resource_logs.go b/internal/storage/ledger/resource_logs.go index 06d0135b2..bf53653a7 100644 --- a/internal/storage/ledger/resource_logs.go +++ b/internal/storage/ledger/resource_logs.go @@ -21,10 +21,11 @@ func (h logsResourceHandler) Schema() common.EntitySchema { } func (h logsResourceHandler) BuildDataset(_ common.RepositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) { - return h.store.db.NewSelect(). + ret := h.store.db.NewSelect(). ModelTableExpr(h.store.GetPrefixedRelationName("logs")). - ColumnExpr("*"). - Where("ledger = ?", h.store.ledger.Name), nil + ColumnExpr("*") + ret = h.store.applyLedgerFilter(ret, "logs") + return ret, nil } func (h logsResourceHandler) ResolveFilter(_ common.ResourceQuery[any], operator, property string, value any) (string, []any, error) { diff --git a/internal/storage/ledger/resource_transactions.go b/internal/storage/ledger/resource_transactions.go index 11c656bc9..66b5704b3 100644 --- a/internal/storage/ledger/resource_transactions.go +++ b/internal/storage/ledger/resource_transactions.go @@ -44,8 +44,8 @@ func (h transactionsResourceHandler) BuildDataset(opts common.RepositoryHandlerB "destinations", "sources_arrays", "destinations_arrays", - ). - Where("ledger = ?", h.store.ledger.Name) + ) + ret = h.store.applyLedgerFilter(ret, "transactions") if slices.Contains(opts.Expand, "volumes") { ret = ret.Column("post_commit_volumes") @@ -59,10 +59,10 @@ func (h transactionsResourceHandler) BuildDataset(opts common.RepositoryHandlerB selectDistinctTransactionMetadataHistories := h.store.db.NewSelect(). DistinctOn("transactions_id"). ModelTableExpr(h.store.GetPrefixedRelationName("transactions_metadata")). - Where("ledger = ?", h.store.ledger.Name). Column("transactions_id", "metadata"). Order("transactions_id", "revision desc"). Where("date <= ?", opts.PIT) + selectDistinctTransactionMetadataHistories = h.store.applyLedgerFilter(selectDistinctTransactionMetadataHistories, "transactions_metadata") ret = ret. Join( @@ -124,19 +124,21 @@ func (h transactionsResourceHandler) Expand(_ common.ResourceQuery[any], propert return nil, nil, nil } + innerMostQuery := h.store.db.NewSelect(). + DistinctOn("transactions_id, accounts_address, asset"). + ModelTableExpr(h.store.GetPrefixedRelationName("moves")). + Column("transactions_id", "accounts_address", "asset"). + ColumnExpr(`first_value(moves.post_commit_effective_volumes) over (partition by (transactions_id, accounts_address, asset) order by seq desc) as post_commit_effective_volumes`). + Where("transactions_id in (select id from dataset)") + innerMostQuery = h.store.applyLedgerFilter(innerMostQuery, "moves") + ret := h.store.db.NewSelect(). TableExpr( "(?) data", h.store.db.NewSelect(). TableExpr( "(?) moves", - h.store.db.NewSelect(). - DistinctOn("transactions_id, accounts_address, asset"). - ModelTableExpr(h.store.GetPrefixedRelationName("moves")). - Column("transactions_id", "accounts_address", "asset"). - ColumnExpr(`first_value(moves.post_commit_effective_volumes) over (partition by (transactions_id, accounts_address, asset) order by seq desc) as post_commit_effective_volumes`). - Where("ledger = ?", h.store.ledger.Name). - Where("transactions_id in (select id from dataset)"), + innerMostQuery, ). Column("transactions_id", "accounts_address"). ColumnExpr(`public.aggregate_objects(json_build_object(moves.asset, json_build_object('input', (moves.post_commit_effective_volumes).inputs, 'output', (moves.post_commit_effective_volumes).outputs))::jsonb) AS post_commit_effective_volumes`). diff --git a/internal/storage/ledger/resource_volumes.go b/internal/storage/ledger/resource_volumes.go index 403cf2b73..32d26a37d 100644 --- a/internal/storage/ledger/resource_volumes.go +++ b/internal/storage/ledger/resource_volumes.go @@ -39,15 +39,15 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild ColumnExpr("input - output as balance"). ColumnExpr("accounts_address as account"). ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")). - Where("ledger = ?", h.store.ledger.Name). Order("accounts_address", "asset") + selectVolumes = h.store.applyLedgerFilter(selectVolumes, "accounts_volumes") if query.UseFilter("metadata") || query.UseFilter("first_usage") || needAddressSegments { accountsQuery := h.store.db.NewSelect(). TableExpr(h.store.GetPrefixedRelationName("accounts")). Column("address"). - Where("ledger = ?", h.store.ledger.Name). Where("accounts.address = accounts_address") + accountsQuery = h.store.applyLedgerFilter(accountsQuery, "accounts") if needAddressSegments { accountsQuery = accountsQuery.ColumnExpr("address_array as account_array") @@ -77,9 +77,9 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild ColumnExpr("sum(case when is_source then amount else 0 end) as output"). ColumnExpr("sum(case when not is_source then amount else -amount end) as balance"). ModelTableExpr(h.store.GetPrefixedRelationName("moves")). - Where("ledger = ?", h.store.ledger.Name). GroupExpr("accounts_address, asset"). Order("accounts_address", "asset") + selectVolumes = h.store.applyLedgerFilter(selectVolumes, "moves") dateFilterColumn := "effective_date" if query.Opts.UseInsertionDate { @@ -97,8 +97,8 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild if needAddressSegments || query.UseFilter("first_usage") { accountsQuery := h.store.db.NewSelect(). TableExpr(h.store.GetPrefixedRelationName("accounts")). - Where("accounts.address = accounts_address"). - Where("ledger = ?", h.store.ledger.Name) + Where("accounts.address = accounts_address") + accountsQuery = h.store.applyLedgerFilter(accountsQuery, "accounts") if needAddressSegments { accountsQuery = accountsQuery.ColumnExpr("address_array") @@ -116,8 +116,8 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild DistinctOn("accounts_address"). ModelTableExpr(h.store.GetPrefixedRelationName("accounts_metadata")). ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata"). - Where("ledger = ?", h.store.ledger.Name). Where("accounts_metadata.accounts_address = moves.accounts_address") + subQuery = h.store.applyLedgerFilter(subQuery, "accounts_metadata") selectVolumes = selectVolumes. Join(`left join lateral (?) accounts_metadata on true`, subQuery). diff --git a/internal/storage/ledger/store.go b/internal/storage/ledger/store.go index fed1ffb2f..ff0ae92c3 100644 --- a/internal/storage/ledger/store.go +++ b/internal/storage/ledger/store.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "sync" "github.com/formancehq/go-libs/v3/bun/bunpaginate" "github.com/formancehq/go-libs/v3/migrations" "github.com/formancehq/go-libs/v3/platform/postgres" @@ -19,11 +20,18 @@ import ( "github.com/uptrace/bun" ) +type singleLedgerOptimization struct { + mu sync.RWMutex + enabled bool +} + type Store struct { db bun.IDB bucket bucket.Bucket ledger ledger.Ledger + singleLedgerCache *singleLedgerOptimization + tracer trace.Tracer meter metric.Meter checkBucketSchemaHistogram metric.Int64Histogram @@ -165,9 +173,10 @@ func (store *Store) LockLedger(ctx context.Context) (*Store, bun.IDB, func() err func New(db bun.IDB, bucket bucket.Bucket, l ledger.Ledger, opts ...Option) *Store { ret := &Store{ - db: db, - ledger: l, - bucket: bucket, + db: db, + ledger: l, + bucket: bucket, + singleLedgerCache: &singleLedgerOptimization{enabled: false}, } for _, opt := range append(defaultOptions, opts...) { opt(ret) @@ -267,6 +276,39 @@ func (store *Store) WithDB(db bun.IDB) *Store { return &ret } +func (store *Store) isSingleLedger() bool { + store.singleLedgerCache.mu.RLock() + defer store.singleLedgerCache.mu.RUnlock() + return store.singleLedgerCache.enabled +} + +func (store *Store) applyLedgerFilter(query *bun.SelectQuery, tableAlias string) *bun.SelectQuery { + if store.isSingleLedger() { + return query + } + return query.Where(tableAlias+".ledger = ?", store.ledger.Name) +} + +func (store *Store) getLedgerFilterSQL() (string, []any) { + if store.isSingleLedger() { + return "", nil + } + return "ledger = ?", []any{store.ledger.Name} +} + +func (store *Store) UpdateSingleLedgerState(ctx context.Context, countFunc func(ctx context.Context, bucketName string) (int, error)) error { + count, err := countFunc(ctx, store.ledger.Bucket) + if err != nil { + return fmt.Errorf("failed to count ledgers in bucket: %w", err) + } + + store.singleLedgerCache.mu.Lock() + defer store.singleLedgerCache.mu.Unlock() + store.singleLedgerCache.enabled = (count == 1) + + return nil +} + type Option func(s *Store) func WithMeter(meter metric.Meter) Option { diff --git a/internal/storage/ledger/transactions.go b/internal/storage/ledger/transactions.go index c2a1eab96..88813fc89 100644 --- a/internal/storage/ledger/transactions.go +++ b/internal/storage/ledger/transactions.go @@ -181,6 +181,13 @@ func (store *Store) updateTxWithRetrieve(ctx context.Context, id uint64, query * } me := &modifiedEntity{} + fallbackQuery := store.db.NewSelect(). + ModelTableExpr(store.GetPrefixedRelationName("transactions")). + ColumnExpr("*, false as modified"). + Where("id = ?", id). + Limit(1) + fallbackQuery = store.applyLedgerFilter(fallbackQuery, "transactions") + err := store.db.NewSelect(). With("upd", query). ModelTableExpr( @@ -188,13 +195,7 @@ func (store *Store) updateTxWithRetrieve(ctx context.Context, id uint64, query * store.db.NewSelect(). ColumnExpr("upd.*, true as modified"). ModelTableExpr("upd"). - UnionAll( - store.db.NewSelect(). - ModelTableExpr(store.GetPrefixedRelationName("transactions")). - ColumnExpr("*, false as modified"). - Where("id = ? and ledger = ?", id, store.ledger.Name). - Limit(1), - ), + UnionAll(fallbackQuery), ). Model(me). ColumnExpr("*"). @@ -216,8 +217,11 @@ func (store *Store) RevertTransaction(ctx context.Context, id uint64, at time.Ti ModelTableExpr(store.GetPrefixedRelationName("transactions")). Where("id = ?", id). Where("reverted_at is null"). - Where("ledger = ?", store.ledger.Name). Returning("*") + + if filterSQL, filterArgs := store.getLedgerFilterSQL(); filterSQL != "" { + query = query.Where(filterSQL, filterArgs...) + } if at.IsZero() { query = query. Set("reverted_at = " + store.GetPrefixedRelationName("transaction_date") + "()"). @@ -247,10 +251,13 @@ func (store *Store) UpdateTransactionMetadata(ctx context.Context, id uint64, m Model(&ledger.Transaction{}). ModelTableExpr(store.GetPrefixedRelationName("transactions")). Where("id = ?", id). - Where("ledger = ?", store.ledger.Name). Set("metadata = metadata || ?", m). Where("not (metadata @> ?)", m). Returning("*") + + if filterSQL, filterArgs := store.getLedgerFilterSQL(); filterSQL != "" { + updateQuery = updateQuery.Where(filterSQL, filterArgs...) + } if at.IsZero() { updateQuery = updateQuery.Set("updated_at = " + store.GetPrefixedRelationName("transaction_date") + "()") } else { @@ -277,9 +284,12 @@ func (store *Store) DeleteTransactionMetadata(ctx context.Context, id uint64, ke ModelTableExpr(store.GetPrefixedRelationName("transactions")). Set("metadata = metadata - ?", key). Where("id = ?", id). - Where("ledger = ?", store.ledger.Name). Where("metadata -> ? is not null", key). Returning("*") + + if filterSQL, filterArgs := store.getLedgerFilterSQL(); filterSQL != "" { + updateQuery = updateQuery.Where(filterSQL, filterArgs...) + } if at.IsZero() { updateQuery = updateQuery.Set("updated_at = " + store.GetPrefixedRelationName("transaction_date") + "()") } else { diff --git a/internal/storage/system/store.go b/internal/storage/system/store.go index 264c4b58f..13ad1b5e6 100644 --- a/internal/storage/system/store.go +++ b/internal/storage/system/store.go @@ -24,6 +24,7 @@ type Store interface { Ledgers() common.PaginatedResource[ledger.Ledger, ListLedgersQueryPayload] GetLedger(ctx context.Context, name string) (*ledger.Ledger, error) GetDistinctBuckets(ctx context.Context) ([]string, error) + CountLedgersInBucket(ctx context.Context, bucketName string) (int, error) Migrate(ctx context.Context, options ...migrations.Option) error GetMigrator(options ...migrations.Option) *migrations.Migrator @@ -61,6 +62,17 @@ func (d *DefaultStore) GetDistinctBuckets(ctx context.Context) ([]string, error) return buckets, nil } +func (d *DefaultStore) CountLedgersInBucket(ctx context.Context, bucketName string) (int, error) { + count, err := d.db.NewSelect(). + Model(&ledger.Ledger{}). + Where("bucket = ?", bucketName). + Count(ctx) + if err != nil { + return 0, fmt.Errorf("counting ledgers in bucket: %w", postgres.ResolveError(err)) + } + return count, nil +} + func (d *DefaultStore) CreateLedger(ctx context.Context, l *ledger.Ledger) error { if l.Metadata == nil {