@@ -23,6 +23,11 @@ import (
2323 "time"
2424)
2525
26+ const (
27+ batcherActive = uint32 (0 )
28+ batcherDisposed = uint32 (1 )
29+ )
30+
2631// Batcher provides an API for accumulating items into a batch for processing.
2732type Batcher interface {
2833 // Put adds items to the batcher.
@@ -55,10 +60,11 @@ type basicBatcher struct {
5560 maxItems uint
5661 maxBytes uint
5762 calculateBytes CalculateBytes
58- disposed bool
63+ disposed uint32
5964 items []interface {}
6065 lock sync.RWMutex
6166 batchChan chan []interface {}
67+ disposeChan chan struct {}
6268 availableBytes uint
6369 waiting int32
6470}
@@ -82,19 +88,19 @@ func New(maxTime time.Duration, maxItems, maxBytes, queueLen uint, calculate Cal
8288 calculateBytes : calculate ,
8389 items : make ([]interface {}, 0 , maxItems ),
8490 batchChan : make (chan []interface {}, queueLen ),
91+ disposeChan : make (chan struct {}),
8592 }, nil
8693}
8794
8895// Put adds items to the batcher. If Put is continually called without calls to
8996// Get, an unbounded number of go-routines will be generated.
9097// Note: there is no order guarantee for items entering/leaving the batcher.
9198func (b * basicBatcher ) Put (item interface {}) error {
92- b .lock .Lock ()
93- if b .disposed {
94- b .lock .Unlock ()
99+ // Check to see if disposed before putting
100+ if b .IsDisposed () {
95101 return ErrDisposed
96102 }
97-
103+ b . lock . Lock ()
98104 b .items = append (b .items , item )
99105 if b .calculateBytes != nil {
100106 b .availableBytes += b .calculateBytes (item )
@@ -121,18 +127,25 @@ func (b *basicBatcher) Get() ([]interface{}, error) {
121127 timeout = time .After (b .maxTime )
122128 }
123129
130+ // Check to see if disposed before blocking
131+ if b .IsDisposed () {
132+ return nil , ErrDisposed
133+ }
134+
124135 select {
125- case items , ok := <- b .batchChan :
136+ case items := <- b .batchChan :
137+ return items , nil
138+ case _ , ok := <- b .disposeChan :
126139 if ! ok {
127140 return nil , ErrDisposed
128141 }
129- return items , nil
142+ return nil , nil
130143 case <- timeout :
131- b .lock .Lock ()
132- if b .disposed {
133- b .lock .Unlock ()
144+ // Check to see if disposed before getting lock
145+ if b .IsDisposed () {
134146 return nil , ErrDisposed
135147 }
148+ b .lock .Lock ()
136149 items := b .items
137150 b .items = make ([]interface {}, 0 , b .maxItems )
138151 b .availableBytes = 0
@@ -143,11 +156,10 @@ func (b *basicBatcher) Get() ([]interface{}, error) {
143156
144157// Flush forcibly completes the batch currently being built
145158func (b * basicBatcher ) Flush () error {
146- b .lock .Lock ()
147- if b .disposed {
148- b .lock .Unlock ()
159+ if b .IsDisposed () {
149160 return ErrDisposed
150161 }
162+ b .lock .Lock ()
151163 b .flush ()
152164 b .lock .Unlock ()
153165 return nil
@@ -157,14 +169,14 @@ func (b *basicBatcher) Flush() error {
157169// will return ErrDisposed, calls to Get will return an error iff
158170// there are no more ready batches.
159171func (b * basicBatcher ) Dispose () {
160- b .lock .Lock ()
161- if b .disposed {
162- b .lock .Unlock ()
172+ // Check to see if disposed before attempting to dispose
173+ if atomic .CompareAndSwapUint32 (& b .disposed , batcherActive , batcherDisposed ) {
163174 return
164175 }
176+ b .lock .Lock ()
165177 b .flush ()
166- b .disposed = true
167178 b .items = nil
179+ close (b .disposeChan )
168180
169181 // Drain the batch channel and all routines waiting to put on the channel
170182 for len (b .batchChan ) > 0 || atomic .LoadInt32 (& b .waiting ) > 0 {
@@ -176,10 +188,7 @@ func (b *basicBatcher) Dispose() {
176188
177189// IsDisposed will determine if the batcher is disposed
178190func (b * basicBatcher ) IsDisposed () bool {
179- b .lock .RLock ()
180- disposed := b .disposed
181- b .lock .RUnlock ()
182- return disposed
191+ return atomic .LoadUint32 (& b .disposed ) == batcherDisposed
183192}
184193
185194// flush adds the batch currently being built to the queue of completed batches.
0 commit comments