Skip to content

Commit

Permalink
Add maximum bulk size as byte to the config (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhmtszr authored Apr 5, 2023
1 parent 0fd5dae commit 8ca26ec
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 4 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func main() {
connector.Start()
}
```

---

### Features
Expand Down Expand Up @@ -127,6 +128,7 @@ Check out on [go-dcp-client](https://github.com/Trendyol/go-dcp-client#configura
| `kafka.collectionTopicMapping` | map[string]string | yes | |
| `kafka.brokers` | []string | yes | |
| `kafka.producerBatchSize` | integer | yes | |
| `kafka.producerBatchBytes` | integer | yes | |
| `kafka.producerBatchTickerDuration` | time.Duration | yes | |
| `kafka.readTimeout` | time.Duration | no | |
| `kafka.compression` | integer | no | |
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Kafka struct {
RootCAPath string `yaml:"rootCAPath"`
Brokers []string `yaml:"brokers"`
ProducerBatchSize int `yaml:"producerBatchSize"`
ProducerBatchBytes int `yaml:"producerBatchBytes"`
ProducerBatchTickerDuration time.Duration `yaml:"producerBatchTickerDuration"`
ReadTimeout time.Duration `yaml:"readTimeout"`
WriteTimeout time.Duration `yaml:"writeTimeout"`
Expand Down
1 change: 1 addition & 0 deletions example/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ kafka:
readTimeout: 30s
writeTimeout: 30s
producerBatchSize: 2000
producerBatchBytes: 10000
producerBatchTickerDuration: 10s
requiredAcks: 1
# 0 -> None
Expand Down
4 changes: 2 additions & 2 deletions kafka/metadata/kafka_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (

type kafkaMetadata struct {
kafkaClient gKafka.Client
writer *kafka.Writer
topic string
logger logger.Logger
errorLogger logger.Logger
writer *kafka.Writer
topic string
}

func (s *kafkaMetadata) Save(state map[uint16]*models.CheckpointDocument, dirtyOffsets map[uint16]bool, _ string) error {
Expand Down
4 changes: 3 additions & 1 deletion kafka/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ func NewProducer(kafkaClient gKafka.Client,
config.Kafka.ProducerBatchTickerDuration,
writer,
config.Kafka.ProducerBatchSize,
config.Kafka.ProducerBatchBytes,
config.Metric.AverageWindowSec,
logger,
errorLogger,
dcpCheckpointCommit),
dcpCheckpointCommit,
),
}, nil
}

Expand Down
8 changes: 7 additions & 1 deletion kafka/producer/producer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ type producerBatch struct {
dcpCheckpointCommit func()
metric *Metric
messages []kafka.Message
currentMessageBytes int
batchTickerDuration time.Duration
batchLimit int
batchBytes int
flushLock sync.Mutex
}

func newProducerBatch(
batchTime time.Duration,
writer *kafka.Writer,
batchLimit int,
batchBytes int,
averageWindowSec float64,
logger logger.Logger,
errorLogger logger.Logger,
Expand All @@ -45,6 +48,7 @@ func newProducerBatch(
logger: logger,
errorLogger: errorLogger,
dcpCheckpointCommit: dcpCheckpointCommit,
batchBytes: batchBytes,
}
batch.StartBatchTicker()
return batch
Expand All @@ -67,12 +71,13 @@ func (b *producerBatch) Close() {
func (b *producerBatch) AddMessage(ctx *models.ListenerContext, message kafka.Message, eventTime time.Time) {
b.flushLock.Lock()
b.messages = append(b.messages, message)
b.currentMessageBytes += len(message.Value)
ctx.Ack()
b.flushLock.Unlock()

b.metric.KafkaConnectorLatency.Add(float64(time.Since(eventTime).Milliseconds()))

if len(b.messages) == b.batchLimit {
if len(b.messages) == b.batchLimit || b.currentMessageBytes >= b.batchBytes {
b.FlushMessages()
}
}
Expand All @@ -91,5 +96,6 @@ func (b *producerBatch) FlushMessages() {
b.dcpCheckpointCommit()

b.messages = b.messages[:0]
b.currentMessageBytes = 0
b.batchTicker.Reset(b.batchTickerDuration)
}

0 comments on commit 8ca26ec

Please sign in to comment.