Skip to content

Commit

Permalink
feat: bump go-dcp-client v0.0.36
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Mar 26, 2023
1 parent 4e9cef5 commit 5996790
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 13 deletions.
2 changes: 1 addition & 1 deletion connector.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package gokafkaconnectcouchbase

import (
godcpclient "github.com/Trendyol/go-dcp-client"
"github.com/Trendyol/go-dcp-client"
"github.com/Trendyol/go-dcp-client/logger"
"github.com/Trendyol/go-dcp-client/models"
"github.com/Trendyol/go-kafka-connect-couchbase/config"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/Trendyol/go-kafka-connect-couchbase
go 1.19

require (
github.com/Trendyol/go-dcp-client v0.0.35
github.com/Trendyol/go-dcp-client v0.0.36
github.com/gookit/config/v2 v2.2.1
github.com/json-iterator/go v1.1.12
github.com/segmentio/kafka-go v0.4.39
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Trendyol/go-dcp-client v0.0.35 h1:M/SK7/aKFfgxFqBjxVJAar5okPtlQ9xucnYiP7wrbi8=
github.com/Trendyol/go-dcp-client v0.0.35/go.mod h1:IqhHa4RLdr3VWUWNl3pbBCpKw9MLsqUvFi8UnqfjkGk=
github.com/Trendyol/go-dcp-client v0.0.36 h1:L6G6Tv8GZ40fz1tqJgpSwFYrGsK3J+HmjCS4csj36QY=
github.com/Trendyol/go-dcp-client v0.0.36/go.mod h1:IqhHa4RLdr3VWUWNl3pbBCpKw9MLsqUvFi8UnqfjkGk=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down
20 changes: 11 additions & 9 deletions kafka/metadata/kafka_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"strconv"
"sync"

godcpclient "github.com/Trendyol/go-dcp-client"
"github.com/Trendyol/go-dcp-client/metadata"
"github.com/Trendyol/go-dcp-client/models"

"github.com/Trendyol/go-dcp-client/logger"
gKafka "github.com/Trendyol/go-kafka-connect-couchbase/kafka"
jsoniter "github.com/json-iterator/go"
"github.com/json-iterator/go"
"github.com/segmentio/kafka-go"
)

Expand All @@ -21,7 +23,7 @@ type kafkaMetadata struct {
errorLogger logger.Logger
}

func (s *kafkaMetadata) Save(state map[uint16]*godcpclient.CheckpointDocument, dirtyOffsets map[uint16]bool, _ string) error {
func (s *kafkaMetadata) Save(state map[uint16]*models.CheckpointDocument, dirtyOffsets map[uint16]bool, _ string) error {
var messages []kafka.Message

for vbID, document := range state {
Expand All @@ -47,7 +49,7 @@ func (s *kafkaMetadata) Save(state map[uint16]*godcpclient.CheckpointDocument, d
func (s *kafkaMetadata) Load( //nolint:funlen
vbIDs []uint16,
bucketUUID string,
) (map[uint16]*godcpclient.CheckpointDocument, bool, error) {
) (map[uint16]*models.CheckpointDocument, bool, error) {
partitions, err := s.kafkaClient.GetPartitions(s.topic)
if err != nil {
return nil, false, err
Expand Down Expand Up @@ -91,17 +93,17 @@ func (s *kafkaMetadata) Load( //nolint:funlen
}(consumer, endOffset.LastOffset)
}

state := map[uint16]*godcpclient.CheckpointDocument{}
state := map[uint16]*models.CheckpointDocument{}
stateLock := &sync.Mutex{}
exist := false

go func() {
for m := range ch {
var doc *godcpclient.CheckpointDocument
var doc *models.CheckpointDocument

err = jsoniter.Unmarshal(m.Value, &doc)
if err != nil {
doc = godcpclient.NewEmptyCheckpointDocument(bucketUUID)
doc = models.NewEmptyCheckpointDocument(bucketUUID)
} else {
exist = true
}
Expand All @@ -122,7 +124,7 @@ func (s *kafkaMetadata) Load( //nolint:funlen

for _, vbID := range vbIDs {
if _, ok := state[vbID]; !ok {
state[vbID] = godcpclient.NewEmptyCheckpointDocument(bucketUUID)
state[vbID] = models.NewEmptyCheckpointDocument(bucketUUID)
}
}

Expand All @@ -138,7 +140,7 @@ func NewKafkaMetadata(
kafkaMetadataConfig map[string]string,
logger logger.Logger,
errorLogger logger.Logger,
) godcpclient.Metadata {
) metadata.Metadata {
var topic string
var partition int
var replicationFactor int
Expand Down

0 comments on commit 5996790

Please sign in to comment.