From a53d36bcb67ccb33cd23dc00e22844a66bf00dc6 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Wed, 19 Jul 2023 23:33:01 +0300 Subject: [PATCH 1/2] Fix memory leak bulk indexer --- esutil/bulk_indexer.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/esutil/bulk_indexer.go b/esutil/bulk_indexer.go index c462876562..d6c06bb014 100644 --- a/esutil/bulk_indexer.go +++ b/esutil/bulk_indexer.go @@ -251,7 +251,8 @@ type bulkIndexer struct { workers []*worker stats *bulkIndexerStats - config BulkIndexerConfig + config BulkIndexerConfig + bufPool *sync.Pool } type bulkIndexerStats struct { @@ -290,6 +291,11 @@ func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) { bi := bulkIndexer{ config: cfg, stats: &bulkIndexerStats{}, + bufPool: &sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, cfg.FlushBytes)) + }, + }, } bi.init() @@ -337,6 +343,11 @@ func (bi *bulkIndexer) Close(ctx context.Context) error { bi.wg.Wait() } + for _, w := range bi.workers { + w.buf.Reset() + bi.bufPool.Put(w.buf) + } + return nil } @@ -363,7 +374,7 @@ func (bi *bulkIndexer) init() { id: i, ch: bi.queue, bi: bi, - buf: bytes.NewBuffer(make([]byte, 0, bi.config.FlushBytes)), + buf: bi.bufPool.Get().(*bytes.Buffer), ticker: time.NewTicker(bi.config.FlushInterval), } w.run() From c445c2ed0b30542771d0d148000dbd1acbffedba Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Thu, 20 Jul 2023 00:06:10 +0300 Subject: [PATCH 2/2] Fix wrong sync pool location --- esutil/bulk_indexer.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/esutil/bulk_indexer.go b/esutil/bulk_indexer.go index d6c06bb014..6618611f33 100644 --- a/esutil/bulk_indexer.go +++ b/esutil/bulk_indexer.go @@ -251,8 +251,7 @@ type bulkIndexer struct { workers []*worker stats *bulkIndexerStats - config BulkIndexerConfig - bufPool *sync.Pool + config BulkIndexerConfig } type bulkIndexerStats struct { @@ -266,6 +265,12 @@ type bulkIndexerStats struct { numRequests uint64 } +var bufPool = &sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} + // NewBulkIndexer creates a new bulk indexer. func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) { if cfg.Client == nil { @@ -291,11 +296,6 @@ func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) { bi := bulkIndexer{ config: cfg, stats: &bulkIndexerStats{}, - bufPool: &sync.Pool{ - New: func() interface{} { - return bytes.NewBuffer(make([]byte, 0, cfg.FlushBytes)) - }, - }, } bi.init() @@ -345,7 +345,7 @@ func (bi *bulkIndexer) Close(ctx context.Context) error { for _, w := range bi.workers { w.buf.Reset() - bi.bufPool.Put(w.buf) + bufPool.Put(w.buf) } return nil @@ -374,7 +374,7 @@ func (bi *bulkIndexer) init() { id: i, ch: bi.queue, bi: bi, - buf: bi.bufPool.Get().(*bytes.Buffer), + buf: bufPool.Get().(*bytes.Buffer), ticker: time.NewTicker(bi.config.FlushInterval), } w.run()