diff --git a/services/blockassembly/BlockAssembler.go b/services/blockassembly/BlockAssembler.go index e4358516b..c85e91c2f 100644 --- a/services/blockassembly/BlockAssembler.go +++ b/services/blockassembly/BlockAssembler.go @@ -762,6 +762,10 @@ func (b *BlockAssembler) Start(ctx context.Context) (err error) { return errors.NewProcessingError("[BlockAssembler] failed to initialize state: %v", err) } + if err = b.initializeCapacityLimit(); err != nil { + return errors.NewProcessingError("[BlockAssembler] failed to initialize capacity limit: %v", err) + } + // Wait for any pending blocks to be processed before loading unmined transactions if !b.skipWaitForPendingBlocks { if err = b.subtreeProcessor.WaitForPendingBlocks(ctx); err != nil { @@ -867,6 +871,22 @@ func (b *BlockAssembler) initState(ctx context.Context) error { return nil } +// initializeCapacityLimit sets the maximum unmined transaction limit. +// If MaxUnminedTransactions is 0, no limit is enforced (unlimited). +func (b *BlockAssembler) initializeCapacityLimit() error { + maxTx := b.settings.BlockAssembly.MaxUnminedTransactions + + if maxTx > 0 { + b.logger.Infof("[BlockAssembler] Setting max unmined transactions limit to %d", maxTx) + } else { + b.logger.Infof("[BlockAssembler] No limit on unmined transactions (MaxUnminedTransactions=0)") + } + + b.subtreeProcessor.SetMaxUnminedTransactions(maxTx) + + return nil +} + // GetState retrieves the current state of the block assembler from the blockchain. // // Parameters: diff --git a/services/blockassembly/Client.go b/services/blockassembly/Client.go index d518b6fcb..3f02c8c70 100644 --- a/services/blockassembly/Client.go +++ b/services/blockassembly/Client.go @@ -267,6 +267,35 @@ func (s *Client) RemoveTx(ctx context.Context, hash *chainhash.Hash) error { return unwrappedErr } +// CanAcceptTransaction checks if block assembly can accept more transactions. +// Returns capacity information to allow validator to fail fast before +// spending UTXOs if capacity limit has been reached. +// +// Parameters: +// - ctx: Context for cancellation +// - count: Number of transactions to check (default: 1) +// +// Returns: +// - canAccept: true if block assembly can accept the transactions +// - currentCount: current number of unmined transactions +// - maxLimit: maximum limit (0 = unlimited) +// - remainingCapacity: how many more transactions can be accepted +// - error: Any error encountered +func (s *Client) CanAcceptTransaction(ctx context.Context, count uint32) (canAccept bool, currentCount, maxLimit, remainingCapacity int64, err error) { + if count == 0 { + count = 1 + } + + resp, err := s.client.CanAcceptTransaction(ctx, &blockassembly_api.CanAcceptTransactionRequest{ + Count: count, + }) + if err != nil { + return false, 0, 0, 0, errors.UnwrapGRPC(err) + } + + return resp.CanAccept, resp.CurrentCount, resp.MaxLimit, resp.RemainingCapacity, nil +} + // GetMiningCandidate retrieves a candidate block for mining. // // Parameters: diff --git a/services/blockassembly/Interface.go b/services/blockassembly/Interface.go index a08ab7c3c..0b1f94d7a 100644 --- a/services/blockassembly/Interface.go +++ b/services/blockassembly/Interface.go @@ -59,6 +59,22 @@ type ClientI interface { // - error: Any error encountered during removal RemoveTx(ctx context.Context, hash *chainhash.Hash) error + // CanAcceptTransaction checks if block assembly can accept more transactions. + // Returns capacity information to allow validator to fail fast before + // spending UTXOs if capacity limit has been reached. + // + // Parameters: + // - ctx: Context for cancellation + // - count: Number of transactions to check (default: 1) + // + // Returns: + // - canAccept: true if block assembly can accept the transactions + // - currentCount: current number of unmined transactions + // - maxLimit: maximum limit (0 = unlimited) + // - remainingCapacity: how many more transactions can be accepted + // - error: Any error encountered + CanAcceptTransaction(ctx context.Context, count uint32) (canAccept bool, currentCount, maxLimit, remainingCapacity int64, err error) + // GetMiningCandidate retrieves a candidate block for mining. // // Parameters: @@ -177,4 +193,20 @@ type Store interface { // Returns: // - error: Any error encountered during removal RemoveTx(ctx context.Context, hash *chainhash.Hash) error + + // CanAcceptTransaction checks if block assembly can accept more transactions. + // Returns capacity information to allow validator to fail fast before + // spending UTXOs if capacity limit has been reached. + // + // Parameters: + // - ctx: Context for cancellation + // - count: Number of transactions to check (default: 1) + // + // Returns: + // - canAccept: true if block assembly can accept the transactions + // - currentCount: current number of unmined transactions + // - maxLimit: maximum limit (0 = unlimited) + // - remainingCapacity: how many more transactions can be accepted + // - error: Any error encountered + CanAcceptTransaction(ctx context.Context, count uint32) (canAccept bool, currentCount, maxLimit, remainingCapacity int64, err error) } diff --git a/services/blockassembly/Server.go b/services/blockassembly/Server.go index dbd556fd0..ae664a7db 100644 --- a/services/blockassembly/Server.go +++ b/services/blockassembly/Server.go @@ -46,6 +46,8 @@ import ( "go.uber.org/atomic" "golang.org/x/sync/errgroup" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -890,6 +892,13 @@ func (ba *BlockAssembly) AddTx(ctx context.Context, req *blockassembly_api.AddTx } if !ba.settings.BlockAssembly.Disabled { + if !ba.blockAssembler.subtreeProcessor.CanAcceptTransactions(1) { + return nil, status.Errorf(codes.ResourceExhausted, + "capacity limit reached: current=%d, max=%d", + ba.blockAssembler.subtreeProcessor.CurrentTransactionCount(), + ba.blockAssembler.subtreeProcessor.GetMaxUnminedTransactions()) + } + ba.blockAssembler.AddTxBatch( []subtreepkg.Node{{Hash: chainhash.Hash(req.Txid), Fee: req.Fee, SizeInBytes: req.Size}}, []*subtreepkg.TxInpoints{&txInpoints}, @@ -1000,6 +1009,13 @@ func (ba *BlockAssembly) AddTxBatch(ctx context.Context, batch *blockassembly_ap // Add entire batch in one call if !ba.settings.BlockAssembly.Disabled { + if !ba.blockAssembler.subtreeProcessor.CanAcceptTransactions(len(nodes)) { + return nil, status.Errorf(codes.ResourceExhausted, + "capacity limit reached: current=%d, max=%d", + ba.blockAssembler.subtreeProcessor.CurrentTransactionCount(), + ba.blockAssembler.subtreeProcessor.GetMaxUnminedTransactions()) + } + ba.blockAssembler.AddTxBatch(nodes, txInpointsList) } @@ -1133,6 +1149,13 @@ func (ba *BlockAssembly) AddTxBatchColumnar(ctx context.Context, req *blockassem // Add entire batch in one call if !ba.settings.BlockAssembly.Disabled { + if !ba.blockAssembler.subtreeProcessor.CanAcceptTransactions(len(nodes)) { + return nil, status.Errorf(codes.ResourceExhausted, + "capacity limit reached: current=%d, max=%d", + ba.blockAssembler.subtreeProcessor.CurrentTransactionCount(), + ba.blockAssembler.subtreeProcessor.GetMaxUnminedTransactions()) + } + ba.blockAssembler.AddTxBatch(nodes, txInpointsList) } @@ -1993,3 +2016,33 @@ func (ba *BlockAssembly) SetSkipWaitForPendingBlocks(skip bool) { ba.blockAssembler.SetSkipWaitForPendingBlocks(skip) } } + +// CanAcceptTransaction checks if block assembly can accept more transactions. +// This method is used by the validator to fail fast before spending UTXOs +// if the capacity limit has been reached. +// +// Parameters: +// - ctx: Context for the operation +// - req: Request containing the number of transactions to check +// +// Returns: +// - Response with capacity information +// - error: Any error encountered +func (ba *BlockAssembly) CanAcceptTransaction(ctx context.Context, req *blockassembly_api.CanAcceptTransactionRequest) (*blockassembly_api.CanAcceptTransactionResponse, error) { + count := req.Count + if count == 0 { + count = 1 + } + + canAccept := ba.blockAssembler.subtreeProcessor.CanAcceptTransactions(int(count)) + currentCount := ba.blockAssembler.subtreeProcessor.CurrentTransactionCount() + maxLimit := ba.blockAssembler.subtreeProcessor.GetMaxUnminedTransactions() + remainingCapacity := ba.blockAssembler.subtreeProcessor.RemainingCapacity() + + return &blockassembly_api.CanAcceptTransactionResponse{ + CanAccept: canAccept, + CurrentCount: currentCount, + MaxLimit: maxLimit, + RemainingCapacity: remainingCapacity, + }, nil +} diff --git a/services/blockassembly/blockassembly_api/blockassembly_api.pb.go b/services/blockassembly/blockassembly_api/blockassembly_api.pb.go index f0c463b7a..b7aa0464f 100644 --- a/services/blockassembly/blockassembly_api/blockassembly_api.pb.go +++ b/services/blockassembly/blockassembly_api/blockassembly_api.pb.go @@ -1056,6 +1056,120 @@ func (x *GetBlockAssemblyTxsResponse) GetTxs() []string { return nil } +// Request for checking if block assembly can accept transactions. +type CanAcceptTransactionRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Count uint32 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"` // the number of transactions to check (default: 1) + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CanAcceptTransactionRequest) Reset() { + *x = CanAcceptTransactionRequest{} + mi := &file_services_blockassembly_blockassembly_api_blockassembly_api_proto_msgTypes[17] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CanAcceptTransactionRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CanAcceptTransactionRequest) ProtoMessage() {} + +func (x *CanAcceptTransactionRequest) ProtoReflect() protoreflect.Message { + mi := &file_services_blockassembly_blockassembly_api_blockassembly_api_proto_msgTypes[17] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CanAcceptTransactionRequest.ProtoReflect.Descriptor instead. +func (*CanAcceptTransactionRequest) Descriptor() ([]byte, []int) { + return file_services_blockassembly_blockassembly_api_blockassembly_api_proto_rawDescGZIP(), []int{17} +} + +func (x *CanAcceptTransactionRequest) GetCount() uint32 { + if x != nil { + return x.Count + } + return 0 +} + +// Response indicating whether block assembly can accept transactions. +type CanAcceptTransactionResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + CanAccept bool `protobuf:"varint,1,opt,name=can_accept,json=canAccept,proto3" json:"can_accept,omitempty"` // true if block assembly can accept the requested number of transactions + CurrentCount int64 `protobuf:"varint,2,opt,name=current_count,json=currentCount,proto3" json:"current_count,omitempty"` // current number of unmined transactions + MaxLimit int64 `protobuf:"varint,3,opt,name=max_limit,json=maxLimit,proto3" json:"max_limit,omitempty"` // maximum limit (0 = unlimited) + RemainingCapacity int64 `protobuf:"varint,4,opt,name=remaining_capacity,json=remainingCapacity,proto3" json:"remaining_capacity,omitempty"` // how many more transactions can be accepted + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CanAcceptTransactionResponse) Reset() { + *x = CanAcceptTransactionResponse{} + mi := &file_services_blockassembly_blockassembly_api_blockassembly_api_proto_msgTypes[18] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CanAcceptTransactionResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CanAcceptTransactionResponse) ProtoMessage() {} + +func (x *CanAcceptTransactionResponse) ProtoReflect() protoreflect.Message { + mi := &file_services_blockassembly_blockassembly_api_blockassembly_api_proto_msgTypes[18] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CanAcceptTransactionResponse.ProtoReflect.Descriptor instead. +func (*CanAcceptTransactionResponse) Descriptor() ([]byte, []int) { + return file_services_blockassembly_blockassembly_api_blockassembly_api_proto_rawDescGZIP(), []int{18} +} + +func (x *CanAcceptTransactionResponse) GetCanAccept() bool { + if x != nil { + return x.CanAccept + } + return false +} + +func (x *CanAcceptTransactionResponse) GetCurrentCount() int64 { + if x != nil { + return x.CurrentCount + } + return 0 +} + +func (x *CanAcceptTransactionResponse) GetMaxLimit() int64 { + if x != nil { + return x.MaxLimit + } + return 0 +} + +func (x *CanAcceptTransactionResponse) GetRemainingCapacity() int64 { + if x != nil { + return x.RemainingCapacity + } + return 0 +} + var File_services_blockassembly_blockassembly_api_blockassembly_api_proto protoreflect.FileDescriptor const file_services_blockassembly_blockassembly_api_blockassembly_api_proto_rawDesc = "" + @@ -1140,7 +1254,15 @@ const file_services_blockassembly_blockassembly_api_blockassembly_api_proto_rawD "\x05block\x18\x01 \x01(\fR\x05block\"I\n" + "\x1bGetBlockAssemblyTxsResponse\x12\x18\n" + "\atxCount\x18\x01 \x01(\x04R\atxCount\x12\x10\n" + - "\x03txs\x18\x02 \x03(\tR\x03txs2\xbb\v\n" + + "\x03txs\x18\x02 \x03(\tR\x03txs\"3\n" + + "\x1bCanAcceptTransactionRequest\x12\x14\n" + + "\x05count\x18\x01 \x01(\rR\x05count\"\xae\x01\n" + + "\x1cCanAcceptTransactionResponse\x12\x1d\n" + + "\n" + + "can_accept\x18\x01 \x01(\bR\tcanAccept\x12#\n" + + "\rcurrent_count\x18\x02 \x01(\x03R\fcurrentCount\x12\x1b\n" + + "\tmax_limit\x18\x03 \x01(\x03R\bmaxLimit\x12-\n" + + "\x12remaining_capacity\x18\x04 \x01(\x03R\x11remainingCapacity2\xb6\f\n" + "\x10BlockAssemblyAPI\x12R\n" + "\n" + "HealthGRPC\x12\x1f.blockassembly_api.EmptyMessage\x1a!.blockassembly_api.HealthResponse\"\x00\x12L\n" + @@ -1158,7 +1280,8 @@ const file_services_blockassembly_blockassembly_api_blockassembly_api_proto_rawD "\x0eGenerateBlocks\x12(.blockassembly_api.GenerateBlocksRequest\x1a\x1f.blockassembly_api.EmptyMessage\"\x00\x12V\n" + "\x12CheckBlockAssembly\x12\x1f.blockassembly_api.EmptyMessage\x1a\x1d.blockassembly_api.OKResponse\"\x00\x12~\n" + "\x1eGetBlockAssemblyBlockCandidate\x12\x1f.blockassembly_api.EmptyMessage\x1a9.blockassembly_api.GetBlockAssemblyBlockCandidateResponse\"\x00\x12h\n" + - "\x13GetBlockAssemblyTxs\x12\x1f.blockassembly_api.EmptyMessage\x1a..blockassembly_api.GetBlockAssemblyTxsResponse\"\x00B\x16Z\x14./;blockassembly_apib\x06proto3" + "\x13GetBlockAssemblyTxs\x12\x1f.blockassembly_api.EmptyMessage\x1a..blockassembly_api.GetBlockAssemblyTxsResponse\"\x00\x12y\n" + + "\x14CanAcceptTransaction\x12..blockassembly_api.CanAcceptTransactionRequest\x1a/.blockassembly_api.CanAcceptTransactionResponse\"\x00B\x16Z\x14./;blockassembly_apib\x06proto3" var ( file_services_blockassembly_blockassembly_api_blockassembly_api_proto_rawDescOnce sync.Once @@ -1172,7 +1295,7 @@ func file_services_blockassembly_blockassembly_api_blockassembly_api_proto_rawDe return file_services_blockassembly_blockassembly_api_blockassembly_api_proto_rawDescData } -var file_services_blockassembly_blockassembly_api_blockassembly_api_proto_msgTypes = make([]protoimpl.MessageInfo, 17) +var file_services_blockassembly_blockassembly_api_blockassembly_api_proto_msgTypes = make([]protoimpl.MessageInfo, 19) var file_services_blockassembly_blockassembly_api_blockassembly_api_proto_goTypes = []any{ (*EmptyMessage)(nil), // 0: blockassembly_api.EmptyMessage (*HealthResponse)(nil), // 1: blockassembly_api.HealthResponse @@ -1191,11 +1314,13 @@ var file_services_blockassembly_blockassembly_api_blockassembly_api_proto_goType (*GenerateBlocksRequest)(nil), // 14: blockassembly_api.GenerateBlocksRequest (*GetBlockAssemblyBlockCandidateResponse)(nil), // 15: blockassembly_api.GetBlockAssemblyBlockCandidateResponse (*GetBlockAssemblyTxsResponse)(nil), // 16: blockassembly_api.GetBlockAssemblyTxsResponse - (*timestamppb.Timestamp)(nil), // 17: google.protobuf.Timestamp - (*model.MiningCandidate)(nil), // 18: model.MiningCandidate + (*CanAcceptTransactionRequest)(nil), // 17: blockassembly_api.CanAcceptTransactionRequest + (*CanAcceptTransactionResponse)(nil), // 18: blockassembly_api.CanAcceptTransactionResponse + (*timestamppb.Timestamp)(nil), // 19: google.protobuf.Timestamp + (*model.MiningCandidate)(nil), // 20: model.MiningCandidate } var file_services_blockassembly_blockassembly_api_blockassembly_api_proto_depIdxs = []int32{ - 17, // 0: blockassembly_api.HealthResponse.timestamp:type_name -> google.protobuf.Timestamp + 19, // 0: blockassembly_api.HealthResponse.timestamp:type_name -> google.protobuf.Timestamp 3, // 1: blockassembly_api.AddTxBatchRequest.txRequests:type_name -> blockassembly_api.AddTxRequest 0, // 2: blockassembly_api.BlockAssemblyAPI.HealthGRPC:input_type -> blockassembly_api.EmptyMessage 3, // 3: blockassembly_api.BlockAssemblyAPI.AddTx:input_type -> blockassembly_api.AddTxRequest @@ -1212,23 +1337,25 @@ var file_services_blockassembly_blockassembly_api_blockassembly_api_proto_depIdx 0, // 14: blockassembly_api.BlockAssemblyAPI.CheckBlockAssembly:input_type -> blockassembly_api.EmptyMessage 0, // 15: blockassembly_api.BlockAssemblyAPI.GetBlockAssemblyBlockCandidate:input_type -> blockassembly_api.EmptyMessage 0, // 16: blockassembly_api.BlockAssemblyAPI.GetBlockAssemblyTxs:input_type -> blockassembly_api.EmptyMessage - 1, // 17: blockassembly_api.BlockAssemblyAPI.HealthGRPC:output_type -> blockassembly_api.HealthResponse - 8, // 18: blockassembly_api.BlockAssemblyAPI.AddTx:output_type -> blockassembly_api.AddTxResponse - 0, // 19: blockassembly_api.BlockAssemblyAPI.RemoveTx:output_type -> blockassembly_api.EmptyMessage - 9, // 20: blockassembly_api.BlockAssemblyAPI.AddTxBatch:output_type -> blockassembly_api.AddTxBatchResponse - 9, // 21: blockassembly_api.BlockAssemblyAPI.AddTxBatchColumnar:output_type -> blockassembly_api.AddTxBatchResponse - 18, // 22: blockassembly_api.BlockAssemblyAPI.GetMiningCandidate:output_type -> model.MiningCandidate - 13, // 23: blockassembly_api.BlockAssemblyAPI.GetCurrentDifficulty:output_type -> blockassembly_api.GetCurrentDifficultyResponse - 11, // 24: blockassembly_api.BlockAssemblyAPI.SubmitMiningSolution:output_type -> blockassembly_api.OKResponse - 0, // 25: blockassembly_api.BlockAssemblyAPI.ResetBlockAssembly:output_type -> blockassembly_api.EmptyMessage - 0, // 26: blockassembly_api.BlockAssemblyAPI.ResetBlockAssemblyFully:output_type -> blockassembly_api.EmptyMessage - 12, // 27: blockassembly_api.BlockAssemblyAPI.GetBlockAssemblyState:output_type -> blockassembly_api.StateMessage - 0, // 28: blockassembly_api.BlockAssemblyAPI.GenerateBlocks:output_type -> blockassembly_api.EmptyMessage - 11, // 29: blockassembly_api.BlockAssemblyAPI.CheckBlockAssembly:output_type -> blockassembly_api.OKResponse - 15, // 30: blockassembly_api.BlockAssemblyAPI.GetBlockAssemblyBlockCandidate:output_type -> blockassembly_api.GetBlockAssemblyBlockCandidateResponse - 16, // 31: blockassembly_api.BlockAssemblyAPI.GetBlockAssemblyTxs:output_type -> blockassembly_api.GetBlockAssemblyTxsResponse - 17, // [17:32] is the sub-list for method output_type - 2, // [2:17] is the sub-list for method input_type + 17, // 17: blockassembly_api.BlockAssemblyAPI.CanAcceptTransaction:input_type -> blockassembly_api.CanAcceptTransactionRequest + 1, // 18: blockassembly_api.BlockAssemblyAPI.HealthGRPC:output_type -> blockassembly_api.HealthResponse + 8, // 19: blockassembly_api.BlockAssemblyAPI.AddTx:output_type -> blockassembly_api.AddTxResponse + 0, // 20: blockassembly_api.BlockAssemblyAPI.RemoveTx:output_type -> blockassembly_api.EmptyMessage + 9, // 21: blockassembly_api.BlockAssemblyAPI.AddTxBatch:output_type -> blockassembly_api.AddTxBatchResponse + 9, // 22: blockassembly_api.BlockAssemblyAPI.AddTxBatchColumnar:output_type -> blockassembly_api.AddTxBatchResponse + 20, // 23: blockassembly_api.BlockAssemblyAPI.GetMiningCandidate:output_type -> model.MiningCandidate + 13, // 24: blockassembly_api.BlockAssemblyAPI.GetCurrentDifficulty:output_type -> blockassembly_api.GetCurrentDifficultyResponse + 11, // 25: blockassembly_api.BlockAssemblyAPI.SubmitMiningSolution:output_type -> blockassembly_api.OKResponse + 0, // 26: blockassembly_api.BlockAssemblyAPI.ResetBlockAssembly:output_type -> blockassembly_api.EmptyMessage + 0, // 27: blockassembly_api.BlockAssemblyAPI.ResetBlockAssemblyFully:output_type -> blockassembly_api.EmptyMessage + 12, // 28: blockassembly_api.BlockAssemblyAPI.GetBlockAssemblyState:output_type -> blockassembly_api.StateMessage + 0, // 29: blockassembly_api.BlockAssemblyAPI.GenerateBlocks:output_type -> blockassembly_api.EmptyMessage + 11, // 30: blockassembly_api.BlockAssemblyAPI.CheckBlockAssembly:output_type -> blockassembly_api.OKResponse + 15, // 31: blockassembly_api.BlockAssemblyAPI.GetBlockAssemblyBlockCandidate:output_type -> blockassembly_api.GetBlockAssemblyBlockCandidateResponse + 16, // 32: blockassembly_api.BlockAssemblyAPI.GetBlockAssemblyTxs:output_type -> blockassembly_api.GetBlockAssemblyTxsResponse + 18, // 33: blockassembly_api.BlockAssemblyAPI.CanAcceptTransaction:output_type -> blockassembly_api.CanAcceptTransactionResponse + 18, // [18:34] is the sub-list for method output_type + 2, // [2:18] is the sub-list for method input_type 2, // [2:2] is the sub-list for extension type_name 2, // [2:2] is the sub-list for extension extendee 0, // [0:2] is the sub-list for field type_name @@ -1247,7 +1374,7 @@ func file_services_blockassembly_blockassembly_api_blockassembly_api_proto_init( GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_services_blockassembly_blockassembly_api_blockassembly_api_proto_rawDesc), len(file_services_blockassembly_blockassembly_api_blockassembly_api_proto_rawDesc)), NumEnums: 0, - NumMessages: 17, + NumMessages: 19, NumExtensions: 0, NumServices: 1, }, diff --git a/services/blockassembly/blockassembly_api/blockassembly_api.proto b/services/blockassembly/blockassembly_api/blockassembly_api.proto index f59ed7275..8cd5b4efa 100644 --- a/services/blockassembly/blockassembly_api/blockassembly_api.proto +++ b/services/blockassembly/blockassembly_api/blockassembly_api.proto @@ -76,6 +76,11 @@ service BlockAssemblyAPI { // This provides visibility into the transactions that are candidates for inclusion in the next block. // NOTE: this method is primarily for debugging purposes and may not be suitable for production use. rpc GetBlockAssemblyTxs (EmptyMessage) returns (GetBlockAssemblyTxsResponse) {} + + // CanAcceptTransaction checks if block assembly can accept more transactions. + // Returns information about current capacity and whether new transactions can be accepted. + // Used by validator to fail fast before spending UTXOs if capacity is reached. + rpc CanAcceptTransaction (CanAcceptTransactionRequest) returns (CanAcceptTransactionResponse) {} } // An empty message used as a placeholder or a request with no data. @@ -232,3 +237,16 @@ message GetBlockAssemblyTxsResponse { uint64 txCount = 1; // the number of transactions in the block assembly repeated string txs = 2; // the transactions currently being assembled in the block assembly } + +// Request for checking if block assembly can accept transactions. +message CanAcceptTransactionRequest { + uint32 count = 1; // the number of transactions to check (default: 1) +} + +// Response indicating whether block assembly can accept transactions. +message CanAcceptTransactionResponse { + bool can_accept = 1; // true if block assembly can accept the requested number of transactions + int64 current_count = 2; // current number of unmined transactions + int64 max_limit = 3; // maximum limit (0 = unlimited) + int64 remaining_capacity = 4; // how many more transactions can be accepted +} diff --git a/services/blockassembly/blockassembly_api/blockassembly_api_grpc.pb.go b/services/blockassembly/blockassembly_api/blockassembly_api_grpc.pb.go index 5c36e329a..6cd79ee73 100644 --- a/services/blockassembly/blockassembly_api/blockassembly_api_grpc.pb.go +++ b/services/blockassembly/blockassembly_api/blockassembly_api_grpc.pb.go @@ -35,6 +35,7 @@ const ( BlockAssemblyAPI_CheckBlockAssembly_FullMethodName = "/blockassembly_api.BlockAssemblyAPI/CheckBlockAssembly" BlockAssemblyAPI_GetBlockAssemblyBlockCandidate_FullMethodName = "/blockassembly_api.BlockAssemblyAPI/GetBlockAssemblyBlockCandidate" BlockAssemblyAPI_GetBlockAssemblyTxs_FullMethodName = "/blockassembly_api.BlockAssemblyAPI/GetBlockAssemblyTxs" + BlockAssemblyAPI_CanAcceptTransaction_FullMethodName = "/blockassembly_api.BlockAssemblyAPI/CanAcceptTransaction" ) // BlockAssemblyAPIClient is the client API for BlockAssemblyAPI service. @@ -94,6 +95,10 @@ type BlockAssemblyAPIClient interface { // This provides visibility into the transactions that are candidates for inclusion in the next block. // NOTE: this method is primarily for debugging purposes and may not be suitable for production use. GetBlockAssemblyTxs(ctx context.Context, in *EmptyMessage, opts ...grpc.CallOption) (*GetBlockAssemblyTxsResponse, error) + // CanAcceptTransaction checks if block assembly can accept more transactions. + // Returns information about current capacity and whether new transactions can be accepted. + // Used by validator to fail fast before spending UTXOs if capacity is reached. + CanAcceptTransaction(ctx context.Context, in *CanAcceptTransactionRequest, opts ...grpc.CallOption) (*CanAcceptTransactionResponse, error) } type blockAssemblyAPIClient struct { @@ -254,6 +259,16 @@ func (c *blockAssemblyAPIClient) GetBlockAssemblyTxs(ctx context.Context, in *Em return out, nil } +func (c *blockAssemblyAPIClient) CanAcceptTransaction(ctx context.Context, in *CanAcceptTransactionRequest, opts ...grpc.CallOption) (*CanAcceptTransactionResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(CanAcceptTransactionResponse) + err := c.cc.Invoke(ctx, BlockAssemblyAPI_CanAcceptTransaction_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // BlockAssemblyAPIServer is the server API for BlockAssemblyAPI service. // All implementations must embed UnimplementedBlockAssemblyAPIServer // for forward compatibility. @@ -311,6 +326,10 @@ type BlockAssemblyAPIServer interface { // This provides visibility into the transactions that are candidates for inclusion in the next block. // NOTE: this method is primarily for debugging purposes and may not be suitable for production use. GetBlockAssemblyTxs(context.Context, *EmptyMessage) (*GetBlockAssemblyTxsResponse, error) + // CanAcceptTransaction checks if block assembly can accept more transactions. + // Returns information about current capacity and whether new transactions can be accepted. + // Used by validator to fail fast before spending UTXOs if capacity is reached. + CanAcceptTransaction(context.Context, *CanAcceptTransactionRequest) (*CanAcceptTransactionResponse, error) mustEmbedUnimplementedBlockAssemblyAPIServer() } @@ -366,6 +385,9 @@ func (UnimplementedBlockAssemblyAPIServer) GetBlockAssemblyBlockCandidate(contex func (UnimplementedBlockAssemblyAPIServer) GetBlockAssemblyTxs(context.Context, *EmptyMessage) (*GetBlockAssemblyTxsResponse, error) { return nil, status.Error(codes.Unimplemented, "method GetBlockAssemblyTxs not implemented") } +func (UnimplementedBlockAssemblyAPIServer) CanAcceptTransaction(context.Context, *CanAcceptTransactionRequest) (*CanAcceptTransactionResponse, error) { + return nil, status.Error(codes.Unimplemented, "method CanAcceptTransaction not implemented") +} func (UnimplementedBlockAssemblyAPIServer) mustEmbedUnimplementedBlockAssemblyAPIServer() {} func (UnimplementedBlockAssemblyAPIServer) testEmbeddedByValue() {} @@ -657,6 +679,24 @@ func _BlockAssemblyAPI_GetBlockAssemblyTxs_Handler(srv interface{}, ctx context. return interceptor(ctx, in, info, handler) } +func _BlockAssemblyAPI_CanAcceptTransaction_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CanAcceptTransactionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BlockAssemblyAPIServer).CanAcceptTransaction(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: BlockAssemblyAPI_CanAcceptTransaction_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BlockAssemblyAPIServer).CanAcceptTransaction(ctx, req.(*CanAcceptTransactionRequest)) + } + return interceptor(ctx, in, info, handler) +} + // BlockAssemblyAPI_ServiceDesc is the grpc.ServiceDesc for BlockAssemblyAPI service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -724,6 +764,10 @@ var BlockAssemblyAPI_ServiceDesc = grpc.ServiceDesc{ MethodName: "GetBlockAssemblyTxs", Handler: _BlockAssemblyAPI_GetBlockAssemblyTxs_Handler, }, + { + MethodName: "CanAcceptTransaction", + Handler: _BlockAssemblyAPI_CanAcceptTransaction_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "services/blockassembly/blockassembly_api/blockassembly_api.proto", diff --git a/services/blockassembly/mock.go b/services/blockassembly/mock.go index cbd3ea7be..d8ca737f0 100644 --- a/services/blockassembly/mock.go +++ b/services/blockassembly/mock.go @@ -72,6 +72,16 @@ func (m *Mock) RemoveTx(ctx context.Context, hash *chainhash.Hash) error { return nil } +func (m *Mock) CanAcceptTransaction(ctx context.Context, count uint32) (canAccept bool, currentCount, maxLimit, remainingCapacity int64, err error) { + args := m.Called(ctx, count) + + if args.Error(4) != nil { + return false, 0, 0, 0, args.Error(4) + } + + return args.Bool(0), args.Get(1).(int64), args.Get(2).(int64), args.Get(3).(int64), nil +} + func (m *Mock) GetMiningCandidate(ctx context.Context, includeSubtreeHashes ...bool) (*model.MiningCandidate, error) { args := m.Called(ctx, includeSubtreeHashes) @@ -195,6 +205,14 @@ func (m *mockBlockAssemblyAPIClient) RemoveTx(ctx context.Context, in *blockasse return args.Get(0).(*blockassembly_api.EmptyMessage), args.Error(1) } +func (m *mockBlockAssemblyAPIClient) CanAcceptTransaction(ctx context.Context, in *blockassembly_api.CanAcceptTransactionRequest, opts ...grpc.CallOption) (*blockassembly_api.CanAcceptTransactionResponse, error) { + args := m.Called(ctx, in, opts) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*blockassembly_api.CanAcceptTransactionResponse), args.Error(1) +} + func (m *mockBlockAssemblyAPIClient) AddTxBatch(ctx context.Context, in *blockassembly_api.AddTxBatchRequest, opts ...grpc.CallOption) (*blockassembly_api.AddTxBatchResponse, error) { args := m.Called(ctx, in, opts) if args.Get(0) == nil { diff --git a/services/blockassembly/subtreeprocessor/SubtreeProcessor.go b/services/blockassembly/subtreeprocessor/SubtreeProcessor.go index e3f2ec716..814c078cd 100644 --- a/services/blockassembly/subtreeprocessor/SubtreeProcessor.go +++ b/services/blockassembly/subtreeprocessor/SubtreeProcessor.go @@ -254,6 +254,14 @@ type SubtreeProcessor struct { // startOnce ensures the processing goroutine is only started once startOnce sync.Once + + // maxUnminedTransactions is the configured or calculated limit for unmined transactions + // A value of 0 means unlimited (no capacity limit enforced) + maxUnminedTransactions atomic.Int64 + + // capacityLimitReached tracks if the capacity limit has ever been reached during this session + // This is used for metrics and alerting purposes + capacityLimitReached atomic.Bool } type State uint32 @@ -1266,6 +1274,77 @@ func (stp *SubtreeProcessor) SubtreeCount() int { return int(stp.chainedSubtreeCount.Load()) + 01 } +// CurrentTransactionCount returns the total number of transactions currently in RAM. +// This includes transactions in the currentTxMap and the processing queue. +// +// Returns: +// - int64: Total transaction count in RAM +func (stp *SubtreeProcessor) CurrentTransactionCount() int64 { + return int64(stp.currentTxMap.Length()) + stp.queue.length() +} + +// CanAcceptTransactions checks if the specified number of transactions can be accepted +// without exceeding the capacity limit. +// +// Parameters: +// - count: Number of transactions to check +// +// Returns: +// - bool: true if transactions can be accepted, false if capacity would be exceeded +func (stp *SubtreeProcessor) CanAcceptTransactions(count int) bool { + maxLimit := stp.maxUnminedTransactions.Load() + if maxLimit <= 0 { + return true + } + + current := stp.CurrentTransactionCount() + + return current+int64(count) <= maxLimit +} + +// RemainingCapacity returns how many more transactions can be accepted before +// reaching the capacity limit. +// +// Returns: +// - int64: Remaining capacity (0 or negative if at/over limit, MaxInt64 if unlimited) +func (stp *SubtreeProcessor) RemainingCapacity() int64 { + maxLimit := stp.maxUnminedTransactions.Load() + if maxLimit <= 0 { + return math.MaxInt64 + } + + current := stp.CurrentTransactionCount() + + return maxLimit - current +} + +// IsCapacityLimitReached returns true if the capacity limit has been reached at any point +// during this session. This is useful for metrics and alerting. +// +// Returns: +// - bool: true if capacity limit has been reached +func (stp *SubtreeProcessor) IsCapacityLimitReached() bool { + return stp.capacityLimitReached.Load() +} + +// SetMaxUnminedTransactions sets the maximum unmined transaction limit. +// A value of 0 means unlimited (no capacity limit enforced). +// +// Parameters: +// - max: Maximum number of unmined transactions allowed +func (stp *SubtreeProcessor) SetMaxUnminedTransactions(max int64) { + stp.maxUnminedTransactions.Store(max) + prometheusMaxUnminedTransactions.Set(float64(max)) +} + +// GetMaxUnminedTransactions returns the configured maximum unmined transaction limit. +// +// Returns: +// - int64: Maximum limit (0 means unlimited) +func (stp *SubtreeProcessor) GetMaxUnminedTransactions() int64 { + return stp.maxUnminedTransactions.Load() +} + // adjustSubtreeSize calculates and sets a new subtree size based on recent block statistics // to maintain approximately one subtree per second. The size will always be a power of 2 // and not smaller than 1024. @@ -1619,11 +1698,21 @@ func (stp *SubtreeProcessor) processCompleteSubtree(skipNotification bool) (err } // AddBatch adds a batch of transaction nodes to the processor queue. +// If the capacity limit is reached, the batch is rejected and logged. // // Parameters: // - nodes: Transaction nodes to add // - txInpoints: Parent transaction references for each node func (stp *SubtreeProcessor) AddBatch(nodes []subtreepkg.Node, txInpoints []*subtreepkg.TxInpoints) { + if !stp.CanAcceptTransactions(len(nodes)) { + stp.capacityLimitReached.Store(true) + prometheusCapacityLimitReached.Set(1) + prometheusCapacityLimitRejected.Add(float64(len(nodes))) + stp.logger.Warnf("[AddBatch] Capacity limit reached, rejecting %d transactions (current: %d, max: %d)", len(nodes), stp.CurrentTransactionCount(), stp.maxUnminedTransactions.Load()) + + return + } + stp.queue.enqueueBatch(nodes, txInpoints) } @@ -1651,6 +1740,7 @@ func (stp *SubtreeProcessor) AddDirectly(node *subtreepkg.Node, txInpoints *subt // AddNodesDirectly adds a batch of unmined transactions directly to the processor without going through the queue. // It performs parallel filtering/insertion into currentTxMap and sequential insertion into subtrees. // This bypasses the queue and is useful for bulk loading transactions at startup. +// If the capacity limit would be exceeded, the transaction list is truncated to fit. // // Parameters: // - txs: Unmined transactions to add @@ -1663,6 +1753,27 @@ func (stp *SubtreeProcessor) AddNodesDirectly(txs []*utxostore.UnminedTransactio return nil } + maxLimit := stp.maxUnminedTransactions.Load() + if maxLimit > 0 { + remaining := stp.RemainingCapacity() + if int64(len(txs)) > remaining { + if remaining <= 0 { + stp.capacityLimitReached.Store(true) + prometheusCapacityLimitReached.Set(1) + prometheusCapacityLimitRejected.Add(float64(len(txs))) + stp.logger.Warnf("[AddNodesDirectly] Capacity limit reached, cannot load any transactions (current: %d, max: %d)", stp.CurrentTransactionCount(), maxLimit) + + return nil + } + + stp.capacityLimitReached.Store(true) + prometheusCapacityLimitReached.Set(1) + prometheusCapacityLimitRejected.Add(float64(int64(len(txs)) - remaining)) + stp.logger.Warnf("[AddNodesDirectly] Truncating unmined transactions from %d to %d due to capacity limit (max: %d)", len(txs), remaining, maxLimit) + txs = txs[:remaining] + } + } + // Phase 1: Parallel insertion into currentTxMap using 1024 batches const numWorkers = 1024 currentTxMap := stp.currentTxMap diff --git a/services/blockassembly/subtreeprocessor/capacity_limit_test.go b/services/blockassembly/subtreeprocessor/capacity_limit_test.go new file mode 100644 index 000000000..b278d8fdb --- /dev/null +++ b/services/blockassembly/subtreeprocessor/capacity_limit_test.go @@ -0,0 +1,54 @@ +package subtreeprocessor + +import ( + "math" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCapacityLimit_SetAndGet(t *testing.T) { + stp := setupTestSubtreeProcessor(t) + + require.Equal(t, int64(0), stp.GetMaxUnminedTransactions(), "default should be 0 (unlimited)") + require.True(t, stp.CanAcceptTransactions(1000000), "should accept when unlimited") + require.Equal(t, int64(math.MaxInt64), stp.RemainingCapacity(), "remaining should be MaxInt64 when unlimited") + + stp.SetMaxUnminedTransactions(1000) + require.Equal(t, int64(1000), stp.GetMaxUnminedTransactions()) +} + +func TestCapacityLimit_CanAcceptTransactions(t *testing.T) { + stp := setupTestSubtreeProcessor(t) + stp.SetMaxUnminedTransactions(100) + + require.True(t, stp.CanAcceptTransactions(50), "should accept when below limit") + require.True(t, stp.CanAcceptTransactions(100), "should accept when exactly at limit") + require.False(t, stp.CanAcceptTransactions(101), "should reject when exceeding limit") +} + +func TestCapacityLimit_RemainingCapacity(t *testing.T) { + stp := setupTestSubtreeProcessor(t) + stp.SetMaxUnminedTransactions(100) + + remaining := stp.RemainingCapacity() + require.True(t, remaining > 0, "remaining should be positive when under limit") +} + +func TestCapacityLimit_IsCapacityLimitReached(t *testing.T) { + stp := setupTestSubtreeProcessor(t) + stp.SetMaxUnminedTransactions(10) + + require.False(t, stp.IsCapacityLimitReached(), "should not be reached initially") + + stp.capacityLimitReached.Store(true) + require.True(t, stp.IsCapacityLimitReached(), "should be reached after being set") +} + +func TestCapacityLimit_Unlimited(t *testing.T) { + stp := setupTestSubtreeProcessor(t) + stp.SetMaxUnminedTransactions(0) + + require.True(t, stp.CanAcceptTransactions(1000000000), "should accept any amount when unlimited") + require.Equal(t, int64(math.MaxInt64), stp.RemainingCapacity(), "remaining should be MaxInt64 when unlimited") +} diff --git a/services/blockassembly/subtreeprocessor/interface.go b/services/blockassembly/subtreeprocessor/interface.go index 59d0a3e35..3fa351051 100644 --- a/services/blockassembly/subtreeprocessor/interface.go +++ b/services/blockassembly/subtreeprocessor/interface.go @@ -286,6 +286,50 @@ type Interface interface { // Parameters: // - ctx: Context for the stop operation Stop(ctx context.Context) + + // CurrentTransactionCount returns the total number of transactions currently in RAM. + // This includes transactions in the currentTxMap and the processing queue. + // + // Returns: + // - int64: Total transaction count in RAM + CurrentTransactionCount() int64 + + // CanAcceptTransactions checks if the specified number of transactions can be accepted + // without exceeding the capacity limit. + // + // Parameters: + // - count: Number of transactions to check + // + // Returns: + // - bool: true if transactions can be accepted, false if capacity would be exceeded + CanAcceptTransactions(count int) bool + + // RemainingCapacity returns how many more transactions can be accepted before + // reaching the capacity limit. + // + // Returns: + // - int64: Remaining capacity (0 or negative if at/over limit, MaxInt64 if unlimited) + RemainingCapacity() int64 + + // IsCapacityLimitReached returns true if the capacity limit has been reached at any point + // during this session. + // + // Returns: + // - bool: true if capacity limit has been reached + IsCapacityLimitReached() bool + + // SetMaxUnminedTransactions sets the maximum unmined transaction limit. + // A value of 0 means unlimited. + // + // Parameters: + // - max: Maximum number of unmined transactions allowed + SetMaxUnminedTransactions(max int64) + + // GetMaxUnminedTransactions returns the configured maximum unmined transaction limit. + // + // Returns: + // - int64: Maximum limit (0 means unlimited) + GetMaxUnminedTransactions() int64 } // TxInpointsMap defines the interface for transaction inpoints storage with hash keys. diff --git a/services/blockassembly/subtreeprocessor/metrics.go b/services/blockassembly/subtreeprocessor/metrics.go index d0da4964c..015456f6c 100644 --- a/services/blockassembly/subtreeprocessor/metrics.go +++ b/services/blockassembly/subtreeprocessor/metrics.go @@ -32,6 +32,10 @@ var ( prometheusSubtreeProcessorCurrentState prometheus.Gauge prometheusBlockAssemblySubtreeCompleteHist prometheus.Histogram prometheusSubtreeProcessorDequeuedTxs prometheus.Counter + + prometheusCapacityLimitRejected prometheus.Counter + prometheusCapacityLimitReached prometheus.Gauge + prometheusMaxUnminedTransactions prometheus.Gauge ) var ( @@ -211,4 +215,31 @@ func _initPrometheusMetrics() { Help: "Number of transactions dequeued from subtree processor", }, ) + + prometheusCapacityLimitRejected = promauto.NewCounter( + prometheus.CounterOpts{ + Namespace: "teranode", + Subsystem: "subtreeprocessor", + Name: "capacity_limit_rejected_total", + Help: "Total transactions rejected due to capacity limit", + }, + ) + + prometheusCapacityLimitReached = promauto.NewGauge( + prometheus.GaugeOpts{ + Namespace: "teranode", + Subsystem: "subtreeprocessor", + Name: "capacity_limit_reached", + Help: "1 if capacity limit has been reached, 0 otherwise", + }, + ) + + prometheusMaxUnminedTransactions = promauto.NewGauge( + prometheus.GaugeOpts{ + Namespace: "teranode", + Subsystem: "subtreeprocessor", + Name: "max_unmined_transactions", + Help: "Configured maximum unmined transactions (0 = unlimited)", + }, + ) } diff --git a/services/blockassembly/subtreeprocessor/mock.go b/services/blockassembly/subtreeprocessor/mock.go index 0fa6e404a..cfda2995f 100644 --- a/services/blockassembly/subtreeprocessor/mock.go +++ b/services/blockassembly/subtreeprocessor/mock.go @@ -211,3 +211,43 @@ func (m *MockSubtreeProcessor) WaitForPendingBlocks(ctx context.Context) error { func (m *MockSubtreeProcessor) Stop(ctx context.Context) { m.Called(ctx) } + +// CurrentTransactionCount implements Interface.CurrentTransactionCount +func (m *MockSubtreeProcessor) CurrentTransactionCount() int64 { + args := m.Called() + + return args.Get(0).(int64) +} + +// CanAcceptTransactions implements Interface.CanAcceptTransactions +func (m *MockSubtreeProcessor) CanAcceptTransactions(count int) bool { + args := m.Called(count) + + return args.Bool(0) +} + +// RemainingCapacity implements Interface.RemainingCapacity +func (m *MockSubtreeProcessor) RemainingCapacity() int64 { + args := m.Called() + + return args.Get(0).(int64) +} + +// IsCapacityLimitReached implements Interface.IsCapacityLimitReached +func (m *MockSubtreeProcessor) IsCapacityLimitReached() bool { + args := m.Called() + + return args.Bool(0) +} + +// SetMaxUnminedTransactions implements Interface.SetMaxUnminedTransactions +func (m *MockSubtreeProcessor) SetMaxUnminedTransactions(max int64) { + m.Called(max) +} + +// GetMaxUnminedTransactions implements Interface.GetMaxUnminedTransactions +func (m *MockSubtreeProcessor) GetMaxUnminedTransactions() int64 { + args := m.Called() + + return args.Get(0).(int64) +} diff --git a/services/blockchain/blockchain_api/blockchain_api.pb.go b/services/blockchain/blockchain_api/blockchain_api.pb.go index 483b9fee8..c822cde85 100644 --- a/services/blockchain/blockchain_api/blockchain_api.pb.go +++ b/services/blockchain/blockchain_api/blockchain_api.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.11 -// protoc v6.33.4 +// protoc v6.33.2 // source: services/blockchain/blockchain_api/blockchain_api.proto // Package blockchain_api defines the gRPC service interface for blockchain operations. diff --git a/services/blockchain/blockchain_api/blockchain_api_grpc.pb.go b/services/blockchain/blockchain_api/blockchain_api_grpc.pb.go index bf7d00eca..b87cf9d76 100644 --- a/services/blockchain/blockchain_api/blockchain_api_grpc.pb.go +++ b/services/blockchain/blockchain_api/blockchain_api_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.6.0 -// - protoc v6.33.4 +// - protoc v6.33.2 // source: services/blockchain/blockchain_api/blockchain_api.proto // Package blockchain_api defines the gRPC service interface for blockchain operations. diff --git a/services/rpc/handlers_additional_test.go b/services/rpc/handlers_additional_test.go index b1cd17bcf..fba046146 100644 --- a/services/rpc/handlers_additional_test.go +++ b/services/rpc/handlers_additional_test.go @@ -5585,6 +5585,9 @@ func (m *mockBlockAssemblyClient) Store(ctx context.Context, hash *chainhash.Has func (m *mockBlockAssemblyClient) RemoveTx(ctx context.Context, hash *chainhash.Hash) error { return nil } +func (m *mockBlockAssemblyClient) CanAcceptTransaction(ctx context.Context, count uint32) (canAccept bool, currentCount, maxLimit, remainingCapacity int64, err error) { + return true, 0, 0, 1000000, nil +} func (m *mockBlockAssemblyClient) GetMiningCandidate(ctx context.Context, includeSubtreeHashes ...bool) (*model.MiningCandidate, error) { if m.getMiningCandidateFunc != nil { return m.getMiningCandidateFunc(ctx, includeSubtreeHashes...) diff --git a/services/validator/Validator.go b/services/validator/Validator.go index d65b54afe..332c084e3 100644 --- a/services/validator/Validator.go +++ b/services/validator/Validator.go @@ -549,6 +549,26 @@ func (v *Validator) validateInternal(ctx context.Context, tx *bt.Tx, blockHeight utxoMapErr error ) + // Check block assembly capacity BEFORE doing expensive UTXO operations + // Only check if block assembly is enabled and we plan to add the transaction + blockAssemblyEnabled := !v.settings.BlockAssembly.Disabled + willAddToBlockAssembly := blockAssemblyEnabled && validationOptions.AddTXToBlockAssembly + + if willAddToBlockAssembly { + // Only make gRPC call if a limit is configured (optimization) + if v.settings.BlockAssembly.MaxUnminedTransactions > 0 { + canAccept, currentCount, maxLimit, _, checkErr := v.blockAssembler.CanAcceptTransaction(decoupledCtx, 1) + if checkErr != nil { + v.logger.Warnf("[Validate][%s] failed to check block assembly capacity: %v", txID, checkErr) + } else if !canAccept { + err = errors.New(errors.ERR_THRESHOLD_EXCEEDED, "[Validate][%s] block assembly capacity limit reached: current=%d, max=%d", txID, currentCount, maxLimit) + span.RecordError(err) + + return nil, err + } + } + } + // this will reverse the spends if there is an error if spentUtxos, err = v.spendUtxos(decoupledCtx, tx, blockHeight, validationOptions.IgnoreLocked); err != nil { if errors.Is(err, errors.ErrUtxoError) { @@ -622,9 +642,8 @@ func (v *Validator) validateInternal(ctx context.Context, tx *bt.Tx, blockHeight return nil, err } - // the option blockAssemblyDisabled is false by default - blockAssemblyEnabled := !v.settings.BlockAssembly.Disabled - addToBlockAssembly := blockAssemblyEnabled && validationOptions.AddTXToBlockAssembly + // Use the already-calculated values from the early capacity check + addToBlockAssembly := willAddToBlockAssembly if !validationOptions.SkipUtxoCreation { // store the transaction in the UTXO store, marking it as locked if we are going to add it to the block assembly diff --git a/services/validator/Validator_test.go b/services/validator/Validator_test.go index 24ffed56a..d2bde0a50 100644 --- a/services/validator/Validator_test.go +++ b/services/validator/Validator_test.go @@ -713,6 +713,14 @@ func (s *MockBlockAssemblyStore) RemoveTx(_ context.Context, hash *chainhash.Has return nil } +func (s *MockBlockAssemblyStore) CanAcceptTransaction(_ context.Context, count uint32) (canAccept bool, currentCount, maxLimit, remainingCapacity int64, err error) { + if s.returnError != nil { + return false, 0, 0, 0, s.returnError + } + + return true, int64(len(s.storedTxs)), 0, 1000000, nil +} + func Benchmark_validateInternal(b *testing.B) { txF65eHex, err := os.ReadFile("./testdata/f65ec8dcc934c8118f3c65f86083c2b7c28dad0579becd0cfe87243e576d9ae9.bin") require.NoError(b, err) diff --git a/settings/blockassembly_settings.go b/settings/blockassembly_settings.go index d681088bb..7d0a118a0 100644 --- a/settings/blockassembly_settings.go +++ b/settings/blockassembly_settings.go @@ -45,4 +45,5 @@ type BlockAssemblySettings struct { SubtreeAnnouncementInterval time.Duration `key:"blockassembly_subtreeAnnouncementInterval" desc:"Interval for subtree announcements" default:"10s" category:"BlockAssembly" usage:"How often to announce subtrees" type:"duration" longdesc:"### Purpose\nSets the interval between announcements of newly created subtrees to the P2P network.\n\n### How It Works\nAnnouncements help other nodes discover and fetch subtree data.\n\n### Trade-offs\n| Setting | Benefit | Drawback |\n|---------|---------|----------|\n| Shorter | Faster propagation | More network traffic |\n| Longer | Reduced overhead | Slower propagation |\n\n### Recommendations\n- **10s** (default) - Balances network traffic with timely propagation"` ParallelSetIfNotExistsThreshold int `key:"blockassembly_parallelSetIfNotExistsThreshold" desc:"Threshold for parallel set-if-not-exists operations" default:"10000" category:"BlockAssembly" usage:"When to parallelize conditional writes" type:"int" longdesc:"### Purpose\nSets the record count threshold above which set-if-not-exists operations are parallelized.\n\n### How It Works\n- **Below threshold** - Operations run sequentially\n- **Above threshold** - Operations run in parallel for improved throughput\n\n### Recommendations\n- **10000** (default) - Triggers parallel execution for large batches"` StoreTxInpointsForSubtreeMeta bool `key:"blockassembly_storeTxInpointsForSubtreeMeta" desc:"Store transaction input points in subtree metadata" default:"false" category:"BlockAssembly" usage:"Memory vs lookup tradeoff" type:"bool" longdesc:"### Purpose\nEnables storing transaction input points in subtree metadata.\n\n### Trade-offs\n| Setting | Benefit | Drawback |\n|---------|---------|----------|\n| Enabled | Speeds up certain lookups | Increases memory usage |\n| Disabled | Optimizes for memory | Requires lookups for input point data |\n\n### Recommendations\n- **false** (default) - Optimizes for memory\n- Enable if subtree operations frequently need input point data and sufficient memory is available"` + MaxUnminedTransactions int64 `key:"blockassembly_maxUnminedTransactions" desc:"Maximum unmined transactions allowed (0=unlimited)" default:"0" category:"BlockAssembly" usage:"Limit unmined transactions in RAM" type:"int64" longdesc:"### Purpose\nLimits the maximum number of unmined transactions that can be held in BlockAssembly RAM.\n\n### How It Works\n- **0** (default) - Unlimited transactions\n- **Positive value** - Hard limit on unmined transaction count\n\n### Why This Matters\nPrevents OOM crashes on BlockAssembly restart by limiting how many transactions can be loaded from UTXOStore. When the node reaches capacity, new transactions are rejected.\n\n### Memory Guidelines\nEach transaction uses approximately 272 bytes in BlockAssembly RAM:\n- TxInpoints (parent dependencies): ~176 bytes\n- Subtree Node (hash + fee + size): ~48 bytes \n- Map index entry: ~40 bytes\n\nExample configurations:\n- 256GB RAM: Set to ~683 million\n- 512GB RAM: Set to ~1.4 billion\n- 1TB RAM: Set to ~2.7 billion\n\n### Recommendations\n- Leave at **0** (unlimited) for small mempools or high-memory systems\n- Set explicit limit based on available RAM for production nodes handling large mempools"` } diff --git a/test/e2e/daemon/ready/capacity_limit_test.go b/test/e2e/daemon/ready/capacity_limit_test.go new file mode 100644 index 000000000..6136cc281 --- /dev/null +++ b/test/e2e/daemon/ready/capacity_limit_test.go @@ -0,0 +1,188 @@ +package smoke + +import ( + "encoding/hex" + "testing" + "time" + + "github.com/bsv-blockchain/teranode/daemon" + "github.com/bsv-blockchain/teranode/services/blockchain" + "github.com/bsv-blockchain/teranode/settings" + "github.com/bsv-blockchain/teranode/test" + "github.com/bsv-blockchain/teranode/test/utils/transactions" + "github.com/stretchr/testify/require" +) + +// TestCapacityLimit tests that the MaxUnminedTransactions limit is enforced correctly. +// This test creates two nodes: +// - Node 1: Limited to 2 unmined transactions +// - Node 2: Unlimited unmined transactions +// +// The test verifies: +// 1. Node 1 accepts the first 2 transactions but rejects the 3rd due to capacity limit +// 2. Node 2 accepts all 3 transactions without limit +// 3. When Node 2 mines a block containing all 3 transactions, Node 1 can process the mined block +func TestCapacityLimit(t *testing.T) { + // Phase 1: Start Node 1 with limited capacity (2 unmined transactions) + t.Log("Phase 1: Starting Node 1 with MaxUnminedTransactions=2...") + node1 := daemon.NewTestDaemon(t, daemon.TestOptions{ + EnableRPC: true, + EnableP2P: true, + EnableValidator: true, + UTXOStoreType: "aerospike", + SettingsOverrideFunc: func(s *settings.Settings) { + test.MultiNodeSettings(1)(s) + s.P2P.PeerCacheDir = t.TempDir() + s.ChainCfgParams.CoinbaseMaturity = 2 + s.P2P.SyncCoordinatorPeriodicEvaluationInterval = 1 * time.Second + s.BlockAssembly.MaxUnminedTransactions = 2 + }, + FSMState: blockchain.FSMStateRUNNING, + }) + defer node1.Stop(t) + + t.Logf("Node 1 created with MaxUnminedTransactions=2") + + // Phase 3: Mine to maturity on Node 1 + t.Log("Phase 3: Mining to maturity on Node 1...") + coinbaseTx := node1.MineToMaturityAndGetSpendableCoinbaseTx(t, node1.Ctx) + t.Logf("Node 1 mined to maturity, coinbase tx: %s", coinbaseTx.TxIDChainHash().String()) + + // Wait for Node 2 to sync + node1BestHeader, _, err := node1.BlockchainClient.GetBestBlockHeader(node1.Ctx) + require.NoError(t, err) + + // Phase 2: Start Node 2 with unlimited capacity + t.Log("Phase 2: Starting Node 2 with unlimited capacity...") + node2 := daemon.NewTestDaemon(t, daemon.TestOptions{ + EnableRPC: true, + EnableP2P: true, + EnableValidator: true, + UTXOStoreType: "aerospike", + SkipRemoveDataDir: true, + SettingsOverrideFunc: func(s *settings.Settings) { + test.MultiNodeSettings(2)(s) + s.P2P.PeerCacheDir = t.TempDir() + s.ChainCfgParams.CoinbaseMaturity = 2 + s.P2P.SyncCoordinatorPeriodicEvaluationInterval = 1 * time.Second + s.BlockAssembly.MaxUnminedTransactions = 0 // 0 = unlimited + }, + FSMState: blockchain.FSMStateRUNNING, + }) + defer node2.Stop(t) + + t.Logf("Node 2 created with MaxUnminedTransactions=0 (unlimited)") + + // Connect the nodes + node2.InjectPeer(t, node1) + node1.InjectPeer(t, node2) + t.Log("Nodes connected") + + node2.WaitForBlockhash(t, node1BestHeader.Hash(), 30*time.Second) + t.Log("Node 2 synced to Node 1") + + // Phase 4: Create 3 transactions + t.Log("Phase 4: Creating 3 transactions...") + tx1 := node1.CreateTransactionWithOptions(t, + transactions.WithInput(coinbaseTx, 0), + transactions.WithP2PKHOutputs(1, 30000), + ) + tx1ID := tx1.TxID() + t.Logf("Created tx1: %s", tx1ID) + + tx2 := node1.CreateTransactionWithOptions(t, + transactions.WithInput(tx1, 0), + transactions.WithP2PKHOutputs(1, 20000), + ) + tx2ID := tx2.TxID() + t.Logf("Created tx2: %s", tx2ID) + + tx3 := node1.CreateTransactionWithOptions(t, + transactions.WithInput(tx2, 0), + transactions.WithP2PKHOutputs(1, 10000), + ) + tx3ID := tx3.TxID() + t.Logf("Created tx3: %s", tx3ID) + + // Phase 5: Send transactions to both nodes via RPC + t.Log("Phase 5: Sending transactions to both nodes...") + + // Send tx1 to both nodes + tx1Hex := hex.EncodeToString(tx1.ExtendedBytes()) + _, err = node1.CallRPC(node1.Ctx, "sendrawtransaction", []any{tx1Hex}) + require.NoError(t, err) + _, err = node2.CallRPC(node2.Ctx, "sendrawtransaction", []any{tx1Hex}) + require.NoError(t, err) + t.Logf("Sent tx1 to both nodes") + + // Send tx2 to both nodes + tx2Hex := hex.EncodeToString(tx2.ExtendedBytes()) + _, err = node1.CallRPC(node1.Ctx, "sendrawtransaction", []any{tx2Hex}) + require.NoError(t, err) + _, err = node2.CallRPC(node2.Ctx, "sendrawtransaction", []any{tx2Hex}) + require.NoError(t, err) + t.Logf("Sent tx2 to both nodes") + + // Verify Node 1 accepted tx1 and tx2 + node1.WaitForBlockAssemblyToProcessTx(t, tx1ID) + node1.WaitForBlockAssemblyToProcessTx(t, tx2ID) + t.Log("Node 1 accepted tx1 and tx2") + + // Verify Node 2 accepted tx1 and tx2 + node2.WaitForBlockAssemblyToProcessTx(t, tx1ID) + node2.WaitForBlockAssemblyToProcessTx(t, tx2ID) + t.Log("Node 2 accepted tx1 and tx2") + + // Phase 6: Send tx3 to both nodes - Node 1 should reject due to capacity limit + t.Log("Phase 6: Sending tx3 to both nodes (Node 1 should reject)...") + tx3Hex := hex.EncodeToString(tx3.ExtendedBytes()) + _, errNode1 := node1.CallRPC(node1.Ctx, "sendrawtransaction", []any{tx3Hex}) + require.Error(t, errNode1, "Node 1 should reject tx3 due to capacity limit") + t.Logf("Node 1 rejected tx3 as expected: %v", errNode1) + + _, err = node2.CallRPC(node2.Ctx, "sendrawtransaction", []any{tx3Hex}) + require.NoError(t, err) + t.Logf("Node 2 accepted tx3") + + // Verify Node 2 accepted tx3 + node2.WaitForBlockAssemblyToProcessTx(t, tx3ID) + t.Logf("Node 2 has all 3 transactions in block assembly") + + // Phase 7: Node 2 mines a block with all 3 transactions + t.Log("Phase 7: Node 2 mining a block with all 3 transactions...") + minedBlock := node2.MineAndWait(t, 1) + t.Logf("Node 2 mined block: %s (height %d, %d txs)", minedBlock.Hash().String(), minedBlock.Height, minedBlock.TransactionCount) + + // Verify the block contains all 3 transactions plus coinbase (4 total) + require.Equal(t, uint64(4), minedBlock.TransactionCount, "Block should contain 4 transactions (coinbase + 3 txs)") + + node1.InjectPeer(t, node2) + + // Phase 8: Wait for Node 1 to process the mined block + t.Log("Phase 8: Waiting for Node 1 to process the mined block...") + node1.WaitForBlockhash(t, minedBlock.Hash(), 30*time.Second) + t.Log("Node 1 successfully processed the block containing tx3") + + // Verify Node 1 can retrieve the block + node1Block, err := node1.BlockchainClient.GetBlock(node1.Ctx, minedBlock.Hash()) + require.NoError(t, err) + require.Equal(t, uint64(4), node1Block.TransactionCount, "Node 1's block should contain 4 transactions (coinbase + 3 txs)") + t.Logf("✓ Node 1 successfully processed block with %d transactions", node1Block.TransactionCount) + + // Phase 9: Verify Node 1 can now accept new transactions (capacity available) + t.Log("Phase 9: Verifying Node 1's capacity is available after mining...") + + // Create a new transaction from the mined block's coinbase + tx4 := node2.CreateTransactionWithOptions(t, + transactions.WithInput(tx3, 0), + transactions.WithP2PKHOutputs(1, 5000), + ) + tx4Hex := hex.EncodeToString(tx4.ExtendedBytes()) + + // Node 1 should now be able to accept tx4 since previous transactions are mined + _, err = node1.CallRPC(node1.Ctx, "sendrawtransaction", []any{tx4Hex}) + require.NoError(t, err, "Node 1 should accept tx4 after previous transactions were mined") + t.Logf("✓ Node 1 accepted tx4 (capacity now available)") + + t.Log("✓ Test completed successfully") +}