diff --git a/cmd/redisoperator/main.go b/cmd/redisoperator/main.go index 4137161cc..ba1232fb4 100644 --- a/cmd/redisoperator/main.go +++ b/cmd/redisoperator/main.go @@ -79,7 +79,7 @@ func (m *Main) Run() error { } // Create kubernetes service. - k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger, metricsRecorder) + k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger, metricsRecorder, m.flags.EnableObjectHashing) // Create the redis clients redisClient := redis.New(metricsRecorder) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 86927ed09..db7e1f081 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -22,6 +22,7 @@ type CMDFlags struct { K8sQueriesBurstable int Concurrency int LogLevel string + EnableObjectHashing bool } // Init initializes and parse the flags @@ -39,6 +40,7 @@ func (c *CMDFlags) Init() { // reference: https://github.com/spotahome/kooper/blob/master/controller/controller.go#L89 flag.IntVar(&c.Concurrency, "concurrency", 3, "Number of conccurent workers meant to process events") flag.StringVar(&c.LogLevel, "log-level", "info", "set log level") + flag.BoolVar(&c.EnableObjectHashing, "enable-hash", false, "Add hashed annotations to k8s objects, apply changes only when theres a diff.") // Parse flags flag.Parse() diff --git a/service/k8s/configmap.go b/service/k8s/configmap.go index 1b6fc1424..7c4837f4f 100644 --- a/service/k8s/configmap.go +++ b/service/k8s/configmap.go @@ -76,6 +76,14 @@ func (p *ConfigMapService) CreateOrUpdateConfigMap(namespace string, configMap * return err } + if hashingEnabled() { + if !shouldUpdate(configMap, storedConfigMap) { + p.logger.Debugf("%v/%v configmap is upto date, no need to apply changes...", configMap.Namespace, configMap.Name) + return nil + } + p.logger.Debugf("%v/%v configmap has a different resource hash, updating the object...", configMap.Namespace, configMap.Name) + addHashAnnotation(configMap) + } // Already exists, need to Update. // Set the correct resource version to ensure we are on the latest version. This way the only valid // namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency), diff --git a/service/k8s/deployment.go b/service/k8s/deployment.go index 46d63bb93..a33b47426 100644 --- a/service/k8s/deployment.go +++ b/service/k8s/deployment.go @@ -101,6 +101,15 @@ func (d *DeploymentService) CreateOrUpdateDeployment(namespace string, deploymen return err } + if hashingEnabled() { + if !shouldUpdate(deployment, storedDeployment) { + d.logger.Debugf("%v/%v deployment is upto date, no need to apply changes...", deployment.Namespace, deployment.Name) + return nil + } + d.logger.Debugf("%v/%v deployment has a different resource hash, updating the object...", deployment.Namespace, deployment.Name) + addHashAnnotation(deployment) + } + // Already exists, need to Update. // Set the correct resource version to ensure we are on the latest version. This way the only valid // namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency), diff --git a/service/k8s/hash_annotations.go b/service/k8s/hash_annotations.go new file mode 100644 index 000000000..b3449d7ec --- /dev/null +++ b/service/k8s/hash_annotations.go @@ -0,0 +1,64 @@ +package k8s + +import ( + "crypto/sha256" + "encoding/base64" + "hash" + + "github.com/davecgh/go-spew/spew" +) + +// taken from https://github.com/k8ssandra/cass-operator/blob/master/pkg/utils/hash_annotation.go + +type Annotated interface { + GetAnnotations() map[string]string + SetAnnotations(annotations map[string]string) + GetName() string +} + +const resourceHashAnnotationKey = "databases.spotahome.com/resource-hash" + +// Create hash of a given object + +func addHashAnnotation(r Annotated) { + hash := deepHashString(r) + m := r.GetAnnotations() + if m == nil { + m = map[string]string{} + } + m[resourceHashAnnotationKey] = hash + r.SetAnnotations(m) +} + +func deepHashString(obj interface{}) string { + hasher := sha256.New() + deepHashObject(hasher, obj) + hashBytes := hasher.Sum([]byte{}) + b64Hash := base64.StdEncoding.EncodeToString(hashBytes) + return b64Hash +} + +// DeepHashObject writes specified object to hash using the spew library +// which follows pointers and prints actual values of the nested objects +// ensuring the hash does not change when a pointer changes. +func deepHashObject(hasher hash.Hash, objectToWrite interface{}) { + hasher.Reset() + printer := spew.ConfigState{ + Indent: " ", + SortKeys: true, + DisableMethods: true, + SpewKeys: true, + } + printer.Fprintf(hasher, "%#v", objectToWrite) +} + +func shouldUpdate(desired Annotated, stored Annotated) bool { + + storedHash, exists := stored.GetAnnotations()[resourceHashAnnotationKey] + if !exists { + return true + } + desiredHash := deepHashString(desired) + + return desiredHash != storedHash +} diff --git a/service/k8s/hash_annotations_test.go b/service/k8s/hash_annotations_test.go new file mode 100644 index 000000000..7a970ef4c --- /dev/null +++ b/service/k8s/hash_annotations_test.go @@ -0,0 +1,62 @@ +package k8s + +import "testing" + +// create test for addHashAnnotation + +// create a dummy struct that implements Annotated interface +type dummy struct { + annotations map[string]string + name string +} + +func (d *dummy) GetAnnotations() map[string]string { + return d.annotations +} + +func (d *dummy) SetAnnotations(annotations map[string]string) { + d.annotations = annotations +} + +func (d *dummy) GetName() string { + return d.name +} + +func TestAddHashAnnotation(t *testing.T) { + originalObject := &dummy{name: "test"} + copyOfOriginalObject := &dummy{name: "test"} + differentObject := &dummy{name: "test2"} + + addHashAnnotation(originalObject) + + tests := []struct { + name string + object Annotated + errorMessage string + expected bool + }{ + { + name: "Hashes of same object should be equal", + object: copyOfOriginalObject, + errorMessage: "Hashes of same object should be equal", + expected: true, + }, + { + name: "Hashes of different objects should not be equal", + object: differentObject, + errorMessage: "Hashes of different objects should not be equal", + expected: false, + }, + } + for _, test := range tests { + addHashAnnotation(test.object) + hash := test.object.GetAnnotations()[resourceHashAnnotationKey] + if hash == "" { + t.Errorf("Hash not created") + } + equal := hash == originalObject.GetAnnotations()[resourceHashAnnotationKey] + if equal != test.expected { + t.Error(test.errorMessage) + } + } +} diff --git a/service/k8s/k8s.go b/service/k8s/k8s.go index b6e68ae44..f0c70842d 100644 --- a/service/k8s/k8s.go +++ b/service/k8s/k8s.go @@ -22,6 +22,14 @@ type Services interface { StatefulSet } +var ( + objectHashingEnabled bool +) + +func hashingEnabled() bool { + return objectHashingEnabled +} + type services struct { ConfigMap Secret @@ -35,7 +43,8 @@ type services struct { } // New returns a new Kubernetes service. -func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder) Services { +func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder, enableHashing bool) Services { + objectHashingEnabled = enableHashing return &services{ ConfigMap: NewConfigMapService(kubecli, logger, metricsRecorder), Secret: NewSecretService(kubecli, logger, metricsRecorder), diff --git a/service/k8s/poddisruptionbudget.go b/service/k8s/poddisruptionbudget.go index 48350bc43..98b4f8763 100644 --- a/service/k8s/poddisruptionbudget.go +++ b/service/k8s/poddisruptionbudget.go @@ -77,6 +77,15 @@ func (p *PodDisruptionBudgetService) CreateOrUpdatePodDisruptionBudget(namespace return err } + if hashingEnabled() { + if !shouldUpdate(podDisruptionBudget, storedPodDisruptionBudget) { + p.logger.Debugf("%v/%v pdb is upto date, no need to apply changes...", podDisruptionBudget.Namespace, podDisruptionBudget.Name) + return nil + } + p.logger.Debugf("%v/%v pdb has a different resource hash, updating the object...", podDisruptionBudget.Namespace, podDisruptionBudget.Name) + addHashAnnotation(podDisruptionBudget) + } + // Already exists, need to Update. // Set the correct resource version to ensure we are on the latest version. This way the only valid // namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency), diff --git a/service/k8s/rbac.go b/service/k8s/rbac.go index a5534b445..0120e9444 100644 --- a/service/k8s/rbac.go +++ b/service/k8s/rbac.go @@ -100,6 +100,14 @@ func (r *RBACService) CreateOrUpdateRole(namespace string, role *rbacv1.Role) er return err } + if hashingEnabled() { + if !shouldUpdate(role, storedRole) { + r.logger.Debugf("%v/%v role is upto date, no need to apply changes...", role.Namespace, role.Name) + return nil + } + r.logger.Debugf("%v/%v role has a different resource hash, updating the object...", role.Namespace, role.Name) + addHashAnnotation(role) + } // Already exists, need to Update. // Set the correct resource version to ensure we are on the latest version. This way the only valid // namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency), @@ -148,6 +156,15 @@ func (r *RBACService) CreateOrUpdateRoleBinding(namespace string, binding *rbacv return err } + if hashingEnabled() { + if !shouldUpdate(binding, storedBinding) { + r.logger.Debugf("%v/%v rolebinding is upto date, no need to apply changes...", binding.Namespace, binding.Name) + return nil + } + r.logger.Debugf("%v/%v rolebinding has a different resource hash, updating the object...", binding.Namespace, binding.Name) + addHashAnnotation(binding) + } + // Check if the role ref has changed, roleref updates are not allowed, if changed then delete and create again the role binding. // https://github.com/kubernetes/kubernetes/blob/0f0a5223dfc75337d03c9b80ae552ae8ef138eeb/pkg/apis/rbac/validation/validation.go#L157-L159 if storedBinding.RoleRef != binding.RoleRef { diff --git a/service/k8s/service.go b/service/k8s/service.go index 712cc4c0d..0959c9203 100644 --- a/service/k8s/service.go +++ b/service/k8s/service.go @@ -90,6 +90,15 @@ func (s *ServiceService) CreateOrUpdateService(namespace string, service *corev1 return err } + if hashingEnabled() { + if !shouldUpdate(service, storedService) { + s.logger.Debugf("%v/%v service is upto date, no need to apply changes...", service.Namespace, service.Name) + return nil + } + s.logger.Debugf("%v/%v service has a different resource hash, updating the object...", service.Namespace, service.Name) + addHashAnnotation(service) + } + // Already exists, need to Update. // Set the correct resource version to ensure we are on the latest version. This way the only valid // namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency), diff --git a/service/k8s/statefulset.go b/service/k8s/statefulset.go index 38cc95ff2..921d80ea5 100644 --- a/service/k8s/statefulset.go +++ b/service/k8s/statefulset.go @@ -97,19 +97,15 @@ func (s *StatefulSetService) UpdateStatefulSet(namespace string, statefulSet *ap // CreateOrUpdateStatefulSet will update the statefulset or create it if does not exist func (s *StatefulSetService) CreateOrUpdateStatefulSet(namespace string, statefulSet *appsv1.StatefulSet) error { storedStatefulSet, err := s.GetStatefulSet(namespace, statefulSet.Name) + if err != nil { // If no resource we need to create. if errors.IsNotFound(err) { + addHashAnnotation(statefulSet) return s.CreateStatefulSet(namespace, statefulSet) } return err } - - // Already exists, need to Update. - // Set the correct resource version to ensure we are on the latest version. This way the only valid - // namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency), - // we will replace the current namespace state. - statefulSet.ResourceVersion = storedStatefulSet.ResourceVersion // resize pvc // 1.Get the data already stored internally // 2.Get the desired data @@ -171,6 +167,17 @@ func (s *StatefulSetService) CreateOrUpdateStatefulSet(namespace string, statefu // set stored.volumeClaimTemplates statefulSet.Spec.VolumeClaimTemplates = storedStatefulSet.Spec.VolumeClaimTemplates statefulSet.Annotations = util.MergeAnnotations(storedStatefulSet.Annotations, statefulSet.Annotations) + + if hashingEnabled() { + delete(statefulSet.Annotations, resourceHashAnnotationKey) // this will be regenerated if changes are required. + if !shouldUpdate(statefulSet, storedStatefulSet) { + s.logger.Debugf("%v/%v statefulset is upto date, no need to apply changes...", statefulSet.Namespace, statefulSet.Name) + return nil + } + s.logger.Debugf("%v/%v statefulset has a different resource hash, updating the object...", statefulSet.Namespace, statefulSet.Name) + addHashAnnotation(statefulSet) + } + return s.UpdateStatefulSet(namespace, statefulSet) } diff --git a/test/integration/redisfailover/creation_test.go b/test/integration/redisfailover/creation_test.go index 5458634ba..c63af3f64 100644 --- a/test/integration/redisfailover/creation_test.go +++ b/test/integration/redisfailover/creation_test.go @@ -94,7 +94,7 @@ func TestRedisFailover(t *testing.T) { } // Create kubernetes service. - k8sservice := k8s.New(k8sClient, customClient, aeClientset, log.Dummy, metrics.Dummy) + k8sservice := k8s.New(k8sClient, customClient, aeClientset, log.Dummy, metrics.Dummy, true) // Prepare namespace prepErr := clients.prepareNS()