From afa2858dbfb26f4737ff9ec0e534651e5241f795 Mon Sep 17 00:00:00 2001 From: Aditya Kumar Date: Wed, 24 Dec 2025 14:04:53 +0530 Subject: [PATCH 1/9] Added sync restore in place, while keeping the secrets and services alive --- pkg/cluster/cluster.go | 14 ++-- pkg/controller/postgresql.go | 149 +++++++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+), 5 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 9cd750e84..2e9952e34 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1211,6 +1211,8 @@ func (c *Cluster) Delete() error { defer c.mu.Unlock() c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources") + isRestoreInPlace := c.Annotations["postgres-operator.zalando.org/action"] == "restore-in-place" + c.logger.Debugf("restore-in-place: Deleting the cluster, verifying whether restore-in-place is true or not: %+v\n", isRestoreInPlace) if err := c.deleteStreams(); err != nil { anyErrors = true c.logger.Warningf("could not delete event streams: %v", err) @@ -1231,7 +1233,7 @@ func (c *Cluster) Delete() error { c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete statefulset: %v", err) } - if c.OpConfig.EnableSecretsDeletion != nil && *c.OpConfig.EnableSecretsDeletion { + if c.OpConfig.EnableSecretsDeletion != nil && *c.OpConfig.EnableSecretsDeletion && !isRestoreInPlace { if err := c.deleteSecrets(); err != nil { anyErrors = true c.logger.Warningf("could not delete secrets: %v", err) @@ -1256,10 +1258,12 @@ func (c *Cluster) Delete() error { } } - if err := c.deleteService(role); err != nil { - anyErrors = true - c.logger.Warningf("could not delete %s service: %v", role, err) - c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s service: %v", role, err) + if !isRestoreInPlace { + if err := c.deleteService(role); err != nil { + anyErrors = true + c.logger.Warningf("could not delete %s service: %v", role, err) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete %s service: %v", role, err) + } } } diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 824a030f4..6fb2e341b 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -14,7 +14,9 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" @@ -539,6 +541,13 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) { pgOld := c.postgresqlCheck(prev) pgNew := c.postgresqlCheck(cur) if pgOld != nil && pgNew != nil { + + if pgNew.Annotations["postgres-operator.zalando.org/action"] == "restore-in-place" { + c.logger.Debugf("restore-in-place: postgresqlUpdate called for cluster %q", pgNew.Name) + c.handlerRestoreInPlace(pgOld, pgNew) + return + } + // Avoid the inifinite recursion for status updates if reflect.DeepEqual(pgOld.Spec, pgNew.Spec) { if reflect.DeepEqual(pgNew.Annotations, pgOld.Annotations) { @@ -568,6 +577,146 @@ func (c *Controller) postgresqlCheck(obj interface{}) *acidv1.Postgresql { return pg } +// validateRestoreInPlace checks if the restore parameters are valid +func (c *Controller) validateRestoreInPlace(pgOld, pgNew *acidv1.Postgresql) error { + c.logger.Debugf("restore-in-place: validating restore parameters for 
cluster %q", pgNew.Name) + + if pgNew.Spec.Clone == nil { + return fmt.Errorf("'clone' section is missing in the manifest") + } + + // Use ClusterName from CloneDescription + if pgNew.Spec.Clone.ClusterName != pgOld.Name { + return fmt.Errorf("clone cluster name %q does not match the current cluster name %q", pgNew.Spec.Clone.ClusterName, pgOld.Name) + } + + // Use EndTimestamp from CloneDescription + cloneTimestamp, err := time.Parse(time.RFC3339, pgNew.Spec.Clone.EndTimestamp) + if err != nil { + return fmt.Errorf("could not parse clone timestamp %q: %v", pgNew.Spec.Clone.EndTimestamp, err) + } + + if cloneTimestamp.After(time.Now()) { + return fmt.Errorf("clone timestamp %q is in the future", pgNew.Spec.Clone.EndTimestamp) + } + + c.logger.Debugf("restore-in-place: validation successful") + return nil +} + +// waitForOldResourcesTermination waits until the postgresql CR and its StatefulSet are terminated +func (c *Controller) waitForOldResourcesTermination(pgOld *acidv1.Postgresql, statefulSetName string) error { + c.logger.Debugf("restore-in-place: Waiting for old CR %q and StatefulSet %q to be fully terminated", pgOld.Name, statefulSetName) + + err := wait.PollUntilContextTimeout(context.TODO(), 2*time.Second, 5*time.Minute, true, func(ctx context.Context) (bool, error) { + // Check for CR + _, crErr := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(pgOld.Namespace).Get(ctx, pgOld.Name, metav1.GetOptions{}) + crGone := errors.IsNotFound(crErr) + if crErr != nil && !crGone { + c.logger.Errorf("restore-in-place: Error while waiting for CR deletion: %v", crErr) + return false, crErr // A real error occurred + } + + // Check for StatefulSet + _, stsErr := c.KubeClient.StatefulSets(pgOld.Namespace).Get(ctx, statefulSetName, metav1.GetOptions{}) + stsGone := errors.IsNotFound(stsErr) + if stsErr != nil && !stsGone { + c.logger.Errorf("restore-in-place: Error while waiting for StatefulSet deletion: %v", stsErr) + return false, stsErr // A real error occurred + } + + if crGone && stsGone { + c.logger.Debugf("restore-in-place: Both old CR and StatefulSet are fully terminated.") + return true, nil + } + + if !crGone { + c.logger.Infof("restore-in-place: still waiting for postgresql CR %q to be deleted", pgOld.Name) + } + if !stsGone { + c.logger.Infof("restore-in-place: still waiting for StatefulSet %q to be deleted", statefulSetName) + } + + return false, nil // Not done yet, continue polling. + }) + + if err != nil { + return fmt.Errorf("error while waiting for old resources to be deleted: %v", err) + } + + c.logger.Debugf("restore-in-place: Finished waiting for old resource deletion.") + return nil +} + +// handlerRestoreInPlace is to handle the resotre in place, it does few operatons +// 1. Verifies the parameters required for restoring in place +// 2. Removes the old CR if it exists, wait for it, if not present check the err that it is a k8sNotfound error and continue +// 3. Wait for the successful removal of statefulsets, if not present check the err that it is a k8sNotfound error and continue +// 4. 
Create a new CR with the latest details, while keeping few metadata about restore +func (c *Controller) handlerRestoreInPlace(pgOld, pgNew *acidv1.Postgresql) { + c.logger.Infof("restore-in-place: starting restore-in-place for cluster %q", pgNew.Name) + + if err := c.validateRestoreInPlace(pgOld, pgNew); err != nil { + c.logger.Errorf("restore-in-place: validation failed for cluster %q: %v", pgNew.Name, err) + return + } + + newPgSpec := pgNew.DeepCopy() + delete(newPgSpec.Annotations, "postgres-operator.zalando.org/action") + newPgSpec.ResourceVersion = "" + newPgSpec.UID = "" + c.logger.Debugf("restore-in-place: newPgSpec after removing annotation: %+v", newPgSpec) + + statefulSetName := pgOld.Name // Capture StatefulSet name, it's the same as the cluster name + + // Initiate CR deletion first, as requested + c.logger.Debugf("restore-in-place: Attempting direct API deletion of postgresql CR %q", pgOld.Name) + err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(pgOld.Namespace).Delete(context.TODO(), pgOld.Name, metav1.DeleteOptions{}) + if err != nil && !errors.IsNotFound(err) { + c.logger.Errorf("restore-in-place: could not delete postgresql CR via API: %v", err) + return // Stop if there's a critical error deleting the CR + } + c.logger.Debugf("restore-in-place: Direct API deletion of postgresql CR for %q initiated (or CR was already not found).", pgOld.Name) + + // Then, initiate cluster sub-resource deletion if the cluster object is in memory + clusterName := util.NameFromMeta(pgOld.ObjectMeta) + c.clustersMu.RLock() + cl, clusterFound := c.clusters[clusterName] + c.clustersMu.RUnlock() + + if clusterFound { + c.logger.Debugf("restore-in-place: Cluster object found in memory. Calling cluster.Delete() for %q", clusterName) + if cl.Annotations == nil { + cl.Annotations = make(map[string]string) + } + cl.Annotations["postgres-operator.zalando.org/action"] = "restore-in-place" // User requested to keep this + if err := cl.Delete(); err != nil { + // Log error but continue to ensure we wait for termination + c.logger.Errorf("restore-in-place: error during cluster.Delete() for %q: %v. Proceeding to wait for termination.", clusterName, err) + } + c.logger.Debugf("restore-in-place: cluster.Delete() returned for %q", clusterName) + } else { + c.logger.Warningf("restore-in-place: cluster %q not found in controller's map. Relying on CR deletion to trigger cleanup.", clusterName) + } + + if err := c.waitForOldResourcesTermination(pgOld, statefulSetName); err != nil { + c.logger.Errorf("restore-in-place: %v", err) + return + } + + // Create a new CR with the latest details + c.logger.Debugf("restore-in-place: Creating new postgresql CR %q", newPgSpec.Name) + _, err = c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(newPgSpec.Namespace).Create(context.TODO(), newPgSpec, metav1.CreateOptions{}) + if err != nil { + c.logger.Errorf("restore-in-place: could not create postgresql CR for restore-in-place: %v", err) + // If the new CR cannot be created, the user needs to intervene. 
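+ // At this point the old CR and its StatefulSet are already gone, while the secrets and
+ // services were intentionally kept by Cluster.Delete() for restore-in-place, and the
+ // sanitized spec was logged above; manual intervention here essentially means
+ // re-applying that spec (without the action annotation) by hand.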
+ return + } + c.logger.Debugf("restore-in-place: New postgresql CR %q created", newPgSpec.Name) + + c.logger.Infof("restore-in-place: for cluster %q triggered successfully", pgNew.Name) +} + /* Ensures the pod service account and role bindings exists in a namespace before a PG cluster is created there so that a user does not have to deploy From a36b962e7dc99c6ceddbc4240ca503133a52ab7c Mon Sep 17 00:00:00 2001 From: Aditya Kumar Date: Wed, 31 Dec 2025 00:00:49 +0530 Subject: [PATCH 2/9] Added async restore in place --- pkg/cluster/cluster.go | 52 ++++++++- pkg/cluster/resources.go | 16 +++ pkg/controller/postgresql.go | 221 +++++++++++++++++++++-------------- 3 files changed, 201 insertions(+), 88 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 2e9952e34..20102df47 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -3,6 +3,7 @@ package cluster // Postgres CustomResourceDefinition object i.e. Spilo import ( + "context" "database/sql" "encoding/json" "fmt" @@ -431,6 +432,30 @@ func (c *Cluster) Create() (err error) { c.logger.Errorf("could not list resources: %v", err) } + + if err := c.updatePITRResources(); err != nil { + return fmt.Errorf("could not update pitr resources: %v", err) + } + return nil +} + +// update the label to finished for PITR for the given config map +func (c *Cluster) updatePITRResources() error { + cmName := fmt.Sprintf(PitrConfigMapNameTemplate, c.Name) + cmNamespace := c.Namespace + patchPayload := map[string]any{ + "metadata": map[string]any{ + "labels": map[string]string{ + PitrStateLabelKey: PitrStateLabelValueFinished, + }, + }, + } + + data, _ := json.Marshal(patchPayload) + if _, err := c.KubeClient.ConfigMaps(cmNamespace).Patch(context.TODO(), cmName, types.MergePatchType, data, metav1.PatchOptions{}, ""); err != nil { + c.logger.Errorf("restore-in-place: error updating config map label to final state: %v", err) + return err + } return nil } @@ -1200,6 +1225,15 @@ func syncResources(a, b *v1.ResourceRequirements) bool { return false } +const ( + PitrStateLabelKey = "postgres-operator.zalando.org/pitr-state" + PitrStateLabelValuePending = "pending" + PitrStateLabelValueInProgress = "in-progress" + PitrStateLabelValueFinished = "finished" + PitrConfigMapNameTemplate = "pitr-state-%s" + PitrSpecDataKey = "spec" +) + // Delete deletes the cluster and cleans up all objects associated with it (including statefulsets). // The deletion order here is somewhat significant, because Patroni, when running with the Kubernetes // DCS, reuses the master's endpoint to store the leader related metadata. 
If we remove the endpoint @@ -1211,7 +1245,23 @@ func (c *Cluster) Delete() error { defer c.mu.Unlock() c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources") - isRestoreInPlace := c.Annotations["postgres-operator.zalando.org/action"] == "restore-in-place" + + cmName := fmt.Sprintf(PitrConfigMapNameTemplate, c.Name) + + isRestoreInPlace := false + cm, err := c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), cmName, metav1.GetOptions{}) + if err != nil { + c.logger.Debugf("restore-in-place: Error while fetching config map: %s before deletion", cmName) + } + + if cm != nil { + if val, ok := cm.Labels[PitrStateLabelKey]; ok { + if val == PitrStateLabelValuePending { + isRestoreInPlace = true + } + } + } + c.logger.Debugf("restore-in-place: Deleting the cluster, verifying whether resotore-in-place is true or not: %+v\n", isRestoreInPlace) if err := c.deleteStreams(); err != nil { anyErrors = true diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index ed3eb3d75..1925733de 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -10,6 +10,7 @@ import ( batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -301,6 +302,21 @@ func (c *Cluster) createService(role PostgresRole) (*v1.Service, error) { c.setProcessName("creating %v service", role) serviceSpec := c.generateService(role, &c.Spec) + + // check if the service already exists in case of pitr + svc, err := c.KubeClient.Services(serviceSpec.Namespace).Get(context.TODO(), serviceSpec.Name, metav1.GetOptions{}) + + // service already exists + if err == nil { + c.Services[role] = svc + return svc, nil + } + + if !errors.IsNotFound(err) { + return nil, err + } + + // at last create the service service, err := c.KubeClient.Services(serviceSpec.Namespace).Create(context.TODO(), serviceSpec, metav1.CreateOptions{}) if err != nil { return nil, err diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 6fb2e341b..23025a888 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -16,7 +16,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" @@ -27,6 +26,11 @@ import ( "github.com/zalando/postgres-operator/pkg/util/ringlog" ) +const ( + restoreAnnotationKey = "postgres-operator.zalando.org/action" + restoreAnnotationValue = "restore-in-place" +) + func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() ticker := time.NewTicker(c.opConfig.ResyncPeriod) @@ -37,6 +41,9 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { if err := c.clusterListAndSync(); err != nil { c.logger.Errorf("could not list clusters: %v", err) } + if err := c.processPendingRestores(); err != nil { + c.logger.Errorf("could not process pending restores: %v", err) + } case <-stopCh: return } @@ -542,7 +549,7 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) { pgNew := c.postgresqlCheck(cur) if pgOld != nil && pgNew != nil { - if pgNew.Annotations["postgres-operator.zalando.org/action"] == "restore-in-place" { + if pgNew.Annotations[restoreAnnotationKey] == restoreAnnotationValue { 
c.logger.Debugf("restore-in-place: postgresqlUpdate called for cluster %q", pgNew.Name) c.handlerRestoreInPlace(pgOld, pgNew) return @@ -604,117 +611,157 @@ func (c *Controller) validateRestoreInPlace(pgOld, pgNew *acidv1.Postgresql) err return nil } -// waitForOldResourcesTermination waits until the postgresql CR and its StatefulSet are terminated -func (c *Controller) waitForOldResourcesTermination(pgOld *acidv1.Postgresql, statefulSetName string) error { - c.logger.Debugf("restore-in-place: Waiting for old CR %q and StatefulSet %q to be fully terminated", pgOld.Name, statefulSetName) - - err := wait.PollUntilContextTimeout(context.TODO(), 2*time.Second, 5*time.Minute, true, func(ctx context.Context) (bool, error) { - // Check for CR - _, crErr := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(pgOld.Namespace).Get(ctx, pgOld.Name, metav1.GetOptions{}) - crGone := errors.IsNotFound(crErr) - if crErr != nil && !crGone { - c.logger.Errorf("restore-in-place: Error while waiting for CR deletion: %v", crErr) - return false, crErr // A real error occurred - } - - // Check for StatefulSet - _, stsErr := c.KubeClient.StatefulSets(pgOld.Namespace).Get(ctx, statefulSetName, metav1.GetOptions{}) - stsGone := errors.IsNotFound(stsErr) - if stsErr != nil && !stsGone { - c.logger.Errorf("restore-in-place: Error while waiting for StatefulSet deletion: %v", stsErr) - return false, stsErr // A real error occurred - } - - if crGone && stsGone { - c.logger.Debugf("restore-in-place: Both old CR and StatefulSet are fully terminated.") - return true, nil - } - - if !crGone { - c.logger.Infof("restore-in-place: still waiting for postgresql CR %q to be deleted", pgOld.Name) - } - if !stsGone { - c.logger.Infof("restore-in-place: still waiting for StatefulSet %q to be deleted", statefulSetName) - } - - return false, nil // Not done yet, continue polling. - }) - - if err != nil { - return fmt.Errorf("error while waiting for old resources to be deleted: %v", err) - } - c.logger.Debugf("restore-in-place: Finished waiting for old resource deletion.") - return nil -} -// handlerRestoreInPlace is to handle the resotre in place, it does few operatons -// 1. Verifies the parameters required for restoring in place -// 2. Removes the old CR if it exists, wait for it, if not present check the err that it is a k8sNotfound error and continue -// 3. Wait for the successful removal of statefulsets, if not present check the err that it is a k8sNotfound error and continue -// 4. Create a new CR with the latest details, while keeping few metadata about restore +// handlerRestoreInPlace starts an asynchronous point-in-time-restore. +// It creates a ConfigMap to store the state and then deletes the old Postgresql CR. 
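+// The flow is triggered from postgresqlUpdate when the manifest carries the restore
+// annotation and points its clone section at the cluster itself. An illustrative
+// manifest fragment (values are placeholders; the clone keys are assumed to follow
+// the operator's existing clone format):
+//
+//   metadata:
+//     annotations:
+//       postgres-operator.zalando.org/action: "restore-in-place"
+//   spec:
+//     clone:
+//       cluster: "acid-minimal-cluster"        # must equal the cluster's own name
+//       timestamp: "2025-12-20T10:00:00+00:00" # RFC 3339, must lie in the past
+//
+// The state ConfigMap is named pitr-state-<cluster>, labelled with
+// postgres-operator.zalando.org/pitr-state (initially "pending"), and stores the
+// sanitized manifest under the "spec" data key, from which processPendingRestores
+// later re-creates the Postgresql CR.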
func (c *Controller) handlerRestoreInPlace(pgOld, pgNew *acidv1.Postgresql) { - c.logger.Infof("restore-in-place: starting restore-in-place for cluster %q", pgNew.Name) + c.logger.Infof("restore-in-place: starting asynchronous restore-in-place for cluster %q", pgNew.Name) if err := c.validateRestoreInPlace(pgOld, pgNew); err != nil { c.logger.Errorf("restore-in-place: validation failed for cluster %q: %v", pgNew.Name, err) return } + // Prepare new spec for the restored cluster + c.logger.Debugf("restore-in-place: preparing new postgresql spec for cluster %q", pgNew.Name) newPgSpec := pgNew.DeepCopy() - delete(newPgSpec.Annotations, "postgres-operator.zalando.org/action") + delete(newPgSpec.Annotations, restoreAnnotationKey) newPgSpec.ResourceVersion = "" newPgSpec.UID = "" - c.logger.Debugf("restore-in-place: newPgSpec after removing annotation: %+v", newPgSpec) - statefulSetName := pgOld.Name // Capture StatefulSet name, it's the same as the cluster name + specData, err := json.Marshal(newPgSpec) + if err != nil { + c.logger.Errorf("restore-in-place: could not marshal new postgresql spec for cluster %q: %v", newPgSpec.Name, err) + return + } + + // Create or update ConfigMap to store restore state + cmName := fmt.Sprintf(cluster.PitrConfigMapNameTemplate, newPgSpec.Name) + c.logger.Debugf("restore-in-place: creating or updating state ConfigMap %q for cluster %q", cmName, newPgSpec.Name) + cm := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: cmName, + Namespace: newPgSpec.Namespace, + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: string(specData), + }, + } + + // Check if ConfigMap already exists + _, err = c.KubeClient.ConfigMaps(cm.Namespace).Get(context.TODO(), cm.Name, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + _, err = c.KubeClient.ConfigMaps(cm.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}) + } + } else { + // If for some reason CM exists, update it + _, err = c.KubeClient.ConfigMaps(cm.Namespace).Update(context.TODO(), cm, metav1.UpdateOptions{}) + } + + if err != nil { + c.logger.Errorf("restore-in-place: could not create or update state ConfigMap %q for cluster %q: %v", cmName, newPgSpec.Name, err) + return + } + c.logger.Infof("restore-in-place: state ConfigMap %q created for cluster %q", cmName, newPgSpec.Name) - // Initiate CR deletion first, as requested - c.logger.Debugf("restore-in-place: Attempting direct API deletion of postgresql CR %q", pgOld.Name) - err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(pgOld.Namespace).Delete(context.TODO(), pgOld.Name, metav1.DeleteOptions{}) + // Delete old postgresql CR to trigger cleanup and UID change + c.logger.Debugf("restore-in-place: attempting deletion of postgresql CR %q", pgOld.Name) + err = c.KubeClient.Postgresqls(pgOld.Namespace).Delete(context.TODO(), pgOld.Name, metav1.DeleteOptions{}) if err != nil && !errors.IsNotFound(err) { - c.logger.Errorf("restore-in-place: could not delete postgresql CR via API: %v", err) - return // Stop if there's a critical error deleting the CR + c.logger.Errorf("restore-in-place: could not delete postgresql CR %q: %v", pgOld.Name, err) + // Consider deleting the ConfigMap here to allow a retry + return + } + c.logger.Infof("restore-in-place: initiated deletion of postgresql CR %q", pgOld.Name) +} + +// processPendingRestores handles the re-creation part of the asynchronous point-in-time-restore. 
+// It is called periodically and checks for ConfigMaps that signal a pending or in-progress restore. +func (c *Controller) processPendingRestores() error { + c.logger.Debug("restore-in-place: checking for pending restores") + + namespace := c.opConfig.WatchedNamespace + if namespace == "" { + namespace = v1.NamespaceAll } - c.logger.Debugf("restore-in-place: Direct API deletion of postgresql CR for %q initiated (or CR was already not found).", pgOld.Name) - // Then, initiate cluster sub-resource deletion if the cluster object is in memory - clusterName := util.NameFromMeta(pgOld.ObjectMeta) - c.clustersMu.RLock() - cl, clusterFound := c.clusters[clusterName] - c.clustersMu.RUnlock() + // Process "pending" restores: wait for deletion and move to "in-progress" + pendingOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", cluster.PitrStateLabelKey, cluster.PitrStateLabelValuePending)} + pendingCmList, err := c.KubeClient.ConfigMaps(namespace).List(context.TODO(), pendingOpts) + if err != nil { + return fmt.Errorf("restore-in-place: could not list pending restore ConfigMaps: %v", err) + } + if len(pendingCmList.Items) > 0 { + c.logger.Debugf("restore-in-place: found %d pending restore(s) to process", len(pendingCmList.Items)) + } - if clusterFound { - c.logger.Debugf("restore-in-place: Cluster object found in memory. Calling cluster.Delete() for %q", clusterName) - if cl.Annotations == nil { - cl.Annotations = make(map[string]string) + for _, cm := range pendingCmList.Items { + c.logger.Debugf("restore-in-place: processing pending ConfigMap %q", cm.Name) + clusterName := strings.TrimPrefix(cm.Name, "pitr-state-") + + _, err := c.KubeClient.Postgresqls(cm.Namespace).Get(context.TODO(), clusterName, metav1.GetOptions{}) + if err == nil { + c.logger.Infof("restore-in-place: pending restore for cluster %q is waiting for old Postgresql CR to be deleted", clusterName) + continue } - cl.Annotations["postgres-operator.zalando.org/action"] = "restore-in-place" // User requested to keep this - if err := cl.Delete(); err != nil { - // Log error but continue to ensure we wait for termination - c.logger.Errorf("restore-in-place: error during cluster.Delete() for %q: %v. Proceeding to wait for termination.", clusterName, err) + if !errors.IsNotFound(err) { + c.logger.Errorf("restore-in-place: could not check for existence of Postgresql CR %q: %v", clusterName, err) + continue } - c.logger.Debugf("restore-in-place: cluster.Delete() returned for %q", clusterName) - } else { - c.logger.Warningf("restore-in-place: cluster %q not found in controller's map. 
Relying on CR deletion to trigger cleanup.", clusterName) - } - if err := c.waitForOldResourcesTermination(pgOld, statefulSetName); err != nil { - c.logger.Errorf("restore-in-place: %v", err) - return + c.logger.Infof("restore-in-place: old Postgresql CR %q is deleted, moving restore to 'in-progress'", clusterName) + cm.Labels[cluster.PitrStateLabelKey] = cluster.PitrStateLabelValueInProgress + if _, err := c.KubeClient.ConfigMaps(cm.Namespace).Update(context.TODO(), &cm, metav1.UpdateOptions{}); err != nil { + c.logger.Errorf("restore-in-place: could not update ConfigMap %q to 'in-progress': %v", cm.Name, err) + } } - // Create a new CR with the latest details - c.logger.Debugf("restore-in-place: Creating new postgresql CR %q", newPgSpec.Name) - _, err = c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(newPgSpec.Namespace).Create(context.TODO(), newPgSpec, metav1.CreateOptions{}) + // Process "in-progress" restores: re-create the CR and clean up + inProgressOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", cluster.PitrStateLabelKey, cluster.PitrStateLabelValueInProgress)} + inProgressCmList, err := c.KubeClient.ConfigMaps(namespace).List(context.TODO(), inProgressOpts) if err != nil { - c.logger.Errorf("restore-in-place: could not create postgresql CR for restore-in-place: %v", err) - // If the new CR cannot be created, the user needs to intervene. - return + return fmt.Errorf("restore-in-place: could not list in-progress restore ConfigMaps: %v", err) + } + if len(inProgressCmList.Items) > 0 { + c.logger.Debugf("restore-in-place: found %d in-progress restore(s) to process", len(inProgressCmList.Items)) + } + + for _, cm := range inProgressCmList.Items { + c.logger.Infof("restore-in-place: processing in-progress restore for ConfigMap %q", cm.Name) + + c.logger.Debugf("restore-in-place: unmarshalling spec from ConfigMap %q", cm.Name) + var newPgSpec acidv1.Postgresql + if err := json.Unmarshal([]byte(cm.Data[cluster.PitrSpecDataKey]), &newPgSpec); err != nil { + c.logger.Errorf("restore-in-place: could not unmarshal postgresql spec from ConfigMap %q: %v", cm.Name, err) + continue + } + + c.logger.Debugf("restore-in-place: creating new Postgresql CR %q from ConfigMap spec", newPgSpec.Name) + _, err := c.KubeClient.Postgresqls(newPgSpec.Namespace).Create(context.TODO(), &newPgSpec, metav1.CreateOptions{}) + if err != nil { + if errors.IsAlreadyExists(err) { + c.logger.Infof("restore-in-place: Postgresql CR %q already exists, cleaning up restore ConfigMap", newPgSpec.Name) + // fallthrough to delete + } else { + c.logger.Errorf("restore-in-place: could not re-create Postgresql CR %q for restore: %v", newPgSpec.Name, err) + continue // Retry on next cycle + } + } else { + c.logger.Infof("restore-in-place: successfully re-created Postgresql CR %q to complete restore", newPgSpec.Name) + } + + // c.logger.Debugf("restore-in-place: deleting successfully used restore ConfigMap %q", cm.Name) + // if err := c.KubeClient.ConfigMaps(cm.Namespace).Delete(context.TODO(), cm.Name, metav1.DeleteOptions{}); err != nil { + // c.logger.Errorf("restore-in-place: could not delete state ConfigMap %q: %v", cm.Name, err) + // } } - c.logger.Debugf("restore-in-place: New postgresql CR %q created", newPgSpec.Name) - c.logger.Infof("restore-in-place: for cluster %q triggered successfully", pgNew.Name) + return nil } /* From fd1ff0c5cee39549e171965c04ca45f6df25baf1 Mon Sep 17 00:00:00 2001 From: Aditya Kumar Date: Wed, 31 Dec 2025 13:35:05 +0530 Subject: [PATCH 3/9] Refactored restore-in-place async flow, 
not removing the config map for now --- pkg/cluster/cluster.go | 48 ++++++++-------- pkg/controller/postgresql.go | 104 ++++++++++++++++++++--------------- 2 files changed, 85 insertions(+), 67 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 20102df47..c14a0fc6e 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -433,27 +433,27 @@ func (c *Cluster) Create() (err error) { } - if err := c.updatePITRResources(); err != nil { + if err := c.updatePITRResources(PitrStateLabelValueFinished); err != nil { return fmt.Errorf("could not update pitr resources: %v", err) } return nil } // update the label to finished for PITR for the given config map -func (c *Cluster) updatePITRResources() error { +func (c *Cluster) updatePITRResources(state string) error { cmName := fmt.Sprintf(PitrConfigMapNameTemplate, c.Name) cmNamespace := c.Namespace patchPayload := map[string]any{ "metadata": map[string]any{ "labels": map[string]string{ - PitrStateLabelKey: PitrStateLabelValueFinished, + PitrStateLabelKey: state, }, }, } data, _ := json.Marshal(patchPayload) if _, err := c.KubeClient.ConfigMaps(cmNamespace).Patch(context.TODO(), cmName, types.MergePatchType, data, metav1.PatchOptions{}, ""); err != nil { - c.logger.Errorf("restore-in-place: error updating config map label to final state: %v", err) + c.logger.Errorf("restore-in-place: error updating config map label to state: %v", err) return err } return nil @@ -1226,42 +1226,44 @@ func syncResources(a, b *v1.ResourceRequirements) bool { } const ( - PitrStateLabelKey = "postgres-operator.zalando.org/pitr-state" + PitrStateLabelKey = "postgres-operator.zalando.org/pitr-state" PitrStateLabelValuePending = "pending" PitrStateLabelValueInProgress = "in-progress" - PitrStateLabelValueFinished = "finished" - PitrConfigMapNameTemplate = "pitr-state-%s" - PitrSpecDataKey = "spec" + PitrStateLabelValueFinished = "finished" + PitrConfigMapNameTemplate = "pitr-state-%s" + PitrSpecDataKey = "spec" ) -// Delete deletes the cluster and cleans up all objects associated with it (including statefulsets). -// The deletion order here is somewhat significant, because Patroni, when running with the Kubernetes -// DCS, reuses the master's endpoint to store the leader related metadata. If we remove the endpoint -// before the pods, it will be re-created by the current master pod and will remain, obstructing the -// creation of the new cluster with the same name. Therefore, the endpoints should be deleted last. -func (c *Cluster) Delete() error { - var anyErrors = false - c.mu.Lock() - defer c.mu.Unlock() - c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources") - - +func (c *Cluster) isRestoreInPlace() bool { cmName := fmt.Sprintf(PitrConfigMapNameTemplate, c.Name) - - isRestoreInPlace := false cm, err := c.KubeClient.ConfigMaps(c.Namespace).Get(context.TODO(), cmName, metav1.GetOptions{}) if err != nil { c.logger.Debugf("restore-in-place: Error while fetching config map: %s before deletion", cmName) + return false } if cm != nil { if val, ok := cm.Labels[PitrStateLabelKey]; ok { if val == PitrStateLabelValuePending { - isRestoreInPlace = true + return true } } } + return false +} + +// Delete deletes the cluster and cleans up all objects associated with it (including statefulsets). +// The deletion order here is somewhat significant, because Patroni, when running with the Kubernetes +// DCS, reuses the master's endpoint to store the leader related metadata. 
If we remove the endpoint +// before the pods, it will be re-created by the current master pod and will remain, obstructing the +// creation of the new cluster with the same name. Therefore, the endpoints should be deleted last. +func (c *Cluster) Delete() error { + var anyErrors = false + c.mu.Lock() + defer c.mu.Unlock() + c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of cluster resources") + isRestoreInPlace := c.isRestoreInPlace() c.logger.Debugf("restore-in-place: Deleting the cluster, verifying whether resotore-in-place is true or not: %+v\n", isRestoreInPlace) if err := c.deleteStreams(); err != nil { anyErrors = true diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 23025a888..a84656a98 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -680,17 +680,25 @@ func (c *Controller) handlerRestoreInPlace(pgOld, pgNew *acidv1.Postgresql) { c.logger.Infof("restore-in-place: initiated deletion of postgresql CR %q", pgOld.Name) } -// processPendingRestores handles the re-creation part of the asynchronous point-in-time-restore. -// It is called periodically and checks for ConfigMaps that signal a pending or in-progress restore. func (c *Controller) processPendingRestores() error { c.logger.Debug("restore-in-place: checking for pending restores") - namespace := c.opConfig.WatchedNamespace if namespace == "" { namespace = v1.NamespaceAll } - // Process "pending" restores: wait for deletion and move to "in-progress" + if err := c.processPendingCm(namespace); err != nil { + return err + } + + if err := c.processInProgressCm(namespace); err != nil { + return err + } + + return nil +} + +func (c *Controller) processPendingCm(namespace string) error { pendingOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", cluster.PitrStateLabelKey, cluster.PitrStateLabelValuePending)} pendingCmList, err := c.KubeClient.ConfigMaps(namespace).List(context.TODO(), pendingOpts) if err != nil { @@ -701,27 +709,35 @@ func (c *Controller) processPendingRestores() error { } for _, cm := range pendingCmList.Items { - c.logger.Debugf("restore-in-place: processing pending ConfigMap %q", cm.Name) - clusterName := strings.TrimPrefix(cm.Name, "pitr-state-") - - _, err := c.KubeClient.Postgresqls(cm.Namespace).Get(context.TODO(), clusterName, metav1.GetOptions{}) - if err == nil { - c.logger.Infof("restore-in-place: pending restore for cluster %q is waiting for old Postgresql CR to be deleted", clusterName) - continue - } - if !errors.IsNotFound(err) { - c.logger.Errorf("restore-in-place: could not check for existence of Postgresql CR %q: %v", clusterName, err) - continue + if err := c.processSinglePendingCm(cm); err != nil { + c.logger.Errorf("restore-in-place: could not process pending restore for config map %s: %v", cm.Name, err) } + } + return nil +} - c.logger.Infof("restore-in-place: old Postgresql CR %q is deleted, moving restore to 'in-progress'", clusterName) - cm.Labels[cluster.PitrStateLabelKey] = cluster.PitrStateLabelValueInProgress - if _, err := c.KubeClient.ConfigMaps(cm.Namespace).Update(context.TODO(), &cm, metav1.UpdateOptions{}); err != nil { - c.logger.Errorf("restore-in-place: could not update ConfigMap %q to 'in-progress': %v", cm.Name, err) - } +func (c *Controller) processSinglePendingCm(cm v1.ConfigMap) error { + c.logger.Debugf("restore-in-place: processing pending ConfigMap %q", cm.Name) + clusterName := strings.TrimPrefix(cm.Name, "pitr-state-") + + _, err := 
c.KubeClient.Postgresqls(cm.Namespace).Get(context.TODO(), clusterName, metav1.GetOptions{}) + if err == nil { + c.logger.Infof("restore-in-place: pending restore for cluster %q is waiting for old Postgresql CR to be deleted", clusterName) + return nil } + if !errors.IsNotFound(err) { + return fmt.Errorf("could not check for existence of Postgresql CR %q: %v", clusterName, err) + } + + c.logger.Infof("restore-in-place: old Postgresql CR %q is deleted, moving restore to 'in-progress'", clusterName) + cm.Labels[cluster.PitrStateLabelKey] = cluster.PitrStateLabelValueInProgress + if _, err := c.KubeClient.ConfigMaps(cm.Namespace).Update(context.TODO(), &cm, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("could not update ConfigMap %q to 'in-progress': %v", cm.Name, err) + } + return nil +} - // Process "in-progress" restores: re-create the CR and clean up +func (c *Controller) processInProgressCm(namespace string) error { inProgressOpts := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", cluster.PitrStateLabelKey, cluster.PitrStateLabelValueInProgress)} inProgressCmList, err := c.KubeClient.ConfigMaps(namespace).List(context.TODO(), inProgressOpts) if err != nil { @@ -732,33 +748,33 @@ func (c *Controller) processPendingRestores() error { } for _, cm := range inProgressCmList.Items { - c.logger.Infof("restore-in-place: processing in-progress restore for ConfigMap %q", cm.Name) - - c.logger.Debugf("restore-in-place: unmarshalling spec from ConfigMap %q", cm.Name) - var newPgSpec acidv1.Postgresql - if err := json.Unmarshal([]byte(cm.Data[cluster.PitrSpecDataKey]), &newPgSpec); err != nil { - c.logger.Errorf("restore-in-place: could not unmarshal postgresql spec from ConfigMap %q: %v", cm.Name, err) - continue + if err := c.processSingleInProgressCm(cm); err != nil { + c.logger.Errorf("restore-in-place: could not process in-progress restore for config map %s: %v", cm.Name, err) } + } + return nil +} - c.logger.Debugf("restore-in-place: creating new Postgresql CR %q from ConfigMap spec", newPgSpec.Name) - _, err := c.KubeClient.Postgresqls(newPgSpec.Namespace).Create(context.TODO(), &newPgSpec, metav1.CreateOptions{}) - if err != nil { - if errors.IsAlreadyExists(err) { - c.logger.Infof("restore-in-place: Postgresql CR %q already exists, cleaning up restore ConfigMap", newPgSpec.Name) - // fallthrough to delete - } else { - c.logger.Errorf("restore-in-place: could not re-create Postgresql CR %q for restore: %v", newPgSpec.Name, err) - continue // Retry on next cycle - } +func (c *Controller) processSingleInProgressCm(cm v1.ConfigMap) error { + c.logger.Infof("restore-in-place: processing in-progress restore for ConfigMap %q", cm.Name) + + c.logger.Debugf("restore-in-place: unmarshalling spec from ConfigMap %q", cm.Name) + var newPgSpec acidv1.Postgresql + if err := json.Unmarshal([]byte(cm.Data[cluster.PitrSpecDataKey]), &newPgSpec); err != nil { + return fmt.Errorf("could not unmarshal postgresql spec from ConfigMap %q: %v", cm.Name, err) + } + + c.logger.Debugf("restore-in-place: creating new Postgresql CR %q from ConfigMap spec", newPgSpec.Name) + _, err := c.KubeClient.Postgresqls(newPgSpec.Namespace).Create(context.TODO(), &newPgSpec, metav1.CreateOptions{}) + if err != nil { + if errors.IsAlreadyExists(err) { + c.logger.Infof("restore-in-place: Postgresql CR %q already exists, cleaning up restore ConfigMap", newPgSpec.Name) + // fallthrough to delete } else { - c.logger.Infof("restore-in-place: successfully re-created Postgresql CR %q to complete restore", newPgSpec.Name) 
+ return fmt.Errorf("could not re-create Postgresql CR %q for restore: %v", newPgSpec.Name, err) } - - // c.logger.Debugf("restore-in-place: deleting successfully used restore ConfigMap %q", cm.Name) - // if err := c.KubeClient.ConfigMaps(cm.Namespace).Delete(context.TODO(), cm.Name, metav1.DeleteOptions{}); err != nil { - // c.logger.Errorf("restore-in-place: could not delete state ConfigMap %q: %v", cm.Name, err) - // } + } else { + c.logger.Infof("restore-in-place: successfully re-created Postgresql CR %q to complete restore", newPgSpec.Name) } return nil From 4a45bc188a875b4cdddc9001873052ee83519e80 Mon Sep 17 00:00:00 2001 From: Aditya Kumar Date: Fri, 9 Jan 2026 00:21:43 +0530 Subject: [PATCH 4/9] Added configured cleanup function to remove the restored configmaps --- docs/reference/operator_parameters.md | 3 ++ manifests/operatorconfiguration.crd.yaml | 3 ++ ...gresql-operator-default-configuration.yaml | 1 + .../v1/operator_configuration_type.go | 1 + pkg/controller/postgresql.go | 49 +++++++++++++++++-- pkg/util/config/config.go | 1 + 6 files changed, 53 insertions(+), 5 deletions(-) diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md index 7e7cbeaf0..de27acb4d 100644 --- a/docs/reference/operator_parameters.md +++ b/docs/reference/operator_parameters.md @@ -171,6 +171,9 @@ Those are top-level keys, containing both leaf keys and groups. * **repair_period** period between consecutive repair requests. The default is `5m`. +* **pitr_backup_retention** + retention time for PITR backup config maps. The operator will clean up config maps older than the configured retention. The value is a duration string, e.g. "168h". The default is `168h`. + * **set_memory_request_to_limit** Set `memory_request` to `memory_limit` for all Postgres clusters (the default value is also increased but configured `max_memory_request` can not be diff --git a/manifests/operatorconfiguration.crd.yaml b/manifests/operatorconfiguration.crd.yaml index 6556b333c..0fb224743 100644 --- a/manifests/operatorconfiguration.crd.yaml +++ b/manifests/operatorconfiguration.crd.yaml @@ -113,6 +113,9 @@ spec: repair_period: type: string default: "5m" + pitr_backup_retention: + type: string + default: "168h" set_memory_request_to_limit: type: boolean default: false diff --git a/manifests/postgresql-operator-default-configuration.yaml b/manifests/postgresql-operator-default-configuration.yaml index 389d9325a..8d03b0df9 100644 --- a/manifests/postgresql-operator-default-configuration.yaml +++ b/manifests/postgresql-operator-default-configuration.yaml @@ -19,6 +19,7 @@ configuration: min_instances: -1 resync_period: 30m repair_period: 5m + pitr_backup_retention: 168h # set_memory_request_to_limit: false # sidecars: # - image: image:123 diff --git a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go index cd11b9173..944bd862d 100644 --- a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go +++ b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go @@ -266,6 +266,7 @@ type OperatorConfigurationData struct { Workers uint32 `json:"workers,omitempty"` ResyncPeriod Duration `json:"resync_period,omitempty"` RepairPeriod Duration `json:"repair_period,omitempty"` + PitrBackupRetention Duration `json:"pitr_backup_retention,omitempty"` SetMemoryRequestToLimit bool `json:"set_memory_request_to_limit,omitempty"` ShmVolume *bool `json:"enable_shm_volume,omitempty"` SidecarImages map[string]string 
`json:"sidecar_docker_images,omitempty"` // deprecated in favour of SidecarContainers diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index a84656a98..222d7a970 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -13,8 +13,8 @@ import ( "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" @@ -27,8 +27,8 @@ import ( ) const ( - restoreAnnotationKey = "postgres-operator.zalando.org/action" - restoreAnnotationValue = "restore-in-place" + restoreAnnotationKey = "postgres-operator.zalando.org/action" + restoreAnnotationValue = "restore-in-place" ) func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { @@ -44,6 +44,9 @@ func (c *Controller) clusterResync(stopCh <-chan struct{}, wg *sync.WaitGroup) { if err := c.processPendingRestores(); err != nil { c.logger.Errorf("could not process pending restores: %v", err) } + if err := c.cleanupRestores(); err != nil { + c.logger.Errorf("could not cleanup restores: %v", err) + } case <-stopCh: return } @@ -611,8 +614,6 @@ func (c *Controller) validateRestoreInPlace(pgOld, pgNew *acidv1.Postgresql) err return nil } - - // handlerRestoreInPlace starts an asynchronous point-in-time-restore. // It creates a ConfigMap to store the state and then deletes the old Postgresql CR. func (c *Controller) handlerRestoreInPlace(pgOld, pgNew *acidv1.Postgresql) { @@ -780,6 +781,44 @@ func (c *Controller) processSingleInProgressCm(cm v1.ConfigMap) error { return nil } +func (c *Controller) cleanupRestores() error { + c.logger.Debug("cleaning up old restore config maps") + namespace := c.opConfig.WatchedNamespace + if namespace == "" { + namespace = v1.NamespaceAll + } + + cmList, err := c.KubeClient.ConfigMaps(namespace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("could not list restore ConfigMaps: %v", err) + } + + retention := c.opConfig.PitrBackupRetention + if retention <= 0 { + c.logger.Debugf("Pitr backup retention is not set, skipping cleanup") + return nil + } + c.logger.Debugf("Pitr backup retention is %s", retention.String()) + + for _, cm := range cmList.Items { + if !strings.HasPrefix(cm.Name, "pitr-") { + continue + } + + age := time.Since(cm.CreationTimestamp.Time) + if age > retention { + c.logger.Infof("deleting old restore config map %q, age: %s", cm.Name, age.String()) + err := c.KubeClient.ConfigMaps(cm.Namespace).Delete(context.TODO(), cm.Name, metav1.DeleteOptions{}) + if err != nil { + c.logger.Errorf("could not delete config map %q: %v", cm.Name, err) + // continue with next cm + } + } + } + + return nil +} + /* Ensures the pod service account and role bindings exists in a namespace before a PG cluster is created there so that a user does not have to deploy diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index 9fadd6a5b..8af00f87e 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -18,6 +18,7 @@ type CRD struct { ReadyWaitTimeout time.Duration `name:"ready_wait_timeout" default:"30s"` ResyncPeriod time.Duration `name:"resync_period" default:"30m"` RepairPeriod time.Duration `name:"repair_period" default:"5m"` + PitrBackupRetention time.Duration `name:"pitr_backup_retention" default:"168h"` EnableCRDRegistration *bool `name:"enable_crd_registration" default:"true"` EnableCRDValidation *bool 
`name:"enable_crd_validation" default:"true"` CRDCategories []string `name:"crd_categories" default:"all"` From 3accf8adbba07c1afe55d63d4c7e2668458a9cba Mon Sep 17 00:00:00 2001 From: Aditya Kumar Date: Mon, 12 Jan 2026 17:48:17 +0530 Subject: [PATCH 5/9] Fixed configmap prefix in the cleanup function, few typos and logs --- pkg/controller/postgresql.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/pkg/controller/postgresql.go b/pkg/controller/postgresql.go index 222d7a970..ebcbb8b03 100644 --- a/pkg/controller/postgresql.go +++ b/pkg/controller/postgresql.go @@ -554,7 +554,7 @@ func (c *Controller) postgresqlUpdate(prev, cur interface{}) { if pgNew.Annotations[restoreAnnotationKey] == restoreAnnotationValue { c.logger.Debugf("restore-in-place: postgresqlUpdate called for cluster %q", pgNew.Name) - c.handlerRestoreInPlace(pgOld, pgNew) + c.handleRestoreInPlace(pgOld, pgNew) return } @@ -614,9 +614,9 @@ func (c *Controller) validateRestoreInPlace(pgOld, pgNew *acidv1.Postgresql) err return nil } -// handlerRestoreInPlace starts an asynchronous point-in-time-restore. +// handleRestoreInPlace starts an asynchronous point-in-time-restore. // It creates a ConfigMap to store the state and then deletes the old Postgresql CR. -func (c *Controller) handlerRestoreInPlace(pgOld, pgNew *acidv1.Postgresql) { +func (c *Controller) handleRestoreInPlace(pgOld, pgNew *acidv1.Postgresql) { c.logger.Infof("restore-in-place: starting asynchronous restore-in-place for cluster %q", pgNew.Name) if err := c.validateRestoreInPlace(pgOld, pgNew); err != nil { @@ -675,7 +675,6 @@ func (c *Controller) handlerRestoreInPlace(pgOld, pgNew *acidv1.Postgresql) { err = c.KubeClient.Postgresqls(pgOld.Namespace).Delete(context.TODO(), pgOld.Name, metav1.DeleteOptions{}) if err != nil && !errors.IsNotFound(err) { c.logger.Errorf("restore-in-place: could not delete postgresql CR %q: %v", pgOld.Name, err) - // Consider deleting the ConfigMap here to allow a retry return } c.logger.Infof("restore-in-place: initiated deletion of postgresql CR %q", pgOld.Name) @@ -745,7 +744,7 @@ func (c *Controller) processInProgressCm(namespace string) error { return fmt.Errorf("restore-in-place: could not list in-progress restore ConfigMaps: %v", err) } if len(inProgressCmList.Items) > 0 { - c.logger.Debugf("restore-in-place: found %d in-progress restore(s) to process", len(inProgressCmList.Items)) + c.logger.Infof("restore-in-place: found %d in-progress restore(s) to process", len(inProgressCmList.Items)) } for _, cm := range inProgressCmList.Items { @@ -770,14 +769,13 @@ func (c *Controller) processSingleInProgressCm(cm v1.ConfigMap) error { if err != nil { if errors.IsAlreadyExists(err) { c.logger.Infof("restore-in-place: Postgresql CR %q already exists, cleaning up restore ConfigMap", newPgSpec.Name) - // fallthrough to delete + return nil } else { return fmt.Errorf("could not re-create Postgresql CR %q for restore: %v", newPgSpec.Name, err) } - } else { - c.logger.Infof("restore-in-place: successfully re-created Postgresql CR %q to complete restore", newPgSpec.Name) } - + // If err is nil (creation successful) + c.logger.Infof("restore-in-place: successfully re-created Postgresql CR %q to complete restore", newPgSpec.Name) return nil } @@ -801,7 +799,7 @@ func (c *Controller) cleanupRestores() error { c.logger.Debugf("Pitr backup retention is %s", retention.String()) for _, cm := range cmList.Items { - if !strings.HasPrefix(cm.Name, "pitr-") { + if !strings.HasPrefix(cm.Name, "pitr-state-") { 
continue } From 51dfb9d9cd7407e9d641889111e7d80ee5c35bc8 Mon Sep 17 00:00:00 2001 From: Aditya Kumar Date: Tue, 13 Jan 2026 13:43:49 +0530 Subject: [PATCH 6/9] Added unit tests for postgresql.go for newly added pitr functions --- pkg/controller/postgresql_test.go | 1278 +++++++++++++++++++++++++++++ 1 file changed, 1278 insertions(+) diff --git a/pkg/controller/postgresql_test.go b/pkg/controller/postgresql_test.go index 71d23a264..b60949559 100644 --- a/pkg/controller/postgresql_test.go +++ b/pkg/controller/postgresql_test.go @@ -1,14 +1,23 @@ package controller import ( + "context" "fmt" "reflect" + "strings" "testing" "time" acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/cluster" + fakeacidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake" "github.com/zalando/postgres-operator/pkg/spec" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" ) var ( @@ -177,3 +186,1272 @@ func TestMeetsClusterDeleteAnnotations(t *testing.T) { } } } + +func TestCleanupRestores(t *testing.T) { + namespace := "default" + tests := []struct { + name string + configMaps []*v1.ConfigMap + retention time.Duration + remainingConfigMaps int + err error + }{ + { + "no config maps to delete", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: namespace, + CreationTimestamp: metav1.Now(), + }, + }, + }, + 24 * time.Hour, + 1, + nil, + }, + { + "one config map to delete", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: namespace, + CreationTimestamp: metav1.NewTime(time.Now().Add(-48 * time.Hour)), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-2", + Namespace: namespace, + CreationTimestamp: metav1.Now(), + }, + }, + }, + 24 * time.Hour, + 1, + nil, + }, + { + "do not delete non-pitr config maps", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-1", + Namespace: namespace, + CreationTimestamp: metav1.NewTime(time.Now().Add(-48 * time.Hour)), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-2", + Namespace: namespace, + CreationTimestamp: metav1.Now(), + }, + }, + }, + 24 * time.Hour, + 2, + nil, + }, + { + "zero retention, do nothing", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: namespace, + CreationTimestamp: metav1.NewTime(time.Now().Add(-48 * time.Hour)), + }, + }, + }, + 0, + 1, + nil, + }, + { + "list config maps fails", + []*v1.ConfigMap{}, + 24 * time.Hour, + 0, + fmt.Errorf("synthetic list error"), + }, + { + "delete config map fails", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-to-delete", + Namespace: namespace, + CreationTimestamp: metav1.NewTime(time.Now().Add(-48 * time.Hour)), + }, + }, + }, + 24 * time.Hour, + 1, + nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newPostgresqlTestController() + c.opConfig.PitrBackupRetention = tt.retention + c.opConfig.WatchedNamespace = namespace + + client := fake.NewSimpleClientset() + + if tt.name == "list config maps fails" { + client.PrependReactor("list", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic 
list error") + }) + } + if tt.name == "delete config map fails" { + client.PrependReactor("delete", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic delete error") + }) + } + + c.KubeClient = k8sutil.KubernetesClient{ + ConfigMapsGetter: client.CoreV1(), + } + + for _, cm := range tt.configMaps { + _, err := c.KubeClient.ConfigMaps(namespace).Create(context.TODO(), cm, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Could not create config map: %v", err) + } + } + + err := c.cleanupRestores() + + if err != nil { + if tt.err == nil { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(err.Error(), tt.err.Error()) { + t.Fatalf("error mismatch: got %q, expected to contain %q", err, tt.err) + } + } else if tt.err != nil { + t.Fatalf("expected error %q, but got none", tt.err) + } + + if tt.name != "list config maps fails" { + cms, err := c.KubeClient.ConfigMaps(namespace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + t.Fatalf("Could not list config maps: %v", err) + } + if len(cms.Items) != tt.remainingConfigMaps { + t.Errorf("expected %d config maps, got %d", tt.remainingConfigMaps, len(cms.Items)) + } + } + }) + } +} + +func TestProcessSingleInProgressCm(t *testing.T) { + tests := []struct { + name string + cm v1.ConfigMap + err string + expectedPgName string + expectedPgNamespace string + }{ + { + "json marshal error", + v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: "test", + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: "invalid json", + }, + }, + "could not unmarshal postgresql spec from ConfigMap \"pitr-state-test-1\"", + "", + "", + }, + { + "successful create", + v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: "test", + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: "{\"apiVersion\":\"acid.zalan.do/v1\",\"kind\":\"postgresql\",\"metadata\":{\"name\":\"acid-minimal-cluster\",\"namespace\":\"po\"},\"spec\":{\"teamId\":\"acid\",\"volume\":{\"size\":\"1Gi\"},\"numberOfInstances\":1,\"users\":{\"zalando\":[\"superuser\",\"createdb\"]},\"databases\":{\"foo\":\"zalando\"},\"postgresql\":{\"version\":\"16\"},\"enableLogicalBackup\":false,\"patroni\":{\"pg_hba\":[\"local all all trust\",\"host all all 0.0.0.0/0 md5\",\"host replication all 0.0.0.0/0 md5\"]}}}", + }, + }, + "", + "acid-minimal-cluster", + "po", + }, + { + "postgresql resource already exists", + v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: "test", + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: "{\"apiVersion\":\"acid.zalan.do/v1\",\"kind\":\"postgresql\",\"metadata\":{\"name\":\"acid-minimal-cluster\",\"namespace\":\"po\"},\"spec\":{\"teamId\":\"acid\",\"volume\":{\"size\":\"1Gi\"},\"numberOfInstances\":1,\"users\":{\"zalando\":[\"superuser\",\"createdb\"]},\"databases\":{\"foo\":\"zalando\"},\"postgresql\":{\"version\":\"16\"}}}", + }, + }, + "", + "acid-minimal-cluster", + "po", + }, + { + "spec with missing teamId", + v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-2", + Namespace: "test", + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: 
"{\"apiVersion\":\"acid.zalan.do/v1\",\"kind\":\"postgresql\",\"metadata\":{\"name\":\"acid-spec-without-teamid\",\"namespace\":\"po\"},\"spec\":{\"volume\":{\"size\":\"1Gi\"},\"numberOfInstances\":1,\"users\":{\"zalando\":[\"superuser\",\"createdb\"]},\"databases\":{\"foo\":\"zalando\"}}}", + }, + }, + "", + "acid-spec-without-teamid", + "po", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newPostgresqlTestController() + acidClientSet := fakeacidv1.NewSimpleClientset() + + if tt.name == "postgresql resource already exists" { + pg := &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: tt.expectedPgName, + Namespace: tt.expectedPgNamespace, + }, + Spec: acidv1.PostgresSpec{ + TeamID: "some-other-team", + }, + } + _, err := acidClientSet.AcidV1().Postgresqls(tt.expectedPgNamespace).Create(context.TODO(), pg, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("could not pre-create postgresql resource for test: %v", err) + } + } + + c.KubeClient = k8sutil.KubernetesClient{ + PostgresqlsGetter: acidClientSet.AcidV1(), + } + + err := c.processSingleInProgressCm(tt.cm) + + if err != nil { + if tt.err == "" { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(err.Error(), tt.err) { + t.Fatalf("errors does not match, actual err: %v, expected err: %v", err, tt.err) + } + } else if tt.err != "" { + t.Fatalf("expected error containing %q, but got no error", tt.err) + } + + if tt.err == "" && tt.expectedPgName != "" { + pg, err := acidClientSet.AcidV1().Postgresqls(tt.expectedPgNamespace).Get(context.TODO(), tt.expectedPgName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("could not get postgresql resource: %v", err) + } + + switch tt.name { + case "successful create": + if pg.Spec.TeamID != "acid" { + t.Errorf("expected teamId 'acid', got '%s'", pg.Spec.TeamID) + } + case "postgresql resource already exists": + if pg.Spec.TeamID != "some-other-team" { + t.Errorf("expected teamId to be 'some-other-team', but it was overwritten to '%s'", pg.Spec.TeamID) + } + case "spec with missing teamId": + if pg.Spec.TeamID != "" { + t.Errorf("expected teamId to be empty, got '%s'", pg.Spec.TeamID) + } + } + } + }) + } +} + +func TestProcessInProgressCm(t *testing.T) { + tests := []struct { + name string + namespace string + cms []*v1.ConfigMap + err string + expectedPgCreations int + }{ + { + "process one of two in-progress cms", + "po", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: "po", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValueInProgress, + }, + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: "{\"metadata\":{\"name\":\"acid-test-cluster-1\"}}", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-2", + Namespace: "po", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-3", + Namespace: "po", + }, + }, + }, + "", + 1, + }, + { + "list fails", + "po", + []*v1.ConfigMap{}, + "synthetic list error", + 0, + }, + { + "single cm process fails", + "po", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-good", + Namespace: "po", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValueInProgress, + }, + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: "{\"metadata\":{\"name\":\"acid-good-cluster\"}}", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: 
"pitr-state-bad", + Namespace: "po", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValueInProgress, + }, + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: "invalid-json", + }, + }, + }, + "", + 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newPostgresqlTestController() + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + if tt.name == "list fails" { + clientSet.PrependReactor("list", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic list error") + }) + } + + c.KubeClient = k8sutil.KubernetesClient{ + ConfigMapsGetter: clientSet.CoreV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + } + + for _, cm := range tt.cms { + _, err := c.KubeClient.ConfigMaps(tt.namespace).Create(context.TODO(), cm, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Could not create config map: %v", err) + } + } + + err := c.processInProgressCm(tt.namespace) + if err != nil { + if tt.err == "" { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(err.Error(), tt.err) { + t.Fatalf("errors does not match, actual err: %v, expected err: %v", err, tt.err) + } + } else if tt.err != "" { + t.Fatalf("expected error containing %q, but got no error", tt.err) + } + + var creations int + for _, action := range acidClientSet.Actions() { + if action.GetVerb() == "create" && action.GetResource().Resource == "postgresqls" { + creations++ + } + } + + if creations != tt.expectedPgCreations { + t.Errorf("expected %d postgresql resources to be created, but found %d", tt.expectedPgCreations, creations) + } + }) + } +} + +func TestProcessSinglePendingCm(t *testing.T) { + tests := []struct { + name string + cm v1.ConfigMap + pgExists bool + getPgFails bool + updateCmFails bool + expectedErr string + expectedLabel string + }{ + { + "postgresql cr still exists", + v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-cluster", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + true, + false, + false, + "", + cluster.PitrStateLabelValuePending, + }, + { + "get postgresql cr fails", + v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-cluster", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + false, + true, + false, + "could not check for existence of Postgresql CR", + cluster.PitrStateLabelValuePending, + }, + { + "postgresql cr does not exist, cm update succeeds", + v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-cluster", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + false, + false, + false, + "", + cluster.PitrStateLabelValueInProgress, + }, + { + "postgresql cr does not exist, cm update fails", + v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-cluster", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + false, + false, + true, + "could not update ConfigMap", + cluster.PitrStateLabelValuePending, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newPostgresqlTestController() + clientSet := fake.NewSimpleClientset(&tt.cm) + acidClientSet := 
fakeacidv1.NewSimpleClientset() + + if tt.pgExists { + pg := &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + }, + } + _, err := acidClientSet.AcidV1().Postgresqls("default").Create(context.TODO(), pg, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("could not create postgresql resource: %v", err) + } + } + + if tt.getPgFails { + acidClientSet.PrependReactor("get", "postgresqls", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic get error") + }) + } + + if tt.updateCmFails { + clientSet.PrependReactor("update", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic update error") + }) + } + + c.KubeClient = k8sutil.KubernetesClient{ + ConfigMapsGetter: clientSet.CoreV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + } + + err := c.processSinglePendingCm(tt.cm) + + if err != nil { + if tt.expectedErr == "" { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(err.Error(), tt.expectedErr) { + t.Fatalf("error mismatch: got %q, expected to contain %q", err, tt.expectedErr) + } + } else if tt.expectedErr != "" { + t.Fatalf("expected error containing %q, but got no error", tt.expectedErr) + } + + if !tt.updateCmFails { + updatedCm, err := clientSet.CoreV1().ConfigMaps("default").Get(context.TODO(), "pitr-state-test-cluster", metav1.GetOptions{}) + if err != nil { + t.Fatalf("could not get configmap: %v", err) + } + if updatedCm.Labels[cluster.PitrStateLabelKey] != tt.expectedLabel { + t.Errorf("expected label %q but got %q", tt.expectedLabel, updatedCm.Labels[cluster.PitrStateLabelKey]) + } + } + }) + } +} + +func TestProcessPendingCm(t *testing.T) { + tests := []struct { + name string + namespace string + cms []*v1.ConfigMap + listFails bool + err string + expectedProcessedPending int + }{ + { + "process one of two pending cms", + "default", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-2", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValueInProgress, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-3", + Namespace: "default", + }, + }, + }, + false, + "", + 1, + }, + { + "no pending cms to process", + "default", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValueInProgress, + }, + }, + }, + }, + false, + "", + 0, + }, + { + "list fails", + "default", + []*v1.ConfigMap{}, + true, + "could not list pending restore ConfigMaps", + 0, + }, + { + "process multiple pending cms", + "default", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-2", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + }, + false, + "", + 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := 
newPostgresqlTestController() + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + if tt.listFails { + clientSet.PrependReactor("list", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic list error") + }) + } + + c.KubeClient = k8sutil.KubernetesClient{ + ConfigMapsGetter: clientSet.CoreV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + } + + for _, cm := range tt.cms { + _, err := c.KubeClient.ConfigMaps(tt.namespace).Create(context.TODO(), cm, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Could not create config map: %v", err) + } + } + + err := c.processPendingCm(tt.namespace) + if err != nil { + if tt.err == "" { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(err.Error(), tt.err) { + t.Fatalf("errors does not match, actual err: %v, expected err: %v", err, tt.err) + } + } else if tt.err != "" { + t.Fatalf("expected error containing %q, but got no error", tt.err) + } + + if !tt.listFails { + var pendingProcessed int + for _, action := range acidClientSet.Actions() { + if action.GetVerb() == "get" && action.GetResource().Resource == "postgresqls" { + pendingProcessed++ + } + } + + if pendingProcessed != tt.expectedProcessedPending { + t.Errorf("expected %d pending cms to be processed, but found %d", tt.expectedProcessedPending, pendingProcessed) + } + } + }) + } +} + +func TestProcessPendingRestores(t *testing.T) { + tests := []struct { + name string + watchedNamespace string + cms []*v1.ConfigMap + pendingCmListFails bool + inProgressCmListFails bool + expectedErr string + expectedPendingProcessed int + expectedInProgressCreate int + }{ + { + "process both pending and in-progress cms with watched namespace", + "default", + []*v1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-1", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValuePending, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pitr-state-test-2", + Namespace: "default", + Labels: map[string]string{ + cluster.PitrStateLabelKey: cluster.PitrStateLabelValueInProgress, + }, + }, + Data: map[string]string{ + cluster.PitrSpecDataKey: "{\"metadata\":{\"name\":\"acid-test-cluster\"}}", + }, + }, + }, + false, + false, + "", + 1, + 1, + }, + { + "use all namespaces when watched namespace is empty", + "", + []*v1.ConfigMap{}, + false, + false, + "", + 0, + 0, + }, + { + "processPendingCm fails", + "default", + []*v1.ConfigMap{}, + true, + false, + "could not list pending restore ConfigMaps", + 0, + 0, + }, + { + "processInProgressCm fails", + "default", + []*v1.ConfigMap{}, + false, + true, + "could not list in-progress restore ConfigMaps", + 0, + 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newPostgresqlTestController() + c.opConfig.WatchedNamespace = tt.watchedNamespace + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + listCallCount := 0 + if tt.pendingCmListFails || tt.inProgressCmListFails { + clientSet.PrependReactor("list", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + listCallCount++ + if tt.pendingCmListFails && listCallCount == 1 { + return true, nil, fmt.Errorf("synthetic list error") + } + if tt.inProgressCmListFails && listCallCount == 2 { + return true, nil, fmt.Errorf("synthetic list error") + } + return false, nil, nil + }) + } + + 
c.KubeClient = k8sutil.KubernetesClient{ + ConfigMapsGetter: clientSet.CoreV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + } + + namespace := tt.watchedNamespace + if namespace == "" { + namespace = "default" + } + for _, cm := range tt.cms { + _, err := c.KubeClient.ConfigMaps(namespace).Create(context.TODO(), cm, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Could not create config map: %v", err) + } + } + + err := c.processPendingRestores() + if err != nil { + if tt.expectedErr == "" { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(err.Error(), tt.expectedErr) { + t.Fatalf("error mismatch: got %q, expected to contain %q", err, tt.expectedErr) + } + } else if tt.expectedErr != "" { + t.Fatalf("expected error containing %q, but got no error", tt.expectedErr) + } + + if tt.expectedErr == "" { + var pendingProcessed int + var inProgressCreate int + for _, action := range acidClientSet.Actions() { + if action.GetVerb() == "get" && action.GetResource().Resource == "postgresqls" { + pendingProcessed++ + } + if action.GetVerb() == "create" && action.GetResource().Resource == "postgresqls" { + inProgressCreate++ + } + } + + if pendingProcessed != tt.expectedPendingProcessed { + t.Errorf("expected %d pending cms to be processed, but found %d", tt.expectedPendingProcessed, pendingProcessed) + } + if inProgressCreate != tt.expectedInProgressCreate { + t.Errorf("expected %d in-progress cms to create postgresql resources, but found %d", tt.expectedInProgressCreate, inProgressCreate) + } + } + }) + } +} + +func TestValidateRestoreInPlace(t *testing.T) { + validTimestamp := time.Now().Add(-1 * time.Hour).Format(time.RFC3339) + futureTimestamp := time.Now().Add(1 * time.Hour).Format(time.RFC3339) + + tests := []struct { + name string + pgOld *acidv1.Postgresql + pgNew *acidv1.Postgresql + expectedErr string + }{ + { + "missing clone section", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{}, + }, + "'clone' section is missing in the manifest", + }, + { + "cluster name mismatch", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "different-cluster", + EndTimestamp: validTimestamp, + }, + }, + }, + "clone cluster name \"different-cluster\" does not match the current cluster name \"acid-test-cluster\"", + }, + { + "invalid timestamp format", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "acid-test-cluster", + EndTimestamp: "invalid-timestamp", + }, + }, + }, + "could not parse clone timestamp", + }, + { + "future timestamp", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "acid-test-cluster", + 
EndTimestamp: futureTimestamp, + }, + }, + }, + "clone timestamp", + }, + { + "valid restore parameters", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "acid-test-cluster", + EndTimestamp: validTimestamp, + }, + }, + }, + "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newPostgresqlTestController() + + err := c.validateRestoreInPlace(tt.pgOld, tt.pgNew) + + if err != nil { + if tt.expectedErr == "" { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(err.Error(), tt.expectedErr) { + t.Fatalf("error mismatch: got %q, expected to contain %q", err, tt.expectedErr) + } + } else if tt.expectedErr != "" { + t.Fatalf("expected error containing %q, but got no error", tt.expectedErr) + } + }) + } +} + +func TestHandleRestoreInPlace(t *testing.T) { + validTimestamp := time.Now().Add(-1 * time.Hour).Format(time.RFC3339) + + tests := []struct { + name string + pgOld *acidv1.Postgresql + pgNew *acidv1.Postgresql + cmExists bool + cmCreateFails bool + cmUpdateFails bool + pgDeleteFails bool + expectCmCreateAttempt bool + expectCmUpdateAttempt bool + expectPgDeleteAttempt bool + }{ + { + "validation fails - missing clone section", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{}, + }, + false, + false, + false, + false, + false, + false, + false, + }, + { + "successful restore - cm created and pg deleted", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "acid-test-cluster", + EndTimestamp: validTimestamp, + }, + }, + }, + false, + false, + false, + false, + true, + false, + true, + }, + { + "cm already exists - cm updated and pg deleted", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "acid-test-cluster", + EndTimestamp: validTimestamp, + }, + }, + }, + true, + false, + false, + false, + false, + true, + true, + }, + { + "cm create fails - no pg delete", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "acid-test-cluster", + EndTimestamp: validTimestamp, + }, + }, + }, + false, + true, + false, + false, + true, + false, + false, + }, + { + "cm update fails - no pg delete", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: 
acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "acid-test-cluster", + EndTimestamp: validTimestamp, + }, + }, + }, + true, + false, + true, + false, + false, + true, + false, + }, + { + "pg delete fails", + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + }, + &acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acid-test-cluster", + Namespace: "default", + }, + Spec: acidv1.PostgresSpec{ + Clone: &acidv1.CloneDescription{ + ClusterName: "acid-test-cluster", + EndTimestamp: validTimestamp, + }, + }, + }, + false, + false, + false, + true, + true, + false, + true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := newPostgresqlTestController() + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + // Pre-create postgresql resource + _, err := acidClientSet.AcidV1().Postgresqls(tt.pgOld.Namespace).Create(context.TODO(), tt.pgOld, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("could not create postgresql resource: %v", err) + } + + if tt.cmExists { + cmName := fmt.Sprintf("pitr-state-%s", tt.pgNew.Name) + cm := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: cmName, + Namespace: tt.pgNew.Namespace, + }, + } + _, err := clientSet.CoreV1().ConfigMaps(tt.pgNew.Namespace).Create(context.TODO(), cm, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("could not create configmap: %v", err) + } + } + + if tt.cmCreateFails { + clientSet.PrependReactor("create", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic create error") + }) + } + + if tt.cmUpdateFails { + clientSet.PrependReactor("update", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic update error") + }) + } + + if tt.pgDeleteFails { + acidClientSet.PrependReactor("delete", "postgresqls", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic delete error") + }) + } + + c.KubeClient = k8sutil.KubernetesClient{ + ConfigMapsGetter: clientSet.CoreV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + } + + // Clear actions from setup phase before calling the handler + clientSet.ClearActions() + acidClientSet.ClearActions() + + c.handleRestoreInPlace(tt.pgOld, tt.pgNew) + + // Check ConfigMap actions (only actions from the handler itself) + var cmCreateAttempt, cmUpdateAttempt bool + for _, action := range clientSet.Actions() { + if action.GetVerb() == "create" && action.GetResource().Resource == "configmaps" { + cmCreateAttempt = true + } + if action.GetVerb() == "update" && action.GetResource().Resource == "configmaps" { + cmUpdateAttempt = true + } + } + + // Check Postgresql actions + var pgDeleteAttempt bool + for _, action := range acidClientSet.Actions() { + if action.GetVerb() == "delete" && action.GetResource().Resource == "postgresqls" { + pgDeleteAttempt = true + } + } + + if cmCreateAttempt != tt.expectCmCreateAttempt { + t.Errorf("expected cmCreateAttempt=%v, got %v", tt.expectCmCreateAttempt, cmCreateAttempt) + } + if cmUpdateAttempt != tt.expectCmUpdateAttempt { + t.Errorf("expected cmUpdateAttempt=%v, got %v", tt.expectCmUpdateAttempt, cmUpdateAttempt) + } + if pgDeleteAttempt != tt.expectPgDeleteAttempt { + t.Errorf("expected pgDeleteAttempt=%v, got %v", tt.expectPgDeleteAttempt, pgDeleteAttempt) + } + }) + } +} 
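The `validateRestoreInPlace` implementation exercised by `TestValidateRestoreInPlace` above is not shown in this hunk. For orientation, the following is a minimal, hypothetical sketch of the checks the test table implies (a `clone` section must exist, its cluster name must equal the manifest name, and the end timestamp must parse as RFC 3339 and not lie in the future). The helper name and signature are invented for illustration; only the error wording and the `acidv1.CloneDescription` fields are taken from the tests.

```go
package main

import (
	"fmt"
	"time"

	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
)

// validateRestoreTarget mirrors the checks implied by the TestValidateRestoreInPlace
// table; it is NOT the operator's validateRestoreInPlace method, which hangs off the
// controller and receives the old and new Postgresql objects.
func validateRestoreTarget(clusterName string, clone *acidv1.CloneDescription) error {
	if clone == nil {
		return fmt.Errorf("'clone' section is missing in the manifest")
	}
	if clone.ClusterName != clusterName {
		return fmt.Errorf("clone cluster name %q does not match the current cluster name %q",
			clone.ClusterName, clusterName)
	}
	endTime, err := time.Parse(time.RFC3339, clone.EndTimestamp)
	if err != nil {
		return fmt.Errorf("could not parse clone timestamp %q: %v", clone.EndTimestamp, err)
	}
	if endTime.After(time.Now()) {
		return fmt.Errorf("clone timestamp %q is in the future", clone.EndTimestamp)
	}
	return nil
}

func main() {
	// A past timestamp for the cluster itself should validate cleanly.
	clone := &acidv1.CloneDescription{
		ClusterName:  "acid-test-cluster",
		EndTimestamp: time.Now().Add(-time.Hour).Format(time.RFC3339),
	}
	fmt.Println(validateRestoreTarget("acid-test-cluster", clone)) // <nil>
}
```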
From 901b9752dc85e67c054ccc61b2a0a349af4ff4a0 Mon Sep 17 00:00:00 2001 From: Aditya Kumar Date: Fri, 9 Jan 2026 01:01:29 +0530 Subject: [PATCH 7/9] Added documentation for pitr (automated) --- docs/reference/cluster_manifest.md | 4 +++ docs/reference/operator_parameters.md | 2 +- docs/user.md | 42 ++++++++++++++++++++++++++- 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index ab0353202..4521f3ca7 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -48,6 +48,10 @@ Those parameters are grouped under the `metadata` top-level key. Labels that are set here but not listed as `inherited_labels` in the operator parameters are ignored. +* **annotations** + A map of annotations to add to the `postgresql` resource. The operator reacts to certain annotations, for instance, to trigger specific actions. + * `postgres-operator.zalando.org/action: restore-in-place`: When this annotation is present with this value, the operator will trigger an automated in-place restore of the cluster. This process requires a valid `clone` section to be defined in the manifest with a target `timestamp`. See the [user guide](../user.md#automated-restore-in-place-point-in-time-recovery) for more details. + ## Top-level parameters These parameters are grouped directly under the `spec` key in the manifest. diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md index de27acb4d..7fb28aca0 100644 --- a/docs/reference/operator_parameters.md +++ b/docs/reference/operator_parameters.md @@ -172,7 +172,7 @@ Those are top-level keys, containing both leaf keys and groups. period between consecutive repair requests. The default is `5m`. * **pitr_backup_retention** - retention time for PITR backup config maps. The operator will clean up config maps older than the configured retention. The value is a duration string, e.g. "168h". The default is `168h`. + retention time for PITR (Point-In-Time-Recovery) state ConfigMaps. The operator will clean up ConfigMaps older than the configured retention. The value is a [duration string](https://pkg.go.dev/time#ParseDuration), e.g. "168h" (which is 7 days), "7d", "24h". The default is `168h`. * **set_memory_request_to_limit** Set `memory_request` to `memory_limit` for all Postgres clusters (the default diff --git a/docs/user.md b/docs/user.md index c1a7c7d45..b069ef195 100644 --- a/docs/user.md +++ b/docs/user.md @@ -107,7 +107,7 @@ kind: postgresql metadata: name: acid-minimal-cluster spec: - [...] + [...] postgresql: version: "17" parameters: @@ -891,6 +891,45 @@ original UID, making it possible retry restoring. However, it is probably better to create a temporary clone for experimenting or finding out to which point you should restore. +## Automated Restore in place (Point-in-Time Recovery) + +The operator supports automated in-place restores, allowing you to restore a database to a specific point in time without changing connection strings on the application side. This feature orchestrates the deletion of the current cluster and the creation of a new one from a backup. + +:warning: This is a destructive operation. The existing cluster's StatefulSet and pods will be deleted as part of the process. Ensure you have a reliable backup strategy and have tested the restore process in a non-production environment. 
+ +To trigger an in-place restore, you need to add a special annotation and a `clone` section to your `postgresql` manifest: + +* **Annotate the manifest**: Add the `postgres-operator.zalando.org/action: restore-in-place` annotation to the `metadata` section. +* **Specify the recovery target**: Add a `clone` section to the `spec`, providing the `cluster` name and the `timestamp` for the point-in-time recovery. The `cluster` name **must** be the same as the `metadata.name` of the cluster you are restoring. The `timestamp` must be in RFC 3339 format and point to a time in the past for which you have WAL archives. + +Here is an example manifest snippet: + +```yaml +apiVersion: "acid.zalan.do/v1" +kind: postgresql +metadata: + name: acid-minimal-cluster + annotations: + postgres-operator.zalando.org/action: restore-in-place +spec: + # ... other cluster parameters + clone: + cluster: "acid-minimal-cluster" # Must match metadata.name + uid: "" + timestamp: "2022-04-01T10:11:12+00:00" + # ... other cluster parameters +``` + +When you apply this manifest, the operator will: +* See the `restore-in-place` annotation and begin the restore workflow. +* Store the restore request and the new cluster definition in a temporary `ConfigMap`. +* Delete the existing `postgresql` custom resource, which triggers the deletion of the associated StatefulSet and pods. +* Wait for the old cluster to be fully terminated. +* Create a new `postgresql` resource with a new UID but the same name. +* The new cluster will bootstrap from the latest base backup prior to the given `timestamp` and replay WAL files to recover to the specified point in time. + +The process is asynchronous. You can monitor the operator logs and the state of the `postgresql` resource to follow the progress. Once the new cluster is up and running, your applications can reconnect. + ## Setting up a standby cluster Standby cluster is a [Patroni feature](https://github.com/zalando/patroni/blob/master/docs/replica_bootstrap.rst#standby-cluster) @@ -1291,3 +1330,4 @@ As of now, the operator does not sync the pooler deployment automatically which means that changes in the pod template are not caught. You need to toggle `enableConnectionPooler` to set environment variables, volumes, secret mounts and securityContext required for TLS support in the pooler pod. 
+ From 5afd33206f577969eb3c58373849c828bdadfa74 Mon Sep 17 00:00:00 2001 From: Aditya Kumar Date: Tue, 13 Jan 2026 14:04:10 +0530 Subject: [PATCH 8/9] Fixed unit test 'TestCreate' and added other test --- pkg/cluster/cluster.go | 6 +- pkg/cluster/cluster_test.go | 120 ++++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+), 1 deletion(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index c14a0fc6e..f1a9ab2f4 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -33,6 +33,7 @@ import ( v1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" rbacv1 "k8s.io/api/rbac/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" @@ -432,7 +433,6 @@ func (c *Cluster) Create() (err error) { c.logger.Errorf("could not list resources: %v", err) } - if err := c.updatePITRResources(PitrStateLabelValueFinished); err != nil { return fmt.Errorf("could not update pitr resources: %v", err) } @@ -453,6 +453,10 @@ func (c *Cluster) updatePITRResources(state string) error { data, _ := json.Marshal(patchPayload) if _, err := c.KubeClient.ConfigMaps(cmNamespace).Patch(context.TODO(), cmName, types.MergePatchType, data, metav1.PatchOptions{}, ""); err != nil { + // If ConfigMap doesn't exist, this is a normal cluster creation (not a restore-in-place) + if k8serrors.IsNotFound(err) { + return nil + } c.logger.Errorf("restore-in-place: error updating config map label to state: %v", err) return err } diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 25f61db98..21301834e 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -24,7 +24,9 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" ) @@ -94,6 +96,7 @@ func TestCreate(t *testing.T) { clusterNamespace := "test" client := k8sutil.KubernetesClient{ + ConfigMapsGetter: clientSet.CoreV1(), DeploymentsGetter: clientSet.AppsV1(), CronJobsGetter: clientSet.BatchV1(), EndpointsGetter: clientSet.CoreV1(), @@ -2202,3 +2205,120 @@ func TestGetSwitchoverSchedule(t *testing.T) { }) } } + +func TestUpdatePITRResources(t *testing.T) { + clusterName := "test-cluster" + clusterNamespace := "default" + + tests := []struct { + name string + state string + cmExists bool + patchFails bool + expectedErr bool + expectedLabel string + }{ + { + "successful patch - update label to finished", + PitrStateLabelValueFinished, + true, + false, + false, + PitrStateLabelValueFinished, + }, + { + "successful patch - update label to in-progress", + PitrStateLabelValueInProgress, + true, + false, + false, + PitrStateLabelValueInProgress, + }, + { + "config map does not exist - no error", + PitrStateLabelValueFinished, + false, + false, + false, + "", + }, + { + "patch fails with non-NotFound error", + PitrStateLabelValueFinished, + true, + true, + true, + "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + clientSet := fake.NewSimpleClientset() + acidClientSet := fakeacidv1.NewSimpleClientset() + + if tt.cmExists { + cmName := fmt.Sprintf(PitrConfigMapNameTemplate, clusterName) + cm := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: cmName, + Namespace: clusterNamespace, + Labels: map[string]string{ + PitrStateLabelKey: PitrStateLabelValuePending, + }, + }, + } + 
_, err := clientSet.CoreV1().ConfigMaps(clusterNamespace).Create(context.TODO(), cm, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("could not create configmap: %v", err) + } + } + + if tt.patchFails { + clientSet.PrependReactor("patch", "configmaps", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, fmt.Errorf("synthetic patch error") + }) + } + + client := k8sutil.KubernetesClient{ + ConfigMapsGetter: clientSet.CoreV1(), + PostgresqlsGetter: acidClientSet.AcidV1(), + } + + pg := acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Namespace: clusterNamespace, + }, + } + + cluster := New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + }, + }, client, pg, logger, eventRecorder) + + err := cluster.updatePITRResources(tt.state) + + if err != nil { + if !tt.expectedErr { + t.Fatalf("unexpected error: %v", err) + } + } else if tt.expectedErr { + t.Fatalf("expected error, but got none") + } + + if tt.cmExists && !tt.patchFails && tt.expectedLabel != "" { + cmName := fmt.Sprintf(PitrConfigMapNameTemplate, clusterName) + updatedCm, err := clientSet.CoreV1().ConfigMaps(clusterNamespace).Get(context.TODO(), cmName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("could not get configmap: %v", err) + } + if updatedCm.Labels[PitrStateLabelKey] != tt.expectedLabel { + t.Errorf("expected label %q but got %q", tt.expectedLabel, updatedCm.Labels[PitrStateLabelKey]) + } + } + }) + } +} From 93479592cf46402ce96a1e278be3924f62373d51 Mon Sep 17 00:00:00 2001 From: Aditya Kumar Date: Fri, 16 Jan 2026 15:36:42 +0530 Subject: [PATCH 9/9] Fixed documentation, the time.duration for pitr_backup_retention does not accepts 'd' --- docs/reference/operator_parameters.md | 5 ++++- docs/user.md | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md index 7fb28aca0..8a12cf08a 100644 --- a/docs/reference/operator_parameters.md +++ b/docs/reference/operator_parameters.md @@ -9,6 +9,7 @@ configuration. Variable names are underscore-separated words. ### ConfigMaps-based + The configuration is supplied in a key-value configmap, defined by the `CONFIG_MAP_NAME` environment variable. Non-scalar values, i.e. lists or maps, are encoded in the value strings using @@ -25,6 +26,7 @@ operator CRD, all the CRD defaults are provided in the [operator's default configuration manifest](https://github.com/zalando/postgres-operator/blob/master/manifests/postgresql-operator-default-configuration.yaml) ### CRD-based configuration + The configuration is stored in a custom YAML manifest. The manifest is an instance of the custom resource definition (CRD) called `OperatorConfiguration`. The operator registers this CRD during the @@ -172,7 +174,7 @@ Those are top-level keys, containing both leaf keys and groups. period between consecutive repair requests. The default is `5m`. * **pitr_backup_retention** - retention time for PITR (Point-In-Time-Recovery) state ConfigMaps. The operator will clean up ConfigMaps older than the configured retention. The value is a [duration string](https://pkg.go.dev/time#ParseDuration), e.g. "168h" (which is 7 days), "7d", "24h". The default is `168h`. + retention time for PITR (Point-In-Time-Recovery) state ConfigMaps. The operator will clean up ConfigMaps older than the configured retention. The value is a [duration string](https://pkg.go.dev/time#ParseDuration), e.g. "168h" (which is 7 days), "24h". 
The default is `168h`. * **set_memory_request_to_limit** Set `memory_request` to `memory_limit` for all Postgres clusters (the default @@ -921,6 +923,7 @@ key. ```yaml teams_api_role_configuration: "log_statement:all,search_path:'data,public'" ``` + The default is `"log_statement:all"` * **enable_team_superuser** diff --git a/docs/user.md b/docs/user.md index b069ef195..86f35d8f0 100644 --- a/docs/user.md +++ b/docs/user.md @@ -107,7 +107,7 @@ kind: postgresql metadata: name: acid-minimal-cluster spec: - [...] + [...] postgresql: version: "17" parameters:
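On the PATCH 9/9 fix above: Go's `time.ParseDuration`, which presumably parses `pitr_backup_retention`, only understands the units "ns", "us"/"µs", "ms", "s", "m" and "h", so a value such as "7d" is rejected while "168h" (7 days) works. A quick sketch of that behaviour:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// "168h" is a valid duration string and equals 7 days.
	d, err := time.ParseDuration("168h")
	fmt.Println(d, err) // 168h0m0s <nil>

	// "7d" is not: time.ParseDuration has no day unit, so it returns an error.
	_, err = time.ParseDuration("7d")
	fmt.Println(err != nil) // true
}
```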