89 changes: 40 additions & 49 deletions daemon/daemon_stores.go
@@ -18,7 +18,6 @@ import (
"github.com/bsv-blockchain/teranode/stores/utxo/aerospike"
utxofactory "github.com/bsv-blockchain/teranode/stores/utxo/factory"
"github.com/bsv-blockchain/teranode/ulogger"
"github.com/bsv-blockchain/teranode/util/kafka"
)

type Stores struct {
@@ -44,13 +43,12 @@ func (d *Stores) GetUtxoStore(ctx context.Context, logger ulogger.Logger,
return d.mainUtxoStore, nil
}

var err error

d.mainUtxoStore, err = utxofactory.NewStore(ctx, logger, appSettings, "main")
store, err := utxofactory.NewStore(ctx, logger, appSettings, "main")
if err != nil {
return nil, err
}

d.mainUtxoStore = store
return d.mainUtxoStore, nil
}

@@ -63,11 +61,13 @@ func (d *Stores) GetSubtreeValidationClient(ctx context.Context, logger ulogger.
return d.mainSubtreeValidationClient, nil
}

var err error

d.mainSubtreeValidationClient, err = subtreevalidation.NewClient(ctx, logger, appSettings, "main_stores")
client, err := subtreevalidation.NewClient(ctx, logger, appSettings, "main_stores")
if err != nil {
return nil, err
}

return d.mainSubtreeValidationClient, err
d.mainSubtreeValidationClient = client
return d.mainSubtreeValidationClient, nil
}

// GetBlockValidationClient returns the main block validation client instance. If the client
@@ -79,11 +79,13 @@ func (d *Stores) GetBlockValidationClient(ctx context.Context, logger ulogger.Lo
return d.mainBlockValidationClient, nil
}

var err error

d.mainBlockValidationClient, err = blockvalidation.NewClient(ctx, logger, appSettings, "main_stores")
client, err := blockvalidation.NewClient(ctx, logger, appSettings, "main_stores")
if err != nil {
return nil, err
}

return d.mainBlockValidationClient, err
d.mainBlockValidationClient = client
return d.mainBlockValidationClient, nil
}

// GetP2PClient creates and returns a new P2P client instance. Unlike other store getters, this function
@@ -103,14 +105,13 @@ func (d *Stores) GetP2PClient(ctx context.Context, logger ulogger.Logger, appSet
return d.mainP2PClient, nil
}

p2pClient, err := p2p.NewClient(ctx, logger, appSettings)
client, err := p2p.NewClient(ctx, logger, appSettings)
if err != nil {
return nil, err
}

d.mainP2PClient = p2pClient

return p2pClient, nil
d.mainP2PClient = client
return d.mainP2PClient, nil
}

// GetBlockchainClient creates and returns a new blockchain client instance. Unlike other store
@@ -129,16 +130,13 @@ func (d *Stores) GetBlockAssemblyClient(ctx context.Context, logger ulogger.Logg
return d.mainBlockAssemblyClient, nil
}

var err error

client, err := blockassembly.NewClient(ctx, logger, appSettings)
if err != nil {
return nil, err
}

d.mainBlockAssemblyClient = client

return client, nil
return d.mainBlockAssemblyClient, nil
}

// GetValidatorClient returns the main validator client instance. If the client hasn't been
@@ -150,51 +148,37 @@ func (d *Stores) GetValidatorClient(ctx context.Context, logger ulogger.Logger,
return d.mainValidatorClient, nil
}

var err error

localValidator := appSettings.Validator.UseLocalValidator

if localValidator {
logger.Infof("[Validator] Using local validator")

var utxoStore utxostore.Store

utxoStore, err = d.GetUtxoStore(ctx, logger, appSettings)
utxoStore, err := d.GetUtxoStore(ctx, logger, appSettings)
if err != nil {
return nil, errors.NewServiceError("could not create local validator client", err)
}

var txMetaKafkaProducerClient *kafka.KafkaAsyncProducer

txMetaKafkaProducerClient, err = getKafkaTxmetaAsyncProducer(ctx, logger, appSettings)
txMetaKafkaProducerClient, err := getKafkaTxmetaAsyncProducer(ctx, logger, appSettings)
if err != nil {
return nil, errors.NewServiceError("could not create txmeta kafka producer for local validator", err)
}

var rejectedTxKafkaProducerClient *kafka.KafkaAsyncProducer

rejectedTxKafkaProducerClient, err = getKafkaRejectedTxAsyncProducer(ctx, logger, appSettings)
rejectedTxKafkaProducerClient, err := getKafkaRejectedTxAsyncProducer(ctx, logger, appSettings)
if err != nil {
return nil, errors.NewServiceError("could not create rejectedTx kafka producer for local validator", err)
}

var blockAssemblyClient blockassembly.ClientI

blockAssemblyClient, err = d.GetBlockAssemblyClient(ctx, logger, appSettings)
blockAssemblyClient, err := d.GetBlockAssemblyClient(ctx, logger, appSettings)
if err != nil {
return nil, errors.NewServiceError("could not create block assembly client for local validator", err)
}

var validatorClient validator.Interface

var blockchainClient blockchain.ClientI

blockchainClient, err = d.GetBlockchainClient(ctx, logger, appSettings, "validator")
blockchainClient, err := d.GetBlockchainClient(ctx, logger, appSettings, "validator")
if err != nil {
return nil, errors.NewServiceError("could not create block validation client for local validator", err)
}

validatorClient, err = validator.New(ctx,
validatorClient, err := validator.New(ctx,
logger,
appSettings,
utxoStore,
Expand All @@ -207,15 +191,17 @@ func (d *Stores) GetValidatorClient(ctx context.Context, logger ulogger.Logger,
return nil, errors.NewServiceError("could not create local validator", err)
}

return validatorClient, nil
d.mainValidatorClient = validatorClient
return d.mainValidatorClient, nil
} else {
d.mainValidatorClient, err = validator.NewClient(ctx, logger, appSettings)
client, err := validator.NewClient(ctx, logger, appSettings)
if err != nil {
return nil, errors.NewServiceError("could not create validator client", err)
}
}

return d.mainValidatorClient, nil
d.mainValidatorClient = client
return d.mainValidatorClient, nil
}
}

// GetTxStore returns the main transaction store instance. If the store hasn't been initialized yet,
@@ -241,11 +227,12 @@ func (d *Stores) GetTxStore(logger ulogger.Logger, appSettings *settings.Setting
}
}

d.mainTxStore, err = blob.NewStore(logger, txStoreURL, options.WithHashPrefix(hashPrefix))
store, err := blob.NewStore(logger, txStoreURL, options.WithHashPrefix(hashPrefix))
if err != nil {
return nil, errors.NewServiceError("could not create tx store", err)
}

d.mainTxStore = store
return d.mainTxStore, nil
}

@@ -283,11 +270,12 @@ func (d *Stores) GetSubtreeStore(ctx context.Context, logger ulogger.Logger, app
return nil, errors.NewServiceError("could not create block height tracker channel", err)
}

d.mainSubtreeStore, err = blob.NewStore(logger, subtreeStoreURL, options.WithHashPrefix(hashPrefix), options.WithBlockHeightCh(ch))
store, err := blob.NewStore(logger, subtreeStoreURL, options.WithHashPrefix(hashPrefix), options.WithBlockHeightCh(ch))
if err != nil {
return nil, errors.NewServiceError("could not create subtree store", err)
}

d.mainSubtreeStore = store
return d.mainSubtreeStore, nil
}

@@ -314,11 +302,12 @@ func (d *Stores) GetTempStore(ctx context.Context, logger ulogger.Logger, appSet
return nil, errors.NewServiceError("could not create block height tracker channel", err)
}

d.mainTempStore, err = blob.NewStore(logger, tempStoreURL, options.WithBlockHeightCh(ch))
store, err := blob.NewStore(logger, tempStoreURL, options.WithBlockHeightCh(ch))
if err != nil {
return nil, errors.NewServiceError("could not create temp_store", err)
}

d.mainTempStore = store
return d.mainTempStore, nil
}

@@ -357,11 +346,12 @@ func (d *Stores) GetBlockStore(ctx context.Context, logger ulogger.Logger, appSe
return nil, errors.NewServiceError("could not create block height tracker channel", err)
}

d.mainBlockStore, err = blob.NewStore(logger, blockStoreURL, options.WithHashPrefix(hashPrefix), options.WithBlockHeightCh(ch))
store, err := blob.NewStore(logger, blockStoreURL, options.WithHashPrefix(hashPrefix), options.WithBlockHeightCh(ch))
if err != nil {
return nil, errors.NewServiceError("could not create block store", err)
}

d.mainBlockStore = store
return d.mainBlockStore, nil
}

@@ -399,11 +389,12 @@ func (d *Stores) GetBlockPersisterStore(ctx context.Context, logger ulogger.Logg
return nil, errors.NewServiceError("could not create block height tracker channel", err)
}

d.mainBlockPersisterStore, err = blob.NewStore(logger, blockStoreURL, options.WithHashPrefix(hashPrefix), options.WithBlockHeightCh(ch))
store, err := blob.NewStore(logger, blockStoreURL, options.WithHashPrefix(hashPrefix), options.WithBlockHeightCh(ch))
if err != nil {
return nil, errors.NewServiceError("could not create block persister store", err)
}

d.mainBlockPersisterStore = store
return d.mainBlockPersisterStore, nil
}

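Every getter in daemon_stores.go is refactored the same way: construct into a local variable with `:=`, return early on error, and only then assign the cached field, instead of pre-declaring `var err error` and writing into the field directly. In the validator getter this also fixes the local-validator branch, which previously returned the new client without storing it in d.mainValidatorClient. A minimal sketch of the resulting getter shape, with hypothetical widget/stores names standing in for the real Teranode types:

```go
package daemon

import "context"

// widget stands in for any lazily constructed client or store (UTXO store,
// blob store, validation client, and so on); these names are illustrative,
// not the real Teranode types.
type widget struct{}

// newWidget stands in for a constructor such as utxofactory.NewStore or blob.NewStore.
func newWidget(ctx context.Context, name string) (*widget, error) {
	return &widget{}, nil
}

// stores caches expensive instances so repeated getter calls reuse them.
type stores struct {
	mainWidget *widget
}

// GetWidget mirrors the refactored getters: construct into a local variable,
// return early on error, and only then publish the result to the cached field,
// so a failed constructor call never writes into d.mainWidget.
func (d *stores) GetWidget(ctx context.Context) (*widget, error) {
	if d.mainWidget != nil {
		return d.mainWidget, nil
	}

	w, err := newWidget(ctx, "main")
	if err != nil {
		return nil, err // cached field stays nil on failure
	}

	d.mainWidget = w
	return d.mainWidget, nil
}
```

Because the field is only written after the nil-error check, callers can keep treating a non-nil cached field as a fully constructed instance.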
8 changes: 4 additions & 4 deletions deploy/docker/base/aerospike.conf
@@ -67,20 +67,20 @@ namespace utxo-store {

flush-size 128K
# Post-write cache to reduce I/O pressure (renamed from post-write-queue in v7.1)
post-write-cache 256
post-write-cache 1024
# Defrag settings - less aggressive to reduce write amplification
defrag-lwm-pct 50
defrag-sleep 2000
defrag-sleep 10000
# Eviction threshold
evict-used-pct 70
# Cache settings
read-page-cache true
# Maximum flush delay in milliseconds
flush-max-ms 1000
flush-max-ms 5000

# high number to allow slow storage to keep up in case of traffic peaks
# can be dangerous if the instance crashes or the storage can't keep up at all
# monitor the queue with `asadm -e "show statistics like write_q"`
max-write-cache 1024M
max-write-cache 4096M
Review comment (Contributor):
Significant Aerospike tuning changes without justification

This PR changes several critical Aerospike performance parameters:

  • post-write-cache: 256 -> 1024 (4x increase)
  • defrag-sleep: 2000 -> 10000 (5x slower defragmentation)
  • flush-max-ms: 1000 -> 5000 (5x longer flush delay)
  • max-write-cache: 1024M -> 4096M (4x larger write buffer)

These changes significantly affect UTXO store write performance and memory usage. While the PR title mentions ValidateMulti(), the Aerospike tuning seems unrelated to that feature.

Questions:

  1. Are these changes necessary for the worker pool/concurrent catchup features?
  2. Were these settings benchmarked with production workloads?
  3. Should these be in a separate PR focused on storage optimization?
  4. What happens on systems with less available RAM when max-write-cache is 4GB?

}
}
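On the operational side, the config's own comment already names the thing to watch: with max-write-cache raised to 4096M the node can buffer far more un-flushed writes, so the write_q statistic becomes the early-warning signal for storage falling behind. A throwaway Go poller around the exact asadm command quoted in the config (assuming asadm is installed on the node and reachable without extra connection flags):

```go
package main

import (
	"fmt"
	"log"
	"os/exec"
	"time"
)

// Periodically runs the write-queue check referenced in aerospike.conf:
//   asadm -e "show statistics like write_q"
// The 30-second interval and stdout logging are placeholders; in practice
// the output would feed whatever metrics pipeline the deployment uses.
func main() {
	for {
		out, err := exec.Command("asadm", "-e", "show statistics like write_q").CombinedOutput()
		if err != nil {
			log.Printf("asadm failed: %v", err)
		} else {
			fmt.Printf("%s\n%s\n", time.Now().Format(time.RFC3339), out)
		}
		time.Sleep(30 * time.Second)
	}
}
```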
32 changes: 22 additions & 10 deletions model/Block.go
@@ -681,21 +681,33 @@ func (b *Block) validOrderAndBlessed(ctx context.Context, logger ulogger.Logger,
parentSpendsMap: NewSplitSyncedParentMap(4096),
}

concurrency := b.getValidationConcurrency(validOrderAndBlessedConcurrency)
g, gCtx := errgroup.WithContext(ctx)
util.SafeSetLimit(g, concurrency)
// Calculate optimal worker count for I/O-bound subtree validation
numWorkers := getOptimalSubtreeWorkerCount(len(b.SubtreeSlices), validOrderAndBlessedConcurrency)

for sIdx := 0; sIdx < len(b.SubtreeSlices); sIdx++ {
subtree := b.SubtreeSlices[sIdx]
sIdx := sIdx
// Create worker pool with parent context for proper cancellation/tracing
pool := newSubtreeWorkerPool(ctx, b, numWorkers, len(b.SubtreeSlices), logger, deps, validationCtx)
pool.Start()

g.Go(func() error {
return b.validateSubtree(gCtx, logger, deps, validationCtx, subtree, sIdx)
// Submit all subtrees as jobs to the worker pool
for sIdx := 0; sIdx < len(b.SubtreeSlices); sIdx++ {
pool.Submit(subtreeValidationJob{
subtreeIndex: sIdx,
subtree: b.SubtreeSlices[sIdx],
})
}

// do not wrap the error again, the error is already wrapped
return g.Wait()
// Wait for all validations to complete
pool.Close()

// Check for validation errors
for _, result := range pool.results {
if result.err != nil {
// Do not wrap the error again, the error is already wrapped
return result.err
Review comment (Contributor):
Critical: Early return on first error loses other validation errors

The current implementation returns as soon as it finds an error while scanning the collected results. If multiple subtrees fail validation, only one error is reported, and which one depends on how the concurrent jobs happened to complete.

This makes debugging harder because developers won't see all the validation failures at once. Consider either:

  1. Collecting all errors and returning a combined error
  2. Documenting this "fail-fast" behavior explicitly in comments

The previous errgroup implementation had the same fail-fast behavior, but it's worth considering whether this is the desired outcome for block validation.

}
}

return nil
}

func (b *Block) validateSubtree(ctx context.Context, logger ulogger.Logger, deps *validationDependencies,
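For reference, the review suggestion above (collect every failure instead of returning the first) maps onto the new worker-pool shape roughly as follows. This is a sketch, not the actual newSubtreeWorkerPool implementation: the job/result types, the validate callback, and the errors.Join aggregation are assumptions layered on top of the pattern visible in the diff.

```go
package model

import (
	"context"
	"errors"
	"sync"
)

// job and result mirror the subtreeValidationJob/result pairing in the diff,
// with the subtree payload reduced to an index for brevity.
type job struct{ subtreeIndex int }

type result struct {
	subtreeIndex int
	err          error
}

// workerPool fans jobs out to a fixed number of goroutines and keeps every
// per-job outcome so the caller can aggregate all failures, not just the first.
type workerPool struct {
	jobs    chan job
	results []result
	mu      sync.Mutex
	wg      sync.WaitGroup
}

func newWorkerPool(ctx context.Context, workers, capacity int, validate func(context.Context, int) error) *workerPool {
	p := &workerPool{jobs: make(chan job, capacity)}
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer p.wg.Done()
			for j := range p.jobs {
				err := validate(ctx, j.subtreeIndex)
				p.mu.Lock()
				p.results = append(p.results, result{subtreeIndex: j.subtreeIndex, err: err})
				p.mu.Unlock()
			}
		}()
	}
	return p
}

func (p *workerPool) Submit(j job) { p.jobs <- j }

// Close waits for all submitted jobs to finish, then joins every error so the
// caller sees all failing subtrees at once (option 1 from the review comment).
func (p *workerPool) Close() error {
	close(p.jobs)
	p.wg.Wait()
	errs := make([]error, 0, len(p.results))
	for _, r := range p.results {
		if r.err != nil {
			errs = append(errs, r.err)
		}
	}
	return errors.Join(errs...) // nil when every subtree validated cleanly
}
```

With this shape, the caller's loop over pool.results collapses to a single error check on Close, and every failing subtree is reported at once rather than only the first.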