Commit e98f2a3

Merge pull request #111 from brianshannan-wf/batcher_ordering

Make batcher ordered

2 parents 3b07818 + 37048aa

2 files changed: 139 additions, 83 deletions

batcher/batcher.go

Lines changed: 137 additions & 69 deletions
@@ -18,15 +18,38 @@ package batcher
 
 import (
     "errors"
-    "sync"
-    "sync/atomic"
     "time"
 )
 
-const (
-    batcherActive   = uint32(0)
-    batcherDisposed = uint32(1)
-)
+// I honestly can't believe I'm doing this, but go's sync package doesn't
+// have a TryLock function.
+// Could probably do this with atomics
+type mutex struct {
+    // This is really more of a semaphore design, but eh
+    // Full -> locked, empty -> unlocked
+    lock chan struct{}
+}
+
+func newMutex() *mutex {
+    return &mutex{lock: make(chan struct{}, 1)}
+}
+
+func (m *mutex) Lock() {
+    m.lock <- struct{}{}
+}
+
+func (m *mutex) Unlock() {
+    <-m.lock
+}
+
+func (m *mutex) TryLock() bool {
+    select {
+    case m.lock <- struct{}{}:
+        return true
+    default:
+        return false
+    }
+}
 
 // Batcher provides an API for accumulating items into a batch for processing.
 type Batcher interface {
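
The mutex added above is a one-slot buffered channel used as a binary semaphore: Lock sends into the slot, Unlock receives from it, and TryLock is a non-blocking send. A minimal standalone sketch of the same pattern (hypothetical names, not part of this package):

package main

import "fmt"

// tryMutex mirrors the channel-backed mutex introduced in this commit:
// a buffered channel of capacity one acts as a binary semaphore.
type tryMutex struct {
    lock chan struct{}
}

func newTryMutex() *tryMutex {
    return &tryMutex{lock: make(chan struct{}, 1)}
}

// Lock blocks until the single slot is free, then fills it.
func (m *tryMutex) Lock() { m.lock <- struct{}{} }

// Unlock empties the slot, letting the next Lock or TryLock succeed.
func (m *tryMutex) Unlock() { <-m.lock }

// TryLock fills the slot only if it is currently empty and reports whether it did.
func (m *tryMutex) TryLock() bool {
    select {
    case m.lock <- struct{}{}:
        return true
    default:
        return false
    }
}

func main() {
    m := newTryMutex()
    m.Lock()
    fmt.Println(m.TryLock()) // false: already held
    m.Unlock()
    fmt.Println(m.TryLock()) // true: the slot was free
    m.Unlock()
}

For context, sync.Mutex only gained a TryLock method in Go 1.18, well after this change, which is why the commit rolls its own here.
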
@@ -60,13 +83,11 @@ type basicBatcher struct {
     maxItems       uint
     maxBytes       uint
     calculateBytes CalculateBytes
-    disposed       uint32
+    disposed       bool
     items          []interface{}
-    lock           sync.RWMutex
     batchChan      chan []interface{}
-    disposeChan    chan struct{}
     availableBytes uint
-    waiting        int32
+    lock           *mutex
 }
 
 // New creates a new Batcher using the provided arguments.
@@ -76,6 +97,10 @@ type basicBatcher struct {
 // - Maximum amount of time waiting for a batch
 // Values of zero for one of these fields indicate they should not be
 // taken into account when evaluating the readiness of a batch.
+// This provides an ordering guarantee for any given thread such that if a
+// thread places two items in the batcher, Get will guarantee the first
+// item is returned before the second, whether before the second in the same
+// batch, or in an earlier batch.
 func New(maxTime time.Duration, maxItems, maxBytes, queueLen uint, calculate CalculateBytes) (Batcher, error) {
     if maxBytes > 0 && calculate == nil {
         return nil, errors.New("batcher: must provide CalculateBytes function")
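
The new doc comment promises per-producer ordering: when a single goroutine calls Put, batches come out of Get in Put order. A hypothetical example test in the same package (not part of this commit; the parameter values are arbitrary) illustrates the guarantee:

package batcher

import (
    "fmt"
    "time"
)

func ExampleBatcher_ordering() {
    // maxTime=50ms, maxItems=2, no byte limit, queue length 10.
    b, err := New(50*time.Millisecond, 2, 0, 10, nil)
    if err != nil {
        panic(err)
    }
    for i := 0; i < 5; i++ {
        _ = b.Put(i)
    }
    // Every two Puts flushed a batch; the odd item out stays buffered until
    // the third Get times out and collects it.
    for i := 0; i < 3; i++ {
        batch, _ := b.Get()
        fmt.Println(batch)
    }
    b.Dispose()
    // Output:
    // [0 1]
    // [2 3]
    // [4]
}
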
@@ -88,24 +113,27 @@ func New(maxTime time.Duration, maxItems, maxBytes, queueLen uint, calculate Cal
         calculateBytes: calculate,
         items:          make([]interface{}, 0, maxItems),
         batchChan:      make(chan []interface{}, queueLen),
-        disposeChan:    make(chan struct{}),
+        lock:           newMutex(),
     }, nil
 }
 
-// Put adds items to the batcher. If Put is continually called without calls to
-// Get, an unbounded number of go-routines will be generated.
-// Note: there is no order guarantee for items entering/leaving the batcher.
+// Put adds items to the batcher.
 func (b *basicBatcher) Put(item interface{}) error {
-    // Check to see if disposed before putting
-    if b.IsDisposed() {
+    b.lock.Lock()
+    if b.disposed {
+        b.lock.Unlock()
         return ErrDisposed
     }
-    b.lock.Lock()
+
     b.items = append(b.items, item)
     if b.calculateBytes != nil {
         b.availableBytes += b.calculateBytes(item)
     }
     if b.ready() {
+        // To guarantee ordering this MUST be in the lock, otherwise multiple
+        // flush calls could be blocked at the same time, in which case
+        // there's no guarantee each batch is placed into the channel in
+        // the proper order
         b.flush()
     }
 

@@ -114,10 +142,7 @@ func (b *basicBatcher) Put(item interface{}) error {
 }
 
 // Get retrieves a batch from the batcher. This call will block until
-// one of the conditions for a "complete" batch is reached. If Put is
-// continually called without calls to Get, an unbounded number of
-// go-routines will be generated.
-// Note: there is no order guarantee for items entering/leaving the batcher.
+// one of the conditions for a "complete" batch is reached.
 func (b *basicBatcher) Get() ([]interface{}, error) {
     // Don't check disposed yet so any items remaining in the queue
     // will be returned properly.
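
The comment added inside Put explains why the flush, i.e. the send onto the batch channel, has to happen while the lock is still held. A standalone sketch (illustrative only, not code from this repo) shows the hazard that rule prevents: if each producer decides its batch's position under the lock but performs the send after releasing it, the sends can interleave and batches can arrive out of order:

package main

import (
    "fmt"
    "sync"
)

func main() {
    out := make(chan int, 8)
    var mu sync.Mutex
    next := 0

    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()
            batch := next // the batch's position is decided under the lock...
            next++
            mu.Unlock()
            out <- batch // ...but sent outside it, so arrival order is not guaranteed
        }()
    }
    wg.Wait()
    close(out)

    for b := range out {
        fmt.Print(b, " ") // may print 0 2 1 3 ... on some runs
    }
    fmt.Println()
}
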
@@ -127,86 +152,119 @@ func (b *basicBatcher) Get() ([]interface{}, error) {
         timeout = time.After(b.maxTime)
     }
 
-    // Check to see if disposed before blocking
-    if b.IsDisposed() {
-        return nil, ErrDisposed
-    }
-
     select {
-    case items := <-b.batchChan:
-        return items, nil
-    case _, ok := <-b.disposeChan:
+    case items, ok := <-b.batchChan:
+        // If there's something on the batch channel, we definitely want that.
         if !ok {
             return nil, ErrDisposed
         }
-        return nil, nil
+        return items, nil
     case <-timeout:
-        // Check to see if disposed before getting lock
-        if b.IsDisposed() {
-            return nil, ErrDisposed
+        // It's possible something was added to the channel after something
+        // was received on the timeout channel, in which case that must
+        // be returned first to satisfy our ordering guarantees.
+        // We can't just grab the lock here in case the batch channel is full,
+        // in which case a Put or Flush will be blocked and holding
+        // onto the lock. In that case, there should be something on the
+        // batch channel
+        for {
+            if b.lock.TryLock() {
+                // We have a lock, try to read from channel first in case
+                // something snuck in
+                select {
+                case items, ok := <-b.batchChan:
+                    b.lock.Unlock()
+                    if !ok {
+                        return nil, ErrDisposed
+                    }
+                    return items, nil
+                default:
+                }
+
+                // If that is unsuccessful, nothing was added to the channel,
+                // and the temp buffer can't have changed because of the lock,
+                // so grab that
+                items := b.items
+                b.items = make([]interface{}, 0, b.maxItems)
+                b.availableBytes = 0
+                b.lock.Unlock()
+                return items, nil
+            } else {
+                // If we didn't get a lock, there are two cases:
+                // 1) The batch chan is full.
+                // 2) A Put or Flush temporarily has the lock.
+                // In either case, trying to read something off the batch chan,
+                // and going back to trying to get a lock if unsuccessful
+                // works.
+                select {
+                case items, ok := <-b.batchChan:
+                    if !ok {
+                        return nil, ErrDisposed
+                    }
+                    return items, nil
+                default:
+                }
+            }
         }
-        b.lock.Lock()
-        items := b.items
-        b.items = make([]interface{}, 0, b.maxItems)
-        b.availableBytes = 0
-        b.lock.Unlock()
-        return items, nil
     }
 }
 
 // Flush forcibly completes the batch currently being built
 func (b *basicBatcher) Flush() error {
-    if b.IsDisposed() {
+    // This is the same pattern as a Put
+    b.lock.Lock()
+    if b.disposed {
+        b.lock.Unlock()
         return ErrDisposed
     }
-    b.lock.Lock()
     b.flush()
     b.lock.Unlock()
     return nil
 }
 
 // Dispose will dispose of the batcher. Any calls to Put or Flush
 // will return ErrDisposed, calls to Get will return an error iff
-// there are no more ready batches.
+// there are no more ready batches. Any items not flushed and retrieved
+// by a Get may or may not be retrievable after calling this.
 func (b *basicBatcher) Dispose() {
-    // Check to see if disposed before attempting to dispose
-    if atomic.CompareAndSwapUint32(&b.disposed, batcherActive, batcherDisposed) {
-        return
-    }
-    b.lock.Lock()
-    b.flush()
-    b.items = nil
-    close(b.disposeChan)
+    for {
+        if b.lock.TryLock() {
+            // We've got a lock
+            if b.disposed {
+                b.lock.Unlock()
+                return
+            }
+
+            b.disposed = true
+            b.items = nil
+            b.drainBatchChan()
+            close(b.batchChan)
+            b.lock.Unlock()
+        } else {
+            // Two cases here:
+            // 1) Something is blocked and holding onto the lock
+            // 2) Something temporarily has a lock
+            // For case 1, we have to clear at least some space so the blocked
+            // Put/Flush can release the lock. For case 2, nothing bad
+            // will happen here
+            b.drainBatchChan()
+        }
 
-    // Drain the batch channel and all routines waiting to put on the channel
-    for len(b.batchChan) > 0 || atomic.LoadInt32(&b.waiting) > 0 {
-        <-b.batchChan
     }
-    close(b.batchChan)
-    b.lock.Unlock()
 }
 
 // IsDisposed will determine if the batcher is disposed
 func (b *basicBatcher) IsDisposed() bool {
-    return atomic.LoadUint32(&b.disposed) == batcherDisposed
+    b.lock.Lock()
+    disposed := b.disposed
+    b.lock.Unlock()
+    return disposed
 }
 
 // flush adds the batch currently being built to the queue of completed batches.
 // flush is not threadsafe, so should be synchronized externally.
 func (b *basicBatcher) flush() {
-    // Note: This needs to be in a go-routine to avoid locking out gets when
-    // the batch channel is full.
-    cpItems := make([]interface{}, len(b.items))
-    for i, val := range b.items {
-        cpItems[i] = val
-    }
-    // Signal one more waiter for the batch channel
-    atomic.AddInt32(&b.waiting, 1)
-    // Don't block on the channel put
-    go func() {
-        b.batchChan <- cpItems
-        atomic.AddInt32(&b.waiting, -1)
-    }()
+    b.batchChan <- b.items
     b.items = make([]interface{}, 0, b.maxItems)
     b.availableBytes = 0
 }
@@ -220,3 +278,13 @@ func (b *basicBatcher) ready() bool {
     }
     return false
 }
+
+func (b *basicBatcher) drainBatchChan() {
+    for {
+        select {
+        case <-b.batchChan:
+        default:
+            return
+        }
+    }
+}
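
Dispose's drain-then-retry loop, together with drainBatchChan above, is what keeps disposal from deadlocking against a Put or Flush that is blocked on a full batch channel while holding the lock. A hypothetical sketch of that scenario, written as if it lived in the same package (the 10ms sleep is only a crude way to let the second Put block first, and the expected results are noted in comments rather than asserted):

package batcher

import (
    "fmt"
    "time"
)

func disposeWhilePutBlocked() {
    // maxTime=0, maxItems=1, no byte limit, queueLen=1: every Put flushes its
    // item as a one-element batch, and the queue holds only a single batch.
    b, _ := New(0, 1, 0, 1, nil)
    _ = b.Put("a") // fills the queue

    done := make(chan error, 1)
    go func() {
        done <- b.Put("b") // blocks on the full channel while holding the lock
    }()
    time.Sleep(10 * time.Millisecond) // crude: give the second Put time to block

    // TryLock fails while that Put holds the lock, so Dispose drains the
    // channel instead, which unblocks the Put and lets it release the lock.
    b.Dispose()

    fmt.Println(<-done == nil)             // expected: true (the blocked Put completed)
    fmt.Println(b.Put("c") == ErrDisposed) // expected: true (further Puts are rejected)
}
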

batcher/batcher_test.go

Lines changed: 2 additions & 14 deletions
@@ -145,29 +145,17 @@ func TestDispose(t *testing.T) {
     b.Put("b")
     b.Put("c")
 
-    possibleBatches := [][]interface{}{
-        []interface{}{"a", "b"},
-        []interface{}{"c"},
-    }
-
-    // Wait for items to get to the channel
-    for len(b.(*basicBatcher).batchChan) == 0 {
-        time.Sleep(1 * time.Millisecond)
-    }
     batch1, err := b.Get()
-    assert.Contains(possibleBatches, batch1)
+    assert.Equal([]interface{}{"a", "b"}, batch1)
     assert.Nil(err)
 
     batch2, err := b.Get()
-    assert.Contains(possibleBatches, batch2)
+    assert.Equal([]interface{}{"c"}, batch2)
     assert.Nil(err)
 
     b.Put("d")
     b.Put("e")
     b.Put("f")
-    b.Put("g")
-    b.Put("h")
-    b.Put("i")
 
     b.Dispose()
 
