diff --git a/pkg/server/server.go b/pkg/server/server.go index d09efc69b..f41cc0974 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -5,12 +5,14 @@ import ( "context" "crypto/tls" "crypto/x509" + "errors" "fmt" "net/http" "os" "path/filepath" "strconv" "strings" + "sync/atomic" "time" "github.com/gorilla/mux" @@ -25,7 +27,10 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" ) const ( @@ -47,6 +52,10 @@ const ( var caFile = filepath.Join(os.TempDir(), "k8s-webhook-server", "client-ca", "ca.crt") +// leaderFlag indicates whether this process is the elected leader. +// Gate config mutation work on this to avoid concurrent writers. +var leaderFlag atomic.Bool + // tlsOpt option function applied to all webhook servers. var tlsOpt = func(config *tls.Config) { config.MinVersion = tls.VersionTLS12 @@ -85,6 +94,51 @@ func ListenAndServe(ctx context.Context, cfg *rest.Config, mcmEnabled bool) erro return err } + k8sClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + return fmt.Errorf("failed to create kubernetes clientset: %w", err) + } + id, err := os.Hostname() + if err != nil || id == "" { + id = fmt.Sprintf("rancher-webhook-%d", time.Now().UnixNano()) + } + + lock := &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Name: "rancher-webhook-leader", + Namespace: namespace, + }, + Client: k8sClient.CoordinationV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: id, + }, + } + + go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: lock, + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + ReleaseOnCancel: true, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: func(_ context.Context) { + 
leaderFlag.Store(true) + logrus.Infof("[%s] elected leader: will manage webhook configurations", id) + }, + OnStoppedLeading: func() { + leaderFlag.Store(false) + logrus.Infof("[%s] lost leadership: will stop managing webhook configurations", id) + }, + OnNewLeader: func(identity string) { + if identity == id { + logrus.Infof("[%s] I am the new leader", id) + } else { + logrus.Infof("[%s] observed new leader: %s", id, identity) + } + }, + }, + }) + if err = listenAndServe(ctx, clients, validators, mutators); err != nil { return err } @@ -111,6 +165,7 @@ func listenAndServe(ctx context.Context, clients *clients.Clients, validators [] router := mux.NewRouter() errChecker := health.NewErrorChecker("Config Applied") health.RegisterHealthCheckers(router, errChecker) + errChecker.Store(errors.New("webhook configuration not yet applied")) router.Use(certAuth()) logrus.Debug("Creating Webhook routes") @@ -178,14 +233,20 @@ type secretHandler struct { } // sync updates the validating admission configuration whenever the TLS cert changes. +// Only the elected leader performs the updates, followers are a no-op. func (s *secretHandler) sync(_ string, secret *corev1.Secret) (*corev1.Secret, error) { + // The leader is responsible for applying the webhook configuration. + // Follower pods are only responsible for serving traffic and can be marked as healthy once the certificates are generated. 
+ if !leaderFlag.Load() { + s.errChecker.Store(nil) + return nil, nil + } + if secret == nil || secret.Name != caName || secret.Namespace != namespace || len(secret.Data[corev1.TLSCertKey]) == 0 { return nil, nil } - logrus.Info("Sleeping for 15 seconds then applying webhook config") - // Sleep here to make sure server is listening and all caches are primed - time.Sleep(15 * time.Second) + logrus.Info("Applying webhook config") validationClientConfig := v1.WebhookClientConfig{ Service: &v1.ServiceReference{ diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 3a0eb72ad..cfb2e73ce 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -1,14 +1,17 @@ package server import ( + "errors" "testing" + "github.com/rancher/webhook/pkg/health" "github.com/rancher/wrangler/v3/pkg/generic/fake" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" v1 "k8s.io/api/admissionregistration/v1" - "k8s.io/apimachinery/pkg/api/errors" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" ) @@ -23,14 +26,14 @@ func TestSecretHandlerEnsureWebhookConfigurationCreate(t *testing.T) { ctrl := gomock.NewController(t) validatingController := fake.NewMockNonNamespacedClientInterface[*v1.ValidatingWebhookConfiguration, *v1.ValidatingWebhookConfigurationList](ctrl) - validatingController.EXPECT().Get(configName, gomock.Any()).Return(nil, errors.NewNotFound(schema.GroupResource{Group: v1.GroupName, Resource: "validatingwebhookconfiguration"}, configName)).Times(1) + validatingController.EXPECT().Get(configName, gomock.Any()).Return(nil, apierrors.NewNotFound(schema.GroupResource{Group: v1.GroupName, Resource: "validatingwebhookconfiguration"}, configName)).Times(1) validatingController.EXPECT().Create(gomock.Any()).DoAndReturn(func(obj *v1.ValidatingWebhookConfiguration) 
(*v1.ValidatingWebhookConfiguration, error) { storedValidatingConfig = obj.DeepCopy() return obj, nil }).Times(1) mutatingController := fake.NewMockNonNamespacedClientInterface[*v1.MutatingWebhookConfiguration, *v1.MutatingWebhookConfigurationList](ctrl) - mutatingController.EXPECT().Get(configName, gomock.Any()).Return(nil, errors.NewNotFound(schema.GroupResource{Group: v1.GroupName, Resource: "mutatingwebhookconfiguration"}, configName)).Times(1) + mutatingController.EXPECT().Get(configName, gomock.Any()).Return(nil, apierrors.NewNotFound(schema.GroupResource{Group: v1.GroupName, Resource: "mutatingwebhookconfiguration"}, configName)).Times(1) mutatingController.EXPECT().Create(gomock.Any()).DoAndReturn(func(obj *v1.MutatingWebhookConfiguration) (*v1.MutatingWebhookConfiguration, error) { storedMutatingConfig = obj.DeepCopy() return obj, nil @@ -73,3 +76,113 @@ func TestSecretHandlerEnsureWebhookConfigurationCreate(t *testing.T) { require.Len(t, storedMutatingConfig.Webhooks, 1) assert.Equal(t, mutatingConfig.Webhooks[0].Name, storedMutatingConfig.Webhooks[0].Name) } + +// TestSyncLeaderLogic tests the logic within the sync function related to leader election. 
+func TestSyncLeaderLogic(t *testing.T) {
+	// This test is deliberately NOT parallel, and neither are its subtests:
+	// every case writes the package-level leaderFlag that sync reads, so
+	// concurrently running cases would overwrite each other's leader state
+	// and produce flaky mock/health assertions.
+	previousLeader := leaderFlag.Load()
+	// Put the flag back so this test does not leak state into other tests
+	// in the package.
+	t.Cleanup(func() { leaderFlag.Store(previousLeader) })
+
+	configName := "rancher.cattle.io"
+	// A secret that passes sync's name/namespace/cert-data guard.
+	secret := &corev1.Secret{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      caName,
+			Namespace: namespace,
+		},
+		Data: map[string][]byte{
+			corev1.TLSCertKey: []byte("cert-data"),
+		},
+	}
+	testErr := errors.New("test error")
+
+	tests := []struct {
+		name                 string
+		isLeader             bool  // value stored in leaderFlag before calling sync
+		getMutatingErr       error // returned by the mocked mutating Get
+		getValidatingErr     error // returned by the mocked validating Get
+		createMutatingErr    error // returned by the mocked mutating Create
+		createValidatingErr  error // returned by the mocked validating Create
+		expectedErr          error // error expected from sync (nil means healthy)
+		expectControllersRun bool  // whether any controller call is expected at all
+	}{
+		{
+			name:        "follower becomes healthy",
+			isLeader:    false,
+			expectedErr: nil,
+		},
+		{
+			name:                 "leader becomes healthy on create",
+			isLeader:             true,
+			getMutatingErr:       apierrors.NewNotFound(schema.GroupResource{}, ""),
+			getValidatingErr:     apierrors.NewNotFound(schema.GroupResource{}, ""),
+			expectedErr:          nil,
+			expectControllersRun: true,
+		},
+		{
+			name:                 "leader becomes unhealthy on create error",
+			isLeader:             true,
+			getValidatingErr:     apierrors.NewNotFound(schema.GroupResource{}, ""),
+			createValidatingErr:  testErr,
+			expectedErr:          testErr,
+			expectControllersRun: true,
+		},
+		{
+			name:                 "leader becomes healthy on update",
+			isLeader:             true,
+			expectedErr:          nil,
+			expectControllersRun: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ctrl := gomock.NewController(t)
+			validatingController := fake.NewMockNonNamespacedClientInterface[*v1.ValidatingWebhookConfiguration, *v1.ValidatingWebhookConfigurationList](ctrl)
+			mutatingController := fake.NewMockNonNamespacedClientInterface[*v1.MutatingWebhookConfiguration, *v1.MutatingWebhookConfigurationList](ctrl)
+			// Start unhealthy so a passing health check proves sync cleared it.
+			errChecker := health.NewErrorChecker("test")
+			errChecker.Store(errors.New("initial error"))
+
+			handler := &secretHandler{
+				validatingController: validatingController,
+				mutatingController:   mutatingController,
+				errChecker:           errChecker,
+			}
+
+			leaderFlag.Store(tt.isLeader)
+
+			if tt.expectControllersRun {
+				validatingController.EXPECT().Get(configName, gomock.Any()).Return(&v1.ValidatingWebhookConfiguration{}, tt.getValidatingErr).Times(1)
+				if apierrors.IsNotFound(tt.getValidatingErr) {
+					validatingController.EXPECT().Create(gomock.Any()).Return(&v1.ValidatingWebhookConfiguration{}, tt.createValidatingErr).Times(1)
+				} else if tt.getValidatingErr == nil {
+					validatingController.EXPECT().Update(gomock.Any()).Return(&v1.ValidatingWebhookConfiguration{}, nil).Times(1)
+				}
+
+				// Only expect calls to the mutating controller if the validating part is expected to succeed.
+				if (tt.getValidatingErr == nil || apierrors.IsNotFound(tt.getValidatingErr)) && tt.createValidatingErr == nil {
+					mutatingController.EXPECT().Get(configName, gomock.Any()).Return(&v1.MutatingWebhookConfiguration{}, tt.getMutatingErr).Times(1)
+					if apierrors.IsNotFound(tt.getMutatingErr) {
+						mutatingController.EXPECT().Create(gomock.Any()).Return(&v1.MutatingWebhookConfiguration{}, tt.createMutatingErr).Times(1)
+					} else if tt.getMutatingErr == nil {
+						mutatingController.EXPECT().Update(gomock.Any()).Return(&v1.MutatingWebhookConfiguration{}, nil).Times(1)
+					}
+				}
+			}
+
+			_, err := handler.sync("test-sync", secret)
+
+			// The case's expected error, if any, must surface from sync.
+			if tt.expectedErr != nil {
+				assert.ErrorContains(t, err, tt.expectedErr.Error(), "expected an error when ensuring webhook config")
+			} else {
+				require.NoError(t, err)
+			}
+
+			// The health checker must agree with the outcome of sync.
+			healthErr := errChecker.Check(nil)
+			if tt.expectedErr == nil {
+				assert.NoError(t, healthErr, "expected pod to be healthy")
+			} else {
+				assert.Error(t, healthErr, "expected pod to be unhealthy")
+			}
+		})
+	}
+}