Skip to content

Commit 126926b

Browse files
authored
use exp/api/remote in PRW2 e2e test code (#7035)
Signed-off-by: SungJin1212 <[email protected]>
1 parent 0e9721f commit 126926b

File tree

2 files changed

+40
-58
lines changed

2 files changed

+40
-58
lines changed

integration/e2ecortex/client.go

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/prometheus/alertmanager/types"
2121
promapi "github.com/prometheus/client_golang/api"
2222
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
23+
remoteapi "github.com/prometheus/client_golang/exp/api/remote"
2324
"github.com/prometheus/common/model"
2425
"github.com/prometheus/prometheus/model/labels"
2526
"github.com/prometheus/prometheus/model/rulefmt"
@@ -51,6 +52,7 @@ type Client struct {
5152
distributorAddress string
5253
timeout time.Duration
5354
httpClient *http.Client
55+
remoteWriteAPI *remoteapi.API
5456
querierClient promv1.API
5557
orgID string
5658
}
@@ -72,6 +74,17 @@ func NewClient(
7274
return nil, err
7375
}
7476

77+
client := &http.Client{
78+
Transport: &addOrgIDRoundTripper{orgID: orgID, next: http.DefaultTransport},
79+
}
80+
remoteWriteAPI, err := remoteapi.NewAPI(fmt.Sprintf("http://%s", distributorAddress),
81+
remoteapi.WithAPIHTTPClient(client),
82+
remoteapi.WithAPIPath("/api/prom/push"),
83+
)
84+
if err != nil {
85+
return nil, err
86+
}
87+
7588
c := &Client{
7689
distributorAddress: distributorAddress,
7790
querierAddress: querierAddress,
@@ -80,6 +93,7 @@ func NewClient(
8093
timeout: 30 * time.Second,
8194
httpClient: &http.Client{},
8295
querierClient: promv1.NewAPI(querierAPIClient),
96+
remoteWriteAPI: remoteWriteAPI,
8397
orgID: orgID,
8498
}
8599

@@ -184,36 +198,12 @@ func (c *Client) Push(timeseries []prompb.TimeSeries, metadata ...prompb.MetricM
184198
}
185199

186200
// PushV2 the input timeseries to the remote endpoint
187-
func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (*http.Response, error) {
201+
func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (remoteapi.WriteResponseStats, error) {
188202
// Create write request
189-
data, err := proto.Marshal(&writev2.Request{Symbols: symbols, Timeseries: timeseries})
190-
if err != nil {
191-
return nil, err
192-
}
193-
194-
// Create HTTP request
195-
compressed := snappy.Encode(nil, data)
196-
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
197-
if err != nil {
198-
return nil, err
199-
}
200-
201-
req.Header.Add("Content-Encoding", "snappy")
202-
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
203-
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
204-
req.Header.Set("X-Scope-OrgID", c.orgID)
205-
206203
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
207204
defer cancel()
208205

209-
// Execute HTTP request
210-
res, err := c.httpClient.Do(req.WithContext(ctx))
211-
if err != nil {
212-
return nil, err
213-
}
214-
215-
defer res.Body.Close()
216-
return res, nil
206+
return c.remoteWriteAPI.Write(ctx, remoteapi.WriteV2MessageType, &writev2.Request{Symbols: symbols, Timeseries: timeseries})
217207
}
218208

219209
func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) {

integration/remote_write_v2_test.go

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ package integration
55

66
import (
77
"math/rand"
8-
"net/http"
98
"path"
109
"testing"
1110
"time"
1211

12+
remoteapi "github.com/prometheus/client_golang/exp/api/remote"
1313
"github.com/prometheus/common/model"
1414
"github.com/prometheus/prometheus/model/labels"
1515
"github.com/prometheus/prometheus/prompb"
@@ -95,10 +95,9 @@ func TestIngesterRollingUpdate(t *testing.T) {
9595

9696
// series push
9797
symbols1, series, expectedVector := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
98-
res, err := c.PushV2(symbols1, series)
98+
stats, err := c.PushV2(symbols1, series)
9999
require.NoError(t, err)
100-
require.Equal(t, 200, res.StatusCode)
101-
testPushHeader(t, res.Header, "1", "0", "0")
100+
testPushHeader(t, stats, 1, 0, 0)
102101

103102
// sample
104103
result, err := c.Query("test_series", now)
@@ -113,16 +112,14 @@ func TestIngesterRollingUpdate(t *testing.T) {
113112
// histogram
114113
histogramIdx := rand.Uint32()
115114
symbols2, histogramSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
116-
res, err = c.PushV2(symbols2, histogramSeries)
115+
writeStats, err := c.PushV2(symbols2, histogramSeries)
117116
require.NoError(t, err)
118-
require.Equal(t, 200, res.StatusCode)
119-
testPushHeader(t, res.Header, "0", "1", "0")
117+
testPushHeader(t, writeStats, 0, 1, 0)
120118

121119
symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
122-
res, err = c.PushV2(symbols3, histogramFloatSeries)
120+
writeStats, err = c.PushV2(symbols3, histogramFloatSeries)
123121
require.NoError(t, err)
124-
require.Equal(t, 200, res.StatusCode)
125-
testPushHeader(t, res.Header, "0", "1", "0")
122+
testPushHeader(t, writeStats, 0, 1, 0)
126123

127124
testHistogramTimestamp := now.Add(blockRangePeriod * 2)
128125
expectedHistogram := tsdbutil.GenerateTestHistogram(int64(histogramIdx))
@@ -198,9 +195,9 @@ func TestIngest_SenderSendPRW2_DistributorNotAllowPRW2(t *testing.T) {
198195

199196
// series push
200197
symbols1, series, _ := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
201-
res, err := c.PushV2(symbols1, series)
202-
require.NoError(t, err)
203-
require.Equal(t, 200, res.StatusCode)
198+
_, err = c.PushV2(symbols1, series)
199+
require.Error(t, err)
200+
require.Contains(t, err.Error(), "sent v2 request; got 2xx, but PRW 2.0 response header statistics indicate 0 samples, 0 histograms and 0 exemplars were accepted")
204201

205202
// sample
206203
result, err := c.Query("test_series", now)
@@ -266,10 +263,9 @@ func TestIngest(t *testing.T) {
266263

267264
// series push
268265
symbols1, series, expectedVector := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
269-
res, err := c.PushV2(symbols1, series)
266+
writeStats, err := c.PushV2(symbols1, series)
270267
require.NoError(t, err)
271-
require.Equal(t, 200, res.StatusCode)
272-
testPushHeader(t, res.Header, "1", "0", "0")
268+
testPushHeader(t, writeStats, 1, 0, 0)
273269

274270
// sample
275271
result, err := c.Query("test_series", now)
@@ -284,17 +280,15 @@ func TestIngest(t *testing.T) {
284280
// histogram
285281
histogramIdx := rand.Uint32()
286282
symbols2, histogramSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
287-
res, err = c.PushV2(symbols2, histogramSeries)
283+
writeStats, err = c.PushV2(symbols2, histogramSeries)
288284
require.NoError(t, err)
289-
require.Equal(t, 200, res.StatusCode)
290-
testPushHeader(t, res.Header, "0", "1", "0")
285+
testPushHeader(t, writeStats, 0, 1, 0)
291286

292287
// float histogram
293288
symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
294-
res, err = c.PushV2(symbols3, histogramFloatSeries)
289+
writeStats, err = c.PushV2(symbols3, histogramFloatSeries)
295290
require.NoError(t, err)
296-
require.Equal(t, 200, res.StatusCode)
297-
testPushHeader(t, res.Header, "0", "1", "0")
291+
testPushHeader(t, writeStats, 0, 1, 0)
298292

299293
testHistogramTimestamp := now.Add(blockRangePeriod * 2)
300294
expectedHistogram := tsdbutil.GenerateTestHistogram(int64(histogramIdx))
@@ -379,10 +373,9 @@ func TestExemplar(t *testing.T) {
379373
},
380374
}
381375

382-
res, err := c.PushV2(symbols, timeseries)
376+
writeStats, err := c.PushV2(symbols, timeseries)
383377
require.NoError(t, err)
384-
require.Equal(t, 200, res.StatusCode)
385-
testPushHeader(t, res.Header, "1", "0", "1")
378+
testPushHeader(t, writeStats, 1, 0, 1)
386379

387380
start := time.Now().Add(-time.Minute)
388381
end := now.Add(time.Minute)
@@ -451,14 +444,13 @@ func Test_WriteStatWithReplication(t *testing.T) {
451444
numSamples := 20
452445
scrapeInterval := 30 * time.Second
453446
symbols, series := e2e.GenerateV2SeriesWithSamples("test_series", start, scrapeInterval, 0, numSamples, prompb.Label{Name: "job", Value: "test"})
454-
res, err := c.PushV2(symbols, []writev2.TimeSeries{series})
447+
writeStats, err := c.PushV2(symbols, []writev2.TimeSeries{series})
455448
require.NoError(t, err)
456-
require.Equal(t, 200, res.StatusCode)
457-
testPushHeader(t, res.Header, "20", "0", "0")
449+
testPushHeader(t, writeStats, 20, 0, 0)
458450
}
459451

460-
func testPushHeader(t *testing.T, header http.Header, expectedSamples, expectedHistogram, expectedExemplars string) {
461-
require.Equal(t, expectedSamples, header.Get("X-Prometheus-Remote-Write-Samples-Written"))
462-
require.Equal(t, expectedHistogram, header.Get("X-Prometheus-Remote-Write-Histograms-Written"))
463-
require.Equal(t, expectedExemplars, header.Get("X-Prometheus-Remote-Write-Exemplars-Written"))
452+
func testPushHeader(t *testing.T, stats remoteapi.WriteResponseStats, expectedSamples, expectedHistogram, expectedExemplars int) {
453+
require.Equal(t, expectedSamples, stats.Samples)
454+
require.Equal(t, expectedHistogram, stats.Histograms)
455+
require.Equal(t, expectedExemplars, stats.Exemplars)
464456
}

0 commit comments

Comments
 (0)