Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 64 additions & 3 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/gorilla/mux"
Expand All @@ -25,7 +27,10 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
)

const (
Expand All @@ -47,6 +52,10 @@ const (

// caFile is the path <os.TempDir()>/k8s-webhook-server/client-ca/ca.crt.
// NOTE(review): presumably the location of the client CA certificate used by
// the webhook server; its readers/writers are not visible here, so confirm.
var caFile = filepath.Join(os.TempDir(), "k8s-webhook-server", "client-ca", "ca.crt")

// leaderFlag indicates whether this process is the elected leader.
// Gate config mutation work on this to avoid concurrent writers.
//
// The leader-election callbacks set it to true in OnStartedLeading and back to
// false in OnStoppedLeading; secretHandler.sync loads it and returns early
// when it is false. The zero value of atomic.Bool is false, so a process is
// treated as a follower until it wins the election.
var leaderFlag atomic.Bool

// tlsOpt option function applied to all webhook servers.
var tlsOpt = func(config *tls.Config) {
config.MinVersion = tls.VersionTLS12
Expand Down Expand Up @@ -85,6 +94,51 @@ func ListenAndServe(ctx context.Context, cfg *rest.Config, mcmEnabled bool) erro
return err
}

k8sClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
return fmt.Errorf("failed to create kubernetes clientset: %w", err)
}
id, err := os.Hostname()
if err != nil || id == "" {
id = fmt.Sprintf("rancher-webhook-%d", time.Now().UnixNano())
}

lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: "rancher-webhook-leader",
Namespace: namespace,
},
Client: k8sClient.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
},
}

go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
ReleaseOnCancel: true,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ context.Context) {
leaderFlag.Store(true)
logrus.Infof("[%s] elected leader: will manage webhook configurations", id)
},
OnStoppedLeading: func() {
leaderFlag.Store(false)
logrus.Infof("[%s] lost leadership: will stop managing webhook configurations", id)
},
OnNewLeader: func(identity string) {
if identity == id {
logrus.Infof("[%s] I am the new leader", id)
} else {
logrus.Infof("[%s] observed new leader: %s", id, identity)
}
},
},
})

if err = listenAndServe(ctx, clients, validators, mutators); err != nil {
return err
}
Expand All @@ -111,6 +165,7 @@ func listenAndServe(ctx context.Context, clients *clients.Clients, validators []
router := mux.NewRouter()
errChecker := health.NewErrorChecker("Config Applied")
health.RegisterHealthCheckers(router, errChecker)
errChecker.Store(errors.New("webhook configuration not yet applied"))
router.Use(certAuth())

logrus.Debug("Creating Webhook routes")
Expand Down Expand Up @@ -178,14 +233,20 @@ type secretHandler struct {
}

// sync updates the validating admission configuration whenever the TLS cert changes.
// Only the elected leader performs the updates, followers are a no-op.
func (s *secretHandler) sync(_ string, secret *corev1.Secret) (*corev1.Secret, error) {
// The leader is responsible for applying the webhook configuration.
// Follower pods are only responsible for serving traffic and can be marked as healthy once the certificates are generated.
if !leaderFlag.Load() {
s.errChecker.Store(nil)
return nil, nil
}

if secret == nil || secret.Name != caName || secret.Namespace != namespace || len(secret.Data[corev1.TLSCertKey]) == 0 {
return nil, nil
}

logrus.Info("Sleeping for 15 seconds then applying webhook config")
// Sleep here to make sure server is listening and all caches are primed
time.Sleep(15 * time.Second)
logrus.Info("Applying webhook config")

validationClientConfig := v1.WebhookClientConfig{
Service: &v1.ServiceReference{
Expand Down
119 changes: 116 additions & 3 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package server

import (
"errors"
"testing"

"github.com/rancher/webhook/pkg/health"
"github.com/rancher/wrangler/v3/pkg/generic/fake"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
v1 "k8s.io/api/admissionregistration/v1"
"k8s.io/apimachinery/pkg/api/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
Expand All @@ -23,14 +26,14 @@ func TestSecretHandlerEnsureWebhookConfigurationCreate(t *testing.T) {

ctrl := gomock.NewController(t)
validatingController := fake.NewMockNonNamespacedClientInterface[*v1.ValidatingWebhookConfiguration, *v1.ValidatingWebhookConfigurationList](ctrl)
validatingController.EXPECT().Get(configName, gomock.Any()).Return(nil, errors.NewNotFound(schema.GroupResource{Group: v1.GroupName, Resource: "validatingwebhookconfiguration"}, configName)).Times(1)
validatingController.EXPECT().Get(configName, gomock.Any()).Return(nil, apierrors.NewNotFound(schema.GroupResource{Group: v1.GroupName, Resource: "validatingwebhookconfiguration"}, configName)).Times(1)
validatingController.EXPECT().Create(gomock.Any()).DoAndReturn(func(obj *v1.ValidatingWebhookConfiguration) (*v1.ValidatingWebhookConfiguration, error) {
storedValidatingConfig = obj.DeepCopy()
return obj, nil
}).Times(1)

mutatingController := fake.NewMockNonNamespacedClientInterface[*v1.MutatingWebhookConfiguration, *v1.MutatingWebhookConfigurationList](ctrl)
mutatingController.EXPECT().Get(configName, gomock.Any()).Return(nil, errors.NewNotFound(schema.GroupResource{Group: v1.GroupName, Resource: "mutatingwebhookconfiguration"}, configName)).Times(1)
mutatingController.EXPECT().Get(configName, gomock.Any()).Return(nil, apierrors.NewNotFound(schema.GroupResource{Group: v1.GroupName, Resource: "mutatingwebhookconfiguration"}, configName)).Times(1)
mutatingController.EXPECT().Create(gomock.Any()).DoAndReturn(func(obj *v1.MutatingWebhookConfiguration) (*v1.MutatingWebhookConfiguration, error) {
storedMutatingConfig = obj.DeepCopy()
return obj, nil
Expand Down Expand Up @@ -73,3 +76,113 @@ func TestSecretHandlerEnsureWebhookConfigurationCreate(t *testing.T) {
require.Len(t, storedMutatingConfig.Webhooks, 1)
assert.Equal(t, mutatingConfig.Webhooks[0].Name, storedMutatingConfig.Webhooks[0].Name)
}

// TestSyncLeaderLogic tests the logic within the sync function related to leader election.
func TestSyncLeaderLogic(t *testing.T) {
t.Parallel()
configName := "rancher.cattle.io"
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: caName,
Namespace: namespace,
},
Data: map[string][]byte{
corev1.TLSCertKey: []byte("cert-data"),
},
}
testErr := errors.New("test error")

tests := []struct {
name string
isLeader bool
getMutatingErr error
getValidatingErr error
createMutatingErr error
createValidatingErr error
expectedErr error
expectControllersRun bool
}{
{
name: "follower becomes healthy",
isLeader: false,
expectedErr: nil,
},
{
name: "leader becomes healthy on create",
isLeader: true,
getMutatingErr: apierrors.NewNotFound(schema.GroupResource{}, ""),
getValidatingErr: apierrors.NewNotFound(schema.GroupResource{}, ""),
expectedErr: nil,
expectControllersRun: true,
},
{
name: "leader becomes unhealthy on create error",
isLeader: true,
getValidatingErr: apierrors.NewNotFound(schema.GroupResource{}, ""),
createValidatingErr: testErr,
expectedErr: testErr,
expectControllersRun: true,
},
{
name: "leader becomes healthy on update",
isLeader: true,
expectedErr: nil,
expectControllersRun: true,
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
ctrl := gomock.NewController(t)
validatingController := fake.NewMockNonNamespacedClientInterface[*v1.ValidatingWebhookConfiguration, *v1.ValidatingWebhookConfigurationList](ctrl)
mutatingController := fake.NewMockNonNamespacedClientInterface[*v1.MutatingWebhookConfiguration, *v1.MutatingWebhookConfigurationList](ctrl)
errChecker := health.NewErrorChecker("test")
errChecker.Store(errors.New("initial error"))

handler := &secretHandler{
validatingController: validatingController,
mutatingController: mutatingController,
errChecker: errChecker,
}

leaderFlag.Store(tt.isLeader)

if tt.expectControllersRun {
validatingController.EXPECT().Get(configName, gomock.Any()).Return(&v1.ValidatingWebhookConfiguration{}, tt.getValidatingErr).Times(1)
if apierrors.IsNotFound(tt.getValidatingErr) {
validatingController.EXPECT().Create(gomock.Any()).Return(&v1.ValidatingWebhookConfiguration{}, tt.createValidatingErr).Times(1)
} else if tt.getValidatingErr == nil {
validatingController.EXPECT().Update(gomock.Any()).Return(&v1.ValidatingWebhookConfiguration{}, nil).Times(1)
}

// Only expect calls to the mutating controller if the validating part is expected to succeed.
if (tt.getValidatingErr == nil || apierrors.IsNotFound(tt.getValidatingErr)) && tt.createValidatingErr == nil {
mutatingController.EXPECT().Get(configName, gomock.Any()).Return(&v1.MutatingWebhookConfiguration{}, tt.getMutatingErr).Times(1)
if apierrors.IsNotFound(tt.getMutatingErr) {
mutatingController.EXPECT().Create(gomock.Any()).Return(&v1.MutatingWebhookConfiguration{}, tt.createMutatingErr).Times(1)
} else if tt.getMutatingErr == nil {
mutatingController.EXPECT().Update(gomock.Any()).Return(&v1.MutatingWebhookConfiguration{}, nil).Times(1)
}
}
}

_, err := handler.sync("test-sync", secret)

// The only error we might get is a transient one from ensureWebhookConfiguration.
if tt.expectedErr != nil {
assert.ErrorContains(t, err, tt.expectedErr.Error(), "expected an error when ensuring webhook config")
} else {
require.NoError(t, err)
}

healthErr := errChecker.Check(nil)
if tt.expectedErr == nil {
assert.NoError(t, healthErr, "expected pod to be healthy")
} else {
assert.Error(t, healthErr, "expected pod to be unhealthy")
}
})
}
}
Loading