Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Rollouthandler #125

Closed
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Implement the RolloutHandler
  • Loading branch information
dhaiducek committed Feb 16, 2024
commit 2326693e2dca0b04add8dbbdc2ff6769c1d303c9
117 changes: 93 additions & 24 deletions controllers/common/common.go
Original file line number Diff line number Diff line change
@@ -11,11 +11,14 @@ import (
"fmt"
"strings"

"github.com/go-logr/logr"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clusterv1 "open-cluster-management.io/api/cluster/v1"
clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
appsv1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/placementrule/v1"
ctrl "sigs.k8s.io/controller-runtime"
@@ -206,11 +209,66 @@ func HasValidPlacementRef(pb *policiesv1.PlacementBinding) bool {
}
}

// GetDecisions returns the placement decisions from the Placement or PlacementRule referred to by
// the PlacementBinding
func GetDecisions(
// The placementDecisionGetter is an implementation of the cluster PlacementDecisionGetter
type placementDecisionGetter struct {
c client.Client
}

// List the PlacementDecisions using the method signature from the cluster PlacementDecisionGetter
// interface. (The signature was originally written to retrieve from a cache, so we need to convert
// the returned values to an array of pointers.)
func (pd placementDecisionGetter) List(
selector labels.Selector, namespace string,
) ([]*clusterv1beta1.PlacementDecision, error) {
pdList := &clusterv1beta1.PlacementDecisionList{}
pdPtrList := []*clusterv1beta1.PlacementDecision{}
lopts := &client.ListOptions{
LabelSelector: selector,
Namespace: namespace,
}

err := pd.c.List(context.TODO(), pdList, lopts)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this need context.TODO()? why not pass context?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good point. This List() function is buried inside of the placementDecisionLister interface, which should be implemented using a cache.Indexer instead of this List(), so I need to look into making that update..

if err != nil {
return nil, err
}

for _, pd := range pdList.Items {
pdPtr := pd
pdPtrList = append(pdPtrList, &pdPtr)
}

return pdPtrList, nil
}

// GetRolloutHandler returns a rollout handler from the Placement library to retrieve rollout results
func GetRolloutHandler(
c client.Client, placement *clusterv1beta1.Placement, log logr.Logger,
) (*clusterv1alpha1.RolloutHandler, error) {
pdTracker := clusterv1beta1.NewPlacementDecisionClustersTracker(placement, placementDecisionGetter{c}, nil)

err := pdTracker.Refresh()
if err != nil {
log.Error(err, "Error retrieving PlacementDecisions from tracker")
}

return clusterv1alpha1.NewRolloutHandler(pdTracker)
}

// GetClusterRolloutStatus is the implementation of the cluster ClusterRolloutStatusFunc used to
// specify policy-specific rollout status for a particular cluster
var GetClusterRolloutStatus clusterv1alpha1.ClusterRolloutStatusFunc = func(
clusterName string,
) clusterv1alpha1.ClusterRolloutStatus {
return clusterv1alpha1.ClusterRolloutStatus{}
}

// GetRolloutClusters returns the clusters for the current rollout (i.e. the rollout result) from
// the Placement or PlacementRule referred to by the PlacementBinding
func GetRolloutClusters(
ctx context.Context, c client.Client, pb *policiesv1.PlacementBinding,
) ([]string, error) {
) (clusterv1alpha1.RolloutResult, error) {
rolloutResult := clusterv1alpha1.RolloutResult{}

// If the PlacementRef is invalid, log and return. (This is not recoverable.)
if !HasValidPlacementRef(pb) {
log.Info(fmt.Sprintf("PlacementBinding %s/%s placementRef is not valid. Ignoring.", pb.Namespace, pb.Name))
@@ -230,46 +288,57 @@ func GetDecisions(

err := c.Get(ctx, refNN, pl)
if err != nil && !k8serrors.IsNotFound(err) {
return nil, fmt.Errorf("failed to get Placement '%v': %w", pb.PlacementRef.Name, err)
return rolloutResult, fmt.Errorf("failed to get Placement '%v': %w", pb.PlacementRef.Name, err)
}

if k8serrors.IsNotFound(err) {
return nil, nil
return rolloutResult, nil
}

list := &clusterv1beta1.PlacementDecisionList{}
lopts := &client.ListOptions{Namespace: pb.GetNamespace()}
rolloutHandler, err := GetRolloutHandler(c, pl, log)
if err != nil {
log.Error(err, "Failed to instantiate the cluster rollout handler")

opts := client.MatchingLabels{"cluster.open-cluster-management.io/placement": pl.GetName()}
opts.ApplyToList(lopts)
return rolloutResult, err
}

err = c.List(ctx, list, lopts)
if err != nil && !k8serrors.IsNotFound(err) {
return nil, fmt.Errorf("failed to list the PlacementDecisions for '%v', %w", pb.PlacementRef.Name, err)
// Set the RolloutStrategy to "All"
// (this will be configurable in the future)
defaultRolloutStrategy := clusterv1alpha1.RolloutStrategy{
Type: clusterv1alpha1.All,
}

for _, item := range list.Items {
for _, cluster := range item.Status.Decisions {
clusterDecisions = append(clusterDecisions, cluster.ClusterName)
}
strategy, rolloutResult, err := rolloutHandler.GetRolloutCluster(
defaultRolloutStrategy, GetClusterRolloutStatus)
if err != nil {
log.Error(err, "Failed to retrieve clusters from rollout handler")

return clusterv1alpha1.RolloutResult{}, err
}

return clusterDecisions, nil
log.V(1).Info(fmt.Sprintf("Rolling out policies using rollout strategy %s", strategy.Type))

return rolloutResult, nil
case "PlacementRule":
plr := &appsv1.PlacementRule{}
if err := c.Get(ctx, refNN, plr); err != nil && !k8serrors.IsNotFound(err) {
return nil, fmt.Errorf("failed to get PlacementRule '%v': %w", pb.PlacementRef.Name, err)
return rolloutResult, fmt.Errorf("failed to get PlacementRule '%v': %w", pb.PlacementRef.Name, err)
}

rolloutResult.ClustersToRollout = map[string]clusterv1alpha1.ClusterRolloutStatus{}
for _, decision := range plr.Status.Decisions {
rolloutResult.ClustersToRollout[decision.ClusterName] = GetClusterRolloutStatus(decision.ClusterName)
}

for _, cluster := range plr.Status.Decisions {
clusterDecisions = append(clusterDecisions, cluster.ClusterName)
}

// if the PlacementRule was not found, the decisions will be empty
return clusterDecisions, nil
return rolloutResult, nil
}

return nil, fmt.Errorf("placement binding %s/%s reference is not valid", pb.Namespace, pb.Name)
return rolloutResult, fmt.Errorf("placement binding %s/%s reference is not valid", pb.Namespace, pb.Name)
}

func ParseRootPolicyLabel(rootPlc string) (name, namespace string, err error) {
@@ -302,17 +371,17 @@ func FullNameForPolicy(plc *policiesv1.Policy) string {
func GetRepPoliciesInPlacementBinding(
ctx context.Context, c client.Client, pb *policiesv1.PlacementBinding,
) []reconcile.Request {
decisions, err := GetDecisions(ctx, c, pb)
rolloutResult, err := GetRolloutClusters(ctx, c, pb)
if err != nil {
return []reconcile.Request{}
}
// Use this for removing duplicated policies
rootPolicyRequest := GetPoliciesInPlacementBinding(ctx, c, pb)

result := make([]reconcile.Request, 0, len(rootPolicyRequest)*len(decisions))
result := make([]reconcile.Request, 0, len(rootPolicyRequest)*len(rolloutResult.ClustersToRollout))

for _, rp := range rootPolicyRequest {
for _, clusterName := range decisions {
for clusterName := range rolloutResult.ClustersToRollout {
result = append(result, reconcile.Request{NamespacedName: types.NamespacedName{
Name: rp.Namespace + "." + rp.Name,
Namespace: clusterName,
45 changes: 24 additions & 21 deletions controllers/common/common_status_update.go
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ import (

k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
appsv1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/placementrule/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -22,7 +23,7 @@ import (
func RootStatusUpdate(ctx context.Context, c client.Client, rootPolicy *policiesv1.Policy) (DecisionSet, error) {
placements, decisions, err := GetClusterDecisions(ctx, c, rootPolicy)
if err != nil {
log.Info("Failed to get any placement decisions. Giving up on the request.")
log.Info("Failed to get any clusters from the placement. Giving up on the request.")

return nil, err
}
@@ -63,12 +64,13 @@ func RootStatusUpdate(ctx context.Context, c client.Client, rootPolicy *policies
return decisions, nil
}

// GetPolicyPlacementDecisions retrieves the placement decisions for a input PlacementBinding when
// the policy is bound within it. It can return an error if the PlacementBinding is invalid, or if
// a required lookup fails.
func GetPolicyPlacementDecisions(ctx context.Context, c client.Client,
// GetPolicyRolloutClusters retrieves the clusters for the current rollout (i.e. the rollout result)
// for a input PlacementBinding when the policy is bound within it. It can return an error if the
// PlacementBinding is invalid, or if a required lookup fails.
func GetPolicyRolloutClusters(ctx context.Context, c client.Client,
instance *policiesv1.Policy, pb *policiesv1.PlacementBinding,
) (clusterDecisions []string, placements []*policiesv1.Placement, err error) {
) (rolloutResult clusterv1alpha1.RolloutResult, placements []*policiesv1.Placement, err error) {
rolloutResult = clusterv1alpha1.RolloutResult{}
policySubjectFound := false
policySetSubjects := make(map[string]struct{}) // a set, to prevent duplicates

@@ -102,7 +104,7 @@ func GetPolicyPlacementDecisions(ctx context.Context, c client.Client,

if len(placements) == 0 {
// None of the subjects in the PlacementBinding were relevant to this Policy.
return nil, nil, nil
return rolloutResult, nil, nil
}

// If the PlacementRef is invalid, log and return. (This is not recoverable.)
@@ -122,7 +124,8 @@ func GetPolicyPlacementDecisions(ctx context.Context, c client.Client,
case "PlacementRule":
plr := &appsv1.PlacementRule{}
if err := c.Get(ctx, refNN, plr); err != nil && !k8serrors.IsNotFound(err) {
return nil, nil, fmt.Errorf("failed to check for PlacementRule '%v': %w", pb.PlacementRef.Name, err)
return rolloutResult, nil,
fmt.Errorf("failed to check for PlacementRule '%v': %w", pb.PlacementRef.Name, err)
}

for i := range placements {
@@ -131,7 +134,7 @@ func GetPolicyPlacementDecisions(ctx context.Context, c client.Client,
case "Placement":
pl := &clusterv1beta1.Placement{}
if err := c.Get(ctx, refNN, pl); err != nil && !k8serrors.IsNotFound(err) {
return nil, nil, fmt.Errorf("failed to check for Placement '%v': %w", pb.PlacementRef.Name, err)
return rolloutResult, nil, fmt.Errorf("failed to check for Placement '%v': %w", pb.PlacementRef.Name, err)
}

for i := range placements {
@@ -141,17 +144,17 @@ func GetPolicyPlacementDecisions(ctx context.Context, c client.Client,

// If there are no placements, then the PlacementBinding is not for this Policy.
if len(placements) == 0 {
return nil, nil, nil
return rolloutResult, nil, nil
}

// If the policy is disabled, don't return any decisions, so that the policy isn't put on any clusters
if instance.Spec.Disabled {
return nil, placements, nil
return rolloutResult, placements, nil
}

clusterDecisions, err = GetDecisions(ctx, c, pb)
rolloutResult, err = GetRolloutClusters(ctx, c, pb)

return clusterDecisions, placements, err
return rolloutResult, placements, err
}

type DecisionSet map[string]bool
@@ -188,18 +191,18 @@ func GetClusterDecisions(
continue
}

plcDecisions, plcPlacements, err := GetPolicyPlacementDecisions(ctx, c, rootPolicy, &pbList.Items[i])
plcRolloutClusters, plcPlacements, err := GetPolicyRolloutClusters(ctx, c, rootPolicy, &pbList.Items[i])
if err != nil {
return nil, nil, err
}

if len(plcDecisions) == 0 {
log.Info("No placement decisions to process for this policy from this non-restricted binding",
if len(plcRolloutClusters.ClustersToRollout) == 0 {
log.Info("No clusters to process from the placement for this policy from this non-restricted binding",
"policyName", rootPolicy.GetName(), "bindingName", pb.GetName())
}

// Decisions are all unique
for _, clusterName := range plcDecisions {
for clusterName := range plcRolloutClusters.ClustersToRollout {
decisions[clusterName] = true
}

@@ -214,18 +217,18 @@ func GetClusterDecisions(

foundInDecisions := false

plcDecisions, plcPlacements, err := GetPolicyPlacementDecisions(ctx, c, rootPolicy, &pbList.Items[i])
plcRolloutClusters, plcPlacements, err := GetPolicyRolloutClusters(ctx, c, rootPolicy, &pbList.Items[i])
if err != nil {
return nil, nil, err
}

if len(plcDecisions) == 0 {
log.Info("No placement decisions to process for this policy from this restricted binding",
if len(plcRolloutClusters.ClustersToRollout) == 0 {
log.Info("No clusters to process from the placement for this policy from this restricted binding",
"policyName", rootPolicy.GetName(), "bindingName", pb.GetName())
}

// Decisions are all unique
for _, clusterName := range plcDecisions {
for clusterName := range plcRolloutClusters.ClustersToRollout {
if _, ok := decisions[clusterName]; ok {
foundInDecisions = true
}
9 changes: 6 additions & 3 deletions controllers/policyset/policyset_controller.go
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
appsv1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/placementrule/v1"
ctrl "sigs.k8s.io/controller-runtime"
@@ -189,13 +190,15 @@ func (r *PolicySetReconciler) processPolicySet(ctx context.Context, plcSet *poli
log.V(1).Info("Error getting placement binding " + pbName)
}

var clusterDecisions []string
clusterDecisions, err = common.GetDecisions(ctx, r.Client, pb)
var rolloutResult clusterv1alpha1.RolloutResult
rolloutResult, err = common.GetRolloutClusters(ctx, r.Client, pb)
if err != nil {
log.Error(err, "Error getting placement decisions for binding "+pbName)
}

clusters = append(clusters, clusterDecisions...)
for clusterName := range rolloutResult.ClustersToRollout {
clusters = append(clusters, clusterName)
}
}

// aggregate compliance state
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ go 1.21

require (
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/go-logr/logr v1.2.4
github.com/go-logr/zapr v1.2.4
github.com/golang-migrate/migrate/v4 v4.16.2
github.com/google/go-cmp v0.6.0
@@ -38,7 +39,6 @@ require (
github.com/evanphx/json-patch v5.7.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.7.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.20.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.4 // indirect