Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
* [BUGFIX] Distributor: Fix panic on health check failure when using stream push. #7116

## 1.20.0 2025-11-10

Expand Down
5 changes: 4 additions & 1 deletion pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,10 @@ func (c *closableHealthAndIngesterClient) worker(ctx context.Context) error {
select {
case <-ctx.Done():
return
case job := <-c.streamPushChan:
case job, ok := <-c.streamPushChan:
if !ok {
return
}
err = stream.Send(job.req)
if err == io.EOF {
job.resp = &cortexpb.WriteResponse{}
Expand Down
44 changes: 44 additions & 0 deletions pkg/ingester/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

Expand Down Expand Up @@ -115,12 +116,18 @@ func createTestIngesterClient(maxInflightPushRequests int64, currentInflightRequ

type mockIngester struct {
IngesterClient
mock.Mock
}

func (m *mockIngester) Push(_ context.Context, _ *cortexpb.WriteRequest, _ ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
return &cortexpb.WriteResponse{}, nil
}

func (m *mockIngester) PushStream(ctx context.Context, opts ...grpc.CallOption) (Ingester_PushStreamClient, error) {
args := m.Called(ctx, opts)
return args.Get(0).(Ingester_PushStreamClient), nil
}

type mockClientConn struct {
ClosableClientConn
}
Expand Down Expand Up @@ -227,3 +234,40 @@ func TestClosableHealthAndIngesterClient_Close_WithPendingJobs(t *testing.T) {
assert.True(t, job1Cancelled, "job1 should have been cancelled")
assert.True(t, job2Cancelled, "job2 should have been cancelled")
}

type mockClientStream struct {
mock.Mock
grpc.ClientStream
}

func (m *mockClientStream) Send(msg *cortexpb.StreamWriteRequest) error {
args := m.Called(msg)
return args.Error(0)
}

func (m *mockClientStream) Recv() (*cortexpb.WriteResponse, error) {
return &cortexpb.WriteResponse{}, nil
}

func TestClosableHealthAndIngesterClient_ShouldNotPanicWhenClose(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
streamChan := make(chan *streamWriteJob)

mockIngester := &mockIngester{}
mockStream := &mockClientStream{}
mockIngester.On("PushStream", mock.Anything, mock.Anything).Return(mockStream, nil).Once()

client := &closableHealthAndIngesterClient{
IngesterClient: mockIngester,
conn: &mockClientConn{},
addr: "test-addr",
inflightPushRequests: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"ingester"}),
streamCtx: ctx,
streamCancel: cancel,
streamPushChan: streamChan,
}
require.NoError(t, client.worker(context.Background()))
require.NoError(t, client.Close())

time.Sleep(100 * time.Millisecond)
}
Loading