Skip to content

Commit 70dfa38

Browse files
Udhayarajanndyakov
andauthored
feat(otel): add trace filter for process pipeline and dial operation (#3550)
* feat(tracing): add process pipeline and dial filtering options for tracing * refactor(tracing): implement default command filter and process pipeline options * refactor(tracing): rename defaultCommandFilter to DefaultCommandFilter * refactor(tracing): add BasicCommandFilter as a deprecated alias for DefaultCommandFilter --------- Co-authored-by: Nedyalko Dyakov <[email protected]>
1 parent a15e763 commit 70dfa38

File tree

3 files changed

+167
-9
lines changed

3 files changed

+167
-9
lines changed

extra/redisotel/config.go

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ type config struct {
2222
tp trace.TracerProvider
2323
tracer trace.Tracer
2424

25-
dbStmtEnabled bool
26-
callerEnabled bool
27-
filter func(cmd redis.Cmder) bool
25+
dbStmtEnabled bool
26+
callerEnabled bool
27+
filterDial bool
28+
filterProcessPipeline func(cmds []redis.Cmder) bool
29+
filterProcess func(cmd redis.Cmder) bool
2830

2931
// Metrics options.
3032

@@ -65,6 +67,15 @@ func newConfig(opts ...baseOption) *config {
6567
mp: otel.GetMeterProvider(),
6668
dbStmtEnabled: true,
6769
callerEnabled: true,
70+
filterProcess: DefaultCommandFilter,
71+
filterProcessPipeline: func(cmds []redis.Cmder) bool {
72+
for _, cmd := range cmds {
73+
if DefaultCommandFilter(cmd) {
74+
return true
75+
}
76+
}
77+
return false
78+
},
6879
}
6980

7081
for _, opt := range opts {
@@ -132,11 +143,28 @@ func WithCallerEnabled(on bool) TracingOption {
132143
// passwords.
133144
func WithCommandFilter(filter func(cmd redis.Cmder) bool) TracingOption {
134145
return tracingOption(func(conf *config) {
135-
conf.filter = filter
146+
conf.filterProcess = filter
136147
})
137148
}
138149

139-
func BasicCommandFilter(cmd redis.Cmder) bool {
150+
// WithCommandsFilter allows filtering of pipeline commands
151+
// when tracing to omit commands that may have sensitive details like
152+
// passwords in a pipeline.
153+
func WithCommandsFilter(filter func(cmds []redis.Cmder) bool) TracingOption {
154+
return tracingOption(func(conf *config) {
155+
conf.filterProcessPipeline = filter
156+
})
157+
}
158+
159+
// WithDialFilter enables or disables filtering of dial commands.
160+
func WithDialFilter(on bool) TracingOption {
161+
return tracingOption(func(conf *config) {
162+
conf.filterDial = on
163+
})
164+
}
165+
166+
// DefaultCommandFilter filters out AUTH commands from tracing.
167+
func DefaultCommandFilter(cmd redis.Cmder) bool {
140168
if strings.ToLower(cmd.Name()) == "auth" {
141169
return true
142170
}
@@ -159,6 +187,12 @@ func BasicCommandFilter(cmd redis.Cmder) bool {
159187
return false
160188
}
161189

190+
// BasicCommandFilter filters out AUTH commands from tracing.
191+
// Deprecated: use DefaultCommandFilter instead.
192+
func BasicCommandFilter(cmd redis.Cmder) bool {
193+
return DefaultCommandFilter(cmd)
194+
}
195+
162196
//------------------------------------------------------------------------------
163197

164198
type MetricsOption interface {

extra/redisotel/tracing.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ func newTracingHook(connString string, opts ...TracingOption) *tracingHook {
8787

8888
func (th *tracingHook) DialHook(hook redis.DialHook) redis.DialHook {
8989
return func(ctx context.Context, network, addr string) (net.Conn, error) {
90+
91+
if th.conf.filterDial {
92+
return hook(ctx, network, addr)
93+
}
94+
9095
ctx, span := th.conf.tracer.Start(ctx, "redis.dial", th.spanOpts...)
9196
defer span.End()
9297

@@ -103,7 +108,7 @@ func (th *tracingHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook {
103108
return func(ctx context.Context, cmd redis.Cmder) error {
104109

105110
// Check if the command should be filtered out
106-
if th.conf.filter != nil && th.conf.filter(cmd) {
111+
if th.conf.filterProcess != nil && th.conf.filterProcess(cmd) {
107112
// If so, just call the next hook
108113
return hook(ctx, cmd)
109114
}
@@ -141,6 +146,11 @@ func (th *tracingHook) ProcessPipelineHook(
141146
hook redis.ProcessPipelineHook,
142147
) redis.ProcessPipelineHook {
143148
return func(ctx context.Context, cmds []redis.Cmder) error {
149+
150+
if th.conf.filterProcessPipeline != nil && th.conf.filterProcessPipeline(cmds) {
151+
return hook(ctx, cmds)
152+
}
153+
144154
attrs := make([]attribute.KeyValue, 0, 8)
145155
attrs = append(attrs,
146156
attribute.Int("db.redis.num_cmd", len(cmds)),

extra/redisotel/tracing_test.go

Lines changed: 117 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func TestWithCommandFilter(t *testing.T) {
156156
hook := newTracingHook(
157157
"",
158158
WithTracerProvider(provider),
159-
WithCommandFilter(BasicCommandFilter),
159+
WithCommandFilter(DefaultCommandFilter),
160160
)
161161
ctx, span := provider.Tracer("redis-test").Start(context.TODO(), "redis-test")
162162
cmd := redis.NewCmd(ctx, "auth", "test-password")
@@ -181,7 +181,7 @@ func TestWithCommandFilter(t *testing.T) {
181181
hook := newTracingHook(
182182
"",
183183
WithTracerProvider(provider),
184-
WithCommandFilter(BasicCommandFilter),
184+
WithCommandFilter(DefaultCommandFilter),
185185
)
186186
ctx, span := provider.Tracer("redis-test").Start(context.TODO(), "redis-test")
187187
cmd := redis.NewCmd(ctx, "hello", 3, "AUTH", "test-user", "test-password")
@@ -206,7 +206,7 @@ func TestWithCommandFilter(t *testing.T) {
206206
hook := newTracingHook(
207207
"",
208208
WithTracerProvider(provider),
209-
WithCommandFilter(BasicCommandFilter),
209+
WithCommandFilter(DefaultCommandFilter),
210210
)
211211
ctx, span := provider.Tracer("redis-test").Start(context.TODO(), "redis-test")
212212
cmd := redis.NewCmd(ctx, "hello", 3)
@@ -227,6 +227,120 @@ func TestWithCommandFilter(t *testing.T) {
227227
})
228228
}
229229

230+
func TestWithCommandsFilter(t *testing.T) {
231+
t.Run("filter out ping and info commands", func(t *testing.T) {
232+
provider := sdktrace.NewTracerProvider()
233+
hook := newTracingHook(
234+
"",
235+
WithTracerProvider(provider),
236+
WithCommandsFilter(func(cmds []redis.Cmder) bool {
237+
for _, cmd := range cmds {
238+
if cmd.Name() == "ping" || cmd.Name() == "info" {
239+
return true
240+
}
241+
}
242+
return false
243+
}),
244+
)
245+
246+
ctx, span := provider.Tracer("redis-test").Start(context.TODO(), "redis-test")
247+
cmds := []redis.Cmder{
248+
redis.NewCmd(ctx, "ping"),
249+
redis.NewCmd(ctx, "info"),
250+
}
251+
defer span.End()
252+
253+
processPipelineHook := hook.ProcessPipelineHook(func(ctx context.Context, cmds []redis.Cmder) error {
254+
innerSpan := trace.SpanFromContext(ctx).(sdktrace.ReadOnlySpan)
255+
if innerSpan.Name() != "redis-test" || innerSpan.Name() == "redis.pipeline ping\ninfo" {
256+
t.Fatalf("ping and info commands should not be traced")
257+
}
258+
return nil
259+
})
260+
err := processPipelineHook(ctx, cmds)
261+
if err != nil {
262+
t.Fatal(err)
263+
}
264+
})
265+
266+
t.Run("do not filter ping and info commands", func(t *testing.T) {
267+
provider := sdktrace.NewTracerProvider()
268+
hook := newTracingHook(
269+
"",
270+
WithTracerProvider(provider),
271+
WithCommandsFilter(func(cmds []redis.Cmder) bool {
272+
return false // never filter
273+
}),
274+
)
275+
ctx, span := provider.Tracer("redis-test").Start(context.TODO(), "redis-test")
276+
cmds := []redis.Cmder{
277+
redis.NewCmd(ctx, "ping"),
278+
redis.NewCmd(ctx, "info"),
279+
}
280+
defer span.End()
281+
processPipelineHook := hook.ProcessPipelineHook(func(ctx context.Context, cmds []redis.Cmder) error {
282+
innerSpan := trace.SpanFromContext(ctx).(sdktrace.ReadOnlySpan)
283+
if innerSpan.Name() != "redis.pipeline ping info" {
284+
t.Fatalf("ping and info commands should be traced")
285+
}
286+
287+
return nil
288+
})
289+
290+
err := processPipelineHook(ctx, cmds)
291+
if err != nil {
292+
t.Fatal(err)
293+
}
294+
})
295+
}
296+
297+
func TestWithDialFilter(t *testing.T) {
298+
t.Run("filter out dial", func(t *testing.T) {
299+
provider := sdktrace.NewTracerProvider()
300+
hook := newTracingHook(
301+
"",
302+
WithTracerProvider(provider),
303+
WithDialFilter(true),
304+
)
305+
ctx, span := provider.Tracer("redis-test").Start(context.TODO(), "redis-test")
306+
defer span.End()
307+
dialHook := hook.DialHook(func(ctx context.Context, network, addr string) (conn net.Conn, err error) {
308+
innerSpan := trace.SpanFromContext(ctx).(sdktrace.ReadOnlySpan)
309+
if innerSpan.Name() == "redis.dial" {
310+
t.Fatalf("dial should not be traced")
311+
}
312+
return nil, nil
313+
})
314+
315+
_, err := dialHook(ctx, "tcp", "localhost:6379")
316+
if err != nil {
317+
t.Fatal(err)
318+
}
319+
})
320+
321+
t.Run("do not filter dial", func(t *testing.T) {
322+
provider := sdktrace.NewTracerProvider()
323+
hook := newTracingHook(
324+
"",
325+
WithTracerProvider(provider),
326+
WithDialFilter(false),
327+
)
328+
ctx, span := provider.Tracer("redis-test").Start(context.TODO(), "redis-test")
329+
defer span.End()
330+
dialHook := hook.DialHook(func(ctx context.Context, network, addr string) (conn net.Conn, err error) {
331+
innerSpan := trace.SpanFromContext(ctx).(sdktrace.ReadOnlySpan)
332+
if innerSpan.Name() != "redis.dial" {
333+
t.Fatalf("dial should be traced")
334+
}
335+
return nil, nil
336+
})
337+
_, err := dialHook(ctx, "tcp", "localhost:6379")
338+
if err != nil {
339+
t.Fatal(err)
340+
}
341+
})
342+
}
343+
230344
func TestTracingHook_DialHook(t *testing.T) {
231345
imsb := tracetest.NewInMemoryExporter()
232346
provider := sdktrace.NewTracerProvider(sdktrace.WithSyncer(imsb))

0 commit comments

Comments
 (0)