diff --git a/pkg/apply/task/inv_add_task.go b/pkg/apply/task/inv_add_task.go index 721ac878..0e121346 100644 --- a/pkg/apply/task/inv_add_task.go +++ b/pkg/apply/task/inv_add_task.go @@ -18,6 +18,7 @@ import ( "sigs.k8s.io/cli-utils/pkg/apply/taskrunner" "sigs.k8s.io/cli-utils/pkg/common" "sigs.k8s.io/cli-utils/pkg/inventory" + "sigs.k8s.io/cli-utils/pkg/inventory2" "sigs.k8s.io/cli-utils/pkg/object" ) @@ -30,12 +31,13 @@ var ( // before the actual object is applied. type InvAddTask struct { TaskName string - InvClient inventory.Client + InvClient inventory2.Client DynamicClient dynamic.Interface Mapper meta.RESTMapper InvInfo inventory.Info Objects object.UnstructuredSet DryRun common.DryRunStrategy + StatusPolicy inventory.StatusPolicy } func (i *InvAddTask) Name() string { @@ -71,11 +73,43 @@ func (i *InvAddTask) Start(taskContext *taskrunner.TaskContext) { } klog.V(4).Infof("merging %d local objects into inventory", len(i.Objects)) currentObjs := object.UnstructuredSetToObjMetadataSet(i.Objects) - _, err := i.InvClient.Merge(i.InvInfo, currentObjs, i.DryRun) + err := i.extendInventory(currentObjs) i.sendTaskResult(taskContext, err) }() } +// extendInventory adds the specified objects to the inventory, if not already +// present. +func (i *InvAddTask) extendInventory(objs object.ObjMetadataSet) error { + if len(objs) == 0 { + return nil + } + id := inventory2.ID{ + Name: i.InvInfo.Name(), + Namespace: i.InvInfo.Namespace(), + } + inv, err := i.InvClient.Get(context.TODO(), id) + if err != nil { + return fmt.Errorf("getting inventory: %w") + } + + oldObjs := inventory.ObjMetadataSetFromObjectReferenceList(inv.Spec.Objects) + newObjs := oldObjs.Union(objs) + inv.Spec.Objects = inventory.ObjectReferenceListFromObjMetadataSet(newObjs) + + if err = i.InvClient.Update(context.TODO(), inv, i.updateOptionList()...); err != nil { + return fmt.Errorf("updating inventory: %w") + } + return nil +} + +func (i *InvAddTask) updateOptionList() []inventory2.UpdateOption { + return []inventory2.UpdateOption{ + inventory2.WithDryRun(i.DryRun), + inventory2.WithStatus(i.StatusPolicy), + } +} + // Cancel is not supported by the InvAddTask. func (i *InvAddTask) Cancel(_ *taskrunner.TaskContext) {} diff --git a/pkg/apply/task/inv_set_task.go b/pkg/apply/task/inv_set_task.go index b039c4fe..d26835ae 100644 --- a/pkg/apply/task/inv_set_task.go +++ b/pkg/apply/task/inv_set_task.go @@ -4,12 +4,17 @@ package task import ( + "context" + "fmt" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/klog/v2" + "sigs.k8s.io/cli-utils/pkg/apis/actuation" "sigs.k8s.io/cli-utils/pkg/apply/event" "sigs.k8s.io/cli-utils/pkg/apply/taskrunner" "sigs.k8s.io/cli-utils/pkg/common" "sigs.k8s.io/cli-utils/pkg/inventory" + "sigs.k8s.io/cli-utils/pkg/inventory2" "sigs.k8s.io/cli-utils/pkg/object" ) @@ -17,10 +22,11 @@ import ( // inventory references at the end of the apply/prune. type DeleteOrUpdateInvTask struct { TaskName string - InvClient inventory.Client + InvClient inventory2.Client InvInfo inventory.Info PrevInventory object.ObjMetadataSet DryRun common.DryRunStrategy + StatusPolicy inventory.StatusPolicy // if Destroy is set, the inventory will be deleted if all objects were successfully pruned Destroy bool } @@ -47,12 +53,14 @@ func (i *DeleteOrUpdateInvTask) Identifiers() object.ObjMetadataSet { // If Destroy is false, the inventory will be updated. func (i *DeleteOrUpdateInvTask) Start(taskContext *taskrunner.TaskContext) { go func() { + klog.V(2).Infof("inventory set task starting (name: %q)", i.TaskName) var err error if i.Destroy && i.destroySuccessful(taskContext) { err = i.deleteInventory() } else { err = i.updateInventory(taskContext) } + klog.V(2).Infof("inventory set task completing (name: %q)", i.TaskName) taskContext.TaskChannel() <- taskrunner.TaskResult{Err: err} }() } @@ -84,7 +92,16 @@ func (i *DeleteOrUpdateInvTask) StatusUpdate(_ *taskrunner.TaskContext, _ object // - Deleted resources (successful) // - Abandoned resources (successful) func (i *DeleteOrUpdateInvTask) updateInventory(taskContext *taskrunner.TaskContext) error { - klog.V(2).Infof("inventory set task starting (name: %q)", i.TaskName) + id := inventory2.ID{ + Name: i.InvInfo.Name(), + Namespace: i.InvInfo.Namespace(), + } + inv, err := i.InvClient.Get(context.TODO(), id) + if err != nil { + return fmt.Errorf("getting inventory: %w") + } + prevObjs := inventory.ObjMetadataSetFromObjectReferenceList(inv.Spec.Objects).Unique() + invObjs := object.ObjMetadataSet{} // TODO: Just use InventoryManager.Store() @@ -100,7 +117,7 @@ func (i *DeleteOrUpdateInvTask) updateInventory(taskContext *taskrunner.TaskCont // This will remove new resources that failed to apply from the inventory, // because even tho they were added by InvAddTask, the PrevInventory // represents the inventory before the pipeline has run. - applyFailures := i.PrevInventory.Intersection(im.FailedApplies()) + applyFailures := prevObjs.Intersection(im.FailedApplies()) klog.V(4).Infof("keep in inventory %d failed applies", len(applyFailures)) invObjs = invObjs.Union(applyFailures) @@ -109,7 +126,7 @@ func (i *DeleteOrUpdateInvTask) updateInventory(taskContext *taskrunner.TaskCont // It's likely that all the skipped applies are already in the inventory, // because the apply filters all currently depend on cluster state, // but we're doing the intersection anyway just to be sure. - applySkips := i.PrevInventory.Intersection(im.SkippedApplies()) + applySkips := prevObjs.Intersection(im.SkippedApplies()) klog.V(4).Infof("keep in inventory %d skipped applies", len(applySkips)) invObjs = invObjs.Union(applySkips) @@ -118,7 +135,7 @@ func (i *DeleteOrUpdateInvTask) updateInventory(taskContext *taskrunner.TaskCont // It's likely that all the delete failures are already in the inventory, // because the set of resources to prune comes from the inventory, // but we're doing the intersection anyway just to be sure. - pruneFailures := i.PrevInventory.Intersection(im.FailedDeletes()) + pruneFailures := prevObjs.Intersection(im.FailedDeletes()) klog.V(4).Infof("set inventory %d failed prunes", len(pruneFailures)) invObjs = invObjs.Union(pruneFailures) @@ -127,19 +144,19 @@ func (i *DeleteOrUpdateInvTask) updateInventory(taskContext *taskrunner.TaskCont // It's likely that all the skipped deletes are already in the inventory, // because the set of resources to prune comes from the inventory, // but we're doing the intersection anyway just to be sure. - pruneSkips := i.PrevInventory.Intersection(im.SkippedDeletes()) + pruneSkips := prevObjs.Intersection(im.SkippedDeletes()) klog.V(4).Infof("keep in inventory %d skipped prunes", len(pruneSkips)) invObjs = invObjs.Union(pruneSkips) // If an object failed to reconcile and was previously stored in the inventory, // then keep it in the inventory so it can be waited on next time. - reconcileFailures := i.PrevInventory.Intersection(im.FailedReconciles()) + reconcileFailures := prevObjs.Intersection(im.FailedReconciles()) klog.V(4).Infof("set inventory %d failed reconciles", len(reconcileFailures)) invObjs = invObjs.Union(reconcileFailures) // If an object timed out reconciling and was previously stored in the inventory, // then keep it in the inventory so it can be waited on next time. - reconcileTimeouts := i.PrevInventory.Intersection(im.TimeoutReconciles()) + reconcileTimeouts := prevObjs.Intersection(im.TimeoutReconciles()) klog.V(4).Infof("keep in inventory %d timeout reconciles", len(reconcileTimeouts)) invObjs = invObjs.Union(reconcileTimeouts) @@ -150,24 +167,44 @@ func (i *DeleteOrUpdateInvTask) updateInventory(taskContext *taskrunner.TaskCont // If an object is invalid and was previously stored in the inventory, // then keep it in the inventory so it can be applied/pruned next time. - invalidObjects := i.PrevInventory.Intersection(taskContext.InvalidObjects()) + invalidObjects := prevObjs.Intersection(taskContext.InvalidObjects()) klog.V(4).Infof("keep in inventory %d invalid objects", len(invalidObjects)) invObjs = invObjs.Union(invalidObjects) + // Update inventory spec in memory + inv.Spec.Objects = inventory.ObjectReferenceListFromObjMetadataSet(invObjs) + klog.V(4).Infof("get the apply status for %d objects", len(invObjs)) - objStatus := taskContext.InventoryManager().Inventory().Status.Objects + inv.Status.Objects = taskContext.InventoryManager().Inventory().Status.Objects klog.V(4).Infof("set inventory %d total objects", len(invObjs)) - err := i.InvClient.Replace(i.InvInfo, invObjs, objStatus, i.DryRun) + if err = i.InvClient.Update(context.TODO(), inv, i.updateOptionList()...); err != nil { + return fmt.Errorf("updating inventory: %w") + } + return nil +} - klog.V(2).Infof("inventory set task completing (name: %q)", i.TaskName) - return err +func (i *DeleteOrUpdateInvTask) updateOptionList() []inventory2.UpdateOption { + return []inventory2.UpdateOption{ + inventory2.WithDryRun(i.DryRun), + inventory2.WithStatus(i.StatusPolicy), + } +} + +func (i *DeleteOrUpdateInvTask) deleteOptionList() []inventory2.DeleteOption { + return []inventory2.DeleteOption{ + inventory2.WithDryRun(i.DryRun), + inventory2.WithStatus(i.StatusPolicy), + } } // deleteInventory deletes the inventory object from the cluster. func (i *DeleteOrUpdateInvTask) deleteInventory() error { klog.V(2).Infof("delete inventory task starting (name: %q)", i.Name()) - err := i.InvClient.DeleteInventoryObj(i.InvInfo, i.DryRun) + inv := &actuation.Inventory{} + inv.SetName(i.InvInfo.Name()) + inv.SetName(i.InvInfo.Namespace()) + err := i.InvClient.Delete(context.TODO(), inv, i.deleteOptionList()...) // Not found is not error, since this means it was already deleted. if apierrors.IsNotFound(err) { err = nil diff --git a/pkg/inventory/type-conv.go b/pkg/inventory/type-conv.go index ed33f153..f6da58d7 100644 --- a/pkg/inventory/type-conv.go +++ b/pkg/inventory/type-conv.go @@ -30,3 +30,19 @@ func ObjMetadataFromObjectReference(ref actuation.ObjectReference) object.ObjMet Namespace: ref.Namespace, } } + +func ObjectReferenceListFromObjMetadataSet(ids []object.ObjMetadata) []actuation.ObjectReference { + var refs []actuation.ObjectReference + for _, ref := range ids { + refs = append(refs, ObjectReferenceFromObjMetadata(ref)) + } + return refs +} + +func ObjMetadataSetFromObjectReferenceList(refs []actuation.ObjectReference) object.ObjMetadataSet { + var ids object.ObjMetadataSet + for _, ref := range refs { + ids = append(ids, ObjMetadataFromObjectReference(ref)) + } + return ids +} diff --git a/pkg/inventory2/client.go b/pkg/inventory2/client.go new file mode 100644 index 00000000..d43935de --- /dev/null +++ b/pkg/inventory2/client.go @@ -0,0 +1,116 @@ +package inventory2 + +import ( + "context" + + "sigs.k8s.io/cli-utils/pkg/apis/actuation" + "sigs.k8s.io/cli-utils/pkg/common" + "sigs.k8s.io/cli-utils/pkg/inventory" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type ID client.ObjectKey + +type Client interface { + ReadClient + WriteClient +} + +type ReadClient interface { + Get(ctx context.Context, id ID, opts ...GetOption) (*actuation.Inventory, error) + List(ctx context.Context, opts ...ListOption) error +} + +type WriteClient interface { + Create(ctx context.Context, inv *actuation.Inventory, opts ...CreateOption) error + Update(ctx context.Context, inv *actuation.Inventory, opts ...UpdateOption) error + Delete(ctx context.Context, inv *actuation.Inventory, opts ...DeleteOption) error +} + +type CreateOption interface { + ApplyCreateOptions(opts *CreateOptions) +} + +type CreateOptions struct { + DryRunStrategy common.DryRunStrategy + StatusPolicy inventory.StatusPolicy +} + +type GetOption interface { + ApplyGetOptions(opts *GetOptions) +} + +type GetOptions struct { + ResourceVersion string + LabelSelector string +} + +type UpdateOption interface { + ApplyUpdateOptions(opts *UpdateOptions) +} + +type UpdateOptions struct { + DryRunStrategy common.DryRunStrategy + StatusPolicy inventory.StatusPolicy +} + +type DeleteOption interface { + ApplyDeleteOptions(opts *DeleteOptions) +} + +type DeleteOptions struct { + DryRunStrategy common.DryRunStrategy +} + +type ListOption interface { + ApplyListOptions(opts *ListOptions) +} + +type ListOptions struct { + ResourceVersion string + LabelSelector string +} + +func WithDryRun(strategy common.DryRunStrategy) DryRunOption { + return DryRunOption(strategy) +} + +type DryRunOption common.DryRunStrategy + +func (o DryRunOption) ApplyCreateOptions(opts *CreateOptions) { + opts.DryRunStrategy = common.DryRunStrategy(o) +} + +func (o DryRunOption) ApplyUpdateOptions(opts *UpdateOptions) { + opts.DryRunStrategy = common.DryRunStrategy(o) +} + +func (o DryRunOption) ApplyDeleteOptions(opts *DeleteOptions) { + opts.DryRunStrategy = common.DryRunStrategy(o) +} + +var _ CreateOption = DryRunOption(common.DryRunServer) +var _ UpdateOption = DryRunOption(common.DryRunServer) +var _ DeleteOption = DryRunOption(common.DryRunServer) + +func WithStatus(policy inventory.StatusPolicy) StatusOption { + return StatusOption(policy) +} + +type StatusOption common.DryRunStrategy + +func (o StatusOption) ApplyCreateOptions(opts *CreateOptions) { + opts.DryRunStrategy = common.DryRunStrategy(o) +} + +func (o StatusOption) ApplyUpdateOptions(opts *UpdateOptions) { + opts.DryRunStrategy = common.DryRunStrategy(o) +} + +func (o StatusOption) ApplyDeleteOptions(opts *DeleteOptions) { + opts.DryRunStrategy = common.DryRunStrategy(o) +} + +var _ CreateOption = StatusOption(inventory.StatusPolicyAll) +var _ UpdateOption = StatusOption(inventory.StatusPolicyAll) +var _ DeleteOption = StatusOption(inventory.StatusPolicyAll)