Skip to content

Commit 3830d82

Browse files
feat: add support for producing kakfa messages asynchronously
1 parent 7912e94 commit 3830d82

File tree

1 file changed

+30
-3
lines changed

1 file changed

+30
-3
lines changed

exporter/kafkaexporter/exporter.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,28 @@ type MessageSender interface {
1919
SendMessages(msgs []*sarama.ProducerMessage) error
2020
}
2121

22+
type AsyncMessageSender struct {
23+
sarama.AsyncProducer
24+
logger *fflog.FFLogger
25+
}
26+
27+
func (a *AsyncMessageSender) SendMessages(msgs []*sarama.ProducerMessage) error {
28+
for len(msgs) > 0 {
29+
select {
30+
case err := <-a.AsyncProducer.Errors():
31+
a.logger.Warn("Failed to produce message: %w", err)
32+
case a.AsyncProducer.Input() <- msgs[0]:
33+
msgs = msgs[1:]
34+
}
35+
}
36+
return nil
37+
}
38+
2239
// Settings contains Kafka-specific configurations needed for message creation
2340
type Settings struct {
2441
Topic string `json:"topic"`
2542
Addresses []string `json:"addresses"`
43+
Async bool `json:"async"`
2644
*sarama.Config
2745
}
2846

@@ -41,13 +59,15 @@ type Exporter struct {
4159
// dialer will create the producer. This field is added for dependency injection during testing as sarama
4260
// has the annoying tendency to dial as soon as a producer is created.
4361
dialer func(addrs []string, config *sarama.Config) (MessageSender, error)
62+
63+
logger *fflog.FFLogger
4464
}
4565

4666
// Export will produce a message to the Kafka topic. The message's value will contain the event encoded in the
4767
// selected format. Messages are published synchronously and will error immediately on failure.
48-
func (e *Exporter) Export(_ context.Context, _ *fflog.FFLogger, featureEvents []exporter.FeatureEvent) error {
68+
func (e *Exporter) Export(_ context.Context, logger *fflog.FFLogger, featureEvents []exporter.FeatureEvent) error {
4969
if e.sender == nil {
50-
err := e.initializeProducer()
70+
err := e.initializeProducer(logger)
5171
if err != nil {
5272
return fmt.Errorf("writer: %w", err)
5373
}
@@ -81,7 +101,7 @@ func (e *Exporter) IsBulk() bool {
81101

82102
// initializeProducer runs only once and creates a new producer from the dialer. If the config is not populated a new
83103
// one will be created with sensible defaults.
84-
func (e *Exporter) initializeProducer() error {
104+
func (e *Exporter) initializeProducer(logger *fflog.FFLogger) error {
85105
if e.Settings.Config == nil {
86106
e.Settings.Config = sarama.NewConfig()
87107
e.Settings.Config.Producer.Return.Successes = true // Needs to be true for sync producers
@@ -90,6 +110,13 @@ func (e *Exporter) initializeProducer() error {
90110
if e.dialer == nil {
91111
e.dialer = func(addrs []string, config *sarama.Config) (MessageSender, error) {
92112
// Adapter for the function to comply with the MessageSender interface return
113+
if e.Settings.Async {
114+
asyncProducer, err := sarama.NewAsyncProducer(addrs, config)
115+
if err != nil {
116+
return nil, err
117+
}
118+
return &AsyncMessageSender{AsyncProducer: asyncProducer, logger: logger}, nil //TODO Close should be called on shutdown
119+
}
93120
return sarama.NewSyncProducer(addrs, config)
94121
}
95122
}

0 commit comments

Comments
 (0)