Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion api/v1alpha1/conditions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ func TestAreConditionsEqual(t *testing.T) {
}
}


func TestAreConditionSlicesSame(t *testing.T) {
// Create two slices of conditions with the same elements in different orders
c1 := []ControlPlaneCondition{
Expand Down
4 changes: 2 additions & 2 deletions cmd/cmupdate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ func main() {
case util.VClusterKubeConfigSecret:
nodePortServiceName = util.VClusterNodePortServiceName
serviceName = util.VClusterServiceName
if err := util.WaitForStatefulSetReady(*clientset,
if err := util.WaitForStatefulSetReady(clientset,
util.VClusterServerDeploymentName,
namespace); err != nil {
log.Fatalf("Error waiting for stateful set to become ready: %s", err)
}
case util.OCMKubeConfigSecret:
serviceName = ocm.ServiceName
nodePortServiceName = ocm.ServiceName
if err := util.WaitForDeploymentReady(*clientset,
if err := util.WaitForDeploymentReady(clientset,
util.OCMServerDeploymentName,
namespace); err != nil {
log.Fatalf("Error waiting for deployment to become ready: %s", err)
Expand Down
6 changes: 3 additions & 3 deletions cmd/kflex/create/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,23 +117,23 @@ func ExecuteCreate(cp common.CP, controlPlaneType string, backendType string, ho
case string(tenancyv1alpha1.ControlPlaneTypeVCluster):

kubeconfig.WatchForSecretCreation(clientset, cp.Name, util.GetKubeconfSecretNameByControlPlaneType(controlPlaneType))
if err := util.WaitForStatefulSetReady(clientset,
if err := util.WaitForStatefulSetReady(clientsetp,
util.GetAPIServerDeploymentNameByControlPlaneType(controlPlaneType),
// TODO replace util.GenerateNamespaceFromControlPlaneName like in k3s
util.GenerateNamespaceFromControlPlaneName(controlPlane.Name)); err != nil {
return fmt.Errorf("error waiting for stateful set to become ready: %v", err)
}
case string(tenancyv1alpha1.ControlPlaneTypeK8S), string(tenancyv1alpha1.ControlPlaneTypeOCM):
kubeconfig.WatchForSecretCreation(clientset, cp.Name, util.GetKubeconfSecretNameByControlPlaneType(controlPlaneType))
if err := util.WaitForDeploymentReady(clientset,
if err := util.WaitForDeploymentReady(clientsetp,
util.GetAPIServerDeploymentNameByControlPlaneType(controlPlaneType),
// TODO replace util.GenerateNamespaceFromControlPlaneName like in k3s
util.GenerateNamespaceFromControlPlaneName(controlPlane.Name)); err != nil {
return fmt.Errorf("error waiting for deployment to become ready: %v", err)
}
case string(tenancyv1alpha1.ControlPlaneTypeK3s):
kubeconfig.WatchForSecretCreation(clientset, cp.Name, k3s.KubeconfigSecretName)
if err := util.WaitForStatefulSetReady(clientset, k3s.ServerName, util.GenerateNamespaceFromControlPlaneName(controlPlane.Name)); err != nil {
if err := util.WaitForStatefulSetReady(clientsetp, k3s.ServerName, util.GenerateNamespaceFromControlPlaneName(controlPlane.Name)); err != nil {
return fmt.Errorf("error waiting for stateful set to become ready: %v", err)
}
default:
Expand Down
2 changes: 1 addition & 1 deletion cmd/kflex/delete/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func ExecuteDelete(cp common.CP, chattyStatus bool) error {
return fmt.Errorf("error getting kf client: %v", err)
}
util.PrintStatus(fmt.Sprintf("Waiting for control plane %s to be deleted...", cp.Name), done, &wg, chattyStatus)
util.WaitForNamespaceDeletion(*clientsetKflex, util.GenerateNamespaceFromControlPlaneName(cp.Name))
util.WaitForNamespaceDeletion(clientsetKflex, util.GenerateNamespaceFromControlPlaneName(cp.Name))

Copilot AI Apr 3, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to WaitForNamespaceDeletion() does not check the returned error. Since the function signature was changed to return an error when the watch channel closes unexpectedly, this error must be checked and handled. The function can now fail and the caller needs to respond appropriately rather than silently ignoring the error.

Suggested change
util.WaitForNamespaceDeletion(clientsetKflex, util.GenerateNamespaceFromControlPlaneName(cp.Name))
if err := util.WaitForNamespaceDeletion(clientsetKflex, util.GenerateNamespaceFromControlPlaneName(cp.Name)); err != nil {
done <- true
wg.Wait()
return fmt.Errorf("error waiting for control plane namespace deletion: %v", err)
}

Copilot uses AI. Check for mistakes.

if controlPlane.Spec.Type != tenancyv1alpha1.ControlPlaneTypeHost &&
controlPlane.Spec.Type != tenancyv1alpha1.ControlPlaneTypeExternal {
Expand Down
15 changes: 7 additions & 8 deletions cmd/kflex/init/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import (
)

const (
CreateKindFlag = "create-kind"
DomainFlag = "domain"
HostContainerNameFlag = "host-container-name" // REFACTOR? replace with host-container-name?
ExternalPortFlag = "external-port" // REFACTOR? replace with external-port?
CreateKindFlag = "create-kind"
DomainFlag = "domain"
HostContainerNameFlag = "host-container-name" // REFACTOR? replace with host-container-name?
ExternalPortFlag = "external-port" // REFACTOR? replace with external-port?
DefaultKindClusterName = "kind-kubeflex" // Default cluster name for kind clusters
)

Expand All @@ -58,7 +58,7 @@ func Command() *cobra.Command {
domain, _ := flagset.GetString(DomainFlag)
externalPort, _ := flagset.GetInt(ExternalPortFlag)
hostContainer, _ := flagset.GetString(HostContainerNameFlag)

// Handle positional cluster name parameter
clusterName := DefaultKindClusterName // default
if len(args) > 0 {
Expand Down Expand Up @@ -108,7 +108,6 @@ func ExecuteInit(ctx context.Context, kubeconfig, version, buildDate string, dom
if err != nil {
return fmt.Errorf("error getting clientset: %v", err)
}
clientset := *clientsetp

util.PrintStatus(fmt.Sprintf("Kubeflex %s %s", version, buildDate), done, &wg, chattyStatus)
done <- true
Expand Down Expand Up @@ -145,7 +144,7 @@ func ExecuteInit(ctx context.Context, kubeconfig, version, buildDate string, dom

util.PrintStatus("Waiting for shared backend DB to become ready...", done, &wg, chattyStatus)
util.WaitForStatefulSetReady(
clientset,
clientsetp,
util.GeneratePSReplicaSetName(util.DBReleaseName),
util.SystemNamespace)
Comment on lines 146 to 149

Copilot AI Apr 3, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to WaitForStatefulSetReady() does not check the returned error. Since the function signature was changed to return an error when the watch channel closes unexpectedly, this error must be checked and handled. The function can now fail and the caller needs to respond appropriately rather than silently ignoring the error.

Copilot uses AI. Check for mistakes.
done <- true
Expand All @@ -159,7 +158,7 @@ func ExecuteInit(ctx context.Context, kubeconfig, version, buildDate string, dom

util.PrintStatus("Waiting for kubeflex operator to become ready...", done, &wg, chattyStatus)
util.WaitForDeploymentReady(
clientset,
clientsetp,
util.GenerateOperatorDeploymentName(),
util.SystemNamespace)
Comment on lines 160 to 163

Copilot AI Apr 3, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to WaitForDeploymentReady() does not check the returned error. Since the function signature was changed to return an error when the watch channel closes unexpectedly, this error must be checked and handled. The function can now fail and the caller needs to respond appropriately rather than silently ignoring the error.

Copilot uses AI. Check for mistakes.
done <- true
Expand Down
3 changes: 1 addition & 2 deletions cmd/kflex/init/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ func TestDefaultKindClusterNameUsage(t *testing.T) {
if len(DefaultKindClusterName) <= 5 {
t.Error("DefaultKindClusterName should be longer than 5 characters")
}

if DefaultKindClusterName[:5] != "kind-" {
t.Error("DefaultKindClusterName should start with 'kind-' prefix")
}
}

30 changes: 15 additions & 15 deletions internal/controller/controlplane_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,23 +205,23 @@ func (r *ControlPlaneReconciler) Reconcile(ctx context.Context, req ctrl.Request
if hcp.Spec.PostCreateHook != nil || len(hcp.Spec.PostCreateHooks) > 0 {
log.Info("Processing PostCreateHooks with complete kubeconfig")
switch pchReconciler := reconciler.(type) {
case shared.PostCreateHookReconciler:
case shared.PostCreateHookReconciler:
// Reconciler that supports PostCreateHook
if err := pchReconciler.ReconcileUpdatePostCreateHook(ctx, hcp); err != nil {
log.Error(err, "Failed to process PostCreateHooks")
return ctrl.Result{}, err
}

// Refresh hcp object after PCH processing
if err := r.Get(ctx, client.ObjectKey{Name: hcp.Name}, hcp); err != nil {
log.Error(err, "Failed to refresh ControlPlane after hook processing")
return ctrl.Result{}, err
}
default:
// Simple reconciler
break
if err := pchReconciler.ReconcileUpdatePostCreateHook(ctx, hcp); err != nil {
log.Error(err, "Failed to process PostCreateHooks")
return ctrl.Result{}, err
}

// Refresh hcp object after PCH processing
if err := r.Get(ctx, client.ObjectKey{Name: hcp.Name}, hcp); err != nil {
log.Error(err, "Failed to refresh ControlPlane after hook processing")
return ctrl.Result{}, err
}
default:
// Simple reconciler
break
}

}

// Determine overall controlplane readiness based on both API server and PCHs
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconcilers/shared/postcreate_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
clog "sigs.k8s.io/controller-runtime/pkg/log"

"errors"
"github.com/kubestellar/kubeflex/api/v1alpha1"
"github.com/kubestellar/kubeflex/pkg/util"
"errors"
)

const (
Expand Down
1 change: 1 addition & 0 deletions pkg/reconcilers/shared/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
type PostCreateHookReconciler interface {
ReconcileUpdatePostCreateHook(context.Context, *tenancyv1alpha1.ControlPlane) error
}

// ControlPlaneReconciler defines Reconcile loop
// each controlplane type must implement ControlPlaneReconciler as
// internal/controller/controlplane_controller.go Reconcile acts
Expand Down
16 changes: 8 additions & 8 deletions pkg/util/status_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
// TODO - refactor in a single base "WaitFor" function that can operate on the resource types
// needed here

func WaitForDeploymentReady(clientset kubernetes.Clientset, name, namespace string) error {
func WaitForDeploymentReady(clientset kubernetes.Interface, name, namespace string) error {
watcher, err := clientset.AppsV1().Deployments(namespace).Watch(context.Background(), metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
ResourceVersion: "0",
Expand All @@ -49,7 +49,7 @@ func WaitForDeploymentReady(clientset kubernetes.Clientset, name, namespace stri
for {
event, ok := <-watcher.ResultChan()
if !ok {
return nil
return fmt.Errorf("watch channel closed before deployment %s/%s became ready", namespace, name)
}

switch event.Type {
Expand All @@ -66,7 +66,7 @@ func WaitForDeploymentReady(clientset kubernetes.Clientset, name, namespace stri
}
}

func WaitForStatefulSetReady(clientset kubernetes.Clientset, name, namespace string) error {
func WaitForStatefulSetReady(clientset kubernetes.Interface, name, namespace string) error {
watcher, err := clientset.AppsV1().StatefulSets(namespace).Watch(context.Background(), metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
})
Expand All @@ -78,7 +78,7 @@ func WaitForStatefulSetReady(clientset kubernetes.Clientset, name, namespace str
for {
event, ok := <-watcher.ResultChan()
if !ok {
return nil
return fmt.Errorf("watch channel closed before statefulset %s/%s became ready", namespace, name)
}

switch event.Type {
Expand All @@ -95,7 +95,7 @@ func WaitForStatefulSetReady(clientset kubernetes.Clientset, name, namespace str
}
}

func WaitForNamespaceDeletion(clientset kubernetes.Clientset, name string) error {
func WaitForNamespaceDeletion(clientset kubernetes.Interface, name string) error {
watcher, err := clientset.CoreV1().Namespaces().Watch(context.Background(), metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", name),
})
Expand All @@ -107,7 +107,7 @@ func WaitForNamespaceDeletion(clientset kubernetes.Clientset, name string) error
for {
event, ok := <-watcher.ResultChan()
if !ok {
return nil
return fmt.Errorf("watch channel closed before namespace %s was deleted", name)
}

switch event.Type {
Expand Down Expand Up @@ -167,8 +167,8 @@ func IsAPIServerDeploymentReady(log logr.Logger, c client.Client, hcp tenancyv1a
return false, err
}

log.Info("Deployment status check", "name", d.Name, "namespace", d.Namespace,
"readyReplicas", d.Status.ReadyReplicas, "replicas", d.Status.Replicas,
log.Info("Deployment status check", "name", d.Name, "namespace", d.Namespace,
"readyReplicas", d.Status.ReadyReplicas, "replicas", d.Status.Replicas,
"specReplicas", *d.Spec.Replicas)

// we need to ensure that there is al least one replica in the spec
Expand Down
143 changes: 143 additions & 0 deletions pkg/util/status_check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
Copyright 2023 The KubeStellar Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"testing"

v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
)

func int32Ptr(i int32) *int32 { return &i }

// TestWaitForDeploymentReady_ChannelClosedBeforeReady verifies that closing the
// watch channel before the deployment reaches readiness returns an error instead
// of nil (false positive success).
func TestWaitForDeploymentReady_ChannelClosedBeforeReady(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
fw := watch.NewFake()

fakeClient.PrependWatchReactor("deployments", func(action k8stesting.Action) (bool, watch.Interface, error) {
return true, fw, nil
})

// Close the channel immediately — simulates API server dropping the connection
// before the deployment becomes ready.
go fw.Stop()

err := WaitForDeploymentReady(fakeClient, "test-deploy", "test-ns")
if err == nil {
t.Fatal("expected error when watch channel closes before deployment is ready, got nil")
}
}

// TestWaitForDeploymentReady_ReadyBeforeChannelClose verifies the happy path:
// deployment reports ready replicas and the function returns nil.
func TestWaitForDeploymentReady_ReadyBeforeChannelClose(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
fw := watch.NewFake()

fakeClient.PrependWatchReactor("deployments", func(action k8stesting.Action) (bool, watch.Interface, error) {
return true, fw, nil
})

deploy := &v1.Deployment{
ObjectMeta: metav1.ObjectMeta{Name: "test-deploy", Namespace: "test-ns"},
Spec: v1.DeploymentSpec{Replicas: int32Ptr(1)},
Status: v1.DeploymentStatus{
Replicas: 1,
ReadyReplicas: 1,
},
}

go fw.Modify(deploy)

err := WaitForDeploymentReady(fakeClient, "test-deploy", "test-ns")
if err != nil {
t.Fatalf("expected nil when deployment is ready, got: %v", err)
}
}

// TestWaitForStatefulSetReady_ChannelClosedBeforeReady verifies that closing the
// watch channel before the statefulset is ready returns an error.
func TestWaitForStatefulSetReady_ChannelClosedBeforeReady(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
fw := watch.NewFake()

fakeClient.PrependWatchReactor("statefulsets", func(action k8stesting.Action) (bool, watch.Interface, error) {
return true, fw, nil
})

go fw.Stop()

err := WaitForStatefulSetReady(fakeClient, "test-sts", "test-ns")
if err == nil {
t.Fatal("expected error when watch channel closes before statefulset is ready, got nil")
}
}

Copilot AI Apr 3, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test file is missing a test for the happy path of WaitForStatefulSetReady(). While there is a test for WaitForDeploymentReady_ReadyBeforeChannelClose and TestWaitForNamespaceDeletion_DeletedSuccessfully, there is no corresponding positive test for WaitForStatefulSetReady. This leaves the success path for StatefulSet readiness without explicit test coverage.

Suggested change
// TestWaitForStatefulSetReady_ReadyBeforeChannelClose verifies the happy path:
// statefulset reports ready replicas and the function returns nil.
func TestWaitForStatefulSetReady_ReadyBeforeChannelClose(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
fw := watch.NewFake()
fakeClient.PrependWatchReactor("statefulsets", func(action k8stesting.Action) (bool, watch.Interface, error) {
return true, fw, nil
})
sts := &v1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sts",
Namespace: "test-ns",
},
Spec: v1.StatefulSetSpec{
Replicas: int32Ptr(3),
},
Status: v1.StatefulSetStatus{
ReadyReplicas: 3,
},
}
go func() {
fw.Modify(sts)
}()
err := WaitForStatefulSetReady(fakeClient, "test-sts", "test-ns")
if err != nil {
t.Fatalf("expected nil when statefulset becomes ready, got: %v", err)
}
}

Copilot uses AI. Check for mistakes.
// TestWaitForNamespaceDeletion_ChannelClosedBeforeDeletion verifies that closing
// the watch channel before the namespace is actually deleted returns an error.
func TestWaitForNamespaceDeletion_ChannelClosedBeforeDeletion(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
fw := watch.NewFake()

fakeClient.PrependWatchReactor("namespaces", func(action k8stesting.Action) (bool, watch.Interface, error) {
return true, fw, nil
})

go fw.Stop()

err := WaitForNamespaceDeletion(fakeClient, "test-ns")
if err == nil {
t.Fatal("expected error when watch channel closes before namespace is deleted, got nil")
}
}

// TestWaitForNamespaceDeletion_DeletedSuccessfully verifies the happy path:
// the namespace is deleted and the function returns nil.
func TestWaitForNamespaceDeletion_DeletedSuccessfully(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
fw := watch.NewFake()

fakeClient.PrependWatchReactor("namespaces", func(action k8stesting.Action) (bool, watch.Interface, error) {
return true, fw, nil
})

ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: "test-ns"},
Status: corev1.NamespaceStatus{Phase: corev1.NamespaceTerminating},
}

go func() {
fw.Modify(ns)
fw.Delete(&corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: "test-ns"},
TypeMeta: metav1.TypeMeta{Kind: "Namespace", APIVersion: "v1"},
})
}()

err := WaitForNamespaceDeletion(fakeClient, "test-ns")
if err != nil {
t.Fatalf("expected nil when namespace is deleted, got: %v", err)
}
}
Loading