Skip to content

Commit

Permalink
fix: improve tls connection
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Mar 27, 2023
1 parent dfa03d9 commit 5237e2c
Showing 1 changed file with 54 additions and 19 deletions.
73 changes: 54 additions & 19 deletions kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"os"
"time"

"github.com/segmentio/kafka-go/sasl"

"github.com/Trendyol/go-dcp-client/logger"
"github.com/Trendyol/go-kafka-connect-couchbase/config"
"github.com/segmentio/kafka-go"
Expand All @@ -33,15 +35,22 @@ type client struct {
config *config.Kafka
logger logger.Logger
errorLogger logger.Logger
transport *kafka.Transport
dialer *kafka.Dialer
}

type tlsContent struct {
config *tls.Config
sasl sasl.Mechanism
}

func createSecureKafkaTransport(
func newTLSContent(
scramUsername,
scramPassword,
rootCAPath,
interCAPath string,
errorLogger logger.Logger,
) (*kafka.Transport, error) {
) (*tlsContent, error) {
mechanism, err := scram.Mechanism(scram.SHA512, scramUsername, scramPassword)
if err != nil {
return nil, err
Expand All @@ -63,12 +72,12 @@ func createSecureKafkaTransport(
caCertPool.AppendCertsFromPEM(caCert)
caCertPool.AppendCertsFromPEM(intCert)

return &kafka.Transport{
TLS: &tls.Config{
return &tlsContent{
config: &tls.Config{
RootCAs: caCertPool,
MinVersion: tls.VersionTLS12,
},
SASL: mechanism,
sasl: mechanism,
}, nil
}

Expand Down Expand Up @@ -170,7 +179,7 @@ func (c *client) CheckTopics(topics []string) error {
}

func (c *client) Producer() *kafka.Writer {
return &kafka.Writer{
writer := &kafka.Writer{
Addr: kafka.TCP(c.config.Brokers...),
Balancer: &kafka.Hash{},
BatchSize: c.config.ProducerBatchSize,
Expand All @@ -185,15 +194,27 @@ func (c *client) Producer() *kafka.Writer {
Compression: kafka.Compression(c.config.GetCompression()),
Transport: c.kafkaClient.Transport,
}

if c.transport != nil {
writer.Transport = c.transport
}

return writer
}

func (c *client) Consumer(topic string, partition int, startOffset int64) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
readerConfig := kafka.ReaderConfig{
Brokers: c.config.Brokers,
Topic: topic,
Partition: partition,
StartOffset: startOffset,
})
}

if c.dialer != nil {
readerConfig.Dialer = c.dialer
}

return kafka.NewReader(readerConfig)
}

func (c *client) CreateCompactedTopic(topic string, partition int, replicationFactor int) error {
Expand Down Expand Up @@ -238,12 +259,18 @@ func (c *client) CreateCompactedTopic(topic string, partition int, replicationFa
func NewClient(config *config.Kafka, logger logger.Logger, errorLogger logger.Logger) Client {
addr := kafka.TCP(config.Brokers...)

kafkaClient := &kafka.Client{
Addr: addr,
newClient := &client{
addr: addr,
kafkaClient: &kafka.Client{
Addr: addr,
},
config: config,
logger: logger,
errorLogger: errorLogger,
}

if config.SecureConnection {
transport, err := createSecureKafkaTransport(
tlsContent, err := newTLSContent(
config.ScramUsername,
config.ScramPassword,
config.RootCAPath,
Expand All @@ -254,14 +281,22 @@ func NewClient(config *config.Kafka, logger logger.Logger, errorLogger logger.Lo
panic(err)
}

kafkaClient.Transport = transport
}
transport := &kafka.Transport{
TLS: tlsContent.config,
SASL: tlsContent.sasl,
}

return &client{
addr: addr,
kafkaClient: kafkaClient,
config: config,
logger: logger,
errorLogger: errorLogger,
newClient.transport = transport

newClient.dialer = &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: tlsContent.config,
SASLMechanism: tlsContent.sasl,
}

newClient.kafkaClient.Transport = transport
}

return newClient
}

0 comments on commit 5237e2c

Please sign in to comment.