Skip to content

Commit 92df8ad

Browse files
Merge pull request #79 from marcoferrer/queue-impl-parity
Queue impl parity
2 parents dde2b6c + 7d5f55c commit 92df8ad

10 files changed

+872
-759
lines changed

core/dep.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ const (
2121
MetricWindowMinRTT = "window.min_rtt"
2222
// MetricWindowQueueSize represents the name of the metric for the Window's Queue Size
2323
MetricWindowQueueSize = "window.queue_size"
24-
// MetricLifoQueueSize represents the name of the metric for the size of a lifo queue
25-
MetricLifoQueueSize = "lifo.queue_size"
26-
// MetricLifoQueueLimit represents the name of the metric for the max size of a lifo queue
27-
MetricLifoQueueLimit = "lifo.queue_limit"
24+
// MetricQueueSize represents the name of the metric for the size of a lifo queue
25+
MetricQueueSize = "queue_size"
26+
// MetricQueueLimit represents the name of the metric for the max size of a lifo queue
27+
MetricQueueLimit = "queue_limit"
2828
)
2929

3030
// PrefixMetricWithName will prefix a given name with the metric name in the form "<name>.<metric>"

limit/aimd.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func (l *AIMDLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop b
8888
l.commonSampler.Sample(rtt, inFlight, didDrop)
8989

9090
if didDrop {
91-
l.limit = int(math.Max(1, math.Min(float64(l.limit-1), float64(int(float64(l.limit)*l.backOffRatio)))))
91+
l.limit = int(math.Max(1, math.Min(float64(l.limit-1), float64(l.limit)*l.backOffRatio)))
9292
l.notifyListeners(l.limit)
9393
} else if inFlight >= l.limit {
9494
l.limit += l.increaseBy

limiter/fifo_blocking.go

+10-185
Original file line numberDiff line numberDiff line change
@@ -1,218 +1,43 @@
11
package limiter
22

33
import (
4-
"container/list"
5-
"context"
6-
"fmt"
7-
"sync"
84
"time"
95

106
"github.com/platinummonkey/go-concurrency-limits/core"
117
)
128

13-
type fifoElement struct {
14-
ctx context.Context
15-
releaseChan chan core.Listener
16-
}
17-
18-
func (e *fifoElement) setListener(listener core.Listener) {
19-
select {
20-
case e.releaseChan <- listener:
21-
// noop
22-
default:
23-
// timeout has expired
24-
}
25-
}
26-
27-
type fifoQueue struct {
28-
q *list.List
29-
mu sync.RWMutex
30-
}
31-
32-
func (q *fifoQueue) len() int {
33-
q.mu.RLock()
34-
defer q.mu.RUnlock()
35-
return q.q.Len()
36-
}
37-
38-
func (q *fifoQueue) push(ctx context.Context) (*list.Element, chan core.Listener) {
39-
q.mu.Lock()
40-
defer q.mu.Unlock()
41-
releaseChan := make(chan core.Listener, 1)
42-
fe := fifoElement{
43-
ctx: ctx,
44-
releaseChan: releaseChan,
45-
}
46-
e := q.q.PushBack(fe)
47-
return e, releaseChan
48-
}
49-
50-
func (q *fifoQueue) pop() *fifoElement {
51-
q.mu.Lock()
52-
defer q.mu.Unlock()
53-
e := q.q.Front()
54-
if e != nil {
55-
fe := e.Value.(fifoElement)
56-
q.q.Remove(e)
57-
return &fe
58-
}
59-
return nil
60-
}
61-
62-
func (q *fifoQueue) peek() *fifoElement {
63-
q.mu.RLock()
64-
defer q.mu.RUnlock()
65-
e := q.q.Front()
66-
if e != nil {
67-
fe := q.q.Front().Value.(fifoElement)
68-
return &fe
69-
}
70-
return nil
71-
}
72-
73-
func (q *fifoQueue) remove(event *list.Element) {
74-
if event == nil {
75-
return
76-
}
77-
q.mu.Lock()
78-
defer q.mu.Unlock()
79-
q.q.Remove(event)
80-
}
81-
82-
// FifoBlockingListener implements a blocking listener for the FifoBlockingListener
83-
type FifoBlockingListener struct {
84-
delegateListener core.Listener
85-
limiter *FifoBlockingLimiter
86-
}
87-
88-
func (l *FifoBlockingListener) unblock() {
89-
l.limiter.mu.Lock()
90-
defer l.limiter.mu.Unlock()
91-
if l.limiter.backlog.len() > 0 {
92-
event := l.limiter.backlog.peek()
93-
if event != nil {
94-
listener, ok := l.limiter.delegate.Acquire(event.ctx)
95-
if ok && listener != nil {
96-
nextEvent := l.limiter.backlog.pop()
97-
nextEvent.setListener(listener)
98-
}
99-
}
100-
// otherwise: still can't acquire the limit. unblock will be called again next time the limit is released.
101-
}
102-
}
103-
104-
// OnDropped is called to indicate the request failed and was dropped due to being rejected by an external limit or
105-
// hitting a timeout. Loss based Limit implementations will likely do an aggressive reducing in limit when this
106-
// happens.
107-
func (l *FifoBlockingListener) OnDropped() {
108-
l.delegateListener.OnDropped()
109-
l.unblock()
110-
}
111-
112-
// OnIgnore is called to indicate the operation failed before any meaningful RTT measurement could be made and
113-
// should be ignored to not introduce an artificially low RTT.
114-
func (l *FifoBlockingListener) OnIgnore() {
115-
l.delegateListener.OnIgnore()
116-
l.unblock()
117-
}
118-
119-
// OnSuccess is called as a notification that the operation succeeded and internally measured latency should be
120-
// used as an RTT sample.
121-
func (l *FifoBlockingListener) OnSuccess() {
122-
l.delegateListener.OnSuccess()
123-
l.unblock()
124-
}
125-
1269
// FifoBlockingLimiter implements a Limiter that blocks the caller when the limit has been reached. This strategy
12710
// ensures the resource is properly protected but favors availability over latency by not fast failing requests when
12811
// the limit has been reached. To help keep success latencies low and minimize timeouts any blocked requests are
12912
// processed in last in/first out order.
13013
//
13114
// Use this limiter only when the concurrency model allows the limiter to be blocked.
15+
// Deprecated in favor of QueueBlockingLimiter
13216
type FifoBlockingLimiter struct {
133-
delegate core.Limiter
134-
maxBacklogSize int
135-
maxBacklogTimeout time.Duration
136-
137-
backlog fifoQueue
138-
c *sync.Cond
139-
mu sync.RWMutex
17+
*QueueBlockingLimiter
14018
}
14119

14220
// NewFifoBlockingLimiter will create a new FifoBlockingLimiter
21+
// Deprecated, use NewQueueBlockingLimiterFromConfig instead
14322
func NewFifoBlockingLimiter(
14423
delegate core.Limiter,
14524
maxBacklogSize int,
14625
maxBacklogTimeout time.Duration,
14726
) *FifoBlockingLimiter {
148-
if maxBacklogSize <= 0 {
149-
maxBacklogSize = 100
150-
}
151-
if maxBacklogTimeout == 0 {
152-
maxBacklogTimeout = time.Millisecond * 1000
153-
}
154-
mu := sync.Mutex{}
27+
15528
return &FifoBlockingLimiter{
156-
delegate: delegate,
157-
maxBacklogSize: maxBacklogSize,
158-
maxBacklogTimeout: maxBacklogTimeout,
159-
backlog: fifoQueue{
160-
q: list.New(),
161-
},
162-
c: sync.NewCond(&mu),
29+
NewQueueBlockingLimiterFromConfig(delegate, QueueLimiterConfig{
30+
Ordering: OrderingFIFO,
31+
MaxBacklogSize: maxBacklogSize,
32+
MaxBacklogTimeout: maxBacklogTimeout,
33+
}),
16334
}
16435
}
16536

16637
// NewFifoBlockingLimiterWithDefaults will create a new FifoBlockingLimiter with default values.
38+
// Deprecated, use NewQueueBlockingLimiterWithDefaults instead
16739
func NewFifoBlockingLimiterWithDefaults(
16840
delegate core.Limiter,
16941
) *FifoBlockingLimiter {
17042
return NewFifoBlockingLimiter(delegate, 100, time.Millisecond*1000)
17143
}
172-
173-
func (l *FifoBlockingLimiter) tryAcquire(ctx context.Context) core.Listener {
174-
// Try to acquire a token and return immediately if successful
175-
listener, ok := l.delegate.Acquire(ctx)
176-
if ok && listener != nil {
177-
return listener
178-
}
179-
180-
// Restrict backlog size so the queue doesn't grow unbounded during an outage
181-
if l.backlog.len() >= l.maxBacklogSize {
182-
return nil
183-
}
184-
185-
// Create a holder for a listener and block until a listener is released by another
186-
// operation. Holders will be unblocked in FIFO order
187-
event, eventReleaseChan := l.backlog.push(ctx)
188-
select {
189-
case listener = <-eventReleaseChan:
190-
return listener
191-
case <-time.After(l.maxBacklogTimeout):
192-
// Remove the holder from the backlog. This item is likely to be at the end of the
193-
// list so do a remove to minimize the number of items to traverse
194-
l.backlog.remove(event)
195-
return nil
196-
}
197-
}
198-
199-
// Acquire a token from the limiter. Returns an Optional.empty() if the limit has been exceeded.
200-
// If acquired the caller must call one of the Listener methods when the operation has been completed to release
201-
// the count.
202-
//
203-
// context Context for the request. The context is used by advanced strategies such as LookupPartitionStrategy.
204-
func (l *FifoBlockingLimiter) Acquire(ctx context.Context) (core.Listener, bool) {
205-
delegateListener := l.tryAcquire(ctx)
206-
if delegateListener == nil {
207-
return nil, false
208-
}
209-
return &FifoBlockingListener{
210-
delegateListener: delegateListener,
211-
limiter: l,
212-
}, true
213-
}
214-
215-
func (l *FifoBlockingLimiter) String() string {
216-
return fmt.Sprintf("FifoBlockingLimiter{delegate=%v, maxBacklogSize=%d, maxBacklogTimeout=%v}",
217-
l.delegate, l.maxBacklogSize, l.maxBacklogTimeout)
218-
}

limiter/fifo_blocking_test.go

+2-85
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package limiter
22

33
import (
4-
"container/list"
54
"context"
65
"strings"
76
"sync"
@@ -14,88 +13,6 @@ import (
1413
"github.com/platinummonkey/go-concurrency-limits/strategy"
1514
)
1615

17-
type testFifoQueueContextKey int
18-
19-
func TestFifoQueue(t *testing.T) {
20-
t.Parallel()
21-
asrt := assert.New(t)
22-
q := fifoQueue{
23-
q: list.New(),
24-
}
25-
26-
asrt.Equal(0, q.len())
27-
el := q.peek()
28-
asrt.Nil(el)
29-
asrt.Nil(q.pop())
30-
31-
ctx1 := context.WithValue(context.Background(), testFifoQueueContextKey(1), 1)
32-
q.push(ctx1)
33-
34-
el = q.peek()
35-
asrt.Equal(1, q.len())
36-
asrt.NotNil(el)
37-
asrt.Equal(ctx1, el.ctx)
38-
39-
// add a 2nd
40-
ctx2 := context.WithValue(context.Background(), testFifoQueueContextKey(2), 2)
41-
q.push(ctx2)
42-
43-
// make sure it's still FIFO
44-
el = q.peek()
45-
asrt.Equal(2, q.len())
46-
asrt.NotNil(el)
47-
asrt.Equal(ctx1, el.ctx)
48-
49-
// pop off
50-
el = q.pop()
51-
asrt.NotNil(el)
52-
asrt.Equal(ctx1, el.ctx)
53-
54-
// check that we only have one again
55-
el = q.peek()
56-
asrt.Equal(1, q.len())
57-
asrt.NotNil(el)
58-
asrt.Equal(ctx2, el.ctx)
59-
60-
// add a 2nd & 3rd
61-
ctx3 := context.WithValue(context.Background(), testFifoQueueContextKey(3), 3)
62-
el3, _ := q.push(ctx3)
63-
ctx4 := context.WithValue(context.Background(), testFifoQueueContextKey(4), 4)
64-
q.push(ctx4)
65-
66-
// remove the middle
67-
q.remove(el3)
68-
el = q.peek()
69-
asrt.Equal(2, q.len())
70-
asrt.NotNil(el)
71-
asrt.Equal(ctx2, el.ctx)
72-
asrt.Equal(ctx4, q.q.Back().Value.(fifoElement).ctx)
73-
}
74-
75-
func TestFifoBlockingListener(t *testing.T) {
76-
t.Parallel()
77-
delegateLimiter, _ := NewDefaultLimiterWithDefaults(
78-
"",
79-
strategy.NewSimpleStrategy(20),
80-
limit.NoopLimitLogger{},
81-
core.EmptyMetricRegistryInstance,
82-
)
83-
limiter := NewFifoBlockingLimiterWithDefaults(delegateLimiter)
84-
delegateListener, _ := delegateLimiter.Acquire(context.Background())
85-
listener := FifoBlockingListener{
86-
delegateListener: delegateListener,
87-
limiter: limiter,
88-
}
89-
listener.OnSuccess()
90-
listener.OnIgnore()
91-
listener.OnDropped()
92-
}
93-
94-
type acquiredListenerFifo struct {
95-
id int
96-
listener core.Listener
97-
}
98-
9916
func TestFifoBlockingLimiter(t *testing.T) {
10017
t.Parallel()
10118

@@ -110,7 +27,7 @@ func TestFifoBlockingLimiter(t *testing.T) {
11027
)
11128
limiter := NewFifoBlockingLimiterWithDefaults(delegateLimiter)
11229
asrt.NotNil(limiter)
113-
asrt.True(strings.Contains(limiter.String(), "FifoBlockingLimiter{delegate=DefaultLimiter{"))
30+
asrt.True(strings.Contains(limiter.String(), "QueueBlockingLimiter{delegate=DefaultLimiter{"))
11431
})
11532

11633
t.Run("NewFifoBlockingLimiter", func(t2 *testing.T) {
@@ -124,7 +41,7 @@ func TestFifoBlockingLimiter(t *testing.T) {
12441
)
12542
limiter := NewFifoBlockingLimiter(delegateLimiter, -1, 0)
12643
asrt.NotNil(limiter)
127-
asrt.True(strings.Contains(limiter.String(), "FifoBlockingLimiter{delegate=DefaultLimiter{"))
44+
asrt.True(strings.Contains(limiter.String(), "QueueBlockingLimiter{delegate=DefaultLimiter{"))
12845
})
12946

13047
t.Run("Acquire", func(t2 *testing.T) {

0 commit comments

Comments
 (0)