diff --git a/cmd/installer/cli/enable_ha.go b/cmd/installer/cli/enable_ha.go new file mode 100644 index 000000000..dd5febbb0 --- /dev/null +++ b/cmd/installer/cli/enable_ha.go @@ -0,0 +1,92 @@ +package cli + +import ( + "context" + "fmt" + "os" + + "github.com/replicatedhq/embedded-cluster/pkg/addons" + "github.com/replicatedhq/embedded-cluster/pkg/helm" + "github.com/replicatedhq/embedded-cluster/pkg/kubeutils" + "github.com/replicatedhq/embedded-cluster/pkg/runtimeconfig" + rcutil "github.com/replicatedhq/embedded-cluster/pkg/runtimeconfig/util" + "github.com/replicatedhq/embedded-cluster/pkg/versions" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +// EnableHACmd is the command for enabling HA mode. +func EnableHACmd(ctx context.Context, name string) *cobra.Command { + cmd := &cobra.Command{ + Use: "enable-ha", + Short: fmt.Sprintf("Enable high availability for the %s cluster", name), + Hidden: true, + PreRunE: func(cmd *cobra.Command, args []string) error { + if os.Getuid() != 0 { + return fmt.Errorf("enable-ha command must be run as root") + } + + rcutil.InitBestRuntimeConfig(cmd.Context()) + + os.Setenv("KUBECONFIG", runtimeconfig.PathToKubeConfig()) + os.Setenv("TMPDIR", runtimeconfig.EmbeddedClusterTmpSubDir()) + + return nil + }, + PostRun: func(cmd *cobra.Command, args []string) { + runtimeconfig.Cleanup() + }, + RunE: func(cmd *cobra.Command, args []string) error { + if err := runEnableHA(cmd.Context()); err != nil { + return err + } + + return nil + }, + } + + return cmd +} + +func runEnableHA(ctx context.Context) error { + kcli, err := kubeutils.KubeClient() + if err != nil { + return fmt.Errorf("unable to get kube client: %w", err) + } + + canEnableHA, reason, err := addons.CanEnableHA(ctx, kcli) + if err != nil { + return fmt.Errorf("unable to check if HA can be enabled: %w", err) + } + if !canEnableHA { + logrus.Warnf("High availability cannot be enabled: %s", reason) + return NewErrorNothingElseToAdd(fmt.Errorf("high availability cannot be enabled: %s", reason)) + } + + kclient, err := kubeutils.GetClientset() + if err != nil { + return fmt.Errorf("unable to create kubernetes client: %w", err) + } + + in, err := kubeutils.GetLatestInstallation(ctx, kcli) + if err != nil { + return fmt.Errorf("unable to get latest installation: %w", err) + } + + airgapChartsPath := "" + if in.Spec.AirGap { + airgapChartsPath = runtimeconfig.EmbeddedClusterChartsSubDir() + } + + hcli, err := helm.NewClient(helm.HelmOptions{ + KubeConfig: runtimeconfig.PathToKubeConfig(), + K0sVersion: versions.K0sVersion, + AirgapPath: airgapChartsPath, + }) + if err != nil { + return fmt.Errorf("unable to create helm client: %w", err) + } + defer hcli.Close() + + return addons.EnableHA(ctx, kcli, kclient, hcli, in.Spec.AirGap, in.Spec.Network.ServiceCIDR, in.Spec.Proxy, in.Spec.Config) +} diff --git a/cmd/installer/cli/install.go b/cmd/installer/cli/install.go index c192e44d4..c54628d53 100644 --- a/cmd/installer/cli/install.go +++ b/cmd/installer/cli/install.go @@ -76,9 +76,6 @@ type InstallCmdFlags struct { } // InstallCmd returns a cobra command for installing the embedded cluster. -// This is the upcoming version of install without the operator and where -// install does all of the work. This is a hidden command until it's tested -// and ready. 
func InstallCmd(ctx context.Context, name string) *cobra.Command { var flags InstallCmdFlags diff --git a/cmd/installer/cli/join.go b/cmd/installer/cli/join.go index 82a85ab4c..1e238bd84 100644 --- a/cmd/installer/cli/join.go +++ b/cmd/installer/cli/join.go @@ -26,6 +26,7 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/cobra" "gopkg.in/yaml.v2" + "k8s.io/client-go/kubernetes" "sigs.k8s.io/controller-runtime/pkg/client" k8syaml "sigs.k8s.io/yaml" ) @@ -40,9 +41,7 @@ type JoinCmdFlags struct { ignoreHostPreflights bool } -// This is the upcoming version of join without the operator and where -// join does all of the work. This is a hidden command until it's tested -// and ready. +// JoinCmd returns a cobra command for joining a node to the cluster. func JoinCmd(ctx context.Context, name string) *cobra.Command { var flags JoinCmdFlags @@ -171,21 +170,6 @@ func runJoin(ctx context.Context, name string, flags JoinCmdFlags, jcmd *kotsadm return fmt.Errorf("unable to get kube client: %w", err) } - airgapChartsPath := "" - if flags.isAirgap { - airgapChartsPath = runtimeconfig.EmbeddedClusterChartsSubDir() - } - - hcli, err := helm.NewClient(helm.HelmOptions{ - KubeConfig: runtimeconfig.PathToKubeConfig(), - K0sVersion: versions.K0sVersion, - AirgapPath: airgapChartsPath, - }) - if err != nil { - return fmt.Errorf("unable to create helm client: %w", err) - } - defer hcli.Close() - hostname, err := os.Hostname() if err != nil { return fmt.Errorf("unable to get hostname: %w", err) @@ -196,7 +180,27 @@ func runJoin(ctx context.Context, name string, flags JoinCmdFlags, jcmd *kotsadm } if flags.enableHighAvailability { - if err := maybeEnableHA(ctx, kcli, hcli, flags.isAirgap, cidrCfg.ServiceCIDR, jcmd.InstallationSpec.Proxy, jcmd.InstallationSpec.Config); err != nil { + kclient, err := kubeutils.GetClientset() + if err != nil { + return fmt.Errorf("unable to create kubernetes client: %w", err) + } + + airgapChartsPath := "" + if flags.isAirgap { + airgapChartsPath = runtimeconfig.EmbeddedClusterChartsSubDir() + } + + hcli, err := helm.NewClient(helm.HelmOptions{ + KubeConfig: runtimeconfig.PathToKubeConfig(), + K0sVersion: versions.K0sVersion, + AirgapPath: airgapChartsPath, + }) + if err != nil { + return fmt.Errorf("unable to create helm client: %w", err) + } + defer hcli.Close() + + if err := maybeEnableHA(ctx, kcli, kclient, hcli, flags.isAirgap, cidrCfg.ServiceCIDR, jcmd.InstallationSpec.Proxy, jcmd.InstallationSpec.Config); err != nil { return fmt.Errorf("unable to enable high availability: %w", err) } } @@ -460,8 +464,8 @@ func waitForNode(ctx context.Context, kcli client.Client, hostname string) error return nil } -func maybeEnableHA(ctx context.Context, kcli client.Client, hcli helm.Client, isAirgap bool, serviceCIDR string, proxy *ecv1beta1.ProxySpec, cfgspec *ecv1beta1.ConfigSpec) error { - canEnableHA, err := addons.CanEnableHA(ctx, kcli) +func maybeEnableHA(ctx context.Context, kcli client.Client, kclient kubernetes.Interface, hcli helm.Client, isAirgap bool, serviceCIDR string, proxy *ecv1beta1.ProxySpec, cfgspec *ecv1beta1.ConfigSpec) error { + canEnableHA, _, err := addons.CanEnableHA(ctx, kcli) if err != nil { return fmt.Errorf("unable to check if HA can be enabled: %w", err) } @@ -476,5 +480,5 @@ func maybeEnableHA(ctx context.Context, kcli client.Client, hcli helm.Client, is return nil } logrus.Info("") - return addons.EnableHA(ctx, kcli, hcli, isAirgap, serviceCIDR, proxy, cfgspec) + return addons.EnableHA(ctx, kcli, kclient, hcli, isAirgap, serviceCIDR, proxy, cfgspec) 
} diff --git a/cmd/installer/cli/root.go b/cmd/installer/cli/root.go index 4d31e0c12..15437dc3b 100644 --- a/cmd/installer/cli/root.go +++ b/cmd/installer/cli/root.go @@ -89,6 +89,7 @@ func RootCmd(ctx context.Context, name string) *cobra.Command { cmd.AddCommand(JoinCmd(ctx, name)) cmd.AddCommand(ShellCmd(ctx, name)) cmd.AddCommand(NodeCmd(ctx, name)) + cmd.AddCommand(EnableHACmd(ctx, name)) cmd.AddCommand(VersionCmd(ctx, name)) cmd.AddCommand(ResetCmd(ctx, name)) cmd.AddCommand(MaterializeCmd(ctx, name)) diff --git a/go.mod b/go.mod index 9abe2c669..1e07c7899 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,6 @@ require ( github.com/vmware-tanzu/velero v1.15.2 go.uber.org/multierr v1.11.0 golang.org/x/crypto v0.33.0 - golang.org/x/sync v0.11.0 golang.org/x/term v0.29.0 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 @@ -272,6 +271,7 @@ require ( go.opentelemetry.io/otel/trace v1.34.0 // indirect golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect golang.org/x/mod v0.22.0 // indirect + golang.org/x/sync v0.11.0 // indirect golang.org/x/tools v0.28.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/api v0.197.0 // indirect diff --git a/operator/pkg/cli/migrate.go b/operator/pkg/cli/migrate.go new file mode 100644 index 000000000..118301f6e --- /dev/null +++ b/operator/pkg/cli/migrate.go @@ -0,0 +1,46 @@ +package cli + +import ( + "fmt" + + "github.com/replicatedhq/embedded-cluster/pkg/addons/registry/migrate" + "github.com/replicatedhq/embedded-cluster/pkg/kubeutils" + "github.com/spf13/cobra" +) + +func MigrateCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "migrate", + Short: "Run the specified migration", + } + + cmd.AddCommand( + MigrateRegistryDataCmd(), + ) + + return cmd +} + +func MigrateRegistryDataCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "registry-data", + Short: "Run the registry-data migration", + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + ctx := cmd.Context() + + cli, err := kubeutils.KubeClient() + if err != nil { + return fmt.Errorf("failed to create kubernetes client: %w", err) + } + + err = migrate.RegistryData(ctx, cli) + if err != nil { + return fmt.Errorf("failed to migrate registry data: %w", err) + } + return nil + }, + } + + return cmd +} diff --git a/operator/pkg/cli/root.go b/operator/pkg/cli/root.go index 78980abdb..567771034 100644 --- a/operator/pkg/cli/root.go +++ b/operator/pkg/cli/root.go @@ -117,6 +117,7 @@ func addSubcommands(cmd *cobra.Command) { cmd.AddCommand( UpgradeCmd(), UpgradeJobCmd(), + MigrateCmd(), MigrateV2Cmd(), VersionCmd(), ) diff --git a/pkg/addons/embeddedclusteroperator/upgrade.go b/pkg/addons/embeddedclusteroperator/upgrade.go index 84369d813..cd2e3e047 100644 --- a/pkg/addons/embeddedclusteroperator/upgrade.go +++ b/pkg/addons/embeddedclusteroperator/upgrade.go @@ -2,10 +2,10 @@ package embeddedclusteroperator import ( "context" - "log/slog" "github.com/pkg/errors" "github.com/replicatedhq/embedded-cluster/pkg/helm" + "github.com/sirupsen/logrus" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -15,7 +15,7 @@ func (e *EmbeddedClusterOperator) Upgrade(ctx context.Context, kcli client.Clien return errors.Wrap(err, "check if release exists") } if !exists { - slog.Info("Release not found, installing", "release", releaseName, "namespace", namespace) + logrus.Debugf("Release not found, installing release %s in namespace %s", releaseName, namespace) if err := e.Install(ctx, kcli, hcli, overrides, nil); err != nil { return errors.Wrap(err, "install") } 
diff --git a/pkg/addons/highavailability.go b/pkg/addons/highavailability.go index f053cec93..b51190e8d 100644 --- a/pkg/addons/highavailability.go +++ b/pkg/addons/highavailability.go @@ -7,6 +7,7 @@ import ( ecv1beta1 "github.com/replicatedhq/embedded-cluster/kinds/apis/v1beta1" "github.com/replicatedhq/embedded-cluster/pkg/addons/adminconsole" "github.com/replicatedhq/embedded-cluster/pkg/addons/registry" + registrymigrate "github.com/replicatedhq/embedded-cluster/pkg/addons/registry/migrate" "github.com/replicatedhq/embedded-cluster/pkg/addons/seaweedfs" "github.com/replicatedhq/embedded-cluster/pkg/constants" "github.com/replicatedhq/embedded-cluster/pkg/helm" @@ -16,34 +17,38 @@ import ( corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" "sigs.k8s.io/controller-runtime/pkg/client" ) // CanEnableHA checks if high availability can be enabled in the cluster. -func CanEnableHA(ctx context.Context, kcli client.Client) (bool, error) { +func CanEnableHA(ctx context.Context, kcli client.Client) (bool, string, error) { in, err := kubeutils.GetLatestInstallation(ctx, kcli) if err != nil { - return false, errors.Wrap(err, "get latest installation") + return false, "", errors.Wrap(err, "get latest installation") } if in.Spec.HighAvailability { - return false, nil + return false, "already enabled", nil } if err := kcli.Get(ctx, types.NamespacedName{Name: constants.EcRestoreStateCMName, Namespace: "embedded-cluster"}, &corev1.ConfigMap{}); err == nil { - return false, nil // cannot enable HA during a restore + return false, "a restore is in progress", nil } else if !k8serrors.IsNotFound(err) { - return false, errors.Wrap(err, "get restore state configmap") + return false, "", errors.Wrap(err, "get restore state configmap") } ncps, err := kubeutils.NumOfControlPlaneNodes(ctx, kcli) if err != nil { - return false, errors.Wrap(err, "check control plane nodes") + return false, "", errors.Wrap(err, "check control plane nodes") } - return ncps >= 3, nil + if ncps < 3 { + return false, "number of control plane nodes is less than 3", nil + } + return true, "", nil } // EnableHA enables high availability. 
-func EnableHA(ctx context.Context, kcli client.Client, hcli helm.Client, isAirgap bool, serviceCIDR string, proxy *ecv1beta1.ProxySpec, cfgspec *ecv1beta1.ConfigSpec) error { +func EnableHA(ctx context.Context, kcli client.Client, kclient kubernetes.Interface, hcli helm.Client, isAirgap bool, serviceCIDR string, proxy *ecv1beta1.ProxySpec, cfgspec *ecv1beta1.ConfigSpec) error { loading := spinner.Start() defer loading.Close() @@ -56,35 +61,42 @@ func EnableHA(ctx context.Context, kcli client.Client, hcli helm.Client, isAirga sw := &seaweedfs.SeaweedFS{ ServiceCIDR: serviceCIDR, } - exists, err := hcli.ReleaseExists(ctx, sw.Namespace(), sw.ReleaseName()) + logrus.Debugf("Installing seaweedfs") + if err := sw.Install(ctx, kcli, hcli, addOnOverrides(sw, cfgspec, nil), nil); err != nil { + return errors.Wrap(err, "install seaweedfs") + } + logrus.Debugf("Seaweedfs installed!") + + in, err := kubeutils.GetLatestInstallation(ctx, kcli) if err != nil { - return errors.Wrap(err, "check if seaweedfs release exists") + return errors.Wrap(err, "get latest installation") } - if !exists { - logrus.Debugf("Installing seaweedfs") - if err := sw.Install(ctx, kcli, hcli, addOnOverrides(sw, cfgspec, nil), nil); err != nil { - return errors.Wrap(err, "install seaweedfs") - } - logrus.Debugf("Seaweedfs installed!") - } else { - logrus.Debugf("Seaweedfs already installed") + + operatorImage, err := getOperatorImage() + if err != nil { + return errors.Wrap(err, "get operator image") } - // TODO (@salah): add support for end user overrides - reg := &registry.Registry{ - ServiceCIDR: serviceCIDR, - IsHA: true, + // TODO: timeout + + loading.Infof("Migrating data for high availability") + logrus.Debugf("Migrating data for high availability") + progressCh, errCh, err := registrymigrate.RunDataMigrationJob(ctx, kcli, kclient, in, operatorImage) + if err != nil { + return errors.Wrap(err, "run registry data migration job") } - logrus.Debugf("Migrating registry data") - if err := reg.Migrate(ctx, kcli, loading); err != nil { - return errors.Wrap(err, "migrate registry data") + if err := waitForJobAndLogProgress(loading, progressCh, errCh); err != nil { + return errors.Wrap(err, "registry data migration job failed") } - logrus.Debugf("Registry migration complete!") - logrus.Debugf("Upgrading registry") - if err := reg.Upgrade(ctx, kcli, hcli, addOnOverrides(reg, cfgspec, nil)); err != nil { - return errors.Wrap(err, "upgrade registry") + logrus.Debugf("Data migration complete!") + + loading.Infof("Enabling registry high availability") + logrus.Debugf("Enabling registry high availability") + err = enableRegistryHA(ctx, kcli, hcli, serviceCIDR, cfgspec) + if err != nil { + return errors.Wrap(err, "enable registry high availability") } - logrus.Debugf("Registry upgraded!") + logrus.Debugf("Registry high availability enabled!") } loading.Infof("Updating the Admin Console for high availability") @@ -113,6 +125,20 @@ func EnableHA(ctx context.Context, kcli client.Client, hcli helm.Client, isAirga return nil } +// enableRegistryHA upgrades the registry chart with high availability enabled.
+func enableRegistryHA(ctx context.Context, kcli client.Client, hcli helm.Client, serviceCIDR string, cfgspec *ecv1beta1.ConfigSpec) error { + // TODO (@salah): add support for end user overrides + r := &registry.Registry{ + IsHA: true, + ServiceCIDR: serviceCIDR, + } + if err := r.Upgrade(ctx, kcli, hcli, addOnOverrides(r, cfgspec, nil)); err != nil { + return errors.Wrap(err, "upgrade registry") + } + + return nil +} + // EnableAdminConsoleHA enables high availability for the admin console. func EnableAdminConsoleHA(ctx context.Context, kcli client.Client, hcli helm.Client, isAirgap bool, serviceCIDR string, proxy *ecv1beta1.ProxySpec, cfgspec *ecv1beta1.ConfigSpec) error { // TODO (@salah): add support for end user overrides @@ -128,3 +154,15 @@ func EnableAdminConsoleHA(ctx context.Context, kcli client.Client, hcli helm.Cli return nil } + +func waitForJobAndLogProgress(progressWriter *spinner.MessageWriter, progressCh <-chan string, errCh <-chan error) error { + for { + select { + case err := <-errCh: + return err + case progress := <-progressCh: + logrus.Debugf("Migrating data for high availability (%s)", progress) + progressWriter.Infof("Migrating data for high availability (%s)", progress) + } + } +} diff --git a/pkg/addons/highavailability_test.go b/pkg/addons/highavailability_test.go index c1d67ce26..319ab6005 100644 --- a/pkg/addons/highavailability_test.go +++ b/pkg/addons/highavailability_test.go @@ -6,6 +6,7 @@ import ( "github.com/replicatedhq/embedded-cluster/kinds/apis/v1beta1" "github.com/replicatedhq/embedded-cluster/pkg/constants" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v12 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -22,10 +23,11 @@ func Test_canEnableHA(t *testing.T) { kcli client.Client } tests := []struct { - name string - args args - want bool - wantErr bool + name string + args args + want bool + wantReason string + wantErr bool }{ { name: "high availability is not enabled and there is three or more controller nodes", @@ -55,7 +57,8 @@ func Test_canEnableHA(t *testing.T) { &v12.Node{ObjectMeta: v1.ObjectMeta{Name: "node3"}}, ).Build(), }, - want: false, + want: false, + wantReason: "number of control plane nodes is less than 3", }, { name: "high availability is already enabled", @@ -70,7 +73,8 @@ func Test_canEnableHA(t *testing.T) { &v12.Node{ObjectMeta: v1.ObjectMeta{Name: "node3", Labels: controllerLabels}}, ).Build(), }, - want: false, + want: false, + wantReason: "already enabled", }, { name: "high availability is not enabled and there is three or more controller nodes but a restore is in progress", @@ -88,20 +92,23 @@ func Test_canEnableHA(t *testing.T) { &v12.Node{ObjectMeta: v1.ObjectMeta{Name: "node3", Labels: controllerLabels}}, ).Build(), }, - want: false, + want: false, + wantReason: "a restore is in progress", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { req := require.New(t) + assert := assert.New(t) ctx := context.Background() - got, err := CanEnableHA(ctx, tt.args.kcli) + got, reason, err := CanEnableHA(ctx, tt.args.kcli) if tt.wantErr { req.Error(err) return } req.NoError(err) - req.Equal(tt.want, got) + assert.Equal(tt.want, got) + assert.Equal(tt.wantReason, reason) }) } } diff --git a/pkg/addons/metadata.go b/pkg/addons/metadata.go index 2adc6d421..3de6f875b 100644 --- a/pkg/addons/metadata.go +++ b/pkg/addons/metadata.go @@ -1,6 +1,8 @@ package addons import ( + "strings" + k0sv1beta1 "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" "github.com/pkg/errors" ecv1beta1
"github.com/replicatedhq/embedded-cluster/kinds/apis/v1beta1" @@ -117,3 +119,12 @@ func GetAdditionalImages() []string { return images } + +func getOperatorImage() (string, error) { + for _, image := range embeddedclusteroperator.GetImages() { + if strings.Contains(image, "/embedded-cluster-operator-image:") { + return image, nil + } + } + return "", errors.New("embedded-cluster-operator image not found in metadata") +} diff --git a/pkg/addons/openebs/upgrade.go b/pkg/addons/openebs/upgrade.go index 70a8550f1..fda204b05 100644 --- a/pkg/addons/openebs/upgrade.go +++ b/pkg/addons/openebs/upgrade.go @@ -2,10 +2,10 @@ package openebs import ( "context" - "log/slog" "github.com/pkg/errors" "github.com/replicatedhq/embedded-cluster/pkg/helm" + "github.com/sirupsen/logrus" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -15,7 +15,7 @@ func (o *OpenEBS) Upgrade(ctx context.Context, kcli client.Client, hcli helm.Cli return errors.Wrap(err, "check if release exists") } if !exists { - slog.Info("Release not found, installing", "release", releaseName, "namespace", namespace) + logrus.Debugf("Release not found, installing release %s in namespace %s", releaseName, namespace) if err := o.Install(ctx, kcli, hcli, overrides, nil); err != nil { return errors.Wrap(err, "install") } diff --git a/pkg/addons/registry/migrate.go b/pkg/addons/registry/migrate.go deleted file mode 100644 index 6e7e8e4b3..000000000 --- a/pkg/addons/registry/migrate.go +++ /dev/null @@ -1,222 +0,0 @@ -package registry - -import ( - "archive/tar" - "bytes" - "context" - "fmt" - "io" - "path/filepath" - "strconv" - "strings" - - "github.com/aws/aws-sdk-go-v2/aws" - awsconfig "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" - s3manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager" - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/pkg/errors" - "github.com/replicatedhq/embedded-cluster/pkg/addons/seaweedfs" - "github.com/replicatedhq/embedded-cluster/pkg/spinner" - "github.com/sirupsen/logrus" - "golang.org/x/sync/errgroup" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/remotecommand" - "sigs.k8s.io/controller-runtime/pkg/client" - k8sconfig "sigs.k8s.io/controller-runtime/pkg/client/config" -) - -var ( - s3Bucket = "registry" - s3RootDirectory = "registry" - labelSelector = "app=docker-registry" -) - -// Migrate runs a migration that copies data on disk in the registry PVC to the seaweedfs s3 store. 
-func (r *Registry) Migrate(ctx context.Context, kcli client.Client, progressWriter *spinner.MessageWriter) error { - s3Client, err := getS3Client(ctx, kcli, r.ServiceCIDR) - if err != nil { - return errors.Wrap(err, "get s3 client") - } - - logrus.Debug("Ensuring registry bucket") - if err := ensureRegistryBucket(ctx, s3Client); err != nil { - return errors.Wrap(err, "ensure registry bucket") - } - logrus.Debug("Registry bucket ensured!") - - pipeReader, pipeWriter := io.Pipe() - g, ctx := errgroup.WithContext(ctx) - - g.Go(func() error { - defer pipeWriter.Close() - return readRegistryData(ctx, pipeWriter) - }) - - g.Go(func() error { - return writeRegistryData(ctx, pipeReader, s3manager.NewUploader(s3Client), progressWriter) - }) - - logrus.Debug("Copying registry data") - if err := g.Wait(); err != nil { - return err - } - logrus.Debug("Registry data copied!") - - return nil -} - -func getS3Client(ctx context.Context, kcli client.Client, serviceCIDR string) (*s3.Client, error) { - accessKey, secretKey, err := seaweedfs.GetS3RWCreds(ctx, kcli) - if err != nil { - return nil, errors.Wrap(err, "get seaweedfs s3 rw creds") - } - - creds := credentials.NewStaticCredentialsProvider(accessKey, secretKey, "") - conf, err := awsconfig.LoadDefaultConfig( - ctx, - awsconfig.WithCredentialsProvider(creds), - awsconfig.WithRegion("us-east-1"), - ) - if err != nil { - return nil, errors.Wrap(err, "load aws config") - } - - s3URL, err := seaweedfs.GetS3URL(serviceCIDR) - if err != nil { - return nil, errors.Wrap(err, "get seaweedfs s3 endpoint") - } - - s3Client := s3.NewFromConfig(conf, func(o *s3.Options) { - o.UsePathStyle = true - o.BaseEndpoint = aws.String(s3URL) - }) - - return s3Client, nil -} - -func ensureRegistryBucket(ctx context.Context, s3Client *s3.Client) error { - _, err := s3Client.CreateBucket(ctx, &s3.CreateBucketInput{ - Bucket: &s3Bucket, - }) - if err != nil { - if !strings.Contains(err.Error(), "BucketAlreadyExists") { - return errors.Wrap(err, "create bucket") - } - } - return nil -} - -func readRegistryData(ctx context.Context, writer io.Writer) error { - return execInPod(ctx, []string{"tar", "-c", "-C", "/var/lib/registry", "."}, writer) -} - -func writeRegistryData(ctx context.Context, reader io.Reader, s3Uploader *s3manager.Uploader, progressWriter *spinner.MessageWriter) error { - total, err := countRegistryFiles(ctx) - if err != nil { - return errors.Wrap(err, "count registry files") - } - - progress := 0 - tr := tar.NewReader(reader) - for { - header, err := tr.Next() - if err == io.EOF { - break - } - if err != nil { - return errors.Wrap(err, "read tar header") - } - - if header.FileInfo().IsDir() { - continue - } - - relPath, err := filepath.Rel("./", header.Name) - if err != nil { - return errors.Wrap(err, "get relative path") - } - - _, err = s3Uploader.Upload(ctx, &s3.PutObjectInput{ - Bucket: &s3Bucket, - Key: aws.String(filepath.Join(s3RootDirectory, relPath)), - Body: tr, - }) - if err != nil { - return errors.Wrap(err, "upload to s3") - } - - progress++ - progressWriter.Infof("Migrating data for high availability (%d%%)", (progress*100)/total) - } - - return nil -} - -func countRegistryFiles(ctx context.Context) (int, error) { - var stdout bytes.Buffer - if err := execInPod(ctx, []string{"sh", "-c", "find /var/lib/registry -type f | wc -l"}, &stdout); err != nil { - return 0, errors.Wrap(err, "exec in pod") - } - return strconv.Atoi(strings.TrimSpace(stdout.String())) -} - -func execInPod(ctx context.Context, command []string, stdout io.Writer) error { - cfg, 
err := k8sconfig.GetConfig() - if err != nil { - return errors.Wrap(err, "get kubernetes config") - } - - clientSet, err := kubernetes.NewForConfig(cfg) - if err != nil { - return errors.Wrap(err, "create kubernetes clientset") - } - - scheme := runtime.NewScheme() - if err := corev1.AddToScheme(scheme); err != nil { - return errors.Wrap(err, "add corev1 scheme") - } - - pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ - LabelSelector: labelSelector, - }) - if err != nil { - return errors.Wrap(err, "list registry pods") - } - if len(pods.Items) == 0 { - return errors.New("no registry pods found") - } - podName := pods.Items[0].Name - - req := clientSet.CoreV1().RESTClient().Post(). - Resource("pods"). - Name(podName). - Namespace(namespace). - SubResource("exec") - - parameterCodec := runtime.NewParameterCodec(scheme) - req.VersionedParams(&corev1.PodExecOptions{ - Command: command, - Container: "docker-registry", - Stdout: true, - Stderr: true, - }, parameterCodec) - - executor, err := remotecommand.NewSPDYExecutor(cfg, "POST", req.URL()) - if err != nil { - return errors.Wrap(err, "create exec") - } - - var stderr bytes.Buffer - if err := executor.StreamWithContext(ctx, remotecommand.StreamOptions{ - Stdout: stdout, - Stderr: &stderr, - }); err != nil { - return fmt.Errorf("stream exec: %v: %s", err, stderr.String()) - } - - return nil -} diff --git a/pkg/addons/registry/migrate/job.go b/pkg/addons/registry/migrate/job.go new file mode 100644 index 000000000..5557bc625 --- /dev/null +++ b/pkg/addons/registry/migrate/job.go @@ -0,0 +1,438 @@ +package migrate + +import ( + "bufio" + "context" + "fmt" + "io" + "log/slog" + "regexp" + "time" + + ecv1beta1 "github.com/replicatedhq/embedded-cluster/kinds/apis/v1beta1" + "github.com/replicatedhq/embedded-cluster/pkg/addons/seaweedfs" + "github.com/replicatedhq/embedded-cluster/pkg/kubeutils" + "github.com/replicatedhq/embedded-cluster/pkg/runtimeconfig" + "github.com/sirupsen/logrus" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/api/errors" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + StatusConditionType = "RegistryMigrationStatus" + + dataMigrationCompleteSecretName = "registry-data-migration-complete" + dataMigrationJobName = "registry-data-migration" + serviceAccountName = "registry-data-migration-serviceaccount" + + // seaweedfsS3SecretName is the name of the secret containing the s3 credentials. + // This secret name is defined in the chart in the release metadata. + seaweedfsS3SecretName = "seaweedfs-s3-rw" +) + +// RunDataMigrationJob should be called when transitioning from non-HA to HA airgapped +// installations. This function creates a job that will scale down the registry deployment then +// upload the data to s3 before finally creating a 'migration is complete' secret in the registry +// namespace. If this secret is present, the function will return without reattempting the +// migration. 
+func RunDataMigrationJob(ctx context.Context, cli client.Client, kclient kubernetes.Interface, in *ecv1beta1.Installation, image string) (<-chan string, <-chan error, error) { + progressCh := make(chan string) + errCh := make(chan error, 1) + + // TODO: this should be an argument + clientset, err := kubeutils.GetClientset() + if err != nil { + return nil, nil, fmt.Errorf("get kubernetes clientset: %w", err) + } + + hasMigrated, err := hasRegistryMigrated(ctx, cli) + if err != nil { + return nil, nil, fmt.Errorf("check if registry has migrated before running migration: %w", err) + } else if hasMigrated { + close(progressCh) + errCh <- nil + return progressCh, errCh, nil + } + + // TODO: should we check seaweedfs health? + + logrus.Debug("Ensuring migration service account") + err = ensureMigrationServiceAccount(ctx, cli) + if err != nil { + return nil, nil, fmt.Errorf("ensure service account: %w", err) + } + + logrus.Debug("Ensuring s3 access credentials secret") + err = ensureS3Secret(ctx, cli) + if err != nil { + return nil, nil, fmt.Errorf("ensure s3 secret: %w", err) + } + + logrus.Debug("Ensuring data migration job") + _, err = ensureDataMigrationJob(ctx, cli, image) + if err != nil { + return nil, nil, fmt.Errorf("ensure job: %w", err) + } + + ctx, cancel := context.WithCancel(ctx) + go func() { + defer cancel() + logrus.Debug("Monitoring job status") + defer logrus.Debug("Job status monitor stopped") + monitorJobStatus(ctx, cli, errCh) + }() + + go monitorJobProgress(ctx, cli, clientset, progressCh) + + return progressCh, errCh, nil +} + +func ensureDataMigrationJob(ctx context.Context, cli client.Client, image string) (*batchv1.Job, error) { + job := newMigrationJob(image) + + err := kubeutils.EnsureObject(ctx, cli, job, func(opts *kubeutils.EnsureObjectOptions) { + opts.DeleteOptions = append(opts.DeleteOptions, client.PropagationPolicy(metav1.DeletePropagationForeground)) + opts.ShouldDelete = func(obj client.Object) bool { + exceedsBackoffLimit := job.Status.Failed > *job.Spec.BackoffLimit + return exceedsBackoffLimit + } + }) + if err != nil { + return nil, fmt.Errorf("ensure object: %w", err) + } + return job, nil +} + +func monitorJobStatus(ctx context.Context, cli client.Client, errCh chan<- error) { + defer close(errCh) + + opts := &kubeutils.WaitOptions{ + // 30 minutes + Backoff: &wait.Backoff{ + Steps: 300, + Duration: 6 * time.Second, + Factor: 1, + Jitter: 0.1, + }, + } + err := kubeutils.WaitForJob(ctx, cli, runtimeconfig.RegistryNamespace, dataMigrationJobName, 1, opts) + errCh <- err +} + +func monitorJobProgress(ctx context.Context, cli client.Client, kclient kubernetes.Interface, progressCh chan<- string) { + defer close(progressCh) + + for ctx.Err() == nil { + logrus.Debugf("Streaming registry data migration pod logs") + podLogs, err := streamJobLogs(ctx, cli, kclient) + if err != nil { + if ctx.Err() == nil { + if ok, err := isJobRetrying(ctx, cli); err != nil { + logrus.Debugf("Failed to check if data migration job is retrying: %v", err) + } else if ok { + progressCh <- "failed, retrying with backoff..." 
+ } else { + logrus.Debugf("Failed to stream registry data migration pod logs: %v", err) + } + } + } else { + err := streamJobProgress(podLogs, progressCh) + if err != nil { + logrus.Debugf("Failed to stream registry data migration pod logs: %v", err) + } else { + logrus.Debugf("Finished streaming registry data migration pod logs") + } + } + + select { + case <-ctx.Done(): + case <-time.After(2 * time.Second): + } + } +} + +func streamJobLogs(ctx context.Context, cli client.Client, kclient kubernetes.Interface) (io.ReadCloser, error) { + podList := &corev1.PodList{} + err := cli.List(ctx, podList, client.InNamespace(runtimeconfig.RegistryNamespace), client.MatchingLabels{"job-name": dataMigrationJobName}) + if err != nil { + return nil, fmt.Errorf("list pods: %w", err) + } + + var podName string + for _, pod := range podList.Items { + if pod.Status.Phase == corev1.PodRunning { + podName = pod.Name + break + } + } + if podName == "" { + return nil, fmt.Errorf("no running pods found") + } + + logOpts := corev1.PodLogOptions{ + Container: "migrate-registry-data", + Follow: true, + TailLines: ptr.To(int64(100)), + } + podLogs, err := kclient.CoreV1().Pods(runtimeconfig.RegistryNamespace).GetLogs(podName, &logOpts).Stream(ctx) + if err != nil { + return nil, fmt.Errorf("get logs: %w", err) + } + + return podLogs, nil +} + +func isJobRetrying(ctx context.Context, cli client.Client) (bool, error) { + job := &batchv1.Job{} + err := cli.Get(ctx, client.ObjectKey{Namespace: runtimeconfig.RegistryNamespace, Name: dataMigrationJobName}, job) + if err != nil { + return false, fmt.Errorf("get job: %w", err) + } + if job.Status.Succeeded >= 1 { + // job is successful + return false, nil + } + if job.Status.Failed == 0 { + // job has not yet tried + return false, nil + } + exceedsBackoffLimit := job.Status.Failed > *job.Spec.BackoffLimit + return !exceedsBackoffLimit, nil +} + +func streamJobProgress(rc io.ReadCloser, progressCh chan<- string) error { + defer rc.Close() + + scanner := bufio.NewScanner(rc) + for scanner.Scan() { + line := scanner.Text() + if isProgressLogLine(line) { + progressCh <- getProgressFromLogLine(line) + } + } + return scanner.Err() +} + +func getProgressArgs(count, total int) []any { + return []any{ + slog.String("progress", fmt.Sprintf("%d%%", count*100/total)), + slog.Int("count", count), + slog.Int("total", total), + } +} + +var progressRegex = regexp.MustCompile(`progress=(\d+%) count=\d+ total=\d+`) + +func isProgressLogLine(line string) bool { + return progressRegex.MatchString(line) +} + +func getProgressFromLogLine(line string) string { + matches := progressRegex.FindStringSubmatch(line) + if len(matches) != 2 { + return "" + } + return progressRegex.FindStringSubmatch(line)[1] +} + +func newMigrationJob(image string) *batchv1.Job { + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: dataMigrationJobName, + Namespace: runtimeconfig.RegistryNamespace, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + APIVersion: "batch/v1", + }, + Spec: batchv1.JobSpec{ + BackoffLimit: ptr.To[int32](6), // this is the default + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + ServiceAccountName: serviceAccountName, + Volumes: []corev1.Volume{ + { + Name: "registry-data", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: "registry", // yes it's really just called "registry" + }, + }, + }, + }, + Containers: []corev1.Container{ + { + Name: 
"migrate-registry-data", + Image: image, + Command: []string{"/manager"}, + Args: []string{"migrate", "registry-data"}, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "registry-data", + MountPath: "/registry", + }, + }, + EnvFrom: []corev1.EnvFromSource{ + { + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: seaweedfsS3SecretName, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + job.ObjectMeta.Labels = applyRegistryLabels(job.ObjectMeta.Labels, dataMigrationJobName) + + return job +} + +func ensureMigrationServiceAccount(ctx context.Context, cli client.Client) error { + newRole := rbacv1.Role{ + ObjectMeta: metav1.ObjectMeta{ + Name: "registry-data-migration-role", + Namespace: runtimeconfig.RegistryNamespace, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Role", + APIVersion: "rbac.authorization.k8s.io/v1", + }, + Rules: []rbacv1.PolicyRule{ + { + APIGroups: []string{"apps"}, + Resources: []string{"deployments"}, + Verbs: []string{"get", "list", "update"}, + }, + { + APIGroups: []string{""}, + Resources: []string{"secrets"}, + Verbs: []string{"create"}, + }, + }, + } + err := cli.Create(ctx, &newRole) + if err != nil && !k8serrors.IsAlreadyExists(err) { + return fmt.Errorf("create role: %w", err) + } + + newServiceAccount := corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: serviceAccountName, + Namespace: runtimeconfig.RegistryNamespace, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "ServiceAccount", + APIVersion: "v1", + }, + } + err = cli.Create(ctx, &newServiceAccount) + if err != nil && !k8serrors.IsAlreadyExists(err) { + return fmt.Errorf("create service account: %w", err) + } + + newRoleBinding := rbacv1.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "registry-data-migration-rolebinding", + Namespace: runtimeconfig.RegistryNamespace, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "RoleBinding", + APIVersion: "rbac.authorization.k8s.io/v1", + }, + RoleRef: rbacv1.RoleRef{ + Kind: "Role", + Name: "registry-data-migration-role", + APIGroup: "rbac.authorization.k8s.io", + }, + Subjects: []rbacv1.Subject{ + { + Kind: "ServiceAccount", + Name: serviceAccountName, + Namespace: runtimeconfig.RegistryNamespace, + }, + }, + } + + err = cli.Create(ctx, &newRoleBinding) + if err != nil && !k8serrors.IsAlreadyExists(err) { + return fmt.Errorf("create role binding: %w", err) + } + + return nil +} + +func ensureS3Secret(ctx context.Context, kcli client.Client) error { + accessKey, secretKey, err := seaweedfs.GetS3RWCreds(ctx, kcli) + if err != nil { + return fmt.Errorf("get seaweedfs s3 rw creds: %w", err) + } + + obj := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Namespace: runtimeconfig.RegistryNamespace, Name: seaweedfsS3SecretName}, + Data: map[string][]byte{ + "s3AccessKey": []byte(accessKey), + "s3SecretKey": []byte(secretKey), + }, + } + + obj.ObjectMeta.Labels = seaweedfs.ApplyLabels(obj.ObjectMeta.Labels, "s3") + + if err := kcli.Create(ctx, obj); err != nil && !k8serrors.IsAlreadyExists(err) { + return fmt.Errorf("create secret: %w", err) + } + return nil +} + +// hasRegistryMigrated checks if the registry data has been migrated by looking for the 'migration complete' secret in the registry namespace +func hasRegistryMigrated(ctx context.Context, cli client.Client) (bool, error) { + sec := corev1.Secret{} + err := cli.Get(ctx, client.ObjectKey{Namespace: runtimeconfig.RegistryNamespace, Name: dataMigrationCompleteSecretName}, &sec) + if err != nil { + if errors.IsNotFound(err) { + return false, nil + } + return false, 
fmt.Errorf("get registry migration secret: %w", err) + } + + return true, nil +} + +func maybeDeleteRegistryJob(ctx context.Context, cli client.Client) error { + err := cli.Delete(ctx, &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: runtimeconfig.RegistryNamespace, + Name: dataMigrationJobName, + }, + }) + if err != nil && !k8serrors.IsNotFound(err) { + return fmt.Errorf("delete registry migration job: %w", err) + } + return nil +} + +func applyRegistryLabels(labels map[string]string, component string) map[string]string { + if labels == nil { + labels = make(map[string]string) + } + // TODO: why would we want to back this up? + // labels["app"] = "docker-registry" // this is the backup/restore label for the registry + labels["app.kubernetes.io/component"] = component + labels["app.kubernetes.io/part-of"] = "embedded-cluster" + labels["app.kubernetes.io/managed-by"] = "embedded-cluster-operator" + return labels +} diff --git a/pkg/addons/registry/migrate/job_test.go b/pkg/addons/registry/migrate/job_test.go new file mode 100644 index 000000000..2c92e5360 --- /dev/null +++ b/pkg/addons/registry/migrate/job_test.go @@ -0,0 +1,71 @@ +package migrate + +import ( + "bytes" + "context" + "log/slog" + "testing" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func Test_isProgressLogLine(t *testing.T) { + buf := new(bytes.Buffer) + logger := slog.New(slog.NewTextHandler(buf, nil)) + + logger.Info("test", getProgressArgs(2, 10)...) + if !isProgressLogLine(buf.String()) { + t.Errorf("isProgressLogLine(%v) = false, want true", buf.String()) + } + buf.Reset() + + logger.Info("not a progress line") + if isProgressLogLine(buf.String()) { + t.Errorf("isProgressLogLine(%v) = true, want false", buf.String()) + } + buf.Reset() +} + +func Test_getProgressFromLogLine(t *testing.T) { + buf := new(bytes.Buffer) + logger := slog.New(slog.NewTextHandler(buf, nil)) + + logger.Info("test", getProgressArgs(2, 10)...) + if got := getProgressFromLogLine(buf.String()); got != "20%" { + t.Errorf("getProgressFromLogLine(%v) = %v, want %v", buf.String(), got, "20%") + } + buf.Reset() + + logger.Info("not a progress line") + if got := getProgressFromLogLine(buf.String()); got != "" { + t.Errorf("getProgressFromLogLine(%v) = %v, want %v", buf.String(), got, "") + } + buf.Reset() +} + +func Test_isJobRetrying(t *testing.T) { + type args struct { + ctx context.Context + cli client.Client + } + tests := []struct { + name string + args args + want bool + wantErr bool + }{ + // TODO: Add test cases. 
+ } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := isJobRetrying(tt.args.ctx, tt.args.cli) + if (err != nil) != tt.wantErr { + t.Errorf("isJobRetrying() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("isJobRetrying() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/addons/registry/migrate/migrate.go b/pkg/addons/registry/migrate/migrate.go new file mode 100644 index 000000000..11f8f5b34 --- /dev/null +++ b/pkg/addons/registry/migrate/migrate.go @@ -0,0 +1,242 @@ +package migrate + +import ( + "context" + "fmt" + "log/slog" + "os" + "path/filepath" + "strings" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + s3manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/pkg/errors" + "github.com/replicatedhq/embedded-cluster/pkg/runtimeconfig" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + s3Bucket = "registry" + s3RootDirectory = "registry" + labelSelector = "app=docker-registry" +) + +// RegistryData runs a migration that copies data on disk in the registry-data PVC to the seaweedfs +// s3 store. If it fails, it will scale the registry deployment back to 1. If it succeeds, it will +// create a secret used to indicate success to the operator. +func RegistryData(ctx context.Context, cli client.Client) error { + // if the migration fails, we need to scale the registry back to 1 + success := false + + slog.Info("Scaling registry to 0 replicas") + + err := registryScale(ctx, cli, 0) + if err != nil { + return fmt.Errorf("scale registry to 0 replicas: %w", err) + } + + defer func() { + r := recover() + + if !success { + slog.Info("Scaling registry back to 1 replica after migration failure") + + // this should use the background context as we want it to run even if the context expired + err := registryScale(context.Background(), cli, 1) + if err != nil { + slog.Error("Failed to scale registry back to 1 replica", "error", err) + } + } + + if r != nil { + panic(r) + } + }() + + slog.Info("Connecting to s3") + + s3Client, err := getS3Client(ctx) + if err != nil { + return errors.Wrap(err, "get s3 client") + } + + slog.Info("Ensuring registry bucket") + + err = ensureRegistryBucket(ctx, s3Client) + if err != nil { + return errors.Wrap(err, "ensure registry bucket") + } + + slog.Info("Running registry data migration") + + s3Uploader := s3manager.NewUploader(s3Client) + + total, err := countRegistryFiles() + if err != nil { + return errors.Wrap(err, "count registry files") + } + + count := 0 + err = filepath.Walk("/registry", func(path string, info os.FileInfo, err error) error { + if err != nil { + return fmt.Errorf("walk: %w", err) + } + + if info.IsDir() { + return nil + } + + f, err := os.Open(path) + if err != nil { + return fmt.Errorf("open file: %w", err) + } + defer f.Close() + + relPath, err := filepath.Rel("/", path) + if err != nil { + return fmt.Errorf("get relative path: %w", err) + } + + count++ + // NOTE: this is used by the cli to report progress + // DO NOT CHANGE THIS + slog.Info( + "Uploading object", + append( + []any{"path", relPath, "size", info.Size()}, + getProgressArgs(count, total)..., + )..., + ) + _, err = s3Uploader.Upload(ctx, &s3.PutObjectInput{ + Bucket: 
ptr.To(s3Bucket), + Key: &relPath, + Body: f, + }) + if err != nil { + return fmt.Errorf("upload object: %w", err) + } + + return nil + }) + if err != nil { + return fmt.Errorf("walk registry data: %w", err) + } + + slog.Info("Creating registry data migration secret") + + err = ensureRegistryDataMigrationSecret(ctx, cli) + if err != nil { + return fmt.Errorf("ensure registry data migration secret: %w", err) + } + + success = true + + slog.Info("Registry data migration complete") + + return nil +} + +// registryScale scales the registry deployment to the given replica count. +// '0' and '1' are the only acceptable values. +func registryScale(ctx context.Context, cli client.Client, scale int32) error { + if scale != 0 && scale != 1 { + return fmt.Errorf("invalid scale: %d", scale) + } + + currentRegistry := &appsv1.Deployment{} + err := cli.Get(ctx, client.ObjectKey{Namespace: runtimeconfig.RegistryNamespace, Name: "registry"}, currentRegistry) + if err != nil { + return fmt.Errorf("get registry deployment: %w", err) + } + + currentRegistry.Spec.Replicas = &scale + + err = cli.Update(ctx, currentRegistry) + if err != nil { + return fmt.Errorf("update registry deployment: %w", err) + } + + return nil +} + +func getS3Client(ctx context.Context) (*s3.Client, error) { + creds := credentials.NewStaticCredentialsProvider(os.Getenv("s3AccessKey"), os.Getenv("s3SecretKey"), "") + conf, err := config.LoadDefaultConfig(ctx, + config.WithCredentialsProvider(creds), + config.WithRegion("us-east-1"), + ) + if err != nil { + return nil, fmt.Errorf("load aws config: %w", err) + } + + s3Client := s3.NewFromConfig(conf, func(o *s3.Options) { + o.UsePathStyle = true + o.BaseEndpoint = aws.String("http://seaweedfs-s3.seaweedfs:8333/") + }) + + return s3Client, nil +} + +func ensureRegistryBucket(ctx context.Context, s3Client *s3.Client) error { + _, err := s3Client.CreateBucket(ctx, &s3.CreateBucketInput{ + Bucket: ptr.To(s3Bucket), + }) + if err != nil { + if !strings.Contains(err.Error(), "BucketAlreadyExists") { + return errors.Wrap(err, "create bucket") + } + } + return nil +} + +// ensureRegistryDataMigrationSecret indicates that the registry data migration has been completed. 
+func ensureRegistryDataMigrationSecret(ctx context.Context, cli client.Client) error { + migrationSecret := corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: dataMigrationCompleteSecretName, + Namespace: runtimeconfig.RegistryNamespace, + Labels: map[string]string{ + "replicated.com/disaster-recovery": "ec-install", + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Secret", + APIVersion: "v1", + }, + Data: map[string][]byte{ + "migration": []byte("complete"), + }, + } + err := cli.Create(ctx, &migrationSecret) + if err != nil && !k8serrors.IsAlreadyExists(err) { + return fmt.Errorf("create registry data migration secret: %w", err) + } + + return nil +} + +func countRegistryFiles() (int, error) { + var count int + err := filepath.Walk("/registry", func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + count++ + return nil + }) + if err != nil { + return 0, fmt.Errorf("walk /registry directory: %w", err) + } + return count, nil +} diff --git a/pkg/addons/registry/upgrade.go b/pkg/addons/registry/upgrade.go index 33f5ff58c..f66e17446 100644 --- a/pkg/addons/registry/upgrade.go +++ b/pkg/addons/registry/upgrade.go @@ -2,11 +2,11 @@ package registry import ( "context" - "log/slog" "github.com/pkg/errors" "github.com/replicatedhq/embedded-cluster/pkg/addons/seaweedfs" "github.com/replicatedhq/embedded-cluster/pkg/helm" + "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -26,7 +26,7 @@ func (r *Registry) Upgrade(ctx context.Context, kcli client.Client, hcli helm.Cl return errors.Wrap(err, "check if release exists") } if !exists { - slog.Info("Release not found, installing", "release", releaseName, "namespace", namespace) + logrus.Debugf("Release not found, installing release %s in namespace %s", releaseName, namespace) if err := r.Install(ctx, kcli, hcli, overrides, nil); err != nil { return errors.Wrap(err, "install") } @@ -60,7 +60,7 @@ func (r *Registry) Upgrade(ctx context.Context, kcli client.Client, hcli helm.Cl func (r *Registry) createUpgradePreRequisites(ctx context.Context, kcli client.Client) error { if r.IsHA { - if err := createS3Secret(ctx, kcli); err != nil { + if err := ensureS3Secret(ctx, kcli); err != nil { return errors.Wrap(err, "create s3 secret") } } @@ -68,7 +68,7 @@ func (r *Registry) createUpgradePreRequisites(ctx context.Context, kcli client.C return nil } -func createS3Secret(ctx context.Context, kcli client.Client) error { +func ensureS3Secret(ctx context.Context, kcli client.Client) error { accessKey, secretKey, err := seaweedfs.GetS3RWCreds(ctx, kcli) if err != nil { return errors.Wrap(err, "get seaweedfs s3 rw creds") diff --git a/pkg/addons/seaweedfs/install.go b/pkg/addons/seaweedfs/install.go index 5fbd1eaa8..faef82082 100644 --- a/pkg/addons/seaweedfs/install.go +++ b/pkg/addons/seaweedfs/install.go @@ -17,32 +17,11 @@ import ( ) func (s *SeaweedFS) Install(ctx context.Context, kcli client.Client, hcli helm.Client, overrides []string, writer *spinner.MessageWriter) error { - if err := s.createPreRequisites(ctx, kcli); err != nil { - return errors.Wrap(err, "create prerequisites") - } - - values, err := s.GenerateHelmValues(ctx, kcli, overrides) - if err != nil { - return errors.Wrap(err, "generate helm values") - } - - _, err = hcli.Install(ctx, helm.InstallOptions{ - ReleaseName: releaseName, - ChartPath: Metadata.Location, - ChartVersion: Metadata.Version, - Values: values, - 
Namespace: namespace, - Labels: getBackupLabels(), - }) - if err != nil { - return errors.Wrap(err, "helm install") - } - - return nil + return s.Upgrade(ctx, kcli, hcli, overrides) } -func (s *SeaweedFS) createPreRequisites(ctx context.Context, kcli client.Client) error { - if err := createNamespace(ctx, kcli, namespace); err != nil { +func (s *SeaweedFS) ensurePreRequisites(ctx context.Context, kcli client.Client) error { + if err := ensureNamespace(ctx, kcli, namespace); err != nil { return errors.Wrap(err, "create namespace") } @@ -50,14 +29,14 @@ func (s *SeaweedFS) createPreRequisites(ctx context.Context, kcli client.Client) return errors.Wrap(err, "create s3 service") } - if err := createS3Secret(ctx, kcli); err != nil { + if err := ensureS3Secret(ctx, kcli); err != nil { return errors.Wrap(err, "create s3 secret") } return nil } -func createNamespace(ctx context.Context, kcli client.Client, namespace string) error { +func ensureNamespace(ctx context.Context, kcli client.Client, namespace string) error { ns := corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: namespace, @@ -121,7 +100,7 @@ func ensureService(ctx context.Context, kcli client.Client, serviceCIDR string) return nil } -func createS3Secret(ctx context.Context, kcli client.Client) error { +func ensureS3Secret(ctx context.Context, kcli client.Client) error { var config seaweedfsConfig config.Identities = append(config.Identities, seaweedfsIdentity{ Name: "anvAdmin", diff --git a/pkg/addons/seaweedfs/upgrade.go b/pkg/addons/seaweedfs/upgrade.go index 1a42857a1..fbb808e5d 100644 --- a/pkg/addons/seaweedfs/upgrade.go +++ b/pkg/addons/seaweedfs/upgrade.go @@ -2,31 +2,43 @@ package seaweedfs import ( "context" - "log/slog" "github.com/pkg/errors" "github.com/replicatedhq/embedded-cluster/pkg/helm" + "github.com/sirupsen/logrus" "sigs.k8s.io/controller-runtime/pkg/client" ) func (s *SeaweedFS) Upgrade(ctx context.Context, kcli client.Client, hcli helm.Client, overrides []string) error { + if err := s.ensurePreRequisites(ctx, kcli); err != nil { + return errors.Wrap(err, "create prerequisites") + } + + values, err := s.GenerateHelmValues(ctx, kcli, overrides) + if err != nil { + return errors.Wrap(err, "generate helm values") + } + exists, err := hcli.ReleaseExists(ctx, namespace, releaseName) if err != nil { return errors.Wrap(err, "check if release exists") } if !exists { - slog.Info("Release not found, installing", "release", releaseName, "namespace", namespace) - if err := s.Install(ctx, kcli, hcli, overrides, nil); err != nil { - return errors.Wrap(err, "install") + logrus.Debugf("Release not found, installing release %s in namespace %s", releaseName, namespace) + _, err = hcli.Install(ctx, helm.InstallOptions{ + ReleaseName: releaseName, + ChartPath: Metadata.Location, + ChartVersion: Metadata.Version, + Values: values, + Namespace: namespace, + Labels: getBackupLabels(), + }) + if err != nil { + return errors.Wrap(err, "helm install") } return nil } - values, err := s.GenerateHelmValues(ctx, kcli, overrides) - if err != nil { - return errors.Wrap(err, "generate helm values") - } - _, err = hcli.Upgrade(ctx, helm.UpgradeOptions{ ReleaseName: releaseName, ChartPath: Metadata.Location, diff --git a/pkg/addons/velero/upgrade.go b/pkg/addons/velero/upgrade.go index 0be084340..860564499 100644 --- a/pkg/addons/velero/upgrade.go +++ b/pkg/addons/velero/upgrade.go @@ -2,10 +2,10 @@ package velero import ( "context" - "log/slog" "github.com/pkg/errors" "github.com/replicatedhq/embedded-cluster/pkg/helm" + 
"github.com/sirupsen/logrus" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -15,7 +15,7 @@ func (v *Velero) Upgrade(ctx context.Context, kcli client.Client, hcli helm.Clie return errors.Wrap(err, "check if release exists") } if !exists { - slog.Info("Release not found, installing", "release", releaseName, "namespace", namespace) + logrus.Debugf("Release not found, installing release %s in namespace %s", releaseName, namespace) if err := v.Install(ctx, kcli, hcli, overrides, nil); err != nil { return errors.Wrap(err, "install") } diff --git a/pkg/dryrun/kubeutils.go b/pkg/dryrun/kubeutils.go index 533b43c92..2f4fa3ea1 100644 --- a/pkg/dryrun/kubeutils.go +++ b/pkg/dryrun/kubeutils.go @@ -64,10 +64,6 @@ func (k *KubeUtils) IsDaemonsetReady(ctx context.Context, cli client.Client, ns, return true, nil } -func (k *KubeUtils) IsJobComplete(ctx context.Context, cli client.Client, ns, name string, completions int32) (bool, error) { - return true, nil -} - func (k *KubeUtils) WaitForKubernetes(ctx context.Context, cli client.Client) <-chan error { errCh := make(chan error) close(errCh) diff --git a/pkg/helm/client.go b/pkg/helm/client.go index c9e49ae26..f7778c571 100644 --- a/pkg/helm/client.go +++ b/pkg/helm/client.go @@ -239,6 +239,24 @@ func (h *HelmClient) PullOCI(url string, version string) (string, error) { return path, nil } +func (h *HelmClient) PullOCIWithRetries(ctx context.Context, chartPath, chartVersion string, tries int) (string, error) { + for i := 0; ; i++ { + localPath, err := h.PullOCI(chartPath, chartVersion) + if err == nil { + return localPath, nil + } + logrus.Debugf("Failed to pull %s:%v (%d/%d): %v", chartPath, chartVersion, i+1, tries, err) + if i == tries-1 { + return "", err + } + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + return "", ctx.Err() + } + } +} + func (h *HelmClient) Pull(repo string, chart string, version string) (string, error) { if err := h.prepare(); err != nil { return "", fmt.Errorf("prepare: %w", err) @@ -335,7 +353,7 @@ func (h *HelmClient) Install(ctx context.Context, opts InstallOptions) (*release var localPath string if h.airgapPath == "" { // online, pull chart from remote - localPath, err = h.PullOCI(opts.ChartPath, opts.ChartVersion) + localPath, err = h.PullOCIWithRetries(ctx, opts.ChartPath, opts.ChartVersion, 3) if err != nil { return nil, fmt.Errorf("pull oci: %w", err) } @@ -389,7 +407,7 @@ func (h *HelmClient) Upgrade(ctx context.Context, opts UpgradeOptions) (*release var localPath string if h.airgapPath == "" { // online, pull chart from remote - localPath, err = h.PullOCI(opts.ChartPath, opts.ChartVersion) + localPath, err = h.PullOCIWithRetries(ctx, opts.ChartPath, opts.ChartVersion, 3) if err != nil { return nil, fmt.Errorf("pull oci: %w", err) } diff --git a/pkg/kubeutils/client.go b/pkg/kubeutils/client.go index 99b931d76..f708deacb 100644 --- a/pkg/kubeutils/client.go +++ b/pkg/kubeutils/client.go @@ -10,6 +10,7 @@ import ( velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/config" @@ -45,3 +46,12 @@ func (k *KubeUtils) RESTClientGetterFactory(namespace string) genericclioptions. 
} return cfgFlags } + +func GetClientset() (*kubernetes.Clientset, error) { + cfg, err := config.GetConfig() + if err != nil { + return nil, fmt.Errorf("get kubernetes client config: %w", err) + } + + return kubernetes.NewForConfig(cfg) +} diff --git a/pkg/kubeutils/interface.go b/pkg/kubeutils/interface.go index cff74598f..db6da528a 100644 --- a/pkg/kubeutils/interface.go +++ b/pkg/kubeutils/interface.go @@ -33,7 +33,6 @@ type KubeUtilsInterface interface { IsDeploymentReady(ctx context.Context, cli client.Client, ns, name string) (bool, error) IsStatefulSetReady(ctx context.Context, cli client.Client, ns, name string) (bool, error) IsDaemonsetReady(ctx context.Context, cli client.Client, ns, name string) (bool, error) - IsJobComplete(ctx context.Context, cli client.Client, ns, name string, completions int32) (bool, error) WaitForKubernetes(ctx context.Context, cli client.Client) <-chan error WaitForCRDToBeReady(ctx context.Context, cli client.Client, name string) error KubeClient() (client.Client, error) @@ -106,10 +105,6 @@ func IsDaemonsetReady(ctx context.Context, cli client.Client, ns, name string) ( return kb.IsDaemonsetReady(ctx, cli, ns, name) } -func IsJobComplete(ctx context.Context, cli client.Client, ns, name string, completions int32) (bool, error) { - return kb.IsJobComplete(ctx, cli, ns, name, completions) -} - func WaitForKubernetes(ctx context.Context, cli client.Client) <-chan error { return kb.WaitForKubernetes(ctx, cli) } diff --git a/pkg/kubeutils/kubeutils.go b/pkg/kubeutils/kubeutils.go index ad2066c0f..9815db496 100644 --- a/pkg/kubeutils/kubeutils.go +++ b/pkg/kubeutils/kubeutils.go @@ -10,6 +10,7 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" @@ -117,19 +118,39 @@ func (k *KubeUtils) WaitForJob(ctx context.Context, cli client.Client, ns, name var lasterr error if err := wait.ExponentialBackoffWithContext( ctx, backoff, func(ctx context.Context) (bool, error) { - ready, err := k.IsJobComplete(ctx, cli, ns, name, completions) - if err != nil { - lasterr = fmt.Errorf("unable to get job status: %w", err) + var job batchv1.Job + err := cli.Get(ctx, client.ObjectKey{Namespace: ns, Name: name}, &job) + if k8serrors.IsNotFound(err) { + // exit + lasterr = fmt.Errorf("job not found") + return false, lasterr + } else if err != nil { + lasterr = fmt.Errorf("unable to get job: %w", err) return false, nil } - return ready, nil + + failed := k.isJobFailed(job) + if failed { + // exit + lasterr = fmt.Errorf("job failed") + return false, lasterr + } + + completed := k.isJobCompleted(job, completions) + if completed { + return true, nil + } + + // TODO: need to handle the case where the pod get stuck in pending + // This can happen if nodes are not schedulable or if a volume is not found + + return false, nil }, ); err != nil { if lasterr != nil { - return fmt.Errorf("timed out waiting for job %s: %w", name, lasterr) - } else { - return fmt.Errorf("timed out waiting for job %s", name) + return lasterr } + return fmt.Errorf("timed out waiting for job %s", name) } return nil } @@ -275,17 +296,20 @@ func (k *KubeUtils) IsDaemonsetReady(ctx context.Context, cli client.Client, ns, return false, nil } -// IsJobComplete returns true if the job has been completed successfully. 
-func (k *KubeUtils) IsJobComplete(ctx context.Context, cli client.Client, ns, name string, completions int32) (bool, error) { - var job batchv1.Job - nsn := types.NamespacedName{Namespace: ns, Name: name} - if err := cli.Get(ctx, nsn, &job); err != nil { - return false, err - } - if job.Status.Succeeded >= completions { - return true, nil +// isJobCompleted returns true if the job has been completed successfully. +func (k *KubeUtils) isJobCompleted(job batchv1.Job, completions int32) bool { + isSucceeded := job.Status.Succeeded >= completions + return isSucceeded +} + +// isJobFailed returns true if the job has exceeded the backoff limit. +func (k *KubeUtils) isJobFailed(job batchv1.Job) bool { + backoffLimit := int32(6) // default + if job.Spec.BackoffLimit != nil { + backoffLimit = *job.Spec.BackoffLimit } - return false, nil + exceedsBackoffLimit := job.Status.Failed > backoffLimit + return exceedsBackoffLimit } // IsPodComplete returns true if the pod has completed.