Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,9 @@ func (c *command) start(ctx context.Context, flags *config.ControllerOptions, de
K0sVars: c.K0sVars,
KubeClientFactory: adminClientFactory,
IgnoredStacks: []string{
controller.AutopilotStackName,
controller.ClusterConfigStackName,
controller.EtcdMemberStackName,
controller.SystemRBACStackName,
controller.WindowsStackName,
},
Expand Down Expand Up @@ -373,8 +375,8 @@ func (c *command) start(ctx context.Context, flags *config.ControllerOptions, de
if err != nil {
return err
}
clusterComponents.Add(ctx, controller.NewCRD(c.K0sVars.ManifestsDir, "etcd", controller.WithStackName("etcd-member")))
nodeComponents.Add(ctx, etcdReconciler)
clusterComponents.Add(ctx, controller.NewCRDStack(adminClientFactory, "etcd", controller.WithStackName(controller.EtcdMemberStackName)))
clusterComponents.Add(ctx, etcdReconciler)
}

perfTimer.Checkpoint("starting-certificates-init")
Expand Down Expand Up @@ -439,7 +441,7 @@ func (c *command) start(ctx context.Context, flags *config.ControllerOptions, de
))

if !slices.Contains(flags.DisableComponents, constant.HelmComponentName) {
clusterComponents.Add(ctx, controller.NewCRD(c.K0sVars.ManifestsDir, "helm"))
clusterComponents.Add(ctx, controller.NewCRD(c.K0sVars.ManifestsDir, controller.HelmExtensionStackName))
clusterComponents.Add(ctx, controller.NewExtensionsController(
c.K0sVars,
adminClientFactory,
Expand All @@ -448,7 +450,7 @@ func (c *command) start(ctx context.Context, flags *config.ControllerOptions, de
}

if !slices.Contains(flags.DisableComponents, constant.AutopilotComponentName) {
clusterComponents.Add(ctx, controller.NewCRD(c.K0sVars.ManifestsDir, "autopilot"))
clusterComponents.Add(ctx, controller.NewCRDStack(adminClientFactory, controller.AutopilotStackName))
}

if enableK0sEndpointReconciler {
Expand Down
47 changes: 47 additions & 0 deletions pkg/applier/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
package applier

import (
"cmp"
"context"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"io"
"slices"
"sync"
"time"
Expand All @@ -23,15 +25,60 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/restmapper"
"k8s.io/utils/ptr"

"github.com/avast/retry-go"
jsonpatch "github.com/evanphx/json-patch"
"github.com/sirupsen/logrus"
)

func ApplyStack(ctx context.Context, clients kubernetes.ClientFactoryInterface, src io.Reader, srcName, stackName string) error {
infos, err := resource.NewLocalBuilder().
Unstructured().
Stream(src, srcName).
Flatten().
Do().
Infos()
if err != nil {
return err
}

resources := make([]*unstructured.Unstructured, len(infos))
for i := range infos {
resources[i] = infos[i].Object.(*unstructured.Unstructured)
}

var lastErr error
if err := retry.Do(
func() error {
stack := Stack{
Name: stackName,
Resources: resources,
Clients: clients,
}
lastErr = stack.Apply(ctx, true)
return lastErr
},
retry.Context(ctx),
retry.LastErrorOnly(true),
retry.OnRetry(func(attempt uint, err error) {
logrus.WithFields(logrus.Fields{
"component": "applier",
"stack": stackName,
"attempt": attempt + 1,
}).WithError(err).Debug("Failed to apply stack, retrying after backoff")
}),
); err != nil {
return cmp.Or(lastErr, err)
}

return nil
}

// Stack is a k8s resource bundle
type Stack struct {
Name string
Expand Down
47 changes: 2 additions & 45 deletions pkg/autopilot/controller/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,17 @@ import (
"context"
"fmt"
"runtime"
"time"

apv1beta2 "github.com/k0sproject/k0s/pkg/apis/autopilot/v1beta2"
apcli "github.com/k0sproject/k0s/pkg/autopilot/client"
apcomm "github.com/k0sproject/k0s/pkg/autopilot/common"
apconst "github.com/k0sproject/k0s/pkg/autopilot/constant"
"github.com/k0sproject/k0s/pkg/build"
"github.com/k0sproject/k0s/pkg/component/status"
"github.com/k0sproject/k0s/pkg/kubernetes/watch"

"github.com/avast/retry-go"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
extensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -81,7 +78,7 @@ func (sc *setupController) Run(ctx context.Context) error {

if err := retry.Do(func() error {
logger.Infof("Attempting to create controlnode '%s'", controlNodeName)
if err := sc.createControlNode(ctx, sc.clientFactory, controlNodeName, kubeletNodeName); err != nil {
if err := sc.createControlNode(ctx, controlNodeName, kubeletNodeName); err != nil {
return fmt.Errorf("create controlnode '%s' attempt failed, retrying: %w", controlNodeName, err)
}

Expand Down Expand Up @@ -112,7 +109,7 @@ func createNamespace(ctx context.Context, cf apcli.FactoryInterface, name string

// createControlNode creates a new control node, ignoring errors if one already exists
// for this physical host.
func (sc *setupController) createControlNode(ctx context.Context, cf apcli.FactoryInterface, name, nodeName string) error {
func (sc *setupController) createControlNode(ctx context.Context, name, nodeName string) error {
logger := sc.log.WithField("component", "setup")
client, err := sc.clientFactory.GetK0sClient()
if err != nil {
Expand All @@ -122,13 +119,6 @@ func (sc *setupController) createControlNode(ctx context.Context, cf apcli.Facto
// Create the ControlNode object if needed
node, err := client.AutopilotV1beta2().ControlNodes().Get(ctx, name, metav1.GetOptions{})
if errors.IsNotFound(err) {
logger.Info("Autopilot 'controlnodes' CRD not found, waiting...")
if err := sc.waitForControlNodesCRD(ctx, cf); err != nil {
return fmt.Errorf("while waiting for autopilot 'controlnodes' CRD: %w", err)
}

logger.Info("Autopilot 'controlnodes' CRD found, continuing")

logger.Infof("ControlNode '%s' not found, creating", name)
mode := apconst.K0SControlNodeModeController
if sc.enableWorker {
Expand Down Expand Up @@ -208,36 +198,3 @@ func getControllerAPIAddress() (string, error) {

return status.ClusterConfig.Spec.API.Address, nil
}

// waitForControlNodesCRD waits until the controlnodes CRD is established for
// max 2 minutes.
func (sc *setupController) waitForControlNodesCRD(ctx context.Context, cf apcli.FactoryInterface) error {
extClient, err := cf.GetExtensionClient()
if err != nil {
return fmt.Errorf("unable to obtain extensions client: %w", err)
}

ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
return watch.CRDs(extClient.CustomResourceDefinitions()).
WithObjectName("controlnodes."+apv1beta2.GroupName).
WithErrorCallback(func(err error) (time.Duration, error) {
if retryDelay, e := watch.IsRetryable(err); e == nil {
sc.log.WithError(err).Debugf(
"Encountered transient error while waiting for autopilot 'controlnodes' CRD, retrying in %s",
retryDelay,
)
return retryDelay, nil
}
return 0, err
}).
Until(ctx, func(item *extensionsv1.CustomResourceDefinition) (bool, error) {
for _, cond := range item.Status.Conditions {
if cond.Type == extensionsv1.Established {
return cond.Status == extensionsv1.ConditionTrue, nil
}
}

return false, nil
})
}
2 changes: 2 additions & 0 deletions pkg/component/controller/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/sirupsen/logrus"
)

const AutopilotStackName = "autopilot"

var _ manager.Component = (*Autopilot)(nil)

type Autopilot struct {
Expand Down
71 changes: 71 additions & 0 deletions pkg/component/controller/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,53 @@
package controller

import (
"bytes"
"context"
"fmt"
"io"
"io/fs"
"path"
"path/filepath"
"strings"

"github.com/k0sproject/k0s/internal/pkg/dir"
"github.com/k0sproject/k0s/internal/pkg/file"
"github.com/k0sproject/k0s/pkg/applier"
"github.com/k0sproject/k0s/pkg/component/manager"
"github.com/k0sproject/k0s/pkg/constant"
"github.com/k0sproject/k0s/pkg/kubernetes"
"github.com/k0sproject/k0s/static"
)

var _ manager.Component = (*CRD)(nil)

// CRD unpacks bundled CRD definitions to the filesystem
//
// Deprecated: Use [CRDStack] instead.
type CRD struct {
bundle string
manifestsDir string

crdOpts
}

// CRDStack applies bundled CRDs.
type CRDStack struct {
clients kubernetes.ClientFactoryInterface
bundle string

crdOpts
}

type crdOpts struct {
stackName, assetsDir string
}

type CRDOption func(*crdOpts)

// NewCRD build new CRD
//
// Deprecated: Use [NewCRDStack] instead.
func NewCRD(manifestsDir, bundle string, opts ...CRDOption) *CRD {
var options crdOpts
for _, opt := range opts {
Expand All @@ -52,6 +69,27 @@ func NewCRD(manifestsDir, bundle string, opts ...CRDOption) *CRD {
}
}

var _ manager.Component = (*CRDStack)(nil)

// Creates a new CRD stack for the given bundle.
func NewCRDStack(clients kubernetes.ClientFactoryInterface, bundle string, opts ...CRDOption) *CRDStack {
var options crdOpts
for _, opt := range opts {
opt(&options)
}

if options.assetsDir == "" {
options.stackName = bundle
options.assetsDir = bundle
}

return &CRDStack{
clients: clients,
bundle: bundle,
crdOpts: options,
}
}

func WithStackName(stackName string) CRDOption {
return func(opts *crdOpts) { opts.stackName = stackName }
}
Expand Down Expand Up @@ -93,3 +131,36 @@ func (c CRD) Start(context.Context) error {
func (c CRD) Stop() error {
return nil
}

// Applies this CRD stack. Implements [manager.Component].
func (c *CRDStack) Init(ctx context.Context) error {
var crds []io.Reader
if crdFiles, err := fs.ReadDir(static.CRDs, c.assetsDir); err != nil {
return fmt.Errorf("failed to read %s CRD stack: %w", c.bundle, err)
} else {
for _, entry := range crdFiles {
filename := entry.Name()
content, err := fs.ReadFile(static.CRDs, path.Join(c.assetsDir, filename))
if err != nil {
return fmt.Errorf("failed to fetch %s CRD manifest %s: %w", c.bundle, filename, err)
}
crds = append(crds, strings.NewReader("\n---\n"), bytes.NewReader(content))
}
}

if err := applier.ApplyStack(ctx, c.clients, io.MultiReader(crds...), c.bundle, c.stackName); err != nil {
return fmt.Errorf("failed to apply %s CRD stack: %w", c.bundle, err)
}

return nil
}

// Start implements [manager.Component]. It does nothing.
func (c *CRDStack) Start(context.Context) error {
return nil
}

// Stop implements [manager.Component]. It does nothing.
func (*CRDStack) Stop() error {
return nil
}
Loading
Loading