Skip to content

Commit a8b6ab0

Browse files
Merge pull request #10 from platinummonkey/blocking_limiter_with_timeout
Add timeout to BlockingLimiter
2 parents 6edf818 + 770efb9 commit a8b6ab0

File tree

3 files changed

+176
-18
lines changed

3 files changed

+176
-18
lines changed

examples/example_blocking_limit/main.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ import (
1717
"github.com/platinummonkey/go-concurrency-limits/strategy"
1818
)
1919

20-
type contextKey uint8
20+
type contextKey string
2121

22-
const testContextKey = contextKey(1)
22+
const testContextKey contextKey = "jobID"
2323

2424
type resource struct {
2525
limiter *rate.Limiter
@@ -70,13 +70,14 @@ func (r *protectedResource) poll(ctx context.Context) (bool, error) {
7070

7171
func main() {
7272
limitStrategy := strategy.NewSimpleStrategy(10)
73+
logger := limit.BuiltinLimitLogger{}
7374
defaultLimiter, err := limiter.NewDefaultLimiterWithDefaults(
7475
"example_blocking_limit",
7576
limitStrategy,
76-
limit.BuiltinLimitLogger{},
77+
logger,
7778
core.EmptyMetricRegistryInstance,
7879
)
79-
externalResourceLimiter := limiter.NewBlockingLimiter(defaultLimiter)
80+
externalResourceLimiter := limiter.NewBlockingLimiter(defaultLimiter, 0, logger)
8081

8182
if err != nil {
8283
log.Fatalf("Error creating limiter err=%v\n", err)

limiter/blocking.go

+110-5
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,29 @@ package limiter
33
import (
44
"context"
55
"fmt"
6+
"github.com/platinummonkey/go-concurrency-limits/limit"
67
"sync"
8+
"time"
79

810
"github.com/platinummonkey/go-concurrency-limits/core"
911
)
1012

13+
const longBlockingTimeout = time.Hour * 24 * 30 * 12 * 100 // 100 years
14+
1115
// BlockingListener wraps the wrapped Limiter's Listener to correctly handle releasing blocked connections
1216
type BlockingListener struct {
1317
delegateListener core.Listener
1418
c *sync.Cond
1519
}
1620

21+
func NewBlockingListener(delegateListener core.Listener) *BlockingListener {
22+
mu := sync.Mutex{}
23+
return &BlockingListener{
24+
delegateListener: delegateListener,
25+
c: sync.NewCond(&mu),
26+
}
27+
}
28+
1729
func (l *BlockingListener) unblock() {
1830
l.c.Broadcast()
1931
}
@@ -40,38 +52,129 @@ func (l *BlockingListener) OnSuccess() {
4052
l.unblock()
4153
}
4254

55+
// timeoutWaiter will wait for a timeout or unblock signal
56+
type timeoutWaiter struct {
57+
timeoutSig chan struct{}
58+
closerSig chan struct{}
59+
c *sync.Cond
60+
once sync.Once
61+
timeout time.Duration
62+
}
63+
64+
func newTimeoutWaiter(c *sync.Cond, timeout time.Duration) *timeoutWaiter {
65+
return &timeoutWaiter{
66+
timeoutSig: make(chan struct{}),
67+
closerSig: make(chan struct{}),
68+
c: c,
69+
timeout: timeout,
70+
}
71+
}
72+
73+
func (w *timeoutWaiter) start() {
74+
// start two routines, one runner to signal, another blocking to wait and call unblock
75+
go func() {
76+
w.run()
77+
}()
78+
go func() {
79+
w.c.L.Lock()
80+
defer w.c.L.Unlock()
81+
w.c.Wait()
82+
w.unblock()
83+
}()
84+
}
85+
86+
func (w *timeoutWaiter) run() {
87+
select {
88+
case <-w.closerSig:
89+
close(w.timeoutSig)
90+
return
91+
case <-time.After(w.timeout):
92+
// call unblock
93+
close(w.timeoutSig)
94+
return
95+
}
96+
}
97+
98+
func (w *timeoutWaiter) unblock() {
99+
w.once.Do(func() {
100+
close(w.closerSig)
101+
})
102+
}
103+
104+
// wait blocks until we've timed out
105+
func (w *timeoutWaiter) wait() <-chan struct{} {
106+
return w.timeoutSig
107+
}
108+
43109
// BlockingLimiter implements a Limiter that blocks the caller when the limit has been reached. The caller is
44110
// blocked until the limiter has been released. This limiter is commonly used in batch clients that use the limiter
45111
// as a back-pressure mechanism.
46112
type BlockingLimiter struct {
113+
logger limit.Logger
47114
delegate core.Limiter
48115
c *sync.Cond
116+
timeout time.Duration
49117
}
50118

51119
// NewBlockingLimiter will create a new blocking limiter
52120
func NewBlockingLimiter(
53121
delegate core.Limiter,
122+
timeout time.Duration,
123+
logger limit.Logger,
54124
) *BlockingLimiter {
55125
mu := sync.Mutex{}
126+
if timeout <= 0 {
127+
timeout = longBlockingTimeout
128+
}
129+
if logger == nil {
130+
logger = limit.NoopLimitLogger{}
131+
}
56132
return &BlockingLimiter{
133+
logger: logger,
57134
delegate: delegate,
58135
c: sync.NewCond(&mu),
136+
timeout: timeout,
59137
}
60138
}
61139

62140
// tryAcquire will block when attempting to acquire a token
63-
func (l *BlockingLimiter) tryAcquire(ctx context.Context) core.Listener {
141+
func (l *BlockingLimiter) tryAcquire(ctx context.Context) (core.Listener, bool) {
64142
l.c.L.Lock()
65143
defer l.c.L.Unlock()
66144
for {
145+
// if the deadline has passed, fail quickly
146+
deadline, deadlineSet := ctx.Deadline()
147+
if deadlineSet && time.Now().UTC().After(deadline) {
148+
l.logger.Debugf("deadline passed ctx=%v", time.Now().UTC().After(deadline), ctx)
149+
return nil, false
150+
}
151+
67152
// try to acquire a new token and return immediately if successful
68153
listener, ok := l.delegate.Acquire(ctx)
69154
if ok && listener != nil {
70-
return listener
155+
l.logger.Debugf("delegate returned a listener ctx=%v", ctx)
156+
return listener, true
71157
}
72158

73159
// We have reached the limit so block until a token is released
74-
l.c.Wait()
160+
timeout := l.timeout // the default if not set
161+
162+
// infer timeout from deadline if set.
163+
if deadlineSet {
164+
timeout := deadline.Sub(time.Now().UTC())
165+
// if the deadline has passed, return acquire failure
166+
if timeout <= 0 {
167+
l.logger.Debugf("deadline passed ctx=%v", ctx)
168+
return nil, false
169+
}
170+
}
171+
172+
// block until we timeout
173+
timeoutWaiter := newTimeoutWaiter(l.c, timeout)
174+
timeoutWaiter.start()
175+
l.logger.Debugf("Blocking waiting for release or timeout ctx=%v", ctx)
176+
<-timeoutWaiter.wait()
177+
l.logger.Debugf("blocking released, trying again to acquire ctx=%v", ctx)
75178
}
76179
}
77180

@@ -81,10 +184,12 @@ func (l *BlockingLimiter) tryAcquire(ctx context.Context) core.Listener {
81184
//
82185
// context Context for the request. The context is used by advanced strategies such as LookupPartitionStrategy.
83186
func (l *BlockingLimiter) Acquire(ctx context.Context) (core.Listener, bool) {
84-
delegateListener := l.tryAcquire(ctx)
85-
if delegateListener == nil {
187+
delegateListener, ok := l.tryAcquire(ctx)
188+
if !ok && delegateListener == nil {
189+
l.logger.Debugf("did not acquire ctx=%v", ctx)
86190
return nil, false
87191
}
192+
l.logger.Debugf("acquired, returning listener ctx=%v", ctx)
88193
return &BlockingListener{
89194
delegateListener: delegateListener,
90195
c: l.c,

limiter/blocking_test.go

+61-9
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"strings"
66
"sync"
77
"testing"
8+
"time"
89

910
"github.com/stretchr/testify/assert"
1011

@@ -31,6 +32,10 @@ func (l *testListener) OnDropped() {
3132
l.dropCount++
3233
}
3334

35+
type contextKey string
36+
37+
var testContextKey contextKey = "jobID"
38+
3439
func TestBlockingLimiter(t *testing.T) {
3540
t.Run("Unblocked", func(t2 *testing.T) {
3641
asrt := assert.New(t2)
@@ -50,7 +55,7 @@ func TestBlockingLimiter(t *testing.T) {
5055
asrt.FailNow("")
5156
}
5257
asrt.NotNil(defaultLimiter)
53-
blockingLimiter := NewBlockingLimiter(defaultLimiter)
58+
blockingLimiter := NewBlockingLimiter(defaultLimiter, 0, noopLogger)
5459
// stringer
5560
asrt.True(strings.Contains(blockingLimiter.String(), "BlockingLimiter{delegate=DefaultLimiter{"))
5661

@@ -68,7 +73,7 @@ func TestBlockingLimiter(t *testing.T) {
6873
listener.OnSuccess()
6974
}
7075

71-
blockingLimiter.Acquire(nil)
76+
blockingLimiter.Acquire(context.Background())
7277
})
7378

7479
t.Run("MultipleBlocked", func(t2 *testing.T) {
@@ -89,7 +94,7 @@ func TestBlockingLimiter(t *testing.T) {
8994
asrt.FailNow("")
9095
}
9196
asrt.NotNil(defaultLimiter)
92-
blockingLimiter := NewBlockingLimiter(defaultLimiter)
97+
blockingLimiter := NewBlockingLimiter(defaultLimiter, 0, noopLogger)
9398

9499
wg := sync.WaitGroup{}
95100
wg.Add(8)
@@ -120,13 +125,8 @@ func TestBlockingLimiter(t *testing.T) {
120125

121126
t.Run("BlockingListener", func(t2 *testing.T) {
122127
asrt := assert.New(t2)
123-
mu := sync.Mutex{}
124-
c := sync.NewCond(&mu)
125128
delegateListener := testListener{}
126-
listener := BlockingListener{
127-
delegateListener: &delegateListener,
128-
c: c,
129-
}
129+
listener := NewBlockingListener(&delegateListener)
130130
listener.OnSuccess()
131131
asrt.Equal(1, delegateListener.successCount)
132132
listener.OnIgnore()
@@ -135,4 +135,56 @@ func TestBlockingLimiter(t *testing.T) {
135135
asrt.Equal(1, delegateListener.dropCount)
136136

137137
})
138+
139+
t.Run("BlockingLimiterTimeout", func(t2 *testing.T) {
140+
asrt := assert.New(t2)
141+
l := limit.NewSettableLimit("test", 1, nil)
142+
noopLogger := limit.NoopLimitLogger{}
143+
defaultLimiter, err := NewDefaultLimiter(
144+
l,
145+
defaultMinWindowTime,
146+
defaultMaxWindowTime,
147+
defaultMinRTTThreshold,
148+
defaultWindowSize,
149+
strategy.NewSimpleStrategy(1),
150+
noopLogger,
151+
core.EmptyMetricRegistryInstance,
152+
)
153+
if !asrt.NoError(err) {
154+
asrt.FailNow("")
155+
}
156+
asrt.NotNil(defaultLimiter)
157+
blockingLimiter := NewBlockingLimiter(defaultLimiter, time.Millisecond*25, noopLogger)
158+
159+
wg := sync.WaitGroup{}
160+
wg.Add(8)
161+
162+
released := make(chan int, 8)
163+
164+
for i := 0; i < 8; i++ {
165+
go func(j int) {
166+
defer wg.Done()
167+
ctx, cancel := context.WithTimeout(context.WithValue(context.Background(), testContextKey, j), time.Millisecond*400)
168+
defer cancel()
169+
listener, ok := blockingLimiter.Acquire(ctx)
170+
if ok && listener != nil {
171+
time.Sleep(time.Millisecond * 100)
172+
listener.OnSuccess()
173+
released <- 1
174+
return
175+
}
176+
released <- 0
177+
}(i)
178+
time.Sleep(time.Nanosecond * 50)
179+
}
180+
181+
wg.Wait()
182+
183+
sumReleased := 0
184+
for i := 0; i < 8; i++ {
185+
sumReleased += <-released
186+
}
187+
// we only expect half of them to complete before their deadlines
188+
asrt.Equal(4, sumReleased)
189+
})
138190
}

0 commit comments

Comments
 (0)