diff --git a/controlplane/provisioner/controller.go b/controlplane/provisioner/controller.go index 4bb72e42..083921c8 100644 --- a/controlplane/provisioner/controller.go +++ b/controlplane/provisioner/controller.go @@ -581,6 +581,23 @@ func addIcebergStatusUpdates(updates map[string]interface{}, w *configstore.Mana func (c *Controller) reconcileDeleting(ctx context.Context, w *configstore.ManagedWarehouse) { log := slog.With("org", w.OrgID, "phase", "deleting") + // Resolve the Lakekeeper inputs BEFORE deleting the Duckling CR. The + // inputs include the metadata-store admin DSN derived from the CR's + // status; once the CR is gone the resolver can't reconstruct it, and + // we'd lose the only way to drop the per-tenant lakekeeper_ + // Postgres database. Best-effort resolution: when the inputs aren't + // available (resolver unwired, CR never reconciled, dev/orbstack + // without env config), the subsequent DeleteForOrg call falls back to + // k8s-only teardown. + var lkInputs ProvisioningInputs + if c.lakekeeperProvisioner != nil && c.lakekeeperInputs != nil { + if in, err := c.lakekeeperInputs(ctx, w); err != nil { + log.Debug("Lakekeeper inputs unavailable at delete time; skipping PG cleanup.", "error", err) + } else { + lkInputs = in + } + } + log.Info("Deleting Duckling CR.") if err := c.duckling.Delete(ctx, w.OrgID); err != nil { // Only proceed if the CR is already gone (NotFound). For other errors @@ -593,15 +610,17 @@ func (c *Controller) reconcileDeleting(ctx context.Context, w *configstore.Manag } // Tear down the per-org Lakekeeper instance the control plane provisioned - // out-of-band (CR + Secret + ServiceAccount in the lakekeeper namespace). - // The Crossplane Duckling composition doesn't own these, so without an - // explicit teardown they leak after the warehouse is gone. Idempotent and + // out-of-band (CR + Secret + ServiceAccount in the lakekeeper namespace, + // and — when this provisioner created them — the lakekeeper_ + // Postgres database and role on the metadata store). The Crossplane + // Duckling composition doesn't own the k8s pieces, so without an explicit + // teardown they leak after the warehouse is gone. Idempotent and // NotFound-tolerant — a clean no-op for ducklings that never enabled // Iceberg. Skipped silently when the provisioner isn't wired (mirrors // reconcileLakekeeper). On error we return without marking the warehouse // deleted so the next reconcile pass retries. if c.lakekeeperProvisioner != nil { - if err := c.lakekeeperProvisioner.DeleteForOrg(ctx, w.OrgID); err != nil { + if err := c.lakekeeperProvisioner.DeleteForOrg(ctx, w.OrgID, lkInputs); err != nil { log.Warn("Failed to tear down Lakekeeper resources, will retry.", "error", err) return } diff --git a/controlplane/provisioner/lakekeeper_provisioner.go b/controlplane/provisioner/lakekeeper_provisioner.go index 7449d151..6e56772b 100644 --- a/controlplane/provisioner/lakekeeper_provisioner.go +++ b/controlplane/provisioner/lakekeeper_provisioner.go @@ -8,6 +8,7 @@ import ( "encoding/hex" "errors" "fmt" + "log/slog" "github.com/posthog/duckgres/controlplane/configstore" "github.com/posthog/duckgres/server/lakekeeperbroker" @@ -341,12 +342,26 @@ func (p *LakekeeperProvisioner) EnsureForOrg(ctx context.Context, w *configstore // explicitly). Idempotent and NotFound-tolerant, so it's a safe no-op for orgs // that never had Iceberg enabled. // -// What it does NOT touch: the lakekeeper_ Postgres database/role. On -// cnpg-shard it's provider-sql-managed and torn down with the Duckling CR; on -// aurora the whole cluster is torn down with the Duckling CR; on an external -// (shared) metadata store it currently persists — dropping a database on a -// shared RDS is destructive and out of scope for this teardown. -func (p *LakekeeperProvisioner) DeleteForOrg(ctx context.Context, orgID string) error { +// When inputs carry an AdminDSN (i.e. !PGPreProvisioned), additionally drops +// the lakekeeper_ Postgres database and role on the metadata store. +// This is what makes recreating a duckling with the same orgID work: the +// duckgres lakekeeper provisioner rotates the k8s Secret's encryption-key on +// every provision, and leaving the old encrypted rows behind causes +// Lakekeeper to return SecretFetchError ("Wrong key or corrupt data") on +// every CREATE TABLE in the next lifetime. For the cnpg-shard case +// (PGPreProvisioned) the corresponding cleanup happens via the Crossplane +// composition's [Delete] managementPolicy on the cnpg-tenant-role and +// cnpg-tenant-database resources — see posthog/charts PR for the parallel +// fix. For the aurora case the whole Aurora cluster is destroyed by +// Crossplane on Duckling CR delete, so the DROP DATABASE here is redundant +// but harmless (and the connection may simply refuse — tolerated below). +// +// Best-effort: PG drop failures are logged and swallowed so a transient +// network issue or a half-deleted Aurora cluster doesn't permanently block +// the duckling teardown. The k8s teardown failures (which actually leave +// resources stranded) still surface as errors to the controller's retry +// loop. +func (p *LakekeeperProvisioner) DeleteForOrg(ctx context.Context, orgID string, in ProvisioningInputs) error { if err := p.k8s.DeleteCR(ctx, orgID); err != nil { return err } @@ -356,6 +371,24 @@ func (p *LakekeeperProvisioner) DeleteForOrg(ctx context.Context, orgID string) if err := p.k8s.DeleteServiceAccount(ctx, orgID); err != nil { return err } + + // PG cleanup applies only when this provisioner actually created the + // DB/role (external + dev/orbstack paths). cnpg-shard ownership lives + // in the Crossplane composition. + if in.PGPreProvisioned || in.AdminDSN == "" { + return nil + } + dbName := lakekeeperDBName(orgID) + // DROP DATABASE first so DROP ROLE doesn't trip over the role's + // ownership of the database. Both best-effort. + if err := DropDatabase(ctx, in.AdminDSN, dbName); err != nil { + slog.Warn("Lakekeeper PG database drop failed; continuing teardown.", + "org", orgID, "database", dbName, "error", err) + } + if err := DropRole(ctx, in.AdminDSN, dbName); err != nil { + slog.Warn("Lakekeeper PG role drop failed; continuing teardown.", + "org", orgID, "role", dbName, "error", err) + } return nil } diff --git a/controlplane/provisioner/lakekeeper_provisioner_test.go b/controlplane/provisioner/lakekeeper_provisioner_test.go index 0279a959..27842d32 100644 --- a/controlplane/provisioner/lakekeeper_provisioner_test.go +++ b/controlplane/provisioner/lakekeeper_provisioner_test.go @@ -497,3 +497,37 @@ func dropDatabase(t *testing.T, dsn, name string) { dropDSN := dsn // pgx accepts the same DSN; CREATE/DROP from any current DB. cleanupDB(t, dropDSN, name) } + +// TestDeleteForOrg_SkipsPGCleanupForCnpgShard guards the contract that the +// cnpg-shard path leaves PG cleanup to the Crossplane composition. The +// composition's [Delete] managementPolicy on the cnpg-tenant-role and +// cnpg-tenant-database resources owns role+DB teardown there; the +// duckgres provisioner doesn't have the AdminDSN to do it itself +// anyway. The k8s teardown still runs. +func TestDeleteForOrg_SkipsPGCleanupForCnpgShard(t *testing.T) { + c, _, _ := newFakeLakekeeperClient() + p := NewLakekeeperProvisioner(newFakeStore(), c) + // AdminDSN empty + PGPreProvisioned=true: DropDatabase/DropRole must + // not be invoked. With a bogus DSN they'd surface as connection + // errors; a nil-DSN attempt would panic deep in pgx. The fact that + // this returns nil with no DSN configured is the assertion. + err := p.DeleteForOrg(context.Background(), "acme", ProvisioningInputs{ + PGPreProvisioned: true, + }) + if err != nil { + t.Fatalf("DeleteForOrg with PGPreProvisioned should succeed without DSN, got: %v", err) + } +} + +// TestDeleteForOrg_SkipsPGCleanupWithoutAdminDSN covers the dev/orbstack +// path where the lakekeeper provisioner never had an AdminDSN in the +// first place (no env, no Duckling CR). PG cleanup must be skipped +// rather than failing the teardown. +func TestDeleteForOrg_SkipsPGCleanupWithoutAdminDSN(t *testing.T) { + c, _, _ := newFakeLakekeeperClient() + p := NewLakekeeperProvisioner(newFakeStore(), c) + err := p.DeleteForOrg(context.Background(), "acme", ProvisioningInputs{}) + if err != nil { + t.Fatalf("DeleteForOrg with empty inputs should succeed, got: %v", err) + } +} diff --git a/controlplane/provisioner/postgres_admin.go b/controlplane/provisioner/postgres_admin.go index 4a1b3569..260b2b4c 100644 --- a/controlplane/provisioner/postgres_admin.go +++ b/controlplane/provisioner/postgres_admin.go @@ -137,6 +137,78 @@ func EnsureRole(ctx context.Context, adminDSN, role, password, ownedDB string) e return nil } +// DropDatabase removes dbName on the Postgres server addressed by adminDSN. +// Idempotent: returns nil when the database is already absent (3D000). Forces +// disconnection of any active sessions on the target DB so DROP DATABASE +// can't hang waiting for clients to drain — necessary at duckling teardown +// time because the per-tenant Lakekeeper pod may still be alive when the +// drop runs (the k8s teardown is fire-and-forget and the operator's +// reconciliation lag means connections linger). +// +// Caller must connect via a privileged DSN against a different database +// than dbName (the admin DSN's path is OK to be `postgres`). +func DropDatabase(ctx context.Context, adminDSN, dbName string) error { + if !isSafePGIdent(dbName) { + return fmt.Errorf("drop database: unsafe identifier %q", dbName) + } + db, err := sql.Open("pgx", adminDSN) + if err != nil { + return fmt.Errorf("open admin connection: %w", err) + } + defer func() { _ = db.Close() }() + + // FORCE terminates active backends as part of DROP DATABASE (Postgres + // 13+). Without it a single lingering Lakekeeper connection blocks the + // drop until backoff. + if _, err := db.ExecContext(ctx, "DROP DATABASE IF EXISTS "+quoteIdent(dbName)+" WITH (FORCE)"); err != nil { + if isInvalidCatalogName(err) { + return nil + } + return fmt.Errorf("drop database %s: %w", dbName, err) + } + return nil +} + +// DropRole removes role on the Postgres server addressed by adminDSN. +// Idempotent: returns nil when the role is already absent. Best-effort +// REASSIGN/DROP OWNED first so any object the role owns (e.g. tables +// created in databases other than the one this caller manages) doesn't +// block DROP ROLE — at duckling teardown the role's database has already +// been dropped, so REASSIGN typically has nothing to do. +// +// Caller must connect via a privileged DSN. +func DropRole(ctx context.Context, adminDSN, role string) error { + if !isSafePGIdent(role) { + return fmt.Errorf("drop role: unsafe role name %q", role) + } + db, err := sql.Open("pgx", adminDSN) + if err != nil { + return fmt.Errorf("open admin connection: %w", err) + } + defer func() { _ = db.Close() }() + + // REVOKE everything the role was granted on the maintenance database; + // otherwise DROP ROLE fails with "role cannot be dropped because some + // objects depend on it". DROP OWNED handles cluster-wide ownerships. + if _, err := db.ExecContext(ctx, "DROP OWNED BY "+quoteIdent(role)+" CASCADE"); err != nil { + // 42704 = undefined_object — role doesn't exist. Benign. + if isUndefinedObject(err) { + return nil + } + // Any other failure: log via wrap and continue to the DROP ROLE + // attempt — DROP OWNED can fail on cross-DB dependencies we don't + // have the visibility to clean up here, but DROP ROLE itself may + // still succeed if nothing is left. + // We deliberately swallow this and try DROP ROLE; if there's a + // real dependency it'll surface there. + _ = err + } + if _, err := db.ExecContext(ctx, "DROP ROLE IF EXISTS "+quoteIdent(role)); err != nil { + return fmt.Errorf("drop role %s: %w", role, err) + } + return nil +} + // reDSN rewrites the dbname component of a Postgres URL-style DSN. Used to // connect to a specific database with the same admin credentials. func reDSN(dsn, dbName string) string { @@ -250,3 +322,22 @@ func isDuplicateDatabase(err error) bool { var s sqlStater return errors.As(err, &s) && s.SQLState() == "42P04" } + +// isInvalidCatalogName reports whether err is Postgres 3D000 (database does +// not exist) — what DROP DATABASE returns when the target is already gone. +// Without the IF EXISTS clause this would matter; we keep the check anyway +// because IF EXISTS is silent on missing-DB and an actual no-such-database +// error can also surface from the connection attempt itself. +func isInvalidCatalogName(err error) bool { + type sqlStater interface{ SQLState() string } + var s sqlStater + return errors.As(err, &s) && s.SQLState() == "3D000" +} + +// isUndefinedObject reports whether err is Postgres 42704 (undefined_object) +// — what DROP OWNED returns when the role doesn't exist. Benign. +func isUndefinedObject(err error) bool { + type sqlStater interface{ SQLState() string } + var s sqlStater + return errors.As(err, &s) && s.SQLState() == "42704" +} diff --git a/controlplane/provisioner/postgres_admin_test.go b/controlplane/provisioner/postgres_admin_test.go index bc5d99e1..86aec77f 100644 --- a/controlplane/provisioner/postgres_admin_test.go +++ b/controlplane/provisioner/postgres_admin_test.go @@ -165,6 +165,66 @@ func TestEnsureRole_AgainstLivePG(t *testing.T) { } } +// TestDropDatabaseAndRole_AgainstLivePG covers the teardown helpers added +// for duckling delete: round-trips Ensure → Drop and confirms the role +// and DB are both gone, plus the idempotent re-drop case. +func TestDropDatabaseAndRole_AgainstLivePG(t *testing.T) { + dsn := os.Getenv("PG_ADMIN_DSN") + if dsn == "" { + t.Skip("PG_ADMIN_DSN not set") + } + dbName := fmt.Sprintf("lakekeeper_drop_test_%d", os.Getpid()) + roleName := dbName + t.Cleanup(func() { + // Belt-and-suspenders cleanup in case the assertions short-circuit. + db, _ := sql.Open("pgx", dsn) + defer func() { _ = db.Close() }() + _, _ = db.Exec("DROP DATABASE IF EXISTS " + quoteIdent(dbName) + " WITH (FORCE)") + _, _ = db.Exec("DROP ROLE IF EXISTS " + quoteIdent(roleName)) + }) + + ctx := context.Background() + if err := EnsureDatabase(ctx, dsn, dbName); err != nil { + t.Fatalf("EnsureDatabase: %v", err) + } + if err := EnsureRole(ctx, dsn, roleName, "abcdef0123456789", dbName); err != nil { + t.Fatalf("EnsureRole: %v", err) + } + + if err := DropDatabase(ctx, dsn, dbName); err != nil { + t.Fatalf("DropDatabase: %v", err) + } + // Idempotent re-drop. + if err := DropDatabase(ctx, dsn, dbName); err != nil { + t.Fatalf("DropDatabase (idempotent): %v", err) + } + if err := DropRole(ctx, dsn, roleName); err != nil { + t.Fatalf("DropRole: %v", err) + } + if err := DropRole(ctx, dsn, roleName); err != nil { + t.Fatalf("DropRole (idempotent): %v", err) + } + + db, err := sql.Open("pgx", dsn) + if err != nil { + t.Fatalf("open: %v", err) + } + defer func() { _ = db.Close() }() + var dbExists, roleExists bool + if err := db.QueryRow("SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname=$1)", dbName).Scan(&dbExists); err != nil { + t.Fatalf("verify db gone: %v", err) + } + if dbExists { + t.Errorf("database %s still present after DropDatabase", dbName) + } + if err := db.QueryRow("SELECT EXISTS(SELECT 1 FROM pg_roles WHERE rolname=$1)", roleName).Scan(&roleExists); err != nil { + t.Fatalf("verify role gone: %v", err) + } + if roleExists { + t.Errorf("role %s still present after DropRole", roleName) + } +} + func TestIsSafePGPassword(t *testing.T) { cases := map[string]bool{ "abc123": true,