diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index 63d400934..634320d82 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -486,6 +486,25 @@ Note that `s3_wal_path` and `gs_wal_path` are mutually exclusive. from a remote primary. See the Patroni documentation [here](https://patroni.readthedocs.io/en/latest/standby_cluster.html) for more details. Optional. +## Lifecycle configuration + +Parameters to control cluster hibernate/wake-up behavior. + +* **phase** + Set to `"stopped"` to hibernate the cluster. When this field is set on a + running cluster, the operator will: + * Store the current number of instances in the status + * Scale down the StatefulSet to 0 replicas + * Scale down the connection pooler to 0 replicas + * Set the cluster status to "Stopping", then "Stopped" + + When this field is removed from a stopped cluster, the operator will: + * Restore the number of instances from the stored value + * Scale up the StatefulSet and connection pooler + * Set the cluster status to "Updating", then "Running" + + This field is optional. When not set, the cluster operates normally. + ## Volume properties Those parameters are grouped under the `volume` top-level key and define the @@ -714,3 +733,21 @@ can have the following properties: * **memory** memory requests to be set as an annotation on the stream resource. Optional. + +## Status fields + +The operator reports the cluster state through the `status` sub-resource. These +fields are managed by the operator and should not be set manually. + +* **PostgresClusterStatus** + Current state of the cluster. One of: Creating, Updating, Running, + UpdateFailed, SyncFailed, CreateFailed, Invalid, Stopping, Stopped. + +* **previousNumberOfInstances** + The number of instances the cluster had before hibernation. Used to restore + the cluster to its previous size when waking up. Cleared after wake-up. + +* **previousPoolerInstances** + A map of connection pooler role to its replica count before hibernation. + The keys are "master" and "replica". Used to restore the pooler when waking + up. Cleared after wake-up. diff --git a/docs/user.md b/docs/user.md index 1c530a48c..5cca700a3 100644 --- a/docs/user.md +++ b/docs/user.md @@ -930,6 +930,107 @@ When you apply this manifest, the operator will: The process is asynchronous. You can monitor the operator logs and the state of the `postgresql` resource to follow the progress. Once the new cluster is up and running, your applications can reconnect. +## Hibernate and Wake-up a Cluster + +The operator supports hibernating a PostgreSQL cluster to save resources when it's +not needed, and waking it up again when required. This feature: + +* Scales down the PostgreSQL StatefulSet to 0 replicas (stops all pods) +* Scales down the connection pooler to 0 replicas +* Preserves the cluster configuration and data (PVCs are retained) +* Stores the previous replica counts for automatic restoration + +### Initiating Hibernate + +To hibernate a running cluster, set the `lifecycle.phase` field to `"stopped"`: + +```yaml +apiVersion: "acid.zalan.do/v1" +kind: postgresql +metadata: + name: acid-test-cluster +spec: + teamId: "test-team" + # ... other cluster parameters + numberOfInstances: 3 + lifecycle: + phase: "stopped" +``` + +When you apply this manifest, the operator will: + +* Store the current `numberOfInstances` in `status.previousNumberOfInstances` +* Store the connection pooler replica counts in `status.previousPoolerInstances` +* Set `spec.numberOfInstances` to 0 +* Scale down the StatefulSet to 0 replicas +* Scale down the connection pooler deployments to 0 replicas +* Suspend the logical backup CronJob (if enabled) +* Set `status.PostgresClusterStatus` to "Stopping", then "Stopped" + +### Waking up a Cluster + +To wake up a hibernated cluster, remove the `lifecycle.phase` field or set it to +an empty value: + +```yaml +apiVersion: "acid.zalan.do/v1" +kind: postgresql +metadata: + name: acid-test-cluster +spec: + teamId: "test-team" + # ... other cluster parameters + # lifecycle.phase is not set or is removed +``` + +When you apply this manifest, the operator will: + +* Restore `numberOfInstances` from `status.previousNumberOfInstances` +* Restore the connection pooler replica counts from `status.previousPoolerInstances` +* Resume the logical backup CronJob (if enabled) +* Scale up the StatefulSet to the previous replica count +* Scale up the connection pooler deployments to the previous replica counts +* Set `status.PostgresClusterStatus` to "Updating", then "Running" +* Clear `status.previousNumberOfInstances` and `status.previousPoolerInstances` + +### Cluster Status During Lifecycle Transitions + +| Status | Meaning | +|--------|---------| +| Running | Cluster is running normally | +| Stopping | Cluster is transitioning to stopped state (pods terminating) | +| Stopped | All pods have been terminated, cluster is hibernated | + +### Restrictions During Hibernate + +* **During Stopping**: All spec changes are blocked. You must wait for the cluster + to reach the Stopped state before making changes. + +* **During Stopped**: Spec changes are blocked unless you remove `lifecycle.phase` + to wake up the cluster. This prevents accidental modifications to a hibernated + cluster. + +### Connection Pooler Behavior + +The connection pooler is automatically scaled alongside the cluster: + +* When the cluster hibernates, the pooler is scaled to 0 replicas +* When the cluster wakes up, the pooler is restored to its previous replica count +* The previous replica counts are stored in `status.previousPoolerInstances` + +Note: If the connection pooler was already at 0 replicas before hibernate, it +will remain at 0 after wake-up. + +### Logical Backup Behavior + +The logical backup CronJob is automatically suspended during hibernate: + +* When the cluster hibernates, the backup CronJob is suspended (`.spec.suspend: true`) +* When the cluster wakes up, the backup CronJob is automatically resumed +* The backup schedule is preserved and resumes from its normal schedule + +This prevents failed backup jobs from running when the database is unavailable. + ## Setting up a standby cluster Standby cluster is a [Patroni feature](https://github.com/zalando/patroni/blob/master/docs/replica_bootstrap.rst#standby-cluster) diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index 39811824e..10e952cd2 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -3246,6 +3246,13 @@ spec: - name type: object type: array + lifecycle: + description: LifecycleSpec describes the lifecycle state of a Postgres + cluster. + properties: + phase: + type: string + type: object logicalBackupRetention: type: string logicalBackupSchedule: @@ -4197,6 +4204,14 @@ spec: properties: PostgresClusterStatus: type: string + previousNumberOfInstances: + format: int32 + type: integer + previousPoolerInstances: + type: object + additionalProperties: + format: int32 + type: integer required: - PostgresClusterStatus type: object diff --git a/pkg/apis/acid.zalan.do/v1/const.go b/pkg/apis/acid.zalan.do/v1/const.go index 4102ea3d3..69012427a 100644 --- a/pkg/apis/acid.zalan.do/v1/const.go +++ b/pkg/apis/acid.zalan.do/v1/const.go @@ -9,6 +9,8 @@ const ( ClusterStatusSyncFailed = "SyncFailed" ClusterStatusAddFailed = "CreateFailed" ClusterStatusRunning = "Running" + ClusterStatusStopping = "Stopping" + ClusterStatusStopped = "Stopped" ClusterStatusInvalid = "Invalid" ) diff --git a/pkg/apis/acid.zalan.do/v1/postgresql.crd.yaml b/pkg/apis/acid.zalan.do/v1/postgresql.crd.yaml index 39811824e..10e952cd2 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql.crd.yaml +++ b/pkg/apis/acid.zalan.do/v1/postgresql.crd.yaml @@ -3246,6 +3246,13 @@ spec: - name type: object type: array + lifecycle: + description: LifecycleSpec describes the lifecycle state of a Postgres + cluster. + properties: + phase: + type: string + type: object logicalBackupRetention: type: string logicalBackupSchedule: @@ -4197,6 +4204,14 @@ spec: properties: PostgresClusterStatus: type: string + previousNumberOfInstances: + format: int32 + type: integer + previousPoolerInstances: + type: object + additionalProperties: + format: int32 + type: integer required: - PostgresClusterStatus type: object diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 1dadfd06c..23c8a286a 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -115,6 +115,7 @@ type PostgresSpec struct { TLS *TLSDescription `json:"tls,omitempty"` AdditionalVolumes []AdditionalVolume `json:"additionalVolumes,omitempty"` Streams []Stream `json:"streams,omitempty"` + Lifecycle *LifecycleSpec `json:"lifecycle,omitempty"` Env []v1.EnvVar `json:"env,omitempty"` // deprecated @@ -257,6 +258,11 @@ type StandbyDescription struct { StandbyPrimarySlotName string `json:"standby_primary_slot_name,omitempty"` } +// LifecycleSpec describes the lifecycle state of a Postgres cluster. +type LifecycleSpec struct { + Phase string `json:"phase,omitempty"` +} + // TLSDescription specs TLS properties type TLSDescription struct { // +required @@ -302,7 +308,9 @@ type UserFlags []string // PostgresStatus contains status of the PostgreSQL cluster (running, creation failed etc.) type PostgresStatus struct { - PostgresClusterStatus string `json:"PostgresClusterStatus"` + PostgresClusterStatus string `json:"PostgresClusterStatus"` + PreviousNumberOfInstances int32 `json:"previousNumberOfInstances,omitempty"` + PreviousPoolerInstances map[string]int32 `json:"previousPoolerInstances,omitempty"` } // ConnectionPooler Options for connection pooler diff --git a/pkg/apis/acid.zalan.do/v1/util.go b/pkg/apis/acid.zalan.do/v1/util.go index 719defe93..7bbdc0bbf 100644 --- a/pkg/apis/acid.zalan.do/v1/util.go +++ b/pkg/apis/acid.zalan.do/v1/util.go @@ -101,6 +101,16 @@ func (postgresStatus PostgresStatus) Creating() bool { return postgresStatus.PostgresClusterStatus == ClusterStatusCreating } +// Stopping status of cluster +func (postgresStatus PostgresStatus) Stopping() bool { + return postgresStatus.PostgresClusterStatus == ClusterStatusStopping +} + +// Stopped status of cluster +func (postgresStatus PostgresStatus) Stopped() bool { + return postgresStatus.PostgresClusterStatus == ClusterStatusStopped +} + func (postgresStatus PostgresStatus) String() string { return postgresStatus.PostgresClusterStatus } diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go index 159a87f35..692a6fe30 100644 --- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go +++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go @@ -310,6 +310,22 @@ func (in *KubernetesMetaConfiguration) DeepCopy() *KubernetesMetaConfiguration { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LifecycleSpec) DeepCopyInto(out *LifecycleSpec) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LifecycleSpec. +func (in *LifecycleSpec) DeepCopy() *LifecycleSpec { + if in == nil { + return nil + } + out := new(LifecycleSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LoadBalancerConfiguration) DeepCopyInto(out *LoadBalancerConfiguration) { *out = *in @@ -874,6 +890,11 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Lifecycle != nil { + in, out := &in.Lifecycle, &out.Lifecycle + *out = new(LifecycleSpec) + **out = **in + } if in.Env != nil { in, out := &in.Env, &out.Env *out = make([]corev1.EnvVar, len(*in)) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 04c974f4c..1d22a6609 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1008,13 +1008,31 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { c.mu.Lock() defer c.mu.Unlock() + // Block all spec changes when cluster is stopped or stopping + blocked, err := c.blockLifecycleUpdate(newSpec) + if err != nil { + return err + } + if blocked { + return nil + } + newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdating - newSpec, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), newSpec) + newSpec, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), newSpec) if err != nil { return fmt.Errorf("could not set cluster status to updating: %w", err) } + // Handle lifecycle transitions (hibernate/wake-up) + handled, err := c.handleHibernateAndWakeUp(newSpec) + if err != nil { + return err + } + if handled { + return nil + } + if !c.isInMaintenanceWindow(newSpec.Spec.MaintenanceWindows) { // do not apply any major version related changes yet newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion @@ -1220,6 +1238,79 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { return nil } +// blockLifecycleUpdate checks if an update should be blocked due to lifecycle state. +// Returns (blocked bool, err error): +// - (true, nil) if update is blocked and caller should return early +// - (false, nil) if update can proceed +// - (false, error) on error +func (c *Cluster) blockLifecycleUpdate(newSpec *acidv1.Postgresql) (bool, error) { + if !c.Status.Stopped() && !c.Status.Stopping() { + return false, nil + } + + lifecyclePhase := "" + if newSpec.Spec.Lifecycle != nil { + lifecyclePhase = newSpec.Spec.Lifecycle.Phase + } + + // During Stopping: block ALL spec changes (no cancellation allowed) + if c.Status.Stopping() { + return true, fmt.Errorf("cannot update cluster while it is stopping. Wait for it to fully stop first") + } + + // During Stopped: only block if keeping lifecycle.phase="stopped" + if lifecyclePhase == "stopped" { + return true, fmt.Errorf("cannot update cluster while stopped. Remove lifecycle.phase to wake up the cluster") + } + + return false, nil +} + +// handleHibernateAndWakeUp handles cluster hibernate/wake-up transitions during Update(). +// This is the Update path - it detects the transition, modifies the spec via manageHibernateState, +// and persists the changes to Kubernetes. +// +// Returns (handled bool, err error): +// - (true, nil) if lifecycle transition was handled, Update() should return early +// - (false, nil) if no lifecycle transition, normal update continues +// - (false, error) on error during persistence +// +// Flow: +// 1. Detect action via detectLifecycleTransition() +// 2. Check for Stopping->Stopped (via detectStoppingCompleted) +// 3. Call manageHibernateState() to prepare the spec (same logic as Sync path) +// 4. Persist to Kubernetes via persistHibernateTransition/persistWakeUpTransition +func (c *Cluster) handleHibernateAndWakeUp(newSpec *acidv1.Postgresql) (bool, error) { + action := detectLifecycleTransition( + &c.Status, + c.Spec.Lifecycle, + newSpec.Spec.Lifecycle, + newSpec.Spec.NumberOfInstances, + newSpec.Status.PreviousNumberOfInstances, + c.Status.Running(), + ) + + if action == LifecycleActionNone { + if detectStoppingCompleted(&c.Status, c.getStatefulsetReplicas()) { + newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusStopped + return c.persistStoppingCompletedTransition(newSpec) + } + return false, nil + } + + c.manageHibernateState(c.Postgresql, newSpec) + + switch action { + case LifecycleActionHibernate: + return c.persistHibernateTransition(newSpec) + + case LifecycleActionWakeUp: + return c.persistWakeUpTransition(newSpec) + } + + return false, nil +} + func syncResources(a, b *v1.ResourceRequirements) bool { for _, res := range []v1.ResourceName{ v1.ResourceCPU, diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index e38540d3e..2d54bf49c 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -2354,3 +2354,292 @@ func TestUpdatePITRResources(t *testing.T) { }) } } + +func TestUpdate_LifecycleBlocksDuringStopping(t *testing.T) { + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + client := k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + StatefulSetsGetter: clientSet.AppsV1(), + ServicesGetter: clientSet.CoreV1(), + SecretsGetter: clientSet.CoreV1(), + ConfigMapsGetter: clientSet.CoreV1(), + PodsGetter: clientSet.CoreV1(), + EndpointsGetter: clientSet.CoreV1(), + } + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + TeamID: "test-team", + NumberOfInstances: 3, + Volume: acidv1.Volume{Size: "1Gi"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Stopping", + }, + } + + cluster := New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, client, pg, logger, eventRecorder) + + cluster.Name = "test-cluster" + cluster.Namespace = "default" + + oldSpec := pg.DeepCopy() + newSpec := pg.DeepCopy() + newSpec.Spec.NumberOfInstances = 5 + + err := cluster.Update(oldSpec, newSpec) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "cannot update cluster while it is stopping") +} + +func TestUpdate_LifecycleBlocksWhenStoppedWithPhase(t *testing.T) { + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + client := k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + StatefulSetsGetter: clientSet.AppsV1(), + ServicesGetter: clientSet.CoreV1(), + SecretsGetter: clientSet.CoreV1(), + ConfigMapsGetter: clientSet.CoreV1(), + PodsGetter: clientSet.CoreV1(), + EndpointsGetter: clientSet.CoreV1(), + } + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + TeamID: "test-team", + NumberOfInstances: 0, + Lifecycle: &acidv1.LifecycleSpec{Phase: "stopped"}, + Volume: acidv1.Volume{Size: "1Gi"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Stopped", + PreviousNumberOfInstances: 3, + }, + } + + cluster := New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, client, pg, logger, eventRecorder) + + cluster.Name = "test-cluster" + cluster.Namespace = "default" + + oldSpec := pg.DeepCopy() + newSpec := pg.DeepCopy() + newSpec.Spec.NumberOfInstances = 5 + + err := cluster.Update(oldSpec, newSpec) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "cannot update cluster while stopped") +} + +func TestUpdate_LifecycleAllowsWakeUp(t *testing.T) { + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + updateCalled := false + statusUpdateCalled := false + + acidClientSet.PrependReactor("update", "postgresqls", func(action k8stesting.Action) (bool, runtime.Object, error) { + updateAction := action.(k8stesting.UpdateAction) + pg := updateAction.GetObject().(*acidv1.Postgresql) + updateCalled = true + return true, pg, nil + }) + acidClientSet.PrependReactor("update", "postgresqls", func(action k8stesting.Action) (bool, runtime.Object, error) { + if action.GetSubresource() == "status" { + statusUpdateCalled = true + updateAction := action.(k8stesting.UpdateAction) + pg := updateAction.GetObject().(*acidv1.Postgresql) + return true, pg, nil + } + return false, nil, nil + }) + + client := k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + StatefulSetsGetter: clientSet.AppsV1(), + ServicesGetter: clientSet.CoreV1(), + SecretsGetter: clientSet.CoreV1(), + ConfigMapsGetter: clientSet.CoreV1(), + PodsGetter: clientSet.CoreV1(), + EndpointsGetter: clientSet.CoreV1(), + } + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + TeamID: "test-team", + NumberOfInstances: 0, + Volume: acidv1.Volume{Size: "1Gi"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Stopped", + PreviousNumberOfInstances: 3, + }, + } + + cluster := New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, client, pg, logger, eventRecorder) + + cluster.Name = "test-cluster" + cluster.Namespace = "default" + + oldSpec := pg.DeepCopy() + newSpec := pg.DeepCopy() + + err := cluster.Update(oldSpec, newSpec) + + assert.NoError(t, err) + assert.True(t, updateCalled, "Update should have been called for wake-up") + assert.True(t, statusUpdateCalled, "Status update should have been called for wake-up") +} + +func TestUpdate_LifecycleInitiatesHibernate(t *testing.T) { + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + updateCalled := false + statusUpdateCalled := false + + acidClientSet.PrependReactor("update", "postgresqls", func(action k8stesting.Action) (bool, runtime.Object, error) { + updateAction := action.(k8stesting.UpdateAction) + pg := updateAction.GetObject().(*acidv1.Postgresql) + updateCalled = true + return true, pg, nil + }) + acidClientSet.PrependReactor("update", "postgresqls", func(action k8stesting.Action) (bool, runtime.Object, error) { + if action.GetSubresource() == "status" { + statusUpdateCalled = true + updateAction := action.(k8stesting.UpdateAction) + pg := updateAction.GetObject().(*acidv1.Postgresql) + return true, pg, nil + } + return false, nil, nil + }) + + client := k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + StatefulSetsGetter: clientSet.AppsV1(), + ServicesGetter: clientSet.CoreV1(), + SecretsGetter: clientSet.CoreV1(), + ConfigMapsGetter: clientSet.CoreV1(), + PodsGetter: clientSet.CoreV1(), + EndpointsGetter: clientSet.CoreV1(), + } + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + TeamID: "test-team", + NumberOfInstances: 3, + Volume: acidv1.Volume{Size: "1Gi"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Running", + }, + } + + cluster := New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, client, pg, logger, eventRecorder) + + cluster.Name = "test-cluster" + cluster.Namespace = "default" + + oldSpec := pg.DeepCopy() + newSpec := pg.DeepCopy() + newSpec.Spec.Lifecycle = &acidv1.LifecycleSpec{Phase: "stopped"} + + err := cluster.Update(oldSpec, newSpec) + + assert.NoError(t, err) + assert.True(t, updateCalled, "Update should have been called for hibernate") + assert.True(t, statusUpdateCalled, "Status update should have been called for hibernate") +} + +func TestUpdate_LifecycleNormalUpdate(t *testing.T) { + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + client := k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + StatefulSetsGetter: clientSet.AppsV1(), + ServicesGetter: clientSet.CoreV1(), + SecretsGetter: clientSet.CoreV1(), + ConfigMapsGetter: clientSet.CoreV1(), + PodsGetter: clientSet.CoreV1(), + EndpointsGetter: clientSet.CoreV1(), + } + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + TeamID: "test-team", + NumberOfInstances: 3, + Volume: acidv1.Volume{Size: "1Gi"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Running", + }, + } + + cluster := New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, client, pg, logger, eventRecorder) + + cluster.Name = "test-cluster" + cluster.Namespace = "default" + + newSpec := pg.DeepCopy() + blocked, err := cluster.blockLifecycleUpdate(newSpec) + + assert.False(t, blocked) + assert.NoError(t, err) +} diff --git a/pkg/cluster/lifecycle.go b/pkg/cluster/lifecycle.go new file mode 100644 index 000000000..eb6a4ee03 --- /dev/null +++ b/pkg/cluster/lifecycle.go @@ -0,0 +1,414 @@ +package cluster + +import ( + "context" + "fmt" + + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" +) + +// LifecycleAction represents the detected lifecycle transition for a cluster. +// Used by both Sync (manageHibernateState) and Update (handleHibernateAndWakeUp) paths +// to determine what action to take regarding cluster hibernate/wake-up. +type LifecycleAction int + +const ( + LifecycleActionNone LifecycleAction = iota + LifecycleActionHibernate // Running -> Stopping (initiate hibernate) + LifecycleActionStoppingCompleted // Stopping -> Stopped (pods fully terminated) + LifecycleActionWakeUp // Stopped -> Updating (initiate wake-up) +) + +// detectLifecycleTransition is a pure function that examines the current and proposed specs +// and determines what lifecycle action (if any) should be taken. +// +// Detection logic: +// - Hibernate: Running status + lifecycle.phase="stopped" + not already stopping/stopped +// - Wake-up: Stopped status + lifecycle.phase cleared + (previousNumberOfInstances > 0 OR isWakingUp flag) +// - isWakingUp flag is set when old spec was Running, new status is Updating, and lifecycle is cleared +func detectLifecycleTransition( + currentStatus *acidv1.PostgresStatus, + oldSpecLifecycle *acidv1.LifecycleSpec, + newSpecLifecycle *acidv1.LifecycleSpec, + newSpecNumberOfInstances int32, + newSpecPreviousNumberOfInstances int32, + oldSpecStatusRunning bool, +) LifecycleAction { + isWakingUpSimple := newSpecLifecycle == nil || newSpecLifecycle.Phase != "stopped" + hasPreviousInstances := newSpecPreviousNumberOfInstances > 0 + needsRestore := newSpecNumberOfInstances == 0 + + // Detect wake-up by checking if Update() set status to Updating before Sync() runs + isWakingUp := oldSpecStatusRunning && + currentStatus.PostgresClusterStatus == acidv1.ClusterStatusUpdating && + (newSpecLifecycle == nil || newSpecLifecycle.Phase != "stopped") + + // Also detect wake-up by simple conditions: lifecycle cleared + has previous + needs restore + isWakingUp = isWakingUp || (isWakingUpSimple && hasPreviousInstances && needsRestore) + + if currentStatus.Stopped() || isWakingUp { + if isWakingUp || newSpecLifecycle == nil || newSpecLifecycle.Phase != "stopped" { + return LifecycleActionWakeUp + } + } + + if newSpecLifecycle != nil && + newSpecLifecycle.Phase == "stopped" && + !currentStatus.Stopping() && + !currentStatus.Stopped() { + return LifecycleActionHibernate + } + + return LifecycleActionNone +} + +// detectStoppingCompleted checks if the cluster should transition from Stopping to Stopped. +// This happens when the StatefulSet replicas have actually reached 0 (all pods terminated). +func detectStoppingCompleted(currentStatus *acidv1.PostgresStatus, statefulsetReplicas *int32) bool { + if !currentStatus.Stopping() { + return false + } + if statefulsetReplicas == nil { + return false + } + return *statefulsetReplicas == 0 +} + +// initiateHibernate prepares the cluster for hibernation by: +// - Storing current numberOfInstances in PreviousNumberOfInstances +// - Setting numberOfInstances to 0 +// - Setting status to Stopping +// - Scaling down connection pooler deployments +// - Suspending logical backup CronJob +// Errors during pooler/backup operations are logged but do not fail the transition. +func (c *Cluster) initiateHibernate(newSpec *acidv1.Postgresql) { + newSpec.Status.PreviousNumberOfInstances = newSpec.Spec.NumberOfInstances + newSpec.Spec.NumberOfInstances = 0 + newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusStopping + + c.logger.Infof("[lifecycle] initiating hibernate: stored previousNumberOfInstances=%d", + newSpec.Status.PreviousNumberOfInstances) + + if err := c.scalePoolerDown(newSpec); err != nil { + c.logger.Warningf("[lifecycle] failed to scale pooler during hibernate: %v", err) + } + + if err := c.suspendLogicalBackupJob(); err != nil { + c.logger.Warningf("[lifecycle] failed to suspend logical backup job: %v", err) + } +} + +// initiateWakeUp prepares the cluster for wake-up by: +// - Restoring numberOfInstances from PreviousNumberOfInstances (if > 0) +// +// - Setting status to Updating +// - Scaling up connection pooler deployments +// - Resuming logical backup CronJob +// If PreviousNumberOfInstances is 0, logs a warning but still sets status to Updating. +// Errors during pooler/backup operations are logged but do not fail the transition. +func (c *Cluster) initiateWakeUp(newSpec *acidv1.Postgresql) { + if newSpec.Status.PreviousNumberOfInstances > 0 { + newSpec.Spec.NumberOfInstances = newSpec.Status.PreviousNumberOfInstances + c.logger.Infof("[lifecycle] initiating wake-up: restoring numberOfInstances=%d", + newSpec.Status.PreviousNumberOfInstances) + } else { + c.logger.Warningf("[lifecycle] cluster is waking up but previousNumberOfInstances is 0, cannot restore") + } + + newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusUpdating + + if err := c.scalePoolerUp(newSpec); err != nil { + c.logger.Warningf("[lifecycle] failed to scale pooler during wake-up: %v", err) + } + + if err := c.unsuspendLogicalBackupJob(); err != nil { + c.logger.Warningf("[lifecycle] failed to resume logical backup job: %v", err) + } +} + +// persistHibernateTransition persists the hibernate transition to Kubernetes by: +// 1. Updating the PostgresCR spec (numberOfInstances=0, previousNumberOfInstances stored) +// 2. Updating the PostgresCR status (status=Stopping, previousPoolerInstances stored) +// 3. Updating the local cluster spec +// Returns (handled=true, nil) on success, (handled=false, error) on failure. +func (c *Cluster) persistHibernateTransition(newSpec *acidv1.Postgresql) (bool, error) { + pgUpdated, err := c.KubeClient.UpdatePostgresCR(c.clusterName(), newSpec) + if err != nil { + return false, fmt.Errorf("could not update spec during hibernate: %w", err) + } + + pgUpdated.Status.PreviousNumberOfInstances = newSpec.Status.PreviousNumberOfInstances + pgUpdated.Status.PostgresClusterStatus = newSpec.Status.PostgresClusterStatus + pgUpdated.Status.PreviousPoolerInstances = newSpec.Status.PreviousPoolerInstances + + pgUpdated, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), pgUpdated) + if err != nil { + return false, fmt.Errorf("could not update status during hibernate: %w", err) + } + + c.setSpec(pgUpdated) + c.logger.Infof("[lifecycle] hibernate completed: cluster is stopping, numberOfInstances=0, previousNumberOfInstances=%d", + pgUpdated.Status.PreviousNumberOfInstances) + return true, nil +} + +// persistWakeUpTransition persists the wake-up transition to Kubernetes by: +// 1. Clearing PreviousNumberOfInstances and PreviousPoolerInstances +// 2. Updating the PostgresCR spec (numberOfInstances restored from previous) +// 3. Updating the PostgresCR status (status=Updating) +// 4. Updating the local cluster spec +// Returns (handled=true, nil) on success, (handled=false, error) on failure. +func (c *Cluster) persistWakeUpTransition(newSpec *acidv1.Postgresql) (bool, error) { + newSpec.Status.PreviousNumberOfInstances = 0 + newSpec.Status.PreviousPoolerInstances = nil + + pgUpdated, err := c.KubeClient.UpdatePostgresCR(c.clusterName(), newSpec) + if err != nil { + return false, fmt.Errorf("could not update spec during wake-up: %w", err) + } + + pgUpdated.Status.PostgresClusterStatus = newSpec.Status.PostgresClusterStatus + + pgUpdated, err = c.KubeClient.SetPostgresCRDStatus(c.clusterName(), pgUpdated) + if err != nil { + return false, fmt.Errorf("could not update status during wake-up: %w", err) + } + + c.setSpec(pgUpdated) + c.logger.Infof("[lifecycle] wake-up completed: cluster is updating, numberOfInstances=%d", + pgUpdated.Spec.NumberOfInstances) + return true, nil +} + +// persistStoppingCompletedTransition persists the Stopping->Stopped transition to Kubernetes +// by updating only the status (status=Stopped). This is called when StatefulSet replicas +// have actually reached 0. +// Returns (handled=true, nil) on success, (handled=false, error) on failure. +func (c *Cluster) persistStoppingCompletedTransition(newSpec *acidv1.Postgresql) (bool, error) { + pgUpdated, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), newSpec) + if err != nil { + return false, fmt.Errorf("could not update status during stopping completed: %w", err) + } + + c.setSpec(pgUpdated) + c.logger.Info("[lifecycle] stopping completed: cluster is stopped") + return true, nil +} + +// manageHibernateState handles cluster hibernate/wake-up state transitions during Sync(). +// This is the Sync path - it only modifies the in-memory spec and returns a boolean +// indicating whether sync should continue or return early. +// +// State transitions handled: +// - Running -> Stopping: Via initiateHibernate() (when lifecycle.phase="stopped") +// - Stopping -> Stopped: When StatefulSet replicas reach 0 (detected here) +// - Stopped -> Updating: Via initiateWakeUp() (when lifecycle cleared) +// +// Returns true if sync should continue, false if it should return early. +func (c *Cluster) manageHibernateState(oldSpec acidv1.Postgresql, newSpec *acidv1.Postgresql) bool { + action := detectLifecycleTransition( + &newSpec.Status, + oldSpec.Spec.Lifecycle, + newSpec.Spec.Lifecycle, + newSpec.Spec.NumberOfInstances, + newSpec.Status.PreviousNumberOfInstances, + oldSpec.Status.Running(), + ) + + switch action { + case LifecycleActionHibernate: + c.initiateHibernate(newSpec) + return true + + case LifecycleActionWakeUp: + c.initiateWakeUp(newSpec) + return true + } + + // Check if Stopping -> Stopped transition is needed + if detectStoppingCompleted(&newSpec.Status, c.getStatefulsetReplicas()) { + newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusStopped + c.logger.Info("[lifecycle] cluster has stopped, all pods are terminated") + return true + } + + // Skip sync if cluster is stopped and lifecycle.phase="stopped" is set + if newSpec.Status.Stopped() && newSpec.Spec.Lifecycle != nil && newSpec.Spec.Lifecycle.Phase == "stopped" { + return false + } + + return true +} + +// getStatefulsetReplicas returns the current replica count from the StatefulSet. +// Returns nil if StatefulSet is nil or Replicas field is nil. +func (c *Cluster) getStatefulsetReplicas() *int32 { + if c.Statefulset == nil || c.Statefulset.Spec.Replicas == nil { + return nil + } + return c.Statefulset.Spec.Replicas +} + +// suspendLogicalBackupJob suspends the logical backup CronJob by setting spec.suspend=true. +// If the job was previously loaded but has been deleted externally, clears the cached reference. +// Returns nil if job is not loaded (no-op) or if job was not found (clears cache). +// Returns error only for actual failures (network errors, etc). +func (c *Cluster) suspendLogicalBackupJob() error { + if c.LogicalBackupJob == nil { + c.logger.Debug("logical backup job is not loaded, skipping suspend") + return nil + } + + // Check if job still exists (handles externally deleted jobs) + _, err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get( + context.TODO(), c.getLogicalBackupJobName(), metav1.GetOptions{}) + if k8serrors.IsNotFound(err) { + c.logger.Info("logical backup job not found during suspend, clearing cached reference") + c.LogicalBackupJob = nil + return nil + } + if err != nil { + return fmt.Errorf("could not get logical backup job: %w", err) + } + + patchData := fmt.Sprintf(`{"spec":{"suspend":true}}`) + cronJob, err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Patch( + context.TODO(), + c.getLogicalBackupJobName(), + types.MergePatchType, + []byte(patchData), + metav1.PatchOptions{}, + "", + ) + if err != nil { + return fmt.Errorf("could not suspend logical backup job: %w", err) + } + c.LogicalBackupJob = cronJob + c.logger.Info("logical backup job suspended") + + return nil +} + +// unsuspendLogicalBackupJob resumes the logical backup CronJob by setting spec.suspend=false. +// If the job was previously loaded but has been deleted externally, clears the cached reference. +// Returns nil if job is not loaded (no-op) or if job was not found (clears cache). +// Returns error only for actual failures (network errors, etc). +func (c *Cluster) unsuspendLogicalBackupJob() error { + if c.LogicalBackupJob == nil { + c.logger.Debug("logical backup job is not loaded, skipping unsuspend") + return nil + } + + // Check if job still exists (handles externally deleted jobs) + _, err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Get( + context.TODO(), c.getLogicalBackupJobName(), metav1.GetOptions{}) + if k8serrors.IsNotFound(err) { + c.logger.Info("logical backup job not found during unsuspend, clearing cached reference") + c.LogicalBackupJob = nil + return nil + } + if err != nil { + return fmt.Errorf("could not get logical backup job: %w", err) + } + + patchData := fmt.Sprintf(`{"spec":{"suspend":false}}`) + cronJob, err := c.KubeClient.CronJobsGetter.CronJobs(c.Namespace).Patch( + context.TODO(), + c.getLogicalBackupJobName(), + types.MergePatchType, + []byte(patchData), + metav1.PatchOptions{}, + "", + ) + if err != nil { + return fmt.Errorf("could not resume logical backup job: %w", err) + } + c.LogicalBackupJob = cronJob + c.logger.Info("logical backup job resumed") + + return nil +} + +// getPoolerReplicas returns the current replica count for a pooler deployment. +// Returns 0 if pooler doesn't exist, hasn't been synced yet, or has nil Replicas. +func (c *Cluster) getPoolerReplicas(role PostgresRole) int32 { + if c.ConnectionPooler == nil || c.ConnectionPooler[role] == nil || + c.ConnectionPooler[role].Deployment == nil || + c.ConnectionPooler[role].Deployment.Spec.Replicas == nil { + return 0 + } + return *c.ConnectionPooler[role].Deployment.Spec.Replicas +} + +// scalePoolerDown scales all connection pooler deployments to 0 and stores their current +// replica counts in newSpec.Status.PreviousPoolerInstances. +// Should be called during hibernate initiation. +// Errors are returned immediately if a patch fails (partial state possible). +func (c *Cluster) scalePoolerDown(newSpec *acidv1.Postgresql) error { + if c.ConnectionPooler == nil { + return nil + } + + for role := range c.ConnectionPooler { + replicas := c.getPoolerReplicas(role) + + if newSpec.Status.PreviousPoolerInstances == nil { + newSpec.Status.PreviousPoolerInstances = make(map[string]int32) + } + newSpec.Status.PreviousPoolerInstances[string(role)] = replicas + + if replicas > 0 { + if err := c.patchPoolerReplicas(role, 0); err != nil { + return err + } + c.logger.Infof("[lifecycle] pooler %s scaled to 0 (was %d)", role, replicas) + } + } + return nil +} + +// scalePoolerUp restores connection pooler deployments to their previous replica counts +// from newSpec.Status.PreviousPoolerInstances. +// Should be called during wake-up. +// Errors are returned immediately if a patch fails (partial state possible). +func (c *Cluster) scalePoolerUp(newSpec *acidv1.Postgresql) error { + if newSpec.Status.PreviousPoolerInstances == nil { + return nil + } + + for roleStr, replicas := range newSpec.Status.PreviousPoolerInstances { + role := PostgresRole(roleStr) + + if err := c.patchPoolerReplicas(role, replicas); err != nil { + return err + } + c.logger.Infof("[lifecycle] pooler %s scaled to %d", role, replicas) + } + return nil +} + +// patchPoolerReplicas patches a pooler deployment's replica count. +// If the deployment doesn't exist (not found), returns nil (no-op). +// Returns error for other failures (network errors, etc). +func (c *Cluster) patchPoolerReplicas(role PostgresRole, replicas int32) error { + _, err := c.KubeClient.Deployments(c.Namespace).Get( + context.TODO(), c.connectionPoolerName(role), metav1.GetOptions{}) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return fmt.Errorf("could not get pooler deployment for %s: %w", role, err) + } + + patchData := fmt.Sprintf(`{"spec":{"replicas":%d}}`, replicas) + _, err = c.KubeClient.Deployments(c.Namespace).Patch( + context.TODO(), c.connectionPoolerName(role), types.MergePatchType, []byte(patchData), metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("could not patch pooler deployment %s replicas: %w", role, err) + } + return nil +} \ No newline at end of file diff --git a/pkg/cluster/lifecycle_test.go b/pkg/cluster/lifecycle_test.go new file mode 100644 index 000000000..3dcd53380 --- /dev/null +++ b/pkg/cluster/lifecycle_test.go @@ -0,0 +1,1430 @@ +package cluster + +import ( + "context" + "fmt" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + fakeacidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake" + "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/record" +) + +var lifecycleLogger = logrus.New().WithField("test", "lifecycle") +var lifecycleEventRecorder = record.NewFakeRecorder(10) + +func int32Ptr(i int32) *int32 { return &i } + +func newTestLifecycleCluster(status string, numberOfInstances int32, lifecyclePhase string) *Cluster { + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + TeamID: "test-team", + NumberOfInstances: numberOfInstances, + Volume: acidv1.Volume{Size: "1Gi"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: status, + }, + } + + if lifecyclePhase != "" { + pg.Spec.Lifecycle = &acidv1.LifecycleSpec{ + Phase: lifecyclePhase, + } + } + + return &Cluster{ + Config: Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, + Postgresql: pg, + logger: lifecycleLogger, + eventRecorder: lifecycleEventRecorder, + } +} + +func newTestPoolerObjects(role PostgresRole, replicas int32) *ConnectionPoolerObjects { + return &ConnectionPoolerObjects{ + Deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-%s-pooler", role), + Namespace: "default", + }, + Spec: appsv1.DeploymentSpec{ + Replicas: int32Ptr(replicas), + }, + }, + Name: fmt.Sprintf("test-%s-pooler", role), + ClusterName: "test-cluster", + Namespace: "default", + Role: role, + } +} + +func newFakeK8sClientForLifecycle() (*k8sutil.KubernetesClient, *fake.Clientset, *fakeacidv1.Clientset) { + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + client := &k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + StatefulSetsGetter: clientSet.AppsV1(), + ServicesGetter: clientSet.CoreV1(), + SecretsGetter: clientSet.CoreV1(), + ConfigMapsGetter: clientSet.CoreV1(), + PodsGetter: clientSet.CoreV1(), + EndpointsGetter: clientSet.CoreV1(), + } + + return client, clientSet, acidClientSet +} + +func createTestPostgresqlInClient(client *k8sutil.KubernetesClient, name, namespace string, spec *acidv1.PostgresSpec, status *acidv1.PostgresStatus) *acidv1.Postgresql { + pg := &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + if spec != nil { + pg.Spec = *spec + } + if status != nil { + pg.Status = *status + } + + created, err := client.Postgresqls(namespace).Create(context.TODO(), pg, metav1.CreateOptions{}) + if err == nil { + return created + } + return pg +} + +func updatePostgresqlInClient(client *k8sutil.KubernetesClient, pg *acidv1.Postgresql) (*acidv1.Postgresql, error) { + return client.Postgresqls(pg.Namespace).Update(context.TODO(), pg, metav1.UpdateOptions{}) +} + +func updatePostgresqlStatusInClient(client *k8sutil.KubernetesClient, pg *acidv1.Postgresql) (*acidv1.Postgresql, error) { + return client.Postgresqls(pg.Namespace).UpdateStatus(context.TODO(), pg, metav1.UpdateOptions{}) +} + +func TestGetPoolerReplicas(t *testing.T) { + tests := []struct { + name string + poolerObjs map[PostgresRole]*ConnectionPoolerObjects + role PostgresRole + want int32 + }{ + { + name: "nil ConnectionPooler map", + poolerObjs: nil, + role: Master, + want: 0, + }, + { + name: "ConnectionPooler for role is nil", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{Master: nil}, + role: Master, + want: 0, + }, + { + name: "Deployment is nil", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{Master: {Deployment: nil}}, + role: Master, + want: 0, + }, + { + name: "Replicas is nil", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{ + Master: {Deployment: &appsv1.Deployment{Spec: appsv1.DeploymentSpec{Replicas: nil}}}, + }, + role: Master, + want: 0, + }, + { + name: "Master with 2 replicas", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{ + Master: newTestPoolerObjects(Master, 2), + }, + role: Master, + want: 2, + }, + { + name: "Master with 0 replicas", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{ + Master: newTestPoolerObjects(Master, 0), + }, + role: Master, + want: 0, + }, + { + name: "Replica with 3 replicas", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{ + Replica: newTestPoolerObjects(Replica, 3), + }, + role: Replica, + want: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &Cluster{ + ConnectionPooler: tt.poolerObjs, + } + got := c.getPoolerReplicas(tt.role) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestPatchPoolerReplicas(t *testing.T) { + tests := []struct { + name string + replicas int32 + setupClient func(clientSet *fake.Clientset) + wantErr bool + errContains string + }{ + { + name: "deployment exists, patch succeeds", + replicas: 0, + setupClient: func(clientSet *fake.Clientset) { + // Pre-create the deployment in the fake clientset + clientSet.AppsV1().Deployments("default").Create(context.TODO(), &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "test-cluster-pooler"}, + Spec: appsv1.DeploymentSpec{Replicas: int32Ptr(2)}, + }, metav1.CreateOptions{}) + }, + wantErr: false, + }, + { + name: "deployment not found", + replicas: 2, + setupClient: func(clientSet *fake.Clientset) { + // Don't create anything - will trigger NotFound + }, + wantErr: false, // NotFound is handled gracefully + }, + { + name: "patch returns error", + replicas: 2, + setupClient: func(clientSet *fake.Clientset) { + // Pre-create deployment but make patch fail via reactor + clientSet.AppsV1().Deployments("default").Create(context.TODO(), &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "test-cluster-pooler"}, + Spec: appsv1.DeploymentSpec{Replicas: int32Ptr(2)}, + }, metav1.CreateOptions{}) + clientSet.PrependReactor("patch", "deployments", func(action k8stesting.Action) (bool, runtime.Object, error) { + return true, nil, fmt.Errorf("network error") + }) + }, + wantErr: true, + errContains: "could not patch pooler deployment", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clientSet := fake.NewSimpleClientset() + if tt.setupClient != nil { + tt.setupClient(clientSet) + } + + kubeClient := &k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + } + + c := &Cluster{ + KubeClient: *kubeClient, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + }, + } + + err := c.patchPoolerReplicas(Master, tt.replicas) + if tt.wantErr { + assert.Error(t, err) + if tt.errContains != "" { + assert.Contains(t, err.Error(), tt.errContains) + } + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestBlockLifecycleUpdate(t *testing.T) { + tests := []struct { + name string + currentStatus string + lifecyclePhase string + wantBlocked bool + wantErr bool + errContains string + }{ + { + name: "Running cluster, allows update", + currentStatus: "Running", + wantBlocked: false, + wantErr: false, + }, + { + name: "Stopping state, blocks update", + currentStatus: "Stopping", + wantBlocked: true, + wantErr: true, + errContains: "cannot update cluster while it is stopping", + }, + { + name: "Stopped with lifecycle phase, blocks update", + currentStatus: "Stopped", + lifecyclePhase: "stopped", + wantBlocked: true, + wantErr: true, + errContains: "cannot update cluster while stopped", + }, + { + name: "Stopped without lifecycle phase, allows update (wake-up)", + currentStatus: "Stopped", + lifecyclePhase: "", + wantBlocked: false, + wantErr: false, + }, + { + name: "Stopped with nil lifecycle, allows update (wake-up)", + currentStatus: "Stopped", + lifecyclePhase: "", + wantBlocked: false, + wantErr: false, + }, + { + name: "Stopped with empty lifecycle phase, allows update (wake-up)", + currentStatus: "Stopped", + lifecyclePhase: "", + wantBlocked: false, + wantErr: false, + }, + { + name: "UpdateFailed state, allows update", + currentStatus: "UpdateFailed", + wantBlocked: false, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newTestLifecycleCluster(tt.currentStatus, 3, tt.lifecyclePhase) + + newSpec := c.DeepCopy() + if tt.lifecyclePhase != "" && tt.currentStatus != "Stopped" { + newSpec.Spec.Lifecycle = &acidv1.LifecycleSpec{Phase: tt.lifecyclePhase} + } + + blocked, err := c.blockLifecycleUpdate(newSpec) + + if tt.wantErr { + assert.Error(t, err) + if tt.errContains != "" { + assert.Contains(t, err.Error(), tt.errContains) + } + } else { + assert.NoError(t, err) + } + assert.Equal(t, tt.wantBlocked, blocked) + }) + } +} + +func TestManageHibernateState(t *testing.T) { + tests := []struct { + name string + oldSpecStatus string + newSpecStatus string + newSpecLifecyclePhase string + numberOfInstances int32 + previousNumberOfInst int32 + statefulsetReplicas *int32 + wantContinue bool + wantNumberOfInstances *int32 + wantStatus string + }{ + { + name: "Running to Stopping - initiates hibernate", + oldSpecStatus: "Running", + newSpecStatus: "Running", + newSpecLifecyclePhase: "stopped", + numberOfInstances: 3, + statefulsetReplicas: int32Ptr(3), + wantContinue: true, + wantNumberOfInstances: int32Ptr(0), + wantStatus: "Stopping", + }, + { + name: "Stopping to Stopped - when replicas reach 0", + oldSpecStatus: "Stopping", + newSpecStatus: "Stopping", + numberOfInstances: 0, + statefulsetReplicas: int32Ptr(0), + wantContinue: true, + wantNumberOfInstances: int32Ptr(0), + wantStatus: "Stopped", + }, + { + name: "Stopping - replicas not yet 0", + oldSpecStatus: "Stopping", + newSpecStatus: "Stopping", + numberOfInstances: 0, + statefulsetReplicas: int32Ptr(2), // Still terminating + wantContinue: true, + wantNumberOfInstances: nil, // Should NOT change + wantStatus: "Stopping", // Should NOT change + }, + { + name: "Stopped to wake-up - restores numberOfInstances", + oldSpecStatus: "Stopped", + newSpecStatus: "Stopped", + newSpecLifecyclePhase: "", // Cleared + numberOfInstances: 0, + previousNumberOfInst: 3, + statefulsetReplicas: int32Ptr(0), + wantContinue: true, + wantNumberOfInstances: int32Ptr(3), + wantStatus: "Updating", + }, + { + name: "Stopped but lifecycle still 'stopped' - skip sync", + oldSpecStatus: "Stopped", + newSpecStatus: "Stopped", + newSpecLifecyclePhase: "stopped", + numberOfInstances: 0, + statefulsetReplicas: int32Ptr(0), + wantContinue: false, // Should skip sync + wantNumberOfInstances: nil, + wantStatus: "Stopped", + }, + { + name: "Running without lifecycle - continue normal", + oldSpecStatus: "Running", + newSpecStatus: "Running", + newSpecLifecyclePhase: "", + numberOfInstances: 3, + statefulsetReplicas: int32Ptr(3), + wantContinue: true, + wantNumberOfInstances: int32Ptr(3), // Unchanged + wantStatus: "Running", // Unchanged + }, + { + name: "Running to Updating - normal update", + oldSpecStatus: "Running", + newSpecStatus: "Updating", + newSpecLifecyclePhase: "", + numberOfInstances: 3, + statefulsetReplicas: int32Ptr(3), + wantContinue: true, + wantNumberOfInstances: int32Ptr(3), + wantStatus: "Updating", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &Cluster{ + logger: lifecycleLogger, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + }, + } + + if tt.statefulsetReplicas != nil { + c.Statefulset = &appsv1.StatefulSet{ + Spec: appsv1.StatefulSetSpec{ + Replicas: tt.statefulsetReplicas, + }, + } + } + + oldSpec := acidv1.Postgresql{ + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: tt.oldSpecStatus, + }, + } + + newSpec := &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + NumberOfInstances: tt.numberOfInstances, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: tt.newSpecStatus, + PreviousNumberOfInstances: tt.previousNumberOfInst, + }, + } + + if tt.newSpecLifecyclePhase != "" { + newSpec.Spec.Lifecycle = &acidv1.LifecycleSpec{ + Phase: tt.newSpecLifecyclePhase, + } + } + + gotContinue := c.manageHibernateState(oldSpec, newSpec) + + assert.Equal(t, tt.wantContinue, gotContinue) + + if tt.wantNumberOfInstances != nil { + assert.Equal(t, *tt.wantNumberOfInstances, newSpec.Spec.NumberOfInstances) + } + assert.Equal(t, tt.wantStatus, newSpec.Status.PostgresClusterStatus) + }) + } +} + +func TestHandleHibernateAndWakeUp_Hibernate(t *testing.T) { + tests := []struct { + name string + currentStatus string + numberOfInstances int32 + lifecyclePhase string + poolerObjs map[PostgresRole]*ConnectionPoolerObjects + k8sUpdateSucceeds bool + k8sStatusSucceeds bool + poolerPatchSucceeds bool + wantHandled bool + wantErr bool + errContains string + }{ + { + name: "Running + lifecycle.phase=stopped - initiates hibernate", + currentStatus: "Running", + numberOfInstances: 3, + lifecyclePhase: "stopped", + k8sUpdateSucceeds: true, + k8sStatusSucceeds: true, + poolerPatchSucceeds: true, + wantHandled: true, + wantErr: false, + }, + { + name: "Running + lifecycle.phase=stopped - K8s update fails", + currentStatus: "Running", + numberOfInstances: 3, + lifecyclePhase: "stopped", + k8sUpdateSucceeds: false, + wantHandled: false, + wantErr: true, + errContains: "could not update spec during hibernate", + }, + { + name: "Running + lifecycle.phase=stopped - K8s status update fails", + currentStatus: "Running", + numberOfInstances: 3, + lifecyclePhase: "stopped", + k8sUpdateSucceeds: true, + k8sStatusSucceeds: false, + wantHandled: false, + wantErr: true, + errContains: "could not update status during hibernate", + }, + { + name: "Running + no lifecycle phase - no transition", + currentStatus: "Running", + numberOfInstances: 3, + lifecyclePhase: "", + wantHandled: false, + wantErr: false, + }, + { + name: "Stopped + lifecycle cleared - initiates wake-up", + currentStatus: "Stopped", + numberOfInstances: 0, + lifecyclePhase: "", + k8sUpdateSucceeds: true, + k8sStatusSucceeds: true, + wantHandled: true, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + kubeClient, _, acidClientSet := newFakeK8sClientForLifecycle() + + if tt.k8sUpdateSucceeds { + acidClientSet.PrependReactor("update", "postgresqls", func(action k8stesting.Action) (bool, runtime.Object, error) { + updateAction := action.(k8stesting.UpdateAction) + pg := updateAction.GetObject().(*acidv1.Postgresql) + return true, pg, nil + }) + } + + if !tt.k8sStatusSucceeds { + acidClientSet.PrependReactor("update", "postgresqls", func(action k8stesting.Action) (bool, runtime.Object, error) { + if action.GetSubresource() == "status" { + return true, nil, fmt.Errorf("status update failed") + } + return false, nil, nil + }) + } + + c := &Cluster{ + Config: Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + TeamID: "test-team", + NumberOfInstances: tt.numberOfInstances, + Volume: acidv1.Volume{Size: "1Gi"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: tt.currentStatus, + }, + }, + KubeClient: *kubeClient, + ConnectionPooler: tt.poolerObjs, + logger: lifecycleLogger, + eventRecorder: lifecycleEventRecorder, + } + + newSpec := c.DeepCopy() + if tt.lifecyclePhase != "" { + newSpec.Spec.Lifecycle = &acidv1.LifecycleSpec{Phase: tt.lifecyclePhase} + } + + handled, err := c.handleHibernateAndWakeUp(newSpec) + + if tt.wantErr { + assert.Error(t, err) + if tt.errContains != "" { + assert.Contains(t, err.Error(), tt.errContains) + } + } else { + assert.NoError(t, err) + } + assert.Equal(t, tt.wantHandled, handled) + }) + } +} + +func TestHandleHibernateAndWakeUp_WakeUp(t *testing.T) { + tests := []struct { + name string + currentStatus string + numberOfInstances int32 + previousNumberOfInst int32 + previousPoolerInstances map[string]int32 + lifecyclePhase string + k8sUpdateSucceeds bool + k8sStatusSucceeds bool + poolerPatchSucceeds bool + wantHandled bool + wantErr bool + errContains string + }{ + { + name: "Stopped + lifecycle cleared - wake-up", + currentStatus: "Stopped", + numberOfInstances: 0, + previousNumberOfInst: 3, + previousPoolerInstances: map[string]int32{"master": 2, "replica": 0}, + lifecyclePhase: "", + k8sUpdateSucceeds: true, + k8sStatusSucceeds: true, + wantHandled: true, + wantErr: false, + }, + { + name: "Stopped + previousNumberOfInstances is 0 - still initiates wake-up", + currentStatus: "Stopped", + numberOfInstances: 0, + previousNumberOfInst: 0, + lifecyclePhase: "", + k8sUpdateSucceeds: true, + k8sStatusSucceeds: true, + wantHandled: true, + wantErr: false, + }, + { + name: "Stopped + lifecycle cleared + K8s update fails", + currentStatus: "Stopped", + numberOfInstances: 0, + previousNumberOfInst: 3, + lifecyclePhase: "", + k8sUpdateSucceeds: false, + wantHandled: false, + wantErr: true, + errContains: "could not update spec during wake-up", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + kubeClient, _, acidClientSet := newFakeK8sClientForLifecycle() + + if tt.k8sUpdateSucceeds { + acidClientSet.PrependReactor("update", "postgresqls", func(action k8stesting.Action) (bool, runtime.Object, error) { + updateAction := action.(k8stesting.UpdateAction) + pg := updateAction.GetObject().(*acidv1.Postgresql) + return true, pg, nil + }) + } + + c := &Cluster{ + Config: Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + TeamID: "test-team", + NumberOfInstances: tt.numberOfInstances, + Volume: acidv1.Volume{Size: "1Gi"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: tt.currentStatus, + PreviousNumberOfInstances: tt.previousNumberOfInst, + PreviousPoolerInstances: tt.previousPoolerInstances, + }, + }, + KubeClient: *kubeClient, + logger: lifecycleLogger, + eventRecorder: lifecycleEventRecorder, + } + + newSpec := c.DeepCopy() + + handled, err := c.handleHibernateAndWakeUp(newSpec) + + if tt.wantErr { + assert.Error(t, err) + if tt.errContains != "" { + assert.Contains(t, err.Error(), tt.errContains) + } + } else { + assert.NoError(t, err) + } + assert.Equal(t, tt.wantHandled, handled) + }) + } +} + +func TestScalePoolerDown(t *testing.T) { + tests := []struct { + name string + poolerObjs map[PostgresRole]*ConnectionPoolerObjects + wantStored map[string]int32 + }{ + { + name: "nil ConnectionPooler - no-op", + poolerObjs: nil, + wantStored: nil, + }, + { + name: "Master at 2 replicas", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{Master: newTestPoolerObjects(Master, 2)}, + wantStored: map[string]int32{"master": 2}, + }, + { + name: "Master already at 0 replicas", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{Master: newTestPoolerObjects(Master, 0)}, + wantStored: map[string]int32{"master": 0}, + }, + { + name: "Both Master and Replica", + poolerObjs: map[PostgresRole]*ConnectionPoolerObjects{ + Master: newTestPoolerObjects(Master, 2), + Replica: newTestPoolerObjects(Replica, 1), + }, + wantStored: map[string]int32{"master": 2, "replica": 1}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clientSet := fake.NewSimpleClientset() + kubeClient := &k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + } + + c := &Cluster{ + KubeClient: *kubeClient, + ConnectionPooler: tt.poolerObjs, + logger: lifecycleLogger, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + }, + } + + newSpec := &acidv1.Postgresql{} + err := c.scalePoolerDown(newSpec) + + assert.NoError(t, err) + assert.Equal(t, tt.wantStored, newSpec.Status.PreviousPoolerInstances) + }) + } +} + +func TestScalePoolerUp(t *testing.T) { + tests := []struct { + name string + prevInst map[string]int32 + wantErr bool + }{ + { + name: "nil PreviousPoolerInstances - no-op", + prevInst: nil, + }, + { + name: "Restore master to 2", + prevInst: map[string]int32{"master": 2}, + }, + { + name: "Restore both roles", + prevInst: map[string]int32{"master": 2, "replica": 1}, + }, + { + name: "Restore master to 0 (keep at 0)", + prevInst: map[string]int32{"master": 0}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clientSet := fake.NewSimpleClientset() + kubeClient := &k8sutil.KubernetesClient{ + DeploymentsGetter: clientSet.AppsV1(), + } + + c := &Cluster{ + KubeClient: *kubeClient, + logger: lifecycleLogger, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + }, + } + + newSpec := &acidv1.Postgresql{ + Status: acidv1.PostgresStatus{ + PreviousPoolerInstances: tt.prevInst, + }, + } + err := c.scalePoolerUp(newSpec) + + assert.NoError(t, err) + }) + } +} + +func TestLifecycleStateTransitions(t *testing.T) { + t.Run("complete Hibernate flow: Running -> Stopping -> Stopped", func(t *testing.T) { + c := &Cluster{ + logger: lifecycleLogger, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + }, + } + + // Initial: Running + oldSpec := acidv1.Postgresql{ + Status: acidv1.PostgresStatus{PostgresClusterStatus: "Running"}, + } + newSpec := &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + NumberOfInstances: 3, + Lifecycle: &acidv1.LifecycleSpec{Phase: "stopped"}, + }, + Status: acidv1.PostgresStatus{PostgresClusterStatus: "Running"}, + } + + // Step 1: manageHibernateState should initiate hibernate + continueSync := c.manageHibernateState(oldSpec, newSpec) + assert.True(t, continueSync) + assert.Equal(t, int32(0), newSpec.Spec.NumberOfInstances) + assert.Equal(t, "Stopping", newSpec.Status.PostgresClusterStatus) + assert.Equal(t, int32(3), newSpec.Status.PreviousNumberOfInstances) + }) + + t.Run("complete Wake-up flow: Stopped -> Updating -> Running", func(t *testing.T) { + c := &Cluster{ + logger: lifecycleLogger, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + }, + } + + // After hibernate: Stopped, replicas = 0 + c.Statefulset = &appsv1.StatefulSet{ + Spec: appsv1.StatefulSetSpec{Replicas: int32Ptr(0)}, + } + + oldSpec := acidv1.Postgresql{ + Status: acidv1.PostgresStatus{PostgresClusterStatus: "Stopped"}, + } + newSpec := &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + NumberOfInstances: 0, + // Lifecycle cleared by user + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Stopped", + PreviousNumberOfInstances: 3, + }, + } + + // Step 1: manageHibernateState should restore + continueSync := c.manageHibernateState(oldSpec, newSpec) + assert.True(t, continueSync) + assert.Equal(t, int32(3), newSpec.Spec.NumberOfInstances) + assert.Equal(t, "Updating", newSpec.Status.PostgresClusterStatus) + }) +} + +func TestLifecycleUpdateBlocksDuringStopping(t *testing.T) { + kubeClient, _, _ := newFakeK8sClientForLifecycle() + + c := &Cluster{ + Config: Config{ + OpConfig: config.Config{}, + }, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: acidv1.PostgresSpec{ + NumberOfInstances: 0, + Lifecycle: &acidv1.LifecycleSpec{Phase: "stopped"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Stopping", + }, + }, + KubeClient: *kubeClient, + logger: lifecycleLogger, + } + + newSpec := c.DeepCopy() + blocked, err := c.blockLifecycleUpdate(newSpec) + + assert.True(t, blocked) + assert.Error(t, err) + assert.Contains(t, err.Error(), "cannot update cluster while it is stopping") +} + +func TestLifecycleUpdateBlocksWhenStoppedWithPhase(t *testing.T) { + kubeClient, _, _ := newFakeK8sClientForLifecycle() + + c := &Cluster{ + Config: Config{ + OpConfig: config.Config{}, + }, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: acidv1.PostgresSpec{ + NumberOfInstances: 0, + Lifecycle: &acidv1.LifecycleSpec{Phase: "stopped"}, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Stopped", + }, + }, + KubeClient: *kubeClient, + logger: lifecycleLogger, + } + + newSpec := c.DeepCopy() + blocked, err := c.blockLifecycleUpdate(newSpec) + + assert.True(t, blocked) + assert.Error(t, err) + assert.Contains(t, err.Error(), "cannot update cluster while stopped") +} + +func TestLifecycleUpdateAllowsWakeUp(t *testing.T) { + kubeClient, _, _ := newFakeK8sClientForLifecycle() + + c := &Cluster{ + Config: Config{ + OpConfig: config.Config{}, + }, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: acidv1.PostgresSpec{ + NumberOfInstances: 0, + // Lifecycle cleared by user + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Stopped", + PreviousNumberOfInstances: 3, + }, + }, + KubeClient: *kubeClient, + logger: lifecycleLogger, + } + + newSpec := c.DeepCopy() + blocked, err := c.blockLifecycleUpdate(newSpec) + + assert.False(t, blocked) + assert.NoError(t, err) +} + +func TestManageHibernateState_EdgeCases(t *testing.T) { + tests := []struct { + name string + statefulsetReplicas *int32 + currentStatus string + newSpecLifecyclePhase string + newSpecNumberOfInst int32 + previousNumberOfInst int32 + wantContinue bool + wantNumberOfInstances *int32 + wantStatus string + }{ + { + name: "Stopping state with nil statefulset - should not transition to Stopped", + statefulsetReplicas: nil, + currentStatus: "Stopping", + newSpecLifecyclePhase: "stopped", + newSpecNumberOfInst: 0, + wantContinue: true, + wantNumberOfInstances: nil, + wantStatus: "Stopping", + }, + { + name: "Stopping with nil Lifecycle spec", + statefulsetReplicas: int32Ptr(0), + currentStatus: "Stopping", + newSpecLifecyclePhase: "", + newSpecNumberOfInst: 0, + wantContinue: true, + wantNumberOfInstances: nil, + wantStatus: "Stopped", + }, + { + name: "Running to Stopping when numberOfInstances already 0", + statefulsetReplicas: int32Ptr(0), + currentStatus: "Running", + newSpecLifecyclePhase: "stopped", + newSpecNumberOfInst: 0, + wantContinue: true, + wantNumberOfInstances: int32Ptr(0), + wantStatus: "Stopping", + }, + { + name: "Stopped with previousNumberOfInstances=0 - sets Updating but warns", + statefulsetReplicas: int32Ptr(0), + currentStatus: "Stopped", + newSpecLifecyclePhase: "", + newSpecNumberOfInst: 0, + previousNumberOfInst: 0, + wantContinue: true, + wantNumberOfInstances: int32Ptr(0), + wantStatus: "Updating", + }, + { + name: "Stopped with nil Lifecycle - wake-up", + statefulsetReplicas: int32Ptr(0), + currentStatus: "Stopped", + newSpecLifecyclePhase: "", + newSpecNumberOfInst: 0, + previousNumberOfInst: 3, + wantContinue: true, + wantNumberOfInstances: int32Ptr(3), + wantStatus: "Updating", + }, + { + name: "Stopped with empty Lifecycle phase - wake-up", + statefulsetReplicas: int32Ptr(0), + currentStatus: "Stopped", + newSpecLifecyclePhase: "", + newSpecNumberOfInst: 0, + previousNumberOfInst: 2, + wantContinue: true, + wantNumberOfInstances: int32Ptr(2), + wantStatus: "Updating", + }, + { + name: "Stopped but lifecycle still 'stopped' - skip sync", + statefulsetReplicas: int32Ptr(0), + currentStatus: "Stopped", + newSpecLifecyclePhase: "stopped", + newSpecNumberOfInst: 0, + previousNumberOfInst: 3, + wantContinue: false, + wantNumberOfInstances: nil, + wantStatus: "Stopped", + }, + { + name: "Running without lifecycle change - normal update", + statefulsetReplicas: int32Ptr(3), + currentStatus: "Running", + newSpecLifecyclePhase: "", + newSpecNumberOfInst: 3, + previousNumberOfInst: 0, + wantContinue: true, + wantNumberOfInstances: int32Ptr(3), + wantStatus: "Running", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &Cluster{ + logger: lifecycleLogger, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + }, + } + + if tt.statefulsetReplicas != nil { + c.Statefulset = &appsv1.StatefulSet{ + Spec: appsv1.StatefulSetSpec{ + Replicas: tt.statefulsetReplicas, + }, + } + } + + oldSpec := acidv1.Postgresql{ + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: tt.currentStatus, + }, + } + + newSpec := &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + NumberOfInstances: tt.newSpecNumberOfInst, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: tt.currentStatus, + PreviousNumberOfInstances: tt.previousNumberOfInst, + }, + } + + if tt.newSpecLifecyclePhase != "" || tt.newSpecLifecyclePhase == "" && tt.currentStatus == "Stopped" { + newSpec.Spec.Lifecycle = &acidv1.LifecycleSpec{ + Phase: tt.newSpecLifecyclePhase, + } + } + + gotContinue := c.manageHibernateState(oldSpec, newSpec) + + assert.Equal(t, tt.wantContinue, gotContinue, "continue sync mismatch") + if tt.wantNumberOfInstances != nil { + assert.Equal(t, *tt.wantNumberOfInstances, newSpec.Spec.NumberOfInstances, "numberOfInstances mismatch") + } + assert.Equal(t, tt.wantStatus, newSpec.Status.PostgresClusterStatus, "status mismatch") + }) + } +} + +func TestManageHibernateState_StateTransitionSequence(t *testing.T) { + t.Run("Running -> Stopping -> Stopped sequence", func(t *testing.T) { + c := &Cluster{ + logger: lifecycleLogger, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + }, + } + + oldSpec := acidv1.Postgresql{ + Status: acidv1.PostgresStatus{PostgresClusterStatus: "Running"}, + } + newSpec := &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + NumberOfInstances: 3, + Lifecycle: &acidv1.LifecycleSpec{Phase: "stopped"}, + }, + Status: acidv1.PostgresStatus{PostgresClusterStatus: "Running"}, + } + + continueSync := c.manageHibernateState(oldSpec, newSpec) + assert.True(t, continueSync) + assert.Equal(t, int32(0), newSpec.Spec.NumberOfInstances) + assert.Equal(t, int32(3), newSpec.Status.PreviousNumberOfInstances) + assert.Equal(t, "Stopping", newSpec.Status.PostgresClusterStatus) + + c.Statefulset = &appsv1.StatefulSet{ + Spec: appsv1.StatefulSetSpec{Replicas: int32Ptr(2)}, + } + oldSpec = acidv1.Postgresql{ + Status: acidv1.PostgresStatus{PostgresClusterStatus: "Stopping"}, + } + newSpec.Status.PostgresClusterStatus = "Stopping" + + continueSync = c.manageHibernateState(oldSpec, newSpec) + assert.True(t, continueSync) + assert.Equal(t, "Stopping", newSpec.Status.PostgresClusterStatus) + + c.Statefulset.Spec.Replicas = int32Ptr(0) + continueSync = c.manageHibernateState(oldSpec, newSpec) + assert.True(t, continueSync) + assert.Equal(t, "Stopped", newSpec.Status.PostgresClusterStatus) + }) + + t.Run("Stopped -> Updating -> Running sequence", func(t *testing.T) { + c := &Cluster{ + logger: lifecycleLogger, + Postgresql: acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + }, + } + + oldSpec := acidv1.Postgresql{ + Status: acidv1.PostgresStatus{PostgresClusterStatus: "Stopped"}, + } + newSpec := &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + NumberOfInstances: 0, + }, + Status: acidv1.PostgresStatus{ + PostgresClusterStatus: "Stopped", + PreviousNumberOfInstances: 3, + }, + } + + continueSync := c.manageHibernateState(oldSpec, newSpec) + assert.True(t, continueSync) + assert.Equal(t, int32(3), newSpec.Spec.NumberOfInstances) + assert.Equal(t, "Updating", newSpec.Status.PostgresClusterStatus) + + oldSpec = acidv1.Postgresql{ + Status: acidv1.PostgresStatus{PostgresClusterStatus: "Updating"}, + } + newSpec.Status.PostgresClusterStatus = "Updating" + + continueSync = c.manageHibernateState(oldSpec, newSpec) + assert.True(t, continueSync) + assert.Equal(t, int32(3), newSpec.Spec.NumberOfInstances) + }) +} + +func TestSuspendLogicalBackupJob(t *testing.T) { + tests := []struct { + name string + jobExists bool + patchFails bool + wantErr bool + }{ + { + name: "job exists, suspend succeeds", + jobExists: true, + wantErr: false, + }, + { + name: "job does not exist - no-op", + jobExists: false, + wantErr: false, + }, + { + name: "job exists but patch fails", + jobExists: true, + patchFails: true, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clientSet := fake.NewSimpleClientset() + jobName := "logical-backup-test-cluster" + + if tt.jobExists { + clientSet.BatchV1().CronJobs("default").Create(context.TODO(), &batchv1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: jobName, + Namespace: "default", + }, + Spec: batchv1.CronJobSpec{ + Schedule: "30 00 * * *", + }, + }, metav1.CreateOptions{}) + } + + if tt.patchFails { + clientSet.PrependReactor("patch", "cronjobs", func(action k8stesting.Action) (bool, runtime.Object, error) { + return true, nil, fmt.Errorf("network error") + }) + } + + kubeClient := &k8sutil.KubernetesClient{ + CronJobsGetter: clientSet.BatchV1(), + } + + var job *batchv1.CronJob + if tt.jobExists { + job, _ = kubeClient.CronJobs("default").Get(context.TODO(), jobName, metav1.GetOptions{}) + } + + c := New( + Config{ + OpConfig: config.Config{ + LogicalBackup: config.LogicalBackup{ + LogicalBackupJobPrefix: "logical-backup-", + }, + }, + }, + *kubeClient, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + }, + lifecycleLogger, + lifecycleEventRecorder, + ) + c.LogicalBackupJob = job + + err := c.suspendLogicalBackupJob() + + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + if tt.jobExists && !tt.patchFails { + updatedJob, _ := kubeClient.CronJobs("default").Get(context.TODO(), jobName, metav1.GetOptions{}) + if updatedJob != nil { + assert.True(t, *updatedJob.Spec.Suspend, "job should be suspended") + } + } + } + }) + } +} + +func TestUnsuspendLogicalBackupJob(t *testing.T) { + tests := []struct { + name string + jobExists bool + patchFails bool + wantErr bool + }{ + { + name: "job exists, unsuspend succeeds", + jobExists: true, + wantErr: false, + }, + { + name: "job does not exist - no-op", + jobExists: false, + wantErr: false, + }, + { + name: "job exists but patch fails", + jobExists: true, + patchFails: true, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clientSet := fake.NewSimpleClientset() + jobName := "logical-backup-test-cluster" + + if tt.jobExists { + suspendTrue := true + clientSet.BatchV1().CronJobs("default").Create(context.TODO(), &batchv1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: jobName, + Namespace: "default", + }, + Spec: batchv1.CronJobSpec{ + Schedule: "30 00 * * *", + Suspend: &suspendTrue, + }, + }, metav1.CreateOptions{}) + } + + if tt.patchFails { + clientSet.PrependReactor("patch", "cronjobs", func(action k8stesting.Action) (bool, runtime.Object, error) { + return true, nil, fmt.Errorf("network error") + }) + } + + kubeClient := &k8sutil.KubernetesClient{ + CronJobsGetter: clientSet.BatchV1(), + } + + var job *batchv1.CronJob + if tt.jobExists { + job, _ = kubeClient.CronJobs("default").Get(context.TODO(), jobName, metav1.GetOptions{}) + } + + c := New( + Config{ + OpConfig: config.Config{ + LogicalBackup: config.LogicalBackup{ + LogicalBackupJobPrefix: "logical-backup-", + }, + }, + }, + *kubeClient, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + }, + lifecycleLogger, + lifecycleEventRecorder, + ) + c.LogicalBackupJob = job + + err := c.unsuspendLogicalBackupJob() + + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + if tt.jobExists && !tt.patchFails { + updatedJob, _ := kubeClient.CronJobs("default").Get(context.TODO(), jobName, metav1.GetOptions{}) + if updatedJob != nil { + assert.False(t, *updatedJob.Spec.Suspend, "job should be unsuspended") + } + } + } + }) + } +} \ No newline at end of file diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 1925733de..b75f6626d 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -825,6 +825,8 @@ func (c *Cluster) deleteLogicalBackupJob() error { return nil } + + // GetServiceMaster returns cluster's kubernetes master Service func (c *Cluster) GetServiceMaster() *v1.Service { return c.Services[Master] diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index ffebd306c..13982900c 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -47,7 +47,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { if err != nil { c.logger.Warningf("error while syncing cluster state: %v", err) newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusSyncFailed - } else if !c.Status.Running() { + } else if !c.Status.Running() && !c.Status.Stopping() && !c.Status.Stopped() { newSpec.Status.PostgresClusterStatus = acidv1.ClusterStatusRunning } @@ -65,6 +65,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { c.logger.Debugf("could not sync finalizers: %v", err) } + // Handle lifecycle hibernate/wake-up state transitions + if !c.manageHibernateState(oldSpec, newSpec) { + return nil + } + if err = c.initUsers(); err != nil { err = fmt.Errorf("could not init users: %v", err) return err diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index c34faddd4..0e31112ad 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -200,6 +200,16 @@ func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.Namespaced return pg, nil } +// UpdatePostgresCR of Postgres cluster (updates full resource including spec) +func (client *KubernetesClient) UpdatePostgresCR(clusterName spec.NamespacedName, pg *apiacidv1.Postgresql) (*apiacidv1.Postgresql, error) { + pg, err := client.PostgresqlsGetter.Postgresqls(clusterName.Namespace).Update(context.TODO(), pg, metav1.UpdateOptions{}) + if err != nil { + return pg, fmt.Errorf("could not update PostgresCR: %v", err) + } + + return pg, nil +} + // SetFinalizer of Postgres cluster func (client *KubernetesClient) SetFinalizer(clusterName spec.NamespacedName, pg *apiacidv1.Postgresql, finalizers []string) (*apiacidv1.Postgresql, error) { var (