Skip to content
10 changes: 9 additions & 1 deletion daemon/test_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ func (td *TestDaemon) VerifyOnLongestChainInUtxoStore(t *testing.T, tx *bt.Tx) {
func (td *TestDaemon) VerifyNotOnLongestChainInUtxoStore(t *testing.T, tx *bt.Tx) {
readTx, err := td.UtxoStore.Get(td.Ctx, tx.TxIDChainHash(), fields.UnminedSince)
require.NoError(t, err, "Failed to get transaction %s", tx.String())
assert.Greater(t, readTx.UnminedSince, uint32(0), "Expected transaction %s to be on the longest chain", tx.TxIDChainHash().String())
assert.Greater(t, readTx.UnminedSince, uint32(0), "Expected transaction %s to be not on the longest chain", tx.TxIDChainHash().String())
}

// VerifyNotInUtxoStore verifies that the transaction does not exist in the UTXO store.
Expand Down Expand Up @@ -1395,6 +1395,13 @@ finished:
}

func (td *TestDaemon) WaitForBlockStateChange(t *testing.T, expectedBlock *model.Block, timeout time.Duration) {
// First check if the expected block is already the current best block
state, err := td.BlockAssemblyClient.GetBlockAssemblyState(td.Ctx)
if err == nil && state.CurrentHash == expectedBlock.Header.Hash().String() {
t.Logf("Block %s (height %d) is already the current best block", expectedBlock.Header.Hash().String(), expectedBlock.Height)
return
}

stateChangeCh := make(chan blockassembly.BestBlockInfo)
td.BlockAssembler.SetStateChangeCh(stateChangeCh)

Expand All @@ -1412,6 +1419,7 @@ func (td *TestDaemon) WaitForBlockStateChange(t *testing.T, expectedBlock *model
t.Fatalf("Timeout waiting for block assembly to reach block %s", expectedBlock.Header.Hash().String())
case bestBlockInfo := <-stateChangeCh:
t.Logf("Received BestBlockInfo: Height=%d, Hash=%s", bestBlockInfo.Height, bestBlockInfo.Header.Hash().String())
t.Logf("Expected block: Height=%d, Hash=%s", expectedBlock.Height, expectedBlock.Header.Hash().String())
if bestBlockInfo.Header.Hash().IsEqual(expectedBlock.Header.Hash()) {
return
}
Expand Down
111 changes: 88 additions & 23 deletions services/blockassembly/subtreeprocessor/SubtreeProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,11 +559,12 @@
// create empty map for processed conflicting hashes
processedConflictingHashesMap := make(map[chainhash.Hash]bool)

// store current state before attempting to move forward the block
originalChainedSubtrees := stp.chainedSubtrees
originalCurrentSubtree := stp.currentSubtree.Load()
originalCurrentTxMap := stp.currentTxMap
currentBlockHeader := stp.currentBlockHeader
if _, _, err = stp.moveForwardBlock(ctx, moveForwardReq.block, false, processedConflictingHashesMap, false, true); err != nil {
// rollback to previous state
stp.chainedSubtrees = originalChainedSubtrees
stp.currentSubtree = originalCurrentSubtree
stp.currentTxMap = originalCurrentTxMap
stp.currentBlockHeader = currentBlockHeader

if _, err = stp.moveForwardBlock(processorCtx, moveForwardReq.block, false, processedConflictingHashesMap, false, true); err != nil {
// rollback to previous state
Expand All @@ -585,7 +586,7 @@
logger.Infof("[SubtreeProcessor][%s] moveForwardBlock subtree processor DONE", moveForwardReq.block.String())
stp.setCurrentRunningState(StateRunning)

case resetBlocksMsg := <-stp.resetCh:

Check failure on line 589 in services/blockassembly/subtreeprocessor/SubtreeProcessor.go

View workflow job for this annotation

GitHub Actions / golangci-lint

expected '}', found 'case' (typecheck)

Check failure on line 589 in services/blockassembly/subtreeprocessor/SubtreeProcessor.go

View workflow job for this annotation

GitHub Actions / golangci-lint

syntax error: unexpected keyword case, expected } (typecheck)
stp.setCurrentRunningState(StateResetBlocks)

err = stp.reset(resetBlocksMsg.blockHeader, resetBlocksMsg.moveBackBlocks, resetBlocksMsg.moveForwardBlocks,
Expand Down Expand Up @@ -1956,42 +1957,106 @@

var (
transactionMap txmap.TxMap
losingTxHashesMap txmap.TxMap
markOnLongestChain = make([]chainhash.Hash, 0, 1024)
allLosingTxHashes = make([]chainhash.Hash, 0, 1024)
)

// Build a set of all transactions that will be mined in moveForward blocks
// We need this set BEFORE we start processing to correctly filter losing transactions
winningTxSet := make(map[chainhash.Hash]bool)

// Temporary storage for losing transactions per block
type blockLosingTxs struct {
blockHash *chainhash.Hash
losingTxs []chainhash.Hash
}
blockLosingTxsList := make([]blockLosingTxs, 0, len(moveForwardBlocks))

for blockIdx, block := range moveForwardBlocks {
lastMoveForwardBlock := blockIdx == len(moveForwardBlocks)-1
// we skip the notifications for now and do them all at the end
// transactionMap is returned so we can check which transactions need to be marked as on the longest chain
if transactionMap, err = stp.moveForwardBlock(ctx, block, true, processedConflictingHashesMap, true, lastMoveForwardBlock); err != nil {
// losingTxHashesMap contains transactions that lost conflicts and need to be marked as NOT on longest chain
if transactionMap, losingTxHashesMap, err = stp.moveForwardBlock(ctx, block, true, processedConflictingHashesMap, true, lastMoveForwardBlock); err != nil {
return err
}

// Process transactionMap: build winningTxSet and markOnLongestChain
if transactionMap != nil {
transactionMap.Iter(func(hash chainhash.Hash, n uint64) bool {
// if the transaction is not in the movedBackBlockTxMap, it means it was not part of the blocks we moved back
// and therefore needs to be marked in the utxo store as on the longest chain now
// since it was on the block moving forward
if !hash.Equal(subtreepkg.CoinbasePlaceholderHashValue) && !movedBackBlockTxMap[hash] {
markOnLongestChain = append(markOnLongestChain, hash)
if !hash.Equal(subtreepkg.CoinbasePlaceholderHashValue) {
// Track as winning transaction
winningTxSet[hash] = true

// Add to markOnLongestChain if not in movedBack blocks
if !movedBackBlockTxMap[hash] {
markOnLongestChain = append(markOnLongestChain, hash)
}
}
return true
})
}

// Store losing transactions for later processing
// We can't process them yet because we need the complete winningTxSet from all blocks
if losingTxHashesMap != nil && losingTxHashesMap.Length() > 0 {
losingTxs := make([]chainhash.Hash, 0, losingTxHashesMap.Length())
losingTxHashesMap.Iter(func(hash chainhash.Hash, _ uint64) bool {
losingTxs = append(losingTxs, hash)
return true
})
blockLosingTxsList = append(blockLosingTxsList, blockLosingTxs{
blockHash: block.Hash(),
losingTxs: losingTxs,
})
}

stp.currentBlockHeader = block.Header
}

// Second pass: filter losing transactions using the complete winningTxSet
for _, blockLosing := range blockLosingTxsList {
for _, hash := range blockLosing.losingTxs {
// Only add to losing list if it's not a winning transaction in ANY moveForward block
if !winningTxSet[hash] {
allLosingTxHashes = append(allLosingTxHashes, hash)
}
}
}

movedBackBlockTxMap = nil // free up memory

// Build a set of all losing transactions for fast lookup
losingTxSet := make(map[chainhash.Hash]bool, len(allLosingTxHashes))
for _, hash := range allLosingTxHashes {
losingTxSet[hash] = true
}

// Remove losing transactions from markOnLongestChain
// A transaction that loses a conflict in a later block should NOT be marked as on longest chain
// even if it appeared in an earlier block
filteredMarkOnLongestChain := make([]chainhash.Hash, 0, len(markOnLongestChain))
for _, hash := range markOnLongestChain {
if !losingTxSet[hash] {
filteredMarkOnLongestChain = append(filteredMarkOnLongestChain, hash)
}
}

// all the transactions in markOnLongestChain need to be marked as on the longest chain in the utxo store
if len(markOnLongestChain) > 0 {
if err = stp.utxoStore.MarkTransactionsOnLongestChain(ctx, markOnLongestChain, true); err != nil {
if len(filteredMarkOnLongestChain) > 0 {
if err = stp.utxoStore.MarkTransactionsOnLongestChain(ctx, filteredMarkOnLongestChain, true); err != nil {
return errors.NewProcessingError("[reorgBlocks] error marking transactions as on longest chain in utxo store", err)
}
}

// Mark all losing conflicting transactions as NOT on longest chain
if len(allLosingTxHashes) > 0 {
if err = stp.utxoStore.MarkTransactionsOnLongestChain(ctx, allLosingTxHashes, false); err != nil {
return errors.NewProcessingError("[reorgBlocks] error marking losing conflicting transactions as not on longest chain in utxo store", err)
}
}

// everything now in block assembly is not mined on the longest chain
// so we need to set the unminedSince for all transactions in block assembly
for _, subtree := range stp.chainedSubtrees {
Expand Down Expand Up @@ -2733,9 +2798,9 @@
// moveForwardBlock cleans out all transactions that are in the current subtrees and also in the block
// given. It is akin to moving up the blockchain to the next block.
func (stp *SubtreeProcessor) moveForwardBlock(ctx context.Context, block *model.Block, skipNotification bool,
processedConflictingHashesMap map[chainhash.Hash]bool, skipDequeue bool, createProperlySizedSubtrees bool) (transactionMap txmap.TxMap, err error) {
processedConflictingHashesMap map[chainhash.Hash]bool, skipDequeue bool, createProperlySizedSubtrees bool) (transactionMap txmap.TxMap, losingTxHashesMap txmap.TxMap, err error) {
if block == nil {
return nil, errors.NewProcessingError("[moveForwardBlock] you must pass in a block to moveForwardBlock")
return nil, nil, errors.NewProcessingError("[moveForwardBlock] you must pass in a block to moveForwardBlock")
}

_, _, deferFn := tracing.Tracer("subtreeprocessor").Start(ctx, "moveForwardBlock",
Expand All @@ -2750,7 +2815,7 @@
}()

if !block.Header.HashPrevBlock.IsEqual(stp.currentBlockHeader.Hash()) {
return nil, errors.NewProcessingError("the block passed in does not match the current block header: [%s] - [%s]", block.Header.StringDump(), stp.currentBlockHeader.StringDump())
return nil, nil, errors.NewProcessingError("the block passed in does not match the current block header: [%s] - [%s]", block.Header.StringDump(), stp.currentBlockHeader.StringDump())
}

stp.logger.Debugf("[moveForwardBlock][%s] resetting subtrees: %v", block.String(), block.Subtrees)
Expand All @@ -2763,21 +2828,21 @@
// Create transaction map from remaining block subtrees
transactionMap, conflictingNodes, err = stp.createTransactionMapIfNeeded(ctx, block, blockSubtreesMap)
if err != nil {
return nil, err
return nil, nil, err
}

// Process conflicting transactions
losingTxHashesMap, err := stp.processConflictingTransactions(ctx, block, conflictingNodes, processedConflictingHashesMap)
losingTxHashesMap, err = stp.processConflictingTransactions(ctx, block, conflictingNodes, processedConflictingHashesMap)
if err != nil {
return nil, err
return nil, nil, err
}

originalCurrentSubtree := stp.currentSubtree.Load()
originalCurrentTxMap := stp.currentTxMap

// Reset subtree state
if err = stp.resetSubtreeState(createProperlySizedSubtrees); err != nil {
return nil, errors.NewProcessingError("[moveForwardBlock][%s] error resetting subtree state", block.String(), err)
return nil, nil, errors.NewProcessingError("[moveForwardBlock][%s] error resetting subtree state", block.String(), err)
}

// Process remainder transactions and dequeueDuringBlockMovement
Expand All @@ -2792,12 +2857,12 @@
SkipNotification: skipNotification,
})
if err != nil {
return nil, err
return nil, nil, err
}

// create the coinbase after processing all other transaction operations
if err = stp.processCoinbaseUtxos(ctx, block); err != nil {
return nil, errors.NewProcessingError("[moveForwardBlock][%s] error processing coinbase utxos", block.String(), err)
return nil, nil, errors.NewProcessingError("[moveForwardBlock][%s] error processing coinbase utxos", block.String(), err)
}

// Log memory stats after block processing if debug logging is enabled
Expand All @@ -2818,7 +2883,7 @@
}
}

return transactionMap, nil
return transactionMap, losingTxHashesMap, nil
}

func (stp *SubtreeProcessor) waitForBlockBeingMined(ctx context.Context, blockHash *chainhash.Hash) (bool, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3345,7 +3345,7 @@ func TestMoveForwardBlock_BlockHeaderValidation(t *testing.T) {
}

// moveForwardBlock should fail with parent mismatch
_, err := stp.moveForwardBlock(context.Background(), invalidBlock, false, map[chainhash.Hash]bool{}, false, true)
_, _, err := stp.moveForwardBlock(context.Background(), invalidBlock, false, map[chainhash.Hash]bool{}, false, true)
require.Error(t, err)
assert.Contains(t, err.Error(), "does not match the current block header")
})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package longest_chain

import (
"testing"

"github.com/bsv-blockchain/teranode/test/utils/aerospike"
"github.com/bsv-blockchain/teranode/test/utils/transactions"
"github.com/stretchr/testify/require"
)

func TestLongestChainAerospikeInvalidateFork(t *testing.T) {
// start an aerospike container
utxoStore, teardown, err := aerospike.InitAerospikeContainer()
require.NoError(t, err)

t.Cleanup(func() {
_ = teardown()
})

t.Run("invalid block with old tx", func(t *testing.T) {
testLongestChainInvalidateFork(t, utxoStore)
})
}

func testLongestChainInvalidateFork(t *testing.T, utxoStore string) {
// Setup test environment
td, block3 := setupLongestChainTest(t, utxoStore)
defer func() {
td.Stop(t)
}()

td.Settings.BlockValidation.OptimisticMining = true

block1, err := td.BlockchainClient.GetBlockByHeight(td.Ctx, 1)
require.NoError(t, err)

parentTxWith3Outputs := td.CreateTransactionWithOptions(t, transactions.WithInput(block1.CoinbaseTx, 0), transactions.WithP2PKHOutputs(3, 100000))
require.NoError(t, td.PropagationClient.ProcessTransaction(td.Ctx, parentTxWith3Outputs))

childTx1 := td.CreateTransactionWithOptions(t, transactions.WithInput(parentTxWith3Outputs, 0), transactions.WithP2PKHOutputs(1, 100000))
childTx2 := td.CreateTransactionWithOptions(t, transactions.WithInput(parentTxWith3Outputs, 1), transactions.WithP2PKHOutputs(1, 100000))
childTx3 := td.CreateTransactionWithOptions(t, transactions.WithInput(parentTxWith3Outputs, 2), transactions.WithP2PKHOutputs(1, 100000))
// create a double spend of tx3
childTx3DS := td.CreateTransactionWithOptions(t, transactions.WithInput(parentTxWith3Outputs, 2), transactions.WithP2PKHOutputs(2, 50000))

require.NoError(t, td.PropagationClient.ProcessTransaction(td.Ctx, childTx1))
require.NoError(t, td.PropagationClient.ProcessTransaction(td.Ctx, childTx2))
require.NoError(t, td.PropagationClient.ProcessTransaction(td.Ctx, childTx3))

_, block4a := td.CreateTestBlock(t, block3, 4001, parentTxWith3Outputs, childTx1, childTx2)
require.NoError(t, td.BlockValidation.ValidateBlock(td.Ctx, block4a, "legacy", nil, false, true), "Failed to process block")
td.WaitForBlockBeingMined(t, block4a)
t.Logf("WaitForBlock(t, block4a, blockWait): %s", block4a.Hash().String())
td.WaitForBlock(t, block4a, blockWait)

// 0 -> 1 ... 2 -> 3 -> 4a (*)

td.VerifyNotInBlockAssembly(t, parentTxWith3Outputs)
td.VerifyNotInBlockAssembly(t, childTx1)
td.VerifyNotInBlockAssembly(t, childTx2)
td.VerifyInBlockAssembly(t, childTx3)
td.VerifyOnLongestChainInUtxoStore(t, parentTxWith3Outputs)
td.VerifyOnLongestChainInUtxoStore(t, childTx1)
td.VerifyOnLongestChainInUtxoStore(t, childTx2)
td.VerifyNotOnLongestChainInUtxoStore(t, childTx3)

// create a block with tx1 and tx2 that will be invalid as tx2 is already on block4a
_, block4b := td.CreateTestBlock(t, block3, 4002, parentTxWith3Outputs, childTx2, childTx3)
require.NoError(t, td.BlockValidation.ValidateBlock(td.Ctx, block4b, "legacy", nil, true), "Failed to process block")
t.Logf("WaitForBlockBeingMined(t, block4b): %s", block4b.Hash().String())
td.WaitForBlockBeingMined(t, block4b)

_, block5b := td.CreateTestBlock(t, block4b, 5001)
require.NoError(t, td.BlockValidation.ValidateBlock(td.Ctx, block5b, "legacy", nil, true), "Failed to process block")
t.Logf("WaitForBlockBeingMined(t, block5b): %s", block5b.Hash().String())
td.WaitForBlockBeingMined(t, block5b)
t.Logf("WaitForBlock(t, block5b, blockWait): %s", block5b.Hash().String())
td.WaitForBlock(t, block5b, blockWait)

// 0 -> 1 ... 2 -> 3 -> 4b -> 5b (*)
td.VerifyInBlockAssembly(t, childTx1)
td.VerifyNotInBlockAssembly(t, childTx2)
td.VerifyNotInBlockAssembly(t, childTx3)
td.VerifyNotOnLongestChainInUtxoStore(t, childTx1)
td.VerifyOnLongestChainInUtxoStore(t, childTx2)
td.VerifyOnLongestChainInUtxoStore(t, childTx3)

_, err = td.BlockchainClient.InvalidateBlock(t.Context(), block5b.Hash())
require.NoError(t, err)

td.WaitForBlock(t, block4a, blockWait)

// 0 -> 1 ... 2 -> 3 -> 4a

td.VerifyNotInBlockAssembly(t, childTx1)
td.VerifyNotInBlockAssembly(t, childTx2)
td.VerifyInBlockAssembly(t, childTx3)
td.VerifyOnLongestChainInUtxoStore(t, childTx1)
td.VerifyOnLongestChainInUtxoStore(t, childTx2)
td.VerifyNotOnLongestChainInUtxoStore(t, childTx3)

// create a new block on 4a with tx3 in it
_, block5a := td.CreateTestBlock(t, block4a, 6001, childTx3DS)
require.NoError(t, td.BlockValidation.ValidateBlock(td.Ctx, block5a, "legacy", nil, false, true), "Failed to process block")
t.Logf("WaitForBlockBeingMined(t, block5a): %s", block5a.Hash().String())
td.WaitForBlockBeingMined(t, block5a)
t.Logf("WaitForBlock(t, block5a, blockWait): %s", block5a.Hash().String())
td.WaitForBlock(t, block5a, blockWait)

td.VerifyNotInBlockAssembly(t, childTx1)
td.VerifyNotInBlockAssembly(t, childTx2)
td.VerifyInBlockAssembly(t, childTx3)
td.VerifyNotInBlockAssembly(t, childTx3DS)
td.VerifyOnLongestChainInUtxoStore(t, childTx1)
td.VerifyOnLongestChainInUtxoStore(t, childTx2)
td.VerifyNotOnLongestChainInUtxoStore(t, childTx3)
td.VerifyOnLongestChainInUtxoStore(t, childTx3DS) // 0 -> 1 ... 2 -> 3 -> 4a -> 6a (*)

_, err = td.BlockchainClient.InvalidateBlock(t.Context(), block4b.Hash())
require.NoError(t, err)

// create a new block on 5a with tx3 in it
_, block6a := td.CreateTestBlock(t, block5a, 7001, childTx3)
require.NoError(t, td.BlockValidation.ValidateBlock(td.Ctx, block6a, "legacy", nil, false, true), "Failed to process block")
t.Logf("WaitForBlockBeingMined(t, block6a): %s", block6a.Hash().String())
td.WaitForBlockBeingMined(t, block6a)
t.Logf("WaitForBlock(t, block6a, blockWait): %s", block6a.Hash().String())
td.WaitForBlock(t, block6a, blockWait)

t.Logf("FINAL VERIFICATIONS:")
td.VerifyNotInBlockAssembly(t, childTx1)
td.VerifyNotInBlockAssembly(t, childTx2)
td.VerifyNotInBlockAssembly(t, childTx3)
td.VerifyNotInBlockAssembly(t, childTx3DS)
td.VerifyOnLongestChainInUtxoStore(t, childTx1)
td.VerifyOnLongestChainInUtxoStore(t, childTx2)
td.VerifyOnLongestChainInUtxoStore(t, childTx3)
td.VerifyOnLongestChainInUtxoStore(t, childTx3DS)
}
2 changes: 1 addition & 1 deletion test/sequentialtest/longest_chain/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

var (
blockWait = 5 * time.Second
blockWait = 30 * time.Second
)

func setupLongestChainTest(t *testing.T, utxoStoreType string) (td *daemon.TestDaemon, block3 *model.Block) {
Expand Down
Loading
Loading