3 changes: 3 additions & 0 deletions go.mod
@@ -171,6 +171,9 @@ require (
github.com/stretchr/objx v0.5.3 // indirect
github.com/tonistiigi/dchapes-mode v0.0.0-20250318174251-73d941a28323 // indirect
github.com/tonistiigi/go-csvvalue v0.0.0-20240814133006-030d3b2625d0 // indirect
+ github.com/twmb/franz-go v1.20.6 // indirect
+ github.com/twmb/franz-go/pkg/kadm v1.17.2 // indirect
+ github.com/twmb/franz-go/pkg/kmsg v1.12.0 // indirect
github.com/wadey/gocovmerge v0.0.0-20160331181800-b5bfa59ec0ad // indirect
github.com/wlynxg/anet v0.0.5 // indirect
github.com/x448/float16 v0.8.4 // indirect
6 changes: 6 additions & 0 deletions go.sum
@@ -1025,6 +1025,12 @@ github.com/tonistiigi/units v0.0.0-20180711220420-6950e57a87ea h1:SXhTLE6pb6eld/
github.com/tonistiigi/units v0.0.0-20180711220420-6950e57a87ea/go.mod h1:WPnis/6cRcDZSUvVmezrxJPkiO87ThFYsoUiMwWNDJk=
github.com/tonistiigi/vt100 v0.0.0-20240514184818-90bafcd6abab h1:H6aJ0yKQ0gF49Qb2z5hI1UHxSQt4JMyxebFR15KnApw=
github.com/tonistiigi/vt100 v0.0.0-20240514184818-90bafcd6abab/go.mod h1:ulncasL3N9uLrVann0m+CDlJKWsIAP34MPcOJF6VRvc=
+ github.com/twmb/franz-go v1.20.6 h1:TpQTt4QcixJ1cHEmQGPOERvTzo99s8jAutmS7rbSD6w=
+ github.com/twmb/franz-go v1.20.6/go.mod h1:u+FzH2sInp7b9HNVv2cZN8AxdXy6y/AQ1Bkptu4c0FM=
+ github.com/twmb/franz-go/pkg/kadm v1.17.2 h1:g5f1sAxnTkYC6G96pV5u715HWhxd66hWaDZUAQ8xHY8=
+ github.com/twmb/franz-go/pkg/kadm v1.17.2/go.mod h1:ST55zUB+sUS+0y+GcKY/Tf1XxgVilaFpB9I19UubLmU=
+ github.com/twmb/franz-go/pkg/kmsg v1.12.0 h1:CbatD7ers1KzDNgJqPbKOq0Bz/WLBdsTH75wgzeVaPc=
+ github.com/twmb/franz-go/pkg/kmsg v1.12.0/go.mod h1:+DPt4NC8RmI6hqb8G09+3giKObE6uD2Eya6CfqBpeJY=
github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaOOb6ThwMmTEbhRwtKR97o=
github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg=
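Context for reviewers, not part of the diff: the new twmb/franz-go, kadm, and kmsg modules are the franz-go Kafka client that takes over from the IBM/sarama types removed in the files below. A minimal consume sketch against the upstream kgo package, using placeholder broker, group, and topic names, shows how records expose Topic and Value as plain fields, which is the shape the KafkaMessage literals below now mirror:

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Placeholder broker, group, and topic names for illustration only.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumerGroup("example-group"),
		kgo.ConsumeTopics("example-topic"),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Poll one batch and print each record's topic and payload size;
	// kgo.Record carries Topic and Value directly, with no wrapper type.
	fetches := cl.PollFetches(context.Background())
	fetches.EachRecord(func(r *kgo.Record) {
		fmt.Println(r.Topic, len(r.Value))
	})
}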
13 changes: 3 additions & 10 deletions services/blockvalidation/Server_test.go
@@ -25,7 +25,6 @@ import (
"testing"
"time"

"github.com/IBM/sarama"
"github.com/bsv-blockchain/go-bt/v2"
"github.com/bsv-blockchain/go-bt/v2/chainhash"
"github.com/bsv-blockchain/go-chaincfg"
@@ -1616,9 +1615,7 @@ func Test_consumerMessageHandler(t *testing.T) {
require.NoError(t, err)

msg := &kafka.KafkaMessage{
- ConsumerMessage: sarama.ConsumerMessage{
- Value: msgBytes,
- },
+ Value: msgBytes,
}

handler := server.consumerMessageHandler(ctx)
@@ -1660,9 +1657,7 @@ func Test_consumerMessageHandler(t *testing.T) {

// Invalid message that will cause a parsing error
msg := &kafka.KafkaMessage{
- ConsumerMessage: sarama.ConsumerMessage{
- Value: []byte("invalid protobuf"),
- },
+ Value: []byte("invalid protobuf"),
}

handler := server.consumerMessageHandler(ctx)
@@ -1707,9 +1702,7 @@ func Test_consumerMessageHandler(t *testing.T) {
require.NoError(t, err)

msg := &kafka.KafkaMessage{
- ConsumerMessage: sarama.ConsumerMessage{
- Value: msgBytes,
- },
+ Value: msgBytes,
}

handler := server.consumerMessageHandler(ctx)
5 changes: 1 addition & 4 deletions services/blockvalidation/integration_retry_test.go
@@ -8,7 +8,6 @@ import (
"testing"
"time"

"github.com/IBM/sarama"
"github.com/bsv-blockchain/go-bt/v2/chainhash"
"github.com/bsv-blockchain/teranode/errors"
"github.com/bsv-blockchain/teranode/services/blockchain"
@@ -151,9 +150,7 @@ func TestIntegrationRetryWithMultipleFailures(t *testing.T) {
require.NoError(t, err)

kafkaMessage := &kafka.KafkaMessage{
- ConsumerMessage: sarama.ConsumerMessage{
- Value: msgBytes,
- },
+ Value: msgBytes,
}

handler := server.consumerMessageHandler(ctx)
13 changes: 3 additions & 10 deletions services/blockvalidation/malicious_peer_handling_test.go
@@ -6,7 +6,6 @@ import (
"testing"
"time"

"github.com/IBM/sarama"
"github.com/bsv-blockchain/go-bt/v2/chainhash"
"github.com/bsv-blockchain/teranode/errors"
"github.com/bsv-blockchain/teranode/model"
@@ -185,9 +184,7 @@ func TestKafkaConsumerMessageHandling(t *testing.T) {
require.NoError(t, err)

kafkaMessage := &kafka.KafkaMessage{
- ConsumerMessage: sarama.ConsumerMessage{
- Value: msgBytes,
- },
+ Value: msgBytes,
}

// Get consumer handler
@@ -219,9 +216,7 @@ func TestKafkaConsumerMessageHandling(t *testing.T) {
require.NoError(t, err)

kafkaMessage := &kafka.KafkaMessage{
- ConsumerMessage: sarama.ConsumerMessage{
- Value: msgBytes,
- },
+ Value: msgBytes,
}

handler := server.consumerMessageHandler(ctx)
@@ -259,9 +254,7 @@ func TestKafkaConsumerMessageHandling(t *testing.T) {
require.NoError(t, err)

kafkaMessage := &kafka.KafkaMessage{
- ConsumerMessage: sarama.ConsumerMessage{
- Value: msgBytes,
- },
+ Value: msgBytes,
}

handler := server.consumerMessageHandler(ctx)
55 changes: 18 additions & 37 deletions services/p2p/Server_test.go
@@ -18,7 +18,6 @@ import (
"testing"
"time"

"github.com/IBM/sarama"
"github.com/bsv-blockchain/go-bt/v2/chainhash"
"github.com/bsv-blockchain/go-chaincfg"
p2pMessageBus "github.com/bsv-blockchain/go-p2p-message-bus"
@@ -2194,10 +2193,8 @@ func TestInvalidSubtreeHandlerHappyPath(t *testing.T) {
require.NoError(t, err)

msg := &kafka.KafkaMessage{
- ConsumerMessage: sarama.ConsumerMessage{
- Topic: "invalid-subtrees",
- Value: payload,
- },
+ Topic: "invalid-subtrees",
+ Value: payload,
}

h := s.invalidSubtreeHandler(context.Background())
@@ -2233,10 +2230,8 @@ func TestInvalidBlockHandler(t *testing.T) {

mkKafkaMsg := func(payload []byte) *kafka.KafkaMessage {
return &kafka.KafkaMessage{
- ConsumerMessage: sarama.ConsumerMessage{
- Topic: "invalid-subtrees",
- Value: payload,
- },
+ Topic: "invalid-subtrees",
+ Value: payload,
}
}

@@ -2356,10 +2351,8 @@ func TestServerRejectedHandler(t *testing.T) {
// helper: incapsulate in kafka.KafkaMessage
mkKafkaMsg := func(b []byte) *kafka.KafkaMessage {
return &kafka.KafkaMessage{
- ConsumerMessage: sarama.ConsumerMessage{
- Topic: "rejected-tx",
- Value: b,
- },
+ Topic: "rejected-tx",
+ Value: b,
}
}

@@ -2476,10 +2469,8 @@ func TestServerRejectedHandler(t *testing.T) {

mkKafkaMsg := func(b []byte) *kafka.KafkaMessage {
return &kafka.KafkaMessage{
- ConsumerMessage: sarama.ConsumerMessage{
- Topic: "rejected-tx",
- Value: b,
- },
+ Topic: "rejected-tx",
+ Value: b,
}
}

@@ -2830,10 +2821,8 @@ func TestProcessInvalidBlockMessageSuccess(t *testing.T) {
require.NoError(t, err)

kafkaMsg := &kafka.KafkaMessage{
- ConsumerMessage: sarama.ConsumerMessage{
- Topic: "invalid-blocks",
- Value: msgBytes,
- },
+ Topic: "invalid-blocks",
+ Value: msgBytes,
}

// Create a real ban manager for testing
@@ -2871,10 +2860,8 @@ func TestProcessInvalidBlockMessageUnmarshalError(t *testing.T) {
logger := ulogger.New("test")

kafkaMsg := &kafka.KafkaMessage{
- ConsumerMessage: sarama.ConsumerMessage{
- Topic: "invalid-blocks",
- Value: invalidBytes,
- },
+ Topic: "invalid-blocks",
+ Value: invalidBytes,
}
server := &Server{
logger: logger,
@@ -2894,10 +2881,8 @@ func TestProcessInvalidBlockMessageNoPeerInMap(t *testing.T) {
logger := ulogger.New("test")

kafkaMsg := &kafka.KafkaMessage{
- ConsumerMessage: sarama.ConsumerMessage{
- Topic: "invalid-blocks",
- Value: msgBytes,
- },
+ Topic: "invalid-blocks",
+ Value: msgBytes,
}

server := &Server{
@@ -2919,10 +2904,8 @@ func TestProcessInvalidBlockMessageWrongTypeInMap(t *testing.T) {
logger := ulogger.New("test")

kafkaMsg := &kafka.KafkaMessage{
- ConsumerMessage: sarama.ConsumerMessage{
- Topic: "invalid-blocks",
- Value: msgBytes,
- },
+ Topic: "invalid-blocks",
+ Value: msgBytes,
}

server := &Server{
@@ -2947,10 +2930,8 @@ func TestProcessInvalidBlockMessageAddBanScoreFails(t *testing.T) {
mockPeerID := peer.ID("peer-fail")

kafkaMsg := &kafka.KafkaMessage{
- ConsumerMessage: sarama.ConsumerMessage{
- Topic: "invalid-blocks",
- Value: msgBytes,
- },
+ Topic: "invalid-blocks",
+ Value: msgBytes,
}

// Create a real ban manager for testing
4 changes: 2 additions & 2 deletions services/subtreevalidation/txmetaHandler.go
@@ -41,7 +41,7 @@ func (u *Server) txmetaMessageHandler(ctx context.Context) func(msg *kafka.Kafka
// Processing errors are logged and the message is marked as completed
// to prevent infinite retry loops on malformed data.
func (u *Server) txmetaHandler(ctx context.Context, msg *kafka.KafkaMessage) error {
- if msg == nil || len(msg.ConsumerMessage.Value) < 4 {
+ if msg == nil || len(msg.Value) < 4 {
return nil
}

@@ -50,7 +50,7 @@ func (u *Server) txmetaHandler(ctx context.Context, msg *kafka.KafkaMessage) err
go func() {
startTime := time.Now()

- data := msg.ConsumerMessage.Value
+ data := msg.Value
offset := 0

// Read entry count
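The handler above now guards on len(msg.Value) < 4 before reading the entry count. A standalone sketch of that guard, assuming a 4-byte little-endian count prefix (the actual encoding is not shown in this diff):

package main

import (
	"encoding/binary"
	"fmt"
)

// entryCount mirrors the short-message guard in txmetaHandler; treating the
// prefix as a little-endian uint32 is an assumption for illustration.
func entryCount(value []byte) (uint32, bool) {
	if len(value) < 4 {
		// Too short to hold a count; the handler simply returns nil here.
		return 0, false
	}
	return binary.LittleEndian.Uint32(value[:4]), true
}

func main() {
	fmt.Println(entryCount([]byte{2, 0, 0, 0, 0xAA, 0xBB})) // 2 true
	fmt.Println(entryCount(make([]byte, 3)))                // 0 false
}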
9 changes: 2 additions & 7 deletions services/subtreevalidation/txmetaHandler_test.go
@@ -7,7 +7,6 @@ import (
"testing"
"time"

"github.com/IBM/sarama"
"github.com/bsv-blockchain/go-bt/v2"
"github.com/bsv-blockchain/go-bt/v2/chainhash"
"github.com/bsv-blockchain/teranode/errors"
@@ -227,12 +226,8 @@ func createKafkaMessage(t *testing.T, delete bool, content []byte) *kafka.KafkaM
copy(data[offset:], content)
}

- consumerMsg := sarama.ConsumerMessage{
- Value: data,
- }
-
return &kafka.KafkaMessage{
- ConsumerMessage: consumerMsg,
+ Value: data,
}
}

@@ -252,7 +247,7 @@ func TestServer_txmetaHandler(t *testing.T) {
{
name: "message too short for entry count",
setupMocks: func(l *mockLogger, c *mockCache) {},
- input: &kafka.KafkaMessage{ConsumerMessage: sarama.ConsumerMessage{Value: make([]byte, 3)}},
+ input: &kafka.KafkaMessage{Value: make([]byte, 3)},
},
{
name: "successful delete operation",
2 changes: 1 addition & 1 deletion settings/kafka_settings.go
@@ -35,5 +35,5 @@ type KafkaSettings struct {
TLSKeyFile string `key:"KAFKA_TLS_KEY_FILE" desc:"Path to Kafka TLS client key file" default:"" category:"Kafka" usage:"For client certificate authentication" type:"string" longdesc:"### Purpose\nPath to client private key file for mutual TLS (mTLS) authentication.\n\n### Format\nPEM-encoded private key, RSA or ECDSA (e.g., /etc/kafka/ssl/client-key.pem)\n\n### How It Works\n- Loaded via tls.LoadX509KeyPair with TLSCertFile\n- Private key must correspond to public key in client certificate\n- Validated on startup to ensure key matches certificate\n\n### Recommendations\n- Required when broker requires client certificate authentication\n- **Must be provided together with TLSCertFile** - both or neither\n- File permissions should be restricted (0600) to protect private key"`
// Debug logging
EnableDebugLogging bool `key:"kafka_enable_debug_logging" desc:"Enable debug logging for Kafka client" default:"false" category:"Kafka" usage:"Useful for troubleshooting connection issues" type:"bool" longdesc:"### Purpose\nEnable verbose debug logging for Kafka client operations.\n\n### How It Works\nWhen enabled, logs detailed information about:\n- Connection attempts and broker communication\n- Metadata requests\n- Produce/consume operations\n- Partition assignments and rebalancing\n- Consumer group coordination\n- Errors\n\n### Trade-offs\n| Setting | Benefit | Drawback |\n|---------|---------|----------|\n| Enabled | Detailed troubleshooting info | High logging overhead, large log volume |\n| Disabled | Normal performance | Less visibility into client behavior |\n\n### Recommendations\n- Use for troubleshooting connection issues\n- Use for debugging message delivery problems\n- **Disable in production** due to performance impact"`
- Scheme string `key:"KAFKA_SCHEMA" desc:"Kafka connection scheme" default:"http" category:"Kafka" usage:"Use 'kafka' for standard, 'memory' for testing" type:"string" longdesc:"### Purpose\nConnection scheme for Kafka client.\n\n### Values\n| Scheme | Description |\n|--------|-------------|\n| kafka | Standard Kafka broker connections |\n| memory | In-memory mock implementation (testing only) |\n\n### How It Works\n- **kafka** scheme connects to real brokers specified in KAFKA_HOSTS\n- **memory** scheme uses in-memory broker (imk.GetSharedBroker()) providing sarama.SyncProducer and sarama.Consumer interfaces without requiring a Kafka cluster\n\n### Recommendations\n- **Production**: Must use kafka\n- **CI/CD pipelines**: memory scheme useful for testing\n- **Local development**: memory scheme useful for unit tests"`
+ Scheme string `key:"KAFKA_SCHEMA" desc:"Kafka connection scheme" default:"http" category:"Kafka" usage:"Use 'kafka' for standard, 'memory' for testing" type:"string" longdesc:"### Purpose\nConnection scheme for Kafka client.\n\n### Values\n| Scheme | Description |\n|--------|-------------|\n| kafka | Standard Kafka broker connections |\n| memory | In-memory mock implementation (testing only) |\n\n### How It Works\n- **kafka** scheme connects to real brokers specified in KAFKA_HOSTS\n- **memory** scheme uses in-memory broker (imk.GetSharedBroker()) providing sync/async producer and consumer behavior without requiring a Kafka cluster\n\n### Recommendations\n- **Production**: Must use kafka\n- **CI/CD pipelines**: memory scheme useful for testing\n- **Local development**: memory scheme useful for unit tests"`
}
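The Scheme long description above distinguishes the kafka and memory schemes. A hedged sketch of how a caller might branch on that value; the brokerTarget helper, the trimmed KafkaSettings type, and the memory:// and kafka:// target strings are hypothetical stand-ins, since the real wiring lives outside this diff:

package main

import (
	"fmt"
	"strings"
)

// KafkaSettings is reduced to the one field discussed here; the full struct
// lives in settings/kafka_settings.go.
type KafkaSettings struct {
	Scheme string
}

// brokerTarget illustrates the two documented schemes: "memory" for tests
// without a cluster, "kafka" for real brokers taken from KAFKA_HOSTS.
func brokerTarget(s KafkaSettings, hosts []string) (string, error) {
	switch s.Scheme {
	case "memory":
		return "memory://shared", nil // in-memory broker, unit tests and CI only
	case "kafka":
		return "kafka://" + strings.Join(hosts, ","), nil // standard brokers
	default:
		return "", fmt.Errorf("unsupported kafka scheme %q", s.Scheme)
	}
}

func main() {
	fmt.Println(brokerTarget(KafkaSettings{Scheme: "memory"}, nil))
	fmt.Println(brokerTarget(KafkaSettings{Scheme: "kafka"}, []string{"broker-1:9092", "broker-2:9092"}))
}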
4 changes: 2 additions & 2 deletions test/longtest/util/kafka/kafka_test.go
@@ -270,7 +270,7 @@ func TestKafkaProducerAndConsumer(t *testing.T) {
}
}

- err = client.ConsumerGroup.Close()
+ err = client.Close()
require.NoError(t, err)

// In total we should have 4 messages (1 + 3 retries) for each of the 2 original messages
@@ -608,7 +608,7 @@ func consumeMessages(t *testing.T, ctx context.Context, logger ulogger.Logger, k
require.NoError(t, err)

defer func() {
- _ = consumer.ConsumerGroup.Close()
+ _ = consumer.Close()
}()

consumer.Start(ctx, func(msg *ukafka.KafkaMessage) error {
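Both hunks above replace a reach through the ConsumerGroup field with a single Close on the client or consumer. For context, a minimal produce-and-close sketch against the upstream kgo client, again with placeholder broker and topic names; the test's own client and consumer constructors are elided from this diff:

package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Placeholder broker and topic for illustration only.
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	// Close flushes buffered records and tears down the whole client;
	// there is no separate consumer-group handle to close.
	defer cl.Close()

	rec := &kgo.Record{Topic: "example-topic", Value: []byte("hello")}
	// ProduceSync blocks until the record is acknowledged or fails.
	if err := cl.ProduceSync(context.Background(), rec).FirstErr(); err != nil {
		panic(err)
	}
}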