diff --git a/internal/controller/postgrescluster/instance.go b/internal/controller/postgrescluster/instance.go
index ee616a0395..9073a35be9 100644
--- a/internal/controller/postgrescluster/instance.go
+++ b/internal/controller/postgrescluster/instance.go
@@ -776,10 +776,13 @@ func (r *Reconciler) rolloutInstance(
 	ctx context.Context, cluster *v1beta1.PostgresCluster,
 	instances *observedInstances, instance *Instance,
 ) error {
+	log := logging.FromContext(ctx).WithName("patroni")
+	log.Info("Starting instance rollout", "cluster", cluster.Name)
 	// The StatefulSet and number of Pods should have already been verified, but
 	// check again rather than panic.
 	// TODO(cbandy): The check for StatefulSet can go away if we watch Pod deletes.
 	if instance.Runner == nil || len(instance.Pods) != 1 {
+		log.Info("Unexpected instance state during rollout.")
 		return errors.Errorf(
 			"unexpected instance state during rollout: %v has %v pods",
 			instance.Name, len(instance.Pods))
@@ -805,6 +808,8 @@ func (r *Reconciler) rolloutInstance(
 	// NOTE(cbandy): The StatefulSet controlling this Pod reflects this change
 	// in its Status and triggers another reconcile.
 	if primary && len(instances.forCluster) > 1 {
+		log.Info("Starting controlled switchover...")
+
 		var span trace.Span
 		ctx, span = r.Tracer.Start(ctx, "patroni-change-primary")
 		defer span.End()
@@ -814,7 +819,29 @@ func (r *Reconciler) rolloutInstance(
 			return errors.Wrap(err, "failed to check if patroni v4 is used")
 		}
 
-		success, err := patroni.Executor(exec).ChangePrimaryAndWait(ctx, pod.Name, "", patroniVer4)
+		var success bool
+		httpSucceeded := false
+
+		// If the PatroniPreferHTTP feature is enabled, try HTTP first, then fall back to PodExec.
+		if feature.Enabled(ctx, feature.PatroniPreferHTTP) {
+			log.Info("Attempting HTTP call...")
+
+			if res, httpErr := patroni.NewHttpClient(ctx, r.Client, pod.Name); httpErr == nil {
+				if success, err = res.ChangePrimaryAndWait(ctx, pod.Name, "", true); err == nil {
+					log.Info("HTTP call succeeded.")
+					httpSucceeded = true
+				}
+			}
+
+			if !httpSucceeded {
+				log.Info("HTTP call failed. Falling back to PodExec...")
+			}
+		}
+
+		// Use PodExec when the feature gate is disabled or the HTTP attempt did not succeed.
+		if !httpSucceeded {
+			success, err = patroni.Executor(exec).ChangePrimaryAndWait(ctx, pod.Name, "", patroniVer4)
+		}
+
 		if err = errors.WithStack(err); err == nil && !success {
 			err = errors.New("unable to switchover")
 		}
diff --git a/internal/controller/postgrescluster/patroni.go b/internal/controller/postgrescluster/patroni.go
index 2f5456d05f..f521376f3a 100644
--- a/internal/controller/postgrescluster/patroni.go
+++ b/internal/controller/postgrescluster/patroni.go
@@ -16,6 +16,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
+	"github.com/percona/percona-postgresql-operator/internal/feature"
 	"github.com/percona/percona-postgresql-operator/internal/initialize"
 	"github.com/percona/percona-postgresql-operator/internal/logging"
 	"github.com/percona/percona-postgresql-operator/internal/naming"
@@ -51,6 +52,7 @@ func (r *Reconciler) handlePatroniRestarts(
 ) error {
 	const container = naming.ContainerDatabase
 	var primaryNeedsRestart, replicaNeedsRestart *Instance
+	log := logging.FromContext(ctx).WithName("[PATRONI]")
 
 	// Look for one primary and one replica that need to restart. Ignore
 	// containers that are terminating or not running; Kubernetes will start
@@ -88,11 +90,6 @@ func (r *Reconciler) handlePatroniRestarts(
 	// first.
if primaryNeedsRestart != nil { pod := primaryNeedsRestart.Pods[0] - exec := patroni.Executor(func( - ctx context.Context, stdin io.Reader, stdout, stderr io.Writer, command ...string, - ) error { - return r.PodExec(ctx, pod.Namespace, pod.Name, container, stdin, stdout, stderr, command...) - }) patroniVer4, err := cluster.IsPatroniVer4() if err != nil { @@ -106,6 +103,27 @@ func (r *Reconciler) handlePatroniRestarts( role = "master" } + // If PatroniPreferHTTP feature is enabled, try HTTP first, then fallback + if feature.Enabled(ctx, feature.PatroniPreferHTTP) { + log.Info("Attempting HTTP call...") + + if client, err := patroni.NewHttpClient(ctx, r.Client, pod.Name); err == nil { + // We don't use scope in the HTTP Rest API + if err := client.RestartPendingMembers(ctx, role, ""); err == nil { + log.Info("HTTP call succeeded.") + return nil + } + } + + log.Info("HTTP call failed. Falling back to PodExec...") + } + + exec := patroni.Executor(func( + ctx context.Context, stdin io.Reader, stdout, stderr io.Writer, command ...string, + ) error { + return r.PodExec(ctx, pod.Namespace, pod.Name, container, stdin, stdout, stderr, command...) + }) + return errors.WithStack(exec.RestartPendingMembers(ctx, role, naming.PatroniScope(cluster))) } @@ -124,10 +142,26 @@ func (r *Reconciler) handlePatroniRestarts( // how we decide when to restart. // - https://www.postgresql.org/docs/current/runtime-config-replication.html if replicaNeedsRestart != nil { + pod := replicaNeedsRestart.Pods[0] + + // If PatroniPreferHTTP feature is enabled, try HTTP first, then fallback + if feature.Enabled(ctx, feature.PatroniPreferHTTP) { + log.Info("Attempting HTTP call...") + + if client, err := patroni.NewHttpClient(ctx, r.Client, pod.Name); err == nil { + // We don't use scope in the HTTP Rest API + if err := client.RestartPendingMembers(ctx, "replica", ""); err == nil { + log.Info("HTTP call succeeded.") + return nil + } + } + + log.Info("HTTP call failed. Falling back to PodExec...") + } + exec := patroni.Executor(func( ctx context.Context, stdin io.Reader, stdout, stderr io.Writer, command ...string, ) error { - pod := replicaNeedsRestart.Pods[0] return r.PodExec(ctx, pod.Namespace, pod.Name, container, stdin, stdout, stderr, command...) }) @@ -187,6 +221,8 @@ func (r *Reconciler) reconcilePatroniDynamicConfiguration( ctx context.Context, cluster *v1beta1.PostgresCluster, instances *observedInstances, pgHBAs postgres.HBAs, pgParameters postgres.Parameters, ) error { + log := logging.FromContext(ctx).WithName("[PATRONI]") + if !patroni.ClusterBootstrapped(cluster) { // Patroni has not yet bootstrapped. Dynamic configuration happens through // configuration files during bootstrap, so there's nothing to do here. @@ -209,21 +245,35 @@ func (r *Reconciler) reconcilePatroniDynamicConfiguration( return nil } - // NOTE(cbandy): Despite the guards above, calling PodExec may still fail - // due to a missing or stopped container. - - exec := func(ctx context.Context, stdin io.Reader, stdout, stderr io.Writer, command ...string) error { - return r.PodExec(ctx, pod.Namespace, pod.Name, naming.ContainerDatabase, stdin, stdout, stderr, command...) 
- } - var configuration map[string]any + if cluster.Spec.Patroni != nil { configuration = cluster.Spec.Patroni.DynamicConfiguration } + configuration = patroni.DynamicConfiguration(cluster, configuration, pgHBAs, pgParameters) - return errors.WithStack( - patroni.Executor(exec).ReplaceConfiguration(ctx, configuration)) + // If PatroniPreferHTTP feature is enabled, try HTTP first, then fallback + if feature.Enabled(ctx, feature.PatroniPreferHTTP) { + log.Info("Attempting HTTP call...") + + if client, err := patroni.NewHttpClient(ctx, r.Client, pod.Name); err == nil { + if err := client.ReplaceConfiguration(ctx, configuration); err == nil { + log.Info("HTTP call succeeded.") + return nil + } + } + + log.Info("HTTP call failed. Falling back to PodExec...") + } + + // NOTE(cbandy): Despite the guards above, calling PodExec may still fail + // due to a missing or stopped container. + exec := func(ctx context.Context, stdin io.Reader, stdout, stderr io.Writer, command ...string) error { + return r.PodExec(ctx, pod.Namespace, pod.Name, naming.ContainerDatabase, stdin, stdout, stderr, command...) + } + + return errors.WithStack(patroni.Executor(exec).ReplaceConfiguration(ctx, configuration)) } // generatePatroniLeaderLeaseService returns a v1.Service that exposes the @@ -462,7 +512,7 @@ func replicationCertSecretProjection(certificate *corev1.Secret) *corev1.SecretP func (r *Reconciler) reconcilePatroniSwitchover(ctx context.Context, cluster *v1beta1.PostgresCluster, instances *observedInstances, ) error { - log := logging.FromContext(ctx) + log := logging.FromContext(ctx).WithName("[PATRONI]") // If switchover is not enabled, clear out the Patroni switchover status fields // which might have been set by previous switchovers. @@ -552,9 +602,29 @@ func (r *Reconciler) reconcilePatroniSwitchover(ctx context.Context, // TODO(benjaminjb): consider pulling the timeline from the pod annotation; manual experiments // have shown that the annotation on the Leader pod is up to date during a switchover, but // missing from the Replica pods. - timeline, err := patroni.Executor(exec).GetTimeline(ctx) - if err != nil { - return err + var timeline int64 + var err error + + // If PatroniPreferHTTP feature is enabled, try HTTP first, then fallback + if feature.Enabled(ctx, feature.PatroniPreferHTTP) { + log.Info("Attempting HTTP call...") + + if res, httpErr := patroni.NewHttpClient(ctx, r.Client, runningPod.Name); httpErr == nil { + if timeline, err = res.GetTimeline(ctx); err == nil { + log.Info("HTTP call succeeded.") + } + } + + if err != nil { + log.Error(err, "HTTP call failed. Falling back to PodExec...") + } + } + + if timeline == 0 { + timeline, err = patroni.Executor(exec).GetTimeline(ctx) + if err != nil { + return err + } } if timeline == 0 { diff --git a/internal/feature/features.go b/internal/feature/features.go index db424ead42..912123e55d 100644 --- a/internal/feature/features.go +++ b/internal/feature/features.go @@ -86,6 +86,9 @@ const ( // Support VolumeSnapshots VolumeSnapshots = "VolumeSnapshots" + + // Use HTTP client for Patroni API calls instead of kubectl exec + PatroniPreferHTTP = "PatroniPreferHTTP" ) // NewGate returns a MutableGate with the Features defined in this package. 
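Below is a minimal sketch (not part of the diff) of how the new PatroniPreferHTTP gate could be exercised in a unit test. It assumes the feature package's existing NewGate, NewContext, and Enabled helpers and the SetFromMap method inherited from the upstream Kubernetes featuregate package; the PGO_FEATURE_GATES environment variable mentioned in the comment is likewise an assumption about how an operator deployment would normally opt in, not something this diff introduces.

package feature_test

import (
	"context"
	"testing"

	"gotest.tools/v3/assert"

	"github.com/percona/percona-postgresql-operator/internal/feature"
)

// Sketch only: the gate ships disabled (alpha), so reconcilers keep using
// PodExec unless PatroniPreferHTTP is explicitly enabled.
func TestPatroniPreferHTTPGate(t *testing.T) {
	gate := feature.NewGate()

	// Disabled by default.
	assert.Assert(t, !gate.Enabled(feature.PatroniPreferHTTP))

	// Opt in, roughly what PGO_FEATURE_GATES="PatroniPreferHTTP=true" would do (assumption).
	assert.NilError(t, gate.SetFromMap(map[string]bool{feature.PatroniPreferHTTP: true}))

	// Reconcilers read the gate from the context via feature.Enabled.
	ctx := feature.NewContext(context.Background(), gate)
	assert.Assert(t, feature.Enabled(ctx, feature.PatroniPreferHTTP))
}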
@@ -98,6 +101,7 @@ func NewGate() MutableGate { AutoGrowVolumes: {Default: false, PreRelease: featuregate.Alpha}, BridgeIdentifiers: {Default: false, PreRelease: featuregate.Alpha}, InstanceSidecars: {Default: false, PreRelease: featuregate.Alpha}, + PatroniPreferHTTP: {Default: false, PreRelease: featuregate.Alpha}, PGBouncerSidecars: {Default: false, PreRelease: featuregate.Alpha}, TablespaceVolumes: {Default: false, PreRelease: featuregate.Alpha}, VolumeSnapshots: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/internal/patroni/api.go b/internal/patroni/api.go index af9d9739ec..1be3c974dc 100644 --- a/internal/patroni/api.go +++ b/internal/patroni/api.go @@ -5,14 +5,7 @@ package patroni import ( - "bytes" "context" - "encoding/json" - "errors" - "io" - "strings" - - "github.com/percona/percona-postgresql-operator/internal/logging" ) // API defines a general interface for interacting with the Patroni API. @@ -24,192 +17,22 @@ type API interface { // ReplaceConfiguration replaces Patroni's entire dynamic configuration. ReplaceConfiguration(ctx context.Context, configuration map[string]any) error -} - -// Executor implements API by calling "patronictl". -type Executor func( - ctx context.Context, stdin io.Reader, stdout, stderr io.Writer, command ...string, -) error - -// Executor implements API. -var _ API = Executor(nil) - -// ChangePrimaryAndWait tries to demote the current Patroni leader by calling -// "patronictl". It returns true when an election completes successfully. It -// waits up to two "loop_wait" or until an error occurs. When Patroni is paused, -// next cannot be blank. Similar to the "POST /switchover" REST endpoint. -func (exec Executor) ChangePrimaryAndWait( - ctx context.Context, current, next string, patroniVer4 bool, -) (bool, error) { - var stdout, stderr bytes.Buffer - - // K8SPG-648: patroni v4.0.0 deprecated "master" role. - // We should use "primary" instead - cmd := []string{"patronictl", "switchover", "--scheduled=now", "--force", "--candidate=" + next} - if patroniVer4 { - cmd = append(cmd, "--primary="+current) - } else { - cmd = append(cmd, "--master="+current) - } - - err := exec(ctx, nil, &stdout, &stderr, cmd...) - - log := logging.FromContext(ctx) - log.V(1).Info("changed primary", - "stdout", stdout.String(), - "stderr", stderr.String(), - ) - - // The command exits zero when it is able to communicate with the Patroni - // HTTP API. It exits zero even when the API says switchover did not occur. - // Check for the text that indicates success. - // - https://github.com/zalando/patroni/blob/v2.0.2/patroni/api.py#L351-L367 - // - https://github.com/zalando/patroni/blob/v2.1.1/patroni/api.py#L461-L477 - return strings.Contains(stdout.String(), "switched over"), err -} - -// SwitchoverAndWait tries to change the current Patroni leader by calling -// "patronictl". It returns true when an election completes successfully. It -// waits up to two "loop_wait" or until an error occurs. When Patroni is paused, -// next cannot be blank. Similar to the "POST /switchover" REST endpoint. -// The "patronictl switchover" variant does not require the current master to be passed -// as a flag. 
-func (exec Executor) SwitchoverAndWait( - ctx context.Context, target string, -) (bool, error) { - var stdout, stderr bytes.Buffer - - err := exec(ctx, nil, &stdout, &stderr, - "patronictl", "switchover", "--scheduled=now", "--force", - "--candidate="+target) - - log := logging.FromContext(ctx) - log.V(1).Info("changed primary", - "stdout", stdout.String(), - "stderr", stderr.String(), - ) - - // The command exits zero when it is able to communicate with the Patroni - // HTTP API. It exits zero even when the API says switchover did not occur. - // Check for the text that indicates success. - // - https://github.com/zalando/patroni/blob/v2.0.2/patroni/api.py#L351-L367 - // Patroni has an edge case where it could switchover to an instance other - // than the requested candidate. In this case, stdout will contain - // "Switched over" instead of "switched over" and return false, nil - return strings.Contains(stdout.String(), "switched over"), err -} - -// FailoverAndWait tries to change the current Patroni leader by calling -// "patronictl". It returns true when an election completes successfully. It -// waits up to two "loop_wait" or until an error occurs. When Patroni is paused, -// next cannot be blank. Similar to the "POST /switchover" REST endpoint. -// The "patronictl failover" variant does not require the current master to be passed -// as a flag. -func (exec Executor) FailoverAndWait( - ctx context.Context, target string, -) (bool, error) { - var stdout, stderr bytes.Buffer - - err := exec(ctx, nil, &stdout, &stderr, - "patronictl", "failover", "--force", - "--candidate="+target) - - log := logging.FromContext(ctx) - log.V(1).Info("changed primary", - "stdout", stdout.String(), - "stderr", stderr.String(), - ) - // The command exits zero when it is able to communicate with the Patroni - // HTTP API. It exits zero even when the API says failover did not occur. - // Check for the text that indicates success. - // - https://github.com/zalando/patroni/blob/v2.0.2/patroni/api.py#L351-L367 - // Patroni has an edge case where it could failover to an instance other - // than the requested candidate. In this case, stdout will contain "Failed over" - // instead of "failed over" and return false, nil - return strings.Contains(stdout.String(), "failed over"), err -} - -// ReplaceConfiguration replaces Patroni's entire dynamic configuration by -// calling "patronictl". Similar to the "POST /switchover" REST endpoint. -func (exec Executor) ReplaceConfiguration( - ctx context.Context, configuration map[string]any, -) error { - var stdin, stdout, stderr bytes.Buffer - - err := json.NewEncoder(&stdin).Encode(configuration) - if err == nil { - err = exec(ctx, &stdin, &stdout, &stderr, - "patronictl", "edit-config", "--replace=-", "--force") - - log := logging.FromContext(ctx) - log.V(1).Info("replaced configuration", - "stdout", stdout.String(), - "stderr", stderr.String(), - ) - } - - return err -} - -// RestartPendingMembers looks up Patroni members with role in scope and restarts -// those that have a pending restart. -func (exec Executor) RestartPendingMembers(ctx context.Context, role, scope string) error { - var stdout, stderr bytes.Buffer - - // The following exits zero when it is able to read the DCS and communicate - // with the Patroni HTTP API. It prints the result of calling "POST /restart" - // on each member found with the desired role. The "Failed … 503 … restart - // conditions are not satisfied" message is normal and means that a particular - // member has already restarted. 
- // - https://github.com/zalando/patroni/blob/v2.1.1/patroni/ctl.py#L580-L596 - err := exec(ctx, nil, &stdout, &stderr, - "patronictl", "restart", "--pending", "--force", "--role="+role, scope) - - log := logging.FromContext(ctx) - log.V(1).Info("restarted members", - "stdout", stdout.String(), - "stderr", stderr.String(), - ) - - return err -} - -// GetTimeline gets the patronictl status and returns the timeline, -// currently the only information required by PGO. -// Returns zero if it runs into errors or cannot find a running Leader pod -// to get the up-to-date timeline from. -func (exec Executor) GetTimeline(ctx context.Context) (int64, error) { - var stdout, stderr bytes.Buffer - - // The following exits zero when it is able to read the DCS and communicate - // with the Patroni HTTP API. It prints the result of calling "GET /cluster" - // - https://github.com/zalando/patroni/blob/v2.1.1/patroni/ctl.py#L849 - err := exec(ctx, nil, &stdout, &stderr, - "patronictl", "list", "--format", "json") - if err != nil { - return 0, err - } - - if stderr.String() != "" { - return 0, errors.New(stderr.String()) - } + // SwitchoverAndWait tries to change the current Patroni leader. It + // returns true when an election completes successfully. When Patroni is + // paused, next cannot be blank. + SwitchoverAndWait(ctx context.Context, target string) (bool, error) - var members []struct { - Role string `json:"Role"` - State string `json:"State"` - Timeline int64 `json:"TL"` - } - err = json.Unmarshal(stdout.Bytes(), &members) - if err != nil { - return 0, err - } + // FailoverAndWait tries to change the current Patroni leader. It + // returns true when an election completes successfully. When Patroni is + // paused, next cannot be blank. + FailoverAndWait(ctx context.Context, target string) (bool, error) - for _, member := range members { - if member.Role == "Leader" && member.State == "running" { - return member.Timeline, nil - } - } + // RestartPendingMembers looks up Patroni members with role in scope and + // restarts those that have a pending restart. + RestartPendingMembers(ctx context.Context, role, scope string) error - return 0, err + // GetTimeline gets the current timeline from Patroni cluster status. + // Returns zero if it runs into errors or cannot find a running Leader pod. + GetTimeline(ctx context.Context) (int64, error) } diff --git a/internal/patroni/api_exec.go b/internal/patroni/api_exec.go new file mode 100644 index 0000000000..ffba296c8b --- /dev/null +++ b/internal/patroni/api_exec.go @@ -0,0 +1,233 @@ +package patroni + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "io" + "strings" + + "github.com/percona/percona-postgresql-operator/internal/logging" +) + +// Executor implements API by calling "patronictl". +type Executor func( + ctx context.Context, stdin io.Reader, stdout, stderr io.Writer, command ...string, +) error + +// Executor implements API. +var _ API = Executor(nil) + +// ChangePrimaryAndWait tries to demote the current Patroni leader by calling +// "patronictl". It returns true when an election completes successfully. It +// waits up to two "loop_wait" or until an error occurs. When Patroni is paused, +// next cannot be blank. Similar to the "POST /switchover" REST endpoint. +func (exec Executor) ChangePrimaryAndWait( + ctx context.Context, current, next string, patroniVer4 bool, +) (bool, error) { + var stdout, stderr bytes.Buffer + + // K8SPG-648: patroni v4.0.0 deprecated "master" role. 
+ // We should use "primary" instead + cmd := []string{"patronictl", "switchover", "--scheduled=now", "--force", "--candidate=" + next} + if patroniVer4 { + cmd = append(cmd, "--primary="+current) + } else { + cmd = append(cmd, "--master="+current) + } + + log := logging.FromContext(ctx) + log.Info( + "[PATRONI] Executing patronictl switchover command", + "command", + cmd, + "current", + current, + "next", + next, + "patroniVer4", + patroniVer4, + ) + err := exec(ctx, nil, &stdout, &stderr, cmd...) + + log.V(1).Info("changed primary", + "stdout", stdout.String(), + "stderr", stderr.String(), + ) + + // The command exits zero when it is able to communicate with the Patroni + // HTTP API. It exits zero even when the API says switchover did not occur. + // Check for the text that indicates success. + // - https://github.com/zalando/patroni/blob/v2.0.2/patroni/api.py#L351-L367 + // - https://github.com/zalando/patroni/blob/v2.1.1/patroni/api.py#L461-L477 + return strings.Contains(stdout.String(), "switched over"), err +} + +// SwitchoverAndWait tries to change the current Patroni leader by calling +// "patronictl". It returns true when an election completes successfully. It +// waits up to two "loop_wait" or until an error occurs. When Patroni is paused, +// next cannot be blank. Similar to the "POST /switchover" REST endpoint. +// The "patronictl switchover" variant does not require the current master to be passed +// as a flag. +func (exec Executor) SwitchoverAndWait( + ctx context.Context, target string, +) (bool, error) { + var stdout, stderr bytes.Buffer + + log := logging.FromContext(ctx) + cmd := []string{ + "patronictl", + "switchover", + "--scheduled=now", + "--force", + "--candidate=" + target, + } + log.Info( + "[PATRONI] Executing patronictl switchover command (simplified)", + "command", + cmd, + "target", + target, + ) + err := exec(ctx, nil, &stdout, &stderr, cmd...) + + log.V(1).Info("changed primary", + "stdout", stdout.String(), + "stderr", stderr.String(), + ) + + // The command exits zero when it is able to communicate with the Patroni + // HTTP API. It exits zero even when the API says switchover did not occur. + // Check for the text that indicates success. + // - https://github.com/zalando/patroni/blob/v2.0.2/patroni/api.py#L351-L367 + // Patroni has an edge case where it could switchover to an instance other + // than the requested candidate. In this case, stdout will contain + // "Switched over" instead of "switched over" and return false, nil + return strings.Contains(stdout.String(), "switched over"), err +} + +// FailoverAndWait tries to change the current Patroni leader by calling +// "patronictl". It returns true when an election completes successfully. It +// waits up to two "loop_wait" or until an error occurs. When Patroni is paused, +// next cannot be blank. Similar to the "POST /switchover" REST endpoint. +// The "patronictl failover" variant does not require the current master to be passed +// as a flag. +func (exec Executor) FailoverAndWait( + ctx context.Context, target string, +) (bool, error) { + var stdout, stderr bytes.Buffer + + log := logging.FromContext(ctx) + cmd := []string{"patronictl", "failover", "--force", "--candidate=" + target} + log.Info("[PATRONI] Executing patronictl failover command", "command", cmd, "target", target) + err := exec(ctx, nil, &stdout, &stderr, cmd...) 
+ + log.V(1).Info("changed primary", + "stdout", stdout.String(), + "stderr", stderr.String(), + ) + + // The command exits zero when it is able to communicate with the Patroni + // HTTP API. It exits zero even when the API says failover did not occur. + // Check for the text that indicates success. + // - https://github.com/zalando/patroni/blob/v2.0.2/patroni/api.py#L351-L367 + // Patroni has an edge case where it could failover to an instance other + // than the requested candidate. In this case, stdout will contain "Failed over" + // instead of "failed over" and return false, nil + return strings.Contains(stdout.String(), "failed over"), err +} + +// ReplaceConfiguration replaces Patroni's entire dynamic configuration by +// calling "patronictl". Similar to the "POST /switchover" REST endpoint. +func (exec Executor) ReplaceConfiguration(ctx context.Context, configuration map[string]any) error { + var stdin, stdout, stderr bytes.Buffer + + err := json.NewEncoder(&stdin).Encode(configuration) + if err == nil { + log := logging.FromContext(ctx) + cmd := []string{"patronictl", "edit-config", "--replace=-", "--force"} + log.Info("[PATRONI] Executing patronictl edit-config command", "command", cmd) + err = exec(ctx, &stdin, &stdout, &stderr, cmd...) + + log.V(1).Info("replaced configuration", + "stdout", stdout.String(), + "stderr", stderr.String(), + ) + } + + return err +} + +// RestartPendingMembers looks up Patroni members with role in scope and restarts +// those that have a pending restart. +func (exec Executor) RestartPendingMembers(ctx context.Context, role, scope string) error { + var stdout, stderr bytes.Buffer + + // The following exits zero when it is able to read the DCS and communicate + // with the Patroni HTTP API. It prints the result of calling "POST /restart" + // on each member found with the desired role. The "Failed … 503 … restart + // conditions are not satisfied" message is normal and means that a particular + // member has already restarted. + // - https://github.com/zalando/patroni/blob/v2.1.1/patroni/ctl.py#L580-L596 + log := logging.FromContext(ctx) + cmd := []string{"patronictl", "restart", "--pending", "--force", "--role=" + role, scope} + log.Info( + "[PATRONI] Executing patronictl restart command", + "command", + cmd, + "role", + role, + "scope", + scope, + ) + err := exec(ctx, nil, &stdout, &stderr, cmd...) + + log.V(1).Info("restarted members", + "stdout", stdout.String(), + "stderr", stderr.String(), + ) + + return err +} + +// GetTimeline gets the patronictl status and returns the timeline, +// currently the only information required by PGO. +// Returns zero if it runs into errors or cannot find a running Leader pod +// to get the up-to-date timeline from. +func (exec Executor) GetTimeline(ctx context.Context) (int64, error) { + var stdout, stderr bytes.Buffer + + // The following exits zero when it is able to read the DCS and communicate + // with the Patroni HTTP API. It prints the result of calling "GET /cluster" + // - https://github.com/zalando/patroni/blob/v2.1.1/patroni/ctl.py#L849 + log := logging.FromContext(ctx) + cmd := []string{"patronictl", "list", "--format", "json"} + log.Info("[PATRONI] Executing patronictl list command", "command", cmd) + err := exec(ctx, nil, &stdout, &stderr, cmd...) 
+	if err != nil {
+		return 0, err
+	}
+
+	if stderr.String() != "" {
+		return 0, errors.New(stderr.String())
+	}
+
+	var members []struct {
+		Role     string `json:"Role"`
+		State    string `json:"State"`
+		Timeline int64  `json:"TL"`
+	}
+	err = json.Unmarshal(stdout.Bytes(), &members)
+	if err != nil {
+		return 0, err
+	}
+
+	for _, member := range members {
+		if member.Role == "Leader" && member.State == "running" {
+			return member.Timeline, nil
+		}
+	}
+
+	return 0, nil
+}
diff --git a/internal/patroni/api_test.go b/internal/patroni/api_exec_test.go
similarity index 100%
rename from internal/patroni/api_test.go
rename to internal/patroni/api_exec_test.go
diff --git a/internal/patroni/api_http.go b/internal/patroni/api_http.go
new file mode 100644
index 0000000000..dc3f40a4c8
--- /dev/null
+++ b/internal/patroni/api_http.go
@@ -0,0 +1,677 @@
+package patroni
+
+import (
+	"bytes"
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"slices"
+	"strings"
+	"time"
+
+	"github.com/go-logr/logr"
+	"github.com/percona/percona-postgresql-operator/internal/logging"
+	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
+	corev1 "k8s.io/api/core/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+type podMetadata struct {
+	Name         string
+	Namespace    string
+	InstanceName string
+}
+
+func (m podMetadata) getPatroniServerUrl() string {
+	return fmt.Sprintf("https://%s.%s-pods.%s.svc:8008", m.Name, m.Namespace, m.Namespace)
+}
+
+// The operator creates a "${instanceName}-certs" secret for every Postgres instance.
+// It contains pgbouncer, patroni and postgres certificates generated by the operator's own rootCA.
+// We need it to call the Patroni REST API server.
+func (m podMetadata) getSecretCertKey() client.ObjectKey {
+	return client.ObjectKey{
+		Name:      fmt.Sprintf("%s-certs", m.InstanceName),
+		Namespace: m.Namespace,
+	}
+}
+
+// Attempt to extract pod metadata from the pod name. Note that this is a
+// known pattern: pod names include both the namespace (database name) and
+// instance name.
+//
+// Example:
+// - podName: pgdb-609qv5o187x841r2-instance1-h8q2-0
+// - instanceName: pgdb-609qv5o187x841r2-instance1-h8q2
+// - namespace: pgdb-609qv5o187x841r2
+func extractMetadataFromPodName(podName string) (podMetadata, error) {
+	// 1. Check the number of "-" to make sure the name has the correct format.
+	dashes := strings.Count(podName, "-")
+	if dashes != 4 {
+		err := fmt.Errorf("member name has unexpected format: name=%s, dashes=%d", podName, dashes)
+		return podMetadata{}, err
+	}
+
+	// 2. Remove everything after the last dash to get instanceName: pgdb-609qv5o187x841r2-instance1-h8q2
+	lastDashIndex := strings.LastIndex(podName, "-")
+	instanceName := podName[:lastDashIndex]
+
+	// 3. Remove the last two dash-separated segments to get namespace: pgdb-609qv5o187x841r2
+	lastDashIndex = strings.LastIndex(instanceName, "-")
+	lastDashIndex = strings.LastIndex(instanceName[:lastDashIndex], "-")
+	namespace := instanceName[:lastDashIndex]
+
+	return podMetadata{
+		Name:         podName,
+		Namespace:    namespace,
+		InstanceName: instanceName,
+	}, nil
+}
+
+// A client to a Patroni server. There is one per pod.
+type instanceClient struct { + baseUrl string + httpClient *http.Client + logger logr.Logger +} + +type patroniEndpoint string + +const ( + patroniEndpointSwitchover patroniEndpoint = "switchover" + patroniEndpointFailover patroniEndpoint = "failover" + patroniEndpointRestart patroniEndpoint = "restart" + patroniEndpointConfig patroniEndpoint = "config" + patroniEndpointCluster patroniEndpoint = "cluster" +) + +func (p instanceClient) do( + ctx context.Context, + method string, + endpoint patroniEndpoint, + body map[string]any, +) (*http.Response, error) { + bodyInBytes, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("failed to marshal configuration: %w", err) + } + + req, err := http.NewRequestWithContext( + ctx, + method, + fmt.Sprintf("%s/%s", p.baseUrl, endpoint), + bytes.NewReader(bodyInBytes), + ) + if err != nil { + return nil, fmt.Errorf("failed to create HTTP request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + return p.httpClient.Do(req) +} + +// Pattern: +// +// { +// "name": "pgdb-609qv5o187x841r2-instance1-h8q2-0", +// "role": "leader", +// "state": "running", +// "api_url": "https://pgdb-609qv5o187x841r2-instance1-h8q2-0.pgdb-609qv5o187x841r2-pods:8008/patroni", +// "host": "pgdb-609qv5o187x841r2-instance1-h8q2-0.pgdb-609qv5o187x841r2-pods", +// "port": 5432, +// "timeline": 8 +// } +// +// InstanceName: pgdb-609qv5o187x841r2-instance1-h8q2 +// Namespace: pgdb-609qv5o187x841r2 +type clusterMember struct { + Name string `json:"name"` + Role string `json:"role"` + State string `json:"state"` + Host string `json:"host"` + Timeline int64 `json:"timeline"` +} + +func (p instanceClient) getMembers(ctx context.Context) ([]clusterMember, error) { + resp, err := p.do(ctx, http.MethodGet, patroniEndpointCluster, nil) + if err != nil { + return []clusterMember{}, fmt.Errorf("failed to get cluster status: %w", err) + } + + defer resp.Body.Close() + + if resp.StatusCode != 200 { + body, _ := io.ReadAll(resp.Body) + return []clusterMember{}, fmt.Errorf( + "cluster status request failed with status %d: %s", + resp.StatusCode, + string(body), + ) + } + + var clusterStatus struct { + Members []clusterMember `json:"members"` + } + + if err := json.NewDecoder(resp.Body).Decode(&clusterStatus); err != nil { + return []clusterMember{}, fmt.Errorf("failed to parse cluster status response: %w", err) + } + + return clusterStatus.Members, nil +} + +func (p instanceClient) getMembersByRole(ctx context.Context, roles []string) ([]clusterMember, error) { + members, err := p.getMembers(ctx) + if err != nil { + return []clusterMember{}, err + } + + filtered := []clusterMember{} + for _, member := range members { + if slices.Contains(roles, member.Role) { + filtered = append(filtered, member) + } + } + + return filtered, nil +} + +// Get current cluster leader. It can also be a "primary", so we test for both. 
+func (p instanceClient) getLeader(ctx context.Context) (clusterMember, error) { + members, err := p.getMembersByRole(ctx, []string{"leader", "primary"}) + if err != nil { + return clusterMember{}, err + } + + if len(members) == 0 { + return clusterMember{}, fmt.Errorf("no leader found") + } + + return members[0], nil +} + +func (p instanceClient) putConfig(ctx context.Context, configuration map[string]any) error { + p.logger.Info("Replacing configuration") + + resp, err := p.do(ctx, http.MethodPut, patroniEndpointConfig, configuration) + if err != nil { + err := fmt.Errorf("failed to execute HTTP request: %w", err) + p.logger.Error(err, "Failed to replace configuration") + return err + } + + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + err := fmt.Errorf("HTTP request failed with status %d: %s", resp.StatusCode, string(body)) + p.logger.Error(err, "Failed to replace configuration") + return err + } + + p.logger.Info("Successfully replaced configuration", "status", resp.Status) + return nil +} + +func (p instanceClient) switchover(ctx context.Context, leader, candidate string) (bool, error) { + p.logger.Info("Requesting switchover") + + if leader == "" { + // The switchover Rest API *requires* a leader, while in patronictl it's autodetected. + // Ref.: https://patroni.readthedocs.io/en/latest/patronictl.html#patronictl-switchover + err := fmt.Errorf("leader is required for switchover") + p.logger.Error(err, "Switchover failed. Leader is required") + return false, err + } + + // NOTE: The REST API uses "leader" field (not "master" or "primary") + requestBody := map[string]any{ + "leader": leader, + } + + // Optional. If set, patroni will prefer this candidate for leader. + // Otherwise, it will figure out the next leader by itself. + if candidate != "" { + requestBody["candidate"] = candidate + } + + resp, err := p.do(ctx, http.MethodPost, patroniEndpointSwitchover, requestBody) + if err != nil { + p.logger.Error(err, "Switchover failed") + return false, fmt.Errorf("failed to execute switchover request: %w", err) + } + + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + bodyStr := string(body) + + p.logger.V(1).Info("switchover response", "status", resp.StatusCode, "body", bodyStr) + + // Check for successful switchover + // 200: Immediate successful switchover + // 202: Scheduled switchover (we use scheduled_at=now, but might still get 202) + if resp.StatusCode == 200 || resp.StatusCode == 202 { + // Like CLI implementation, check for "Successfully" to ensure we got the requested candidate + // Patroni might switch to a different candidate and return different message format + if strings.Contains(bodyStr, "Successfully") { + p.logger.Info("Switchover successful") + return true, nil + } else { + // Patroni chose different candidate - return false like CLI does for consistency + p.logger.Info("Switchover occurred but not to requested candidate", "response", bodyStr) + return false, nil + } + } + + // Any other status code is an error + err = fmt.Errorf("switchover failed with status %d: %s", resp.StatusCode, bodyStr) + p.logger.Error(err, "Failed to switchover") + return false, err +} + +func (p instanceClient) failover(ctx context.Context, candidate string) (bool, error) { + p.logger.Info("Requesting failover") + + if candidate == "" { + err := fmt.Errorf("candidate is required for failover") + p.logger.Error(err, "Failover failed. 
Candidate is required") + return false, err + } + + resp, err := p.do(ctx, http.MethodPost, patroniEndpointFailover, map[string]any{ + "candidate": candidate, + }) + + if err != nil { + p.logger.Error(err, "Failover failed") + return false, err + } + + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + bodyStr := string(body) + + p.logger.V(1).Info("failover response", "status", resp.StatusCode, "body", bodyStr) + + // Failover cannot be scheduled, so we don't check for 202. + if resp.StatusCode == 200 { + // Like CLI implementation, check for "Successfully" to ensure we got the requested candidate + // Patroni might failover to a different candidate and return different message format + if strings.Contains(bodyStr, "Successfully") { + p.logger.Info("Failover successful") + return true, nil + } else { + // Patroni chose different candidate - return false like CLI does for consistency + p.logger.Info("Failover occurred but not to requested candidate", "response", bodyStr) + return false, nil + } + } + + // Any other status code is an error + err = fmt.Errorf("failover failed with status %d: %s", resp.StatusCode, bodyStr) + p.logger.Error(err, "Failover failed") + return false, err +} + +// A fancier version of the restart API that only restart pending nodes that match a given role. +func (p instanceClient) restartPendingWithRole(ctx context.Context, role string) error { + p.logger.Info("Requesting instance restart for pending changes", "role", role) + + if role == "" { + // Patroni's rest API does not require role to be filled, but /our/ usage + // expects a role check, so we also enforce it here. + err := fmt.Errorf("role is empty") + p.logger.Error(err, "Restart failed") + return err + } + + requestBody := map[string]any{ + "restart_pending": true, + "role": role, + } + + resp, err := p.do(ctx, http.MethodPost, patroniEndpointRestart, requestBody) + if err != nil { + err = fmt.Errorf("failed to execute restart request: %w", err) + p.logger.Error(err, "Restart failed") + return err + } + + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + bodyStr := string(body) + + p.logger.V(1).Info("restart response", "status", resp.StatusCode, "body", bodyStr) + + // 200: Restart initiated successfully + // 202: Restart accepted/scheduled + // 503: No restart needed (restart conditions not satisfied) - this is OK + if resp.StatusCode == 200 || resp.StatusCode == 202 { + p.logger.Info("Restart initiated successfully") + return nil + } + + if resp.StatusCode == 503 { + // This is normal when the node doesn't need restart or already restarted + p.logger.Info("No restart needed (503)", "body", bodyStr) + return nil + } + + // Any other status code is an error + err = fmt.Errorf("restart failed with status %d: %s", resp.StatusCode, bodyStr) + p.logger.Error(err, "Restart failed") + return err +} + +func newInstanceClient( + ctx context.Context, + parentLogger logr.Logger, + kubeClient client.Client, + pod podMetadata, +) (instanceClient, error) { + logger := parentLogger.WithValues( + "namespace", pod.Namespace, + "pod", pod.Name, + ).WithName("client") + + // Fetch client certificates. They are created in the same namespace as + // the pods with a special "-certs" suffix. 
+ secret := &corev1.Secret{} + err := kubeClient.Get(ctx, pod.getSecretCertKey(), secret) + if err != nil { + return instanceClient{}, fmt.Errorf("failed to fetch certificates for patroni: %w", err) + } + + rawCaCert := secret.Data[certAuthorityFileKey] + rawClientCert := secret.Data[certServerFileKey] + + if len(rawCaCert) == 0 { + return instanceClient{}, fmt.Errorf("CA certificate is empty") + } + + if len(rawClientCert) == 0 { + return instanceClient{}, fmt.Errorf("Client certificate is empty") + } + + caCert := x509.NewCertPool() + if !caCert.AppendCertsFromPEM(rawCaCert) { + return instanceClient{}, fmt.Errorf("failed to parse CA certificate from secret") + } + + clientCert, err := tls.X509KeyPair(rawClientCert, rawClientCert) + if err != nil { + return instanceClient{}, fmt.Errorf( + "failed to parse key pair from combined cert: %w", + err, + ) + } + + httpClient := &http.Client{ + Timeout: 30 * time.Second, + Transport: otelhttp.NewTransport(&http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: caCert, + Certificates: []tls.Certificate{clientCert}, + }, + }), + } + + return instanceClient{ + baseUrl: pod.getPatroniServerUrl(), + httpClient: httpClient, + logger: logger, + }, nil +} + +type HTTPClient struct { + kubeClient client.Client + client instanceClient + logger logr.Logger + tracer trace.Tracer +} + +var _ API = HTTPClient{} + +func NewHttpClient(ctx context.Context, kube client.Client, podName string) (HTTPClient, error) { + logger := logging.FromContext(ctx).WithName("patroni.http") + tracer := otel.Tracer("github.com/percona/percona-postgresql-operator/patroni") + + // We can extract all the information we need from the podName due to the + // way podName is built: ${namespace}-${instanceSuffix}-${podNumeral}. + podMetadata, err := extractMetadataFromPodName(podName) + if err != nil { + return HTTPClient{}, err + } + + patroniHttpClient, err := newInstanceClient(ctx, logger, kube, podMetadata) + + if err != nil { + return HTTPClient{}, err + } + + return HTTPClient{ + client: patroniHttpClient, + kubeClient: kube, + logger: logger, + tracer: tracer, + }, nil +} + +// Called when the operator believes Patroni configuration needs to be updated due to CRD changes. +func (h HTTPClient) ReplaceConfiguration(ctx context.Context, configuration map[string]any) error { + ctx, span := h.tracer.Start(ctx, "patroni.replace-configuration") + defer span.End() + + h.logger.Info("Calling ReplaceConfiguration") + + err := h.client.putConfig(ctx, configuration) + if err != nil { + span.RecordError(err) + } + return err +} + +// Called when the operator detects pod restarts or changes that require pod restarts, such +// as CPU/mem changes. +func (h HTTPClient) ChangePrimaryAndWait( + ctx context.Context, + leader, candidate string, + _patroniVer4 bool, +) (bool, error) { + ctx, span := h.tracer.Start(ctx, "patroni.change-primary") + defer span.End() + + span.SetAttributes( + attribute.String("patroni.leader", leader), + attribute.String("patroni.candidate", candidate), + attribute.Bool("patroni.ver4", _patroniVer4), + ) + + h.logger.WithValues("leader", leader).Info("Calling ChangePrimaryAndWait") + + success, err := h.client.switchover(ctx, leader, candidate) + if err != nil { + span.RecordError(err) + } + span.SetAttributes(attribute.Bool("patroni.success", success)) + return success, err +} + +// Very similar to ChangePrimaryAndWait, but implemented by the Percona team for +// the reconcileSwitchover method. 
The difference here is that SwitchoverAndWait +// does not provide a leader. +func (h HTTPClient) SwitchoverAndWait(ctx context.Context, candidate string) (bool, error) { + ctx, span := h.tracer.Start(ctx, "patroni.switchover") + defer span.End() + + span.SetAttributes(attribute.String("patroni.candidate", candidate)) + + h.logger.WithValues("candidate", candidate).Info("Calling SwitchoverAndWait") + leader, err := h.client.getLeader(ctx) + + if err != nil { + h.logger.Error( + err, + "Failed to auto-detect current leader for switchover", + ) + span.RecordError(err) + return false, fmt.Errorf("failed to detect current leader: %w", err) + } + + span.SetAttributes(attribute.String("patroni.leader", leader.Name)) + + // NOTE: + // Potential race condition where the leader changes between these two calls. + // If this happens, Patroni will error out and the operation will be retried. + h.logger.Info( + "Auto-detected current leader", + "leader", + leader, + "candidate", + candidate, + ) + + success, err := h.client.switchover(ctx, leader.Name, candidate) + if err != nil { + span.RecordError(err) + } + span.SetAttributes(attribute.Bool("patroni.success", success)) + return success, err +} + +// FailoverAndWait tries to change the leader when the cluster is NOT healthy. When it's +// healthy, switchover is advised. +// Ref.: https://patroni.readthedocs.io/en/latest/rest_api.html#failover +func (h HTTPClient) FailoverAndWait(ctx context.Context, candidate string) (bool, error) { + ctx, span := h.tracer.Start(ctx, "patroni.failover") + defer span.End() + + span.SetAttributes(attribute.String("patroni.candidate", candidate)) + + h.logger.WithValues("candidate", candidate).Info("Calling FailoverAndWait") + + success, err := h.client.failover(ctx, candidate) + if err != nil { + span.RecordError(err) + } + span.SetAttributes(attribute.Bool("patroni.success", success)) + return success, err +} + +// Restarts Patroni members that have a pending restart and match a given role. +// The pending status is given by Patroni when it detects that a configuration was updated that +// requires a restart to take effect. The operator watches for the pending status and first +// asks for the leader to be restarted, followed by its replicas. +func (h HTTPClient) RestartPendingMembers(ctx context.Context, role, _scope string) error { + ctx, span := h.tracer.Start(ctx, "patroni.restart-pending-members") + defer span.End() + + span.SetAttributes( + attribute.String("patroni.role", role), + attribute.String("patroni.scope", _scope), + ) + + h.logger.WithValues("role", role).Info("Calling RestartPendingMembers") + + if role == "" { + // Role is not a required field for restart, but the operator's usage of it will + // always pass a role to double check we are restarting nodes in the correct order: + // first primary, then replicas. To make sure this is the case, we error if the + // function was called without a role. + err := fmt.Errorf("role is empty") + h.logger.Error(err, "Failed to restart pending members") + span.RecordError(err) + return err + } + + roles := []string{role} + // patronictl allows both "primary" and "leader" roles to mean "leader", so + // in order to keep compatibility we need to check for both. + // NOTE(juliana): We can remove this after we fully migrate to the REST API. 
+ if role == "primary" { + roles = append(roles, "leader") + } + + members, err := h.client.getMembersByRole(ctx, roles) + if err != nil { + h.logger.Error(err, "Failed to fetch cluster members") + span.RecordError(err) + return err + } + + span.SetAttributes(attribute.Int("patroni.members_found", len(members))) + + if len(members) == 0 { + h.logger.Info("Found no members to restart", "role", role) + return nil + } + + for _, member := range members { + h.logger.Info("Requesting restart for pod", "pod", member.Name, "role", role) + + podMetadata, err := extractMetadataFromPodName(member.Name) + if err != nil { + span.RecordError(err) + return err + } + + client, err := newInstanceClient(ctx, h.logger, h.kubeClient, podMetadata) + if err != nil { + h.logger.Error(err, "Failed to create client for pod", "pod", member.Name) + span.RecordError(err) + return err + } + + err = client.restartPendingWithRole(ctx, role) + if err != nil { + h.logger.Error(err, "Restart failed for pod", "pod", member.Name) + span.RecordError(err) + return err + } + } + + h.logger.Info("Restart pending members succeeded") + return nil +} + +// Gets the current timeline from Patroni cluster status. +// Used as sanity check by the operator before a switchover/failover operation. +// Returns the timeline of the running leader, or 0 if no running leader found. +func (h HTTPClient) GetTimeline(ctx context.Context) (int64, error) { + ctx, span := h.tracer.Start(ctx, "patroni.get-timeline") + defer span.End() + + h.logger.Info("Calling GetTimeline") + + leader, err := h.client.getLeader(ctx) + if err != nil { + h.logger.Info("No leader found for timeline", "error", err) + span.SetAttributes(attribute.Bool("patroni.leader_found", false)) + return 0, nil // Return 0 when no leader (matches CLI behavior) + } + + span.SetAttributes( + attribute.Bool("patroni.leader_found", true), + attribute.String("patroni.leader_name", leader.Name), + attribute.String("patroni.leader_state", leader.State), + ) + + // Check if leader is running (same logic as CLI implementation) + if leader.State != "running" { + h.logger.Info("Leader not in running state", "state", leader.State) + span.SetAttributes(attribute.Int64("patroni.timeline", 0)) + return 0, nil + } + + h.logger.Info("Found running leader", "member", leader.Name, "timeline", leader.Timeline) + span.SetAttributes(attribute.Int64("patroni.timeline", leader.Timeline)) + return leader.Timeline, nil +} diff --git a/internal/patroni/api_http_test.go b/internal/patroni/api_http_test.go new file mode 100644 index 0000000000..5a8f1548dc --- /dev/null +++ b/internal/patroni/api_http_test.go @@ -0,0 +1,673 @@ +package patroni + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/go-logr/logr" + "go.opentelemetry.io/otel" + "gotest.tools/v3/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func createMockKubeClient() client.Client { + scheme := runtime.NewScheme() + _ = corev1.AddToScheme(scheme) + + // Use a valid self-signed certificate. + // This will be parsed, so it really needs to be valid. 
+ certData := map[string][]byte{ + certAuthorityFileKey: []byte(`-----BEGIN CERTIFICATE----- +MIIDczCCAlugAwIBAgIUDnQ23t9B4fcouJacQrL93ejxKdgwDQYJKoZIhvcNAQEL +BQAwSTELMAkGA1UEBhMCVVMxDTALBgNVBAgMBFRlc3QxDTALBgNVBAcMBFRlc3Qx +DTALBgNVBAoMBFRlc3QxDTALBgNVBAMMBHRlc3QwHhcNMjUwOTI2MTM0MTE2WhcN +MjYwOTI2MTM0MTE2WjBJMQswCQYDVQQGEwJVUzENMAsGA1UECAwEVGVzdDENMAsG +A1UEBwwEVGVzdDENMAsGA1UECgwEVGVzdDENMAsGA1UEAwwEdGVzdDCCASIwDQYJ +KoZIhvcNAQEBBQADggEPADCCAQoCggEBAOqni0gg5nXsnFiFWdz0yCcn5cdAz/W3 +6WlewEEF+sjUX9+cbUXqeFRisWX64FcMZ5802I8SJHEhavtfzWdBdKqvlQ0XeKRR +jmVXByS790IlgVZQ0aWOKuSJVsPRhwNQ34U4EcPA9xjU9PWR2ULeNEmXAOIVJEhP +2vzWKbF5xPKe3FJtx4gEi3YyxiPbxP45Hf5b6B4duGml11zO+ZHSrTzte04eKoya +BN/bYHdA4kHh5PksxdZIUQFUX6KUorqEMv4FJwNs4e73YY7zwdW3c0IlztjGcUGs +kVnDrLNaKep+t1iNyoXeB6reZ9OwqVZBwQuTCorsaWjUFIbhCsRWjtUCAwEAAaNT +MFEwHQYDVR0OBBYEFAx9cxoaa9mymVRi69uF9boSNyGHMB8GA1UdIwQYMBaAFAx9 +cxoaa9mymVRi69uF9boSNyGHMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEL +BQADggEBAJ00idIco8+KG6LZlXp5xXxM29xPU5QvujUfDbW/E2VUKHcePGmuBwHF +m/qYM/itZGSOuw2nsIOIUHDVNBX7vVHDeEzXlSC++GsEo7ptlsD2xSkIjCeY8o1d +PWaqiTI8jeXSISjFIiLF6D4lG7i7JxhNTKUqp5R7HKxiQr5vDxD5YlZuevSZJQIu +QplCHd646HHd1F07MVSfuGWeN1bf/rSQfvUkrnTDIUgz2oMye7uF6aDHRsteBGuw +6eM3ewTAxbEZzxSA0mMUgNbXO9do3OGr6UVFmWQ47NEkJBNR511DER5K82B6ZM/w +c6O9VaABYfNuet0+w/J9nKEdi2r16+Y= +-----END CERTIFICATE-----`), + certServerFileKey: []byte(`-----BEGIN CERTIFICATE----- +MIIDczCCAlugAwIBAgIUDnQ23t9B4fcouJacQrL93ejxKdgwDQYJKoZIhvcNAQEL +BQAwSTELMAkGA1UEBhMCVVMxDTALBgNVBAgMBFRlc3QxDTALBgNVBAcMBFRlc3Qx +DTALBgNVBAoMBFRlc3QxDTALBgNVBAMMBHRlc3QwHhcNMjUwOTI2MTM0MTE2WhcN +MjYwOTI2MTM0MTE2WjBJMQswCQYDVQQGEwJVUzENMAsGA1UECAwEVGVzdDENMAsG +A1UEBwwEVGVzdDENMAsGA1UECgwEVGVzdDENMAsGA1UEAwwEdGVzdDCCASIwDQYJ +KoZIhvcNAQEBBQADggEPADCCAQoCggEBAOqni0gg5nXsnFiFWdz0yCcn5cdAz/W3 +6WlewEEF+sjUX9+cbUXqeFRisWX64FcMZ5802I8SJHEhavtfzWdBdKqvlQ0XeKRR +jmVXByS790IlgVZQ0aWOKuSJVsPRhwNQ34U4EcPA9xjU9PWR2ULeNEmXAOIVJEhP +2vzWKbF5xPKe3FJtx4gEi3YyxiPbxP45Hf5b6B4duGml11zO+ZHSrTzte04eKoya +BN/bYHdA4kHh5PksxdZIUQFUX6KUorqEMv4FJwNs4e73YY7zwdW3c0IlztjGcUGs +kVnDrLNaKep+t1iNyoXeB6reZ9OwqVZBwQuTCorsaWjUFIbhCsRWjtUCAwEAAaNT +MFEwHQYDVR0OBBYEFAx9cxoaa9mymVRi69uF9boSNyGHMB8GA1UdIwQYMBaAFAx9 +cxoaa9mymVRi69uF9boSNyGHMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEL +BQADggEBAJ00idIco8+KG6LZlXp5xXxM29xPU5QvujUfDbW/E2VUKHcePGmuBwHF +m/qYM/itZGSOuw2nsIOIUHDVNBX7vVHDeEzXlSC++GsEo7ptlsD2xSkIjCeY8o1d +PWaqiTI8jeXSISjFIiLF6D4lG7i7JxhNTKUqp5R7HKxiQr5vDxD5YlZuevSZJQIu +QplCHd646HHd1F07MVSfuGWeN1bf/rSQfvUkrnTDIUgz2oMye7uF6aDHRsteBGuw +6eM3ewTAxbEZzxSA0mMUgNbXO9do3OGr6UVFmWQ47NEkJBNR511DER5K82B6ZM/w +c6O9VaABYfNuet0+w/J9nKEdi2r16+Y= +-----END CERTIFICATE----- +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDqp4tIIOZ17JxY +hVnc9MgnJ+XHQM/1t+lpXsBBBfrI1F/fnG1F6nhUYrFl+uBXDGefNNiPEiRxIWr7 +X81nQXSqr5UNF3ikUY5lVwcku/dCJYFWUNGljirkiVbD0YcDUN+FOBHDwPcY1PT1 +kdlC3jRJlwDiFSRIT9r81imxecTyntxSbceIBIt2MsYj28T+OR3+W+geHbhppddc +zvmR0q087XtOHiqMmgTf22B3QOJB4eT5LMXWSFEBVF+ilKK6hDL+BScDbOHu92GO +88HVt3NCJc7YxnFBrJFZw6yzWinqfrdYjcqF3geq3mfTsKlWQcELkwqK7Glo1BSG +4QrEVo7VAgMBAAECggEAHi9wUNZ+nvPRhueckDpi1vqgadnSBqNiYL4iEBtDT/tV +2++E9QX89an+dQZpPnlniQjkxL7KNk1ctDp2M06twdk1XMpEqCqfnTStRBHz9Cvb +7+0Uku3vYZezNBxreEc6gaodSuezQZv/aOman6ny4vaMVAjxMmYnXvfzxBNMfQMo +dqmcOHXG3hrcXcIrsq297k4GmgnBOjlv4pLIIWfSSECcFqhFD8uZamnSlh96b4C1 +BU2RVnjsYx/asATvJzDAP+zMt+2s9MV9j/iYAyzmkTywk6RFheLXR/VW+9o2CSi4 +0+8ypiLnI9K/KJWw5OI33MzGRBDUNipR4D7brh8nfwKBgQD4+AcB7QGkzhobv1Hg +IvP1kk5BjPZhEGxWlnQHM8u4LwrJCQEtwd2eF9KAR53kenfdR9wJ7dipGkRcVWqq 
+fUSU4VOZdCg037AMIVdv/bGr2fbtnBWpIxyVYKg1mtx/6UhYoGBkx7kYDVOUsK9a +SN4HZOiu7mA/a8JJYy1/oK3gXwKBgQDxSAcfbQHMQc1Ar29ixgApxZ50FOpzDt5f +HSa2Dgt1/Fo8mAgeknKi4AwA4GzY2sSkKwEQkSQvg/DZFNi8Tuyk//Nhx7ynwy7o +y5LBQ3HYCxarLPcyACez3C4YsoQiqu9YL572p23SLxby63hrePj/JJa9Tbb01UoH +rAWnvp8NSwKBgQDyCS3G0YInlbYMA5K1M0W4FuO9Fizvb+fixaFG3zPNeu4hQn/C +3BV2+/HIg9cbp3Ofy5w+itt2ifKrUN7Bn8ZsdiGvrRzpSgz7ve4jEZ8IUn2bwYHN +TDUdgzoD4uk58LBEeKU9VGy81TfL9XiDbRNsXM1YQqWPAlN+xMwWpz5iQQKBgF+r +kbdyP55AESSu61mc7P+jLjsU+Al7Qc0w/+J8GytDTnxsQ/vrUa0nbVsDoeUyiXoW +2ys4gcKdbGiHDZFNMiQSoOyKiFF04SrJXX1oQsHJU8m34KRgz11P1q9QSXh9kr3C +1CM1LCSFK3JSz8K9iu2QEn0pTXwy/lGgcfWbbfGVAoGAWd6Xra6e4qidxcRlEtdp +B7ghQKYwX4aOCPCtcTSW+aaQNiRZQU5V8XB5I79V1CyjPDrOuvva/2KslvQvOxvB +A7e9QkYI0UcYQIHYTi0HEaCm9K6FcjVcc35DhIGhq9jBkb9bwa4amj+KAdiTXIwI +m6N/96rp2DBjm4avLKb8jo0= +-----END PRIVATE KEY-----`), + } + + // Mock Patroni certificates. + secrets := []*corev1.Secret{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-namespace-instance1-abc1-certs", + Namespace: "test-namespace", + }, + Data: certData, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-namespace-instance1-abc2-certs", + Namespace: "test-namespace", + }, + Data: certData, + }, + } + + objects := make([]client.Object, len(secrets)) + for i, secret := range secrets { + objects[i] = secret + } + + return fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(objects...). + Build() +} + +func TestHTTPClientChangePrimaryAndWait(t *testing.T) { + t.Run("Arguments", func(t *testing.T) { + var capturedRequest *http.Request + var capturedBody map[string]any + + server := httptest.NewTLSServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + capturedRequest = r + if err := json.NewDecoder(r.Body).Decode(&capturedBody); err == nil { + w.WriteHeader(200) + w.Write([]byte(`Successfully switched over to "new"`)) + } else { + w.WriteHeader(400) + } + }), + ) + defer server.Close() + + mockClient := createMockKubeClient() + httpClient := HTTPClient{ + kubeClient: mockClient, + client: instanceClient{ + baseUrl: server.URL, + httpClient: server.Client(), + logger: logr.Discard(), + }, + logger: logr.Discard(), + tracer: otel.Tracer("test"), + } + + success, err := httpClient.ChangePrimaryAndWait(context.Background(), "old", "new", true) + + assert.NilError(t, err) + assert.Assert(t, success) + assert.Equal(t, capturedRequest.Method, "POST") + assert.Assert(t, strings.HasSuffix(capturedRequest.URL.Path, "/switchover")) + assert.Equal(t, capturedRequest.Header.Get("Content-Type"), "application/json") + assert.DeepEqual(t, capturedBody, map[string]any{ + "leader": "old", + "candidate": "new", + }) + }) + + t.Run("Error", func(t *testing.T) { + server := httptest.NewTLSServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(500) + w.Write([]byte(`Switchover failed: cluster is not healthy`)) + }), + ) + defer server.Close() + + mockClient := createMockKubeClient() + httpClient := HTTPClient{ + kubeClient: mockClient, + client: instanceClient{ + baseUrl: server.URL, + httpClient: server.Client(), + logger: logr.Discard(), + }, + logger: logr.Discard(), + tracer: otel.Tracer("test"), + } + + success, err := httpClient.ChangePrimaryAndWait(context.Background(), "old", "new", true) + + assert.Assert(t, err != nil) + assert.Assert(t, !success) + assert.Assert(t, strings.Contains(err.Error(), "switchover failed with status 500")) + }) + + t.Run("EmptyLeader", func(t *testing.T) { + mockClient := createMockKubeClient() + httpClient := HTTPClient{ + 
kubeClient: mockClient, + client: instanceClient{ + logger: logr.Discard(), + }, + logger: logr.Discard(), + tracer: otel.Tracer("test"), + } + + success, err := httpClient.ChangePrimaryAndWait(context.Background(), "", "new", true) + + assert.Assert(t, err != nil) + assert.Assert(t, !success) + assert.Assert(t, strings.Contains(err.Error(), "leader is required")) + }) + + // The current behavior is that if patroni switches over to a candidate + // different than the one we specified, we should return success = false. + t.Run("DifferentCandidate", func(t *testing.T) { + server := httptest.NewTLSServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + // Patroni chose different candidate - no "Successfully" + w.Write([]byte(`Switched over to "different-node" instead of "new"`)) + }), + ) + defer server.Close() + + mockClient := createMockKubeClient() + httpClient := HTTPClient{ + kubeClient: mockClient, + client: instanceClient{ + baseUrl: server.URL, + httpClient: server.Client(), + logger: logr.Discard(), + }, + logger: logr.Discard(), + tracer: otel.Tracer("test"), + } + + success, err := httpClient.ChangePrimaryAndWait(context.Background(), "old", "new", true) + + assert.NilError(t, err) // No error, but success is false + assert.Assert(t, !success) // Should return false like CLI does + }) +} + +func TestHTTPClientSwitchoverAndWait(t *testing.T) { + t.Run("Arguments", func(t *testing.T) { + var capturedSwitchoverRequest *http.Request + var capturedSwitchoverBody map[string]any + switchoverCalled := false + + server := httptest.NewTLSServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.HasSuffix(r.URL.Path, "/cluster") { + response := map[string]any{ + "members": []map[string]any{ + { + "name": "current-leader", + "role": "leader", + "state": "running", + "timeline": int64(4), + }, + }, + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + } else if strings.HasSuffix(r.URL.Path, "/switchover") { + capturedSwitchoverRequest = r + json.NewDecoder(r.Body).Decode(&capturedSwitchoverBody) + switchoverCalled = true + w.WriteHeader(200) + w.Write([]byte(`Successfully switched over to "new"`)) + } + }), + ) + defer server.Close() + + mockClient := createMockKubeClient() + httpClient := HTTPClient{ + kubeClient: mockClient, + client: instanceClient{ + baseUrl: server.URL, + httpClient: server.Client(), + logger: logr.Discard(), + }, + logger: logr.Discard(), + tracer: otel.Tracer("test"), + } + + success, err := httpClient.SwitchoverAndWait(context.Background(), "new") + + assert.NilError(t, err) + assert.Assert(t, success) + assert.Assert(t, switchoverCalled) + assert.Equal(t, capturedSwitchoverRequest.Method, "POST") + assert.DeepEqual(t, capturedSwitchoverBody, map[string]any{ + "leader": "current-leader", + "candidate": "new", + }) + }) + + // When this happens we need to perform a failover + t.Run("NoLeader", func(t *testing.T) { + server := httptest.NewTLSServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if strings.HasSuffix(r.URL.Path, "/cluster") { + // Mock cluster with no leader + response := map[string]any{ + "members": []map[string]any{ + { + "name": "replica-1", + "role": "replica", + "state": "running", + }, + }, + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + } + }), + ) + defer server.Close() + + mockClient := createMockKubeClient() + httpClient := HTTPClient{ + kubeClient: mockClient, + 
+			client: instanceClient{
+				baseUrl:    server.URL,
+				httpClient: server.Client(),
+				logger:     logr.Discard(),
+			},
+			logger: logr.Discard(),
+			tracer: otel.Tracer("test"),
+		}
+
+		success, err := httpClient.SwitchoverAndWait(context.Background(), "new")
+
+		assert.Assert(t, err != nil)
+		assert.Assert(t, !success)
+		assert.Assert(t, strings.Contains(err.Error(), "failed to detect current leader"))
+	})
+}
+
+func TestHTTPClientFailoverAndWait(t *testing.T) {
+	t.Run("Arguments", func(t *testing.T) {
+		var capturedRequest *http.Request
+		var capturedBody map[string]any
+
+		server := httptest.NewTLSServer(
+			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				capturedRequest = r
+				json.NewDecoder(r.Body).Decode(&capturedBody)
+				w.WriteHeader(200)
+				w.Write([]byte(`Successfully failed over to "new"`))
+			}),
+		)
+		defer server.Close()
+
+		mockClient := createMockKubeClient()
+		httpClient := HTTPClient{
+			kubeClient: mockClient,
+			client: instanceClient{
+				baseUrl:    server.URL,
+				httpClient: server.Client(),
+				logger:     logr.Discard(),
+			},
+			logger: logr.Discard(),
+			tracer: otel.Tracer("test"),
+		}
+
+		success, err := httpClient.FailoverAndWait(context.Background(), "new")
+
+		assert.NilError(t, err)
+		assert.Assert(t, success)
+		assert.Equal(t, capturedRequest.Method, "POST")
+		assert.Assert(t, strings.HasSuffix(capturedRequest.URL.Path, "/failover"))
+		assert.DeepEqual(t, capturedBody, map[string]any{
+			"candidate": "new",
+		})
+	})
+
+	t.Run("Error", func(t *testing.T) {
+		server := httptest.NewTLSServer(
+			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				w.WriteHeader(500)
+				w.Write([]byte(`Failover failed: no healthy replicas available`))
+			}),
+		)
+		defer server.Close()
+
+		mockClient := createMockKubeClient()
+		httpClient := HTTPClient{
+			kubeClient: mockClient,
+			client: instanceClient{
+				baseUrl:    server.URL,
+				httpClient: server.Client(),
+				logger:     logr.Discard(),
+			},
+			logger: logr.Discard(),
+			tracer: otel.Tracer("test"),
+		}
+
+		success, err := httpClient.FailoverAndWait(context.Background(), "new")
+
+		assert.Assert(t, err != nil)
+		assert.Assert(t, !success)
+		assert.Assert(t, strings.Contains(err.Error(), "failover failed with status 500"))
+	})
+
+	t.Run("EmptyCandidate", func(t *testing.T) {
+		mockClient := createMockKubeClient()
+		httpClient := HTTPClient{
+			kubeClient: mockClient,
+			client: instanceClient{
+				logger: logr.Discard(),
+			},
+			logger: logr.Discard(),
+			tracer: otel.Tracer("test"),
+		}
+
+		success, err := httpClient.FailoverAndWait(context.Background(), "")
+
+		assert.Assert(t, err != nil)
+		assert.Assert(t, !success)
+		assert.Assert(t, strings.Contains(err.Error(), "candidate is required for failover"))
+	})
+
+	// Same as switchover. If we failover but to a different candidate, the
+	// operator expects success = false.
+	t.Run("DifferentCandidate", func(t *testing.T) {
+		server := httptest.NewTLSServer(
+			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				w.WriteHeader(200)
+				// Patroni chose a different candidate - no "Successfully"
+				w.Write([]byte(`Failed over to "different-node" instead of "new"`))
+			}),
+		)
+		defer server.Close()
+
+		mockClient := createMockKubeClient()
+		httpClient := HTTPClient{
+			kubeClient: mockClient,
+			client: instanceClient{
+				baseUrl:    server.URL,
+				httpClient: server.Client(),
+				logger:     logr.Discard(),
+			},
+			logger: logr.Discard(),
+			tracer: otel.Tracer("test"),
+		}
+
+		success, err := httpClient.FailoverAndWait(context.Background(), "new")
+
+		assert.NilError(t, err)    // No error, but success is false
+		assert.Assert(t, !success) // Should return false like the CLI does
+	})
+}
+
+func TestHTTPClientReplaceConfiguration(t *testing.T) {
+	t.Run("Success", func(t *testing.T) {
+		var capturedRequest *http.Request
+		var capturedBody map[string]any
+
+		server := httptest.NewTLSServer(
+			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				capturedRequest = r
+				json.NewDecoder(r.Body).Decode(&capturedBody)
+				// Patroni returns the updated configuration as JSON;
+				// set the header before writing the status code.
+				w.Header().Set("Content-Type", "application/json")
+				w.WriteHeader(200)
+				json.NewEncoder(w).Encode(capturedBody)
+			}),
+		)
+		defer server.Close()
+
+		mockClient := createMockKubeClient()
+		httpClient := HTTPClient{
+			kubeClient: mockClient,
+			client: instanceClient{
+				baseUrl:    server.URL,
+				httpClient: server.Client(),
+				logger:     logr.Discard(),
+			},
+			logger: logr.Discard(),
+			tracer: otel.Tracer("test"),
+		}
+
+		config := map[string]any{"some": "values"}
+		err := httpClient.ReplaceConfiguration(context.Background(), config)
+
+		assert.NilError(t, err)
+		assert.Equal(t, capturedRequest.Method, "PUT")
+		assert.Assert(t, strings.HasSuffix(capturedRequest.URL.Path, "/config"))
+		assert.Equal(t, capturedRequest.Header.Get("Content-Type"), "application/json")
+		assert.DeepEqual(t, capturedBody, config)
+	})
+
+	t.Run("Error", func(t *testing.T) {
+		server := httptest.NewTLSServer(
+			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				w.WriteHeader(400)
+				w.Write([]byte(`Invalid configuration: postgresql.parameters.max_connections must be integer`))
+			}),
+		)
+		defer server.Close()
+
+		mockClient := createMockKubeClient()
+		httpClient := HTTPClient{
+			kubeClient: mockClient,
+			client: instanceClient{
+				baseUrl:    server.URL,
+				httpClient: server.Client(),
+				logger:     logr.Discard(),
+			},
+			logger: logr.Discard(),
+			tracer: otel.Tracer("test"),
+		}
+
+		err := httpClient.ReplaceConfiguration(
+			context.Background(),
+			map[string]any{"some": "values"},
+		)
+
+		assert.Assert(t, err != nil)
+		assert.Assert(t, strings.Contains(err.Error(), "HTTP request failed with status 400"))
+	})
+}
+
+func TestHTTPClientGetTimeline(t *testing.T) {
+	t.Run("Success", func(t *testing.T) {
+		server := httptest.NewTLSServer(
+			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				if strings.HasSuffix(r.URL.Path, "/cluster") {
+					response := map[string]any{
+						"members": []map[string]any{
+							{
+								"name":     "leader-pod",
+								"role":     "leader",
+								"state":    "running",
+								"timeline": int64(4),
+							},
+							{
+								"name":     "replica-pod",
+								"role":     "replica",
+								"state":    "running",
+								"timeline": int64(4),
+							},
+						},
+					}
+					w.Header().Set("Content-Type", "application/json")
+					json.NewEncoder(w).Encode(response)
+				}
+			}),
+		)
+		defer server.Close()
+
+		mockClient := createMockKubeClient()
+		httpClient := HTTPClient{
+			kubeClient: mockClient,
+			client: instanceClient{
+				baseUrl:    server.URL,
+				httpClient: server.Client(),
+				logger:     logr.Discard(),
+			},
+			logger: logr.Discard(),
+			tracer: otel.Tracer("test"),
+		}
+
+		timeline, err := httpClient.GetTimeline(context.Background())
+
+		assert.NilError(t, err)
+		assert.Equal(t, timeline, int64(4))
+	})
+
+	t.Run("NoLeader", func(t *testing.T) {
+		server := httptest.NewTLSServer(
+			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				if strings.HasSuffix(r.URL.Path, "/cluster") {
+					response := map[string]any{
+						"members": []map[string]any{
+							{
+								"name":     "replica-pod",
+								"role":     "replica",
+								"state":    "running",
+								"timeline": int64(4),
+							},
+						},
+					}
+					w.Header().Set("Content-Type", "application/json")
+					json.NewEncoder(w).Encode(response)
+				}
+			}),
+		)
+		defer server.Close()
+
+		mockClient := createMockKubeClient()
+		httpClient := HTTPClient{
+			kubeClient: mockClient,
+			client: instanceClient{
+				baseUrl:    server.URL,
+				httpClient: server.Client(),
+				logger:     logr.Discard(),
+			},
+			logger: logr.Discard(),
+			tracer: otel.Tracer("test"),
+		}
+
+		timeline, err := httpClient.GetTimeline(context.Background())
+
+		assert.NilError(t, err)
+		assert.Equal(t, timeline, int64(0)) // Should return 0 when no leader
+	})
+
+	t.Run("LeaderNotRunning", func(t *testing.T) {
+		server := httptest.NewTLSServer(
+			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				if strings.HasSuffix(r.URL.Path, "/cluster") {
+					response := map[string]any{
+						"members": []map[string]any{
+							{
+								"name":     "leader-pod",
+								"role":     "leader",
+								"state":    "stopped", // Not running
+								"timeline": int64(4),
+							},
+						},
+					}
+					w.Header().Set("Content-Type", "application/json")
+					json.NewEncoder(w).Encode(response)
+				}
+			}),
+		)
+		defer server.Close()
+
+		mockClient := createMockKubeClient()
+		httpClient := HTTPClient{
+			kubeClient: mockClient,
+			client: instanceClient{
+				baseUrl:    server.URL,
+				httpClient: server.Client(),
+				logger:     logr.Discard(),
+			},
+			logger: logr.Discard(),
+			tracer: otel.Tracer("test"),
+		}
+
+		timeline, err := httpClient.GetTimeline(context.Background())
+
+		assert.NilError(t, err)
+		assert.Equal(t, timeline, int64(0)) // Should return 0 when leader not running
+	})
+}
+
+func TestExtractMetadataFromPodName(t *testing.T) {
+	t.Run("ValidPodName", func(t *testing.T) {
+		metadata, err := extractMetadataFromPodName("pgdb-609qv5o187x841r2-instance1-h8q2-0")
+
+		assert.NilError(t, err)
+		assert.Equal(t, metadata.Name, "pgdb-609qv5o187x841r2-instance1-h8q2-0")
+		assert.Equal(t, metadata.Namespace, "pgdb-609qv5o187x841r2")
+		assert.Equal(t, metadata.InstanceName, "pgdb-609qv5o187x841r2-instance1-h8q2")
+	})
+
+	t.Run("InvalidPodName", func(t *testing.T) {
+		_, err := extractMetadataFromPodName("invalid-pod-name")
+
+		assert.Assert(t, err != nil)
+		assert.Assert(t, strings.Contains(err.Error(), "unexpected format"))
+	})
+}
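
Reviewer note: the assertions in TestExtractMetadataFromPodName imply a dash-based parsing rule (instance name = pod name minus the trailing ordinal; the cluster prefix, returned here in the Namespace field, = pod name minus the last three segments). The PR's actual extractMetadataFromPodName is not shown in this hunk; the following is only a minimal sketch consistent with those assertions, with a hypothetical podMetadata struct and a guessed package name.

// Sketch only - not the code under review. Assumes pod names of the form
// <cluster>-<instance-set>-<suffix>-<ordinal>.
package patroni

import (
	"fmt"
	"strings"
)

// podMetadata is a hypothetical stand-in for whatever type the PR's
// extractMetadataFromPodName returns; field names mirror the test's assertions.
type podMetadata struct {
	Name         string
	Namespace    string
	InstanceName string
}

func extractMetadataFromPodNameSketch(podName string) (podMetadata, error) {
	parts := strings.Split(podName, "-")
	if len(parts) < 4 {
		return podMetadata{}, fmt.Errorf("pod name %q has an unexpected format", podName)
	}
	return podMetadata{
		Name:         podName,                                 // full pod name
		Namespace:    strings.Join(parts[:len(parts)-3], "-"), // drop instance set, suffix, ordinal
		InstanceName: strings.Join(parts[:len(parts)-1], "-"), // drop the trailing ordinal
	}, nil
}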