Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,9 @@ func (r *CPReconciler) ReconcileStaticPlugin(ctx context.Context, enable bool) e

// Reconcile is the reconciler entry point to reconcile the static plugin state with the desired configuration
func (r *CPReconciler) reconcileStatic(ctx context.Context, desired *flowslatest.FlowCollector) error {
l := log.FromContext(ctx).WithName("console-plugin")
l := log.FromContext(ctx).WithName("static-console-plugin")
ctx = log.IntoContext(ctx, l)

// Skip static reconciler on older OpenShift (feature not implemented)
if less415, _, err := r.ClusterInfo.IsOpenShiftVersionLessThan("4.15.0"); less415 {
l.Info("Static plugin not supported for this version of OpenShift; skipping")
r.Managed.TryDeleteAll(ctx)
return nil
} else if err != nil {
l.Error(err, "Could not determine if static plugin is supported; proceed with deploying")
}

// Retrieve current owned objects
err := r.Managed.FetchAll(ctx)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions internal/controller/flowcollector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type FlowCollectorReconciler struct {
watcher *watchers.Watcher
}

func Start(ctx context.Context, mgr *manager.Manager) error {
func Start(ctx context.Context, mgr *manager.Manager) (manager.PostCreateHook, error) {
log := log.FromContext(ctx)
log.Info("Starting FlowCollector controller")
r := FlowCollectorReconciler{
Expand Down Expand Up @@ -70,11 +70,11 @@ func Start(ctx context.Context, mgr *manager.Manager) error {

ctrl, err := builder.Build(&r)
if err != nil {
return err
return nil, err
}
r.watcher = watchers.NewWatcher(ctrl)

return nil
return nil, nil
}

// Reconcile is part of the main kubernetes reconciliation loop which aims to
Expand Down
6 changes: 3 additions & 3 deletions internal/controller/flp/flp_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Reconciler struct {
currentNamespace string
}

func Start(ctx context.Context, mgr *manager.Manager) error {
func Start(ctx context.Context, mgr *manager.Manager) (manager.PostCreateHook, error) {
log := log.FromContext(ctx)
log.Info("Starting Flowlogs Pipeline parent controller")

Expand Down Expand Up @@ -66,11 +66,11 @@ func Start(ctx context.Context, mgr *manager.Manager) error {

ctrl, err := builder.Build(&r)
if err != nil {
return err
return nil, err
}
r.watcher = watchers.NewWatcher(ctrl)

return nil
return nil, nil
}

type subReconciler interface {
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/monitoring/monitoring_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ type Reconciler struct {
currentNamespace string
}

func Start(ctx context.Context, mgr *manager.Manager) error {
func Start(ctx context.Context, mgr *manager.Manager) (manager.PostCreateHook, error) {
log := log.FromContext(ctx)
log.Info("Starting Monitoring controller")
r := Reconciler{
Client: mgr.Client,
mgr: mgr,
status: mgr.Status.ForComponent(status.Monitoring),
}
return ctrl.NewControllerManagedBy(mgr).
return nil, ctrl.NewControllerManagedBy(mgr).
For(&flowslatest.FlowCollector{}, reconcilers.IgnoreStatusChange).
Named("monitoring").
Owns(&corev1.Namespace{}, reconcilers.UpdateOrDeleteOnlyPred).
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/networkpolicy/np_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ type Reconciler struct {
status status.Instance
}

func Start(ctx context.Context, mgr *manager.Manager) error {
func Start(ctx context.Context, mgr *manager.Manager) (manager.PostCreateHook, error) {
log := log.FromContext(ctx)
log.Info("Starting Network Policy controller")
r := Reconciler{
Client: mgr.Client,
mgr: mgr,
status: mgr.Status.ForComponent(status.NetworkPolicy),
}
return ctrl.NewControllerManagedBy(mgr).
return nil, ctrl.NewControllerManagedBy(mgr).
For(&flowslatest.FlowCollector{}, reconcilers.IgnoreStatusChange).
Named("networkPolicy").
Owns(&networkingv1.NetworkPolicy{}, reconcilers.UpdateOrDeleteOnlyPred).
Expand Down
80 changes: 46 additions & 34 deletions internal/controller/static/static_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -17,8 +19,14 @@ import (
"github.com/netobserv/network-observability-operator/internal/pkg/manager/status"
)

const (
initReconcileAttempts = 5
var (
retryBackoff = wait.Backoff{
Steps: 6,
Duration: 2 * time.Second,
Factor: 2,
Jitter: 0.1,
}
clog = log.Log.WithName("static-controller")
)

type Reconciler struct {
Expand All @@ -27,62 +35,66 @@ type Reconciler struct {
status status.Instance
}

func Start(ctx context.Context, mgr *manager.Manager) error {
func Start(ctx context.Context, mgr *manager.Manager) (manager.PostCreateHook, error) {
log := log.FromContext(ctx)
log.Info("Starting Static controller")
r := Reconciler{
Client: mgr.Client,
mgr: mgr,
status: mgr.Status.ForComponent(status.StaticPlugin),
status: mgr.Status.ForComponent(status.StaticController),
}

// force reconcile at startup
go r.InitReconcile(ctx)

return ctrl.NewControllerManagedBy(mgr).
// Return initReconcile as a post-create hook
return r.initReconcile, ctrl.NewControllerManagedBy(mgr).
For(&flowslatest.FlowCollector{}, reconcilers.IgnoreStatusChange).
Named("staticPlugin").
Complete(&r)
}

func (r *Reconciler) InitReconcile(ctx context.Context) {
log := log.FromContext(ctx)
log.Info("Initializing resources...")

for attempt := range initReconcileAttempts {
// delay the reconcile calls to let some time to the cache to load
time.Sleep(5 * time.Second)
_, err := r.Reconcile(ctx, ctrl.Request{})
if err != nil {
log.Error(err, "Error while doing initial reconcile", "attempt", attempt)
} else {
return
func (r *Reconciler) initReconcile(ctx context.Context) error {
attempt := 0
err := retry.OnError(retryBackoff, func(error) bool { return true }, func() error {
attempt++
if _, err := r.Reconcile(ctx, ctrl.Request{}); err != nil {
clog.WithValues("attempt", attempt, "error", err).Info("Initial reconcile: attempt failed")
return err
}
return nil
})
if err != nil {
return fmt.Errorf("failed initial reconcile, all attempts failed: %w", err)
}
return nil
}

// Reconcile is the controller entry point for reconciling current state with desired state.
// It manages the controller status at a high level. Business logic is delegated into `reconcile`.
func (r *Reconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
l := log.Log.WithName("staticPlugin") // clear context (too noisy)
ctx = log.IntoContext(ctx, l)
ctx = log.IntoContext(ctx, clog)

r.status.SetUnknown()
defer r.status.Commit(ctx, r.Client)

// always reconcile static console plugin
scp, err := helper.NewControllerClientHelper(ctx, r.mgr.Config.Namespace, r.Client)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get controller deployment: %w", err)
}
staticPluginReconciler := consoleplugin.NewStaticReconciler(r.newDefaultReconcilerInstance(scp))
if err := staticPluginReconciler.ReconcileStaticPlugin(ctx, true); err != nil {
l.Error(err, "Static plugin reconcile failure")
// Set status failure unless it was already set
if !r.status.HasFailure() {
r.status.SetFailure("StaticPluginError", err.Error())
if r.mgr.ClusterInfo.HasConsolePlugin() {
if supported, _, err := r.mgr.ClusterInfo.IsOpenShiftVersionAtLeast("4.15.0"); err != nil {
return ctrl.Result{}, err
} else if !supported {
clog.Info("Skipping static plugin reconciler (no console detected)")
} else {
scp, err := helper.NewControllerClientHelper(ctx, r.mgr.Config.Namespace, r.Client)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get controller deployment: %w", err)
}
staticPluginReconciler := consoleplugin.NewStaticReconciler(r.newDefaultReconcilerInstance(scp))
if err := staticPluginReconciler.ReconcileStaticPlugin(ctx, true); err != nil {
clog.Error(err, "Static plugin reconcile failure")
// Set status failure unless it was already set
if !r.status.HasFailure() {
r.status.SetFailure("StaticPluginError", err.Error())
}
return ctrl.Result{}, err
}
}
return ctrl.Result{}, err
}

r.status.SetReady()
Expand Down
Loading