Skip to content

Commit 3ee5afe

Browse files
author
Cody Lee
committed
improve vegas limit from upstream suggestions
1 parent f8afff6 commit 3ee5afe

File tree

4 files changed

+46
-27
lines changed

4 files changed

+46
-27
lines changed

limit/gradient.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ type GradientLimit struct {
3939
}
4040

4141
func nextProbeCountdown(probeInterval int) int {
42-
if probeInterval == LimitProbeDisabled {
43-
return LimitProbeDisabled
42+
if probeInterval == ProbeDisabled {
43+
return ProbeDisabled
4444
}
4545
return probeInterval + rand.Int()
4646
}
@@ -152,7 +152,7 @@ func (l *GradientLimit) OnSample(startTime int64, rtt int64, inFlight int, didDr
152152
// Reset or probe for a new noload RTT and a new estimatedLimit. It's necessary to cut the limit
153153
// in half to avoid having the limit drift upwards when the RTT is probed during heavy load.
154154
// To avoid decreasing the limit too much we don't allow it to go lower than the queueSize.
155-
if l.probeInterval != LimitProbeDisabled {
155+
if l.probeInterval != ProbeDisabled {
156156
l.resetRTTCounter--
157157
if l.resetRTTCounter <= 0 {
158158
l.resetRTTCounter = nextProbeCountdown(l.probeInterval)

limit/gradient_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ func TestGradientLimit(t *testing.T) {
1313
t.Run("nextProbeInterval", func(t2 *testing.T) {
1414
t2.Parallel()
1515
asrt := assert.New(t2)
16-
asrt.Equal(LimitProbeDisabled, nextProbeCountdown(LimitProbeDisabled))
16+
asrt.Equal(ProbeDisabled, nextProbeCountdown(ProbeDisabled))
1717
asrt.True(nextProbeCountdown(1) > 0)
1818
})
1919

limit/vegas.go

+39-23
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/platinummonkey/go-concurrency-limits/core"
1010
"github.com/platinummonkey/go-concurrency-limits/limit/functions"
11+
"github.com/platinummonkey/go-concurrency-limits/measurements"
1112
)
1213

1314
// VegasLimit implements a Limiter based on TCP Vegas where the limit increases by alpha if the queue_use is
@@ -21,7 +22,7 @@ import (
2122
type VegasLimit struct {
2223
estimatedLimit float64
2324
maxLimit int
24-
rttNoLoad int64
25+
rttNoLoad core.MeasurementInterface
2526
smoothing float64
2627
alphaFunc func(estimatedLimit int) int
2728
betaFunc func(estimatedLimit int) int
@@ -31,7 +32,8 @@ type VegasLimit struct {
3132
rttSampleListener core.MetricSampleListener
3233
commonSampler *core.CommonMetricSampler
3334
probeMultipler int
34-
probeCountdown int
35+
probeJitter float64
36+
probeCount int64
3537

3638
listeners []core.LimitChangeListener
3739
registry core.MetricRegistry
@@ -49,6 +51,7 @@ func NewDefaultVegasLimit(
4951
return NewVegasLimitWithRegistry(
5052
name,
5153
-1,
54+
nil,
5255
-1,
5356
-1,
5457
nil,
@@ -74,6 +77,7 @@ func NewDefaultVegasLimitWithLimit(
7477
return NewVegasLimitWithRegistry(
7578
name,
7679
initialLimit,
80+
nil,
7781
-1,
7882
-1,
7983
nil,
@@ -92,6 +96,7 @@ func NewDefaultVegasLimitWithLimit(
9296
func NewVegasLimitWithRegistry(
9397
name string,
9498
initialLimit int,
99+
rttNoLoad core.MeasurementInterface,
95100
maxConcurrency int,
96101
smoothing float64,
97102
alphaFunc func(estimatedLimit int) int,
@@ -107,12 +112,19 @@ func NewVegasLimitWithRegistry(
107112
if initialLimit < 1 {
108113
initialLimit = 20
109114
}
115+
116+
if rttNoLoad == nil {
117+
rttNoLoad = &measurements.MinimumMeasurement{}
118+
}
119+
110120
if maxConcurrency < 0 {
111121
maxConcurrency = 1000
112122
}
123+
113124
if smoothing < 0 || smoothing > 1.0 {
114125
smoothing = 1.0
115126
}
127+
116128
if probeMultiplier <= 0 {
117129
probeMultiplier = 30
118130
}
@@ -163,7 +175,8 @@ func NewVegasLimitWithRegistry(
163175
decreaseFunc: decreaseFunc,
164176
smoothing: smoothing,
165177
probeMultipler: probeMultiplier,
166-
probeCountdown: nextVegasProbeCountdown(probeMultiplier, float64(initialLimit)),
178+
probeJitter: newProbeJitter(),
179+
probeCount: 0,
167180
rttSampleListener: registry.RegisterDistribution(core.PrefixMetricWithName(core.MetricMinRTT, name), tags...),
168181
listeners: make([]core.LimitChangeListener, 0),
169182
registry: registry,
@@ -174,12 +187,11 @@ func NewVegasLimitWithRegistry(
174187
return l
175188
}
176189

177-
// LimitProbeDisabled represents the disabled value for probing.
178-
const LimitProbeDisabled = -1
190+
// ProbeDisabled represents the disabled value for probing.
191+
const ProbeDisabled = -1
179192

180-
func nextVegasProbeCountdown(probeMultiplier int, estimatedLimit float64) int {
181-
maxRange := int(float64(probeMultiplier)*estimatedLimit) / 2
182-
return rand.Intn(maxRange) + maxRange // return roughly [maxVal / 2, maxVal]
193+
func newProbeJitter() float64 {
194+
return (rand.Float64() / 2.0) + 0.5
183195
}
184196

185197
// EstimatedLimit returns the current estimated limit.
@@ -209,28 +221,32 @@ func (l *VegasLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop
209221
defer l.mu.Unlock()
210222
l.commonSampler.Sample(rtt, inFlight, didDrop)
211223

212-
if l.probeCountdown != LimitProbeDisabled {
213-
l.probeCountdown--
214-
if l.probeCountdown <= 0 {
215-
l.logger.Debugf("probe MinRTT %d", rtt/1e6)
216-
l.probeCountdown = nextVegasProbeCountdown(l.probeMultipler, l.estimatedLimit)
217-
l.rttNoLoad = rtt
218-
return
219-
}
224+
l.probeCount++
225+
if l.shouldProbe() {
226+
l.logger.Debugf("Probe triggered update to RTT No Load %d ms from %d ms",
227+
rtt/1e6, int64(l.rttNoLoad.Get())/1e6)
228+
l.probeJitter = newProbeJitter()
229+
l.probeCount = 0
230+
l.rttNoLoad.Add(float64(rtt))
231+
return
220232
}
221233

222-
if l.rttNoLoad == 0 || rtt < l.rttNoLoad {
223-
l.logger.Debugf("New MinRTT %d", rtt/1e6)
224-
l.rttNoLoad = rtt
234+
if l.rttNoLoad.Get() == 0 || float64(rtt) < l.rttNoLoad.Get() {
235+
l.logger.Debugf("Update RTT No Load to %d ms from %d ms", rtt/1e6, int64(l.rttNoLoad.Get())/1e6)
236+
l.rttNoLoad.Add(float64(rtt))
225237
return
226238
}
227239

228-
l.rttSampleListener.AddSample(float64(l.rttNoLoad))
240+
l.rttSampleListener.AddSample(l.rttNoLoad.Get())
229241
l.updateEstimatedLimit(startTime, rtt, inFlight, didDrop)
230242
}
231243

244+
func (l *VegasLimit) shouldProbe() bool {
245+
return int64(l.probeJitter*float64(l.probeMultipler)*l.estimatedLimit) <= l.probeCount
246+
}
247+
232248
func (l *VegasLimit) updateEstimatedLimit(startTime int64, rtt int64, inFlight int, didDrop bool) {
233-
queueSize := int(math.Ceil(l.estimatedLimit * (1 - float64(l.rttNoLoad)/float64(rtt))))
249+
queueSize := int(math.Ceil(l.estimatedLimit * (1 - l.rttNoLoad.Get()/float64(rtt))))
234250

235251
var newLimit float64
236252
// Treat any drop (i.e timeout) as needing to reduce the limit
@@ -264,7 +280,7 @@ func (l *VegasLimit) updateEstimatedLimit(startTime int64, rtt int64, inFlight i
264280

265281
if int(newLimit) != int(l.estimatedLimit) && l.logger.IsDebugEnabled() {
266282
l.logger.Debugf("New limit=%d, minRTT=%d ms, winRTT=%d ms, queueSize=%d",
267-
int(newLimit), l.rttNoLoad/1e6, rtt/1e6, queueSize)
283+
int(newLimit), int64(l.rttNoLoad.Get())/1e6, rtt/1e6, queueSize)
268284
}
269285

270286
l.estimatedLimit = newLimit
@@ -275,7 +291,7 @@ func (l *VegasLimit) updateEstimatedLimit(startTime int64, rtt int64, inFlight i
275291
func (l *VegasLimit) RTTNoLoad() int64 {
276292
l.mu.RLock()
277293
defer l.mu.RUnlock()
278-
return l.rttNoLoad
294+
return int64(l.rttNoLoad.Get())
279295
}
280296

281297
func (l *VegasLimit) String() string {

limit/vegas_test.go

+3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ func createVegasLimit() *VegasLimit {
1414
return NewVegasLimitWithRegistry(
1515
"test",
1616
10,
17+
nil,
1718
20,
1819
1.0,
1920
functions.FixedQueueSizeFunc(3),
@@ -88,6 +89,7 @@ func TestVegasLimit(t *testing.T) {
8889
l := NewVegasLimitWithRegistry(
8990
"test",
9091
100,
92+
nil,
9193
200,
9294
0.5,
9395
nil,
@@ -120,6 +122,7 @@ func TestVegasLimit(t *testing.T) {
120122
l := NewVegasLimitWithRegistry(
121123
"test",
122124
100,
125+
nil,
123126
200,
124127
-1,
125128
nil,

0 commit comments

Comments
 (0)