From f7e74ea052b9745fde0c6617bcfc7de1350ca0f6 Mon Sep 17 00:00:00 2001 From: Max Froehlich Date: Fri, 17 Oct 2025 21:38:06 +0200 Subject: [PATCH] handle running scheduler but not yet in shouldRun state --- pkg/scheduler/scheduler.go | 4 +++ pkg/scheduler/scheduler_test.go | 58 ++++++++++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 74c03bf31021b..0e884d91ad49a 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -250,6 +250,10 @@ func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_Front if err := frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}); err != nil { return err } + } else if s.State() == services.Running && !s.shouldRun.Load() { + // Scheduler is "RUNNING" should not run (yet) + level.Info(s.log).Log("msg", "scheduler is not in ReplicationSet, sending ERROR so frontend can try another scheduler", "frontend", frontendAddress) + return frontend.Send(&schedulerpb.SchedulerToFrontend{Status: schedulerpb.ERROR}) } // We stop accepting new queries in Stopping state. By returning quickly, we disconnect frontends, which in turns diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 939aa2b18bb93..2412e65ce8370 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -6,10 +6,12 @@ import ( "testing" "github.com/grafana/dskit/httpgrpc" + "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/grpc/metadata" "github.com/grafana/loki/v3/pkg/scheduler/schedulerpb" @@ -63,6 +65,56 @@ func TestScheduler_setRunState(t *testing.T) { s.setRunState(false) assert.Nil(t, mock.msg) +} +func TestFrontendConnectsToRunningSchedulerButBeforeShouldRun(t *testing.T) { + + // This test is even a bit bit cruder than the one above as we inject a noop BaseService + // to have a way to transition into the RUNNING state. + // This scheduler starts with no frontends connected + + s := Scheduler{ + log: util_log.Logger, + schedulerRunning: promauto.With(nil).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_query_scheduler_running", + Help: "Value will be 1 if the scheduler is in the ReplicationSet and actively receiving/processing requests", + }), + Service: services.NewBasicService(func(serviceContext context.Context) error { + return nil + }, func(serviceContext context.Context) error { + <-serviceContext.Done() + return serviceContext.Err() + }, func(failureCase error) error { + return nil + }), + } + require.NoError(t, s.StartAsync(t.Context())) + require.NoError(t, s.AwaitRunning(t.Context())) + require.Equal(t, services.Running, s.State()) + mock := &mockSchedulerForFrontendFrontendLoopServer{ + recvFn: func() (*schedulerpb.FrontendToScheduler, error) { + return &schedulerpb.FrontendToScheduler{ + Type: schedulerpb.INIT, + FrontendAddress: "127.0.0.1:9095", + }, nil + }, + } + s.connectedFrontends = map[string]*connectedFrontend{} + + // not_running, shouldRun == false + assert.False(t, s.shouldRun.Load()) + + err := s.FrontendLoop(mock) + assert.NoError(t, err) + + // not_running -> running, shouldRun == true + // to simulate last "setRunState(true)" happening after FrontendLoop started + s.setRunState(true) + assert.True(t, s.shouldRun.Load()) + + // Now we expect the scheduler to have sent a ERROR message to the frontend + // so the frontend will retry connecting now that the scheduler and is not waiting for an INIT response + assert.Equal(t, schedulerpb.ERROR, mock.msg.Status) + } func TestProtobufBackwardsCompatibility(t *testing.T) { @@ -114,7 +166,8 @@ func TestProtobufBackwardsCompatibility(t *testing.T) { } type mockSchedulerForFrontendFrontendLoopServer struct { - msg *schedulerpb.SchedulerToFrontend + msg *schedulerpb.SchedulerToFrontend + recvFn func() (*schedulerpb.FrontendToScheduler, error) } func (m *mockSchedulerForFrontendFrontendLoopServer) Send(frontend *schedulerpb.SchedulerToFrontend) error { @@ -123,6 +176,9 @@ func (m *mockSchedulerForFrontendFrontendLoopServer) Send(frontend *schedulerpb. } func (m mockSchedulerForFrontendFrontendLoopServer) Recv() (*schedulerpb.FrontendToScheduler, error) { + if m.recvFn != nil { + return m.recvFn() + } panic("implement me") }