-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathshard.go
223 lines (194 loc) · 6.18 KB
/
shard.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package sturdyc
import (
"math/rand/v2"
"sync"
"time"
)
// entry represents a single cache entry.
type entry[T any] struct {
key string
value T
expiresAt time.Time
backgroundRefreshAt time.Time
synchronousRefreshAt time.Time
numOfRefreshRetries int
isMissingRecord bool
}
// shard is a thread-safe data structure that holds a subset of the cache entries.
type shard[T any] struct {
sync.RWMutex
*Config
capacity int
ttl time.Duration
entries map[string]*entry[T]
evictionPercentage int
}
// newShard creates a new shard and returns a pointer to it.
func newShard[T any](capacity int, ttl time.Duration, evictionPercentage int, cfg *Config) *shard[T] {
return &shard[T]{
Config: cfg,
capacity: capacity,
ttl: ttl,
entries: make(map[string]*entry[T]),
evictionPercentage: evictionPercentage,
}
}
// size returns the number of entries in the shard.
func (s *shard[T]) size() int {
s.RLock()
defer s.RUnlock()
return len(s.entries)
}
// evictExpired evicts all the expired entries in the shard.
func (s *shard[T]) evictExpired() {
s.Lock()
defer s.Unlock()
var entriesEvicted int
for _, e := range s.entries {
if s.clock.Now().After(e.expiresAt) {
delete(s.entries, e.key)
entriesEvicted++
}
}
s.reportEntriesEvicted(entriesEvicted)
}
// forceEvict evicts a certain percentage of the entries in the shard
// based on the expiration time. Should be called with a lock.
func (s *shard[T]) forceEvict() {
s.reportForcedEviction()
// Check if we should evict all entries.
if s.evictionPercentage == 100 {
evictedCount := len(s.entries)
s.entries = make(map[string]*entry[T])
s.reportEntriesEvicted(evictedCount)
return
}
expirationTimes := make([]time.Time, 0, len(s.entries))
for _, e := range s.entries {
expirationTimes = append(expirationTimes, e.expiresAt)
}
// We could have a lumpy distribution of expiration times. As an example, we
// might have 100 entries in the cache but only 2 unique expiration times. In
// order to not over-evict when trying to remove 10%, we'll have to keep
// track of the number of entries that we've evicted.
percentage := float64(s.evictionPercentage) / 100
cutoff := FindCutoff(expirationTimes, percentage)
entriesToEvict := int(float64(len(expirationTimes)) * percentage)
entriesEvicted := 0
for key, e := range s.entries {
// Here we're essentially saying: if e.expiresAt <= cutoff.
if !e.expiresAt.After(cutoff) {
delete(s.entries, key)
entriesEvicted++
if entriesEvicted == entriesToEvict {
break
}
}
}
s.reportEntriesEvicted(entriesEvicted)
}
// get attempts to retrieve a value from the shard.
//
// Parameters:
//
// key: The key for which the value is to be retrieved.
//
// Returns:
//
// val: The value associated with the key, if it exists.
// exists: A boolean indicating if the value exists in the shard.
// markedAsMissing: A boolean indicating if the key has been marked as a missing record.
// refresh: A boolean indicating if the value should be refreshed in the background.
func (s *shard[T]) get(key string) (val T, exists, markedAsMissing, backgroundRefresh, synchronousRefresh bool) {
s.RLock()
item, ok := s.entries[key]
if !ok {
s.RUnlock()
return val, false, false, false, false
}
if s.clock.Now().After(item.expiresAt) {
s.RUnlock()
return val, false, false, false, false
}
// Check if the record should be synchronously refreshed.
if s.earlyRefreshes && s.clock.Now().After(item.synchronousRefreshAt) {
s.RUnlock()
return item.value, true, item.isMissingRecord, false, true
}
shouldRefresh := s.earlyRefreshes && s.clock.Now().After(item.backgroundRefreshAt)
if shouldRefresh {
// Release the read lock, and switch to a write lock.
s.RUnlock()
s.Lock()
// However, during the time it takes to switch locks, another goroutine
// might have acquired it and moved the refreshAt. Therefore, we'll have to
// check if this operation should still be performed.
if !s.clock.Now().After(item.backgroundRefreshAt) {
s.Unlock()
return item.value, true, item.isMissingRecord, false, false
}
// Update the "refreshAt" so no other goroutines attempts to refresh the same entry.
nextRefresh := s.retryBaseDelay * (1 << item.numOfRefreshRetries)
item.backgroundRefreshAt = s.clock.Now().Add(nextRefresh)
item.numOfRefreshRetries++
s.Unlock()
return item.value, true, item.isMissingRecord, shouldRefresh, false
}
s.RUnlock()
return item.value, true, item.isMissingRecord, false, false
}
// set writes a key-value pair to the shard and returns a
// boolean indicating whether an eviction was performed.
func (s *shard[T]) set(key string, value T, isMissingRecord bool) bool {
s.Lock()
defer s.Unlock()
// Check we need to perform an eviction first.
evict := len(s.entries) >= s.capacity
// If the cache is configured to not evict any entries,
// and we're att full capacity, we'll return early.
if s.evictionPercentage < 1 && evict {
return false
}
if evict {
s.forceEvict()
}
now := s.clock.Now()
newEntry := &entry[T]{
key: key,
value: value,
expiresAt: now.Add(s.ttl),
isMissingRecord: isMissingRecord,
}
if s.earlyRefreshes {
// If there is a difference between the min- and maxRefreshTime we'll use that to
// set a random padding so that the refreshes get spread out evenly over time.
var padding time.Duration
if s.minAsyncRefreshTime != s.maxAsyncRefreshTime {
padding = time.Duration(rand.Int64N(int64(s.maxAsyncRefreshTime - s.minAsyncRefreshTime)))
}
newEntry.backgroundRefreshAt = now.Add(s.minAsyncRefreshTime + padding)
newEntry.synchronousRefreshAt = now.Add(s.syncRefreshTime)
newEntry.numOfRefreshRetries = 0
}
s.entries[key] = newEntry
return evict
}
// delete removes a key from the shard.
func (s *shard[T]) delete(key string) {
s.Lock()
defer s.Unlock()
delete(s.entries, key)
}
// keys returns all non-expired keys in the shard.
func (s *shard[T]) keys() []string {
s.RLock()
defer s.RUnlock()
keys := make([]string, 0, len(s.entries))
for k, v := range s.entries {
if s.clock.Now().After(v.expiresAt) {
continue
}
keys = append(keys, k)
}
return keys
}