From d1ca9f82624f896fa462f9b261bace24f7c3e701 Mon Sep 17 00:00:00 2001 From: Thomas Lacroix Date: Sun, 10 May 2026 16:31:20 -0400 Subject: [PATCH 1/4] feat: integrate batch allocator in processor allocator Signed-off-by: Thomas Lacroix --- pkg/gameserverallocations/allocation_cache.go | 237 +++++++++++----- pkg/gameserverallocations/allocator.go | 58 ++-- pkg/gameserverallocations/allocator_test.go | 181 ++++++++++++- pkg/gameserverallocations/batch_allocator.go | 254 ++++++++++++++++++ 4 files changed, 634 insertions(+), 96 deletions(-) create mode 100644 pkg/gameserverallocations/batch_allocator.go diff --git a/pkg/gameserverallocations/allocation_cache.go b/pkg/gameserverallocations/allocation_cache.go index 30a0b0b72e..2142071d0d 100644 --- a/pkg/gameserverallocations/allocation_cache.go +++ b/pkg/gameserverallocations/allocation_cache.go @@ -18,6 +18,7 @@ import ( "context" "sort" + "agones.dev/agones/pkg/apis" "agones.dev/agones/pkg/apis/agones" agonesv1 "agones.dev/agones/pkg/apis/agones/v1" allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" @@ -173,65 +174,12 @@ func (c *AllocationCache) ListSortedGameServers(gsa *allocationv1.GameServerAllo } counts := c.counter.Counts() + var priorities []agonesv1.Priority + if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) && gsa != nil { + priorities = gsa.Spec.Priorities + } sort.Slice(list, func(i, j int) bool { - gs1 := list[i] - gs2 := list[j] - - // Search Allocated GameServers first. - if gs1.Status.State != gs2.Status.State { - return gs1.Status.State == agonesv1.GameServerStateAllocated - } - - c1, ok := counts[gs1.Status.NodeName] - if !ok { - return false - } - - c2, ok := counts[gs2.Status.NodeName] - if !ok { - return true - } - - if c1.Allocated > c2.Allocated { - return true - } - if c1.Allocated < c2.Allocated { - return false - } - - // prefer nodes that have the most Ready gameservers on them - they are most likely to be - // completely filled and least likely target for scale down. - if c1.Ready < c2.Ready { - return false - } - if c1.Ready > c2.Ready { - return true - } - - // if player tracking is enabled, prefer game servers with the least amount of room left - if runtime.FeatureEnabled(runtime.FeaturePlayerAllocationFilter) { - if gs1.Status.Players != nil && gs2.Status.Players != nil { - cap1 := gs1.Status.Players.Capacity - gs1.Status.Players.Count - cap2 := gs2.Status.Players.Capacity - gs2.Status.Players.Count - - // if they are equal, pass the comparison through. - if cap1 < cap2 { - return true - } else if cap2 < cap1 { - return false - } - } - } - - // if we end up here, then break the tie with Counter or List Priority. - if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) && (gsa != nil) { - if res := gs1.CompareCountAndListPriorities(gsa.Spec.Priorities, gs2); res != nil { - return *res - } - } - - // finally sort lexicographically, so we have a stable order - return gs1.GetObjectMeta().GetName() < gs2.GetObjectMeta().GetName() + return compareGameServersForPackedStrategy(list[i], list[j], priorities, counts) }) return list @@ -245,18 +193,12 @@ func (c *AllocationCache) ListSortedGameServersPriorities(gsa *allocationv1.Game return []*agonesv1.GameServer{} } + var priorities []agonesv1.Priority + if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) && gsa != nil { + priorities = gsa.Spec.Priorities + } sort.Slice(list, func(i, j int) bool { - gs1 := list[i] - gs2 := list[j] - - if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) && (gsa != nil) { - if res := gs1.CompareCountAndListPriorities(gsa.Spec.Priorities, gs2); res != nil { - return *res - } - } - - // finally sort lexicographically, so we have a stable order - return gs1.GetObjectMeta().GetName() < gs2.GetObjectMeta().GetName() + return compareGameServersForDistributedStrategy(list[i], list[j], priorities) }) return list @@ -327,3 +269,160 @@ func (c *AllocationCache) getKey(gs *agonesv1.GameServer) (string, bool) { } return key, ok } + +// ReorderGameServerAfterAllocation repositions gsAfterAllocation in gsList after a local allocation +// has been applied, using gsIndexBeforeAllocation as a search hint. Used by the batch allocator +// to maintain sort order without a full re-sort. +func (c *AllocationCache) ReorderGameServerAfterAllocation( + gsList []*agonesv1.GameServer, + gsIndexBeforeAllocation int, gsAfterAllocation *agonesv1.GameServer, + priorities []agonesv1.Priority, strategy apis.SchedulingStrategy) { + if len(gsList) == 0 || gsIndexBeforeAllocation < 0 || gsIndexBeforeAllocation >= len(gsList) || gsAfterAllocation == nil { + c.baseLogger.WithField("gsIndexBeforeAllocation", gsIndexBeforeAllocation). + WithField("gsAfterAllocation", gsAfterAllocation). + WithField("gsListLength", len(gsList)). + Warn("ReorderGameServerAfterAllocation called with invalid parameters! Reordering is skipped!") + return + } + + newIndex := gsIndexBeforeAllocation + gsToReorderOriginal := gsList[gsIndexBeforeAllocation] + + optimizeList := func(greater bool) []*agonesv1.GameServer { + if greater { + return gsList[gsIndexBeforeAllocation+1:] + } + return gsList[:gsIndexBeforeAllocation] + } + + switch strategy { + case apis.Packed: + counts := c.counter.Counts() + greater, equal := compareGameServersAfterAllocationForPackedStrategy(gsToReorderOriginal, gsAfterAllocation, priorities, counts) + if !equal { + newIndex = findIndexAfterAllocationForPackedStrategy(optimizeList(greater), gsAfterAllocation, priorities, counts) + if greater { + newIndex += gsIndexBeforeAllocation + } + } + case apis.Distributed: + greater, equal := compareGameServersAfterAllocationForDistributedStrategy(gsToReorderOriginal, gsAfterAllocation, priorities) + if !equal { + newIndex = findIndexAfterAllocationForDistributedStrategy(optimizeList(greater), gsAfterAllocation, priorities) + if greater { + newIndex += gsIndexBeforeAllocation + } + } + default: + c.baseLogger.WithField("strategy", strategy). + Warn("Scheduling strategy not supported! Reordering is skipped!") + } + + if newIndex != gsIndexBeforeAllocation { + gsList = append(gsList[:gsIndexBeforeAllocation], gsList[gsIndexBeforeAllocation+1:]...) + gsList = append(gsList[:newIndex], append([]*agonesv1.GameServer{gsAfterAllocation}, gsList[newIndex:]...)...) + } else { + gsList[gsIndexBeforeAllocation] = gsAfterAllocation + } +} + +// compareGameServersAfterAllocationForPackedStrategy compares a game server before and after a local +// allocation is applied. Returns (greater, equal) where greater means before has higher priority than after. +func compareGameServersAfterAllocationForPackedStrategy( + before, after *agonesv1.GameServer, + priorities []agonesv1.Priority, + counts map[string]gameservers.NodeCount) (bool, bool) { + if before.Status.State != after.Status.State { + return before.Status.State == agonesv1.GameServerStateAllocated, false + } + + c1, ok := counts[before.Status.NodeName] + if !ok { + return false, false + } + c2, ok := counts[after.Status.NodeName] + if !ok { + return true, false + } + + if c1.Allocated > c2.Allocated { + return true, false + } + if c1.Allocated < c2.Allocated { + return false, false + } + + if c1.Ready < c2.Ready { + return false, false + } + if c1.Ready > c2.Ready { + return true, false + } + + if runtime.FeatureEnabled(runtime.FeaturePlayerAllocationFilter) { + if before.Status.Players != nil && after.Status.Players != nil { + cap1 := before.Status.Players.Capacity - before.Status.Players.Count + cap2 := after.Status.Players.Capacity - after.Status.Players.Count + if cap1 < cap2 { + return true, false + } else if cap2 < cap1 { + return false, false + } + } + } + + if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) && priorities != nil { + if res := before.CompareCountAndListPriorities(priorities, after); res != nil { + return *res, false + } + } + + return false, true +} + +// compareGameServersAfterAllocationForDistributedStrategy compares a game server before and after a local +// allocation is applied using the distributed strategy. Returns (greater, equal). +func compareGameServersAfterAllocationForDistributedStrategy( + before, after *agonesv1.GameServer, + priorities []agonesv1.Priority) (bool, bool) { + if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) && priorities != nil { + if res := before.CompareCountAndListPriorities(priorities, after); res != nil { + return *res, false + } + } + return false, true +} + +// compareGameServersForPackedStrategy compares two game servers for the packed strategy with +// lexicographic tie-breaking for a stable sort order. +func compareGameServersForPackedStrategy(gs0, gs1 *agonesv1.GameServer, priorities []agonesv1.Priority, counts map[string]gameservers.NodeCount) bool { + greater, equal := compareGameServersAfterAllocationForPackedStrategy(gs0, gs1, priorities, counts) + if !equal { + return greater + } + return gs0.GetObjectMeta().GetName() < gs1.GetObjectMeta().GetName() +} + +// compareGameServersForDistributedStrategy compares two game servers for the distributed strategy with +// lexicographic tie-breaking for a stable sort order. +func compareGameServersForDistributedStrategy(gs0, gs1 *agonesv1.GameServer, priorities []agonesv1.Priority) bool { + greater, equal := compareGameServersAfterAllocationForDistributedStrategy(gs0, gs1, priorities) + if !equal { + return greater + } + return gs0.GetObjectMeta().GetName() < gs1.GetObjectMeta().GetName() +} + +// findIndexAfterAllocationForPackedStrategy binary-searches for the insertion position that maintains sort order. +func findIndexAfterAllocationForPackedStrategy(gsList []*agonesv1.GameServer, gs *agonesv1.GameServer, priorities []agonesv1.Priority, counts map[string]gameservers.NodeCount) int { + return sort.Search(len(gsList), func(i int) bool { + return compareGameServersForPackedStrategy(gs, gsList[i], priorities, counts) + }) +} + +// findIndexAfterAllocationForDistributedStrategy binary-searches for the insertion position that maintains sort order. +func findIndexAfterAllocationForDistributedStrategy(gsList []*agonesv1.GameServer, gs *agonesv1.GameServer, priorities []agonesv1.Priority) int { + return sort.Search(len(gsList), func(i int) bool { + return compareGameServersForDistributedStrategy(gs, gsList[i], priorities) + }) +} diff --git a/pkg/gameserverallocations/allocator.go b/pkg/gameserverallocations/allocator.go index 79c01a9703..53d10a94a5 100644 --- a/pkg/gameserverallocations/allocator.go +++ b/pkg/gameserverallocations/allocator.go @@ -177,7 +177,11 @@ func (c *Allocator) Run(ctx context.Context) error { } // workers and logic for batching allocations - go c.ListenAndAllocate(ctx, maxBatchQueue) + if runtime.FeatureEnabled(runtime.FeatureProcessorAllocator) { + go c.ListenAndBatchAllocate(ctx, maxBatchQueue) + } else { + go c.ListenAndAllocate(ctx, maxBatchQueue) + } return nil } @@ -271,33 +275,39 @@ func (c *Allocator) allocateFromLocalCluster(ctx context.Context, gsa *allocatio return err }) - if err != nil && err != ErrNoGameServer && err != ErrConflictInGameServerSelection { + switch { + case err == nil: + // success - fall through to fill allocation result below + case goErrors.Is(err, ErrNoGameServer): + gsa.Status.State = allocationv1.GameServerAllocationUnAllocated + return gsa, nil + case goErrors.Is(err, ErrConflictInGameServerSelection): + gsa.Status.State = allocationv1.GameServerAllocationContention + return gsa, nil + case goErrors.Is(err, ErrGameServerUpdateConflict): + c.allocationCache.Resync() + gsa.Status.State = allocationv1.GameServerAllocationUnAllocated + return gsa, nil + default: c.allocationCache.Resync() return nil, err } - switch err { - case ErrNoGameServer, ErrGameServerUpdateConflict: - gsa.Status.State = allocationv1.GameServerAllocationUnAllocated - case ErrConflictInGameServerSelection: - gsa.Status.State = allocationv1.GameServerAllocationContention - default: - gsa.ObjectMeta.Name = gs.ObjectMeta.Name - gsa.Status.State = allocationv1.GameServerAllocationAllocated - gsa.Status.GameServerName = gs.ObjectMeta.Name - gsa.Status.Ports = gs.Status.Ports - gsa.Status.Address = gs.Status.Address - gsa.Status.Addresses = append(gsa.Status.Addresses, gs.Status.Addresses...) - gsa.Status.NodeName = gs.Status.NodeName - gsa.Status.Source = localAllocationSource - gsa.Status.Metadata = &allocationv1.GameServerMetadata{ - Labels: gs.ObjectMeta.Labels, - Annotations: gs.ObjectMeta.Annotations, - } - if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { - gsa.Status.Counters = gs.Status.Counters - gsa.Status.Lists = gs.Status.Lists - } + gsa.ObjectMeta.Name = gs.ObjectMeta.Name + gsa.Status.State = allocationv1.GameServerAllocationAllocated + gsa.Status.GameServerName = gs.ObjectMeta.Name + gsa.Status.Ports = gs.Status.Ports + gsa.Status.Address = gs.Status.Address + gsa.Status.Addresses = append(gsa.Status.Addresses, gs.Status.Addresses...) + gsa.Status.NodeName = gs.Status.NodeName + gsa.Status.Source = localAllocationSource + gsa.Status.Metadata = &allocationv1.GameServerMetadata{ + Labels: gs.ObjectMeta.Labels, + Annotations: gs.ObjectMeta.Annotations, + } + if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { + gsa.Status.Counters = gs.Status.Counters + gsa.Status.Lists = gs.Status.Lists } c.loggerForGameServerAllocation(gsa).Debug("Game server allocation") diff --git a/pkg/gameserverallocations/allocator_test.go b/pkg/gameserverallocations/allocator_test.go index 150a9005f4..30179bea55 100644 --- a/pkg/gameserverallocations/allocator_test.go +++ b/pkg/gameserverallocations/allocator_test.go @@ -34,6 +34,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -509,9 +510,11 @@ func TestAllocatorAllocateOnGameServerUpdateError(t *testing.T) { // try the public method result, err := a.Allocate(ctx, gsa.DeepCopy()) log.WithField("result", result).WithError(err).Info("Allocate (public): failed allocation") - require.Nil(t, result) - require.NotEqual(t, ErrNoGameServer, err) - require.EqualError(t, err, ErrGameServerUpdateConflict.Error()) + require.NoError(t, err) + require.NotNil(t, result) + gsa2, ok := result.(*allocationv1.GameServerAllocation) + require.True(t, ok) + require.Equal(t, allocationv1.GameServerAllocationUnAllocated, gsa2.Status.State) } func TestAllocatorRunLocalAllocations(t *testing.T) { @@ -1121,6 +1124,178 @@ func TestAllocatorCreateRestClientError(t *testing.T) { }) } +func TestAllocatorListenAndBatchAllocate(t *testing.T) { + t.Parallel() + + runtime.FeatureTestMutex.Lock() + defer runtime.FeatureTestMutex.Unlock() + require.NoError(t, runtime.ParseFeatures(string(runtime.FeatureProcessorAllocator)+"=true")) + + t.Run("single allocation succeeds", func(t *testing.T) { + f, gsList := defaultFixtures(3) + + a, m := newFakeAllocator() + m.AgonesClient.AddReactor("list", "gameservers", func(_ k8stesting.Action) (bool, k8sruntime.Object, error) { + return true, &agonesv1.GameServerList{Items: gsList}, nil + }) + updateCount := 0 + m.AgonesClient.AddReactor("update", "gameservers", func(action k8stesting.Action) (bool, k8sruntime.Object, error) { + updateCount++ + uo := action.(k8stesting.UpdateAction) + gs := uo.GetObject().(*agonesv1.GameServer) + return true, gs, nil + }) + + ctx, cancel := agtesting.StartInformers(m, a.allocationCache.gameServerSynced) + defer cancel() + + err := a.allocationCache.syncCache() + assert.Nil(t, err) + err = a.allocationCache.counter.Run(ctx, 0) + assert.Nil(t, err) + + gsa := &allocationv1.GameServerAllocation{ + ObjectMeta: metav1.ObjectMeta{Namespace: defaultNs}, + Spec: allocationv1.GameServerAllocationSpec{ + Selectors: []allocationv1.GameServerSelector{{LabelSelector: metav1.LabelSelector{MatchLabels: map[string]string{agonesv1.FleetNameLabel: f.ObjectMeta.Name}}}}, + }} + gsa.ApplyDefaults() + + j1 := request{gsa: gsa.DeepCopy(), response: make(chan response, 1), ctx: context.Background()} + a.pendingRequests <- j1 + + go a.ListenAndBatchAllocate(ctx, 3) + + res1 := <-j1.response + assert.NoError(t, res1.err) + assert.NotNil(t, res1.gs) + assert.Equal(t, agonesv1.GameServerStateAllocated, res1.gs.Status.State) + assert.Equal(t, 1, updateCount) + }) + + t.Run("multiple allocations to different game servers each get their own update", func(t *testing.T) { + f, gsList := defaultFixtures(5) + + a, m := newFakeAllocator() + m.AgonesClient.AddReactor("list", "gameservers", func(_ k8stesting.Action) (bool, k8sruntime.Object, error) { + return true, &agonesv1.GameServerList{Items: gsList}, nil + }) + updateCount := 0 + m.AgonesClient.AddReactor("update", "gameservers", func(action k8stesting.Action) (bool, k8sruntime.Object, error) { + updateCount++ + uo := action.(k8stesting.UpdateAction) + gs := uo.GetObject().(*agonesv1.GameServer) + return true, gs, nil + }) + + ctx, cancel := agtesting.StartInformers(m, a.allocationCache.gameServerSynced) + defer cancel() + + err := a.allocationCache.syncCache() + assert.Nil(t, err) + err = a.allocationCache.counter.Run(ctx, 0) + assert.Nil(t, err) + + gsa := &allocationv1.GameServerAllocation{ + ObjectMeta: metav1.ObjectMeta{Namespace: defaultNs}, + Spec: allocationv1.GameServerAllocationSpec{ + Selectors: []allocationv1.GameServerSelector{{LabelSelector: metav1.LabelSelector{MatchLabels: map[string]string{agonesv1.FleetNameLabel: f.ObjectMeta.Name}}}}, + }} + gsa.ApplyDefaults() + + j1 := request{gsa: gsa.DeepCopy(), response: make(chan response, 1), ctx: context.Background()} + j2 := request{gsa: gsa.DeepCopy(), response: make(chan response, 1), ctx: context.Background()} + j3 := request{gsa: gsa.DeepCopy(), response: make(chan response, 1), ctx: context.Background()} + a.pendingRequests <- j1 + a.pendingRequests <- j2 + a.pendingRequests <- j3 + + go a.ListenAndBatchAllocate(ctx, 3) + + res1 := <-j1.response + assert.NoError(t, res1.err) + assert.NotNil(t, res1.gs) + assert.Equal(t, agonesv1.GameServerStateAllocated, res1.gs.Status.State) + + res2 := <-j2.response + assert.NoError(t, res2.err) + assert.NotNil(t, res2.gs) + assert.NotEqual(t, res1.gs.ObjectMeta.Name, res2.gs.ObjectMeta.Name) + + res3 := <-j3.response + assert.NoError(t, res3.err) + assert.NotNil(t, res3.gs) + assert.Equal(t, agonesv1.GameServerStateAllocated, res3.gs.Status.State) + + // Packed strategy may consolidate all 3 onto one GS (batch collapsing) or spread them — + // either way at least 1 K8s update must have happened. + assert.GreaterOrEqual(t, updateCount, 1) + }) + + t.Run("k8s conflict propagates to all callers in batch", func(t *testing.T) { + f, gsList := defaultFixtures(3) + + a, m := newFakeAllocator() + m.AgonesClient.AddReactor("list", "gameservers", func(_ k8stesting.Action) (bool, k8sruntime.Object, error) { + return true, &agonesv1.GameServerList{Items: gsList}, nil + }) + m.AgonesClient.AddReactor("update", "gameservers", func(_ k8stesting.Action) (bool, k8sruntime.Object, error) { + return true, nil, k8serrors.NewConflict(agonesv1.Resource("gameservers"), "gs1", errors.New("conflict")) + }) + + ctx, cancel := agtesting.StartInformers(m, a.allocationCache.gameServerSynced) + defer cancel() + + err := a.allocationCache.syncCache() + assert.Nil(t, err) + err = a.allocationCache.counter.Run(ctx, 0) + assert.Nil(t, err) + + gsa := &allocationv1.GameServerAllocation{ + ObjectMeta: metav1.ObjectMeta{Namespace: defaultNs}, + Spec: allocationv1.GameServerAllocationSpec{ + Selectors: []allocationv1.GameServerSelector{{LabelSelector: metav1.LabelSelector{MatchLabels: map[string]string{agonesv1.FleetNameLabel: f.ObjectMeta.Name}}}}, + }} + gsa.ApplyDefaults() + + j1 := request{gsa: gsa.DeepCopy(), response: make(chan response, 1), ctx: context.Background()} + a.pendingRequests <- j1 + + go a.ListenAndBatchAllocate(ctx, 3) + + res1 := <-j1.response + assert.Error(t, res1.err) + assert.True(t, errors.Is(res1.err, ErrGameServerUpdateConflict)) + }) + + t.Run("no game servers returns error", func(t *testing.T) { + a, m := newFakeAllocator() + ctx, cancel := agtesting.StartInformers(m, a.allocationCache.gameServerSynced) + defer cancel() + + err := a.allocationCache.syncCache() + assert.Nil(t, err) + err = a.allocationCache.counter.Run(ctx, 0) + assert.Nil(t, err) + + gsa := &allocationv1.GameServerAllocation{ + ObjectMeta: metav1.ObjectMeta{Namespace: defaultNs}, + Spec: allocationv1.GameServerAllocationSpec{ + Selectors: []allocationv1.GameServerSelector{{LabelSelector: metav1.LabelSelector{MatchLabels: map[string]string{agonesv1.FleetNameLabel: "thereisnofleet"}}}}, + }} + gsa.ApplyDefaults() + + j1 := request{gsa: gsa.DeepCopy(), response: make(chan response, 1), ctx: context.Background()} + a.pendingRequests <- j1 + + go a.ListenAndBatchAllocate(ctx, 3) + + res1 := <-j1.response + assert.Nil(t, res1.gs) + assert.ErrorIs(t, res1.err, ErrNoGameServer) + }) +} + // newFakeAllocator returns a fake allocator. func newFakeAllocator() (*Allocator, agtesting.Mocks) { m := agtesting.NewMocks() diff --git a/pkg/gameserverallocations/batch_allocator.go b/pkg/gameserverallocations/batch_allocator.go new file mode 100644 index 0000000000..64d8dc678d --- /dev/null +++ b/pkg/gameserverallocations/batch_allocator.go @@ -0,0 +1,254 @@ +// Copyright Contributors to Agones a Series of LF Projects, LLC. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gameserverallocations + +import ( + "context" + goErrors "errors" + "time" + + "agones.dev/agones/pkg/apis" + agonesv1 "agones.dev/agones/pkg/apis/agones/v1" + allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" + "agones.dev/agones/pkg/util/runtime" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// batchResponses is an async list of responses for matching requests +type batchResponses struct { + counterErrors error + listErrors error + responses []response +} + +// batchAllocationUpdateWorkers tries to update each newly allocated gs with the last state. +// If the update fails because of a version conflict, all allocations that were applied onto +// a gs will receive an error, thus being available for retries. +func (c *Allocator) batchAllocationUpdateWorkers(ctx context.Context, workerCount int) chan<- batchResponses { + batchUpdateQueue := make(chan batchResponses) + + for i := 0; i < workerCount; i++ { + go func() { + for { + select { + case batchRes := <-batchUpdateQueue: + if len(batchRes.responses) > 0 { + lastGsState := batchRes.responses[len(batchRes.responses)-1].gs + + var propagatedErr error + updatedGs, updateErr := c.gameServerGetter.GameServers(lastGsState.ObjectMeta.Namespace).Update(ctx, lastGsState, metav1.UpdateOptions{}) + if updateErr != nil { + if !k8serrors.IsConflict(errors.Cause(updateErr)) { + c.allocationCache.AddGameServer(lastGsState) + } + propagatedErr = goErrors.Join(ErrGameServerUpdateConflict, updateErr) + } else { + c.allocationCache.AddGameServer(updatedGs) + + if batchRes.counterErrors != nil { + c.recorder.Event(updatedGs, corev1.EventTypeWarning, "CounterActionError", batchRes.counterErrors.Error()) + } + if batchRes.listErrors != nil { + c.recorder.Event(updatedGs, corev1.EventTypeWarning, "ListActionError", batchRes.listErrors.Error()) + } + c.recorder.Event(updatedGs, corev1.EventTypeNormal, string(updatedGs.Status.State), "Allocated") + } + + for _, res := range batchRes.responses { + res.err = propagatedErr + res.request.response <- res + } + } + case <-ctx.Done(): + return + } + } + }() + } + + return batchUpdateQueue +} + +// ListenAndBatchAllocate is a blocking function that runs in a loop processing allocation +// requests in batches. Unlike ListenAndAllocate, it applies allocations locally to a +// GameServer before batching updates — multiple allocations to the same GameServer within +// a flush window result in a single Kubernetes update, reducing API pressure and improving +// session packing. +func (c *Allocator) ListenAndBatchAllocate(ctx context.Context, updateWorkerCount int) { + batchUpdateQueue := c.batchAllocationUpdateWorkers(ctx, updateWorkerCount) + + var list []*agonesv1.GameServer + var sortKey uint64 + requestCount := 0 + gsToReorderIndex := -1 + var gsToReorder *agonesv1.GameServer + + batchResponsesPerGs := make(map[string]batchResponses) + + flush := func() { + for _, batchRes := range batchResponsesPerGs { + batchUpdateQueue <- batchRes + } + batchResponsesPerGs = make(map[string]batchResponses) + + list = nil + requestCount = 0 + gsToReorderIndex = -1 + gsToReorder = nil + } + + checkSortKey := func(gsa *allocationv1.GameServerAllocation) { + if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { + newSortKey, err := gsa.SortKey() + if err != nil { + c.baseLogger.WithError(err).Warn("error getting sortKey for GameServerAllocationSpec", err) + } + if sortKey == uint64(0) { + sortKey = newSortKey + } + + if newSortKey != sortKey { + sortKey = newSortKey + flush() + } + } + } + + checkRefreshList := func(gsa *allocationv1.GameServerAllocation) { + if requestCount >= maxBatchBeforeRefresh { + flush() + } + requestCount++ + + checkSortKey(gsa) + + if list == nil { + if !runtime.FeatureEnabled(runtime.FeatureCountsAndLists) || gsa.Spec.Scheduling == apis.Packed { + list = c.allocationCache.ListSortedGameServers(gsa) + } else { + list = c.allocationCache.ListSortedGameServersPriorities(gsa) + } + } else if gsToReorderIndex >= 0 { + c.allocationCache.ReorderGameServerAfterAllocation(list, gsToReorderIndex, gsToReorder, gsa.Spec.Priorities, gsa.Spec.Scheduling) + } + } + + for { + select { + case req := <-c.pendingRequests: + if req.ctx.Err() != nil { + c.tryRespondWithError(req, ErrTotalTimeoutExceeded) + continue + } + + checkRefreshList(req.gsa) + + foundGs, foundGsIndex, err := findGameServerForAllocation(req.gsa, list) + if err != nil { + req.response <- response{request: req, gs: nil, err: err} + continue + } + + existingBatch, alreadyAllocated := batchResponsesPerGs[string(foundGs.UID)] + if !alreadyAllocated { + if removeErr := c.allocationCache.RemoveGameServer(foundGs); removeErr != nil { + removeErr = errors.Wrap(removeErr, "error removing gameserver from cache") + req.response <- response{request: req, gs: nil, err: removeErr} + list = append(list[:foundGsIndex], list[foundGsIndex+1:]...) + continue + } + } + + gsToReorder = foundGs.DeepCopy() + gsToReorderIndex = foundGsIndex + applyErr, counterErrors, listErrors := c.applyAllocationToLocalGameServer(req.gsa.Spec.MetaPatch, gsToReorder, req.gsa) + if applyErr == nil { + if alreadyAllocated { + existingBatch.responses = append(existingBatch.responses, response{request: req, gs: gsToReorder.DeepCopy(), err: nil}) + existingBatch.counterErrors = goErrors.Join(existingBatch.counterErrors, counterErrors) + existingBatch.listErrors = goErrors.Join(existingBatch.listErrors, listErrors) + batchResponsesPerGs[string(gsToReorder.UID)] = existingBatch + } else { + batchResponsesPerGs[string(gsToReorder.UID)] = batchResponses{ + responses: []response{{request: req, gs: gsToReorder.DeepCopy(), err: nil}}, + counterErrors: counterErrors, + listErrors: listErrors, + } + } + } else { + req.response <- response{request: req, gs: nil, err: applyErr} + } + + case <-ctx.Done(): + flush() + return + + default: + flush() + time.Sleep(c.batchWaitTime) + } + } +} + +// applyAllocationToLocalGameServer patches the GameServer with allocation metadata and sets +// it to Allocated state without persisting to Kubernetes. Counter/List actions are applied +// if FeatureCountsAndLists is enabled. Returns (applyErr, counterErrors, listErrors). +func (c *Allocator) applyAllocationToLocalGameServer(mp allocationv1.MetaPatch, gs *agonesv1.GameServer, gsa *allocationv1.GameServerAllocation) (error, error, error) { + ts, err := time.Now().MarshalText() + if err != nil { + return err, nil, nil + } + if gs.ObjectMeta.Annotations == nil { + gs.ObjectMeta.Annotations = make(map[string]string, 1) + } + gs.ObjectMeta.Annotations[LastAllocatedAnnotationKey] = string(ts) + gs.Status.State = agonesv1.GameServerStateAllocated + + if mp.Labels != nil { + if gs.ObjectMeta.Labels == nil { + gs.ObjectMeta.Labels = make(map[string]string, len(mp.Labels)) + } + for key, value := range mp.Labels { + gs.ObjectMeta.Labels[key] = value + } + } + + if gs.ObjectMeta.Annotations == nil { + gs.ObjectMeta.Annotations = make(map[string]string, len(mp.Annotations)) + } + for key, value := range mp.Annotations { + gs.ObjectMeta.Annotations[key] = value + } + + var counterErrors error + var listErrors error + if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { + if gsa.Spec.Counters != nil { + for counter, ca := range gsa.Spec.Counters { + counterErrors = goErrors.Join(counterErrors, ca.CounterActions(counter, gs)) + } + } + if gsa.Spec.Lists != nil { + for list, la := range gsa.Spec.Lists { + listErrors = goErrors.Join(listErrors, la.ListActions(list, gs)) + } + } + } + + return nil, counterErrors, listErrors +} From 0d564ccbf2129e3e54bb381765cada37e6f60ad5 Mon Sep 17 00:00:00 2001 From: Thomas Lacroix Date: Sun, 10 May 2026 18:17:33 -0400 Subject: [PATCH 2/4] feat: update / re-add existing unit tests from PR Signed-off-by: Thomas Lacroix --- pkg/gameserverallocations/allocation_cache.go | 4 +- .../allocation_cache_test.go | 276 ++++++++++++++++++ pkg/gameserverallocations/allocator.go | 2 +- pkg/gameserverallocations/allocator_test.go | 115 +++++++- pkg/gameserverallocations/batch_allocator.go | 17 +- 5 files changed, 389 insertions(+), 25 deletions(-) diff --git a/pkg/gameserverallocations/allocation_cache.go b/pkg/gameserverallocations/allocation_cache.go index 2142071d0d..aa679fdb11 100644 --- a/pkg/gameserverallocations/allocation_cache.go +++ b/pkg/gameserverallocations/allocation_cache.go @@ -281,7 +281,7 @@ func (c *AllocationCache) ReorderGameServerAfterAllocation( c.baseLogger.WithField("gsIndexBeforeAllocation", gsIndexBeforeAllocation). WithField("gsAfterAllocation", gsAfterAllocation). WithField("gsListLength", len(gsList)). - Warn("ReorderGameServerAfterAllocation called with invalid parameters! Reordering is skipped!") + Warn("ReorderGameServerAfterAllocation called with invalid parameters, reordering is skipped") return } @@ -315,7 +315,7 @@ func (c *AllocationCache) ReorderGameServerAfterAllocation( } default: c.baseLogger.WithField("strategy", strategy). - Warn("Scheduling strategy not supported! Reordering is skipped!") + Warn("scheduling strategy not supported, reordering is skipped") } if newIndex != gsIndexBeforeAllocation { diff --git a/pkg/gameserverallocations/allocation_cache_test.go b/pkg/gameserverallocations/allocation_cache_test.go index 8c6967e4fd..8fedf7a6ff 100644 --- a/pkg/gameserverallocations/allocation_cache_test.go +++ b/pkg/gameserverallocations/allocation_cache_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "agones.dev/agones/pkg/apis" agonesv1 "agones.dev/agones/pkg/apis/agones/v1" allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" "agones.dev/agones/pkg/gameservers" @@ -602,6 +603,281 @@ func TestAllocatorRunCacheSync(t *testing.T) { assertCacheEntries(0) } +func TestAllocationCacheReorderGameServerAfterAllocation(t *testing.T) { + t.Parallel() + runtime.FeatureTestMutex.Lock() + defer runtime.FeatureTestMutex.Unlock() + + gs0Allocated := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs0", Namespace: defaultNs, UID: "0"}, + Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateAllocated}} + gs1 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1", Namespace: defaultNs, UID: "1"}, + Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateReady}} + gs1Allocated := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1", Namespace: defaultNs, UID: "1"}, + Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateAllocated}} + gs2 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs2", Namespace: defaultNs, UID: "2"}, + Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateReady}} + gs2Allocated := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs2", Namespace: defaultNs, UID: "2"}, + Status: agonesv1.GameServerStatus{NodeName: "node0", State: agonesv1.GameServerStateAllocated}} + gs3 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs3", Namespace: defaultNs, UID: "3"}, + Status: agonesv1.GameServerStatus{NodeName: "node1", State: agonesv1.GameServerStateReady}} + gs4 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs4", Namespace: defaultNs, UID: "4"}, + Status: agonesv1.GameServerStatus{ + NodeName: "node1", + State: agonesv1.GameServerStateAllocated, + Players: &agonesv1.PlayerStatus{ + Count: 3, + Capacity: 10, + }, + Counters: map[string]agonesv1.CounterStatus{ + "players": { + Count: 3, + Capacity: 10, + }, + }, + Lists: map[string]agonesv1.ListStatus{ + "players": { + Values: []string{"player0", "player1", "player2"}, + Capacity: 10, + }, + }, + }, + } + gs5 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs5", Namespace: defaultNs, UID: "5"}, + Status: agonesv1.GameServerStatus{ + NodeName: "node1", + State: agonesv1.GameServerStateAllocated, + Players: &agonesv1.PlayerStatus{ + Count: 2, + Capacity: 10, + }, + Counters: map[string]agonesv1.CounterStatus{ + "players": { + Count: 2, + Capacity: 10, + }, + }, + Lists: map[string]agonesv1.ListStatus{ + "players": { + Values: []string{"player0", "player1"}, + Capacity: 10, + }, + }, + }, + } + gs5Allocated := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs5", Namespace: defaultNs, UID: "5"}, + Status: agonesv1.GameServerStatus{ + NodeName: "node1", + State: agonesv1.GameServerStateAllocated, + Players: &agonesv1.PlayerStatus{ + Count: 5, + Capacity: 10, + }, + Counters: map[string]agonesv1.CounterStatus{ + "players": { + Count: 5, + Capacity: 10, + }, + }, + Lists: map[string]agonesv1.ListStatus{ + "players": { + Values: []string{"player0", "player1", "player2", "player3", "player4"}, + Capacity: 10, + }, + }, + }, + } + gs6 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs6", Namespace: defaultNs, UID: "6"}, + Status: agonesv1.GameServerStatus{ + NodeName: "node1", + State: agonesv1.GameServerStateAllocated, + Players: &agonesv1.PlayerStatus{ + Count: 1, + Capacity: 10, + }, + Counters: map[string]agonesv1.CounterStatus{ + "players": { + Count: 1, + Capacity: 10, + }, + }, + Lists: map[string]agonesv1.ListStatus{ + "players": { + Values: []string{"player0"}, + Capacity: 10, + }, + }, + }, + } + + fixtures := map[string]struct { + features string + list []*agonesv1.GameServer + priorities []agonesv1.Priority + packingStrategy apis.SchedulingStrategy + gsToReorder *agonesv1.GameServer + gsToReorderIndex int + want []*agonesv1.GameServer + }{ + "pakced (no change)": { + list: []*agonesv1.GameServer{&gs0Allocated, &gs1, &gs2, &gs3}, + gsToReorder: &gs1Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs0Allocated, &gs1Allocated, &gs2, &gs3}, + }, + "packed": { + list: []*agonesv1.GameServer{&gs0Allocated, &gs1, &gs2, &gs3}, + gsToReorder: &gs2Allocated, + gsToReorderIndex: 2, + want: []*agonesv1.GameServer{&gs0Allocated, &gs2Allocated, &gs1, &gs3}, + }, + "packed (sort by name)": { + list: []*agonesv1.GameServer{&gs0Allocated, &gs2Allocated, &gs1, &gs3}, + gsToReorder: &gs1Allocated, + gsToReorderIndex: 2, + want: []*agonesv1.GameServer{&gs0Allocated, &gs1Allocated, &gs2Allocated, &gs3}, + }, + "packed (all ready)": { + list: []*agonesv1.GameServer{&gs1, &gs2, &gs3}, + gsToReorder: &gs1Allocated, + gsToReorderIndex: 0, + want: []*agonesv1.GameServer{&gs1Allocated, &gs2, &gs3}, + }, + "packed (only one)": { + list: []*agonesv1.GameServer{&gs1}, + gsToReorder: &gs1Allocated, + gsToReorderIndex: 0, + want: []*agonesv1.GameServer{&gs1Allocated}, + }, + "packed (priority counter)": { + features: fmt.Sprintf("%s=true", runtime.FeatureCountsAndLists), + list: []*agonesv1.GameServer{&gs4, &gs5, &gs6}, + priorities: []agonesv1.Priority{ + { + Type: "Counter", + Key: "players", + Order: "Ascending", + }, + }, + gsToReorder: &gs5Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs5Allocated, &gs4, &gs6}, + }, + "packed (priority list)": { + features: fmt.Sprintf("%s=true", runtime.FeatureCountsAndLists), + list: []*agonesv1.GameServer{&gs6, &gs5, &gs4}, + priorities: []agonesv1.Priority{ + { + Type: "List", + Key: "players", + Order: "Descending", + }, + }, + gsToReorder: &gs5Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs6, &gs4, &gs5Allocated}, + }, + "packed (FeaturePlayerAllocationFilter)": { + features: fmt.Sprintf("%s=true", runtime.FeaturePlayerAllocationFilter), + list: []*agonesv1.GameServer{&gs4, &gs5, &gs6}, + gsToReorder: &gs5Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs5Allocated, &gs4, &gs6}, + }, + "distributed (no change)": { + list: []*agonesv1.GameServer{&gs0Allocated, &gs1, &gs2, &gs3}, + packingStrategy: apis.Distributed, + gsToReorder: &gs2Allocated, + gsToReorderIndex: 2, + want: []*agonesv1.GameServer{&gs0Allocated, &gs1, &gs2Allocated, &gs3}, + }, + "distributed (only one)": { + list: []*agonesv1.GameServer{&gs1}, + packingStrategy: apis.Distributed, + gsToReorder: &gs1Allocated, + gsToReorderIndex: 0, + want: []*agonesv1.GameServer{&gs1Allocated}, + }, + "distributed (priority counter)": { + features: fmt.Sprintf("%s=true", runtime.FeatureCountsAndLists), + list: []*agonesv1.GameServer{&gs4, &gs5, &gs6}, + priorities: []agonesv1.Priority{ + { + Type: "Counter", + Key: "players", + Order: "Ascending", + }, + }, + packingStrategy: apis.Distributed, + gsToReorder: &gs5Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs5Allocated, &gs4, &gs6}, + }, + "distributed (priority list)": { + features: fmt.Sprintf("%s=true", runtime.FeatureCountsAndLists), + list: []*agonesv1.GameServer{&gs6, &gs5, &gs4}, + priorities: []agonesv1.Priority{ + { + Type: "List", + Key: "players", + Order: "Descending", + }, + }, + packingStrategy: apis.Distributed, + gsToReorder: &gs5Allocated, + gsToReorderIndex: 1, + want: []*agonesv1.GameServer{&gs6, &gs4, &gs5Allocated}, + }, + } + + for testName, testScenario := range fixtures { + t.Run(testName, func(t *testing.T) { + // deliberately not resetting the Feature state, to catch any possible unknown regressions + if testScenario.features != "" { + require.NoError(t, runtime.ParseFeatures(testScenario.features)) + } + + cache, m := newFakeAllocationCache() + + m.AgonesClient.AddReactor("list", "gameservers", func(_ k8stesting.Action) (bool, k8sruntime.Object, error) { + return true, &agonesv1.GameServerList{ + Items: func(input []*agonesv1.GameServer) []agonesv1.GameServer { + result := make([]agonesv1.GameServer, len(input)) + for i, gs := range input { + result[i] = *gs + } + return result + }(testScenario.list), + }, nil + }) + + ctx, cancel := agtesting.StartInformers(m, cache.gameServerSynced) + defer cancel() + + err := cache.syncCache() + assert.Nil(t, err) + + err = cache.counter.Run(ctx, 0) + assert.Nil(t, err) + + strategy := apis.Packed + if testScenario.packingStrategy != "" { + strategy = testScenario.packingStrategy + } + + cache.ReorderGameServerAfterAllocation( + testScenario.list, + testScenario.gsToReorderIndex, testScenario.gsToReorder, + testScenario.priorities, strategy) + + if !assert.Equal(t, testScenario.want, testScenario.list, "reordered list should match expected") { + for _, gs := range testScenario.list { + t.Logf("%s, ", gs.Name) + } + } + }) + } +} + func newFakeAllocationCache() (*AllocationCache, agtesting.Mocks) { m := agtesting.NewMocks() cache := NewAllocationCache(m.AgonesInformerFactory.Agones().V1().GameServers(), gameservers.NewPerNodeCounter(m.KubeInformerFactory, m.AgonesInformerFactory), healthcheck.NewHandler()) diff --git a/pkg/gameserverallocations/allocator.go b/pkg/gameserverallocations/allocator.go index 53d10a94a5..2cfc4afa5a 100644 --- a/pkg/gameserverallocations/allocator.go +++ b/pkg/gameserverallocations/allocator.go @@ -625,7 +625,7 @@ func (c *Allocator) allocationUpdateWorkers(ctx context.Context, workerCount int // we should wait for it to get updated with fresh info. c.allocationCache.AddGameServer(gs) } - res.err = ErrGameServerUpdateConflict + res.err = goErrors.Join(ErrGameServerUpdateConflict, err) } else { // put the GameServer back into the cache, so it's immediately around for re-allocation c.allocationCache.AddGameServer(gs) diff --git a/pkg/gameserverallocations/allocator_test.go b/pkg/gameserverallocations/allocator_test.go index 30179bea55..4be12ed4d6 100644 --- a/pkg/gameserverallocations/allocator_test.go +++ b/pkg/gameserverallocations/allocator_test.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "sync" "testing" "time" @@ -497,7 +498,7 @@ func TestAllocatorAllocateOnGameServerUpdateError(t *testing.T) { _, err := a.allocate(ctx, gsa.DeepCopy()) log.WithError(err).Info("allocate (private): failed allocation") require.NotEqual(t, ErrNoGameServer, err) - require.EqualError(t, err, ErrGameServerUpdateConflict.Error()) + require.True(t, errors.Is(err, ErrGameServerUpdateConflict)) // make sure we aren't in the same batch! time.Sleep(2 * a.batchWaitTime) @@ -1008,7 +1009,8 @@ func TestControllerAllocationUpdateWorkers(t *testing.T) { r = <-r.request.response assert.True(t, updated) - assert.EqualError(t, r.err, ErrGameServerUpdateConflict.Error()) + assert.True(t, errors.Is(r.err, ErrGameServerUpdateConflict)) + assert.ErrorContains(t, r.err, "something went wrong") assert.Equal(t, gs1, r.gs) agtesting.AssertNoEvent(t, m.FakeRecorder.Events) @@ -1150,9 +1152,9 @@ func TestAllocatorListenAndBatchAllocate(t *testing.T) { defer cancel() err := a.allocationCache.syncCache() - assert.Nil(t, err) + assert.NoError(t, err) err = a.allocationCache.counter.Run(ctx, 0) - assert.Nil(t, err) + assert.NoError(t, err) gsa := &allocationv1.GameServerAllocation{ ObjectMeta: metav1.ObjectMeta{Namespace: defaultNs}, @@ -1192,9 +1194,9 @@ func TestAllocatorListenAndBatchAllocate(t *testing.T) { defer cancel() err := a.allocationCache.syncCache() - assert.Nil(t, err) + assert.NoError(t, err) err = a.allocationCache.counter.Run(ctx, 0) - assert.Nil(t, err) + assert.NoError(t, err) gsa := &allocationv1.GameServerAllocation{ ObjectMeta: metav1.ObjectMeta{Namespace: defaultNs}, @@ -1247,9 +1249,9 @@ func TestAllocatorListenAndBatchAllocate(t *testing.T) { defer cancel() err := a.allocationCache.syncCache() - assert.Nil(t, err) + assert.NoError(t, err) err = a.allocationCache.counter.Run(ctx, 0) - assert.Nil(t, err) + assert.NoError(t, err) gsa := &allocationv1.GameServerAllocation{ ObjectMeta: metav1.ObjectMeta{Namespace: defaultNs}, @@ -1274,9 +1276,9 @@ func TestAllocatorListenAndBatchAllocate(t *testing.T) { defer cancel() err := a.allocationCache.syncCache() - assert.Nil(t, err) + assert.NoError(t, err) err = a.allocationCache.counter.Run(ctx, 0) - assert.Nil(t, err) + assert.NoError(t, err) gsa := &allocationv1.GameServerAllocation{ ObjectMeta: metav1.ObjectMeta{Namespace: defaultNs}, @@ -1298,6 +1300,11 @@ func TestAllocatorListenAndBatchAllocate(t *testing.T) { // newFakeAllocator returns a fake allocator. func newFakeAllocator() (*Allocator, agtesting.Mocks) { + return newFakeAllocatorWithCustomBatchWaitTime(500 * time.Millisecond) +} + +// newFakeAllocatorWithCustomBatchWaitTime returns a fake allocator with a configurable batchWaitTime. +func newFakeAllocatorWithCustomBatchWaitTime(batchWaitTime time.Duration) (*Allocator, agtesting.Mocks) { m := agtesting.NewMocks() counter := gameservers.NewPerNodeCounter(m.KubeInformerFactory, m.AgonesInformerFactory) @@ -1309,8 +1316,94 @@ func newFakeAllocator() (*Allocator, agtesting.Mocks) { NewAllocationCache(m.AgonesInformerFactory.Agones().V1().GameServers(), counter, healthcheck.NewHandler()), time.Second, 5*time.Second, - 500*time.Millisecond) + batchWaitTime) a.recorder = m.FakeRecorder return a, m } + +func TestAllocatorAllocateNoQuickNoGameServerError(t *testing.T) { + t.Parallel() + + // TODO: remove when CountsAndLists and ProcessorAllocator feature flags are moved to stable. + runtime.FeatureTestMutex.Lock() + defer runtime.FeatureTestMutex.Unlock() + require.NoError(t, runtime.ParseFeatures( + fmt.Sprintf("%s=false&%s=true&%s=true", + runtime.FeaturePlayerAllocationFilter, + runtime.FeatureCountsAndLists, + runtime.FeatureProcessorAllocator))) + + a, m := newFakeAllocatorWithCustomBatchWaitTime(2 * time.Second) + + gs1 := agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1", Namespace: defaultNs, UID: "1"}, + Status: agonesv1.GameServerStatus{NodeName: "node1", State: agonesv1.GameServerStateAllocated, + Counters: map[string]agonesv1.CounterStatus{ + "capacity": { + Count: 1, + Capacity: 1000, + }, + }}} + gsList := []agonesv1.GameServer{gs1} + gsLen := len(gsList) + m.AgonesClient.AddReactor("list", "gameservers", func(_ k8stesting.Action) (bool, k8sruntime.Object, error) { + return true, &agonesv1.GameServerList{Items: gsList}, nil + }) + m.AgonesClient.AddReactor("update", "gameservers", func(action k8stesting.Action) (bool, k8sruntime.Object, error) { + ua := action.(k8stesting.UpdateAction) + gs := ua.GetObject().(*agonesv1.GameServer) + return true, gs, nil + }) + + ctx, cancel := agtesting.StartInformers(m, a.allocationCache.gameServerSynced) + defer cancel() + + require.NoError(t, a.Run(ctx)) + require.Eventuallyf(t, func() bool { + return a.allocationCache.cache.Len() == gsLen + }, 10*time.Second, time.Second, fmt.Sprintf("should be %d items in the cache", gsLen)) + + ALLOCATED := agonesv1.GameServerStateAllocated + gsa := &allocationv1.GameServerAllocation{ + ObjectMeta: metav1.ObjectMeta{Namespace: defaultNs}, + Spec: allocationv1.GameServerAllocationSpec{ + Scheduling: apis.Packed, + Selectors: []allocationv1.GameServerSelector{{ + GameServerState: &ALLOCATED, + Counters: map[string]allocationv1.CounterSelector{ + "capacity": { + MinAvailable: 1, + }, + }, + }}, + }, + } + + var waitTest sync.WaitGroup + + waitTest.Add(1) + go func() { + defer waitTest.Done() + result1, err1 := a.Allocate(ctx, gsa.DeepCopy()) + require.NoError(t, err1) + require.NotNil(t, result1) + outGsa := result1.(*allocationv1.GameServerAllocation) + require.NotNil(t, outGsa) + require.Equal(t, allocationv1.GameServerAllocationAllocated, outGsa.Status.State) + require.Equal(t, gs1.ObjectMeta.Name, outGsa.Status.GameServerName) + }() + + waitTest.Add(1) + go func() { + defer waitTest.Done() + result2, err2 := a.Allocate(ctx, gsa.DeepCopy()) + require.NoError(t, err2) + require.NotNil(t, result2) + outGsa := result2.(*allocationv1.GameServerAllocation) + require.NotNil(t, outGsa) + require.Equal(t, allocationv1.GameServerAllocationAllocated, outGsa.Status.State) + require.Equal(t, gs1.ObjectMeta.Name, outGsa.Status.GameServerName) + }() + + waitTest.Wait() +} diff --git a/pkg/gameserverallocations/batch_allocator.go b/pkg/gameserverallocations/batch_allocator.go index 64d8dc678d..4212d561bd 100644 --- a/pkg/gameserverallocations/batch_allocator.go +++ b/pkg/gameserverallocations/batch_allocator.go @@ -116,9 +116,9 @@ func (c *Allocator) ListenAndBatchAllocate(ctx context.Context, updateWorkerCoun if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { newSortKey, err := gsa.SortKey() if err != nil { - c.baseLogger.WithError(err).Warn("error getting sortKey for GameServerAllocationSpec", err) + c.baseLogger.WithError(err).Warn("error getting sortKey for GameServerAllocationSpec") } - if sortKey == uint64(0) { + if sortKey == 0 { sortKey = newSortKey } @@ -207,14 +207,14 @@ func (c *Allocator) ListenAndBatchAllocate(ctx context.Context, updateWorkerCoun // applyAllocationToLocalGameServer patches the GameServer with allocation metadata and sets // it to Allocated state without persisting to Kubernetes. Counter/List actions are applied -// if FeatureCountsAndLists is enabled. Returns (applyErr, counterErrors, listErrors). -func (c *Allocator) applyAllocationToLocalGameServer(mp allocationv1.MetaPatch, gs *agonesv1.GameServer, gsa *allocationv1.GameServerAllocation) (error, error, error) { +// if FeatureCountsAndLists is enabled. +func (c *Allocator) applyAllocationToLocalGameServer(mp allocationv1.MetaPatch, gs *agonesv1.GameServer, gsa *allocationv1.GameServerAllocation) (applyErr, counterErrors, listErrors error) { ts, err := time.Now().MarshalText() if err != nil { return err, nil, nil } if gs.ObjectMeta.Annotations == nil { - gs.ObjectMeta.Annotations = make(map[string]string, 1) + gs.ObjectMeta.Annotations = make(map[string]string, 1+len(mp.Annotations)) } gs.ObjectMeta.Annotations[LastAllocatedAnnotationKey] = string(ts) gs.Status.State = agonesv1.GameServerStateAllocated @@ -228,15 +228,10 @@ func (c *Allocator) applyAllocationToLocalGameServer(mp allocationv1.MetaPatch, } } - if gs.ObjectMeta.Annotations == nil { - gs.ObjectMeta.Annotations = make(map[string]string, len(mp.Annotations)) - } for key, value := range mp.Annotations { gs.ObjectMeta.Annotations[key] = value } - var counterErrors error - var listErrors error if runtime.FeatureEnabled(runtime.FeatureCountsAndLists) { if gsa.Spec.Counters != nil { for counter, ca := range gsa.Spec.Counters { @@ -250,5 +245,5 @@ func (c *Allocator) applyAllocationToLocalGameServer(mp allocationv1.MetaPatch, } } - return nil, counterErrors, listErrors + return } From 32c71b945f75c60f0878d0e70d993986014b93b4 Mon Sep 17 00:00:00 2001 From: Thomas Lacroix Date: Sun, 10 May 2026 18:19:24 -0400 Subject: [PATCH 3/4] feat: fix linter issue Signed-off-by: Thomas Lacroix --- pkg/gameserverallocations/batch_allocator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/gameserverallocations/batch_allocator.go b/pkg/gameserverallocations/batch_allocator.go index 4212d561bd..3b6e7e93af 100644 --- a/pkg/gameserverallocations/batch_allocator.go +++ b/pkg/gameserverallocations/batch_allocator.go @@ -245,5 +245,5 @@ func (c *Allocator) applyAllocationToLocalGameServer(mp allocationv1.MetaPatch, } } - return + return nil, counterErrors, listErrors } From 3becc1d677645f72db26e990cdc6fbc9fff4507a Mon Sep 17 00:00:00 2001 From: Thomas Lacroix Date: Thu, 28 May 2026 19:42:25 -0400 Subject: [PATCH 4/4] feat: match previous behabiour from previous PR Signed-off-by: Thomas Lacroix --- pkg/gameserverallocations/allocator.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/gameserverallocations/allocator.go b/pkg/gameserverallocations/allocator.go index 2cfc4afa5a..9b3562fc44 100644 --- a/pkg/gameserverallocations/allocator.go +++ b/pkg/gameserverallocations/allocator.go @@ -730,8 +730,6 @@ func Retry(backoff wait.Backoff, fn func() error) error { switch { case err == nil: return true, nil - case err == ErrNoGameServer: - return true, err case err == ErrTotalTimeoutExceeded: return true, err default: