Skip to content

Commit e651817

Browse files
Merge pull request #25 from platinummonkey/fixes-2
Fixes bug for blocking limiter with timeout = 0, adds testing for pools
2 parents 0a43a47 + 3b24c6c commit e651817

18 files changed

+430
-43
lines changed

.gitmodules

-3
This file was deleted.

doc.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
// Package go_concurrency_limits provides primitives for concurrency control in complex systems.
2-
package go_concurrency_limits
1+
// Package provides primitives for concurrency control in complex systems.
2+
package main

examples/example_concurrent_loading/main.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type contextKey string
2222
const testContextKey contextKey = "jobID"
2323

2424
type resource struct {
25-
counter *int64
25+
counter *int64
2626
}
2727

2828
func (r *resource) poll(ctx context.Context) (bool, error) {
@@ -112,7 +112,7 @@ func main() {
112112
wg := sync.WaitGroup{}
113113

114114
// spin up 10*l consumers
115-
wg.Add(10*l)
115+
wg.Add(10 * l)
116116
for i := 0; i < 10*l; i++ {
117117
go func(c int) {
118118
for i := 0; i < 5; i++ {

grpc/option_streaming.go

+7-9
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ type StreamServerResponseClassifier func(
2121
ctx context.Context, req interface{}, info *golangGrpc.StreamServerInfo, err error,
2222
) ResponseType
2323

24-
2524
func defaultStreamClientResponseClassifier(
2625
ctx context.Context,
2726
req interface{},
@@ -47,15 +46,15 @@ func defaultStreamServerResponseClassifier(
4746
}
4847

4948
type streamInterceptorConfig struct {
50-
recvName string
51-
sendName string
52-
tags []string
53-
recvLimiter core.Limiter
54-
sendLimiter core.Limiter
49+
recvName string
50+
sendName string
51+
tags []string
52+
recvLimiter core.Limiter
53+
sendLimiter core.Limiter
5554
recvLimitExceededResponseClassifier LimitExceededResponseClassifier
5655
sendLimitExceededResponseClassifier LimitExceededResponseClassifier
57-
serverResponseClassifer StreamServerResponseClassifier
58-
clientResponseClassifer StreamClientResponseClassifier
56+
serverResponseClassifer StreamServerResponseClassifier
57+
clientResponseClassifer StreamClientResponseClassifier
5958
}
6059

6160
// StreamInterceptorOption represents an option that can be passed to the stream
@@ -157,4 +156,3 @@ func WithStreamServerResponseTypeClassifier(classifier StreamServerResponseClass
157156
cfg.serverResponseClassifer = classifier
158157
}
159158
}
160-

limiter/blocking.go

+22-8
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ package limiter
33
import (
44
"context"
55
"fmt"
6-
"github.com/platinummonkey/go-concurrency-limits/limit"
76
"sync"
87
"time"
98

109
"github.com/platinummonkey/go-concurrency-limits/core"
10+
"github.com/platinummonkey/go-concurrency-limits/limit"
1111
)
1212

1313
const longBlockingTimeout = time.Hour * 24 * 30 * 12 * 100 // 100 years
@@ -32,26 +32,38 @@ func newTimeoutWaiter(c *sync.Cond, timeout time.Duration) *timeoutWaiter {
3232

3333
func (w *timeoutWaiter) start() {
3434
// start two routines, one runner to signal, another blocking to wait and call unblock
35+
var wg sync.WaitGroup
36+
wg.Add(2)
3537
go func() {
38+
wg.Done()
3639
w.run()
3740
}()
3841
go func() {
42+
wg.Done()
3943
w.c.L.Lock()
40-
defer w.c.L.Unlock()
4144
w.c.Wait()
45+
w.c.L.Unlock()
4246
w.unblock()
4347
}()
48+
wg.Wait()
4449
}
4550

4651
func (w *timeoutWaiter) run() {
52+
if w.timeout > 0 {
53+
select {
54+
case <-w.closerSig:
55+
close(w.timeoutSig)
56+
return
57+
case <-time.After(w.timeout):
58+
// call unblock
59+
close(w.timeoutSig)
60+
return
61+
}
62+
}
4763
select {
4864
case <-w.closerSig:
4965
close(w.timeoutSig)
5066
return
51-
case <-time.After(w.timeout):
52-
// call unblock
53-
close(w.timeoutSig)
54-
return
5567
}
5668
}
5769

@@ -99,20 +111,21 @@ func NewBlockingLimiter(
99111

100112
// tryAcquire will block when attempting to acquire a token
101113
func (l *BlockingLimiter) tryAcquire(ctx context.Context) (core.Listener, bool) {
102-
l.c.L.Lock()
103-
defer l.c.L.Unlock()
104114
for {
115+
l.c.L.Lock()
105116
// if the deadline has passed, fail quickly
106117
deadline, deadlineSet := ctx.Deadline()
107118
if deadlineSet && time.Now().UTC().After(deadline) {
108119
l.logger.Debugf("deadline passed ctx=%v", time.Now().UTC().After(deadline), ctx)
120+
l.c.L.Unlock()
109121
return nil, false
110122
}
111123

112124
// try to acquire a new token and return immediately if successful
113125
listener, ok := l.delegate.Acquire(ctx)
114126
if ok && listener != nil {
115127
l.logger.Debugf("delegate returned a listener ctx=%v", ctx)
128+
l.c.L.Unlock()
116129
return listener, true
117130
}
118131

@@ -130,6 +143,7 @@ func (l *BlockingLimiter) tryAcquire(ctx context.Context) (core.Listener, bool)
130143
}
131144

132145
// block until we timeout
146+
l.c.L.Unlock()
133147
timeoutWaiter := newTimeoutWaiter(l.c, timeout)
134148
timeoutWaiter.start()
135149
l.logger.Debugf("Blocking waiting for release or timeout ctx=%v", ctx)

limiter/deadline.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func NewDeadlineLimiter(
3434
logger: logger,
3535
delegate: delegate,
3636
c: sync.NewCond(&mu),
37-
deadline: deadline,
37+
deadline: deadline,
3838
}
3939
}
4040

@@ -67,7 +67,6 @@ func (l *DeadlineLimiter) tryAcquire(ctx context.Context) (listener core.Listene
6767
}
6868
}
6969

70-
7170
// Acquire a token from the limiter. Returns `nil, false` if the limit has been exceeded.
7271
// If acquired the caller must call one of the Listener methods when the operation has been completed to release
7372
// the count.
@@ -90,5 +89,3 @@ func (l *DeadlineLimiter) Acquire(ctx context.Context) (listener core.Listener,
9089
func (l DeadlineLimiter) String() string {
9190
return fmt.Sprintf("DeadlineLimiter{delegate=%v}", l.delegate)
9291
}
93-
94-

limiter/deadline_test.go

+50-6
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package limiter
33
import (
44
"context"
55
"strings"
6+
"sync"
7+
"sync/atomic"
68
"testing"
79
"time"
810

@@ -16,23 +18,22 @@ func TestDeadlineLimiter(t *testing.T) {
1618
t.Run("Unblocked", func(t2 *testing.T) {
1719
asrt := assert.New(t2)
1820
l := limit.NewSettableLimit("test", 10, nil)
19-
noopLogger := limit.NoopLimitLogger{}
2021
defaultLimiter, err := NewDefaultLimiter(
2122
l,
2223
defaultMinWindowTime,
2324
defaultMaxWindowTime,
2425
defaultMinRTTThreshold,
2526
defaultWindowSize,
2627
strategy.NewSimpleStrategy(10),
27-
noopLogger,
28+
nil,
2829
core.EmptyMetricRegistryInstance,
2930
)
3031
if !asrt.NoError(err) {
3132
asrt.FailNow("")
3233
}
3334
asrt.NotNil(defaultLimiter)
34-
deadline := time.Now().Add(time.Second*15)
35-
deadlineLimiter := NewDeadlineLimiter(defaultLimiter, deadline, noopLogger)
35+
deadline := time.Now().Add(time.Second * 15)
36+
deadlineLimiter := NewDeadlineLimiter(defaultLimiter, deadline, nil)
3637
// stringer
3738
asrt.True(strings.Contains(deadlineLimiter.String(), "DeadlineLimiter{delegate=DefaultLimiter{"))
3839

@@ -71,7 +72,7 @@ func TestDeadlineLimiter(t *testing.T) {
7172
asrt.FailNow("")
7273
}
7374
asrt.NotNil(defaultLimiter)
74-
deadline := time.Now().Add(time.Second*1)
75+
deadline := time.Now().Add(time.Second * 1)
7576
deadlineLimiter := NewDeadlineLimiter(defaultLimiter, deadline, noopLogger)
7677

7778
i := 1
@@ -94,5 +95,48 @@ func TestDeadlineLimiter(t *testing.T) {
9495
asrt.True(deadlineFound, "expected deadline to be reached but not after %d attempts", i)
9596
asrt.Equal(2, i, "expected deadline to be exceeded on second attempt")
9697
})
97-
}
9898

99+
t.Run("limit reached", func(t2 *testing.T) {
100+
asrt := assert.New(t2)
101+
l := limit.NewFixedLimit("test", 1, nil)
102+
noopLogger := limit.BuiltinLimitLogger{}
103+
defaultLimiter, err := NewDefaultLimiter(
104+
l,
105+
defaultMinWindowTime,
106+
defaultMaxWindowTime,
107+
defaultMinRTTThreshold,
108+
defaultWindowSize,
109+
strategy.NewSimpleStrategy(1),
110+
noopLogger,
111+
core.EmptyMetricRegistryInstance,
112+
)
113+
if !asrt.NoError(err) {
114+
asrt.FailNow("")
115+
}
116+
asrt.NotNil(defaultLimiter)
117+
deadline := time.Now().Add(time.Second * 1)
118+
deadlineLimiter := NewDeadlineLimiter(defaultLimiter, deadline, noopLogger)
119+
120+
var deadlineFound atomic.Value
121+
deadlineFound.Store(false)
122+
var wg sync.WaitGroup
123+
wg.Add(3)
124+
125+
for i := 0; i < 3; i++ {
126+
go func(c int) {
127+
defer wg.Done()
128+
listener, ok := deadlineLimiter.Acquire(context.Background())
129+
if ok && listener != nil {
130+
time.Sleep(time.Second)
131+
listener.OnSuccess()
132+
} else {
133+
deadlineFound.Store(true)
134+
}
135+
}(i)
136+
time.Sleep(time.Millisecond * 5)
137+
}
138+
wg.Wait()
139+
140+
asrt.True(deadlineFound.Load().(bool), "expected deadline limit to be reached but not after 2 attempts")
141+
})
142+
}

metric_registry/doc.go

-2
This file was deleted.

patterns/pool/example_fixed_pool_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ import (
1010
)
1111

1212
func ExampleFixedPool() {
13-
var JobKey = "job_id"
13+
type JobKey string
14+
var JobKeyID = JobKey("job_id")
1415

1516
l := 1000 // limit to 1000 concurrent requests.
1617
// create a new pool
@@ -35,7 +36,7 @@ func ExampleFixedPool() {
3536
for i := 0; i <= l*3; i++ {
3637
go func(c int) {
3738
defer wg.Done()
38-
ctx := context.WithValue(context.Background(), JobKey, c)
39+
ctx := context.WithValue(context.Background(), JobKeyID, c)
3940
// this will block until timeout or token was acquired.
4041
listener, ok := pool.Acquire(ctx)
4142
if !ok {

patterns/pool/example_generic_pool_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ import (
1212
)
1313

1414
func ExamplePool() {
15-
var JobKey = "job_id"
15+
type JobKey string
16+
var JobKeyID = JobKey("job_id")
1617

1718
l := 1000 // limit to adjustable 1000 concurrent requests.
1819
delegateLimit := limit.NewDefaultAIMLimit(
@@ -54,7 +55,7 @@ func ExamplePool() {
5455
for i := 0; i <= l*3; i++ {
5556
go func(c int) {
5657
defer wg.Done()
57-
ctx := context.WithValue(context.Background(), JobKey, c)
58+
ctx := context.WithValue(context.Background(), JobKeyID, c)
5859
// this will block until timeout or token was acquired.
5960
listener, ok := pool.Acquire(ctx)
6061
if !ok {

patterns/pool/example_lifo_fixed_pool_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ import (
1010
)
1111

1212
func ExampleLIFOFixedPool() {
13-
var JobKey = "job_id"
13+
type JobKey string
14+
var JobKeyID = JobKey("job_id")
1415

1516
l := 1000 // limit to 1000 concurrent requests.
1617
// create a new pool
@@ -36,7 +37,7 @@ func ExampleLIFOFixedPool() {
3637
for i := 0; i <= l*3; i++ {
3738
go func(c int) {
3839
defer wg.Done()
39-
ctx := context.WithValue(context.Background(), JobKey, c)
40+
ctx := context.WithValue(context.Background(), JobKeyID, c)
4041
// this will block until timeout or token was acquired.
4142
listener, ok := pool.Acquire(ctx)
4243
if !ok {

patterns/pool/fixed_pool.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func NewFixedPool(
5353
metricRegistry = core.EmptyMetricRegistryInstance
5454
}
5555

56-
limitStrategy := strategy.NewSimpleStrategy(fixedLimit)
56+
limitStrategy := strategy.NewPreciseStrategy(fixedLimit)
5757
defaultLimiter, err := limiter.NewDefaultLimiter(
5858
limit.NewFixedLimit(
5959
name,

patterns/pool/fixed_pool_test.go

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package pool
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"sync"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/assert"
12+
)
13+
14+
func TestFixedPool(t *testing.T) {
15+
asrt := assert.New(t)
16+
p, err := NewFixedPool(
17+
"test-fixed-pool",
18+
10,
19+
-1,
20+
-1,
21+
-1,
22+
-1,
23+
0,
24+
nil,
25+
nil,
26+
)
27+
asrt.NoError(err)
28+
29+
asrt.Equal(10, p.Limit())
30+
31+
var wg sync.WaitGroup
32+
for i := 0; i < 20; i++ {
33+
wg.Add(1)
34+
go func(c int) {
35+
defer wg.Done()
36+
l, _ := p.Acquire(context.WithValue(context.Background(), testKeyID, fmt.Sprint(c)))
37+
log.Printf("acquired now, sleeping - %d\n", c)
38+
time.Sleep(time.Millisecond * 100)
39+
l.OnSuccess()
40+
log.Printf("no longer acquired, released - %d\n", c)
41+
}(i)
42+
time.Sleep(time.Millisecond * 5)
43+
}
44+
wg.Wait()
45+
}

0 commit comments

Comments
 (0)