Skip to content

Commit 79288c8

Browse files
add testing to pools
some additional formatting fix bug for timeout=0 on blocking limiter
1 parent 0a43a47 commit 79288c8

File tree

13 files changed

+415
-33
lines changed

13 files changed

+415
-33
lines changed

.gitmodules

Lines changed: 0 additions & 3 deletions
This file was deleted.

examples/example_concurrent_loading/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type contextKey string
2222
const testContextKey contextKey = "jobID"
2323

2424
type resource struct {
25-
counter *int64
25+
counter *int64
2626
}
2727

2828
func (r *resource) poll(ctx context.Context) (bool, error) {
@@ -112,7 +112,7 @@ func main() {
112112
wg := sync.WaitGroup{}
113113

114114
// spin up 10*l consumers
115-
wg.Add(10*l)
115+
wg.Add(10 * l)
116116
for i := 0; i < 10*l; i++ {
117117
go func(c int) {
118118
for i := 0; i < 5; i++ {

grpc/option_streaming.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ type StreamServerResponseClassifier func(
2121
ctx context.Context, req interface{}, info *golangGrpc.StreamServerInfo, err error,
2222
) ResponseType
2323

24-
2524
func defaultStreamClientResponseClassifier(
2625
ctx context.Context,
2726
req interface{},
@@ -47,15 +46,15 @@ func defaultStreamServerResponseClassifier(
4746
}
4847

4948
type streamInterceptorConfig struct {
50-
recvName string
51-
sendName string
52-
tags []string
53-
recvLimiter core.Limiter
54-
sendLimiter core.Limiter
49+
recvName string
50+
sendName string
51+
tags []string
52+
recvLimiter core.Limiter
53+
sendLimiter core.Limiter
5554
recvLimitExceededResponseClassifier LimitExceededResponseClassifier
5655
sendLimitExceededResponseClassifier LimitExceededResponseClassifier
57-
serverResponseClassifer StreamServerResponseClassifier
58-
clientResponseClassifer StreamClientResponseClassifier
56+
serverResponseClassifer StreamServerResponseClassifier
57+
clientResponseClassifer StreamClientResponseClassifier
5958
}
6059

6160
// StreamInterceptorOption represents an option that can be passed to the stream
@@ -157,4 +156,3 @@ func WithStreamServerResponseTypeClassifier(classifier StreamServerResponseClass
157156
cfg.serverResponseClassifer = classifier
158157
}
159158
}
160-

limiter/blocking.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ package limiter
33
import (
44
"context"
55
"fmt"
6-
"github.com/platinummonkey/go-concurrency-limits/limit"
76
"sync"
87
"time"
98

109
"github.com/platinummonkey/go-concurrency-limits/core"
10+
"github.com/platinummonkey/go-concurrency-limits/limit"
1111
)
1212

1313
const longBlockingTimeout = time.Hour * 24 * 30 * 12 * 100 // 100 years
@@ -32,26 +32,38 @@ func newTimeoutWaiter(c *sync.Cond, timeout time.Duration) *timeoutWaiter {
3232

3333
func (w *timeoutWaiter) start() {
3434
// start two routines, one runner to signal, another blocking to wait and call unblock
35+
var wg sync.WaitGroup
36+
wg.Add(2)
3537
go func() {
38+
wg.Done()
3639
w.run()
3740
}()
3841
go func() {
42+
wg.Done()
3943
w.c.L.Lock()
40-
defer w.c.L.Unlock()
4144
w.c.Wait()
45+
w.c.L.Unlock()
4246
w.unblock()
4347
}()
48+
wg.Wait()
4449
}
4550

4651
func (w *timeoutWaiter) run() {
52+
if w.timeout > 0 {
53+
select {
54+
case <-w.closerSig:
55+
close(w.timeoutSig)
56+
return
57+
case <-time.After(w.timeout):
58+
// call unblock
59+
close(w.timeoutSig)
60+
return
61+
}
62+
}
4763
select {
4864
case <-w.closerSig:
4965
close(w.timeoutSig)
5066
return
51-
case <-time.After(w.timeout):
52-
// call unblock
53-
close(w.timeoutSig)
54-
return
5567
}
5668
}
5769

@@ -99,20 +111,21 @@ func NewBlockingLimiter(
99111

100112
// tryAcquire will block when attempting to acquire a token
101113
func (l *BlockingLimiter) tryAcquire(ctx context.Context) (core.Listener, bool) {
102-
l.c.L.Lock()
103-
defer l.c.L.Unlock()
104114
for {
115+
l.c.L.Lock()
105116
// if the deadline has passed, fail quickly
106117
deadline, deadlineSet := ctx.Deadline()
107118
if deadlineSet && time.Now().UTC().After(deadline) {
108119
l.logger.Debugf("deadline passed ctx=%v", time.Now().UTC().After(deadline), ctx)
120+
l.c.L.Unlock()
109121
return nil, false
110122
}
111123

112124
// try to acquire a new token and return immediately if successful
113125
listener, ok := l.delegate.Acquire(ctx)
114126
if ok && listener != nil {
115127
l.logger.Debugf("delegate returned a listener ctx=%v", ctx)
128+
l.c.L.Unlock()
116129
return listener, true
117130
}
118131

@@ -130,6 +143,7 @@ func (l *BlockingLimiter) tryAcquire(ctx context.Context) (core.Listener, bool)
130143
}
131144

132145
// block until we timeout
146+
l.c.L.Unlock()
133147
timeoutWaiter := newTimeoutWaiter(l.c, timeout)
134148
timeoutWaiter.start()
135149
l.logger.Debugf("Blocking waiting for release or timeout ctx=%v", ctx)

limiter/deadline.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func NewDeadlineLimiter(
3434
logger: logger,
3535
delegate: delegate,
3636
c: sync.NewCond(&mu),
37-
deadline: deadline,
37+
deadline: deadline,
3838
}
3939
}
4040

@@ -67,7 +67,6 @@ func (l *DeadlineLimiter) tryAcquire(ctx context.Context) (listener core.Listene
6767
}
6868
}
6969

70-
7170
// Acquire a token from the limiter. Returns `nil, false` if the limit has been exceeded.
7271
// If acquired the caller must call one of the Listener methods when the operation has been completed to release
7372
// the count.
@@ -90,5 +89,3 @@ func (l *DeadlineLimiter) Acquire(ctx context.Context) (listener core.Listener,
9089
func (l DeadlineLimiter) String() string {
9190
return fmt.Sprintf("DeadlineLimiter{delegate=%v}", l.delegate)
9291
}
93-
94-

limiter/deadline_test.go

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package limiter
33
import (
44
"context"
55
"strings"
6+
"sync"
7+
"sync/atomic"
68
"testing"
79
"time"
810

@@ -16,23 +18,22 @@ func TestDeadlineLimiter(t *testing.T) {
1618
t.Run("Unblocked", func(t2 *testing.T) {
1719
asrt := assert.New(t2)
1820
l := limit.NewSettableLimit("test", 10, nil)
19-
noopLogger := limit.NoopLimitLogger{}
2021
defaultLimiter, err := NewDefaultLimiter(
2122
l,
2223
defaultMinWindowTime,
2324
defaultMaxWindowTime,
2425
defaultMinRTTThreshold,
2526
defaultWindowSize,
2627
strategy.NewSimpleStrategy(10),
27-
noopLogger,
28+
nil,
2829
core.EmptyMetricRegistryInstance,
2930
)
3031
if !asrt.NoError(err) {
3132
asrt.FailNow("")
3233
}
3334
asrt.NotNil(defaultLimiter)
34-
deadline := time.Now().Add(time.Second*15)
35-
deadlineLimiter := NewDeadlineLimiter(defaultLimiter, deadline, noopLogger)
35+
deadline := time.Now().Add(time.Second * 15)
36+
deadlineLimiter := NewDeadlineLimiter(defaultLimiter, deadline, nil)
3637
// stringer
3738
asrt.True(strings.Contains(deadlineLimiter.String(), "DeadlineLimiter{delegate=DefaultLimiter{"))
3839

@@ -71,7 +72,7 @@ func TestDeadlineLimiter(t *testing.T) {
7172
asrt.FailNow("")
7273
}
7374
asrt.NotNil(defaultLimiter)
74-
deadline := time.Now().Add(time.Second*1)
75+
deadline := time.Now().Add(time.Second * 1)
7576
deadlineLimiter := NewDeadlineLimiter(defaultLimiter, deadline, noopLogger)
7677

7778
i := 1
@@ -94,5 +95,48 @@ func TestDeadlineLimiter(t *testing.T) {
9495
asrt.True(deadlineFound, "expected deadline to be reached but not after %d attempts", i)
9596
asrt.Equal(2, i, "expected deadline to be exceeded on second attempt")
9697
})
97-
}
9898

99+
t.Run("limit reached", func(t2 *testing.T) {
100+
asrt := assert.New(t2)
101+
l := limit.NewFixedLimit("test", 1, nil)
102+
noopLogger := limit.BuiltinLimitLogger{}
103+
defaultLimiter, err := NewDefaultLimiter(
104+
l,
105+
defaultMinWindowTime,
106+
defaultMaxWindowTime,
107+
defaultMinRTTThreshold,
108+
defaultWindowSize,
109+
strategy.NewSimpleStrategy(1),
110+
noopLogger,
111+
core.EmptyMetricRegistryInstance,
112+
)
113+
if !asrt.NoError(err) {
114+
asrt.FailNow("")
115+
}
116+
asrt.NotNil(defaultLimiter)
117+
deadline := time.Now().Add(time.Second * 1)
118+
deadlineLimiter := NewDeadlineLimiter(defaultLimiter, deadline, noopLogger)
119+
120+
var deadlineFound atomic.Value
121+
deadlineFound.Store(false)
122+
var wg sync.WaitGroup
123+
wg.Add(3)
124+
125+
for i := 0; i < 3; i++ {
126+
go func(c int) {
127+
defer wg.Done()
128+
listener, ok := deadlineLimiter.Acquire(context.Background())
129+
if ok && listener != nil {
130+
time.Sleep(time.Second)
131+
listener.OnSuccess()
132+
} else {
133+
deadlineFound.Store(true)
134+
}
135+
}(i)
136+
time.Sleep(time.Millisecond * 5)
137+
}
138+
wg.Wait()
139+
140+
asrt.True(deadlineFound.Load().(bool), "expected deadline limit to be reached but not after 2 attempts")
141+
})
142+
}

patterns/pool/fixed_pool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func NewFixedPool(
5353
metricRegistry = core.EmptyMetricRegistryInstance
5454
}
5555

56-
limitStrategy := strategy.NewSimpleStrategy(fixedLimit)
56+
limitStrategy := strategy.NewPreciseStrategy(fixedLimit)
5757
defaultLimiter, err := limiter.NewDefaultLimiter(
5858
limit.NewFixedLimit(
5959
name,

patterns/pool/fixed_pool_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package pool
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"sync"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/assert"
12+
)
13+
14+
func TestFixedPool(t *testing.T) {
15+
asrt := assert.New(t)
16+
p, err := NewFixedPool(
17+
"test-fixed-pool",
18+
10,
19+
-1,
20+
-1,
21+
-1,
22+
-1,
23+
0,
24+
nil,
25+
nil,
26+
)
27+
asrt.NoError(err)
28+
29+
asrt.Equal(10, p.Limit())
30+
31+
var wg sync.WaitGroup
32+
for i := 0; i < 20; i++ {
33+
wg.Add(1)
34+
go func(c int) {
35+
defer wg.Done()
36+
l, _ := p.Acquire(context.WithValue(context.Background(), "id", fmt.Sprint(c)))
37+
log.Printf("acquired now, sleeping - %d\n", c)
38+
time.Sleep(time.Millisecond * 100)
39+
l.OnSuccess()
40+
log.Printf("no longer acquired, released - %d\n", c)
41+
}(i)
42+
time.Sleep(time.Millisecond * 5)
43+
}
44+
wg.Wait()
45+
}

patterns/pool/generic_pool.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pool
22

33
import (
44
"context"
5+
"fmt"
56
"time"
67

78
"github.com/platinummonkey/go-concurrency-limits/core"
@@ -25,6 +26,10 @@ func NewPool(
2526
logger limit.Logger,
2627
metricRegistry core.MetricRegistry,
2728
) (*Pool, error) {
29+
if delegateLimiter == nil {
30+
return nil, fmt.Errorf("must specify a delegateLimiter")
31+
}
32+
2833
if timeout < 0 {
2934
timeout = 0
3035
}

patterns/pool/generic_pool_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package pool
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"sync"
8+
"testing"
9+
"time"
10+
11+
"github.com/platinummonkey/go-concurrency-limits/limit"
12+
"github.com/platinummonkey/go-concurrency-limits/limiter"
13+
"github.com/platinummonkey/go-concurrency-limits/strategy"
14+
"github.com/stretchr/testify/assert"
15+
)
16+
17+
func TestGenericPool(t *testing.T) {
18+
asrt := assert.New(t)
19+
delegateLimiter, err := limiter.NewDefaultLimiter(
20+
limit.NewFixedLimit("test-generic-pool", 10, nil),
21+
(time.Millisecond * 250).Nanoseconds(),
22+
(time.Millisecond * 500).Nanoseconds(),
23+
(time.Millisecond * 10).Nanoseconds(),
24+
100,
25+
strategy.NewPreciseStrategy(10),
26+
nil,
27+
nil,
28+
)
29+
asrt.NoError(err)
30+
31+
p, err := NewPool(
32+
delegateLimiter,
33+
false,
34+
10,
35+
-1,
36+
nil,
37+
nil,
38+
)
39+
asrt.NoError(err)
40+
41+
var wg sync.WaitGroup
42+
for i := 0; i < 20; i++ {
43+
wg.Add(1)
44+
go func(c int) {
45+
defer wg.Done()
46+
l, _ := p.Acquire(context.WithValue(context.Background(), "id", fmt.Sprint(c)))
47+
log.Printf("acquired now, sleeping - %d\n", c)
48+
time.Sleep(time.Millisecond * 100)
49+
l.OnSuccess()
50+
log.Printf("no longer acquired, released - %d\n", c)
51+
}(i)
52+
time.Sleep(time.Millisecond * 5)
53+
}
54+
wg.Wait()
55+
}

0 commit comments

Comments
 (0)