Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions controlplane/provisioner/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,23 @@ func addIcebergStatusUpdates(updates map[string]interface{}, w *configstore.Mana
func (c *Controller) reconcileDeleting(ctx context.Context, w *configstore.ManagedWarehouse) {
log := slog.With("org", w.OrgID, "phase", "deleting")

// Resolve the Lakekeeper inputs BEFORE deleting the Duckling CR. The
// inputs include the metadata-store admin DSN derived from the CR's
// status; once the CR is gone the resolver can't reconstruct it, and
// we'd lose the only way to drop the per-tenant lakekeeper_<org>
// Postgres database. Best-effort resolution: when the inputs aren't
// available (resolver unwired, CR never reconciled, dev/orbstack
// without env config), the subsequent DeleteForOrg call falls back to
// k8s-only teardown.
var lkInputs ProvisioningInputs
if c.lakekeeperProvisioner != nil && c.lakekeeperInputs != nil {
if in, err := c.lakekeeperInputs(ctx, w); err != nil {
log.Debug("Lakekeeper inputs unavailable at delete time; skipping PG cleanup.", "error", err)
} else {
lkInputs = in
}
}

log.Info("Deleting Duckling CR.")
if err := c.duckling.Delete(ctx, w.OrgID); err != nil {
// Only proceed if the CR is already gone (NotFound). For other errors
Expand All @@ -593,15 +610,17 @@ func (c *Controller) reconcileDeleting(ctx context.Context, w *configstore.Manag
}

// Tear down the per-org Lakekeeper instance the control plane provisioned
// out-of-band (CR + Secret + ServiceAccount in the lakekeeper namespace).
// The Crossplane Duckling composition doesn't own these, so without an
// explicit teardown they leak after the warehouse is gone. Idempotent and
// out-of-band (CR + Secret + ServiceAccount in the lakekeeper namespace,
// and — when this provisioner created them — the lakekeeper_<org>
// Postgres database and role on the metadata store). The Crossplane
// Duckling composition doesn't own the k8s pieces, so without an explicit
// teardown they leak after the warehouse is gone. Idempotent and
// NotFound-tolerant — a clean no-op for ducklings that never enabled
// Iceberg. Skipped silently when the provisioner isn't wired (mirrors
// reconcileLakekeeper). On error we return without marking the warehouse
// deleted so the next reconcile pass retries.
if c.lakekeeperProvisioner != nil {
if err := c.lakekeeperProvisioner.DeleteForOrg(ctx, w.OrgID); err != nil {
if err := c.lakekeeperProvisioner.DeleteForOrg(ctx, w.OrgID, lkInputs); err != nil {
log.Warn("Failed to tear down Lakekeeper resources, will retry.", "error", err)
return
}
Expand Down
45 changes: 39 additions & 6 deletions controlplane/provisioner/lakekeeper_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"log/slog"

"github.com/posthog/duckgres/controlplane/configstore"
"github.com/posthog/duckgres/server/lakekeeperbroker"
Expand Down Expand Up @@ -341,12 +342,26 @@ func (p *LakekeeperProvisioner) EnsureForOrg(ctx context.Context, w *configstore
// explicitly). Idempotent and NotFound-tolerant, so it's a safe no-op for orgs
// that never had Iceberg enabled.
//
// What it does NOT touch: the lakekeeper_<org> Postgres database/role. On
// cnpg-shard it's provider-sql-managed and torn down with the Duckling CR; on
// aurora the whole cluster is torn down with the Duckling CR; on an external
// (shared) metadata store it currently persists — dropping a database on a
// shared RDS is destructive and out of scope for this teardown.
func (p *LakekeeperProvisioner) DeleteForOrg(ctx context.Context, orgID string) error {
// When inputs carry an AdminDSN (i.e. !PGPreProvisioned), additionally drops
// the lakekeeper_<orgid> Postgres database and role on the metadata store.
// This is what makes recreating a duckling with the same orgID work: the
// duckgres lakekeeper provisioner rotates the k8s Secret's encryption-key on
// every provision, and leaving the old encrypted rows behind causes
// Lakekeeper to return SecretFetchError ("Wrong key or corrupt data") on
// every CREATE TABLE in the next lifetime. For the cnpg-shard case
// (PGPreProvisioned) the corresponding cleanup happens via the Crossplane
// composition's [Delete] managementPolicy on the cnpg-tenant-role and
// cnpg-tenant-database resources — see posthog/charts PR for the parallel
// fix. For the aurora case the whole Aurora cluster is destroyed by
// Crossplane on Duckling CR delete, so the DROP DATABASE here is redundant
// but harmless (and the connection may simply refuse — tolerated below).
//
// Best-effort: PG drop failures are logged and swallowed so a transient
// network issue or a half-deleted Aurora cluster doesn't permanently block
// the duckling teardown. The k8s teardown failures (which actually leave
// resources stranded) still surface as errors to the controller's retry
// loop.
func (p *LakekeeperProvisioner) DeleteForOrg(ctx context.Context, orgID string, in ProvisioningInputs) error {
if err := p.k8s.DeleteCR(ctx, orgID); err != nil {
return err
}
Expand All @@ -356,6 +371,24 @@ func (p *LakekeeperProvisioner) DeleteForOrg(ctx context.Context, orgID string)
if err := p.k8s.DeleteServiceAccount(ctx, orgID); err != nil {
return err
}

// PG cleanup applies only when this provisioner actually created the
// DB/role (external + dev/orbstack paths). cnpg-shard ownership lives
// in the Crossplane composition.
if in.PGPreProvisioned || in.AdminDSN == "" {
return nil
}
dbName := lakekeeperDBName(orgID)
// DROP DATABASE first so DROP ROLE doesn't trip over the role's
// ownership of the database. Both best-effort.
if err := DropDatabase(ctx, in.AdminDSN, dbName); err != nil {
slog.Warn("Lakekeeper PG database drop failed; continuing teardown.",
"org", orgID, "database", dbName, "error", err)
}
if err := DropRole(ctx, in.AdminDSN, dbName); err != nil {
slog.Warn("Lakekeeper PG role drop failed; continuing teardown.",
"org", orgID, "role", dbName, "error", err)
}
return nil
}

Expand Down
34 changes: 34 additions & 0 deletions controlplane/provisioner/lakekeeper_provisioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,3 +497,37 @@ func dropDatabase(t *testing.T, dsn, name string) {
dropDSN := dsn // pgx accepts the same DSN; CREATE/DROP from any current DB.
cleanupDB(t, dropDSN, name)
}

// TestDeleteForOrg_SkipsPGCleanupForCnpgShard guards the contract that the
// cnpg-shard path leaves PG cleanup to the Crossplane composition. The
// composition's [Delete] managementPolicy on the cnpg-tenant-role and
// cnpg-tenant-database resources owns role+DB teardown there; the
// duckgres provisioner doesn't have the AdminDSN to do it itself
// anyway. The k8s teardown still runs.
func TestDeleteForOrg_SkipsPGCleanupForCnpgShard(t *testing.T) {
c, _, _ := newFakeLakekeeperClient()
p := NewLakekeeperProvisioner(newFakeStore(), c)
// AdminDSN empty + PGPreProvisioned=true: DropDatabase/DropRole must
// not be invoked. With a bogus DSN they'd surface as connection
// errors; a nil-DSN attempt would panic deep in pgx. The fact that
// this returns nil with no DSN configured is the assertion.
err := p.DeleteForOrg(context.Background(), "acme", ProvisioningInputs{
PGPreProvisioned: true,
})
if err != nil {
t.Fatalf("DeleteForOrg with PGPreProvisioned should succeed without DSN, got: %v", err)
}
}

// TestDeleteForOrg_SkipsPGCleanupWithoutAdminDSN covers the dev/orbstack
// path where the lakekeeper provisioner never had an AdminDSN in the
// first place (no env, no Duckling CR). PG cleanup must be skipped
// rather than failing the teardown.
func TestDeleteForOrg_SkipsPGCleanupWithoutAdminDSN(t *testing.T) {
c, _, _ := newFakeLakekeeperClient()
p := NewLakekeeperProvisioner(newFakeStore(), c)
err := p.DeleteForOrg(context.Background(), "acme", ProvisioningInputs{})
if err != nil {
t.Fatalf("DeleteForOrg with empty inputs should succeed, got: %v", err)
}
}
91 changes: 91 additions & 0 deletions controlplane/provisioner/postgres_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,78 @@ func EnsureRole(ctx context.Context, adminDSN, role, password, ownedDB string) e
return nil
}

// DropDatabase removes dbName on the Postgres server addressed by adminDSN.
// Idempotent: returns nil when the database is already absent (3D000). Forces
// disconnection of any active sessions on the target DB so DROP DATABASE
// can't hang waiting for clients to drain — necessary at duckling teardown
// time because the per-tenant Lakekeeper pod may still be alive when the
// drop runs (the k8s teardown is fire-and-forget and the operator's
// reconciliation lag means connections linger).
//
// Caller must connect via a privileged DSN against a different database
// than dbName (the admin DSN's path is OK to be `postgres`).
func DropDatabase(ctx context.Context, adminDSN, dbName string) error {
if !isSafePGIdent(dbName) {
return fmt.Errorf("drop database: unsafe identifier %q", dbName)
}
db, err := sql.Open("pgx", adminDSN)
if err != nil {
return fmt.Errorf("open admin connection: %w", err)
}
defer func() { _ = db.Close() }()

// FORCE terminates active backends as part of DROP DATABASE (Postgres
// 13+). Without it a single lingering Lakekeeper connection blocks the
// drop until backoff.
if _, err := db.ExecContext(ctx, "DROP DATABASE IF EXISTS "+quoteIdent(dbName)+" WITH (FORCE)"); err != nil {
if isInvalidCatalogName(err) {
return nil
}
return fmt.Errorf("drop database %s: %w", dbName, err)
}
return nil
}

// DropRole removes role on the Postgres server addressed by adminDSN.
// Idempotent: returns nil when the role is already absent. Best-effort
// REASSIGN/DROP OWNED first so any object the role owns (e.g. tables
// created in databases other than the one this caller manages) doesn't
// block DROP ROLE — at duckling teardown the role's database has already
// been dropped, so REASSIGN typically has nothing to do.
//
// Caller must connect via a privileged DSN.
func DropRole(ctx context.Context, adminDSN, role string) error {
if !isSafePGIdent(role) {
return fmt.Errorf("drop role: unsafe role name %q", role)
}
db, err := sql.Open("pgx", adminDSN)
if err != nil {
return fmt.Errorf("open admin connection: %w", err)
}
defer func() { _ = db.Close() }()

// REVOKE everything the role was granted on the maintenance database;
// otherwise DROP ROLE fails with "role cannot be dropped because some
// objects depend on it". DROP OWNED handles cluster-wide ownerships.
if _, err := db.ExecContext(ctx, "DROP OWNED BY "+quoteIdent(role)+" CASCADE"); err != nil {
// 42704 = undefined_object — role doesn't exist. Benign.
if isUndefinedObject(err) {
return nil
}
// Any other failure: log via wrap and continue to the DROP ROLE
// attempt — DROP OWNED can fail on cross-DB dependencies we don't
// have the visibility to clean up here, but DROP ROLE itself may
// still succeed if nothing is left.
// We deliberately swallow this and try DROP ROLE; if there's a
// real dependency it'll surface there.
_ = err
}
if _, err := db.ExecContext(ctx, "DROP ROLE IF EXISTS "+quoteIdent(role)); err != nil {
return fmt.Errorf("drop role %s: %w", role, err)
}
return nil
}

// reDSN rewrites the dbname component of a Postgres URL-style DSN. Used to
// connect to a specific database with the same admin credentials.
func reDSN(dsn, dbName string) string {
Expand Down Expand Up @@ -250,3 +322,22 @@ func isDuplicateDatabase(err error) bool {
var s sqlStater
return errors.As(err, &s) && s.SQLState() == "42P04"
}

// isInvalidCatalogName reports whether err is Postgres 3D000 (database does
// not exist) — what DROP DATABASE returns when the target is already gone.
// Without the IF EXISTS clause this would matter; we keep the check anyway
// because IF EXISTS is silent on missing-DB and an actual no-such-database
// error can also surface from the connection attempt itself.
func isInvalidCatalogName(err error) bool {
type sqlStater interface{ SQLState() string }
var s sqlStater
return errors.As(err, &s) && s.SQLState() == "3D000"
}

// isUndefinedObject reports whether err is Postgres 42704 (undefined_object)
// — what DROP OWNED returns when the role doesn't exist. Benign.
func isUndefinedObject(err error) bool {
type sqlStater interface{ SQLState() string }
var s sqlStater
return errors.As(err, &s) && s.SQLState() == "42704"
}
60 changes: 60 additions & 0 deletions controlplane/provisioner/postgres_admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,66 @@ func TestEnsureRole_AgainstLivePG(t *testing.T) {
}
}

// TestDropDatabaseAndRole_AgainstLivePG covers the teardown helpers added
// for duckling delete: round-trips Ensure → Drop and confirms the role
// and DB are both gone, plus the idempotent re-drop case.
func TestDropDatabaseAndRole_AgainstLivePG(t *testing.T) {
dsn := os.Getenv("PG_ADMIN_DSN")
if dsn == "" {
t.Skip("PG_ADMIN_DSN not set")
}
dbName := fmt.Sprintf("lakekeeper_drop_test_%d", os.Getpid())
roleName := dbName
t.Cleanup(func() {
// Belt-and-suspenders cleanup in case the assertions short-circuit.
db, _ := sql.Open("pgx", dsn)
defer func() { _ = db.Close() }()
_, _ = db.Exec("DROP DATABASE IF EXISTS " + quoteIdent(dbName) + " WITH (FORCE)")
_, _ = db.Exec("DROP ROLE IF EXISTS " + quoteIdent(roleName))
})

ctx := context.Background()
if err := EnsureDatabase(ctx, dsn, dbName); err != nil {
t.Fatalf("EnsureDatabase: %v", err)
}
if err := EnsureRole(ctx, dsn, roleName, "abcdef0123456789", dbName); err != nil {
t.Fatalf("EnsureRole: %v", err)
}

if err := DropDatabase(ctx, dsn, dbName); err != nil {
t.Fatalf("DropDatabase: %v", err)
}
// Idempotent re-drop.
if err := DropDatabase(ctx, dsn, dbName); err != nil {
t.Fatalf("DropDatabase (idempotent): %v", err)
}
if err := DropRole(ctx, dsn, roleName); err != nil {
t.Fatalf("DropRole: %v", err)
}
if err := DropRole(ctx, dsn, roleName); err != nil {
t.Fatalf("DropRole (idempotent): %v", err)
}

db, err := sql.Open("pgx", dsn)
if err != nil {
t.Fatalf("open: %v", err)
}
defer func() { _ = db.Close() }()
var dbExists, roleExists bool
if err := db.QueryRow("SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname=$1)", dbName).Scan(&dbExists); err != nil {
t.Fatalf("verify db gone: %v", err)
}
if dbExists {
t.Errorf("database %s still present after DropDatabase", dbName)
}
if err := db.QueryRow("SELECT EXISTS(SELECT 1 FROM pg_roles WHERE rolname=$1)", roleName).Scan(&roleExists); err != nil {
t.Fatalf("verify role gone: %v", err)
}
if roleExists {
t.Errorf("role %s still present after DropRole", roleName)
}
}

func TestIsSafePGPassword(t *testing.T) {
cases := map[string]bool{
"abc123": true,
Expand Down
Loading