Skip to content

Commit

Permalink
fix: custom logger not passed to dcp client #34
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Mar 24, 2023
1 parent 078f15c commit 4455b04
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 64 deletions.
13 changes: 9 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ This repository contains the Go implementation of the Couchbase Kafka Connector.
* [Examples](#examples)

### What is Couchbase Kafka Connector?

Official Couchbase documentation defines the Couchbase Kafka Connector as follows:

_The Couchbase Kafka connector is a plug-in for the Kafka Connect framework. It provides source and sink components._

The **source connector** streams documents from Couchbase Database Change Protocol (DCP) and publishes each document to a Kafka topic in near real-time.
The **source connector** streams documents from Couchbase Database Change Protocol (DCP) and publishes each document to
a Kafka topic in near real-time.

The **sink connector** subscribes to Kafka topics and writes the messages to Couchbase.

Expand All @@ -35,7 +37,8 @@ The **sink connector** subscribes to Kafka topics and writes the messages to Cou
---

### Example
```

```go
package main

func mapper(event couchbase.Event) *message.KafkaMessage {
Expand All @@ -54,7 +57,8 @@ func main() {
```

Custom log structures can be used with the connector
```

```go
package main

type ConnectorLogger struct{}
Expand All @@ -80,6 +84,7 @@ func main() {

- [X] Batch Producer
- [X] Secure Kafka
- [X] Kafka Metadata

---

Expand All @@ -100,7 +105,7 @@ Check out on [go-dcp-client](https://github.com/Trendyol/go-dcp-client#configura

| Variable | Type | Is Required |
|-------------------------------------|----------------------|-------------|
| `kafka.collectionTopicMapping` | map[string][string] | yes |
| `kafka.collectionTopicMapping` | map[string]string | yes |
| `kafka.brokers` | array | yes |
| `kafka.readTimeout` | integer | no |
| `kafka.compression` | integer | no |
Expand Down
6 changes: 3 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package config
import (
"time"

"github.com/Trendyol/go-kafka-connect-couchbase/logger"
"github.com/Trendyol/go-dcp-client/logger"

"github.com/gookit/config/v2"
"github.com/gookit/config/v2/yamlv3"
Expand Down Expand Up @@ -47,14 +47,14 @@ func NewConfig(name string, filePath string, errorLogger logger.Logger) *Config

err := conf.LoadFiles(filePath)
if err != nil {
errorLogger.Printf("Error while reading config %v", err)
errorLogger.Printf("error while reading config %v", err)
}

_config := &Config{}
err = conf.Decode(_config)

if err != nil {
errorLogger.Printf("Error while reading config %v", err)
errorLogger.Printf("error while reading config %v", err)
}

return _config
Expand Down
10 changes: 5 additions & 5 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"github.com/Trendyol/go-kafka-connect-couchbase/kafka/producer"

godcpclient "github.com/Trendyol/go-dcp-client"
"github.com/Trendyol/go-dcp-client/logger"
"github.com/Trendyol/go-kafka-connect-couchbase/config"
"github.com/Trendyol/go-kafka-connect-couchbase/couchbase"
"github.com/Trendyol/go-kafka-connect-couchbase/kafka"
"github.com/Trendyol/go-kafka-connect-couchbase/logger"
)

var MetadataTypeKafka = "kafka"
Expand Down Expand Up @@ -68,7 +68,7 @@ func (c *connector) listener(ctx *models.ListenerContext) {
}

func NewConnector(configPath string, mapper Mapper) (Connector, error) {
return newConnector(configPath, mapper, &logger.Log, &logger.Log)
return newConnector(configPath, mapper, logger.Log, logger.Log)
}

func NewConnectorWithLoggers(configPath string, mapper Mapper, logger logger.Logger, errorLogger logger.Logger) (Connector, error) {
Expand All @@ -87,9 +87,9 @@ func newConnector(configPath string, mapper Mapper, logger logger.Logger, errorL

kafkaClient := kafka.NewClient(c.Kafka, connector.logger, connector.errorLogger)

dcp, err := godcpclient.NewDcp(configPath, connector.listener)
dcp, err := godcpclient.NewDcpWithLoggers(configPath, connector.listener, logger, errorLogger)
if err != nil {
connector.errorLogger.Printf("Dcp error: %v", err)
connector.errorLogger.Printf("dcp error: %v", err)
return nil, err
}

Expand All @@ -104,7 +104,7 @@ func newConnector(configPath string, mapper Mapper, logger logger.Logger, errorL

connector.producer, err = producer.NewProducer(kafkaClient, c.Kafka, connector.logger, connector.errorLogger, dcp.Commit)
if err != nil {
connector.errorLogger.Printf("Kafka error: %v", err)
connector.errorLogger.Printf("kafka error: %v", err)
return nil, err
}
return connector, nil
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/Trendyol/go-kafka-connect-couchbase
go 1.19

require (
github.com/Trendyol/go-dcp-client v0.0.28
github.com/Trendyol/go-dcp-client v0.0.29
github.com/gookit/config/v2 v2.2.1
github.com/json-iterator/go v1.1.12
github.com/segmentio/kafka-go v0.4.39
Expand Down Expand Up @@ -55,7 +55,6 @@ require (
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/rs/zerolog v1.29.0 // indirect
github.com/savsgio/dictpool v0.0.0-20221023140959-7bf2e61cea94 // indirect
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect
github.com/tinylib/msgp v1.1.8 // indirect
Expand Down
13 changes: 2 additions & 11 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Trendyol/go-dcp-client v0.0.28 h1:1RHDAJWolSe6bMOP6nrOegYQMBpx6cA581vTQQDvJ7Y=
github.com/Trendyol/go-dcp-client v0.0.28/go.mod h1:f7AK3AIT0MyaOJrSKPVFk0RQPNmqH3WH46Xia42d2eo=
github.com/Trendyol/go-dcp-client v0.0.29 h1:1qrpBShtfMLyfOE4pVbqsjmoG4PvlZPL5kCRzTmmb4I=
github.com/Trendyol/go-dcp-client v0.0.29/go.mod h1:iXYRSyAS6xtOqlVM8rSKb++Z1zakEbOLJ3bATEM/wZk=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -75,7 +75,6 @@ github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnht
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/containerd/cgroups v1.0.4 h1:jN/mbWBEaz+T1pi5OFtnkQ+8qnmEbAr1Oo1FRm5B0dA=
github.com/containerd/containerd v1.6.8 h1:h4dOFDwzHmqFEP754PgfgTeVXFnLiRc6kiqC7tplDJs=
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/couchbase/gocbcore/v10 v10.2.1 h1:C5zXZUWdKrE2hncDBdDLRoKXNaVc7BYrn6JOgNZAODA=
github.com/couchbase/gocbcore/v10 v10.2.1/go.mod h1:Y3S2HKwqDDcsOXdsB5YRtd2wn5ADnOQwT7G00vVylh4=
github.com/couchbaselabs/gocaves/client v0.0.0-20221010100422-25779db8de05 h1:A3GRyDjp5MhqHxrGB0Hl95cCuA70Ovquy26kAJAAeZ0=
Expand Down Expand Up @@ -133,7 +132,6 @@ github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/goccy/go-yaml v1.10.0 h1:rBi+5HGuznOxx0JZ+60LDY85gc0dyIJCIMvsMJTKSKQ=
github.com/goccy/go-yaml v1.10.0/go.mod h1:h/18Lr6oSQ3mvmqFoWmQ47KChOgpfHpTyIHl3yVmpiY=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofiber/adaptor/v2 v2.1.31/go.mod h1:vdSG9JhOhOLYjE4j14fx6sJvLJNFVf9o6rSyB5GkU4s=
github.com/gofiber/adaptor/v2 v2.1.32 h1:94cL79U4ekq78TmqfXPrulMWkpfPxqzHimUc/B+jmkY=
github.com/gofiber/adaptor/v2 v2.1.32/go.mod h1:aX4qfSo+1AJYIWnLL1Mx3EQ6znC6WW46MqFQruUQE6c=
Expand Down Expand Up @@ -255,11 +253,9 @@ github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamh
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
Expand Down Expand Up @@ -335,9 +331,6 @@ github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.29.0 h1:Zes4hju04hjbvkVkOhdl2HpZa+0PmVwigmo8XoORE5w=
github.com/rs/zerolog v1.29.0/go.mod h1:NILgTygv/Uej1ra5XxGf82ZFSLk58MFGAUS2o6usyD0=
github.com/savsgio/dictpool v0.0.0-20221023140959-7bf2e61cea94 h1:rmMl4fXJhKMNWl+K+r/fq4FbbKI+Ia2m9hYBLm2h4G4=
github.com/savsgio/dictpool v0.0.0-20221023140959-7bf2e61cea94/go.mod h1:90zrgN3D/WJsDd1iXHT96alCoN2KJo6/4x1DZC3wZs8=
github.com/savsgio/gotils v0.0.0-20220530130905-52f3993e8d6d/go.mod h1:Gy+0tqhJvgGlqnTF8CVGP0AaGRjwBtXs/a5PA0Y3+A4=
Expand Down Expand Up @@ -547,8 +540,6 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220406163625-3f8b81556e12/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
31 changes: 16 additions & 15 deletions kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"os"
"time"

"github.com/Trendyol/go-dcp-client/logger"
"github.com/Trendyol/go-kafka-connect-couchbase/config"
"github.com/Trendyol/go-kafka-connect-couchbase/logger"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/scram"
)
Expand Down Expand Up @@ -47,13 +47,13 @@ func createSecureKafkaTransport(

caCert, err := os.ReadFile(os.ExpandEnv(rootCAPath))
if err != nil {
errorLogger.Printf("An error occurred while reading ca.pem file! Error: %s", err.Error())
errorLogger.Printf("an error occurred while reading ca.pem file! Error: %s", err.Error())
return nil, err
}

intCert, err := os.ReadFile(os.ExpandEnv(interCAPath))
if err != nil {
errorLogger.Printf("An error occurred while reading int.pem file! Error: %s", err.Error())
errorLogger.Printf("an error occurred while reading int.pem file! Error: %s", err.Error())
return nil, err
}

Expand Down Expand Up @@ -151,18 +151,19 @@ func (c *client) GetPartitions(topic string) ([]int, error) {

func (c *client) Producer() *kafka.Writer {
return &kafka.Writer{
Addr: kafka.TCP(c.config.Brokers...),
Balancer: &kafka.Hash{},
BatchSize: c.config.ProducerBatchSize,
BatchBytes: math.MaxInt,
BatchTimeout: 500 * time.Microsecond,
MaxAttempts: math.MaxInt,
ReadTimeout: c.config.ReadTimeout,
WriteTimeout: c.config.WriteTimeout,
RequiredAcks: kafka.RequiredAcks(c.config.RequiredAcks),
Logger: c.logger,
ErrorLogger: c.errorLogger,
Compression: kafka.Compression(c.config.GetCompression()),
Addr: kafka.TCP(c.config.Brokers...),
Balancer: &kafka.Hash{},
BatchSize: c.config.ProducerBatchSize,
BatchBytes: math.MaxInt,
BatchTimeout: 500 * time.Microsecond,
MaxAttempts: math.MaxInt,
ReadTimeout: c.config.ReadTimeout,
WriteTimeout: c.config.WriteTimeout,
RequiredAcks: kafka.RequiredAcks(c.config.RequiredAcks),
Logger: c.logger,
ErrorLogger: c.errorLogger,
Compression: kafka.Compression(c.config.GetCompression()),
AllowAutoTopicCreation: true,
}
}

Expand Down
2 changes: 1 addition & 1 deletion kafka/metadata/kafka_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"sync"

godcpclient "github.com/Trendyol/go-dcp-client"
"github.com/Trendyol/go-dcp-client/logger"
gKafka "github.com/Trendyol/go-kafka-connect-couchbase/kafka"
"github.com/Trendyol/go-kafka-connect-couchbase/logger"
jsoniter "github.com/json-iterator/go"
"github.com/segmentio/kafka-go"
)
Expand Down
2 changes: 1 addition & 1 deletion kafka/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package producer
import (
"github.com/Trendyol/go-dcp-client/models"

"github.com/Trendyol/go-dcp-client/logger"
"github.com/Trendyol/go-kafka-connect-couchbase/config"
gKafka "github.com/Trendyol/go-kafka-connect-couchbase/kafka"
"github.com/Trendyol/go-kafka-connect-couchbase/logger"

"github.com/segmentio/kafka-go"
)
Expand Down
4 changes: 2 additions & 2 deletions kafka/producer/producer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/Trendyol/go-dcp-client/models"

"github.com/Trendyol/go-kafka-connect-couchbase/logger"
"github.com/Trendyol/go-dcp-client/logger"

"github.com/segmentio/kafka-go"
)
Expand Down Expand Up @@ -79,7 +79,7 @@ func (b *producerBatch) FlushMessages() {
}
err := b.Writer.WriteMessages(context.Background(), b.messages...)
if err != nil {
b.errorLogger.Printf("Batch producer flush error %v", err)
b.errorLogger.Printf("batch producer flush error %v", err)
return
}
b.dcpCheckpointCommit()
Expand Down
20 changes: 0 additions & 20 deletions logger/logger.go

This file was deleted.

0 comments on commit 4455b04

Please sign in to comment.