diff --git a/internal/controller/drplacementcontrol.go b/internal/controller/drplacementcontrol.go index 1b60b9494..ef8f3a1d3 100644 --- a/internal/controller/drplacementcontrol.go +++ b/internal/controller/drplacementcontrol.go @@ -82,7 +82,7 @@ func (d *DRPCInstance) startProcessing() bool { done, processingErr := d.processPlacement() if d.shouldUpdateStatus() || d.statusUpdateTimeElapsed() { - if err := d.reconciler.updateDRPCStatus(d.ctx, d.instance, d.userPlacement, d.log); err != nil { + if err := d.reconciler.updateDRPCStatus(d.ctx, d.instance, d.userPlacement, d.log, d.vrgs); err != nil { errMsg := fmt.Sprintf("error from update DRPC status: %v", err) if processingErr != nil { errMsg += fmt.Sprintf(", error from process placement: %v", processingErr) diff --git a/internal/controller/drplacementcontrol_controller.go b/internal/controller/drplacementcontrol_controller.go index 44ba0af1f..003e5fbf9 100644 --- a/internal/controller/drplacementcontrol_controller.go +++ b/internal/controller/drplacementcontrol_controller.go @@ -219,12 +219,12 @@ func (r *DRPlacementControlReconciler) Reconcile(ctx context.Context, req ctrl.R } if requeue { - return ctrl.Result{Requeue: true}, r.updateDRPCStatus(ctx, drpc, placementObj, logger) + return ctrl.Result{Requeue: true}, r.updateDRPCStatus(ctx, drpc, placementObj, logger, nil) } d, err := r.createDRPCInstance(ctx, drPolicy, drpc, placementObj, ramenConfig, logger) if err != nil && !errors.Is(err, ErrInitialWaitTimeForDRPCPlacementRule) { - err2 := r.updateDRPCStatus(ctx, drpc, placementObj, logger) + err2 := r.updateDRPCStatus(ctx, drpc, placementObj, logger, nil) return ctrl.Result{}, fmt.Errorf("failed to create DRPC instance (%w) and (%v)", err, err2) } @@ -263,7 +263,7 @@ func (r *DRPlacementControlReconciler) recordFailure(ctx context.Context, drpc * needsUpdate := addOrUpdateCondition(&drpc.Status.Conditions, rmn.ConditionAvailable, drpc.Generation, metav1.ConditionFalse, reason, msg) if needsUpdate { - err := r.updateDRPCStatus(ctx, drpc, placementObj, log) + err := r.updateDRPCStatus(ctx, drpc, placementObj, log, nil) if err != nil { log.Info(fmt.Sprintf("Failed to update DRPC status (%v)", err)) } @@ -1273,11 +1273,12 @@ func (r *DRPlacementControlReconciler) getStatusCheckDelay( // //nolint:cyclop func (r *DRPlacementControlReconciler) updateDRPCStatus( - ctx context.Context, drpc *rmn.DRPlacementControl, userPlacement client.Object, log logr.Logger, + ctx context.Context, drpc *rmn.DRPlacementControl, + userPlacement client.Object, log logr.Logger, vrgs map[string]*rmn.VolumeReplicationGroup, ) error { log.Info("Updating DRPC status") - r.updateResourceCondition(ctx, drpc, userPlacement, log) + r.updateResourceCondition(ctx, drpc, userPlacement, log, vrgs) // set metrics if DRPC is not being deleted and if finalizer exists if !isBeingDeleted(drpc, userPlacement) && controllerutil.ContainsFinalizer(drpc, DRPCFinalizer) { @@ -1319,7 +1320,8 @@ func (r *DRPlacementControlReconciler) updateDRPCStatus( // //nolint:funlen,cyclop func (r *DRPlacementControlReconciler) updateResourceCondition( - ctx context.Context, drpc *rmn.DRPlacementControl, userPlacement client.Object, log logr.Logger, + ctx context.Context, drpc *rmn.DRPlacementControl, userPlacement client.Object, + log logr.Logger, vrgs map[string]*rmn.VolumeReplicationGroup, ) { vrgNamespace, err := selectVRGNamespace(r.Client, log, drpc, userPlacement) if err != nil { @@ -1336,65 +1338,147 @@ func (r *DRPlacementControlReconciler) updateResourceCondition( return } - annotations := 
make(map[string]string) - annotations[DRPCNameAnnotation] = drpc.Name - annotations[DRPCNamespaceAnnotation] = drpc.Namespace + // Retrieve VRG either from argument or fetch from managed cluster/S3 store + vrg := r.getVRG(ctx, drpc, vrgNamespace, clusterName, vrgs, log) + if vrg == nil { + log.Info("No valid VRG found, skipping update") + + return + } + + // Update DRPC with VRG details + drpc.Status.ResourceConditions.ResourceMeta = rmn.VRGResourceMeta{ + Kind: vrg.Kind, + Name: vrg.Name, + Namespace: vrg.Namespace, + Generation: vrg.Generation, + ResourceVersion: vrg.ResourceVersion, + ProtectedPVCs: extractProtectedPVCNames(vrg), + } + + drpc.Status.ResourceConditions.Conditions = assignConditionsWithConflictCheck( + vrgs, vrg, VRGConditionTypeNoClusterDataConflict) + + if rmnutil.IsCGEnabled(vrg.GetAnnotations()) { + drpc.Status.ResourceConditions.ResourceMeta.PVCGroups = vrg.Status.PVCGroups + } + + if vrg.Status.LastGroupSyncTime != nil || drpc.Spec.Action != rmn.ActionRelocate { + drpc.Status.LastGroupSyncTime = vrg.Status.LastGroupSyncTime + drpc.Status.LastGroupSyncDuration = vrg.Status.LastGroupSyncDuration + drpc.Status.LastGroupSyncBytes = vrg.Status.LastGroupSyncBytes + } + + if vrg.Status.KubeObjectProtection.CaptureToRecoverFrom != nil { + drpc.Status.LastKubeObjectProtectionTime = &vrg.Status.KubeObjectProtection.CaptureToRecoverFrom.EndTime + } + + updateDRPCProtectedCondition(drpc, vrg, clusterName) +} + +// getVRG retrieves a VRG either from the provided map or fetches it from the managed cluster/S3 store. +func (r *DRPlacementControlReconciler) getVRG( + ctx context.Context, drpc *rmn.DRPlacementControl, vrgNamespace, clusterName string, + vrgs map[string]*rmn.VolumeReplicationGroup, log logr.Logger, +) *rmn.VolumeReplicationGroup { + // Use provided VRG map if available + if vrgs != nil { + if vrg, exists := vrgs[clusterName]; exists { + return vrg + } + + log.Info("VRG not found in provided VRG map, trying to fetch from cluster", "drpcName", drpc.Name) + } + + // Fetch VRG from managed cluster + annotations := map[string]string{ + DRPCNameAnnotation: drpc.Name, + DRPCNamespaceAnnotation: drpc.Namespace, + } - vrg, err := r.MCVGetter.GetVRGFromManagedCluster(drpc.Name, vrgNamespace, - clusterName, annotations) + vrg, err := r.MCVGetter.GetVRGFromManagedCluster(drpc.Name, vrgNamespace, clusterName, annotations) if err != nil { - log.Info("Failed to get VRG from managed cluster. Trying s3 store...", "errMsg", err.Error()) + log.Info("Failed to get VRG from managed cluster. Trying S3 store...", "errMsg", err.Error()) - // The VRG from the s3 store might be stale, however, the worst case should be at most around 1 minute. vrg = GetLastKnownVRGPrimaryFromS3(ctx, r.APIReader, GetAvailableS3Profiles(ctx, r.Client, drpc, log), drpc.GetName(), vrgNamespace, r.ObjStoreGetter, log) + if vrg == nil { - log.Info("Failed to get VRG from s3 store") + log.Info("Failed to get VRG from S3 store") drpc.Status.ResourceConditions = rmn.VRGConditions{} updateProtectedConditionUnknown(drpc, clusterName) - return + return nil } if vrg.ResourceVersion < drpc.Status.ResourceConditions.ResourceMeta.ResourceVersion { log.Info("VRG resourceVersion is lower than the previously recorded VRG's resourceVersion in DRPC") - // if the VRG resourceVersion is less, then leave the DRPC ResourceConditions.ResourceMeta.ResourceVersion as is. 
- return + + return nil } } - drpc.Status.ResourceConditions.ResourceMeta.Kind = vrg.Kind - drpc.Status.ResourceConditions.ResourceMeta.Name = vrg.Name - drpc.Status.ResourceConditions.ResourceMeta.Namespace = vrg.Namespace - drpc.Status.ResourceConditions.ResourceMeta.Generation = vrg.Generation - drpc.Status.ResourceConditions.ResourceMeta.ResourceVersion = vrg.ResourceVersion - drpc.Status.ResourceConditions.Conditions = vrg.Status.Conditions + return vrg +} +// extractProtectedPVCNames extracts protected PVC names from a VRG +func extractProtectedPVCNames(vrg *rmn.VolumeReplicationGroup) []string { protectedPVCs := []string{} for _, protectedPVC := range vrg.Status.ProtectedPVCs { protectedPVCs = append(protectedPVCs, protectedPVC.Name) } - drpc.Status.ResourceConditions.ResourceMeta.ProtectedPVCs = protectedPVCs + return protectedPVCs +} - if rmnutil.IsCGEnabled(vrg.GetAnnotations()) { - drpc.Status.ResourceConditions.ResourceMeta.PVCGroups = vrg.Status.PVCGroups - } +// findConflictCondition selects the appropriate condition from VRGs based on the conflict type. +func findConflictCondition(vrgs map[string]*rmn.VolumeReplicationGroup, conflictType string) *metav1.Condition { + var selectedCondition *metav1.Condition - if vrg.Status.LastGroupSyncTime != nil || drpc.Spec.Action != rmn.ActionRelocate { - drpc.Status.LastGroupSyncTime = vrg.Status.LastGroupSyncTime - drpc.Status.LastGroupSyncDuration = vrg.Status.LastGroupSyncDuration - drpc.Status.LastGroupSyncBytes = vrg.Status.LastGroupSyncBytes + for _, vrg := range vrgs { + for i, condition := range vrg.Status.Conditions { + if condition.Type == conflictType && condition.Status == metav1.ConditionFalse { + // Prioritize primary VRG's condition if available + if isVRGPrimary(vrg) { + return &vrg.Status.Conditions[i] // Exit early if primary VRG condition is found + } + + // Assign the first non-primary VRG's condition if no primary found yet + if selectedCondition == nil { + selectedCondition = &vrg.Status.Conditions[i] + } + } + } } - if vrg.Status.KubeObjectProtection.CaptureToRecoverFrom != nil { - drpc.Status.LastKubeObjectProtectionTime = &vrg.Status.KubeObjectProtection.CaptureToRecoverFrom.EndTime + return selectedCondition +} + +// assignConditionsWithConflictCheck assigns conditions from a given VRG while prioritizing conflict conditions. +func assignConditionsWithConflictCheck(vrgs map[string]*rmn.VolumeReplicationGroup, + vrg *rmn.VolumeReplicationGroup, conflictType string, +) []metav1.Condition { + conditions := vrg.Status.Conditions + conflictCondition := findConflictCondition(vrgs, conflictType) + + // Ensure the conflict condition is present in the conditions list + if conflictCondition != nil { + for i, condition := range conditions { + if condition.Type == conflictType { + conditions[i] = *conflictCondition + + return conditions + } + } + + // If not found, append it + conditions = append(conditions, *conflictCondition) } - updateDRPCProtectedCondition(drpc, vrg, clusterName) + return conditions } // clusterForVRGStatus determines which cluster's VRG should be inspected for status updates to DRPC
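A note on the conflict-condition merge introduced above (not part of the diff itself): `assignConditionsWithConflictCheck` starts from the selected VRG's own conditions and then lets a `False` `VRGConditionTypeNoClusterDataConflict` condition from any VRG in the map override it, preferring the primary VRG's copy. Below is a minimal, hypothetical unit-test sketch of that rule; the `newVRG` helper, the cluster keys, and the test name are invented for illustration, and the field names assume the `rmn` v1alpha1 API types already used in this file.

```go
package controller

import (
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	rmn "github.com/ramendr/ramen/api/v1alpha1"
)

// newVRG is a hypothetical helper for this sketch only: it builds a VRG with the
// given replication state and a NoClusterDataConflict condition in the given status,
// tagging the condition's Reason with the state so the test can tell the copies apart.
func newVRG(state rmn.ReplicationState, conflict metav1.ConditionStatus) *rmn.VolumeReplicationGroup {
	return &rmn.VolumeReplicationGroup{
		Spec: rmn.VolumeReplicationGroupSpec{ReplicationState: state},
		Status: rmn.VolumeReplicationGroupStatus{
			Conditions: []metav1.Condition{
				{Type: VRGConditionTypeNoClusterDataConflict, Status: conflict, Reason: string(state)},
			},
		},
	}
}

// TestAssignConditionsPrefersPrimaryConflict checks that when both VRGs report a
// False NoClusterDataConflict condition, the primary's copy replaces the one in the
// selected (secondary) VRG's condition list.
func TestAssignConditionsPrefersPrimaryConflict(t *testing.T) {
	primary := newVRG(rmn.Primary, metav1.ConditionFalse)
	secondary := newVRG(rmn.Secondary, metav1.ConditionFalse)

	vrgs := map[string]*rmn.VolumeReplicationGroup{"cluster-east": primary, "cluster-west": secondary}

	got := assignConditionsWithConflictCheck(vrgs, secondary, VRGConditionTypeNoClusterDataConflict)

	for _, c := range got {
		if c.Type == VRGConditionTypeNoClusterDataConflict && c.Reason != string(rmn.Primary) {
			t.Errorf("expected the primary VRG's conflict condition, got reason %q", c.Reason)
		}
	}
}
```

If only a secondary VRG carries the `False` condition, `findConflictCondition` falls back to the first such copy it encounters, and if the selected VRG has no condition of that type at all, the conflict condition is appended rather than substituted.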