diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go b/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go index 41134647d..283ea4d3b 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/crds.go @@ -1347,6 +1347,76 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ }, }, }, + "walPvc": { + Type: "object", + Required: []string{"size"}, + Properties: map[string]apiextv1.JSONSchemaProps{ + "iops": { + Type: "integer", + }, + "selector": { + Type: "object", + Properties: map[string]apiextv1.JSONSchemaProps{ + "matchExpressions": { + Type: "array", + Items: &apiextv1.JSONSchemaPropsOrArray{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "object", + Required: []string{"key", "operator"}, + Properties: map[string]apiextv1.JSONSchemaProps{ + "key": { + Type: "string", + }, + "operator": { + Type: "string", + Enum: []apiextv1.JSON{ + { + Raw: []byte(`"DoesNotExist"`), + }, + { + Raw: []byte(`"Exists"`), + }, + { + Raw: []byte(`"In"`), + }, + { + Raw: []byte(`"NotIn"`), + }, + }, + }, + "values": { + Type: "array", + Items: &apiextv1.JSONSchemaPropsOrArray{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "string", + }, + }, + }, + }, + }, + }, + }, + "matchLabels": { + Type: "object", + XPreserveUnknownFields: util.True(), + }, + }, + }, + "size": { + Type: "string", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + "storageClass": { + Type: "string", + }, + "subPath": { + Type: "string", + }, + "throughput": { + Type: "integer", + }, + }, + }, }, }, "status": { diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go b/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go index 98b9e3ce0..8828c3a45 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go @@ -96,6 +96,7 @@ type PostgresSpec struct { Backup *Backup `json:"backup,omitempty"` TDE *TDE `json:"tde,omitempty"` Monitoring *Monitoring `json:"monitor,omitempty"` + WalPvc *Volume `json:"walPvc,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/cpo.opensource.cybertec.at/v1/zz_generated.deepcopy.go b/pkg/apis/cpo.opensource.cybertec.at/v1/zz_generated.deepcopy.go index 662d4f781..b87578efd 100644 --- a/pkg/apis/cpo.opensource.cybertec.at/v1/zz_generated.deepcopy.go +++ b/pkg/apis/cpo.opensource.cybertec.at/v1/zz_generated.deepcopy.go @@ -965,6 +965,11 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) { *out = new(Monitoring) **out = **in } + if in.WalPvc != nil { + in, out := &in.WalPvc, &out.WalPvc + *out = new(Volume) + (*in).DeepCopyInto(*out) + } return } diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index d09f4c389..533442f37 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -617,7 +617,16 @@ func (c *Cluster) compareStatefulSetWith(oldSts, newSts *appsv1.StatefulSet) *co needsReplace = true reasons = append(reasons, "new statefulset's volumeClaimTemplates contains different number of volumes to the old one") } - for i := 0; i < len(oldSts.Spec.VolumeClaimTemplates); i++ { + + //Account for the deleted PVC for wal + lenVCT := 0 + if len(oldSts.Spec.VolumeClaimTemplates) < len(newSts.Spec.VolumeClaimTemplates) { + lenVCT = len(oldSts.Spec.VolumeClaimTemplates) + } else { + lenVCT = len(newSts.Spec.VolumeClaimTemplates) + } + + for i := 0; i < lenVCT; i++ { name := oldSts.Spec.VolumeClaimTemplates[i].Name // Some generated fields like creationTimestamp make it not possible to use DeepCompare on ObjectMeta if name != newSts.Spec.VolumeClaimTemplates[i].Name { @@ -1012,8 +1021,17 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error { c.syncMonitoringSecret(oldSpec, newSpec) } + //sync WAL-PVC + if !reflect.DeepEqual(oldSpec.Spec.WalPvc, newSpec.Spec.WalPvc) { + if err := c.syncWalPvc(oldSpec, newSpec); err != nil { + c.logger.Warningf("could not sync PVC WAL %v", err) + } + } + //sync sts when there is a change in the pgbackrest secret, since we need to mount this - if !reflect.DeepEqual(oldSpec.Spec.Backup.Pgbackrest.Configuration, newSpec.Spec.Backup.Pgbackrest.Configuration) { + if oldSpec.Spec.Backup != nil && newSpec.Spec.Backup != nil && + oldSpec.Spec.Backup.Pgbackrest != nil && newSpec.Spec.Backup.Pgbackrest != nil && + !reflect.DeepEqual(oldSpec.Spec.Backup.Pgbackrest.Configuration, newSpec.Spec.Backup.Pgbackrest.Configuration) { syncStatefulSet = true } @@ -1068,6 +1086,10 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error { updateFailed = true return } + if oldSpec.Spec.WalPvc != nil { + //if pvc wal is removed then carry the relevant env vars to the new sts + + } if c.restoreInProgress() { c.applyRestoreStatefulSetSyncOverrides(newSs, oldSs) diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index c331a3430..1ff5af7c5 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -649,6 +649,10 @@ func isBootstrapOnlyParameter(param string) bool { return result } +func getWALPVCName(cluster_name string) string { + return "walpvc" + cluster_name +} + func generateVolumeMounts(volume cpov1.Volume) []v1.VolumeMount { return []v1.VolumeMount{ { @@ -659,6 +663,14 @@ func generateVolumeMounts(volume cpov1.Volume) []v1.VolumeMount { } } +func generateWalVolumeMounts(volume cpov1.Volume, cluster_name string) v1.VolumeMount { + return v1.VolumeMount{ + Name: getWALPVCName(cluster_name), + MountPath: constants.PostgresPVCWalMount, + SubPath: volume.SubPath, + } +} + func generateContainer( name string, dockerImage *string, @@ -1005,6 +1017,10 @@ func (c *Cluster) generateSpiloPodEnvVars( envVars = append(envVars, v1.EnvVar{Name: "cpo_monitoring_stack", Value: "true"}) } + if spec.WalPvc != nil { + envVars = append(envVars, v1.EnvVar{Name: "NEWWALDIR", Value: constants.PostgresPVCWalMount}) + envVars = append(envVars, v1.EnvVar{Name: "OLDWALDIR",Value: ""}) + } if c.OpConfig.EnablePgVersionEnvVar { envVars = append(envVars, v1.EnvVar{Name: "PGVERSION", Value: c.GetDesiredMajorVersion()}) } @@ -1290,9 +1306,9 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu sidecarContainers []v1.Container podTemplate *v1.PodTemplateSpec volumeClaimTemplate *v1.PersistentVolumeClaim + WalPvcClaim *v1.PersistentVolumeClaim additionalVolumes = spec.AdditionalVolumes ) - defaultResources := makeDefaultResources(&c.OpConfig) resourceRequirements, err := c.generateResourceRequirements( spec.Resources, defaultResources, constants.PostgresContainerName) @@ -1365,6 +1381,10 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu volumeMounts := generateVolumeMounts(spec.Volume) + if spec.WalPvc != nil { + volumeMounts = append(volumeMounts, generateWalVolumeMounts(*spec.WalPvc, c.Spec.ClusterName)) + } + // configure TLS with a custom secret volume if spec.TLS != nil && spec.TLS.SecretName != "" { getSpiloTLSEnv := func(k string) string { @@ -1509,7 +1529,13 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu additionalVolumes = append(additionalVolumes, c.generateCertSecretVolume()) } } - + if spec.WalPvc != nil { + WalPvcClaim, err = c.generatePersistentVolumeClaimTemplate(spec.WalPvc.Size, + spec.WalPvc.StorageClass, spec.WalPvc.Selector, getWALPVCName(spec.ClusterName)) + if err != nil { + c.logger.Errorf("could not generate volume claim template for WAL directory: %v", err) + } + } // generate pod template for the statefulset, based on the spilo container and sidecars podTemplate, err = c.generatePodTemplate( c.Namespace, @@ -1578,6 +1604,11 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu persistentVolumeClaimRetentionPolicy.WhenScaled = appsv1.RetainPersistentVolumeClaimRetentionPolicyType } + final_vols := []v1.PersistentVolumeClaim{*volumeClaimTemplate} + if spec.WalPvc != nil { + final_vols = []v1.PersistentVolumeClaim{*volumeClaimTemplate, *WalPvcClaim} + } + statefulSet := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: c.statefulSetName(), @@ -1590,7 +1621,7 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu Selector: c.labelsSelector(TYPE_POSTGRESQL), ServiceName: c.serviceName(Master), Template: *podTemplate, - VolumeClaimTemplates: []v1.PersistentVolumeClaim{*volumeClaimTemplate}, + VolumeClaimTemplates: final_vols, UpdateStrategy: updateStrategy, PodManagementPolicy: podManagementPolicy, PersistentVolumeClaimRetentionPolicy: &persistentVolumeClaimRetentionPolicy, diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 1c42fcf78..522072965 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -218,7 +218,11 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error { return err } - // sync volume may already transition volumes to gp3, if iops/throughput or type is specified + if err = c.syncWalPvc(&oldSpec, newSpec); err != nil { + return fmt.Errorf("could not sync WAL-PVC: %v", err) + } + + //sync volume may already transition volumes to gp3, if iops/throughput or type is specified if err = c.syncVolumes(); err != nil { return err } @@ -1632,7 +1636,7 @@ func (c *Cluster) syncPgbackrestJob(forceRemove bool) error { if err := c.createPgbackrestJob(job); err != nil { return fmt.Errorf("could not create a pgbackrest cronjob: %v", err) } - c.logger.Info("pgbackrest cronjob for %v %v has been successfully created", rep, schedul) + c.logger.Infof("pgbackrest cronjob for %v %v has been successfully created", rep, schedul) } } } @@ -1756,6 +1760,37 @@ func (c *Cluster) syncMonitoringSecret(oldSpec, newSpec *cpov1.Postgresql) error return nil } +func (c *Cluster) syncWalPvc(oldSpec, newSpec *cpov1.Postgresql) error { + c.logger.Info("syncing PVC for WAL") + c.setProcessName("syncing PVC for WAL") + + if newSpec.Spec.WalPvc == nil && oldSpec.Spec.WalPvc != nil { + + containers := c.Statefulset.Spec.Template.Spec.Containers + for _, con := range containers { + con.Env = append(con.Env, v1.EnvVar{Name: "NEWWALDIR", Value: ""}) + con.Env = append(con.Env, v1.EnvVar{Name: "OLDWALDIR",Value: constants.PostgresPVCWalMount}) + } + // run the script to move the wal files and then remove the pvc + //result, err = c.ExecCommand(podName, "scripts/move_wal_dir.sh" + constants.PostgresPVCWalMount + " " + constants.PostgresWALPath) + + pvcs, err := c.listPersistentVolumeClaims() + if err != nil { + return fmt.Errorf("Could not list PVCs") + } else { + for _, pvc := range pvcs { + if strings.Contains(pvc.Name, getWALPVCName(c.Spec.ClusterName)) { + c.logger.Infof("deleting WAL-PVC %q", util.NameFromMeta(pvc.ObjectMeta)) + if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, c.deleteOptions); err != nil { + return fmt.Errorf("could not delete WAL PVC: %v", err) + } + } + } + } + } + return nil +} + func generateRootCertificate( privateKey *ecdsa.PrivateKey, serialNumber *big.Int, ) (*x509.Certificate, error) { diff --git a/pkg/util/constants/postgresql.go b/pkg/util/constants/postgresql.go index 8bd7508a7..4a1069f00 100644 --- a/pkg/util/constants/postgresql.go +++ b/pkg/util/constants/postgresql.go @@ -18,4 +18,7 @@ const ( RunVolumeName = "postgresql-run" RunVolumePath = "/var/run/postgresql" + + PostgresPVCWalMount = "/home/postgres/pvc/" + PostgresWALPath = PostgresDataPath + "/pg_wal" )