Skip to content

Commit 0debdab

Browse files
feat(contrib/sarama): option to add custom consumer/producer span options
1 parent 45aa5ed commit 0debdab

File tree

5 files changed

+169
-7
lines changed

5 files changed

+169
-7
lines changed

contrib/IBM/sarama/consumer_test.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,80 @@ func TestWrapConsumer(t *testing.T) {
106106
assertDSMConsumerPathway(t, topic, "", msg2, false)
107107
}
108108
}
109+
110+
func TestWrapConsumerWithCustomConsumerSpanOptions(t *testing.T) {
111+
cfg := newIntegrationTestConfig(t)
112+
cfg.Version = sarama.MinVersion
113+
topic := topicName(t)
114+
115+
mt := mocktracer.Start()
116+
defer mt.Stop()
117+
118+
client, err := sarama.NewClient(kafkaBrokers, cfg)
119+
require.NoError(t, err)
120+
defer client.Close()
121+
122+
consumer, err := sarama.NewConsumerFromClient(client)
123+
require.NoError(t, err)
124+
consumer = WrapConsumer(
125+
consumer,
126+
WithDataStreams(),
127+
WithCustomConsumerSpanOptions(
128+
func(msg *sarama.ConsumerMessage) []tracer.StartSpanOption {
129+
return []tracer.StartSpanOption{
130+
tracer.Tag("messaging.kafka.key", string(msg.Key)),
131+
}
132+
},
133+
),
134+
)
135+
defer consumer.Close()
136+
137+
partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
138+
require.NoError(t, err)
139+
defer partitionConsumer.Close()
140+
141+
p, err := sarama.NewSyncProducer(kafkaBrokers, cfg)
142+
require.NoError(t, err)
143+
defer func() {
144+
assert.NoError(t, p.Close())
145+
}()
146+
147+
produceMsg := &sarama.ProducerMessage{
148+
Topic: topic,
149+
Key: sarama.StringEncoder("test key"),
150+
Value: sarama.StringEncoder("test 1"),
151+
Metadata: "test 1",
152+
}
153+
_, _, err = p.SendMessage(produceMsg)
154+
require.NoError(t, err)
155+
156+
msg1 := <-partitionConsumer.Messages()
157+
err = partitionConsumer.Close()
158+
require.NoError(t, err)
159+
// wait for the channel to be closed
160+
<-partitionConsumer.Messages()
161+
waitForSpans(mt, 1)
162+
163+
spans := mt.FinishedSpans()
164+
require.Len(t, spans, 1)
165+
166+
s := spans[0]
167+
spanctx, err := tracer.Extract(NewConsumerMessageCarrier(msg1))
168+
assert.NoError(t, err)
169+
assert.Equal(t, spanctx.TraceIDLower(), s.TraceID(),
170+
"span context should be injected into the consumer message headers")
171+
172+
assert.Equal(t, float64(0), s.Tag(ext.MessagingKafkaPartition))
173+
assert.NotNil(t, s.Tag("offset"))
174+
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
175+
assert.Equal(t, "Consume Topic "+topic, s.Tag(ext.ResourceName))
176+
assert.Equal(t, "queue", s.Tag(ext.SpanType))
177+
assert.Equal(t, "kafka.consume", s.OperationName())
178+
assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
179+
assert.Equal(t, ext.SpanKindConsumer, s.Tag(ext.SpanKind))
180+
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
181+
assert.Equal(t, topic, s.Tag("messaging.destination.name"))
182+
assert.Equal(t, "test key", s.Tag("messaging.kafka.key"))
183+
184+
assertDSMConsumerPathway(t, topic, "", msg1, false)
185+
}

contrib/IBM/sarama/dispatcher.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ func (w *wrappedDispatcher) Run() {
5959
if !math.IsNaN(w.cfg.analyticsRate) {
6060
opts = append(opts, tracer.Tag(ext.EventSampleRate, w.cfg.analyticsRate))
6161
}
62+
if w.cfg.customConsumerSpanOptionsFunc != nil {
63+
opts = append(opts, w.cfg.customConsumerSpanOptionsFunc(msg)...)
64+
}
6265
// kafka supports headers, so try to extract a span context
6366
carrier := NewConsumerMessageCarrier(msg)
6467
if spanctx, err := tracer.Extract(carrier); err == nil {

contrib/IBM/sarama/option.go

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,24 @@ package sarama
88
import (
99
"math"
1010

11+
"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
1112
"github.com/DataDog/dd-trace-go/v2/instrumentation"
13+
"github.com/IBM/sarama"
1214
)
1315

16+
type CustomConsumerSpanOptsFunc func(msg *sarama.ConsumerMessage) []tracer.StartSpanOption
17+
type CustomProducerSpanOptsFunc func(msg *sarama.ProducerMessage) []tracer.StartSpanOption
18+
1419
type config struct {
15-
consumerServiceName string
16-
producerServiceName string
17-
consumerSpanName string
18-
producerSpanName string
19-
analyticsRate float64
20-
dataStreamsEnabled bool
21-
groupID string
20+
consumerServiceName string
21+
producerServiceName string
22+
consumerSpanName string
23+
producerSpanName string
24+
analyticsRate float64
25+
dataStreamsEnabled bool
26+
groupID string
27+
customConsumerSpanOptionsFunc CustomConsumerSpanOptsFunc
28+
customProducerSpanOptionsFunc CustomProducerSpanOptsFunc
2229
}
2330

2431
func defaults(cfg *config) {
@@ -89,3 +96,17 @@ func WithAnalyticsRate(rate float64) OptionFn {
8996
}
9097
}
9198
}
99+
100+
// WithCustomConsumerSpanOptions enables calling a callback func to add custom span options on wrapped consumers.
101+
func WithCustomConsumerSpanOptions(f CustomConsumerSpanOptsFunc) OptionFn {
102+
return func(cfg *config) {
103+
cfg.customConsumerSpanOptionsFunc = f
104+
}
105+
}
106+
107+
// WithCustomProducerSpanOptions enables calling a callback func to add custom span options on wrapped producers.
108+
func WithCustomProducerSpanOptions(f CustomProducerSpanOptsFunc) OptionFn {
109+
return func(cfg *config) {
110+
cfg.customProducerSpanOptionsFunc = f
111+
}
112+
}

contrib/IBM/sarama/producer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,9 @@ func startProducerSpan(cfg *config, version sarama.KafkaVersion, msg *sarama.Pro
194194
if !math.IsNaN(cfg.analyticsRate) {
195195
opts = append(opts, tracer.Tag(ext.EventSampleRate, cfg.analyticsRate))
196196
}
197+
if cfg.customProducerSpanOptionsFunc != nil {
198+
opts = append(opts, cfg.customProducerSpanOptionsFunc(msg)...)
199+
}
197200
// if there's a span context in the headers, use that as the parent
198201
if spanctx, err := tracer.Extract(carrier); err == nil {
199202
// If there are span links as a result of context extraction, add them as a StartSpanOption

contrib/IBM/sarama/producer_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/DataDog/dd-trace-go/v2/ddtrace/ext"
1616
"github.com/DataDog/dd-trace-go/v2/ddtrace/mocktracer"
17+
"github.com/DataDog/dd-trace-go/v2/ddtrace/tracer"
1718
)
1819

1920
func TestSyncProducer(t *testing.T) {
@@ -103,6 +104,63 @@ func TestSyncProducerSendMessages(t *testing.T) {
103104
}
104105
}
105106

107+
func TestSyncProducerWithCustomSpanOptions(t *testing.T) {
108+
cfg := newIntegrationTestConfig(t)
109+
topic := topicName(t)
110+
111+
mt := mocktracer.Start()
112+
defer mt.Stop()
113+
114+
producer, err := sarama.NewSyncProducer(kafkaBrokers, cfg)
115+
require.NoError(t, err)
116+
producer = WrapSyncProducer(
117+
cfg,
118+
producer,
119+
WithDataStreams(),
120+
WithCustomProducerSpanOptions(
121+
func(msg *sarama.ProducerMessage) []tracer.StartSpanOption {
122+
key, err := msg.Key.Encode()
123+
assert.NoError(t, err)
124+
125+
return []tracer.StartSpanOption{
126+
tracer.Tag("kafka.messaging.key", key),
127+
}
128+
},
129+
),
130+
)
131+
defer func() {
132+
assert.NoError(t, producer.Close())
133+
}()
134+
135+
msg1 := &sarama.ProducerMessage{
136+
Topic: topic,
137+
Key: sarama.StringEncoder("test key"),
138+
Value: sarama.StringEncoder("test 1"),
139+
Metadata: "test",
140+
}
141+
_, _, err = producer.SendMessage(msg1)
142+
require.NoError(t, err)
143+
144+
spans := mt.FinishedSpans()
145+
require.Len(t, spans, 1)
146+
{
147+
s := spans[0]
148+
assert.Equal(t, "kafka", s.Tag(ext.ServiceName))
149+
assert.Equal(t, "queue", s.Tag(ext.SpanType))
150+
assert.Equal(t, "Produce Topic "+topic, s.Tag(ext.ResourceName))
151+
assert.Equal(t, "kafka.produce", s.OperationName())
152+
assert.Equal(t, float64(0), s.Tag(ext.MessagingKafkaPartition))
153+
assert.NotNil(t, s.Tag("offset"))
154+
assert.Equal(t, "IBM/sarama", s.Tag(ext.Component))
155+
assert.Equal(t, ext.SpanKindProducer, s.Tag(ext.SpanKind))
156+
assert.Equal(t, "kafka", s.Tag(ext.MessagingSystem))
157+
assert.Equal(t, topic, s.Tag("messaging.destination.name"))
158+
assert.Equal(t, "test key", s.Tag("messaging.kafka.key"))
159+
160+
assertDSMProducerPathway(t, topic, msg1)
161+
}
162+
}
163+
106164
func TestWrapAsyncProducer(t *testing.T) {
107165
// the default for producers is a fire-and-forget model that doesn't return
108166
// successes

0 commit comments

Comments
 (0)