diff --git a/funding.go b/funding.go index b9dc8fee..6e8155a4 100644 --- a/funding.go +++ b/funding.go @@ -118,22 +118,14 @@ func (w *wallet) Balance(ctx context.Context) (*types.Balance, error) { }, nil } -func (w *wallet) ListSpendableVtxos(ctx context.Context) ([]clienttypes.Vtxo, error) { - if err := w.safeCheck(); err != nil { - return nil, err - } - - return w.store.VtxoStore().GetSpendableVtxos(ctx) -} - func (w *wallet) ListVtxos( - ctx context.Context, -) ([]clienttypes.Vtxo, []clienttypes.Vtxo, error) { + ctx context.Context, page types.Page, filter types.VtxoFilter, +) ([]clienttypes.Vtxo, error) { if err := w.safeCheck(); err != nil { - return nil, nil, err + return nil, err } - return w.store.VtxoStore().GetAllVtxos(ctx) + return w.store.VtxoStore().GetVtxos(ctx, page, filter) } func (w *wallet) NotifyIncomingFunds( @@ -157,7 +149,7 @@ func (w *wallet) newOffchainAddress(ctx context.Context) (string, error) { func (w *wallet) getOffchainBalance( ctx context.Context, ) (*types.OffchainBalance, map[string]uint64, error) { - vtxos, _, err := w.store.VtxoStore().GetAllVtxos(ctx) + vtxos, err := w.store.VtxoStore().GetVtxos(ctx, types.Page{}, types.VtxoFilterSpendable) if err != nil { return nil, nil, err } @@ -170,10 +162,6 @@ func (w *wallet) getOffchainBalance( amountByExpiration = make(map[int64]uint64) ) for _, vtxo := range vtxos { - if vtxo.Spent || vtxo.Unrolled { - continue - } - // Classify VTXO by state. Priority: Recoverable > Preconfirmed > default. switch { case vtxo.IsRecoverable(): diff --git a/init.go b/init.go index a965b2d1..ef7d4c64 100644 --- a/init.go +++ b/init.go @@ -216,7 +216,8 @@ func (w *wallet) scheduleNextSettlement() { nextSettlement := w.scheduler.GetTaskScheduledAt() - vtxos, err := w.store.VtxoStore().GetSpendableVtxos(context.Background()) + vtxos, err := w.store.VtxoStore(). + GetVtxos(context.Background(), types.Page{}, types.VtxoFilterSpendable) if err != nil { log.WithError(err).Warn("failed to get spendable vtxos while scheduling next settlement") return diff --git a/sdk.go b/sdk.go index da3606bf..d5a114b2 100644 --- a/sdk.go +++ b/sdk.go @@ -84,8 +84,11 @@ type Wallet interface { CompleteUnroll(ctx context.Context, to string) (string, error) OnboardAgainAllExpiredBoardings(ctx context.Context) (string, error) WithdrawFromAllExpiredBoardings(ctx context.Context, to string) (string, error) - ListVtxos(ctx context.Context) (spendable, spent []clienttypes.Vtxo, err error) - ListSpendableVtxos(ctx context.Context) ([]clienttypes.Vtxo, error) + ListVtxos( + ctx context.Context, + page types.Page, + filter types.VtxoFilter, + ) ([]clienttypes.Vtxo, error) Dump(ctx context.Context) (seed string, err error) GetTransactionHistory(ctx context.Context) ([]clienttypes.Transaction, error) GetTransactionEventChannel(ctx context.Context) <-chan types.TransactionEvent diff --git a/send.go b/send.go index dac73da3..a45e9bfe 100644 --- a/send.go +++ b/send.go @@ -73,7 +73,8 @@ func (w *wallet) getSpendableVtxos( ctx context.Context, withRecoverable bool, ) ([]clienttypes.VtxoWithTapTree, error) { w.dbMu.Lock() - spendableVtxos, err := w.store.VtxoStore().GetSpendableVtxos(ctx) + spendableVtxos, err := w.store.VtxoStore(). + GetVtxos(ctx, types.Page{}, types.VtxoFilterSpendable) w.dbMu.Unlock() if err != nil { return nil, err diff --git a/store/service_test.go b/store/service_test.go index da982e58..dd96c87b 100644 --- a/store/service_test.go +++ b/store/service_test.go @@ -1,6 +1,7 @@ package store_test import ( + "fmt" "testing" "time" @@ -581,10 +582,9 @@ func testVtxoStore(t *testing.T, storeSvc types.VtxoStore, storeType string) { }() t.Run("add vtxos", func(t *testing.T) { - spendable, spent, err := storeSvc.GetAllVtxos(ctx) + all, err := storeSvc.GetVtxos(ctx, types.Page{}, types.VtxoFilterAll) require.NoError(t, err) - require.Empty(t, spendable) - require.Empty(t, spent) + require.Empty(t, all) count, err := storeSvc.AddVtxos(ctx, testVtxos) require.NoError(t, err) @@ -594,13 +594,12 @@ func testVtxoStore(t *testing.T, storeSvc types.VtxoStore, storeType string) { require.NoError(t, err) require.Zero(t, count) - spendable, spent, err = storeSvc.GetAllVtxos(ctx) + all, err = storeSvc.GetVtxos(ctx, types.Page{}, types.VtxoFilterAll) require.NoError(t, err) - require.Len(t, spendable, len(testVtxos)) - require.Empty(t, spent) - requireVtxosListEqual(t, testVtxos, spendable) + require.Len(t, all, len(testVtxos)) + requireVtxosListEqual(t, testVtxos, all) - spendable, err = storeSvc.GetSpendableVtxos(ctx) + spendable, err := storeSvc.GetVtxos(ctx, types.Page{}, types.VtxoFilterSpendable) require.NoError(t, err) require.Len(t, spendable, len(testVtxos)) for _, v := range spendable { @@ -609,11 +608,15 @@ func testVtxoStore(t *testing.T, storeSvc types.VtxoStore, storeType string) { } requireVtxosListEqual(t, testVtxos, spendable) - vtxos, err := storeSvc.GetVtxos(ctx, testVtxoKeys) + spent, err := storeSvc.GetVtxos(ctx, types.Page{}, types.VtxoFilterSpent) + require.NoError(t, err) + require.Empty(t, spent) + + vtxos, err := storeSvc.GetVtxosByOutpoint(ctx, testVtxoKeys) require.NoError(t, err) requireVtxosListEqual(t, testVtxos, vtxos) - vtxos, err = storeSvc.GetVtxos(ctx, []clientTypes.Outpoint{ + vtxos, err = storeSvc.GetVtxosByOutpoint(ctx, []clientTypes.Outpoint{ {Txid: "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", VOut: 0}, }) require.NoError(t, err) @@ -629,17 +632,20 @@ func testVtxoStore(t *testing.T, storeSvc types.VtxoStore, storeType string) { require.NoError(t, err) require.Zero(t, count) - spendable, spent, err := storeSvc.GetAllVtxos(ctx) + all, err := storeSvc.GetVtxos(ctx, types.Page{}, types.VtxoFilterAll) + require.NoError(t, err) + require.Equal(t, 4, len(all)) + + spent, err := storeSvc.GetVtxos(ctx, types.Page{}, types.VtxoFilterSpent) require.NoError(t, err) require.Equal(t, 1, len(spent)) - require.Equal(t, 3, len(spendable)) for _, v := range spent { require.True(t, v.Spent) require.Equal(t, testSpendVtxoKeys[v.Outpoint], v.SpentBy) require.Equal(t, arkTxid, v.ArkTxid) } - spendable, err = storeSvc.GetSpendableVtxos(ctx) + spendable, err := storeSvc.GetVtxos(ctx, types.Page{}, types.VtxoFilterSpendable) require.NoError(t, err) require.Len(t, spendable, 3) for _, v := range spendable { @@ -656,10 +662,9 @@ func testVtxoStore(t *testing.T, storeSvc types.VtxoStore, storeType string) { require.NoError(t, err) require.Zero(t, count) - spendable, spent, err := storeSvc.GetAllVtxos(ctx) + spent, err := storeSvc.GetVtxos(ctx, types.Page{}, types.VtxoFilterSpent) require.NoError(t, err) require.Len(t, spent, 2) - require.Len(t, spendable, 2) for _, v := range spent { require.True(t, v.Spent) testSettleBy, ok := testSettleVtxoKeys[v.Outpoint] @@ -668,6 +673,10 @@ func testVtxoStore(t *testing.T, storeSvc types.VtxoStore, storeType string) { require.Equal(t, settledBy, v.SettledBy) } } + + spendable, err := storeSvc.GetVtxos(ctx, types.Page{}, types.VtxoFilterSpendable) + require.NoError(t, err) + require.Len(t, spendable, 2) }) } @@ -803,6 +812,363 @@ func testAssetStore(t *testing.T, storeSvc types.AssetStore) { require.Nil(t, asset) } +func TestVtxoPagination(t *testing.T) { + svc, err := store.NewStore(store.Config{ + StoreType: types.SQLStore, + Args: t.TempDir(), + }) + require.NoError(t, err) + defer svc.Close() + + ctx := t.Context() + vtxoStore := svc.VtxoStore() + + // Insert 22 spendable VTXOs with distinct created_at values. + // created_at goes from 1000 (oldest, index 0) to 22000 (newest, index 21). + // SQL orders by created_at DESC, so page 1 should contain the newest VTXOs. + const totalVtxos = 22 + paginationVtxos := make([]clientTypes.Vtxo, totalVtxos) + for i := range totalVtxos { + paginationVtxos[i] = clientTypes.Vtxo{ + Outpoint: clientTypes.Outpoint{ + Txid: fmt.Sprintf("%064x", i+1), + VOut: 0, + }, + Script: "aaaa", + Amount: uint64((i + 1) * 1000), + CommitmentTxids: []string{"commitmentaaa"}, + ExpiresAt: time.Unix(1800000000, 0), + CreatedAt: time.Unix(int64(1000*(i+1)), 0), + } + } + count, err := vtxoStore.AddVtxos(ctx, paginationVtxos) + require.NoError(t, err) + require.Equal(t, totalVtxos, count) + + // Helper: collect created_at unix timestamps from a VTXO slice. + createdAts := func(vtxos []clientTypes.Vtxo) []int64 { + out := make([]int64, len(vtxos)) + for i, v := range vtxos { + out[i] = v.CreatedAt.Unix() + } + return out + } + + // Helper: collect outpoint txids from a VTXO slice. + outpointTxids := func(vtxos []clientTypes.Vtxo) map[string]bool { + out := make(map[string]bool, len(vtxos)) + for _, v := range vtxos { + out[v.Txid] = true + } + return out + } + + t.Run("Page{} returns ALL vtxos", func(t *testing.T) { + all, err := vtxoStore.GetVtxos(ctx, types.Page{}, types.VtxoFilterAll) + require.NoError(t, err) + require.Len(t, all, totalVtxos) + + spendable, err := vtxoStore.GetVtxos(ctx, types.Page{}, types.VtxoFilterSpendable) + require.NoError(t, err) + require.Len(t, spendable, totalVtxos) + }) + + t.Run("Page{1,5} returns the 5 newest VTXOs", func(t *testing.T) { + page1, err := vtxoStore.GetVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 5}, + types.VtxoFilterAll, + ) + require.NoError(t, err) + require.Len(t, page1, 5) + + // The 5 newest VTXOs have created_at = 22000, 21000, 20000, 19000, 18000. + for _, v := range page1 { + require.GreaterOrEqual(t, v.CreatedAt.Unix(), int64(18000), + "page 1 VTXO created_at=%d should be >= 18000", v.CreatedAt.Unix()) + } + txids := outpointTxids(page1) + for i := 18; i <= 22; i++ { + txid := fmt.Sprintf("%064x", i) + require.True( + t, + txids[txid], + "page 1 should contain VTXO with index %d (created_at=%d)", + i, + i*1000, + ) + } + }) + + t.Run("Page{2,5} returns next 5, all older than page 1", func(t *testing.T) { + page1, err := vtxoStore.GetVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 5}, + types.VtxoFilterAll, + ) + require.NoError(t, err) + + page2, err := vtxoStore.GetVtxos( + ctx, + types.Page{PageNum: 2, PageSize: 5}, + types.VtxoFilterAll, + ) + require.NoError(t, err) + require.Len(t, page2, 5) + + // Every VTXO on page 2 must have created_at < every VTXO on page 1. + page1Timestamps := createdAts(page1) + page2Timestamps := createdAts(page2) + var minPage1 = page1Timestamps[0] + for _, ts := range page1Timestamps { + if ts < minPage1 { + minPage1 = ts + } + } + for _, ts := range page2Timestamps { + require.Less(t, ts, minPage1, + "page 2 VTXO created_at=%d must be < min page 1 created_at=%d", ts, minPage1) + } + + // Page 2 should contain VTXOs with created_at = 17000..13000. + txids := outpointTxids(page2) + for i := 13; i <= 17; i++ { + txid := fmt.Sprintf("%064x", i) + require.True(t, txids[txid], "page 2 should contain VTXO with index %d", i) + } + + // No overlap between page 1 and page 2. + page1Txids := outpointTxids(page1) + for txid := range txids { + require.False(t, page1Txids[txid], "page 1 and page 2 must not overlap (txid=%s)", txid) + } + }) + + t.Run("Page{4,5} returns 5 VTXOs from the 4th page", func(t *testing.T) { + // 22 VTXOs, page size 5, ordered by created_at DESC: + // Page 1 (offset 0): 22000, 21000, 20000, 19000, 18000 + // Page 2 (offset 5): 17000, 16000, 15000, 14000, 13000 + // Page 3 (offset 10): 12000, 11000, 10000, 9000, 8000 + // Page 4 (offset 15): 7000, 6000, 5000, 4000, 3000 + // Page 5 (offset 20): 2000, 1000 <-- partial page + page4, err := vtxoStore.GetVtxos( + ctx, + types.Page{PageNum: 4, PageSize: 5}, + types.VtxoFilterAll, + ) + require.NoError(t, err) + require.Len(t, page4, 5) + + txids := outpointTxids(page4) + for i := 3; i <= 7; i++ { + txid := fmt.Sprintf("%064x", i) + require.True( + t, + txids[txid], + "page 4 should contain VTXO with index %d (created_at=%d)", + i, + i*1000, + ) + } + }) + + t.Run("Page{5,5} returns last partial page with 2 VTXOs", func(t *testing.T) { + page5, err := vtxoStore.GetVtxos( + ctx, + types.Page{PageNum: 5, PageSize: 5}, + types.VtxoFilterAll, + ) + require.NoError(t, err) + require.Len(t, page5, 2) + + // These should be the 2 oldest VTXOs: created_at 2000, 1000. + txids := outpointTxids(page5) + for i := 1; i <= 2; i++ { + txid := fmt.Sprintf("%064x", i) + require.True( + t, + txids[txid], + "last page should contain VTXO with index %d (created_at=%d)", + i, + i*1000, + ) + } + }) + + t.Run("Page{6,5} beyond last page returns empty", func(t *testing.T) { + beyond, err := vtxoStore.GetVtxos( + ctx, + types.Page{PageNum: 6, PageSize: 5}, + types.VtxoFilterAll, + ) + require.NoError(t, err) + require.Empty(t, beyond) + }) + + t.Run("spendable filter pagination with ordering", func(t *testing.T) { + page1, err := vtxoStore.GetVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 5}, + types.VtxoFilterSpendable, + ) + require.NoError(t, err) + require.Len(t, page1, 5) + + page2, err := vtxoStore.GetVtxos( + ctx, + types.Page{PageNum: 2, PageSize: 5}, + types.VtxoFilterSpendable, + ) + require.NoError(t, err) + require.Len(t, page2, 5) + + // Every VTXO on page 2 must be older than every VTXO on page 1. + page1Timestamps := createdAts(page1) + page2Timestamps := createdAts(page2) + var minPage1 = page1Timestamps[0] + for _, ts := range page1Timestamps { + if ts < minPage1 { + minPage1 = ts + } + } + for _, ts := range page2Timestamps { + require.Less( + t, + ts, + minPage1, + "spendable filter: page 2 created_at=%d must be < min page 1 created_at=%d", + ts, + minPage1, + ) + } + + // Beyond last page returns empty. + beyondPage, err := vtxoStore.GetVtxos( + ctx, + types.Page{PageNum: 999, PageSize: 5}, + types.VtxoFilterSpendable, + ) + require.NoError(t, err) + require.Empty(t, beyondPage) + }) + + t.Run("MaxPageSize clamping", func(t *testing.T) { + all, err := vtxoStore.GetVtxos(ctx, types.Page{ + PageNum: 1, PageSize: types.MaxPageSize + 100, + }, types.VtxoFilterAll) + require.NoError(t, err) + // Clamped to MaxPageSize=200, but only 22 exist. + require.Len(t, all, totalVtxos) + }) + + t.Run("PageNum 0 treated as page 1", func(t *testing.T) { + page0, err := vtxoStore.GetVtxos( + ctx, + types.Page{PageNum: 0, PageSize: 5}, + types.VtxoFilterAll, + ) + require.NoError(t, err) + page1, err := vtxoStore.GetVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 5}, + types.VtxoFilterAll, + ) + require.NoError(t, err) + + require.Len(t, page0, 5) + require.Len(t, page1, 5) + // Both should contain the exact same set of outpoints. + page0Txids := outpointTxids(page0) + page1Txids := outpointTxids(page1) + require.Equal(t, page0Txids, page1Txids) + }) + + t.Run("multi-asset VTXO counts as 1", func(t *testing.T) { + multiAssetVtxo := clientTypes.Vtxo{ + Outpoint: clientTypes.Outpoint{ + Txid: fmt.Sprintf("%064x", 100), + VOut: 0, + }, + Script: "bbbb", + Amount: 9000, + CommitmentTxids: []string{"commitmentbbb"}, + ExpiresAt: time.Unix(1800000000, 0), + CreatedAt: time.Unix(100000, 0), + Assets: []clientTypes.Asset{testVtxoAsset1, testVtxoAsset2}, + } + n, err := vtxoStore.AddVtxos(ctx, []clientTypes.Vtxo{multiAssetVtxo}) + require.NoError(t, err) + require.Equal(t, 1, n) + + // Total should be 23 VTXOs now (22 + 1 multi-asset). + all, err := vtxoStore.GetVtxos(ctx, types.Page{}, types.VtxoFilterAll) + require.NoError(t, err) + require.Len(t, all, totalVtxos+1) + + // Multi-asset VTXO with created_at=100000 is the newest. + page1, err := vtxoStore.GetVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 5}, + types.VtxoFilterAll, + ) + require.NoError(t, err) + require.Len(t, page1, 5) + + // The multi-asset VTXO should be on page 1 (it has the highest created_at). + var foundMultiAsset bool + for _, v := range page1 { + if v.Txid == multiAssetVtxo.Txid { + require.Len(t, v.Assets, 2) + foundMultiAsset = true + } + } + require.True(t, foundMultiAsset, "multi-asset VTXO should appear on page 1") + }) + + t.Run("spent filter pagination", func(t *testing.T) { + // Spend the first vtxo (index 0, created_at=1000 — the oldest). + spendMap := map[clientTypes.Outpoint]string{ + paginationVtxos[0].Outpoint: "spender_tx", + } + n, err := vtxoStore.SpendVtxos(ctx, spendMap, "arktx1") + require.NoError(t, err) + require.Equal(t, 1, n) + + // VtxoFilterAll should return everything (23 total). + all, err := vtxoStore.GetVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 30}, + types.VtxoFilterAll, + ) + require.NoError(t, err) + require.Len(t, all, totalVtxos+1) // 22 original + 1 multi-asset + + // VtxoFilterSpent should return only the spent one. + spent, err := vtxoStore.GetVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 30}, + types.VtxoFilterSpent, + ) + require.NoError(t, err) + require.Len(t, spent, 1) + + // VtxoFilterSpendable should return the unspent ones. + spendable, err := vtxoStore.GetVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 30}, + types.VtxoFilterSpendable, + ) + require.NoError(t, err) + require.Len(t, spendable, totalVtxos) // 21 original unspent + 1 multi-asset + + // Verify spent VTXO does NOT appear in spendable filter. + for _, v := range spendable { + require.NotEqual(t, paginationVtxos[0].Txid, v.Txid, + "spent VTXO should not appear in spendable filter") + } + }) +} + func requireVtxosListEqual(t *testing.T, expected, actual []clientTypes.Vtxo) { require.Len(t, expected, len(actual)) diff --git a/store/sql/sqlc/queries/query_paginated.go b/store/sql/sqlc/queries/query_paginated.go new file mode 100644 index 00000000..b548ab3a --- /dev/null +++ b/store/sql/sqlc/queries/query_paginated.go @@ -0,0 +1,241 @@ +// Package queries — paginated query implementations. +// +// These queries are hand-written because sqlc cannot generate queries containing +// IN (subquery) patterns. The equivalent SQL is documented inline below. +// +// WHY a subquery instead of LIMIT on the view directly: +// +// asset_vtxo_vw is a LEFT JOIN of vtxo and asset_vtxo. A multi-asset VTXO +// produces N rows in the view (one per asset). If we applied LIMIT directly to +// the view, a 2-asset VTXO would consume 2 slots of the page budget, returning +// fewer logical VTXOs than the caller requested. Instead, the inner subquery +// paginates at the VTXO level (on the vtxo table), and the outer query fetches +// all view rows for the selected VTXOs. The Go layer then groups view rows +// back into domain VTXOs via the byOutpoint map (see assetVtxoVwRowsToVtxos). +package queries + +import "context" + +const selectAllVtxosPaginated = `SELECT txid, vout, script, amount, commitment_txids, spent_by, spent, expires_at, + created_at, preconfirmed, swept, settled_by, unrolled, ark_txid, asset_id, asset_amount +FROM asset_vtxo_vw +WHERE (txid, vout) IN ( + SELECT txid, vout FROM vtxo + ORDER BY created_at DESC, txid ASC, vout ASC + LIMIT ? OFFSET ? +) +ORDER BY created_at DESC, txid ASC, vout ASC` + +type SelectAllVtxosPaginatedParams struct { + Limit int64 + Offset int64 +} + +func (q *Queries) SelectAllVtxosPaginated( + ctx context.Context, + arg SelectAllVtxosPaginatedParams, +) ([]AssetVtxoVw, error) { + rows, err := q.db.QueryContext(ctx, selectAllVtxosPaginated, arg.Limit, arg.Offset) + if err != nil { + return nil, err + } + defer rows.Close() //nolint:errcheck + var items []AssetVtxoVw + for rows.Next() { + var i AssetVtxoVw + if err := rows.Scan( + &i.Txid, + &i.Vout, + &i.Script, + &i.Amount, + &i.CommitmentTxids, + &i.SpentBy, + &i.Spent, + &i.ExpiresAt, + &i.CreatedAt, + &i.Preconfirmed, + &i.Swept, + &i.SettledBy, + &i.Unrolled, + &i.ArkTxid, + &i.AssetID, + &i.AssetAmount, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectSpendableVtxosPaginated = `SELECT txid, vout, script, amount, commitment_txids, spent_by, spent, + expires_at, created_at, preconfirmed, swept, settled_by, unrolled, ark_txid, asset_id, asset_amount +FROM asset_vtxo_vw +WHERE (txid, vout) IN ( + SELECT txid, vout FROM vtxo + WHERE spent = false AND unrolled = false + ORDER BY created_at DESC, txid ASC, vout ASC + LIMIT ? OFFSET ? +) +ORDER BY created_at DESC, txid ASC, vout ASC` + +type SelectSpendableVtxosPaginatedParams struct { + Limit int64 + Offset int64 +} + +func (q *Queries) SelectSpendableVtxosPaginated( + ctx context.Context, + arg SelectSpendableVtxosPaginatedParams, +) ([]AssetVtxoVw, error) { + rows, err := q.db.QueryContext(ctx, selectSpendableVtxosPaginated, arg.Limit, arg.Offset) + if err != nil { + return nil, err + } + defer rows.Close() //nolint:errcheck + var items []AssetVtxoVw + for rows.Next() { + var i AssetVtxoVw + if err := rows.Scan( + &i.Txid, + &i.Vout, + &i.Script, + &i.Amount, + &i.CommitmentTxids, + &i.SpentBy, + &i.Spent, + &i.ExpiresAt, + &i.CreatedAt, + &i.Preconfirmed, + &i.Swept, + &i.SettledBy, + &i.Unrolled, + &i.ArkTxid, + &i.AssetID, + &i.AssetAmount, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectSpentVtxos = `SELECT txid, vout, script, amount, commitment_txids, spent_by, spent, + expires_at, created_at, preconfirmed, swept, settled_by, unrolled, ark_txid, asset_id, asset_amount +FROM asset_vtxo_vw +WHERE spent = true OR unrolled = true +ORDER BY created_at DESC, txid ASC, vout ASC` + +func (q *Queries) SelectSpentVtxos( + ctx context.Context, +) ([]AssetVtxoVw, error) { + rows, err := q.db.QueryContext(ctx, selectSpentVtxos) + if err != nil { + return nil, err + } + defer rows.Close() //nolint:errcheck + var items []AssetVtxoVw + for rows.Next() { + var i AssetVtxoVw + if err := rows.Scan( + &i.Txid, + &i.Vout, + &i.Script, + &i.Amount, + &i.CommitmentTxids, + &i.SpentBy, + &i.Spent, + &i.ExpiresAt, + &i.CreatedAt, + &i.Preconfirmed, + &i.Swept, + &i.SettledBy, + &i.Unrolled, + &i.ArkTxid, + &i.AssetID, + &i.AssetAmount, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const selectSpentVtxosPaginated = `SELECT txid, vout, script, amount, commitment_txids, spent_by, spent, + expires_at, created_at, preconfirmed, swept, settled_by, unrolled, ark_txid, asset_id, asset_amount +FROM asset_vtxo_vw +WHERE (txid, vout) IN ( + SELECT txid, vout FROM vtxo + WHERE spent = true OR unrolled = true + ORDER BY created_at DESC, txid ASC, vout ASC + LIMIT ? OFFSET ? +) +ORDER BY created_at DESC, txid ASC, vout ASC` + +type SelectSpentVtxosPaginatedParams struct { + Limit int64 + Offset int64 +} + +func (q *Queries) SelectSpentVtxosPaginated( + ctx context.Context, + arg SelectSpentVtxosPaginatedParams, +) ([]AssetVtxoVw, error) { + rows, err := q.db.QueryContext(ctx, selectSpentVtxosPaginated, arg.Limit, arg.Offset) + if err != nil { + return nil, err + } + defer rows.Close() //nolint:errcheck + var items []AssetVtxoVw + for rows.Next() { + var i AssetVtxoVw + if err := rows.Scan( + &i.Txid, + &i.Vout, + &i.Script, + &i.Amount, + &i.CommitmentTxids, + &i.SpentBy, + &i.Spent, + &i.ExpiresAt, + &i.CreatedAt, + &i.Preconfirmed, + &i.Swept, + &i.SettledBy, + &i.Unrolled, + &i.ArkTxid, + &i.AssetID, + &i.AssetAmount, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/store/sql/vtxo_store.go b/store/sql/vtxo_store.go index 36957c4f..63f61d59 100644 --- a/store/sql/vtxo_store.go +++ b/store/sql/vtxo_store.go @@ -112,7 +112,7 @@ func (v *vtxoRepository) SpendVtxos( for outpoint := range spentVtxosMap { outpoints = append(outpoints, outpoint) } - vtxos, err := v.GetVtxos(ctx, outpoints) + vtxos, err := v.GetVtxosByOutpoint(ctx, outpoints) if err != nil { return -1, err } @@ -162,7 +162,7 @@ func (v *vtxoRepository) SweepVtxos( for _, vtxo := range vtxosToSweep { outpoints = append(outpoints, vtxo.Outpoint) } - vtxos, err := v.GetVtxos(ctx, outpoints) + vtxos, err := v.GetVtxosByOutpoint(ctx, outpoints) if err != nil { return -1, err } @@ -209,7 +209,7 @@ func (v *vtxoRepository) UnrollVtxos( for _, vtxo := range vtxosToUnroll { outpoints = append(outpoints, vtxo.Outpoint) } - vtxos, err := v.GetVtxos(ctx, outpoints) + vtxos, err := v.GetVtxosByOutpoint(ctx, outpoints) if err != nil { return -1, err } @@ -255,7 +255,7 @@ func (v *vtxoRepository) SettleVtxos( for outpoint := range spentVtxosMap { outpoints = append(outpoints, outpoint) } - vtxos, err := v.GetVtxos(ctx, outpoints) + vtxos, err := v.GetVtxosByOutpoint(ctx, outpoints) if err != nil { return -1, err } @@ -297,30 +297,98 @@ func (v *vtxoRepository) SettleVtxos( return len(settledVtxos), nil } -func (v *vtxoRepository) GetAllVtxos( - ctx context.Context, -) (spendable, spent []clientTypes.Vtxo, err error) { - rows, err := v.querier.SelectAllVtxos(ctx) - if err != nil { - return - } - byOutpoint := make(map[string][]queries.AssetVtxoVw) - for _, row := range rows { - key := fmt.Sprintf("%s:%d", row.Txid, row.Vout) - byOutpoint[key] = append(byOutpoint[key], row) - } - for _, group := range byOutpoint { - vtxo := assetVtxoVwGroupToVtxo(group) - if vtxo.Spent || vtxo.Unrolled { - spent = append(spent, vtxo) - } else { - spendable = append(spendable, vtxo) +// GetVtxos returns VTXOs filtered by the given VtxoFilter. +// +// When page.PageSize == 0, all matching VTXOs are returned (no LIMIT). +// Otherwise the paginated SQL path is used, which dispatches based on filter. +// +// The view rows are grouped by outpoint because asset_vtxo_vw is a LEFT JOIN +// of vtxo and asset_vtxo: a multi-asset VTXO produces N view rows (one per +// asset). The grouping step collapses them back into a single Vtxo with an +// Assets slice. +func (v *vtxoRepository) GetVtxos( + ctx context.Context, page types.Page, filter types.VtxoFilter, +) ([]clientTypes.Vtxo, error) { + limit, offset := pageToLimitOffset(page) + + // Recoverable filtering cannot be done in SQL (it depends on wall-clock + // time via ExpiresAt), so we always fetch ALL VTXOs, filter in Go, and + // then apply pagination to the filtered result. This is acceptable because + // recoverable VTXOs are rare. + if filter == types.VtxoFilterRecoverable { + rows, err := v.querier.SelectAllVtxos(ctx) + if err != nil { + return nil, err + } + allVtxos := assetVtxoVwRowsToVtxos(rows) + filtered := make([]clientTypes.Vtxo, 0, len(allVtxos)) + for _, vtxo := range allVtxos { + if vtxo.IsRecoverable() { + filtered = append(filtered, vtxo) + } + } + // Apply Go-side pagination to the filtered slice. + if limit > 0 { + if offset >= int64(len(filtered)) { + return []clientTypes.Vtxo{}, nil + } + end := offset + limit + if end > int64(len(filtered)) { + end = int64(len(filtered)) + } + filtered = filtered[offset:end] } + return filtered, nil + } + + var ( + rows []queries.AssetVtxoVw + err error + + allSpendable = limit == 0 && filter == types.VtxoFilterSpendable + allSpent = limit == 0 && filter == types.VtxoFilterSpent + all = limit == 0 + ) + + switch { + //no pagination + case allSpendable: + rows, err = v.querier.SelectSpendableVtxos(ctx) + case allSpent: + rows, err = v.querier.SelectSpentVtxos(ctx) + case all: + rows, err = v.querier.SelectAllVtxos(ctx) + + // with pagination + case filter == types.VtxoFilterSpendable: + rows, err = v.querier.SelectSpendableVtxosPaginated( + ctx, + queries.SelectSpendableVtxosPaginatedParams{ + Limit: limit, Offset: offset, + }, + ) + case filter == types.VtxoFilterSpent: + rows, err = v.querier.SelectSpentVtxosPaginated( + ctx, + queries.SelectSpentVtxosPaginatedParams{ + Limit: limit, Offset: offset, + }, + ) + default: + // VtxoFilterAll uses the "all" paginated query. + rows, err = v.querier.SelectAllVtxosPaginated(ctx, queries.SelectAllVtxosPaginatedParams{ + Limit: limit, Offset: offset, + }) + } + if err != nil { + return nil, err } - return + + return assetVtxoVwRowsToVtxos(rows), nil } -func (v *vtxoRepository) GetVtxos( +// GetVtxosByOutpoint fetches specific VTXOs by their outpoint keys. +func (v *vtxoRepository) GetVtxosByOutpoint( ctx context.Context, keys []clientTypes.Outpoint, ) ([]clientTypes.Vtxo, error) { vtxos := make([]clientTypes.Vtxo, 0, len(keys)) @@ -343,17 +411,6 @@ func (v *vtxoRepository) GetVtxos( return vtxos, nil } -func (v *vtxoRepository) GetSpendableVtxos( - ctx context.Context, -) (spendable []clientTypes.Vtxo, err error) { - rows, err := v.querier.SelectSpendableVtxos(ctx) - if err != nil { - return nil, err - } - - return assetVtxoVwRowsToVtxos(rows), nil -} - func (v *vtxoRepository) GetEventChannel() <-chan types.VtxoEvent { return v.eventCh } @@ -388,8 +445,34 @@ func (v *vtxoRepository) sendEvent(event types.VtxoEvent) { log.Warn("failed to send vtxo event") } +// pageToLimitOffset converts a Page to SQL LIMIT/OFFSET values. +// - PageSize == 0 is a sentinel meaning "return all rows" (limit 0 tells the +// caller to skip the paginated query and use the unpaginated variant). +// - PageSize values above MaxPageSize are clamped to prevent unbounded result sets. +// - PageNum is 1-based for callers; 0 is silently treated as 1 so that +// Page{PageSize: 10} returns the first page rather than an empty result. +func pageToLimitOffset(p types.Page) (limit, offset int64) { + if p.PageSize == 0 { + return 0, 0 + } + size := p.PageSize + if size > types.MaxPageSize { + size = types.MaxPageSize + } + num := p.PageNum + if num == 0 { + num = 1 + } + return int64(size), int64(num-1) * int64(size) +} + +// assetVtxoVwRowsToVtxos groups flat view rows back into domain VTXOs. +// +// The asset_vtxo_vw view LEFT JOINs vtxo with asset_vtxo, so a single VTXO +// carrying N assets produces N rows in the result set. This function groups +// those rows by outpoint and merges each group into one Vtxo with a populated +// Assets slice. func assetVtxoVwRowsToVtxos(rows []queries.AssetVtxoVw) []clientTypes.Vtxo { - // group rows by (txid, vout) byOutpoint := make(map[string][]queries.AssetVtxoVw) for _, row := range rows { key := fmt.Sprintf("%s:%d", row.Txid, row.Vout) diff --git a/test/e2e/asset_test.go b/test/e2e/asset_test.go index 79fb3124..8cafaa98 100644 --- a/test/e2e/asset_test.go +++ b/test/e2e/asset_test.go @@ -398,7 +398,11 @@ func TestAssetBurn(t *testing.T) { } func listVtxosWithAsset(t *testing.T, client sdk.Wallet, assetID string) []clientTypes.Vtxo { - vtxos, _, err := client.ListVtxos(t.Context()) + vtxos, err := client.ListVtxos( + t.Context(), + types.Page{PageNum: 1, PageSize: 50}, + types.VtxoFilterSpendable, + ) require.NoError(t, err) assetVtxos := make([]clientTypes.Vtxo, 0, len(vtxos)) diff --git a/test/e2e/exit_test.go b/test/e2e/exit_test.go index 8249e736..02798c5b 100644 --- a/test/e2e/exit_test.go +++ b/test/e2e/exit_test.go @@ -192,7 +192,11 @@ func TestUnilateralExit(t *testing.T) { break } - _, spent, err := alice.ListVtxos(ctx) + spent, err := alice.ListVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 50}, + types.VtxoFilterSpent, + ) require.NoError(t, err) require.NotEmpty(t, spent) require.Len(t, spent, 1) @@ -282,7 +286,11 @@ func TestUnilateralExit(t *testing.T) { break } - _, spent, err := bob.ListVtxos(ctx) + spent, err := bob.ListVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 50}, + types.VtxoFilterSpent, + ) require.NoError(t, err) require.NotEmpty(t, spent) require.Len(t, spent, 1) diff --git a/test/e2e/hd_wallet_test.go b/test/e2e/hd_wallet_test.go index cd6f8974..7fbae652 100644 --- a/test/e2e/hd_wallet_test.go +++ b/test/e2e/hd_wallet_test.go @@ -100,12 +100,23 @@ func TestHDWalletRecoversFundsAtRestore(t *testing.T) { // Scenario 3: Alice restores from seed and discovers all used keys on startup. aliceClientHD = setupClient(t, seed, sdk.WithGapLimit(50)) - restoredSpendable, restoredSpent, err := aliceClientHD.ListVtxos(ctx) + restoredSpendable, err := aliceClientHD.ListVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 50}, + types.VtxoFilterSpendable, + ) require.NoError(t, err) - require.Len(t, restoredSpent, 0) require.Len(t, restoredSpendable, 2) require.ElementsMatch(t, []uint64{15_000, 16_000}, vtxoAmounts(restoredSpendable)) + restoredSpent, err := aliceClientHD.ListVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 50}, + types.VtxoFilterSpent, + ) + require.NoError(t, err) + require.Len(t, restoredSpent, 0) + restoredBalance, err := aliceClientHD.Balance(ctx) require.NoError(t, err) require.EqualValues(t, 31_000, restoredBalance.OffchainBalance.Total) @@ -177,12 +188,23 @@ func TestHDWalletDoesNotRecoverVtxoBeyondConfiguredGapLimit(t *testing.T) { aliceClientHD = setupClient(t, seed, sdk.WithGapLimit(gapLimit)) - restoredSpendable, restoredSpent, err := aliceClientHD.ListVtxos(ctx) + restoredSpendable, err := aliceClientHD.ListVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 50}, + types.VtxoFilterSpendable, + ) require.NoError(t, err) - require.Len(t, restoredSpent, 0) require.Len(t, restoredSpendable, 1) require.ElementsMatch(t, []uint64{15_000}, vtxoAmounts(restoredSpendable)) + restoredSpent, err := aliceClientHD.ListVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 50}, + types.VtxoFilterSpent, + ) + require.NoError(t, err) + require.Len(t, restoredSpent, 0) + restoredBalance, err := aliceClientHD.Balance(ctx) require.NoError(t, err) require.EqualValues(t, 15_000, restoredBalance.OffchainBalance.Total) @@ -240,12 +262,16 @@ func TestHDWalletRestoresMixedOnchainAndOffchainState(t *testing.T) { const wantOffchainTotal = uint64(50_000) require.Eventually(t, func() bool { - spendable, spent, err := aliceClientHD.ListVtxos(ctx) + spendable, err := aliceClientHD.ListVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 50}, + types.VtxoFilterSpendable, + ) if err != nil { return false } - return len(spent) == 0 && len(spendable) == 4 && + return len(spendable) == 4 && sumVtxoAmounts(spendable) == wantOffchainTotal }, 10*time.Second, 200*time.Millisecond) @@ -465,7 +491,11 @@ func waitForSpendableVtxos( var spendable []clientTypes.Vtxo require.Eventually(t, func() bool { var err error - spendable, _, err = client.ListVtxos(t.Context()) + spendable, err = client.ListVtxos( + t.Context(), + types.Page{PageNum: 1, PageSize: 50}, + types.VtxoFilterSpendable, + ) if err != nil { return false } diff --git a/test/e2e/restore_smoke_test.go b/test/e2e/restore_smoke_test.go index 0d957fa7..f4244adc 100644 --- a/test/e2e/restore_smoke_test.go +++ b/test/e2e/restore_smoke_test.go @@ -6,6 +6,7 @@ import ( "testing" arksdk "github.com/arkade-os/go-sdk" + "github.com/arkade-os/go-sdk/types" "github.com/stretchr/testify/require" ) @@ -47,7 +48,7 @@ func TestSmokeWalletRestore(t *testing.T) { len(offchainAddresses), totalAddresses, ) - vtxos, _, err := alice.ListVtxos(ctx) + vtxos, err := alice.ListVtxos(ctx, types.Page{}, types.VtxoFilterAll) require.NoError(t, err, "❌ funding failed: expected no error on getting vtxos, got %w", err) require.Len( t, vtxos, expectedVtxos, @@ -91,7 +92,7 @@ func TestSmokeWalletRestore(t *testing.T) { len(offchainAddressesRestored), totalAddresses, ) - vtxosRestored, _, err := aliceRestored.ListVtxos(ctx) + vtxosRestored, err := aliceRestored.ListVtxos(ctx, types.Page{}, types.VtxoFilterAll) require.NoError(t, err, "❌ restore failed: expected no error, got %w", err) require.Len( t, vtxosRestored, len(vtxos), diff --git a/test/e2e/transaction_test.go b/test/e2e/transaction_test.go index b04bfde8..330db7e5 100644 --- a/test/e2e/transaction_test.go +++ b/test/e2e/transaction_test.go @@ -50,7 +50,11 @@ func TestOffchainTx(t *testing.T) { require.Equal(t, 1000, int(bobVtxo1.Amount)) require.Equal(t, txid, bobVtxo1.Txid) - bobVtxos, _, err := bob.ListVtxos(ctx) + bobVtxos, err := bob.ListVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 50}, + types.VtxoFilterSpendable, + ) require.NoError(t, err) require.Len(t, bobVtxos, 1) @@ -69,7 +73,11 @@ func TestOffchainTx(t *testing.T) { require.Equal(t, 10000, int(bobVtxo2.Amount)) require.Equal(t, txid, bobVtxo2.Txid) - bobVtxos, _, err = bob.ListVtxos(ctx) + bobVtxos, err = bob.ListVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 50}, + types.VtxoFilterSpendable, + ) require.NoError(t, err) require.Len(t, bobVtxos, 2) @@ -88,7 +96,11 @@ func TestOffchainTx(t *testing.T) { require.Equal(t, 10000, int(bobVtxo3.Amount)) require.Equal(t, txid, bobVtxo3.Txid) - bobVtxos, _, err = bob.ListVtxos(ctx) + bobVtxos, err = bob.ListVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 50}, + types.VtxoFilterSpendable, + ) require.NoError(t, err) require.Len(t, bobVtxos, 3) @@ -107,7 +119,11 @@ func TestOffchainTx(t *testing.T) { require.Equal(t, 10000, int(bobVtxo4.Amount)) require.Equal(t, txid, bobVtxo4.Txid) - bobVtxos, _, err = bob.ListVtxos(ctx) + bobVtxos, err = bob.ListVtxos( + ctx, + types.Page{PageNum: 1, PageSize: 50}, + types.VtxoFilterSpendable, + ) require.NoError(t, err) require.Len(t, bobVtxos, 4) diff --git a/types/interfaces.go b/types/interfaces.go index 013e2623..cb5cea47 100644 --- a/types/interfaces.go +++ b/types/interfaces.go @@ -52,9 +52,8 @@ type VtxoStore interface { ) (int, error) SweepVtxos(ctx context.Context, vtxosToSweep []types.Vtxo) (int, error) UnrollVtxos(ctx context.Context, vtxosToUnroll []types.Vtxo) (int, error) - GetAllVtxos(ctx context.Context) (spendable, spent []types.Vtxo, err error) - GetSpendableVtxos(ctx context.Context) ([]types.Vtxo, error) - GetVtxos(ctx context.Context, keys []types.Outpoint) ([]types.Vtxo, error) + GetVtxos(ctx context.Context, page Page, filter VtxoFilter) ([]types.Vtxo, error) + GetVtxosByOutpoint(ctx context.Context, keys []types.Outpoint) ([]types.Vtxo, error) Clean(ctx context.Context) error GetEventChannel() <-chan VtxoEvent } diff --git a/types/types.go b/types/types.go index f7079381..9a4156b4 100644 --- a/types/types.go +++ b/types/types.go @@ -106,6 +106,34 @@ const ( ContractStateInactive ContractState = "inactive" ) +// MaxPageSize is the maximum number of VTXOs returned per page. +const MaxPageSize uint32 = 200 + +// Page specifies offset-based pagination for VTXO listing operations. +// PageNum is 1-based; 0 is treated as 1. +// PageSize 0 means "return all rows" (no LIMIT applied). +// PageSize values above MaxPageSize are clamped to MaxPageSize. +type Page struct { + PageNum uint32 + PageSize uint32 +} + +// VtxoFilter controls which VTXOs are returned by listing operations. +type VtxoFilter int + +const ( + // VtxoFilterAll returns every VTXO regardless of state. + VtxoFilterAll VtxoFilter = iota + // VtxoFilterSpendable returns VTXOs where spent=false AND unrolled=false. + // This includes recoverable VTXOs (swept or expired) + VtxoFilterSpendable + // VtxoFilterSpent returns VTXOs that have been spent or unrolled. + VtxoFilterSpent + // VtxoFilterRecoverable returns VTXOs that are swept or expired but not + // yet spent, meaning the owner can still recover them on-chain. + VtxoFilterRecoverable +) + type ContractType string const ( diff --git a/wallet.go b/wallet.go index df572b99..69b0e5f0 100644 --- a/wallet.go +++ b/wallet.go @@ -807,7 +807,9 @@ func (w *wallet) refreshVtxoDb( ctx context.Context, spendableVtxos, spentVtxos []clienttypes.Vtxo, ) error { // Fetch old data. - oldSpendableVtxos, _, err := w.store.VtxoStore().GetAllVtxos(ctx) + oldSpendableVtxos, err := w.store.VtxoStore(). + GetVtxos(ctx, types.Page{}, types.VtxoFilterSpendable) + if err != nil { return err } @@ -1347,7 +1349,7 @@ func (w *wallet) handleCommitmentTx( indexedSpentVtxos[vtxo.Outpoint] = vtxo } } - myVtxos, err := w.store.VtxoStore().GetVtxos(ctx, spentVtxos) + myVtxos, err := w.store.VtxoStore().GetVtxosByOutpoint(ctx, spentVtxos) if err != nil { return err } @@ -1526,7 +1528,7 @@ func (w *wallet) handleArkTx( VOut: vtxo.VOut, }) } - myVtxos, err := w.store.VtxoStore().GetVtxos(ctx, spentVtxos) + myVtxos, err := w.store.VtxoStore().GetVtxosByOutpoint(ctx, spentVtxos) if err != nil { return err } @@ -1625,7 +1627,7 @@ func (w *wallet) handleSweepTx(ctx context.Context, sweepTx *client.TxNotificati return nil } - myVtxos, err := w.store.VtxoStore().GetVtxos(ctx, sweepTx.SweptVtxos) + myVtxos, err := w.store.VtxoStore().GetVtxosByOutpoint(ctx, sweepTx.SweptVtxos) if err != nil { return err }