From 1dc1246d8cb871f89175fff9660f04cd1106ba86 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Sat, 7 Jan 2023 09:42:55 +0300 Subject: [PATCH] Support secure ssl kafka --- config/config.go | 7 +++++- example/config.yml | 21 ++++++++++------ go.mod | 3 +++ kafka/producer/producer.go | 50 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 8 deletions(-) diff --git a/config/config.go b/config/config.go index 888adb4..f303edf 100644 --- a/config/config.go +++ b/config/config.go @@ -10,13 +10,18 @@ import ( ) type Kafka struct { - Brokers []string `yaml:"brokers"` Topic string `yaml:"topic"` + InterCAPath string `yaml:"interCAPath"` + ScramUsername string `yaml:"scramUsername"` + ScramPassword string `yaml:"scramPassword"` + RootCAPath string `yaml:"rootCAPath"` + Brokers []string `yaml:"brokers"` ProducerBatchSize int `yaml:"producerBatchSize"` ProducerBatchTickerDuration time.Duration `yaml:"producerBatchTickerDuration"` ReadTimeout time.Duration `yaml:"readTimeout"` WriteTimeout time.Duration `yaml:"writeTimeout"` RequiredAcks int `yaml:"requiredAcks"` + SecureConnection bool `yaml:"secureConnection"` } type Config struct { diff --git a/example/config.yml b/example/config.yml index ab8e804..c8e4f7b 100644 --- a/example/config.yml +++ b/example/config.yml @@ -12,19 +12,12 @@ metadataBucket: dcp-test-meta connectTimeout: 10s dcp: connectTimeout: 10s - flowControlBuffer: 16 - persistencePollingInterval: 100ms group: name: groupName membership: type: static memberNumber: 1 totalMembers: 1 -api: - port: 8080 -metric: - enabled: true - path: /metrics kafka: topic: "topicname" brokers: @@ -35,3 +28,17 @@ kafka: producerBatchSize: 50 producerBatchTickerDuration: 5s requiredAcks: 1 + + #SSL configurations + secureConnection: true + #Config support env variable "$HOME/example/..." + rootCAPath: "example/stretch-kafka/rootCA.pem" + interCAPath: "example/stretch-kafka/interCA.pem" + scramUsername: "username" + scramPassword: "password" + +checkpoint: + timeout: 100s + +logging: + level: debug diff --git a/go.mod b/go.mod index 1dab193..5f6d6b8 100644 --- a/go.mod +++ b/go.mod @@ -47,6 +47,9 @@ require ( github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.40.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect + github.com/xdg/scram v1.0.5 // indirect + github.com/xdg/stringprep v1.0.3 // indirect + golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect golang.org/x/sys v0.0.0-20220829200755-d48e67d00261 // indirect diff --git a/kafka/producer/producer.go b/kafka/producer/producer.go index 1acfdce..73dfd70 100644 --- a/kafka/producer/producer.go +++ b/kafka/producer/producer.go @@ -1,10 +1,15 @@ package kafka import ( + "crypto/tls" + "crypto/x509" "math" + "os" "sync" "time" + "github.com/segmentio/kafka-go/sasl/scram" + "github.com/Trendyol/go-kafka-connect-couchbase/config" "github.com/Trendyol/go-kafka-connect-couchbase/logger" @@ -34,11 +39,56 @@ func NewProducer(config *config.Kafka, logger logger.Logger, errorLogger logger. Logger: logger, ErrorLogger: errorLogger, } + if config.SecureConnection { + transport, err := createSecureKafkaTransport(config.ScramUsername, config.ScramPassword, config.RootCAPath, + config.InterCAPath, errorLogger) + if err != nil { + panic("Secure kafka couldn't connect " + err.Error()) + } + writer.Transport = transport + } return &producer{ producerBatch: newProducerBatch(config.ProducerBatchTickerDuration, writer, config.ProducerBatchSize, logger, errorLogger), } } +func createSecureKafkaTransport( + scramUsername, + scramPassword, + rootCAPath, + interCAPath string, + errorLogger logger.Logger, +) (*kafka.Transport, error) { + mechanism, err := scram.Mechanism(scram.SHA512, scramUsername, scramPassword) + if err != nil { + return nil, err + } + + caCert, err := os.ReadFile(os.ExpandEnv(rootCAPath)) + if err != nil { + errorLogger.Printf("An error occurred while reading ca.pem file! Error: %s", err.Error()) + return nil, err + } + + intCert, err := os.ReadFile(os.ExpandEnv(interCAPath)) + if err != nil { + errorLogger.Printf("An error occurred while reading int.pem file! Error: %s", err.Error()) + return nil, err + } + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + caCertPool.AppendCertsFromPEM(intCert) + + return &kafka.Transport{ + TLS: &tls.Config{ + RootCAs: caCertPool, + MinVersion: tls.VersionTLS12, + }, + SASL: mechanism, + }, nil +} + var KafkaMessagePool = sync.Pool{ New: func() any { return &kafka.Message{}