Description
Describe the bug
While debugging high CPU consumption in my project, I noticed that the number of running goroutines increased each time an event was sent to Kafka.
The code sending the event is:
func (k *Client) Push(ctx context.Context, topicName string, message *kafka.Message) error {
	// A new writer (and transport) is created on every call.
	writer := &kafka.Writer{
		Addr:     kafka.TCP(k.brokers...),
		Topic:    topicName,
		Balancer: &kafka.Murmur2Balancer{},
		Transport: &kafka.Transport{
			TLS: k.dialer.TLS,
		},
	}
	defer func() {
		err := writer.Close()
		if err != nil {
			logger.Warnf("Error closing writer for topic %s: %v", topicName, err)
		}
	}()
	return writer.WriteMessages(ctx, *message)
}
After running that, the IntelliJ debug console shows the number of goroutines increasing.
The more events are sent, the more goroutines keep running.
Checking the code in the transport file, I see that a pool of connections is initialized with refc: 2. Calling the ref() function increments that value. Calling the unref() function decrements it and, if the result is 0, properly cancels the context of that goroutine. The unref() function is called as expected when the transport connections are closed.
Adding breakpoints, I see that when writing an event a pool is created and the code calls unref(), ref(), unref(), ref(), unref(), leaving the value of refc at 1. The counter never reaches 0, so the context is never canceled and the discover goroutine keeps running forever. Each write operation creates a new pool and leaves another discover goroutine behind.
I suspect that the refc attribute is used to track whether the pool is still in use. I don't fully understand why the code uses a uintptr for this instead of an int or any other type, nor why it is initialized with 2.
Kafka Version
- Kafka version: 3.7.0 (bitnami/kafka:3.7.0-debian-12-r7)
- kafka-go: v0.4.47