Skip to content

Commit 493a30d

Browse files
Merge pull request #158 from jakeng-grabtaxi/drop-update-limits
Update limits when listener is dropped
2 parents 41fcd5c + 0b87224 commit 493a30d

File tree

1 file changed

+24
-19
lines changed

1 file changed

+24
-19
lines changed

limiter/default.go

+24-19
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,30 @@ func (l *DefaultListener) OnSuccess() {
4848
},
4949
)
5050

51+
l.updateLimit(endTime, current)
52+
}
53+
54+
// OnIgnore is called to indicate the operation failed before any meaningful RTT measurement could be made and
55+
// should be ignored to not introduce an artificially low RTT.
56+
func (l *DefaultListener) OnIgnore() {
57+
atomic.AddInt64(l.inFlight, -1)
58+
l.token.Release()
59+
}
60+
61+
// OnDropped is called to indicate the request failed and was dropped due to being rejected by an external limit or
62+
// hitting a timeout. Loss based Limit implementations will likely do an aggressive reducing in limit when this
63+
// happens.
64+
func (l *DefaultListener) OnDropped() {
65+
atomic.AddInt64(l.inFlight, -1)
66+
l.token.Release()
67+
_, current := l.limiter.updateAndGetSample(func(window measurements.ImmutableSampleWindow) measurements.ImmutableSampleWindow {
68+
return *(window.AddDroppedSample(-1, int(l.currentMaxInFlight)))
69+
})
70+
71+
l.updateLimit(time.Now().UnixNano(), current)
72+
}
73+
74+
func (l *DefaultListener) updateLimit(endTime int64, current measurements.ImmutableSampleWindow) {
5175
if endTime > l.nextUpdateTime {
5276
// double check just to be sure
5377
l.limiter.mu.Lock()
@@ -81,25 +105,6 @@ func (l *DefaultListener) OnSuccess() {
81105
}
82106
}
83107
}
84-
85-
}
86-
87-
// OnIgnore is called to indicate the operation failed before any meaningful RTT measurement could be made and
88-
// should be ignored to not introduce an artificially low RTT.
89-
func (l *DefaultListener) OnIgnore() {
90-
atomic.AddInt64(l.inFlight, -1)
91-
l.token.Release()
92-
}
93-
94-
// OnDropped is called to indicate the request failed and was dropped due to being rejected by an external limit or
95-
// hitting a timeout. Loss based Limit implementations will likely do an aggressive reducing in limit when this
96-
// happens.
97-
func (l *DefaultListener) OnDropped() {
98-
atomic.AddInt64(l.inFlight, -1)
99-
l.token.Release()
100-
l.limiter.updateAndGetSample(func(window measurements.ImmutableSampleWindow) measurements.ImmutableSampleWindow {
101-
return *(window.AddDroppedSample(-1, int(l.currentMaxInFlight)))
102-
})
103108
}
104109

105110
// DefaultLimiter is a Limiter that combines a plugable limit algorithm and enforcement strategy to enforce concurrency

0 commit comments

Comments
 (0)