Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions docs/references/kafkaMessageFormat.md
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ message KafkaTxValidationTopicMessage {
}

message KafkaTxValidationOptions {
bool skipUtxoCreation = 1; // Skip UTXO creation if true
bool SkipUtxoCreate = 1; // Skip UTXO creation if true
bool addTXToBlockAssembly = 2; // Add transaction to block assembly if true
bool skipPolicyChecks = 3; // Skip policy checks if true
bool createConflicting = 4; // Allow conflicting transactions if true
Expand Down Expand Up @@ -622,7 +622,7 @@ message KafkaTxValidationOptions {

##### KafkaTxValidationOptions

###### skipUtxoCreation
###### SkipUtxoCreate

- Type: bool
- Description: When true, the validator will not create UTXO entries for this transaction
Expand Down Expand Up @@ -655,7 +655,7 @@ Here's a JSON representation of the message content (for illustration purposes o
"tx": "<binary data - variable length>",
"height": 12345,
"options": {
"skipUtxoCreation": false,
"SkipUtxoCreate": false,
"addTXToBlockAssembly": true,
"skipPolicyChecks": false,
"createConflicting": false
Expand All @@ -674,7 +674,7 @@ currentHeight := uint32(12345) // current blockchain height

// Create options (using defaults in this example)
options := &kafkamessage.KafkaTxValidationOptions{
SkipUtxoCreation: false,
SkipUtxoCreate: false,
AddTXToBlockAssembly: true,
SkipPolicyChecks: false,
CreateConflicting: false,
Expand Down Expand Up @@ -726,13 +726,13 @@ func handleTxValidationMessage(msg *kafka.Message) error {
}

// Process the transaction with the provided options...
skipUtxoCreation := options.SkipUtxoCreation
SkipUtxoCreate := options.SkipUtxoCreate
addToBlockAssembly := options.AddTXToBlockAssembly
skipPolicyChecks := options.SkipPolicyChecks
createConflicting := options.CreateConflicting

// Perform validation based on options...
return validateTransaction(tx, height, skipUtxoCreation, addToBlockAssembly, skipPolicyChecks, createConflicting)
return validateTransaction(tx, height, SkipUtxoCreate, addToBlockAssembly, skipPolicyChecks, createConflicting)
}
```

Expand Down
17 changes: 9 additions & 8 deletions services/blockvalidation/BlockValidation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func TestBlockValidationValidateBlockSmall(t *testing.T) {
require.NoError(t, err)

coinbase.Outputs = nil
_ = coinbase.AddP2PKHOutputFromAddress("1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa", 5000000000+300)
_ = coinbase.AddP2PKHOutputFromAddress("1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa", 5000000000)

subtreeHashes := make([]*chainhash.Hash, 0)
subtreeHashes = append(subtreeHashes, subtree.RootHash())
Expand Down Expand Up @@ -376,7 +376,7 @@ func TestBlockValidationValidateBlockSmall(t *testing.T) {
subtreeHashes, // should be the subtree with placeholder
uint64(subtree.Length()), // nolint:gosec
123123,
0, 0)
1, 0)
require.NoError(t, err)

blockChainStore, err := blockchain_store.NewStore(ulogger.TestLogger{}, &url.URL{Scheme: "sqlitememory"}, tSettings)
Expand Down Expand Up @@ -553,7 +553,7 @@ func TestBlockValidationShouldNotAllowDuplicateCoinbasePlaceholder(t *testing.T)
require.NoError(t, err)

coinbase.Outputs = nil
_ = coinbase.AddP2PKHOutputFromAddress("1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa", 5000000000+300)
_ = coinbase.AddP2PKHOutputFromAddress("1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa", 5000000000)

require.True(t, coinbase.IsCoinbase())

Expand Down Expand Up @@ -641,7 +641,7 @@ func TestBlockValidationShouldNotAllowDuplicateCoinbaseTx(t *testing.T) {
require.NoError(t, err)

coinbase.Outputs = nil
_ = coinbase.AddP2PKHOutputFromAddress("1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa", 5000000000+300)
_ = coinbase.AddP2PKHOutputFromAddress("1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa", 5000000000)

require.True(t, coinbase.IsCoinbase())

Expand Down Expand Up @@ -777,7 +777,7 @@ func TestInvalidBlockWithoutGenesisBlock(t *testing.T) {

coinbase.Outputs = nil

_ = coinbase.AddP2PKHOutputFromAddress("1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa", 5000000000+300)
_ = coinbase.AddP2PKHOutputFromAddress("1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa", 5000000000)

subtreeBytes, err := subtree.Serialize()
require.NoError(t, err)
Expand Down Expand Up @@ -898,7 +898,7 @@ func TestInvalidChainWithoutGenesisBlock(t *testing.T) {

coinbase.Outputs = nil

_ = coinbase.AddP2PKHOutputFromAddress("1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa", 5000000000+300)
_ = coinbase.AddP2PKHOutputFromAddress("1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa", 5000000000)

subtreeBytes, err := subtree.Serialize()
require.NoError(t, err)
Expand Down Expand Up @@ -1194,7 +1194,7 @@ func TestBlockValidationRequestMissingTransaction(t *testing.T) {
}

// Create block header
nBits, _ := model.NewNBitFromString("2000ffff")
nBits, _ := model.NewNBitFromString("207fffff")
hashPrevBlock := chaincfg.RegressionNetParams.GenesisBlock.BlockHash()

// Calculate merkle root using coinbase and subtree
Expand Down Expand Up @@ -1245,7 +1245,7 @@ func TestBlockValidationRequestMissingTransaction(t *testing.T) {

return uint64(totalSize)
}(),
0, 0)
1, 0)

require.NoError(t, err)

Expand Down Expand Up @@ -1831,6 +1831,7 @@ func TestBlockValidation_DoubleSpendInBlock(t *testing.T) {

err = blockValidation.ValidateBlock(context.Background(), block, "http://localhost")
require.Error(t, err)
// With validator-based storage, double spend is caught during validation phase
require.ErrorContains(t, err, "BLOCK_INVALID")
require.ErrorContains(t, err, "has duplicate inputs")
}
Expand Down
2 changes: 1 addition & 1 deletion services/legacy/netsync/handle_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func (sm *SyncManager) PreValidateTransactions(ctx context.Context, txMap *txmap
_, err = sm.validationClient.Validate(gCtx,
txWrapper.Tx,
blockHeight,
validator.WithSkipUtxoCreation(true),
validator.WithSkipUtxoCreate(true),
validator.WithAddTXToBlockAssembly(false),
validator.WithSkipPolicyChecks(true),
)
Expand Down
2 changes: 1 addition & 1 deletion services/propagation/Server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1057,7 +1057,7 @@ func (ps *PropagationServer) validateTransactionViaKafka(btTx *bt.Tx) error {
Tx: btTx.SerializeBytes(),
Height: 0,
Options: &kafkamessage.KafkaTxValidationOptions{
SkipUtxoCreation: validationOptions.SkipUtxoCreation,
SkipUtxoCreate: validationOptions.SkipUtxoCreate,
AddTXToBlockAssembly: validationOptions.AddTXToBlockAssembly,
SkipPolicyChecks: validationOptions.SkipPolicyChecks,
CreateConflicting: validationOptions.CreateConflicting,
Expand Down
139 changes: 123 additions & 16 deletions services/subtreevalidation/check_block_subtrees.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,9 @@ func (u *Server) processTransactionsInLevels(ctx context.Context, allTransaction
// Only transactions that successfully validate should be included in parent metadata
successfulTxsByLevel := make(map[uint32]map[chainhash.Hash]bool)

// Pipeline: Store level N-1 (background) while processing level N
var storageGroup *errgroup.Group

// Process each level in series, but all transactions within a level in parallel
for level := uint32(0); level <= maxLevel; level++ {
levelTxs := txsPerLevel[level]
Expand Down Expand Up @@ -737,9 +740,25 @@ func (u *Server) processTransactionsInLevels(ctx context.Context, allTransaction
}
}

// Process all transactions at this level in parallel
g, gCtx := errgroup.WithContext(ctx)
util.SafeSetLimit(g, u.settings.SubtreeValidation.SpendBatcherSize*2)
// Step 3: Validate concurrently (CPU-intensive, can overlap with previous level storage!)
// Use CPU-based worker limit for CPU-bound validation work
validationGroup, vCtx := errgroup.WithContext(ctx)
// CPU-bound: Use existing CheckBlockSubtreesConcurrency setting
// This controls CPU-intensive script execution and signature validation
validationWorkers := u.settings.SubtreeValidation.CheckBlockSubtreesConcurrency
util.SafeSetLimit(validationGroup, validationWorkers)

// Build validation-only options (skip UTXO spending for validation phase)
validationOnlyOptions := []validator.Option{
validator.WithSkipPolicyChecks(true),
validator.WithCreateConflicting(true),
validator.WithIgnoreLocked(true),
validator.WithSkipUtxoSpend(true), // Skip UTXO operations during validation phase
}
if len(processedValidatorOptions.ParentMetadata) > 0 {
validationOnlyOptions = append(validationOnlyOptions, validator.WithParentMetadata(processedValidatorOptions.ParentMetadata))
}
processedValidationOptions := validator.ProcessOptions(validationOnlyOptions...)

for _, mTx := range levelTxs {
tx := mTx.tx
Expand All @@ -750,12 +769,12 @@ func (u *Server) processTransactionsInLevels(ctx context.Context, allTransaction
// Skip transactions that were already validated (found in cache or UTXO store)
if txMetaSlice[mTx.idx].isSet {
u.logger.Debugf("[processTransactionsInLevels] Transaction %s already validated (pre-check), skipping", tx.TxIDChainHash().String())
return nil
continue
}

g.Go(func() error {
validationGroup.Go(func() error {
// Use existing blessMissingTransaction logic for validation
txMeta, err := u.blessMissingTransaction(gCtx, blockHash, subtreeHash, tx, blockHeight, blockIds, processedValidatorOptions)
_, err := u.blessMissingTransaction(vCtx, blockHash, subtreeHash, tx, blockHeight, blockIds, processedValidationOptions)
if err != nil {
u.logger.Debugf("[processTransactionsInLevels] Failed to validate transaction %s: %v", tx.TxIDChainHash().String(), err)

Expand All @@ -774,7 +793,7 @@ func (u *Server) processTransactionsInLevels(ctx context.Context, allTransaction

// Handle missing parent transactions by adding to orphanage
if errors.Is(err, errors.ErrTxMissingParent) {
isRunning, runningErr := u.blockchainClient.IsFSMCurrentState(gCtx, blockchain.FSMStateRUNNING)
isRunning, runningErr := u.blockchainClient.IsFSMCurrentState(vCtx, blockchain.FSMStateRUNNING)
if runningErr == nil && isRunning {
u.logger.Debugf("[processTransactionsInLevels] Transaction %s missing parent, adding to orphanage", tx.TxIDChainHash().String())
if u.orphanage.Set(*tx.TxIDChainHash(), tx) {
Expand Down Expand Up @@ -804,22 +823,101 @@ func (u *Server) processTransactionsInLevels(ctx context.Context, allTransaction
successfulTxsByLevel[level][*tx.TxIDChainHash()] = true
successfulTxsMutex.Unlock()

if txMeta == nil {
u.logger.Debugf("[processTransactionsInLevels] Transaction metadata is nil for %s", tx.TxIDChainHash().String())
} else {
u.logger.Debugf("[processTransactionsInLevels] Successfully validated transaction %s", tx.TxIDChainHash().String())
u.logger.Debugf("[processTransactionsInLevels] Successfully validated transaction %s", tx.TxIDChainHash().String())
return nil
})
}

// Wait for validation to complete - fail fast on any error
if err = validationGroup.Wait(); err != nil {
return errors.NewProcessingError("[processTransactionsInLevels] Validation failed at level %d", level+1, err)
}

u.logger.Debugf("[processTransactionsInLevels] Level %d validation complete (%d transactions)", level, len(levelTxs))

// Step 4: Wait for previous level storage (dependency constraint)
// Level N storage cannot start until Level N-1 storage completes (UTXO dependencies)
// However, Level N+1 can extend+validate while Level N is storing (thanks to parent metadata!)
if storageGroup != nil {
u.logger.Debugf("[processTransactionsInLevels] Waiting for level %d storage to complete before starting level %d storage", level-1, level)
if err = storageGroup.Wait(); err != nil {
return errors.NewProcessingError("[processTransactionsInLevels] Previous level storage failed", err)
}
u.logger.Debugf("[processTransactionsInLevels] Level %d storage complete, starting level %d storage", level-1, level)
}

// Step 5: Start storage for current level (runs in background)
// Use higher worker limit for I/O-bound storage work (Aerospike network calls)
newStorageGroup, sCtx := errgroup.WithContext(ctx)
// I/O-bound: Use higher multiplier for network latency tolerance
// SpendBatcherSize controls batch size; multiply by 6 for I/O concurrency
storageWorkers := u.settings.SubtreeValidation.SpendBatcherSize * 6
util.SafeSetLimit(newStorageGroup, storageWorkers)

currentState, err := u.blockchainClient.GetFSMCurrentState(ctx)
if err != nil {
return errors.NewProcessingError("[processTransactionsInLevels] Failed to get FSM current state", err)
}

skipBlockAssembly := *currentState == blockchain.FSMStateLEGACYSYNCING || *currentState == blockchain.FSMStateCATCHINGBLOCKS

// Build storage-phase validator options
// Skip validation (already done in validation phase), but perform UTXO operations
storageOptions := &validator.Options{
SkipValidation: true, // Skip validation work (already done)
SkipUtxoCreate: false, // DO create UTXOs
SkipUtxoSpend: false, // DO spend parent UTXOs
SkipPolicyChecks: true,
CreateConflicting: true,
IgnoreLocked: true,
AddTXToBlockAssembly: !skipBlockAssembly,
ParentMetadata: processedValidationOptions.ParentMetadata, // Reuse parent metadata from validation phase
}

// Only process successfully validated transactions in storage phase
for _, mTx := range levelTxs {
tx := mTx.tx
if tx == nil {
continue
}

// Skip transactions that failed validation
successfulTxsMutex.Lock()
wasSuccessful := successfulTxsByLevel[level][*tx.TxIDChainHash()]
successfulTxsMutex.Unlock()

if !wasSuccessful {
u.logger.Debugf("[processTransactionsInLevels] Skipping storage for transaction %s (validation failed)", tx.TxIDChainHash().String())
continue
}

newStorageGroup.Go(func() error {
// Store: Call validator with SkipValidation=true to perform UTXO operations only
_, storeErr := u.validatorClient.ValidateWithOptions(sCtx, tx, blockHeight, storageOptions)
if storeErr != nil {
// These are success cases - transaction was already stored
if errors.Is(storeErr, errors.ErrTxExists) {
u.logger.Debugf("[processTransactionsInLevels] Transaction %s already exists, skipping", tx.TxIDChainHash().String())
return nil
}

if errors.Is(storeErr, errors.ErrTxConflicting) {
u.logger.Debugf("[processTransactionsInLevels] Transaction %s is conflicting, skipping", tx.TxIDChainHash().String())
return nil
}

// FAIL FAST: Return storage error immediately
return errors.NewProcessingError("[processTransactionsInLevels] Storage failed for tx %s at level %d", tx.TxIDChainHash().String(), level+1, storeErr)
}

return nil
})
}

// Fail early if we get an actual tx error thrown
if err = g.Wait(); err != nil {
return errors.NewProcessingError("[processTransactionsInLevels] Failed to process level %d", level+1, err)
}
// Store for next iteration - next level will wait for this to complete
storageGroup = newStorageGroup

u.logger.Debugf("[processTransactionsInLevels] Processing level %d/%d with %d transactions DONE", level+1, maxLevel+1, len(levelTxs))
u.logger.Debugf("[processTransactionsInLevels] Level %d storage started (%d transactions)", level, len(levelTxs))

// PHASE 2 OPTIMIZATION: Release grandparent level (level-2) after current level succeeds
// Keep current level (being processed) and parent level (level-1) for safety
Expand All @@ -831,6 +929,15 @@ func (u *Server) processTransactionsInLevels(ctx context.Context, allTransaction
}
}

// Wait for final level's storage to complete
if storageGroup != nil {
u.logger.Debugf("[processTransactionsInLevels] Waiting for final level storage to complete")
if err = storageGroup.Wait(); err != nil {
return errors.NewProcessingError("[processTransactionsInLevels] Final level storage failed", err)
}
u.logger.Debugf("[processTransactionsInLevels] Final level storage complete")
}

if errorsFound.Load() > 0 {
return errors.NewProcessingError("[processTransactionsInLevels] Completed processing with %d errors, %d transactions added to orphanage", errorsFound.Load(), addedToOrphanage.Load())
}
Expand Down
13 changes: 13 additions & 0 deletions services/subtreevalidation/check_block_subtrees_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1429,6 +1429,19 @@ func setupTestServer(t *testing.T) (*Server, func()) {
mockUtxoStore.On("GetMeta",
mock.Anything, mock.Anything).
Return(&utxometa.Data{}, nil).Maybe()
// Set up default mock for Get method (used by mock validator to extend transactions)
// Return a transaction with multiple outputs to support any output index
parentTxForExtension := bt.NewTx()
for i := 0; i < 20; i++ { // Create 20 outputs to handle any test case
_ = parentTxForExtension.AddP2PKHOutputFromAddress("1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa", 5000000000)
}
mockUtxoStore.On("Get",
mock.Anything, mock.Anything, mock.Anything).
Return(&utxometa.Data{Tx: parentTxForExtension, BlockHeights: []uint32{100}}, nil).Maybe()
// Set up default mock for Spend method
mockUtxoStore.On("Spend",
mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return([]*utxo.Spend{}, nil).Maybe()

// Mock validator client
mockValidatorClient := &validator.MockValidatorClient{}
Expand Down
10 changes: 5 additions & 5 deletions services/validator/Client.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (c *Client) ValidateWithOptions(ctx context.Context, tx *bt.Tx, blockHeight
response, err := c.client.ValidateTransaction(ctx, &validator_api.ValidateTransactionRequest{
TransactionData: tx.SerializeBytes(),
BlockHeight: blockHeight,
SkipUtxoCreation: &validationOptions.SkipUtxoCreation,
SkipUtxoCreate: &validationOptions.SkipUtxoCreate,
AddTxToBlockAssembly: &validationOptions.AddTXToBlockAssembly,
SkipPolicyChecks: &validationOptions.SkipPolicyChecks,
CreateConflicting: &validationOptions.CreateConflicting,
Expand All @@ -229,7 +229,7 @@ func (c *Client) ValidateWithOptions(ctx context.Context, tx *bt.Tx, blockHeight
req: &validator_api.ValidateTransactionRequest{
TransactionData: tx.SerializeBytes(),
BlockHeight: blockHeight,
SkipUtxoCreation: &validationOptions.SkipUtxoCreation,
SkipUtxoCreate: &validationOptions.SkipUtxoCreate,
AddTxToBlockAssembly: &validationOptions.AddTXToBlockAssembly,
SkipPolicyChecks: &validationOptions.SkipPolicyChecks,
CreateConflicting: &validationOptions.CreateConflicting,
Expand Down Expand Up @@ -336,7 +336,7 @@ func (c *Client) handleBatchHTTPFallback(ctx context.Context, batch []*batchItem

// Create options from the request
options := &Options{
SkipUtxoCreation: *txReq.SkipUtxoCreation,
SkipUtxoCreate: *txReq.SkipUtxoCreate,
AddTXToBlockAssembly: *txReq.AddTxToBlockAssembly,
SkipPolicyChecks: *txReq.SkipPolicyChecks,
CreateConflicting: *txReq.CreateConflicting,
Expand Down Expand Up @@ -393,8 +393,8 @@ func (c *Client) validateTransactionViaHTTP(ctx context.Context, tx *bt.Tx, bloc

// Add validation options as query parameters
queryParams := url.Values{}
if validationOptions.SkipUtxoCreation {
queryParams.Add("skipUtxoCreation", "true")
if validationOptions.SkipUtxoCreate {
queryParams.Add("SkipUtxoCreate", "true")
}

if validationOptions.AddTXToBlockAssembly {
Expand Down
Loading
Loading