Skip to content

Commit c712e58

Browse files
Merge pull request #17 from platinummonkey/moving_percentile
Adds Additional Measurement Types
2 parents 43e12f0 + 7170326 commit c712e58

9 files changed

+384
-0
lines changed
219 KB
Binary file not shown.

docs/content/_index.md

+3
Original file line numberDiff line numberDiff line change
@@ -152,3 +152,6 @@ dialOption := grpc.WithUnaryInterceptor(
152152
),
153153
)
154154
```
155+
156+
---
157+
[References used can be found here](_references.md)

docs/content/_references.md

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# References Used
2+
1. Original Java implementation - Netflix - https://github.com/netflix/concurrency-limits/
3+
1. Windowless Moving Percentile - Martin Jambon - https://mjambon.com/2016-07-23-moving-percentile/

measurements/moving_average.go

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package measurements
2+
3+
import (
4+
"fmt"
5+
"math"
6+
"sync"
7+
)
8+
9+
// SimpleExponentialMovingAverage implements a simple exponential moving average
10+
// this implementation only uses a single alpha value to determine warm-up time and provides a mean
11+
// approximation
12+
type SimpleExponentialMovingAverage struct {
13+
alpha float64
14+
initialAlpha float64
15+
minSamples int
16+
seenSamples int
17+
18+
value float64
19+
20+
mu sync.RWMutex
21+
}
22+
23+
// NewSimpleExponentialMovingAverage creates a new simple moving average
24+
func NewSimpleExponentialMovingAverage(
25+
alpha float64,
26+
) (*SimpleExponentialMovingAverage, error) {
27+
if alpha < 0 || alpha > 1 {
28+
return nil, fmt.Errorf("alpha must be [0, 1]")
29+
}
30+
minSamples := int(math.Trunc(math.Ceil(1 / alpha)))
31+
return &SimpleExponentialMovingAverage{
32+
alpha: alpha,
33+
initialAlpha: alpha,
34+
minSamples: minSamples,
35+
}, nil
36+
}
37+
38+
// Add a single sample and update the internal state.
39+
// returns true if the internal state was updated, also return the current value.
40+
func (m *SimpleExponentialMovingAverage) Add(value float64) (float64, bool) {
41+
m.mu.Lock()
42+
defer m.mu.Unlock()
43+
return m.add(value)
44+
}
45+
46+
func (m *SimpleExponentialMovingAverage) add(value float64) (float64, bool) {
47+
changed := false
48+
if m.seenSamples < m.minSamples {
49+
m.seenSamples++
50+
}
51+
var alpha float64
52+
if m.seenSamples >= m.minSamples {
53+
alpha = m.alpha
54+
} else {
55+
alpha = 1 / float64(m.seenSamples)
56+
}
57+
newValue := (1-alpha)*m.value + alpha*value
58+
if newValue != m.value {
59+
changed = true
60+
}
61+
m.value = newValue
62+
return m.value, changed
63+
}
64+
65+
// Get the current value.
66+
func (m *SimpleExponentialMovingAverage) Get() float64 {
67+
m.mu.RLock()
68+
defer m.mu.RUnlock()
69+
return m.value
70+
}
71+
72+
// Reset the internal state as if no samples were ever added.
73+
func (m *SimpleExponentialMovingAverage) Reset() {
74+
m.mu.Lock()
75+
m.seenSamples = 0
76+
m.value = 0
77+
m.alpha = m.initialAlpha
78+
m.mu.Unlock()
79+
}
80+
81+
// Update will update the value given an operation function
82+
func (m *SimpleExponentialMovingAverage) Update(operation func(value float64) float64) {
83+
m.mu.Lock()
84+
defer m.mu.Unlock()
85+
newValue, _ := m.add(m.value)
86+
m.value = operation(newValue)
87+
}

measurements/moving_average_test.go

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package measurements
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestSimpleExponentialMovingAverage(t *testing.T) {
10+
t.Parallel()
11+
asrt := assert.New(t)
12+
m, err := NewSimpleExponentialMovingAverage(0.05)
13+
asrt.NoError(err)
14+
asrt.NotNil(m)
15+
16+
asrt.Equal(float64(0), m.Get())
17+
m.Add(10)
18+
asrt.Equal(float64(10), m.Get())
19+
m.Add(11)
20+
asrt.Equal(float64(10.5), m.Get())
21+
m.Add(11)
22+
m.Add(11)
23+
asrt.Equal(float64(10.75), m.Get())
24+
}

measurements/moving_variance.go

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package measurements
2+
3+
import (
4+
"math"
5+
"sync"
6+
)
7+
8+
// SimpleMovingVariance implements a simple moving variance calculation based on the simple moving average.
9+
type SimpleMovingVariance struct {
10+
average *SimpleExponentialMovingAverage
11+
variance *SimpleExponentialMovingAverage
12+
13+
stdev float64 // square root of the estimated variance
14+
normalized float64 // (input - mean) / stdev
15+
16+
mu sync.RWMutex
17+
}
18+
19+
// NewSimpleMovingVariance will create a new exponential moving variance approximation based on the SimpleMovingAverage
20+
func NewSimpleMovingVariance(
21+
alphaAverage float64,
22+
alphaVariance float64,
23+
) (*SimpleMovingVariance, error) {
24+
movingAverage, err := NewSimpleExponentialMovingAverage(alphaAverage)
25+
if err != nil {
26+
return nil, err
27+
}
28+
variance, err := NewSimpleExponentialMovingAverage(alphaVariance)
29+
if err != nil {
30+
return nil, err
31+
}
32+
return &SimpleMovingVariance{
33+
average: movingAverage,
34+
variance: variance,
35+
}, nil
36+
}
37+
38+
// Add a single sample and update the internal state.
39+
// returns true if the internal state was updated, also return the current value.
40+
func (m *SimpleMovingVariance) Add(value float64) (float64, bool) {
41+
m.mu.Lock()
42+
defer m.mu.Unlock()
43+
changed := false
44+
if m.average.seenSamples > 0 {
45+
m.variance.Add(math.Pow(value-m.average.Get(), 2))
46+
}
47+
m.average.Add(value)
48+
49+
mean := m.average.Get()
50+
variance := m.variance.Get()
51+
stdev := math.Sqrt(variance)
52+
normalized := m.normalized
53+
if stdev != 0 {
54+
// edge case
55+
normalized = (value - mean) / stdev
56+
}
57+
58+
if stdev != m.stdev || normalized != m.normalized {
59+
changed = true
60+
}
61+
m.stdev = stdev
62+
m.normalized = normalized
63+
return stdev, changed
64+
}
65+
66+
// Get the current value.
67+
func (m *SimpleMovingVariance) Get() float64 {
68+
m.mu.RLock()
69+
defer m.mu.RUnlock()
70+
return m.variance.Get()
71+
}
72+
73+
// Reset the internal state as if no samples were ever added.
74+
func (m *SimpleMovingVariance) Reset() {
75+
m.mu.Lock()
76+
m.average.Reset()
77+
m.variance.Reset()
78+
m.stdev = 0
79+
m.normalized = 0
80+
m.mu.Unlock()
81+
}
82+
83+
// Update will update the value given an operation function
84+
func (m *SimpleMovingVariance) Update(operation func(value float64) float64) {
85+
m.mu.Lock()
86+
defer m.mu.Unlock()
87+
m.stdev = operation(m.variance.Get())
88+
}

measurements/moving_variance_test.go

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package measurements
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestSimpleMovingVariance(t *testing.T) {
10+
t.Parallel()
11+
asrt := assert.New(t)
12+
m, err := NewSimpleMovingVariance(0.05, 0.05)
13+
asrt.NoError(err)
14+
asrt.NotNil(m)
15+
16+
// initial condition
17+
asrt.Equal(float64(0), m.Get())
18+
// warmup first sample
19+
m.Add(10)
20+
asrt.Equal(float64(0), m.Get())
21+
// first variance reading is expected to be 1 here
22+
m.Add(11)
23+
asrt.Equal(float64(1), m.Get())
24+
m.Add(10)
25+
m.Add(11)
26+
asrt.InDelta(float64(0.5648), m.Get(), 0.00005)
27+
m.Add(20)
28+
m.Add(100)
29+
m.Add(30)
30+
asrt.InDelta(float64(1295.7841), m.Get(), 0.00005)
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package measurements
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
)
7+
8+
// WindowlessMovingPercentile implements a moving percentile.
9+
// This implementation uses a windowless calculation that while not strictly always accurate,
10+
// provides a very close estimation in O(1) time and space.
11+
// Much credit goes to Martin Jambon here: https://mjambon.com/2016-07-23-moving-percentile/
12+
// a copy can be found in github.com/platinummonkey/go-concurrency-limits/docs/assets/moving_percentile_reference.pdf
13+
// and this is a port of the OCaml implementation provided in that reference.
14+
type WindowlessMovingPercentile struct {
15+
p float64
16+
deltaInitial float64
17+
q float64
18+
19+
value float64
20+
delta float64
21+
deltaState *SimpleMovingVariance
22+
23+
seenCount int
24+
25+
mu sync.RWMutex
26+
}
27+
28+
// NewWindowlessMovingPercentile creates a new Windowless Moving Percentile
29+
// p - percentile requested, accepts (0,1)
30+
// deltaInitial - the initial delta value, here 0 is acceptable if you expect it to be rather stable at start, otherwise
31+
// choose a larger value. This would be estimated: `delta := stdev * r` where `r` is a user chosen
32+
// constant. Good values are generally from 0.001 to 0.01
33+
// movingAvgAlphaAvg - this is the alpha value for the simple moving average. A good start is 0.05. Accepts [0,1]
34+
// movingVarianceAlphaAvg - this is the alpha value for the simple moving variance. A good start is 0.05. Accepts [0,1]
35+
func NewWindowlessMovingPercentile(
36+
p float64, // percentile requested
37+
deltaInitial float64,
38+
movingAvgAlphaAvg float64,
39+
movingVarianceAlphaVar float64,
40+
) (*WindowlessMovingPercentile, error) {
41+
if p <= 0 || p >= 1 {
42+
return nil, fmt.Errorf("p must be between (0,1)")
43+
}
44+
q := 1 - p
45+
if q <= 0 || q >= 1 {
46+
return nil, fmt.Errorf("calculated q must be between (0,1)")
47+
}
48+
variance, err := NewSimpleMovingVariance(movingAvgAlphaAvg, movingVarianceAlphaVar)
49+
if err != nil {
50+
return nil, err
51+
}
52+
53+
return &WindowlessMovingPercentile{
54+
p: p,
55+
q: q,
56+
deltaInitial: deltaInitial,
57+
delta: deltaInitial,
58+
deltaState: variance,
59+
}, nil
60+
}
61+
62+
// Add a single sample and update the internal state.
63+
// returns true if the internal state was updated, also return the current value.
64+
func (m *WindowlessMovingPercentile) Add(value float64) (float64, bool) {
65+
m.mu.Lock()
66+
defer m.mu.Unlock()
67+
return m.add(value)
68+
}
69+
70+
func (m *WindowlessMovingPercentile) add(value float64) (float64, bool) {
71+
changed := false
72+
if m.seenCount < 2 {
73+
// we only need 2 samples to continue
74+
m.seenCount++
75+
}
76+
originalDelta := m.delta
77+
stdev, _ := m.deltaState.Add(value)
78+
if m.seenCount >= 2 {
79+
m.delta = m.deltaInitial * stdev
80+
if m.delta != originalDelta {
81+
changed = true
82+
}
83+
}
84+
newValue := float64(m.value)
85+
if m.seenCount == 1 {
86+
newValue = value
87+
changed = true
88+
} else if value < m.value {
89+
newValue = m.value - m.delta/m.p
90+
} else if value > m.value {
91+
newValue = m.value + m.delta/(1-m.p)
92+
}
93+
// else the same
94+
if newValue != m.value {
95+
changed = true
96+
}
97+
m.value = newValue
98+
return m.value, changed
99+
}
100+
101+
// Get the current value.
102+
func (m *WindowlessMovingPercentile) Get() float64 {
103+
m.mu.RLock()
104+
defer m.mu.RUnlock()
105+
return m.value
106+
}
107+
108+
// Reset the internal state as if no samples were ever added.
109+
func (m *WindowlessMovingPercentile) Reset() {
110+
m.mu.Lock()
111+
m.value = 0
112+
m.seenCount = 0
113+
m.mu.Unlock()
114+
}
115+
116+
// Update will update the value given an operation function
117+
func (m *WindowlessMovingPercentile) Update(operation func(value float64) float64) {
118+
m.mu.Lock()
119+
defer m.mu.Unlock()
120+
newValue, _ := m.add(m.value)
121+
m.value = operation(newValue)
122+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package measurements
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestWindowlessMovingPercentile(t *testing.T) {
10+
t.Parallel()
11+
asrt := assert.New(t)
12+
m, err := NewWindowlessMovingPercentile(0.9, 0.01, 0.05, 0.05)
13+
asrt.NoError(err)
14+
asrt.NotNil(m)
15+
asrt.Equal(float64(0.0), m.Get())
16+
for i := 0; i < 10; i++ {
17+
m.Add(100)
18+
}
19+
asrt.Equal(float64(100), m.Get())
20+
m.Add(99)
21+
for i := 0; i < 10; i++ {
22+
m.Add(1000)
23+
}
24+
m.Add(0.1)
25+
asrt.Equal(520, int(m.Get()))
26+
}

0 commit comments

Comments
 (0)