From cff0f0163cc4dc111f311c26c6323732f462f957 Mon Sep 17 00:00:00 2001 From: Nir Rozenbaum Date: Thu, 28 Aug 2025 09:57:16 +0300 Subject: [PATCH 1/3] fixed bug of unprotected concurrent writes to a map in prefix indexer Signed-off-by: Nir Rozenbaum --- .../framework/plugins/multi/prefix/indexer.go | 23 ++++++++----------- .../plugins/multi/prefix/indexer_test.go | 3 ++- .../framework/plugins/multi/prefix/plugin.go | 2 +- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go index 0a209a8d4..844c0155f 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go +++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go @@ -38,37 +38,36 @@ type indexer struct { } // newIndexer initializes an indexer with size limits and starts cache size reporting. -func newIndexer(maxLRUSize int) *indexer { - ix := &indexer{ +func newIndexer(ctx context.Context, maxLRUSize int) *indexer { + indexer := &indexer{ hashToPods: make(map[BlockHash]podSet), podToLRU: make(map[ServerID]*lru.Cache[BlockHash, struct{}]), maxLRUSize: maxLRUSize, } - go ix.ReportLRUSize(time.Second) - return ix + go indexer.reportLRUSize(ctx, time.Second) + return indexer } // Add adds a list of prefix hashes to the cache, tied to the server. func (i *indexer) Add(hashes []BlockHash, pod ServerID) { i.mu.Lock() + defer i.mu.Unlock() + // Check if the LRU pod exist lruForPod, exists := i.podToLRU[pod] if !exists { - newLRU, _ := lru.NewWithEvict[BlockHash, struct{}](i.maxLRUSize, i.makeEvictionFn(pod)) + newLRU, _ := lru.NewWithEvict(i.maxLRUSize, i.makeEvictionFn(pod)) i.podToLRU[pod] = newLRU lruForPod = newLRU } - i.mu.Unlock() - // Add to LRU (may evict) for _, hash := range hashes { lruForPod.Add(hash, struct{}{}) } // Update hashToPods once under lock - i.mu.Lock() for _, hash := range hashes { pods := i.hashToPods[hash] if pods == nil { @@ -77,8 +76,6 @@ func (i *indexer) Add(hashes []BlockHash, pod ServerID) { pods[pod] = struct{}{} i.hashToPods[hash] = pods } - - i.mu.Unlock() } // Get returns a set of servers that have the given prefix hash cached. @@ -111,8 +108,8 @@ func (i *indexer) makeEvictionFn(pod ServerID) func(BlockHash, struct{}) { } } -// ReportLRUSize starts a goroutine that periodically reports the LRU cache size metric. -func (i *indexer) ReportLRUSize(interval time.Duration) { +// reportLRUSize starts a goroutine that periodically reports the LRU cache size metric. +func (i *indexer) reportLRUSize(ctx context.Context, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() for range ticker.C { @@ -137,7 +134,7 @@ func (i *indexer) ReportLRUSize(interval time.Duration) { } metrics.RecordPrefixCacheSize(int64(totalEntries)) - log.FromContext(context.TODO()).V(logutil.TRACE).Info("Prefix cache state", + log.FromContext(ctx).V(logutil.TRACE).Info("Prefix cache state", "total entries", totalEntries, "# pods", numPods, "avg entries per pod", avg, diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer_test.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer_test.go index a15112182..6d4fcc5f4 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer_test.go +++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer_test.go @@ -16,13 +16,14 @@ limitations under the License. package prefix import ( + "context" "testing" "github.com/stretchr/testify/assert" ) func TestIndexer_AddAndGet(t *testing.T) { - i := newIndexer(2) + i := newIndexer(context.Background(), 2) hash1 := BlockHash(1) server := ServerID{Namespace: "default", Name: "server1"} diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go index 8c7b52cec..80b77de08 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go +++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go @@ -158,7 +158,7 @@ func New(ctx context.Context, config Config) *Plugin { typedName: plugins.TypedName{Type: PrefixCachePluginType, Name: PrefixCachePluginType}, config: config, pluginState: plugins.NewPluginState(ctx), - indexer: newIndexer(capacity), + indexer: newIndexer(ctx, capacity), } } From 13b05efcf4cc8b96dc52bd8fd52d0cb6c313b2ff Mon Sep 17 00:00:00 2001 From: Nir Rozenbaum Date: Thu, 28 Aug 2025 11:22:44 +0300 Subject: [PATCH 2/3] revert locking changes in indexer Signed-off-by: Nir Rozenbaum --- .../scheduling/framework/plugins/multi/prefix/indexer.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go index 844c0155f..bd9e2c96e 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go +++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go @@ -52,22 +52,23 @@ func newIndexer(ctx context.Context, maxLRUSize int) *indexer { // Add adds a list of prefix hashes to the cache, tied to the server. func (i *indexer) Add(hashes []BlockHash, pod ServerID) { i.mu.Lock() - defer i.mu.Unlock() - // Check if the LRU pod exist lruForPod, exists := i.podToLRU[pod] if !exists { - newLRU, _ := lru.NewWithEvict(i.maxLRUSize, i.makeEvictionFn(pod)) + newLRU, _ := lru.NewWithEvict[BlockHash, struct{}](i.maxLRUSize, i.makeEvictionFn(pod)) i.podToLRU[pod] = newLRU lruForPod = newLRU } + i.mu.Unlock() + // Add to LRU (may evict) for _, hash := range hashes { lruForPod.Add(hash, struct{}{}) } // Update hashToPods once under lock + i.mu.Lock() for _, hash := range hashes { pods := i.hashToPods[hash] if pods == nil { @@ -76,6 +77,8 @@ func (i *indexer) Add(hashes []BlockHash, pod ServerID) { pods[pod] = struct{}{} i.hashToPods[hash] = pods } + + i.mu.Unlock() } // Get returns a set of servers that have the given prefix hash cached. From 05d90aabc42959fcd776b417f230f70c3d2371d6 Mon Sep 17 00:00:00 2001 From: Nir Rozenbaum Date: Thu, 28 Aug 2025 11:32:40 +0300 Subject: [PATCH 3/3] replace context.TODO with a context Signed-off-by: Nir Rozenbaum --- pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go index 80b77de08..ef33521b4 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go +++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go @@ -148,7 +148,7 @@ func New(ctx context.Context, config Config) *Plugin { capacity := config.LRUCapacityPerServer if capacity <= 0 { capacity = DefaultLRUCapacityPerServer - log.FromContext(context.TODO()).V(logutil.DEFAULT).Info( + log.FromContext(ctx).V(logutil.DEFAULT).Info( "LRUCapacityPerServer is not positive, using default value", "defaultCapacity", DefaultLRUCapacityPerServer, )