Skip to content

Commit 7dcf80f

Browse files
authored
Merge pull request #192 from RedisLabs/handle-rate-limits
Handle API rate limits gracefully
2 parents 5191907 + 2b07007 commit 7dcf80f

File tree

7 files changed

+303
-16
lines changed

7 files changed

+303
-16
lines changed

client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func NewClient(configs ...Option) (*Client, error) {
7373
Transport: config.roundTripper(),
7474
}
7575

76-
client, err := internal.NewHttpClient(httpClient, config.baseUrl)
76+
client, err := internal.NewHttpClient(httpClient, config.baseUrl, config.logger)
7777
if err != nil {
7878
return nil, err
7979
}

internal/http_client.go

Lines changed: 78 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,50 +4,108 @@ import (
44
"bytes"
55
"context"
66
"encoding/json"
7+
"errors"
78
"fmt"
89
"io"
910
"net/http"
1011
"net/url"
12+
"strconv"
13+
"time"
14+
15+
"github.com/avast/retry-go/v4"
16+
)
17+
18+
const (
19+
defaultWindowLimit = 400
20+
defaultWindowDuration = 1 * time.Minute
21+
22+
headerRateLimitRemaining = "X-Rate-Limit-Remaining"
1123
)
1224

1325
type HttpClient struct {
14-
client *http.Client
15-
baseUrl *url.URL
26+
client *http.Client
27+
baseUrl *url.URL
28+
rateLimiter RateLimiter
29+
retryEnabled bool
30+
retryMaxDelay time.Duration
31+
retryDelay time.Duration
32+
retryMaxAttempts uint
33+
logger Log
1634
}
1735

18-
func NewHttpClient(client *http.Client, baseUrl string) (*HttpClient, error) {
36+
func NewHttpClient(client *http.Client, baseUrl string, logger Log) (*HttpClient, error) {
1937
parsed, err := url.Parse(baseUrl)
2038
if err != nil {
2139
return nil, err
2240
}
23-
return &HttpClient{client: client, baseUrl: parsed}, nil
41+
42+
return &HttpClient{
43+
client: client,
44+
baseUrl: parsed,
45+
rateLimiter: newFixedWindowCountRateLimiter(defaultWindowLimit, defaultWindowDuration),
46+
retryEnabled: true,
47+
retryMaxAttempts: 10,
48+
retryDelay: 1 * time.Second,
49+
retryMaxDelay: defaultWindowDuration,
50+
logger: logger,
51+
}, nil
2452
}
2553

2654
func (c *HttpClient) Get(ctx context.Context, name, path string, responseBody interface{}) error {
27-
return c.connection(ctx, http.MethodGet, name, path, nil, nil, responseBody)
55+
return c.connectionWithRetries(ctx, http.MethodGet, name, path, nil, nil, responseBody)
2856
}
2957

3058
func (c *HttpClient) GetWithQuery(ctx context.Context, name, path string, query url.Values, responseBody interface{}) error {
31-
return c.connection(ctx, http.MethodGet, name, path, query, nil, responseBody)
59+
return c.connectionWithRetries(ctx, http.MethodGet, name, path, query, nil, responseBody)
3260
}
3361

3462
func (c *HttpClient) Put(ctx context.Context, name, path string, requestBody interface{}, responseBody interface{}) error {
35-
return c.connection(ctx, http.MethodPut, name, path, nil, requestBody, responseBody)
63+
return c.connectionWithRetries(ctx, http.MethodPut, name, path, nil, requestBody, responseBody)
3664
}
3765

3866
func (c *HttpClient) Post(ctx context.Context, name, path string, requestBody interface{}, responseBody interface{}) error {
39-
return c.connection(ctx, http.MethodPost, name, path, nil, requestBody, responseBody)
67+
return c.connectionWithRetries(ctx, http.MethodPost, name, path, nil, requestBody, responseBody)
4068
}
4169

4270
func (c *HttpClient) Delete(ctx context.Context, name, path string, responseBody interface{}) error {
43-
return c.connection(ctx, http.MethodDelete, name, path, nil, nil, responseBody)
71+
return c.connectionWithRetries(ctx, http.MethodDelete, name, path, nil, nil, responseBody)
4472
}
4573

4674
func (c *HttpClient) DeleteWithQuery(ctx context.Context, name, path string, requestBody interface{}, responseBody interface{}) error {
47-
return c.connection(ctx, http.MethodDelete, name, path, nil, requestBody, responseBody)
75+
return c.connectionWithRetries(ctx, http.MethodDelete, name, path, nil, requestBody, responseBody)
76+
}
77+
78+
func (c *HttpClient) connectionWithRetries(ctx context.Context, method, name, path string, query url.Values, requestBody interface{}, responseBody interface{}) error {
79+
return retry.Do(func() error {
80+
return c.connection(ctx, method, name, path, query, requestBody, responseBody)
81+
},
82+
retry.Attempts(c.retryMaxAttempts),
83+
retry.Delay(c.retryDelay),
84+
retry.MaxDelay(c.retryMaxDelay),
85+
retry.RetryIf(func(err error) bool {
86+
if !c.retryEnabled {
87+
return false
88+
}
89+
var target *HTTPError
90+
if errors.As(err, &target) && target.StatusCode == http.StatusTooManyRequests {
91+
c.logger.Println(fmt.Sprintf("status code 429 received, request will be retried"))
92+
return true
93+
}
94+
return false
95+
}),
96+
retry.LastErrorOnly(true),
97+
retry.Context(ctx),
98+
)
4899
}
49100

50101
func (c *HttpClient) connection(ctx context.Context, method, name, path string, query url.Values, requestBody interface{}, responseBody interface{}) error {
102+
if c.rateLimiter != nil {
103+
err := c.rateLimiter.Wait(ctx)
104+
if err != nil {
105+
return err
106+
}
107+
}
108+
51109
parsed := new(url.URL)
52110
*parsed = *c.baseUrl
53111

@@ -81,6 +139,16 @@ func (c *HttpClient) connection(ctx context.Context, method, name, path string,
81139
return fmt.Errorf("failed to %s: %w", name, err)
82140
}
83141

142+
remainingLimit := response.Header.Get(headerRateLimitRemaining)
143+
if remainingLimit != "" {
144+
if limit, err := strconv.Atoi(remainingLimit); err == nil {
145+
err = c.rateLimiter.Update(limit)
146+
if err != nil {
147+
return err
148+
}
149+
}
150+
}
151+
84152
defer response.Body.Close()
85153

86154
if response.StatusCode > 299 {

internal/http_client_test.go

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"net/http/httptest"
77
"testing"
88

9+
"github.com/stretchr/testify/assert"
910
"github.com/stretchr/testify/require"
1011
)
1112

@@ -14,9 +15,81 @@ func TestHttpClient_Get_failsFor4xx(t *testing.T) {
1415
w.WriteHeader(418)
1516
}))
1617

17-
subject, err := NewHttpClient(s.Client(), s.URL)
18+
subject, err := NewHttpClient(s.Client(), s.URL, &testLogger{t: t})
1819
require.NoError(t, err)
1920

2021
err = subject.Get(context.TODO(), "testing", "/", nil)
2122
require.Error(t, err)
2223
}
24+
25+
func TestHttpClient_Retry(t *testing.T) {
26+
testCase := []struct {
27+
description string
28+
retryEnabled bool
29+
statusCode int
30+
expectedCount int
31+
expectedError string
32+
}{
33+
{
34+
description: "should retry 429 requests when retry is enabled",
35+
retryEnabled: true,
36+
statusCode: 429,
37+
expectedCount: 3,
38+
},
39+
{
40+
description: "should not retry other status code when retry is enabled",
41+
retryEnabled: true,
42+
statusCode: 404,
43+
expectedCount: 1,
44+
expectedError: "failed to test get request: 404 - ",
45+
},
46+
{
47+
description: "should not retry 429 requests when retry is disabled",
48+
retryEnabled: false,
49+
statusCode: 429,
50+
expectedCount: 1,
51+
expectedError: "failed to test get request: 429 - ",
52+
},
53+
}
54+
55+
for _, test := range testCase {
56+
t.Run(test.description, func(t *testing.T) {
57+
58+
count := 0
59+
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
60+
count++
61+
if count < 3 {
62+
w.WriteHeader(test.statusCode)
63+
return
64+
}
65+
w.WriteHeader(200)
66+
_, err := w.Write([]byte("{}"))
67+
require.NoError(t, err)
68+
}))
69+
70+
subject, err := NewHttpClient(s.Client(), s.URL, &testLogger{t: t})
71+
require.NoError(t, err)
72+
subject.retryEnabled = test.retryEnabled
73+
74+
ctx := context.Background()
75+
err = subject.Get(ctx, "test get request", "/", nil)
76+
if test.expectedError != "" {
77+
assert.EqualError(t, err, test.expectedError)
78+
} else {
79+
assert.NoError(t, err)
80+
}
81+
assert.Equal(t, test.expectedCount, count)
82+
})
83+
}
84+
85+
}
86+
87+
type testLogger struct {
88+
t *testing.T
89+
}
90+
91+
func (l *testLogger) Println(v ...interface{}) {
92+
l.t.Log(v...)
93+
}
94+
95+
var _ Log = &testLogger{}

internal/log.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package internal
2+
3+
type Log interface {
4+
Println(v ...interface{})
5+
}

internal/rate_limit.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"time"
8+
)
9+
10+
type RateLimiter interface {
11+
// Wait will verify one request can be sent or wait if it can't.
12+
Wait(ctx context.Context) error
13+
// Update the rate limiter when the server returns more information about the current limits.
14+
Update(remaining int) error
15+
}
16+
17+
// A fixedWindowCountRateLimiter is a rate limiter that will count the number of requests within a period (or window)
18+
// and block the caller for the expected remaining period in the window.
19+
//
20+
// The window will start again after the last one closes and the count will be reset.
21+
// Since other requests can happen outside the SDK, callers can calls the Update() function to update the remaining
22+
// event in the window.
23+
//
24+
// This rate limiter tries to model the server-side behaviour as best it can, however, it doesn't know exactly when
25+
// the server-side window starts or ends, so it can be misaligned. Therefore, the callers still need to retry requests
26+
// if a status code 429 (Too Many Requests) is received.
27+
type fixedWindowCountRateLimiter struct {
28+
limit int
29+
period time.Duration
30+
windowStart *time.Time
31+
count int
32+
mu *sync.Mutex
33+
}
34+
35+
func newFixedWindowCountRateLimiter(limit int, period time.Duration) *fixedWindowCountRateLimiter {
36+
return &fixedWindowCountRateLimiter{
37+
limit: limit,
38+
period: period,
39+
mu: &sync.Mutex{},
40+
}
41+
}
42+
43+
// Wait will block the caller when the number of requests has exceeded the limit in the current window.
44+
// This function allows bursting so it will only block when the limit is reached.
45+
func (rl *fixedWindowCountRateLimiter) Wait(ctx context.Context) error {
46+
rl.mu.Lock()
47+
defer rl.mu.Unlock()
48+
49+
// Start window on first requests
50+
if rl.windowStart == nil {
51+
now := time.Now()
52+
rl.windowStart = &now
53+
}
54+
55+
windowEnd := rl.windowStart.Add(rl.period)
56+
if time.Now().After(windowEnd) {
57+
rl.count = 0
58+
rl.windowStart = &windowEnd
59+
windowEnd = rl.windowStart.Add(rl.period)
60+
}
61+
62+
if rl.count == rl.limit {
63+
delay := windowEnd.Sub(time.Now())
64+
err := sleepWithContext(ctx, delay)
65+
if err != nil {
66+
return err
67+
}
68+
}
69+
rl.count++
70+
return nil
71+
}
72+
73+
func (rl *fixedWindowCountRateLimiter) Update(remaining int) error {
74+
rl.mu.Lock()
75+
defer rl.mu.Unlock()
76+
rl.count = rl.limit - remaining
77+
return nil
78+
}
79+
80+
func sleepWithContext(ctx context.Context, d time.Duration) error {
81+
timer := time.NewTimer(d)
82+
select {
83+
case <-ctx.Done():
84+
if !timer.Stop() {
85+
return fmt.Errorf("context expired before timer stopped")
86+
}
87+
case <-timer.C:
88+
}
89+
return nil
90+
}
91+
92+
var _ RateLimiter = &fixedWindowCountRateLimiter{}

internal/rate_limit_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestFixedWindowCountRateLimiter_Wait(t *testing.T) {
13+
windowSize := 2 * time.Second
14+
windowLimit := 10
15+
16+
limiter := newFixedWindowCountRateLimiter(windowLimit, windowSize)
17+
18+
ctx := context.Background()
19+
start := time.Now()
20+
runs := 3
21+
count := 0
22+
for range windowLimit * runs {
23+
err := limiter.Wait(ctx)
24+
require.NoError(t, err)
25+
count++
26+
}
27+
end := time.Now()
28+
assert.Equal(t, runs*windowLimit, count)
29+
assert.Greater(t, end.Sub(start), windowSize.Nanoseconds()*int64(runs-1))
30+
}
31+
32+
func TestFixedWindowCountRateLimiter_Update(t *testing.T) {
33+
windowSize := 2 * time.Second
34+
windowLimit := 10
35+
36+
limiter := newFixedWindowCountRateLimiter(windowLimit, windowSize)
37+
38+
ctx := context.Background()
39+
start := time.Now()
40+
runs := 2
41+
count := 0
42+
assert.NoError(t, limiter.Update(0))
43+
for range windowLimit * runs {
44+
t.Logf("%s\n", time.Now().String())
45+
err := limiter.Wait(ctx)
46+
require.NoError(t, err)
47+
count++
48+
}
49+
end := time.Now()
50+
assert.Equal(t, runs*windowLimit, count)
51+
assert.Greater(t, end.Sub(start), windowSize.Nanoseconds()*int64(runs))
52+
53+
}

internal/service.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@ import (
1212
"github.com/avast/retry-go/v4"
1313
)
1414

15-
type Log interface {
16-
Println(v ...interface{})
17-
}
18-
1915
type Api interface {
2016
// WaitForResourceId will poll the Task, waiting for the Task to finish processing, where it will then return.
2117
// An error will be returned if the Task couldn't be retrieved or the Task was not processed successfully.

0 commit comments

Comments
 (0)