Describe the bug
When I close the reader, it gets stuck at batch.Close()
until it reaches readBatchTimeout:
func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
	...
	if err = r.sendMessage(ctx, msg, highWaterMark); err != nil {
		batch.Close() // stuck here
		break
	}
Closing a consumer group reader takes too long.
Kafka Version
- What version(s) of Kafka are you testing against?
v3.5.1
- What version of kafka-go are you using?
v0.4.47
To Reproduce
Use the following configuration to read messages continuously, then close the reader from another goroutine.
kafka.ReaderConfig{
	Brokers:           s.broker,
	GroupID:           s.groupID,
	Topic:             s.topic,
	MaxWait:           3 * time.Second,
	StartOffset:       kafka.LastOffset,
	QueueCapacity:     4096,
	MaxBytes:          1024 * 1024 * 20,
	ReadBatchTimeout:  10 * time.Second,
	SessionTimeout:    20 * time.Second,
	HeartbeatInterval: 6 * time.Second,
	RebalanceTimeout:  10 * time.Second,
}
Resources to reproduce the behavior: