Skip to content

Commit

Permalink
feat: mapper should return more than one message #29
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Mar 24, 2023
1 parent bdaee76 commit 078f15c
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 11 deletions.
16 changes: 9 additions & 7 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,16 @@ func (c *connector) listener(ctx *models.ListenerContext) {
return
}

if kafkaMessage := c.mapper(e); kafkaMessage != nil {
defer message.MessagePool.Put(kafkaMessage)
topic := c.config.Kafka.CollectionTopicMapping[e.CollectionName]
if topic == "" {
c.errorLogger.Printf("unexpected collection | %s", e.CollectionName)
return
for _, kafkaMessage := range c.mapper(e) {
if kafkaMessage != nil {
topic := c.config.Kafka.CollectionTopicMapping[e.CollectionName]
if topic == "" {
c.errorLogger.Printf("unexpected collection | %s", e.CollectionName)
return
}
c.producer.Produce(ctx, kafkaMessage.Value, kafkaMessage.Key, kafkaMessage.Headers, topic)
message.MessagePool.Put(kafkaMessage)
}
c.producer.Produce(ctx, kafkaMessage.Value, kafkaMessage.Key, kafkaMessage.Headers, topic)
}
}

Expand Down
6 changes: 3 additions & 3 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"github.com/Trendyol/go-kafka-connect-couchbase/kafka/message"
)

func mapper(event couchbase.Event) *message.KafkaMessage {
// return nil if you wish filter the event
return message.GetKafkaMessage(event.Key, event.Value, nil)
func mapper(event couchbase.Event) []*message.KafkaMessage {
// return empty if you wish filter the event
return []*message.KafkaMessage{message.GetKafkaMessage(event.Key, event.Value, nil)}
}

func main() {
Expand Down
2 changes: 1 addition & 1 deletion mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ import (
"github.com/Trendyol/go-kafka-connect-couchbase/kafka/message"
)

type Mapper func(event couchbase.Event) *message.KafkaMessage
type Mapper func(event couchbase.Event) []*message.KafkaMessage

0 comments on commit 078f15c

Please sign in to comment.