diff --git a/api/v1alpha1/valkeycluster_types.go b/api/v1alpha1/valkeycluster_types.go index f5dfad5..90f7c5f 100644 --- a/api/v1alpha1/valkeycluster_types.go +++ b/api/v1alpha1/valkeycluster_types.go @@ -73,6 +73,46 @@ type ValkeyClusterSpec struct { // +kubebuilder:default:={enabled:true} // +optional Exporter ExporterSpec `json:"exporter,omitempty"` + + // Options, or config which are specific to Valkey server + // +optional + ValkeySpec ValkeySpec `json:"valkeyConfig,omitempty"` +} + +// ValkeySpec defines any options, or configuration that is specific to valkey-server +type ValkeySpec struct { + + // Auth-related structure for handling users, and acls + Auth AuthSpec `json:"auth,omitempty"` +} + +// AuthSpec contains authorization, user, and ACL-related configurations +type AuthSpec struct { + + // Users ACL Secret + // A reference name to a Secret containing raw user:permission entries, which will be processed first + // Defaults to clusterName-secret + // +optional + UsersSecretRef string `json:"usersSecretRef,omitempty"` + + // Array of users with raw ACL, or reference to a key in the UsersSecretRef + Users map[string]UserAcl `json:"users,omitempty"` +} + +type UserAcl struct { + + // Raw ACL line, including password + Permissions string `json:"permissions,omitempty"` + + // sha256 password + // kubebuilder:validation:MinLength=64 + // kubebuilder:validation:MaxLength=65 + // kubebuilder:XValidation:message="Password should be a sha256 hash" + Password string `json:"password,omitempty"` + + // Reference to a key in UsersSecretRef containing the password for this user + // Defaults to the username + PasswordKeyRef string `json:"passwordKeyRef,omitempty"` } type ExporterSpec struct { @@ -154,6 +194,7 @@ const ( ReasonSlotsUnassigned = "SlotsUnassigned" ReasonPrimaryLost = "PrimaryLost" ReasonNoSlots = "NoSlotsAvailable" + ReasonUsersAclError = "UsersACLError" ) // +kubebuilder:object:root=true diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index b239c97..d0e32e6 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -26,6 +26,28 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AuthSpec) DeepCopyInto(out *AuthSpec) { + *out = *in + if in.Users != nil { + in, out := &in.Users, &out.Users + *out = make(map[string]UserAcl, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AuthSpec. +func (in *AuthSpec) DeepCopy() *AuthSpec { + if in == nil { + return nil + } + out := new(AuthSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ExporterSpec) DeepCopyInto(out *ExporterSpec) { *out = *in @@ -42,6 +64,21 @@ func (in *ExporterSpec) DeepCopy() *ExporterSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UserAcl) DeepCopyInto(out *UserAcl) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UserAcl. +func (in *UserAcl) DeepCopy() *UserAcl { + if in == nil { + return nil + } + out := new(UserAcl) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ValkeyCluster) DeepCopyInto(out *ValkeyCluster) { *out = *in @@ -125,6 +162,7 @@ func (in *ValkeyClusterSpec) DeepCopyInto(out *ValkeyClusterSpec) { (*in).DeepCopyInto(*out) } in.Exporter.DeepCopyInto(&out.Exporter) + in.ValkeySpec.DeepCopyInto(&out.ValkeySpec) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ValkeyClusterSpec. @@ -158,3 +196,19 @@ func (in *ValkeyClusterStatus) DeepCopy() *ValkeyClusterStatus { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ValkeySpec) DeepCopyInto(out *ValkeySpec) { + *out = *in + in.Auth.DeepCopyInto(&out.Auth) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ValkeySpec. +func (in *ValkeySpec) DeepCopy() *ValkeySpec { + if in == nil { + return nil + } + out := new(ValkeySpec) + in.DeepCopyInto(out) + return out +} diff --git a/config/crd/bases/valkey.io_valkeyclusters.yaml b/config/crd/bases/valkey.io_valkeyclusters.yaml index f9d1cd9..20c7cc6 100644 --- a/config/crd/bases/valkey.io_valkeyclusters.yaml +++ b/config/crd/bases/valkey.io_valkeyclusters.yaml @@ -1163,6 +1163,42 @@ spec: type: string type: object type: array + valkeyConfig: + description: Options, or config which are specific to Valkey server + properties: + auth: + description: Auth-related structure for handling users, and acls + properties: + users: + additionalProperties: + properties: + password: + description: |- + sha256 password + kubebuilder:validation:MinLength=64 + kubebuilder:validation:MaxLength=65 + kubebuilder:XValidation:message="Password should be a sha256 hash" + type: string + passwordKeyRef: + description: |- + Reference to a key in UsersSecretRef containing the password for this user + Defaults to the username + type: string + permissions: + description: Raw ACL line, including password + type: string + type: object + description: Array of users with raw ACL, or reference to + a key in the UsersSecretRef + type: object + usersSecretRef: + description: |- + Users ACL Secret + A reference name to a Secret containing raw user:permission entries, which will be processed first + Defaults to clusterName-secret + type: string + type: object + type: object type: object status: default: diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 223bc9c..effaa8f 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -8,6 +8,7 @@ rules: - "" resources: - configmaps + - secrets - services verbs: - create diff --git a/config/samples/v1alpha1_valkeycluster.yaml b/config/samples/v1alpha1_valkeycluster.yaml index 540a4d1..863d809 100644 --- a/config/samples/v1alpha1_valkeycluster.yaml +++ b/config/samples/v1alpha1_valkeycluster.yaml @@ -1,3 +1,12 @@ +--- +apiVersion: v1 +kind: Secret +metadata: + name: valkeycluster-sample-users +data: + alicepw: M21wdHlQQHNzdzByZA== + bob: YjFjMzY4OTZkMzk5MjMwNDU5NTVjMzczZjM2NWVkNjg2Y2I5N2RiNTA1NGE0NTJjN2EzZmQ5MjAwZTMzNGYxZAo= +--- apiVersion: valkey.io/v1alpha1 kind: ValkeyCluster metadata: @@ -5,6 +14,20 @@ metadata: spec: shards: 3 replicas: 1 + valkeyConfig: + auth: + usersSecretRef: valkeycluster-sample-users + users: + alice: + permissions: +@list +@connection ~jobs:* + passwordKeyRef: alicepw + bob: + permissions: ~* +@all + charlie: + permissions: -@all +get ~secretkey ~valkey:* + password: sup3rS3cr3t + david: + permissions: -@all +@admin resources: requests: memory: "256Mi" diff --git a/internal/controller/deployment.go b/internal/controller/deployment.go index 7da2e66..041137f 100644 --- a/internal/controller/deployment.go +++ b/internal/controller/deployment.go @@ -89,6 +89,11 @@ func generateContainersDef(cluster *valkeyiov1alpha1.ValkeyCluster) []corev1.Con MountPath: "/config", ReadOnly: true, }, + { + Name: "users-acl", + MountPath: "/config/users", + ReadOnly: true, + }, }, }, } @@ -141,6 +146,14 @@ func createClusterDeployment(cluster *valkeyiov1alpha1.ValkeyCluster) *appsv1.De }, }, }, + { + Name: "users-acl", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: getInternalSecretName(cluster.Name), + }, + }, + }, }, }, }, diff --git a/internal/controller/users.go b/internal/controller/users.go new file mode 100644 index 0000000..173190b --- /dev/null +++ b/internal/controller/users.go @@ -0,0 +1,284 @@ +/* +Copyright 2025 Valkey Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "crypto/sha256" + "fmt" + "sort" + "strings" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + valkeyiov1alpha1 "valkey.io/valkey-operator/api/v1alpha1" +) + +const ( + hashLength = 64 + hashAnnotationKey = "valkey.io/internal-acl-hash" + internalRefByKey = "valkey.io/internal-acl-ref-by" + instanceLabel = "app.kubernetes.io/instance" +) + +func getInternalSecretName(cn string) string { + return cn + "-acl" +} + +func getDefaultSecretName(cn string) string { + return cn + "-secret" +} + +// When a Secret is updated, Watch() calls this function to discover +// which object should be reconciled +func (r *ValkeyClusterReconciler) findReferencedSecrets(ctx context.Context, secret client.Object) []reconcile.Request { + + log := logf.FromContext(ctx) + secretName := secret.GetName() // the Secret that was updated + + // List all Secrets, filtered by our reference label. + // This should return a list of "internal secrets" pointing to the updated Secret + internalSecretsList := &corev1.SecretList{} + if err := r.List(ctx, internalSecretsList, + client.InNamespace(secret.GetNamespace()), + client.MatchingLabels{ + internalRefByKey: secretName, + }, + ); err != nil { + log.Error(err, "failed to list referenced secrets") + return []reconcile.Request{} + } + + requests := []reconcile.Request{} + + // Take our list of internal secrets, and get the controlling cluster name. + // Return a list of clusters to be reconciled. + for _, s := range internalSecretsList.Items { + + clusterName := s.GetLabels()[instanceLabel] + if clusterName == "" { + log.Error(nil, "empty app-instance label from internal secret", "secretname", s.GetName()) + continue + } + + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: clusterName, + Namespace: s.GetNamespace(), + }, + }) + } + + return requests +} + +func (r *ValkeyClusterReconciler) reconcileUsersAcl(ctx context.Context, cluster *valkeyiov1alpha1.ValkeyCluster) error { + + log := logf.FromContext(ctx) + + // Shortcut + auth := &cluster.Spec.ValkeySpec.Auth + + // Look for a Secret matching the user-provided name, or clusterName-secret + userSecretName := getDefaultSecretName(cluster.Name) + if auth.UsersSecretRef != "" { + userSecretName = auth.UsersSecretRef + } + log.V(2).Info("usersAcl secret", "userSecretName", userSecretName) + + // Query API for user-provided secret + userSecrets := &corev1.Secret{} + if err := r.Get(ctx, types.NamespacedName{ + Name: userSecretName, + Namespace: cluster.Namespace, + }, userSecrets); err != nil { + if !apierrors.IsNotFound(err) { + log.Error(err, "failed to fetch acl secret") + return err + } + log.V(1).Info("Users secret not found", "userSecretName", userSecretName) + } + + // Query API for the internal secrets object + internalSecretName := getInternalSecretName(cluster.Name) + needCreateInternal := false + + internalAclSecrets := &corev1.Secret{} + if err := r.Get(ctx, types.NamespacedName{ + Name: internalSecretName, + Namespace: cluster.Namespace, + }, internalAclSecrets); err != nil { + if !apierrors.IsNotFound(err) { + log.Error(err, "failed to fetch internal acl secret") + return err + } + + // Internal secret was not found. Add metadata to the empty object + needCreateInternal = true + log.V(2).Info("creating internal secret", "secretName", internalSecretName) + + internalAclSecrets.Data = make(map[string][]byte) + internalAclSecrets.ObjectMeta = metav1.ObjectMeta{ + Name: internalSecretName, + Namespace: cluster.Namespace, + Labels: labels(cluster, map[string]string{ + internalRefByKey: userSecretName, + }), + } + } + + // Register ownership of the internal Secret + if err := controllerutil.SetControllerReference(cluster, internalAclSecrets, r.Scheme); err != nil { + log.Error(err, "Failed to grab ownership of internal secret") + r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "InternalSecretsCreationFailed", "Failed to grab ownership of internal secret: %v", err) + return err + } + + // Build the users.acl file contents from the users in the Spec, and secrets reference + aclFileContents := buildAclFileContents(ctx, auth.Users, userSecrets) + + // Calculate hash of the ACL file contents + internalAclHash := fmt.Sprintf("%x", sha256.Sum256(aclFileContents)) + + // Compare hash to the one already attached to the internal secret, if present. + // If the hashes are different, then we need to update the internal secret with + // the new file contents and update the hash annotation. If the hashes are the + // same, don't update as that would cause infinite reconciliation + + if needsUpdate := upsertAnnotation(internalAclSecrets, hashAnnotationKey, internalAclHash); !needsUpdate { + log.V(1).Info("Internal ACLs unchanged") + return nil + } + + // Add the acl contents to the internal secret, replacing anything preexisting + internalAclSecrets.Data["users.acl"] = aclFileContents + + // Create the internal secret, if needed + if needCreateInternal { + if err := r.Create(ctx, internalAclSecrets); err != nil { + log.Error(err, "Failed to create internal secret") + r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "InternalSecretsCreationFailed", "Failed to create internal secret: %v", err) + return err + } else { + r.Recorder.Eventf(cluster, corev1.EventTypeNormal, "InternalSecretsCreated", "Created internal ACLs") + return nil + } + } + + // Otherwise update it + if err := r.Update(ctx, internalAclSecrets); err != nil { + log.Error(err, "Failed to update internal secret") + r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "InternalSecretsUpdateFailed", "Failed to update internal secret: %v", err) + return err + } + + r.Recorder.Eventf(cluster, corev1.EventTypeNormal, "InternalSecretsUpdated", "Synchronized internal ACLs") + + // All is good; The internal secret will be auto-mounted in the deployment + return nil +} + +// This function takes the map of users from the Spec, and the user-created Secret, +// and builds a string of ACL lines +func buildAclFileContents(ctx context.Context, users map[string]valkeyiov1alpha1.UserAcl, usersSecrets *corev1.Secret) []byte { + + log := logf.FromContext(ctx) + + // Holds the ACLs + var aclFileContents string + + // Extract usernames from the map, and sort to keep consistent processing order + sortedNames := make([]string, 0, len(users)) + for k := range users { + sortedNames = append(sortedNames, k) + } + sort.Strings(sortedNames) + + // Loop over users and build acl line + for _, un := range sortedNames { + + // Get this user's ACL + acl := users[un] + + if acl.Permissions == "" { + log.Error(nil, "no permissions for user", "username", un) + continue + } + + // If password is empty, attempt to fetch from secret + pw := acl.Password + if pw == "" { + + // Reference to key in secret file for password; defaults to username + secretKeyRef := un + if acl.PasswordKeyRef != "" { + secretKeyRef = acl.PasswordKeyRef + } + + // Check if password is in Secret + refPw, found := usersSecrets.Data[secretKeyRef] + if !found { + log.Error(nil, "no password found for user", "username", un, "secretRef", secretKeyRef) + continue + } + pw = string(refPw) + } + + // We should have a username, cleartext, or hashed password, and acl + + var hashedPass string + pw = strings.TrimRight(pw, "\n") + pwLen := len(pw) + + if pwLen == hashLength { + // If the password string is 64 characters, assume it is hashed, and prefix it + hashedPass = "#" + pw + + } else if pwLen < hashLength { + // If password string is less than 64 characters, assume plaintext password, and hash it + + // Strip off plaintext prefix, if found + if pw[:1] == ">" { + pw = pw[1:] + } + + // Hash password, and append prefix + hashedPass = fmt.Sprintf("#%x", sha256.Sum256([]byte(pw))) + + } else if pw[:1] == "#" && pwLen == hashLength+1 { + // If password begins with #, and is is 65 characters, copy as-is + hashedPass = pw + + } else { + // Anything else is something we don't recognize + + log.Error(nil, "unknown password format", "username", un) + continue + } + + // Build and append full ACL string + aclFileContents += fmt.Sprintf("user %s on %s %s\n", un, hashedPass, acl.Permissions) + } + + return []byte(aclFileContents) +} diff --git a/internal/controller/users_test.go b/internal/controller/users_test.go new file mode 100644 index 0000000..82d5fd2 --- /dev/null +++ b/internal/controller/users_test.go @@ -0,0 +1,91 @@ +/* +Copyright 2025 Valkey Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "strings" + "testing" + + corev1 "k8s.io/api/core/v1" + valkeyiov1alpha1 "valkey.io/valkey-operator/api/v1alpha1" +) + +func TestBuildAclFileContents(t *testing.T) { + + ctx := context.TODO() + + // Simulate a Secret + testSecrets := &corev1.Secret{ + Data: map[string][]byte{ + "davidref": []byte("c90502e005ee957f29645e21d1e27f5bbfce539e38c949a00dfc12270f47fc59"), + }, + } + + // alice should succeed as there are permissions, and valid password + expected := "user alice on #b6366487efc982bfe450c46753917ee883f71a3f4fa5cdc5bde78d1784842e73 +@list +@connection ~jobs:*" + acl := buildAclFileContents(ctx, map[string]valkeyiov1alpha1.UserAcl{ + "alice": { + Permissions: "+@list +@connection ~jobs:*", + Password: "thisisagoodpassword", + }}, testSecrets) + if strings.TrimRight(string(acl), "\n") != expected { + t.Errorf("alice ACL Failed. Expected %s; got %s", expected, acl) + } + + // bob fails to be added because there is no matching password ref in Secret + expected = "" + acl = buildAclFileContents(ctx, map[string]valkeyiov1alpha1.UserAcl{ + "bob": { + Permissions: "-@all", + PasswordKeyRef: "bobkeyref", + }}, testSecrets) + if strings.TrimRight(string(acl), "\n") != expected { + t.Errorf("bob ACL Failed. Expected %s; got %s", expected, acl) + } + + // charlie fails to be added because there is no password, and no password ref + expected = "" + acl = buildAclFileContents(ctx, map[string]valkeyiov1alpha1.UserAcl{ + "charlie": { + Permissions: "+@list +@connection ~jobs:*", + }}, testSecrets) + if strings.TrimRight(string(acl), "\n") != expected { + t.Errorf("charlie ACL Failed. Expected %s; got %s", expected, acl) + } + + // david should succeed as there are permissions, and a valid key ref + expected = "user david on #c90502e005ee957f29645e21d1e27f5bbfce539e38c949a00dfc12270f47fc59 +@all" + acl = buildAclFileContents(ctx, map[string]valkeyiov1alpha1.UserAcl{ + "david": { + Permissions: "+@all", + PasswordKeyRef: "davidref", + }}, testSecrets) + if strings.TrimRight(string(acl), "\n") != expected { + t.Errorf("david ACL Failed. Expected %s; got %s", expected, acl) + } + + // edward fails to be added because there are no permissions + expected = "" + acl = buildAclFileContents(ctx, map[string]valkeyiov1alpha1.UserAcl{ + "edward": { + Password: "edwardpass", + }}, testSecrets) + if strings.TrimRight(string(acl), "\n") != expected { + t.Errorf("edward ACL Failed. Expected %s; got %s", expected, acl) + } +} diff --git a/internal/controller/utils.go b/internal/controller/utils.go index 5ef185d..ef27571 100644 --- a/internal/controller/utils.go +++ b/internal/controller/utils.go @@ -19,6 +19,7 @@ package controller import ( "maps" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" valkeyv1 "valkey.io/valkey-operator/api/v1alpha1" ) @@ -26,7 +27,7 @@ const appName = "valkey" // Labels returns a copy of user defined labels including recommended: // https://kubernetes.io/docs/concepts/overview/working-with-objects/common-labels/ -func labels(cluster *valkeyv1.ValkeyCluster) map[string]string { +func labels(cluster *valkeyv1.ValkeyCluster, extraLabels ...map[string]string) map[string]string { if cluster.Labels == nil { cluster.Labels = make(map[string]string) } @@ -36,6 +37,12 @@ func labels(cluster *valkeyv1.ValkeyCluster) map[string]string { l["app.kubernetes.io/component"] = "valkey-cluster" l["app.kubernetes.io/part-of"] = appName l["app.kubernetes.io/managed-by"] = "valkey-operator" + + // Copy extra labels into main map, overriding duplicates + for _, e := range extraLabels { + maps.Copy(l, e) + } + return l } @@ -43,3 +50,29 @@ func labels(cluster *valkeyv1.ValkeyCluster) map[string]string { func annotations(cluster *valkeyv1.ValkeyCluster) map[string]string { return maps.Clone(cluster.Annotations) } + +// This function takes a K8S object reference (eg: pod, secret, configmap, etc), +// and a map of annotations to add to, or replace existing, within the object. +// Returns true if the annotation was added, or updated +func upsertAnnotation(o metav1.Object, key string, val string) bool { + + updated := false + + // Get current annotations + annotations := o.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + + // If not found, insert, or update + if orig := annotations[key]; orig != val { + + updated = true + annotations[key] = val + + // Set annotations + o.SetAnnotations(annotations) + } + + return updated +} diff --git a/internal/controller/valkeycluster_controller.go b/internal/controller/valkeycluster_controller.go index 89b36f3..7fdd60d 100644 --- a/internal/controller/valkeycluster_controller.go +++ b/internal/controller/valkeycluster_controller.go @@ -34,6 +34,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" logf "sigs.k8s.io/controller-runtime/pkg/log" valkeyiov1alpha1 "valkey.io/valkey-operator/api/v1alpha1" "valkey.io/valkey-operator/internal/valkey" @@ -65,6 +66,7 @@ var scripts embed.FS // +kubebuilder:rbac:groups=valkey.io,resources=valkeyclusters/finalizers,verbs=update // +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch // +kubebuilder:rbac:groups="apps",resources=deployments,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch @@ -89,6 +91,11 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, err } + if err := r.reconcileUsersAcl(ctx, cluster); err != nil { + setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonUsersAclError, err.Error(), metav1.ConditionFalse) + return ctrl.Result{}, err + } + if err := r.upsertConfigMap(ctx, cluster); err != nil { setCondition(cluster, valkeyiov1alpha1.ConditionReady, valkeyiov1alpha1.ReasonConfigMapError, err.Error(), metav1.ConditionFalse) _ = r.updateStatus(ctx, cluster, nil) @@ -249,7 +256,8 @@ func (r *ValkeyClusterReconciler) upsertConfigMap(ctx context.Context, cluster * "valkey.conf": ` cluster-enabled yes protected-mode no -cluster-node-timeout 2000`, +cluster-node-timeout 2000 +aclfile /config/users/users.acl`, }, } if err := controllerutil.SetControllerReference(cluster, cm, r.Scheme); err != nil { @@ -506,6 +514,10 @@ func (r *ValkeyClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { Owns(&corev1.Service{}). Owns(&corev1.ConfigMap{}). Owns(&appsv1.Deployment{}). + Watches( + &corev1.Secret{}, + handler.EnqueueRequestsFromMapFunc(r.findReferencedSecrets), + ). Named("valkeycluster"). Complete(r) } diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 5c4c760..e611962 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -283,6 +283,14 @@ var _ = Describe("Manager", Ordered, func() { Expect(err).NotTo(HaveOccurred(), "Failed to retrieve pod's information") Expect(output).To(MatchJSON(`{"limits":{"cpu":"500m","memory":"512Mi"},"requests":{"cpu":"100m","memory":"256Mi"}}`), "Incorrect pod resources configuration") + By("validating internal secret was created") + verifyInternalSecretsExists := func(g Gomega) { + cmd := exec.Command("kubectl", "get", "secrets", valkeyClusterName + "-acl") + _, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + } + Eventually(verifyInternalSecretsExists).Should(Succeed()) + By("validating the ValkeyCluster CR status") verifyCrStatus := func(g Gomega) { cr, err := utils.GetValkeyClusterStatus(valkeyClusterName) @@ -437,6 +445,40 @@ var _ = Describe("Manager", Ordered, func() { g.Expect(output).To(ContainSubstring("cluster_state:ok")) } Eventually(verifyClusterAccess).Should(Succeed()) + + By("verifying created users") + verifyCreatedUsers := func(g Gomega) { + // Start a Valkey client pod to access the cluster and get its status. + clusterFqdn := fmt.Sprintf("%s.default.svc.cluster.local", valkeyClusterName) + + cmd := exec.Command("kubectl", "run", "client", + fmt.Sprintf("--image=%s", valkeyClientImage), "--restart=Never", "--", + "valkey-cli", "-c", "-h", clusterFqdn, "ACL", "LIST") + _, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + + cmd = exec.Command("kubectl", "wait", "pod/client", + "--for=jsonpath={.status.phase}=Succeeded", "--timeout=30s") + _, err = utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + + cmd = exec.Command("kubectl", "logs", "client") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + + cmd = exec.Command("kubectl", "delete", "pod", "client", + "--wait=true", "--timeout=30s") + _, err = utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + + // There should be 3 defined users + g.Expect(output).To(SatisfyAll( + ContainSubstring("user alice"), + ContainSubstring("user bob"), + ContainSubstring("user charlie"), + )) + } + Eventually(verifyCreatedUsers).Should(Succeed()) }) })