From 078f15cc69b108fe4dbb01c7c469034886e8e9f3 Mon Sep 17 00:00:00 2001 From: Eray Date: Fri, 24 Mar 2023 19:43:06 +0300 Subject: [PATCH] feat: mapper should return more than one message #29 --- connector.go | 16 +++++++++------- example/main.go | 6 +++--- mapper.go | 2 +- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/connector.go b/connector.go index 741b647..73bec6e 100644 --- a/connector.go +++ b/connector.go @@ -54,14 +54,16 @@ func (c *connector) listener(ctx *models.ListenerContext) { return } - if kafkaMessage := c.mapper(e); kafkaMessage != nil { - defer message.MessagePool.Put(kafkaMessage) - topic := c.config.Kafka.CollectionTopicMapping[e.CollectionName] - if topic == "" { - c.errorLogger.Printf("unexpected collection | %s", e.CollectionName) - return + for _, kafkaMessage := range c.mapper(e) { + if kafkaMessage != nil { + topic := c.config.Kafka.CollectionTopicMapping[e.CollectionName] + if topic == "" { + c.errorLogger.Printf("unexpected collection | %s", e.CollectionName) + return + } + c.producer.Produce(ctx, kafkaMessage.Value, kafkaMessage.Key, kafkaMessage.Headers, topic) + message.MessagePool.Put(kafkaMessage) } - c.producer.Produce(ctx, kafkaMessage.Value, kafkaMessage.Key, kafkaMessage.Headers, topic) } } diff --git a/example/main.go b/example/main.go index 2493d29..d6a71e6 100644 --- a/example/main.go +++ b/example/main.go @@ -6,9 +6,9 @@ import ( "github.com/Trendyol/go-kafka-connect-couchbase/kafka/message" ) -func mapper(event couchbase.Event) *message.KafkaMessage { - // return nil if you wish filter the event - return message.GetKafkaMessage(event.Key, event.Value, nil) +func mapper(event couchbase.Event) []*message.KafkaMessage { + // return empty if you wish filter the event + return []*message.KafkaMessage{message.GetKafkaMessage(event.Key, event.Value, nil)} } func main() { diff --git a/mapper.go b/mapper.go index b5b7ba9..db076a0 100644 --- a/mapper.go +++ b/mapper.go @@ -5,4 +5,4 @@ import ( "github.com/Trendyol/go-kafka-connect-couchbase/kafka/message" ) -type Mapper func(event couchbase.Event) *message.KafkaMessage +type Mapper func(event couchbase.Event) []*message.KafkaMessage