Skip to content

Generic metrics and alerts for the cluster data conflict(PVC/VM) - cherrypick PR#1974 #2020

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/controller/drplacementcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (d *DRPCInstance) startProcessing() bool {
done, processingErr := d.processPlacement()

if d.shouldUpdateStatus() || d.statusUpdateTimeElapsed() {
if err := d.reconciler.updateDRPCStatus(d.ctx, d.instance, d.userPlacement, d.log); err != nil {
if err := d.reconciler.updateDRPCStatus(d.ctx, d.instance, d.userPlacement, d.log, d.vrgs); err != nil {
errMsg := fmt.Sprintf("error from update DRPC status: %v", err)
if processingErr != nil {
errMsg += fmt.Sprintf(", error from process placement: %v", processingErr)
Expand Down
152 changes: 118 additions & 34 deletions internal/controller/drplacementcontrol_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,12 @@ func (r *DRPlacementControlReconciler) Reconcile(ctx context.Context, req ctrl.R
}

if requeue {
return ctrl.Result{Requeue: true}, r.updateDRPCStatus(ctx, drpc, placementObj, logger)
return ctrl.Result{Requeue: true}, r.updateDRPCStatus(ctx, drpc, placementObj, logger, nil)
}

d, err := r.createDRPCInstance(ctx, drPolicy, drpc, placementObj, ramenConfig, logger)
if err != nil && !errors.Is(err, ErrInitialWaitTimeForDRPCPlacementRule) {
err2 := r.updateDRPCStatus(ctx, drpc, placementObj, logger)
err2 := r.updateDRPCStatus(ctx, drpc, placementObj, logger, nil)

return ctrl.Result{}, fmt.Errorf("failed to create DRPC instance (%w) and (%v)", err, err2)
}
Expand Down Expand Up @@ -263,7 +263,7 @@ func (r *DRPlacementControlReconciler) recordFailure(ctx context.Context, drpc *
needsUpdate := addOrUpdateCondition(&drpc.Status.Conditions, rmn.ConditionAvailable,
drpc.Generation, metav1.ConditionFalse, reason, msg)
if needsUpdate {
err := r.updateDRPCStatus(ctx, drpc, placementObj, log)
err := r.updateDRPCStatus(ctx, drpc, placementObj, log, nil)
if err != nil {
log.Info(fmt.Sprintf("Failed to update DRPC status (%v)", err))
}
Expand Down Expand Up @@ -1273,11 +1273,12 @@ func (r *DRPlacementControlReconciler) getStatusCheckDelay(
//
//nolint:cyclop
func (r *DRPlacementControlReconciler) updateDRPCStatus(
ctx context.Context, drpc *rmn.DRPlacementControl, userPlacement client.Object, log logr.Logger,
ctx context.Context, drpc *rmn.DRPlacementControl,
userPlacement client.Object, log logr.Logger, vrgs map[string]*rmn.VolumeReplicationGroup,
) error {
log.Info("Updating DRPC status")

r.updateResourceCondition(ctx, drpc, userPlacement, log)
r.updateResourceCondition(ctx, drpc, userPlacement, log, vrgs)

// set metrics if DRPC is not being deleted and if finalizer exists
if !isBeingDeleted(drpc, userPlacement) && controllerutil.ContainsFinalizer(drpc, DRPCFinalizer) {
Expand Down Expand Up @@ -1319,7 +1320,8 @@ func (r *DRPlacementControlReconciler) updateDRPCStatus(
//
//nolint:funlen,cyclop
func (r *DRPlacementControlReconciler) updateResourceCondition(
ctx context.Context, drpc *rmn.DRPlacementControl, userPlacement client.Object, log logr.Logger,
ctx context.Context, drpc *rmn.DRPlacementControl, userPlacement client.Object,
log logr.Logger, vrgs map[string]*rmn.VolumeReplicationGroup,
) {
vrgNamespace, err := selectVRGNamespace(r.Client, log, drpc, userPlacement)
if err != nil {
Expand All @@ -1336,65 +1338,147 @@ func (r *DRPlacementControlReconciler) updateResourceCondition(
return
}

annotations := make(map[string]string)
annotations[DRPCNameAnnotation] = drpc.Name
annotations[DRPCNamespaceAnnotation] = drpc.Namespace
// Retrieve VRG either from argument or fetch from managed cluster/S3 store
vrg := r.getVRG(ctx, drpc, vrgNamespace, clusterName, vrgs, log)
if vrg == nil {
log.Info("No valid VRG found, skipping update")

return
}

// Update DRPC with VRG details
drpc.Status.ResourceConditions.ResourceMeta = rmn.VRGResourceMeta{
Kind: vrg.Kind,
Name: vrg.Name,
Namespace: vrg.Namespace,
Generation: vrg.Generation,
ResourceVersion: vrg.ResourceVersion,
ProtectedPVCs: extractProtectedPVCNames(vrg),
}

drpc.Status.ResourceConditions.Conditions = assignConditionsWithConflictCheck(
vrgs, vrg, VRGConditionTypeNoClusterDataConflict)

if rmnutil.IsCGEnabled(vrg.GetAnnotations()) {
drpc.Status.ResourceConditions.ResourceMeta.PVCGroups = vrg.Status.PVCGroups
}

if vrg.Status.LastGroupSyncTime != nil || drpc.Spec.Action != rmn.ActionRelocate {
drpc.Status.LastGroupSyncTime = vrg.Status.LastGroupSyncTime
drpc.Status.LastGroupSyncDuration = vrg.Status.LastGroupSyncDuration
drpc.Status.LastGroupSyncBytes = vrg.Status.LastGroupSyncBytes
}

if vrg.Status.KubeObjectProtection.CaptureToRecoverFrom != nil {
drpc.Status.LastKubeObjectProtectionTime = &vrg.Status.KubeObjectProtection.CaptureToRecoverFrom.EndTime
}

updateDRPCProtectedCondition(drpc, vrg, clusterName)
}

// getVRG retrieves a VRG either from the provided map or fetches it from the managed cluster/S3 store.
func (r *DRPlacementControlReconciler) getVRG(
ctx context.Context, drpc *rmn.DRPlacementControl, vrgNamespace, clusterName string,
vrgs map[string]*rmn.VolumeReplicationGroup, log logr.Logger,
) *rmn.VolumeReplicationGroup {
// Use provided VRG map if available
if vrgs != nil {
if vrg, exists := vrgs[clusterName]; exists {
return vrg
}

log.Info("VRG not found in provided VRG map, trying to fetch from cluster", "drpcName", drpc.Name)
}

// Fetch VRG from managed cluster
annotations := map[string]string{
DRPCNameAnnotation: drpc.Name,
DRPCNamespaceAnnotation: drpc.Namespace,
}

vrg, err := r.MCVGetter.GetVRGFromManagedCluster(drpc.Name, vrgNamespace,
clusterName, annotations)
vrg, err := r.MCVGetter.GetVRGFromManagedCluster(drpc.Name, vrgNamespace, clusterName, annotations)
if err != nil {
log.Info("Failed to get VRG from managed cluster. Trying s3 store...", "errMsg", err.Error())
log.Info("Failed to get VRG from managed cluster. Trying S3 store...", "errMsg", err.Error())

// The VRG from the s3 store might be stale, however, the worst case should be at most around 1 minute.
vrg = GetLastKnownVRGPrimaryFromS3(ctx, r.APIReader,
GetAvailableS3Profiles(ctx, r.Client, drpc, log),
drpc.GetName(), vrgNamespace, r.ObjStoreGetter, log)

if vrg == nil {
log.Info("Failed to get VRG from s3 store")
log.Info("Failed to get VRG from S3 store")

drpc.Status.ResourceConditions = rmn.VRGConditions{}

updateProtectedConditionUnknown(drpc, clusterName)

return
return nil
}

if vrg.ResourceVersion < drpc.Status.ResourceConditions.ResourceMeta.ResourceVersion {
log.Info("VRG resourceVersion is lower than the previously recorded VRG's resourceVersion in DRPC")
// if the VRG resourceVersion is less, then leave the DRPC ResourceConditions.ResourceMeta.ResourceVersion as is.
return

return nil
}
}

drpc.Status.ResourceConditions.ResourceMeta.Kind = vrg.Kind
drpc.Status.ResourceConditions.ResourceMeta.Name = vrg.Name
drpc.Status.ResourceConditions.ResourceMeta.Namespace = vrg.Namespace
drpc.Status.ResourceConditions.ResourceMeta.Generation = vrg.Generation
drpc.Status.ResourceConditions.ResourceMeta.ResourceVersion = vrg.ResourceVersion
drpc.Status.ResourceConditions.Conditions = vrg.Status.Conditions
return vrg
}

// extractProtectedPVCNames extracts protected PVC names from a VRG
func extractProtectedPVCNames(vrg *rmn.VolumeReplicationGroup) []string {
protectedPVCs := []string{}
for _, protectedPVC := range vrg.Status.ProtectedPVCs {
protectedPVCs = append(protectedPVCs, protectedPVC.Name)
}

drpc.Status.ResourceConditions.ResourceMeta.ProtectedPVCs = protectedPVCs
return protectedPVCs
}

if rmnutil.IsCGEnabled(vrg.GetAnnotations()) {
drpc.Status.ResourceConditions.ResourceMeta.PVCGroups = vrg.Status.PVCGroups
}
// findConflictCondition selects the appropriate condition from VRGs based on the conflict type.
func findConflictCondition(vrgs map[string]*rmn.VolumeReplicationGroup, conflictType string) *metav1.Condition {
var selectedCondition *metav1.Condition

if vrg.Status.LastGroupSyncTime != nil || drpc.Spec.Action != rmn.ActionRelocate {
drpc.Status.LastGroupSyncTime = vrg.Status.LastGroupSyncTime
drpc.Status.LastGroupSyncDuration = vrg.Status.LastGroupSyncDuration
drpc.Status.LastGroupSyncBytes = vrg.Status.LastGroupSyncBytes
for _, vrg := range vrgs {
for i, condition := range vrg.Status.Conditions {
if condition.Type == conflictType && condition.Status == metav1.ConditionFalse {
// Prioritize primary VRG's condition if available
if isVRGPrimary(vrg) {
return &vrg.Status.Conditions[i] // Exit early if primary VRG condition is found
}

// Assign the first non-primary VRG's condition if no primary found yet
if selectedCondition == nil {
selectedCondition = &vrg.Status.Conditions[i]
}
}
}
}

if vrg.Status.KubeObjectProtection.CaptureToRecoverFrom != nil {
drpc.Status.LastKubeObjectProtectionTime = &vrg.Status.KubeObjectProtection.CaptureToRecoverFrom.EndTime
return selectedCondition
}

// assignConditionsWithConflictCheck assigns conditions from a given VRG while prioritizing conflict conditions.
func assignConditionsWithConflictCheck(vrgs map[string]*rmn.VolumeReplicationGroup,
vrg *rmn.VolumeReplicationGroup, conflictType string,
) []metav1.Condition {
conditions := vrg.Status.Conditions
conflictCondition := findConflictCondition(vrgs, conflictType)

// Ensure the conflict condition is present in the conditions list
if conflictCondition != nil {
for i, condition := range conditions {
if condition.Type == conflictType {
conditions[i] = *conflictCondition

return conditions
}
}

// If not found, append it
conditions = append(conditions, *conflictCondition)
}

updateDRPCProtectedCondition(drpc, vrg, clusterName)
return conditions
}

// clusterForVRGStatus determines which cluster's VRG should be inspected for status updates to DRPC
Expand Down