Commit 5c5c33b

Added WithWorkTextMapPropagator option (#28)
1. Added `WithWorkTextMapPropagator`
2. Fixed linter in pipeline
3. Addressed linter errors

Co-authored-by: stewartboyd119 <[email protected]>
1 parent e3c06b1 commit 5c5c33b

13 files changed: +48 -31 lines changed

.golangci.yml

+2-15
@@ -14,31 +14,19 @@ linters-settings:
     sections:
       - standard
       - default
-  depguard:
-    rules:
-      Main:
-        files:
-          - $all
-          - "!$test"
-        deny:
-          - github.com/satori/go.uuid: Prefer "github.com/google/uuid"
+linters:
   disable-all: true
   enable:
     - asciicheck
     - bidichk
     - bodyclose
-    - cyclop
     - decorder
-    - depguard
-    - deadcode
     - dupl
     - errcheck
     - errchkjson
     - errname
     - errorlint
-    - exportloopref
     - gci
-    - gocognit
     - goconst
     - gocritic
     - gocyclo
@@ -49,10 +37,9 @@ linters-settings:
     - nolintlint
     - prealloc
     - staticcheck
-    - structcheck
     - typecheck
     - unconvert
     - unparam
     - unused
-    - varcheck
+    - copyloopvar
     - whitespace

Makefile

+1-1
@@ -1,6 +1,6 @@
 # Directories containing independent Go modules.
 MODULE_DIRS = .
-GOLANGCI_VERSION=1.64.5
+GOLANGCI_VERSION=1.64.6
 AVRO_CMD_PATH=github.com/hamba/avro/v2/cmd/[email protected]
 SCHEMA_REGISTRY_DOMAIN=schema-registry.shared.zg-int.net:443

changelog.md

+5-3
@@ -4,10 +4,12 @@ All notable changes to this project will be documented in this file.

 This project adheres to Semantic Versioning.

-## 2.1.2 (March 7, 2025)
+## 2.2.0 (March 9, 2025)

-1. Updated module to go1.24
-2. Updated dependencies
+1. Added `WithWorkTextMapPropagator`
+2. Updated module to go1.24
+3. Updated dependencies
+4. Fixed linter

 ## 2.1.1 (Feb 13, 2025)


client.go

+1-2
@@ -186,8 +186,7 @@ func (c *Client) getFormatter(args formatterArgs) (kFormatter, error) {
         if err != nil {
             return nil, err
         }
-        cf, err := newAvroSchemaRegistryFormatter(scl)
-        return cf, err
+        return newAvroSchemaRegistryFormatter(scl), nil
     case ProtoSchemaRegistry:
         scl, err := c.srf.createProto(args.srCfg)
         if err != nil {

example/worker-delay/main.go

+1-1
@@ -18,7 +18,7 @@ func main() {
            BootstrapServers: []string{"localhost:29092"},
        },
        // optionally add a logger, which implements zkafka.Logger, to see detailed information about message processsing
-       //zkafka.LoggerOption(),
+       // zkafka.LoggerOption(),
    )
    // It's important to close the client after consumption to gracefully leave the consumer group
    // (this commits completed work, and informs the broker that this consumer is leaving the group which yields a faster rebalance)

example/worker/main.go

+3-3
@@ -19,7 +19,7 @@ func main() {
            BootstrapServers: []string{"localhost:29092"},
        },
        // optionally add a logger, which implements zkafka.Logger, to see detailed information about message processsing
-       //zkafka.LoggerOption(),
+       // zkafka.LoggerOption(),
    )
    // It's important to close the client after consumption to gracefully leave the consumer group
    // (this commits completed work, and informs the broker that this consumer is leaving the group which yields a faster rebalance)
@@ -79,8 +79,8 @@ func (p Processor) Process(_ context.Context, msg *zkafka.Message) error {
    // optionally, if you don't want to use the configured formatter at all, access the kafka message payload bytes directly.
    // The commented out block shows accessing the byte array. In this case we're stringifying the bytes, but this could be json unmarshalled,
    // proto unmarshalled etc., depending on the expected payload
-   //data := msg.Value()
-   //str := string(data)
+   // data := msg.Value()
+   // str := string(data)

    log.Printf(" offset: %d, partition: %d. event.Name: %s, event.Age %d\n", msg.Offset, msg.Partition, event.Name, event.Age)
    return nil

formatter.go

+2-3
@@ -101,10 +101,10 @@ type avroSchemaRegistryFormatter struct {
    afmt avroFmt
 }

-func newAvroSchemaRegistryFormatter(afmt avroFmt) (avroSchemaRegistryFormatter, error) {
+func newAvroSchemaRegistryFormatter(afmt avroFmt) avroSchemaRegistryFormatter {
    return avroSchemaRegistryFormatter{
        afmt: afmt,
-   }, nil
+   }
 }

 // marshall looks a subject's schema (id) so that it can prefix the eventual message payload.
@@ -170,7 +170,6 @@ func (f avroSchemaRegistryFormatter) unmarshal(req unmarshReq) error {
    }
    err = avro.Unmarshal(resolvedSchema, req.data[5:], req.target)
    if err != nil {
-
        return fmt.Errorf("failed to deserialize to confluent schema registry avro type: %w", err)
    }
    return nil

lifecycle.go

-1
@@ -106,7 +106,6 @@ func ChainLifecycleHooks(hooks ...LifecycleHooks) LifecycleHooks {
            }

            return hookCtx, allErrs
-
        },
        PostReadImmediate: func(ctx context.Context, meta LifecyclePostReadImmediateMeta) {
            for _, h := range hooks {

reader.go

+1
@@ -115,6 +115,7 @@ func (r *KReader) Read(ctx context.Context) (*Message, error) {
    kmsg, err := r.consumer.ReadMessage(time.Duration(*r.topicConfig.ReadTimeoutMillis) * time.Millisecond)
    if err != nil {
        var v kafka.Error
+       //nolint:gocritic //single switch is okay
        switch {
        case errors.As(err, &v):
            // timeouts occur (because the assigned partitions aren't being written to, lack of activity, etc.). We'll

schemareg.go

-1
@@ -79,7 +79,6 @@ func (c *schemaRegistryFactory) createProto(srConfig SchemaRegistryConfig) (prot
        ser:   ser,
        deser: deser,
    }, nil
-
 }

 func (c *schemaRegistryFactory) createJson(srConfig SchemaRegistryConfig) (jsonFmt, error) {

work.go

+1-1
@@ -161,7 +161,6 @@ func (w *Work) execProcessors(ctx context.Context, shutdown <-chan struct{}) {
    wg := sync.WaitGroup{}
    wg.Add(len(w.virtualPartitions))
    for i := range w.virtualPartitions {
-       i := i
        go func() {
            w.processVirtualPartition(ctx, i, shutdown)
            wg.Done()
@@ -452,6 +451,7 @@ func (w *Work) processSingle(ctx context.Context, msg *Message, partitionIndex i
    span.RecordError(err)
    span.SetStatus(codes.Error, err.Error())

+   //nolint:errorlint // guaranteed to not be wrapped
    if pError, ok := err.(processorError); ok {
        // Because we assume workers will log their own internal errors once
        // already, we try to ignore logging them twice by also logging them
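
The removed `i := i` copy is safe to drop because, as of Go 1.22 (this module targets go1.24 per the changelog above), each `for` iteration gets its own loop variable; the copyloopvar linter enabled in .golangci.yml flags the now-redundant copy. A standalone sketch of the semantics, not taken from this repository:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := range 3 {
		wg.Add(1)
		// Pre-1.22, goroutines launched here shared a single i and an explicit
		// copy (i := i) was required; with per-iteration loop variables the copy
		// is redundant, which is exactly what copyloopvar reports.
		go func() {
			defer wg.Done()
			fmt.Println(i) // prints 0, 1, 2 in some order
		}()
	}
	wg.Wait()
}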

work_test.go

+16
@@ -78,6 +78,22 @@ func TestWork_WithOptions(t *testing.T) {
    require.Equal(t, tp.Tracer(""), work.tracer)
 }

+func TestWork_WithTextMapPropagator_CanOverrideFactoryPropagator(t *testing.T) {
+   defer recoverThenFail(t)
+   ctrl := gomock.NewController(t)
+   defer ctrl.Finish()
+
+   propagator := propagation.TraceContext{}
+
+   wf := NewWorkFactory(FakeClient{}, WithTextMapPropagator(propagator))
+
+   w1 := wf.Create(ConsumerTopicConfig{}, &timeDelayProcessor{})
+   require.NotNil(t, w1.p)
+
+   w2 := wf.Create(ConsumerTopicConfig{}, &timeDelayProcessor{}, WithWorkTextMapPropagator(nil))
+   require.Nil(t, w2.p)
+}
+
 // TestWork_ShouldCommitMessagesProperly asserts the behavior of committing kafka messages.
 // Messages should be committed as they complete as long as there aren't lower offset messages still in progress.
 // This tests specifies processing delay times such that low offsets finish after high offsets and asserts that the storeOffsets method

workoption.go

+15
@@ -4,6 +4,8 @@ import (
    "context"
    "errors"
    "time"
+
+   "go.opentelemetry.io/otel/propagation"
 )

 // WorkOption interface to identify functional options
@@ -62,6 +64,11 @@ func WithDeadLetterTopic(deadLetterTopicConfig ProducerTopicConfig) WorkOption {
    return dltOption{dltConfig: deadLetterTopicConfig}
 }

+// WithWorkTextMapPropagator enables the specification of a propagator at the moment a work instance is instantiated
+func WithWorkTextMapPropagator(p propagation.TextMapPropagator) WorkOption {
+   return textMapPropagator{p: p}
+}
+
 type speedupOption struct{ times uint16 }

 func (s speedupOption) apply(w *Work) {
@@ -120,6 +127,14 @@ func (d disableBlbOption) apply(w *Work) {
    w.blb.disabled = d.disabled
 }

+type textMapPropagator struct {
+   p propagation.TextMapPropagator
+}
+
+func (d textMapPropagator) apply(w *Work) {
+   w.p = d.p
+}
+
 type dltOption struct {
    dltConfig ProducerTopicConfig
 }
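
For context on how the new option composes with the existing factory-level propagator (exercised in work_test.go above), here is a minimal usage sketch. It is not part of this commit: the module path, the client construction, and the placeholder Processor type and topic configuration are assumptions drawn from the examples and tests shown in this diff.

package main

import (
	"context"

	"github.com/zillow/zkafka/v2" // assumed module path
	"go.opentelemetry.io/otel/propagation"
)

// Processor is a placeholder processor, analogous to the one in example/worker/main.go.
type Processor struct{}

func (p Processor) Process(_ context.Context, _ *zkafka.Message) error { return nil }

func main() {
	// Client construction mirrors example/worker/main.go (assumed, not part of this diff).
	client := zkafka.NewClient(zkafka.Config{BootstrapServers: []string{"localhost:29092"}})
	defer func() { _ = client.Close() }()

	// The factory-level propagator applies to every work instance the factory creates...
	wf := zkafka.NewWorkFactory(client, zkafka.WithTextMapPropagator(propagation.TraceContext{}))

	// ...while WithWorkTextMapPropagator overrides it for a single work instance at creation time.
	w := wf.Create(
		zkafka.ConsumerTopicConfig{ /* topic, group, etc. omitted */ },
		Processor{},
		zkafka.WithWorkTextMapPropagator(propagation.TraceContext{}),
	)
	_ = w
}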
