Skip to content

Commit cdaf2aa

Browse files
feat: Add queue size multiplier config to BulkIndexer (#1029) (#1055)
* Add queue size multiplier config to BulkIndexer * test: add unit tests for QueueSizeMultiplier BulkIndexerConfig value --------- (cherry picked from commit c2f52b5) Co-authored-by: Stephanie Wei <[email protected]>
1 parent 5983693 commit cdaf2aa

File tree

2 files changed

+109
-7
lines changed

2 files changed

+109
-7
lines changed

esutil/bulk_indexer.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@ type BulkIndexer interface {
5454

5555
// BulkIndexerConfig represents configuration of the indexer.
5656
type BulkIndexerConfig struct {
57-
NumWorkers int // The number of workers. Defaults to runtime.NumCPU().
58-
FlushBytes int // The flush threshold in bytes. Defaults to 5MB.
59-
FlushInterval time.Duration // The flush threshold as duration. Defaults to 30sec.
57+
NumWorkers int // The number of workers. Defaults to runtime.NumCPU().
58+
FlushBytes int // The flush threshold in bytes. Defaults to 5MB.
59+
FlushInterval time.Duration // The flush threshold as duration. Defaults to 30sec.
60+
QueueSizeMultiplier int // The multiplier on the size of the worker queue. Defaults to 1.
6061

6162
Client esapi.Transport // The Elasticsearch client.
6263
Decoder BulkResponseJSONDecoder // A custom JSON decoder.
@@ -289,18 +290,22 @@ func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) {
289290
cfg.Decoder = defaultJSONDecoder{}
290291
}
291292

292-
if cfg.NumWorkers == 0 {
293+
if cfg.NumWorkers <= 0 {
293294
cfg.NumWorkers = runtime.NumCPU()
294295
}
295296

296-
if cfg.FlushBytes == 0 {
297+
if cfg.FlushBytes <= 0 {
297298
cfg.FlushBytes = 5e+6
298299
}
299300

300-
if cfg.FlushInterval == 0 {
301+
if cfg.FlushInterval <= 0 {
301302
cfg.FlushInterval = 30 * time.Second
302303
}
303304

305+
if cfg.QueueSizeMultiplier <= 0 {
306+
cfg.QueueSizeMultiplier = 1
307+
}
308+
304309
bi := bulkIndexer{
305310
config: cfg,
306311
stats: &bulkIndexerStats{},
@@ -371,7 +376,7 @@ func (bi *bulkIndexer) Stats() BulkIndexerStats {
371376

372377
// init initializes the bulk indexer.
373378
func (bi *bulkIndexer) init() {
374-
bi.queue = make(chan BulkIndexerItem, bi.config.NumWorkers)
379+
bi.queue = make(chan BulkIndexerItem, bi.config.NumWorkers*bi.config.QueueSizeMultiplier)
375380

376381
for i := 1; i <= bi.config.NumWorkers; i++ {
377382
bi.wg.Add(1)

esutil/bulk_indexer_internal_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,103 @@ func TestBulkIndexer(t *testing.T) {
186186
}
187187
})
188188

189+
t.Run("BulkIndexerConfig.QueueSizeMultiplier", func(t *testing.T) {
190+
tests := []struct {
191+
name string
192+
numWorkers int
193+
queueSizeMultiplier int
194+
expectedQueueCap int
195+
}{
196+
{
197+
name: "Default QueueSizeMultiplier with 1 worker",
198+
numWorkers: 1,
199+
queueSizeMultiplier: 0,
200+
expectedQueueCap: 1,
201+
},
202+
{
203+
name: "Default QueueSizeMultiplier with 2 workers",
204+
numWorkers: 2,
205+
queueSizeMultiplier: 0,
206+
expectedQueueCap: 2,
207+
},
208+
{
209+
name: "QueueSizeMultiplier=2 with 1 worker",
210+
numWorkers: 1,
211+
queueSizeMultiplier: 2,
212+
expectedQueueCap: 2,
213+
},
214+
{
215+
name: "QueueSizeMultiplier=5 with 1 worker",
216+
numWorkers: 1,
217+
queueSizeMultiplier: 5,
218+
expectedQueueCap: 5,
219+
},
220+
{
221+
name: "QueueSizeMultiplier=10 with 1 worker",
222+
numWorkers: 1,
223+
queueSizeMultiplier: 10,
224+
expectedQueueCap: 10,
225+
},
226+
{
227+
name: "QueueSizeMultiplier=1 with 4 workers",
228+
numWorkers: 4,
229+
queueSizeMultiplier: 1,
230+
expectedQueueCap: 4,
231+
},
232+
{
233+
name: "QueueSizeMultiplier=2 with 4 workers",
234+
numWorkers: 4,
235+
queueSizeMultiplier: 2,
236+
expectedQueueCap: 8,
237+
},
238+
{
239+
name: "QueueSizeMultiplier=5 with 3 workers",
240+
numWorkers: 3,
241+
queueSizeMultiplier: 5,
242+
expectedQueueCap: 15,
243+
},
244+
{
245+
name: "QueueSizeMultiplier=-1 with 1 worker",
246+
numWorkers: 1,
247+
queueSizeMultiplier: -1,
248+
expectedQueueCap: 1,
249+
},
250+
}
251+
252+
for _, tt := range tests {
253+
tt := tt
254+
t.Run(tt.name, func(t *testing.T) {
255+
es, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{}})
256+
if err != nil {
257+
t.Fatalf("Unexpected error: %s", err)
258+
}
259+
260+
cfg := BulkIndexerConfig{
261+
NumWorkers: tt.numWorkers,
262+
QueueSizeMultiplier: tt.queueSizeMultiplier,
263+
Client: es,
264+
}
265+
266+
bi, err := NewBulkIndexer(cfg)
267+
if err != nil {
268+
t.Fatalf("Unexpected error: %s", err)
269+
}
270+
271+
bir, ok := bi.(*bulkIndexer)
272+
if !ok {
273+
t.Fatalf("Unexpected type: %T", bi)
274+
}
275+
276+
if queueCap := cap(bir.queue); queueCap != tt.expectedQueueCap {
277+
t.Errorf("Unexpected queue capacity: want=%d, got=%d", tt.expectedQueueCap, queueCap)
278+
}
279+
280+
// Clean up
281+
_ = bi.Close(context.Background())
282+
})
283+
}
284+
})
285+
189286
t.Run("Add() Timeout", func(t *testing.T) {
190287
es, _ := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{}})
191288
bi, _ := NewBulkIndexer(BulkIndexerConfig{NumWorkers: 1, Client: es})

0 commit comments

Comments
 (0)