diff --git a/esutil/bulk_indexer.go b/esutil/bulk_indexer.go index 1d758b1c9b..95639f7a2b 100644 --- a/esutil/bulk_indexer.go +++ b/esutil/bulk_indexer.go @@ -54,9 +54,10 @@ type BulkIndexer interface { // BulkIndexerConfig represents configuration of the indexer. type BulkIndexerConfig struct { - NumWorkers int // The number of workers. Defaults to runtime.NumCPU(). - FlushBytes int // The flush threshold in bytes. Defaults to 5MB. - FlushInterval time.Duration // The flush threshold as duration. Defaults to 30sec. + NumWorkers int // The number of workers. Defaults to runtime.NumCPU(). + FlushBytes int // The flush threshold in bytes. Defaults to 5MB. + FlushInterval time.Duration // The flush threshold as duration. Defaults to 30sec. + QueueSizeMultiplier int // The multiplier on the size of the worker queue. Defaults to 1. Client esapi.Transport // The Elasticsearch client. Decoder BulkResponseJSONDecoder // A custom JSON decoder. @@ -289,18 +290,22 @@ func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) { cfg.Decoder = defaultJSONDecoder{} } - if cfg.NumWorkers == 0 { + if cfg.NumWorkers <= 0 { cfg.NumWorkers = runtime.NumCPU() } - if cfg.FlushBytes == 0 { + if cfg.FlushBytes <= 0 { cfg.FlushBytes = 5e+6 } - if cfg.FlushInterval == 0 { + if cfg.FlushInterval <= 0 { cfg.FlushInterval = 30 * time.Second } + if cfg.QueueSizeMultiplier <= 0 { + cfg.QueueSizeMultiplier = 1 + } + bi := bulkIndexer{ config: cfg, stats: &bulkIndexerStats{}, @@ -371,7 +376,7 @@ func (bi *bulkIndexer) Stats() BulkIndexerStats { // init initializes the bulk indexer. func (bi *bulkIndexer) init() { - bi.queue = make(chan BulkIndexerItem, bi.config.NumWorkers) + bi.queue = make(chan BulkIndexerItem, bi.config.NumWorkers*bi.config.QueueSizeMultiplier) for i := 1; i <= bi.config.NumWorkers; i++ { bi.wg.Add(1) diff --git a/esutil/bulk_indexer_internal_test.go b/esutil/bulk_indexer_internal_test.go index 4db4fa38f1..96e7da72df 100644 --- a/esutil/bulk_indexer_internal_test.go +++ b/esutil/bulk_indexer_internal_test.go @@ -186,6 +186,103 @@ func TestBulkIndexer(t *testing.T) { } }) + t.Run("BulkIndexerConfig.QueueSizeMultiplier", func(t *testing.T) { + tests := []struct { + name string + numWorkers int + queueSizeMultiplier int + expectedQueueCap int + }{ + { + name: "Default QueueSizeMultiplier with 1 worker", + numWorkers: 1, + queueSizeMultiplier: 0, + expectedQueueCap: 1, + }, + { + name: "Default QueueSizeMultiplier with 2 workers", + numWorkers: 2, + queueSizeMultiplier: 0, + expectedQueueCap: 2, + }, + { + name: "QueueSizeMultiplier=2 with 1 worker", + numWorkers: 1, + queueSizeMultiplier: 2, + expectedQueueCap: 2, + }, + { + name: "QueueSizeMultiplier=5 with 1 worker", + numWorkers: 1, + queueSizeMultiplier: 5, + expectedQueueCap: 5, + }, + { + name: "QueueSizeMultiplier=10 with 1 worker", + numWorkers: 1, + queueSizeMultiplier: 10, + expectedQueueCap: 10, + }, + { + name: "QueueSizeMultiplier=1 with 4 workers", + numWorkers: 4, + queueSizeMultiplier: 1, + expectedQueueCap: 4, + }, + { + name: "QueueSizeMultiplier=2 with 4 workers", + numWorkers: 4, + queueSizeMultiplier: 2, + expectedQueueCap: 8, + }, + { + name: "QueueSizeMultiplier=5 with 3 workers", + numWorkers: 3, + queueSizeMultiplier: 5, + expectedQueueCap: 15, + }, + { + name: "QueueSizeMultiplier=-1 with 1 worker", + numWorkers: 1, + queueSizeMultiplier: -1, + expectedQueueCap: 1, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + es, err := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{}}) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + + cfg := BulkIndexerConfig{ + NumWorkers: tt.numWorkers, + QueueSizeMultiplier: tt.queueSizeMultiplier, + Client: es, + } + + bi, err := NewBulkIndexer(cfg) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + + bir, ok := bi.(*bulkIndexer) + if !ok { + t.Fatalf("Unexpected type: %T", bi) + } + + if queueCap := cap(bir.queue); queueCap != tt.expectedQueueCap { + t.Errorf("Unexpected queue capacity: want=%d, got=%d", tt.expectedQueueCap, queueCap) + } + + // Clean up + _ = bi.Close(context.Background()) + }) + } + }) + t.Run("Add() Timeout", func(t *testing.T) { es, _ := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{}}) bi, _ := NewBulkIndexer(BulkIndexerConfig{NumWorkers: 1, Client: es})