diff --git a/phase/upgrade_workers.go b/phase/upgrade_workers.go
index c59df939..bebf5a08 100644
--- a/phase/upgrade_workers.go
+++ b/phase/upgrade_workers.go
@@ -77,22 +77,32 @@ func (p *UpgradeWorkers) CleanUp() {
 
 // Run the phase
 func (p *UpgradeWorkers) Run(ctx context.Context) error {
-	// Upgrade worker hosts parallelly in 10% chunks
-	concurrentUpgrades := int(math.Floor(float64(len(p.hosts)) * float64(p.Config.Spec.Options.Concurrency.WorkerDisruptionPercent/100)))
-	if concurrentUpgrades == 0 {
-		concurrentUpgrades = 1
-	}
-	concurrentUpgrades = min(concurrentUpgrades, p.Config.Spec.Options.Concurrency.Limit)
-
-	log.Infof("Upgrading max %d workers in parallel", concurrentUpgrades)
-	return p.hosts.BatchedParallelEach(ctx, concurrentUpgrades,
-		p.start,
-		p.cordonWorker,
-		p.drainWorker,
-		p.upgradeWorker,
-		p.uncordonWorker,
-		p.finish,
-	)
+	// Upgrade worker hosts in parallel, in chunks of WorkerDisruptionPercent of the total host count
+	concurrentUpgrades := int(math.Floor(float64(len(p.hosts)) * float64(p.Config.Spec.Options.Concurrency.WorkerDisruptionPercent) / 100))
+	if concurrentUpgrades == 0 {
+		concurrentUpgrades = 1
+	}
+	concurrentUpgrades = min(concurrentUpgrades, p.Config.Spec.Options.Concurrency.Limit)
+
+	// Wait once for kube-proxy to be at the desired version across the cluster.
+	if !p.IsWet() {
+		p.DryMsg(p.leader, "wait for kube-proxy to be at the desired version (cluster-wide)")
+	} else if !NoWait { // honor --no-wait
+		log.Infof("waiting for kube-proxy cluster-wide roll-out")
+		if err := retry.AdaptiveTimeout(ctx, retry.DefaultTimeout, node.KubeProxyRolledOutFunc(p.leader)); err != nil {
+			return fmt.Errorf("kube-proxy did not reach the desired version: %w", err)
+		}
+	}
+
+	log.Infof("Upgrading max %d workers in parallel", concurrentUpgrades)
+	return p.hosts.BatchedParallelEach(ctx, concurrentUpgrades,
+		p.start,
+		p.cordonWorker,
+		p.drainWorker,
+		p.upgradeWorker,
+		p.uncordonWorker,
+		p.finish,
+	)
 }
 
 func (p *UpgradeWorkers) cordonWorker(_ context.Context, h *cluster.Host) error {
diff --git a/phase/validate_facts.go b/phase/validate_facts.go
index 6151ebf5..c919081f 100644
--- a/phase/validate_facts.go
+++ b/phase/validate_facts.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"fmt"
 
+	"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster"
+	"github.com/k0sproject/version"
 	log "github.com/sirupsen/logrus"
 )
 
@@ -28,6 +30,10 @@ func (p *ValidateFacts) Run(_ context.Context) error {
 		return err
 	}
 
+	if err := p.validateVersionSkew(); err != nil {
+		return err
+	}
+
 	return nil
 }
 
@@ -70,3 +76,45 @@ func (p *ValidateFacts) validateDefaultVersion() error {
 
 	return nil
 }
+
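+// validateVersionSkew checks that the planned upgrade stays within the
+// Kubernetes version skew policy: controllers should be upgraded one minor
+// version at a time, while workers may trail the target version by up to
+// three minor versions.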
+func (p *ValidateFacts) validateVersionSkew() error {
+	return p.Config.Spec.Hosts.Filter(func(h *cluster.Host) bool {
+		return h.Metadata.NeedsUpgrade
+	}).Each(context.Background(), func(_ context.Context, h *cluster.Host) error {
+		log.Debugf("%s: validating k0s version skew", h)
+		delta := version.NewDelta(h.Metadata.K0sRunningVersion, p.Config.Spec.K0s.Version)
+		log.Debugf("%s: version delta: %s", h, delta)
+
+		var unacceptable bool
+		switch {
+		case delta.MinorUpgrade:
+			if h.IsController() {
+				if p.Config.Spec.K0s.Version.Segments()[1]-h.Metadata.K0sRunningVersion.Segments()[1] > 1 {
+					log.Debugf("%s: controller upgrade not within version skew policy", h)
+					unacceptable = true
+				}
+			} else if p.Config.Spec.K0s.Version.Segments()[1]-h.Metadata.K0sRunningVersion.Segments()[1] > 3 {
+				log.Debugf("%s: worker upgrade not within version skew policy", h)
+				unacceptable = true
+			}
+
+			if !unacceptable {
+				log.Debugf("%s: minor upgrade within acceptable skew", h)
+			}
+		case delta.MajorUpgrade:
+			unacceptable = true
+			log.Warnf("%s: major upgrades are not supported, the operation is highly likely to fail", h)
+		}
+
+		if unacceptable {
+			if Force {
+				log.Warnf("upgrade from %s directly to %s is not within the version skew policy, allowing because --force was given", h.Metadata.K0sRunningVersion, p.Config.Spec.K0s.Version)
+				return nil
+			}
+			return fmt.Errorf("upgrade from %s directly to %s is not within the version skew policy, you can use --force to skip this check", h.Metadata.K0sRunningVersion, p.Config.Spec.K0s.Version)
+		}
+
+		log.Debugf("%s: version skew check passed", h)
+		return nil
+	})
+}
diff --git a/pkg/node/statusfunc.go b/pkg/node/statusfunc.go
index 8db17102..9f2cc8c9 100644
--- a/pkg/node/statusfunc.go
+++ b/pkg/node/statusfunc.go
@@ -3,6 +3,7 @@ package node
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"strings"
 	"time"
@@ -136,3 +137,249 @@ func ServiceStoppedFunc(h *cluster.Host, service string) retryFunc {
 		return nil
 	}
 }
+
+type daemonSetInfo struct {
+	Metadata struct {
+		Name       string `json:"name"`
+		Generation int64  `json:"generation"`
+	} `json:"metadata"`
+	Spec struct {
+		Selector struct {
+			MatchLabels map[string]string `json:"matchLabels"`
+		} `json:"selector"`
+		Template struct {
+			Spec struct {
+				Containers []struct {
+					Name  string `json:"name"`
+					Image string `json:"image"`
+				} `json:"containers"`
+			} `json:"spec"`
+		} `json:"template"`
+	} `json:"spec"`
+	Status struct {
+		ObservedGeneration     int64 `json:"observedGeneration"`
+		DesiredNumberScheduled int32 `json:"desiredNumberScheduled"`
+		UpdatedNumberScheduled int32 `json:"updatedNumberScheduled"`
+		NumberAvailable        int32 `json:"numberAvailable"`
+	} `json:"status"`
+}
+
+type podList struct {
+	Items []struct {
+		Metadata struct {
+			Name string `json:"name"`
+		} `json:"metadata"`
+		Spec struct {
+			NodeName   string `json:"nodeName"`
+			Containers []struct {
+				Name  string `json:"name"`
+				Image string `json:"image"`
+			} `json:"containers"`
+		} `json:"spec"`
+		Status struct {
+			Phase             string `json:"phase"`
+			ContainerStatuses []struct {
+				Name    string `json:"name"`
+				Ready   bool   `json:"ready"`
+				Image   string `json:"image"`
+				ImageID string `json:"imageID"`
+			} `json:"containerStatuses"`
+		} `json:"status"`
+	} `json:"items"`
+}
+
+// KubeProxyRolledOutFunc waits for the kube-proxy DaemonSet to match the desired
+// state across all scheduled nodes in the cluster. The query is executed on `q`.
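+// It is used by the worker upgrade phase to make sure the kube-proxy roll-out
+// has settled before workers are cordoned, drained and upgraded.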
+func KubeProxyRolledOutFunc(q *cluster.Host) retryFunc {
+	return DaemonSetRolledOutFunc(q, "kube-system", "kube-proxy", "kube-proxy", true)
+}
+
+func fetchDaemonSet(h *cluster.Host, ns, name string) (*daemonSetInfo, error) {
+	out, err := h.ExecOutput(
+		h.Configurer.KubectlCmdf(h, h.K0sDataDir(), "-n %s get ds %s -o json", ns, name),
+		exec.HideOutput(), exec.Sudo(h),
+	)
+	if err != nil {
+		return nil, wrapKubectlNotFound(err)
+	}
+	var ds daemonSetInfo
+	if uerr := json.Unmarshal([]byte(out), &ds); uerr != nil {
+		return nil, fmt.Errorf("failed to decode DaemonSet %s/%s: %w", ns, name, uerr)
+	}
+	return &ds, nil
+}
+
+func desiredContainerImage(ds *daemonSetInfo, containerName string) (string, error) {
+	containers := ds.Spec.Template.Spec.Containers
+	if len(containers) == 0 {
+		return "", fmt.Errorf("DaemonSet has no containers in pod template")
+	}
+	if containerName == "" {
+		return containers[0].Image, nil
+	}
+	for _, c := range containers {
+		if c.Name == containerName {
+			return c.Image, nil
+		}
+	}
+	return "", fmt.Errorf("container %q not found in DaemonSet template", containerName)
+}
+
+func listPodsForDaemonSet(h *cluster.Host, ns string, ds *daemonSetInfo) (*podList, error) {
+	selector := buildLabelSelector(ds.Spec.Selector.MatchLabels)
+	out, err := h.ExecOutput(
+		h.Configurer.KubectlCmdf(h, h.K0sDataDir(), "-n %s get pods -l %s -o json", ns, selector),
+		exec.HideOutput(), exec.Sudo(h),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to list pods for selector %q in %s: %w", selector, ns, err)
+	}
+	var pods podList
+	if uerr := json.Unmarshal([]byte(out), &pods); uerr != nil {
+		return nil, fmt.Errorf("failed to decode pods for selector %q: %w", selector, uerr)
+	}
+	return &pods, nil
+}
+
+// DaemonSetRolledOutFunc waits for the DS to be fully rolled out across
+// the cluster according to controller status and pod readiness/image checks.
+// If skipIfMissing is true and DS is NotFound, it returns nil.
+func DaemonSetRolledOutFunc(h *cluster.Host, namespace, dsName, containerName string, skipIfMissing bool) retryFunc {
+	return func(_ context.Context) error {
+		ds, err := fetchDaemonSet(h, namespace, dsName)
+		if err != nil {
+			if skipIfMissing && isNotFoundErr(err) {
+				log.Infof("%s: DaemonSet %s/%s not found; skipping as requested", h, namespace, dsName)
+				return nil
+			}
+			return err
+		}
+
+		// Controller must have observed current generation and report full rollout.
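+		// status.observedGeneration lagging behind metadata.generation means the
+		// DaemonSet controller has not yet processed the latest template update.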
+		if ds.Status.ObservedGeneration != ds.Metadata.Generation {
+			return fmt.Errorf("DaemonSet not yet observed: gen=%d obs=%d", ds.Metadata.Generation, ds.Status.ObservedGeneration)
+		}
+		if ds.Status.DesiredNumberScheduled == 0 {
+			log.Infof("%s: %s/%s desiredNumberScheduled=0; nothing to roll out", h, namespace, dsName)
+			return nil
+		}
+
+		desiredImg, err := desiredContainerImage(ds, containerName)
+		if err != nil {
+			return err
+		}
+
+		pods, err := listPodsForDaemonSet(h, namespace, ds)
+		if err != nil {
+			return err
+		}
+		if int32(len(pods.Items)) != ds.Status.DesiredNumberScheduled {
+			return fmt.Errorf("pod count mismatch for DS %s/%s: have=%d desired=%d", namespace, dsName, len(pods.Items), ds.Status.DesiredNumberScheduled)
+		}
+
+		notReady, mismatches := verifyPodsReadyAndImage(pods, containerName, desiredImg)
+		if notReady > 0 {
+			return fmt.Errorf("%d containers NotReady for DaemonSet %s/%s", notReady, namespace, dsName)
+		}
+		if mismatches > 0 {
+			return fmt.Errorf("%d pods running unexpected image for DaemonSet %s/%s", mismatches, namespace, dsName)
+		}
+
+		if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled || ds.Status.NumberAvailable != ds.Status.DesiredNumberScheduled {
+			return fmt.Errorf("DaemonSet not fully rolled out: updated=%d available=%d desired=%d",
+				ds.Status.UpdatedNumberScheduled, ds.Status.NumberAvailable, ds.Status.DesiredNumberScheduled)
+		}
+
+		log.Debugf("%s: %s/%s rolled out cluster-wide: desired=%d updated=%d available=%d image=%s", h, namespace, dsName,
+			ds.Status.DesiredNumberScheduled, ds.Status.UpdatedNumberScheduled, ds.Status.NumberAvailable, desiredImg)
+		return nil
+	}
+}
+
+func verifyPodsReadyAndImage(pods *podList, containerName, desiredImg string) (notReady, mismatches int) {
+	for _, p := range pods.Items {
+		if p.Status.Phase != "Running" {
+			notReady++
+			continue
+		}
+		var podImg, imageID string
+		var hasContainer, ready bool
+
+		for _, c := range p.Spec.Containers {
+			if containerName == "" || c.Name == containerName {
+				podImg = c.Image
+				break
+			}
+		}
+		for _, cs := range p.Status.ContainerStatuses {
+			if containerName == "" || cs.Name == containerName {
+				hasContainer = true
+				ready = cs.Ready
+				imageID = cs.ImageID
+				break
+			}
+		}
+		if !hasContainer || !ready {
+			notReady++
+			continue
+		}
+		if !matchImage(desiredImg, podImg, imageID) {
+			mismatches++
+		}
+	}
+	return
+}
+
+func buildLabelSelector(labels map[string]string) string {
+	// Simple AND of matchLabels (k=v,k2=v2,...)
+	if len(labels) == 0 {
+		return ""
+	}
+	parts := make([]string, 0, len(labels))
+	for k, v := range labels {
+		parts = append(parts, fmt.Sprintf("%s=%s", k, v))
+	}
+	// Deterministic order not required by kubectl, but harmless as-is.
+	return strings.Join(parts, ",")
+}
+
+func matchImage(dsImage, podImage, podImageID string) bool {
+	// Exact tag match
+	if dsImage != "" && dsImage == podImage {
+		return true
+	}
+	// Digest pin match: DS template uses @sha256:..., ensure pod's ImageID has same digest.
+	if at := strings.Index(dsImage, "@sha256:"); at != -1 {
+		digest := dsImage[at+1:] // "sha256:..."
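+		// The pod's imageID normally embeds the resolved digest
+		// (e.g. "registry/repo@sha256:..."), so a substring check is sufficient.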
+		return strings.Contains(podImageID, digest)
+	}
+	return false
+}
+
+func wrapKubectlNotFound(err error) error {
+	if err == nil {
+		return nil
+	}
+	// Typical stderr: 'Error from server (NotFound): daemonsets.apps "kube-proxy" not found'
+	low := strings.ToLower(err.Error())
+	if strings.Contains(low, "notfound") || strings.Contains(low, "not found") {
+		return &notFoundError{err}
+	}
+	return err
+}
+
+type notFoundError struct{ error }
+
+func (e *notFoundError) Unwrap() error { return e.error }
+
+func isNotFoundErr(err error) bool {
+	if err == nil {
+		return false
+	}
+	var nf *notFoundError
+	if errors.As(err, &nf) {
+		return true
+	}
+	low := strings.ToLower(err.Error())
+	return strings.Contains(low, "notfound") || strings.Contains(low, "not found")
+}
diff --git a/smoke-test/smoke-dryrun.sh b/smoke-test/smoke-dryrun.sh
index 701afcba..26910bf4 100755
--- a/smoke-test/smoke-dryrun.sh
+++ b/smoke-test/smoke-dryrun.sh
@@ -90,8 +90,8 @@ expectNoK0s() {
 }
 
 applyConfig() {
-  local extra_flag=$1
-  ../k0sctl apply --config "${K0SCTL_CONFIG}" --debug "${extra_flag}" | tee "${log}"
+  local extra_flags=("$@")
+  ../k0sctl apply --config "${K0SCTL_CONFIG}" --debug "${extra_flags[@]}" | tee "${log}"
 }
 
 deleteCluster
@@ -100,7 +100,7 @@ createCluster
 K0S_VERSION="${K0S_FROM}"
 
 colorEcho 3 "Installing ${K0S_VERSION} with --dry-run"
-applyConfig "--dry-run"
+applyConfig --dry-run
 expectNoK0s
 checkDryRunLines min 3
 dumpDryRunLines
@@ -111,7 +111,7 @@ expectK0sVersion "${K0S_FROM}"
 checkDryRunLines none
 
 colorEcho 3 "Installing ${K0S_VERSION} with --dry-run again"
-applyConfig "--dry-run"
+applyConfig --dry-run
 expectK0sVersion "${K0S_FROM}"
 dryRunNoChanges
 
@@ -119,17 +119,17 @@ colorEcho 4 "Succesfully installed ${K0S_FROM}, moving on to upgrade to ${K0S_TO}"
 K0S_VERSION="${K0S_TO}"
 
 colorEcho 3 "Upgrading to ${K0S_VERSION} with --dry-run"
-applyConfig "--dry-run"
+applyConfig --dry-run --force
 expectK0sVersion "${K0S_FROM}"
 checkDryRunLines min 3
 dumpDryRunLines
 
 colorEcho 3 "Upgrading to ${K0S_VERSION}"
-applyConfig
+applyConfig --force
 expectK0sVersion "${K0S_TO}"
 checkDryRunLines none
 
 colorEcho 3 "Upgrading to ${K0S_VERSION} with --dry-run again"
-applyConfig "--dry-run"
+applyConfig --dry-run --force
 expectK0sVersion "${K0S_TO}"
 dryRunNoChanges
diff --git a/smoke-test/smoke-upgrade.sh b/smoke-test/smoke-upgrade.sh
index 71f7b7bf..257dfbb6 100755
--- a/smoke-test/smoke-upgrade.sh
+++ b/smoke-test/smoke-upgrade.sh
@@ -28,5 +28,12 @@ K0S_VERSION=$(curl -s "https://docs.k0sproject.io/stable.txt")
 
 # Create config with latest version and apply as upgrade
 echo "Upgrading to k0s ${K0S_VERSION}"
-../k0sctl apply --config "${K0SCTL_CONFIG}" --debug
+# First attempt should fail without --force because of version skew
+if ../k0sctl apply --config "${K0SCTL_CONFIG}" --debug; then
+  echo "Expected failure when applying without --force"
+  exit 1
+fi
+
+# Second attempt should succeed with --force
+../k0sctl apply --config "${K0SCTL_CONFIG}" --debug --force
 remoteCommand "root@manager0" "k0s version | grep -q ${K0S_VERSION}"