From c6fbd395dc20588f5a2bd2a134f3446d36730581 Mon Sep 17 00:00:00 2001 From: gabrnavarro Date: Fri, 3 Apr 2026 17:28:20 +0200 Subject: [PATCH] fix: address issue #669 Signed-off-by: gabrnavarro --- api/v1alpha1/conditions_test.go | 1 - cmd/cmupdate/main.go | 4 +- cmd/kflex/create/create.go | 6 +- cmd/kflex/delete/delete.go | 2 +- cmd/kflex/init/init.go | 15 +- cmd/kflex/init/init_test.go | 3 +- .../controller/controlplane_controller.go | 30 ++-- pkg/reconcilers/shared/postcreate_hook.go | 2 +- pkg/reconcilers/shared/reconciler.go | 1 + pkg/util/status_check.go | 16 +- pkg/util/status_check_test.go | 143 ++++++++++++++++++ 11 files changed, 182 insertions(+), 41 deletions(-) create mode 100644 pkg/util/status_check_test.go diff --git a/api/v1alpha1/conditions_test.go b/api/v1alpha1/conditions_test.go index 76e7f2ea..4d86abe2 100644 --- a/api/v1alpha1/conditions_test.go +++ b/api/v1alpha1/conditions_test.go @@ -26,7 +26,6 @@ func TestAreConditionsEqual(t *testing.T) { } } - func TestAreConditionSlicesSame(t *testing.T) { // Create two slices of conditions with the same elements in different orders c1 := []ControlPlaneCondition{ diff --git a/cmd/cmupdate/main.go b/cmd/cmupdate/main.go index 064129d6..c314244c 100644 --- a/cmd/cmupdate/main.go +++ b/cmd/cmupdate/main.go @@ -100,7 +100,7 @@ func main() { case util.VClusterKubeConfigSecret: nodePortServiceName = util.VClusterNodePortServiceName serviceName = util.VClusterServiceName - if err := util.WaitForStatefulSetReady(*clientset, + if err := util.WaitForStatefulSetReady(clientset, util.VClusterServerDeploymentName, namespace); err != nil { log.Fatalf("Error waiting for stateful set to become ready: %s", err) @@ -108,7 +108,7 @@ func main() { case util.OCMKubeConfigSecret: serviceName = ocm.ServiceName nodePortServiceName = ocm.ServiceName - if err := util.WaitForDeploymentReady(*clientset, + if err := util.WaitForDeploymentReady(clientset, util.OCMServerDeploymentName, namespace); err != nil { log.Fatalf("Error waiting for deployment to become ready: %s", err) diff --git a/cmd/kflex/create/create.go b/cmd/kflex/create/create.go index 0ea7350e..65fc085c 100644 --- a/cmd/kflex/create/create.go +++ b/cmd/kflex/create/create.go @@ -117,7 +117,7 @@ func ExecuteCreate(cp common.CP, controlPlaneType string, backendType string, ho case string(tenancyv1alpha1.ControlPlaneTypeVCluster): kubeconfig.WatchForSecretCreation(clientset, cp.Name, util.GetKubeconfSecretNameByControlPlaneType(controlPlaneType)) - if err := util.WaitForStatefulSetReady(clientset, + if err := util.WaitForStatefulSetReady(clientsetp, util.GetAPIServerDeploymentNameByControlPlaneType(controlPlaneType), // TODO replace util.GenerateNamespaceFromControlPlaneName like in k3s util.GenerateNamespaceFromControlPlaneName(controlPlane.Name)); err != nil { @@ -125,7 +125,7 @@ func ExecuteCreate(cp common.CP, controlPlaneType string, backendType string, ho } case string(tenancyv1alpha1.ControlPlaneTypeK8S), string(tenancyv1alpha1.ControlPlaneTypeOCM): kubeconfig.WatchForSecretCreation(clientset, cp.Name, util.GetKubeconfSecretNameByControlPlaneType(controlPlaneType)) - if err := util.WaitForDeploymentReady(clientset, + if err := util.WaitForDeploymentReady(clientsetp, util.GetAPIServerDeploymentNameByControlPlaneType(controlPlaneType), // TODO replace util.GenerateNamespaceFromControlPlaneName like in k3s util.GenerateNamespaceFromControlPlaneName(controlPlane.Name)); err != nil { @@ -133,7 +133,7 @@ func ExecuteCreate(cp common.CP, controlPlaneType string, backendType string, ho } case string(tenancyv1alpha1.ControlPlaneTypeK3s): kubeconfig.WatchForSecretCreation(clientset, cp.Name, k3s.KubeconfigSecretName) - if err := util.WaitForStatefulSetReady(clientset, k3s.ServerName, util.GenerateNamespaceFromControlPlaneName(controlPlane.Name)); err != nil { + if err := util.WaitForStatefulSetReady(clientsetp, k3s.ServerName, util.GenerateNamespaceFromControlPlaneName(controlPlane.Name)); err != nil { return fmt.Errorf("error waiting for stateful set to become ready: %v", err) } default: diff --git a/cmd/kflex/delete/delete.go b/cmd/kflex/delete/delete.go index 24038323..a8ac287d 100644 --- a/cmd/kflex/delete/delete.go +++ b/cmd/kflex/delete/delete.go @@ -92,7 +92,7 @@ func ExecuteDelete(cp common.CP, chattyStatus bool) error { return fmt.Errorf("error getting kf client: %v", err) } util.PrintStatus(fmt.Sprintf("Waiting for control plane %s to be deleted...", cp.Name), done, &wg, chattyStatus) - util.WaitForNamespaceDeletion(*clientsetKflex, util.GenerateNamespaceFromControlPlaneName(cp.Name)) + util.WaitForNamespaceDeletion(clientsetKflex, util.GenerateNamespaceFromControlPlaneName(cp.Name)) if controlPlane.Spec.Type != tenancyv1alpha1.ControlPlaneTypeHost && controlPlane.Spec.Type != tenancyv1alpha1.ControlPlaneTypeExternal { diff --git a/cmd/kflex/init/init.go b/cmd/kflex/init/init.go index 4a97b9ba..c35cc291 100644 --- a/cmd/kflex/init/init.go +++ b/cmd/kflex/init/init.go @@ -37,10 +37,10 @@ import ( ) const ( - CreateKindFlag = "create-kind" - DomainFlag = "domain" - HostContainerNameFlag = "host-container-name" // REFACTOR? replace with host-container-name? - ExternalPortFlag = "external-port" // REFACTOR? replace with external-port? + CreateKindFlag = "create-kind" + DomainFlag = "domain" + HostContainerNameFlag = "host-container-name" // REFACTOR? replace with host-container-name? + ExternalPortFlag = "external-port" // REFACTOR? replace with external-port? DefaultKindClusterName = "kind-kubeflex" // Default cluster name for kind clusters ) @@ -58,7 +58,7 @@ func Command() *cobra.Command { domain, _ := flagset.GetString(DomainFlag) externalPort, _ := flagset.GetInt(ExternalPortFlag) hostContainer, _ := flagset.GetString(HostContainerNameFlag) - + // Handle positional cluster name parameter clusterName := DefaultKindClusterName // default if len(args) > 0 { @@ -108,7 +108,6 @@ func ExecuteInit(ctx context.Context, kubeconfig, version, buildDate string, dom if err != nil { return fmt.Errorf("error getting clientset: %v", err) } - clientset := *clientsetp util.PrintStatus(fmt.Sprintf("Kubeflex %s %s", version, buildDate), done, &wg, chattyStatus) done <- true @@ -145,7 +144,7 @@ func ExecuteInit(ctx context.Context, kubeconfig, version, buildDate string, dom util.PrintStatus("Waiting for shared backend DB to become ready...", done, &wg, chattyStatus) util.WaitForStatefulSetReady( - clientset, + clientsetp, util.GeneratePSReplicaSetName(util.DBReleaseName), util.SystemNamespace) done <- true @@ -159,7 +158,7 @@ func ExecuteInit(ctx context.Context, kubeconfig, version, buildDate string, dom util.PrintStatus("Waiting for kubeflex operator to become ready...", done, &wg, chattyStatus) util.WaitForDeploymentReady( - clientset, + clientsetp, util.GenerateOperatorDeploymentName(), util.SystemNamespace) done <- true diff --git a/cmd/kflex/init/init_test.go b/cmd/kflex/init/init_test.go index e3d1480a..769c9e07 100644 --- a/cmd/kflex/init/init_test.go +++ b/cmd/kflex/init/init_test.go @@ -63,9 +63,8 @@ func TestDefaultKindClusterNameUsage(t *testing.T) { if len(DefaultKindClusterName) <= 5 { t.Error("DefaultKindClusterName should be longer than 5 characters") } - + if DefaultKindClusterName[:5] != "kind-" { t.Error("DefaultKindClusterName should start with 'kind-' prefix") } } - diff --git a/internal/controller/controlplane_controller.go b/internal/controller/controlplane_controller.go index 0849a32f..87ba501e 100644 --- a/internal/controller/controlplane_controller.go +++ b/internal/controller/controlplane_controller.go @@ -205,23 +205,23 @@ func (r *ControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Request if hcp.Spec.PostCreateHook != nil || len(hcp.Spec.PostCreateHooks) > 0 { log.Info("Processing PostCreateHooks with complete kubeconfig") switch pchReconciler := reconciler.(type) { - case shared.PostCreateHookReconciler: + case shared.PostCreateHookReconciler: // Reconciler that supports PostCreateHook - if err := pchReconciler.ReconcileUpdatePostCreateHook(ctx, hcp); err != nil { - log.Error(err, "Failed to process PostCreateHooks") - return ctrl.Result{}, err - } - - // Refresh hcp object after PCH processing - if err := r.Get(ctx, client.ObjectKey{Name: hcp.Name}, hcp); err != nil { - log.Error(err, "Failed to refresh ControlPlane after hook processing") - return ctrl.Result{}, err - } - default: - // Simple reconciler - break + if err := pchReconciler.ReconcileUpdatePostCreateHook(ctx, hcp); err != nil { + log.Error(err, "Failed to process PostCreateHooks") + return ctrl.Result{}, err + } + + // Refresh hcp object after PCH processing + if err := r.Get(ctx, client.ObjectKey{Name: hcp.Name}, hcp); err != nil { + log.Error(err, "Failed to refresh ControlPlane after hook processing") + return ctrl.Result{}, err + } + default: + // Simple reconciler + break } - + } // Determine overall controlplane readiness based on both API server and PCHs diff --git a/pkg/reconcilers/shared/postcreate_hook.go b/pkg/reconcilers/shared/postcreate_hook.go index a44d0fdf..34156749 100644 --- a/pkg/reconcilers/shared/postcreate_hook.go +++ b/pkg/reconcilers/shared/postcreate_hook.go @@ -32,9 +32,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" clog "sigs.k8s.io/controller-runtime/pkg/log" + "errors" "github.com/kubestellar/kubeflex/api/v1alpha1" "github.com/kubestellar/kubeflex/pkg/util" - "errors" ) const ( diff --git a/pkg/reconcilers/shared/reconciler.go b/pkg/reconcilers/shared/reconciler.go index c6d6ee79..948ff56f 100644 --- a/pkg/reconcilers/shared/reconciler.go +++ b/pkg/reconcilers/shared/reconciler.go @@ -45,6 +45,7 @@ const ( type PostCreateHookReconciler interface { ReconcileUpdatePostCreateHook(context.Context, *tenancyv1alpha1.ControlPlane) error } + // ControlPlaneReconciler defines Reconcile loop // each controlplane type must implement ControlPlaneReconciler as // internal/controller/controlplane_controller.go Reconcile acts diff --git a/pkg/util/status_check.go b/pkg/util/status_check.go index 3b61fd5f..add716e0 100644 --- a/pkg/util/status_check.go +++ b/pkg/util/status_check.go @@ -36,7 +36,7 @@ import ( // TODO - refactor in a single base "WaitFor" function that can operate on the resource types // needed here -func WaitForDeploymentReady(clientset kubernetes.Clientset, name, namespace string) error { +func WaitForDeploymentReady(clientset kubernetes.Interface, name, namespace string) error { watcher, err := clientset.AppsV1().Deployments(namespace).Watch(context.Background(), metav1.ListOptions{ FieldSelector: fmt.Sprintf("metadata.name=%s", name), ResourceVersion: "0", @@ -49,7 +49,7 @@ func WaitForDeploymentReady(clientset kubernetes.Clientset, name, namespace stri for { event, ok := <-watcher.ResultChan() if !ok { - return nil + return fmt.Errorf("watch channel closed before deployment %s/%s became ready", namespace, name) } switch event.Type { @@ -66,7 +66,7 @@ func WaitForDeploymentReady(clientset kubernetes.Clientset, name, namespace stri } } -func WaitForStatefulSetReady(clientset kubernetes.Clientset, name, namespace string) error { +func WaitForStatefulSetReady(clientset kubernetes.Interface, name, namespace string) error { watcher, err := clientset.AppsV1().StatefulSets(namespace).Watch(context.Background(), metav1.ListOptions{ FieldSelector: fmt.Sprintf("metadata.name=%s", name), }) @@ -78,7 +78,7 @@ func WaitForStatefulSetReady(clientset kubernetes.Clientset, name, namespace str for { event, ok := <-watcher.ResultChan() if !ok { - return nil + return fmt.Errorf("watch channel closed before statefulset %s/%s became ready", namespace, name) } switch event.Type { @@ -95,7 +95,7 @@ func WaitForStatefulSetReady(clientset kubernetes.Clientset, name, namespace str } } -func WaitForNamespaceDeletion(clientset kubernetes.Clientset, name string) error { +func WaitForNamespaceDeletion(clientset kubernetes.Interface, name string) error { watcher, err := clientset.CoreV1().Namespaces().Watch(context.Background(), metav1.ListOptions{ FieldSelector: fmt.Sprintf("metadata.name=%s", name), }) @@ -107,7 +107,7 @@ func WaitForNamespaceDeletion(clientset kubernetes.Clientset, name string) error for { event, ok := <-watcher.ResultChan() if !ok { - return nil + return fmt.Errorf("watch channel closed before namespace %s was deleted", name) } switch event.Type { @@ -167,8 +167,8 @@ func IsAPIServerDeploymentReady(log logr.Logger, c client.Client, hcp tenancyv1a return false, err } - log.Info("Deployment status check", "name", d.Name, "namespace", d.Namespace, - "readyReplicas", d.Status.ReadyReplicas, "replicas", d.Status.Replicas, + log.Info("Deployment status check", "name", d.Name, "namespace", d.Namespace, + "readyReplicas", d.Status.ReadyReplicas, "replicas", d.Status.Replicas, "specReplicas", *d.Spec.Replicas) // we need to ensure that there is al least one replica in the spec diff --git a/pkg/util/status_check_test.go b/pkg/util/status_check_test.go new file mode 100644 index 00000000..d926b78e --- /dev/null +++ b/pkg/util/status_check_test.go @@ -0,0 +1,143 @@ +/* +Copyright 2023 The KubeStellar Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "testing" + + v1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" +) + +func int32Ptr(i int32) *int32 { return &i } + +// TestWaitForDeploymentReady_ChannelClosedBeforeReady verifies that closing the +// watch channel before the deployment reaches readiness returns an error instead +// of nil (false positive success). +func TestWaitForDeploymentReady_ChannelClosedBeforeReady(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + fw := watch.NewFake() + + fakeClient.PrependWatchReactor("deployments", func(action k8stesting.Action) (bool, watch.Interface, error) { + return true, fw, nil + }) + + // Close the channel immediately — simulates API server dropping the connection + // before the deployment becomes ready. + go fw.Stop() + + err := WaitForDeploymentReady(fakeClient, "test-deploy", "test-ns") + if err == nil { + t.Fatal("expected error when watch channel closes before deployment is ready, got nil") + } +} + +// TestWaitForDeploymentReady_ReadyBeforeChannelClose verifies the happy path: +// deployment reports ready replicas and the function returns nil. +func TestWaitForDeploymentReady_ReadyBeforeChannelClose(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + fw := watch.NewFake() + + fakeClient.PrependWatchReactor("deployments", func(action k8stesting.Action) (bool, watch.Interface, error) { + return true, fw, nil + }) + + deploy := &v1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "test-deploy", Namespace: "test-ns"}, + Spec: v1.DeploymentSpec{Replicas: int32Ptr(1)}, + Status: v1.DeploymentStatus{ + Replicas: 1, + ReadyReplicas: 1, + }, + } + + go fw.Modify(deploy) + + err := WaitForDeploymentReady(fakeClient, "test-deploy", "test-ns") + if err != nil { + t.Fatalf("expected nil when deployment is ready, got: %v", err) + } +} + +// TestWaitForStatefulSetReady_ChannelClosedBeforeReady verifies that closing the +// watch channel before the statefulset is ready returns an error. +func TestWaitForStatefulSetReady_ChannelClosedBeforeReady(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + fw := watch.NewFake() + + fakeClient.PrependWatchReactor("statefulsets", func(action k8stesting.Action) (bool, watch.Interface, error) { + return true, fw, nil + }) + + go fw.Stop() + + err := WaitForStatefulSetReady(fakeClient, "test-sts", "test-ns") + if err == nil { + t.Fatal("expected error when watch channel closes before statefulset is ready, got nil") + } +} + +// TestWaitForNamespaceDeletion_ChannelClosedBeforeDeletion verifies that closing +// the watch channel before the namespace is actually deleted returns an error. +func TestWaitForNamespaceDeletion_ChannelClosedBeforeDeletion(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + fw := watch.NewFake() + + fakeClient.PrependWatchReactor("namespaces", func(action k8stesting.Action) (bool, watch.Interface, error) { + return true, fw, nil + }) + + go fw.Stop() + + err := WaitForNamespaceDeletion(fakeClient, "test-ns") + if err == nil { + t.Fatal("expected error when watch channel closes before namespace is deleted, got nil") + } +} + +// TestWaitForNamespaceDeletion_DeletedSuccessfully verifies the happy path: +// the namespace is deleted and the function returns nil. +func TestWaitForNamespaceDeletion_DeletedSuccessfully(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + fw := watch.NewFake() + + fakeClient.PrependWatchReactor("namespaces", func(action k8stesting.Action) (bool, watch.Interface, error) { + return true, fw, nil + }) + + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "test-ns"}, + Status: corev1.NamespaceStatus{Phase: corev1.NamespaceTerminating}, + } + + go func() { + fw.Modify(ns) + fw.Delete(&corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "test-ns"}, + TypeMeta: metav1.TypeMeta{Kind: "Namespace", APIVersion: "v1"}, + }) + }() + + err := WaitForNamespaceDeletion(fakeClient, "test-ns") + if err != nil { + t.Fatalf("expected nil when namespace is deleted, got: %v", err) + } +}