From f05e2e6a402190355456690594aa0948d1cc95ce Mon Sep 17 00:00:00 2001 From: Benjamin Knofe-Vider Date: Mon, 1 Jun 2026 15:48:13 +0200 Subject: [PATCH] chore(iceberg): remove dead S3 Tables backend MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit S3 Tables was the original Iceberg backend; Lakekeeper has replaced it end-to-end (see #635 / mw-dev validation). This rip-out drops the unused S3 Tables code path entirely — both backends ATTACH against DuckDB iceberg, but the credential/secret shape and ATTACH SQL are incompatible, so the dual-backend dispatcher was just dead code. Production code: - server/iceberg: drop BackendS3Tables, TableBucket, BuildIcebergAttachStmt - server/server.go: collapse AttachIcebergCatalog → Lakekeeper-only (drop attachS3TablesIcebergCatalog) - duckdbservice/activation.go: drop TableBucket from sameTenantActivationRuntime - configresolve, configloader, cliflags, cmd/duckgres-worker: drop --iceberg-table-bucket / DUCKGRES_ICEBERG_TABLE_BUCKET / YAML knob - controlplane/configstore: drop ManagedWarehouseIceberg.TableBucketArn, IcebergBackendS3Tables constant, migrateIcebergBackendBackfill one-shot - controlplane/admin: drop iceberg_table_bucket_arn from update column list - controlplane/org_router, shared_worker_activator: drop backend switch - controlplane/provisioner: drop addIcebergStatusUpdates + applyIcebergUpdatesToWarehouse + tableBucketArn read on Duckling status; Lakekeeper readiness is owned by reconcileLakekeeper The Backend column / constant is retained for forward-compat — empty or "s3_tables" rows now resolve to lakekeeper rather than dispatching. The DB column iceberg_table_bucket_arn stays in the schema (GORM doesn't drop columns) and is simply ignored. Tests: - delete server/iceberg/dispatch_test.go (dispatcher gone) - delete tests/k8s/iceberg_test.go (entirely S3-Tables-shaped; no Lakekeeper k8s test replacement in this PR — unit coverage in controlplane/shared_worker_activator_iceberg_test.go validates the Lakekeeper activation wiring) - trim S3 Tables cases from migration_test, dispatch checks, refresh, column metadata, controller, activator tests - main_test iceberg precedence test now exercises region/namespace knobs that remain --- cmd/duckgres-worker/main.go | 6 +- configloader/file_config.go | 7 +- configresolve/cliflags.go | 6 +- configresolve/resolve.go | 10 - controlplane/admin/api.go | 1 - .../configstore/iceberg_backend_test.go | 7 +- controlplane/configstore/models.go | 43 +- controlplane/configstore/store.go | 50 -- controlplane/org_router.go | 4 - controlplane/provisioner/controller.go | 94 +--- .../provisioner/controller_lakekeeper_test.go | 23 - controlplane/provisioner/controller_test.go | 66 --- controlplane/provisioner/k8s_client.go | 15 +- controlplane/shared_worker_activator.go | 48 +- .../shared_worker_activator_iceberg_test.go | 26 - duckdbservice/activation.go | 1 - duckdbservice/activation_test.go | 20 +- main_test.go | 30 +- server/iceberg/config.go | 52 +- server/iceberg/dispatch_test.go | 77 --- server/iceberg/migration.go | 48 +- server/iceberg/migration_test.go | 29 +- server/iceberg_column_metadata_test.go | 6 - server/iceberg_refresh_test.go | 32 +- server/server.go | 95 +--- tests/k8s/iceberg_test.go | 466 ------------------ 26 files changed, 127 insertions(+), 1135 deletions(-) delete mode 100644 server/iceberg/dispatch_test.go delete mode 100644 tests/k8s/iceberg_test.go diff --git a/cmd/duckgres-worker/main.go b/cmd/duckgres-worker/main.go index ac24a9c3..094897ef 100644 --- a/cmd/duckgres-worker/main.go +++ b/cmd/duckgres-worker/main.go @@ -79,9 +79,8 @@ func main() { duckLakeDeltaCatalogEnabled := flag.Bool("ducklake-delta-catalog-enabled", true, "Attach a Delta Lake catalog during DuckLake worker boot (default true; use --ducklake-delta-catalog-enabled=false to disable; env: DUCKGRES_DUCKLAKE_DELTA_CATALOG_ENABLED)") duckLakeDeltaCatalogPath := flag.String("ducklake-delta-catalog-path", "", "Delta Lake catalog/table path to attach (env: DUCKGRES_DUCKLAKE_DELTA_CATALOG_PATH)") duckLakeDefaultSpecVersion := flag.String("ducklake-default-spec-version", "", "Default DuckLake spec version for migration checks (env: DUCKGRES_DUCKLAKE_DEFAULT_SPEC_VERSION)") - icebergEnabled := flag.Bool("iceberg-enabled", false, "Attach a per-tenant Iceberg catalog (AWS S3 Tables) at session init (env: DUCKGRES_ICEBERG_ENABLED)") - icebergTableBucket := flag.String("iceberg-table-bucket", "", "Iceberg S3 Tables bucket ARN (env: DUCKGRES_ICEBERG_TABLE_BUCKET)") - icebergRegion := flag.String("iceberg-region", "", "AWS region for the Iceberg table bucket (env: DUCKGRES_ICEBERG_REGION)") + icebergEnabled := flag.Bool("iceberg-enabled", false, "Attach a per-tenant Iceberg catalog (Lakekeeper REST) at session init (env: DUCKGRES_ICEBERG_ENABLED)") + icebergRegion := flag.String("iceberg-region", "", "AWS region for S3 object access by Iceberg (env: DUCKGRES_ICEBERG_REGION)") icebergNamespace := flag.String("iceberg-namespace", "", "Default Iceberg namespace (env: DUCKGRES_ICEBERG_NAMESPACE)") // Query log @@ -208,7 +207,6 @@ func main() { DuckLakeDeltaCatalogPath: *duckLakeDeltaCatalogPath, DuckLakeDefaultSpecVersion: *duckLakeDefaultSpecVersion, IcebergEnabled: *icebergEnabled, - IcebergTableBucket: *icebergTableBucket, IcebergRegion: *icebergRegion, IcebergNamespace: *icebergNamespace, QueryLog: *queryLog, diff --git a/configloader/file_config.go b/configloader/file_config.go index 911e3818..38a3c7b4 100644 --- a/configloader/file_config.go +++ b/configloader/file_config.go @@ -108,10 +108,9 @@ type RateLimitFileConfig struct { // instance into Iceberg catalog attachment. In multi-tenant mode the // equivalent values come from the per-warehouse configstore row, not YAML. type IcebergFileConfig struct { - Enabled *bool `yaml:"enabled"` - TableBucket string `yaml:"table_bucket"` - Region string `yaml:"region"` - Namespace string `yaml:"namespace"` + Enabled *bool `yaml:"enabled"` + Region string `yaml:"region"` + Namespace string `yaml:"namespace"` } type DuckLakeFileConfig struct { diff --git a/configresolve/cliflags.go b/configresolve/cliflags.go index 6fe2e5c9..324742be 100644 --- a/configresolve/cliflags.go +++ b/configresolve/cliflags.go @@ -43,9 +43,8 @@ func RegisterCLIInputsFlags(fs *flag.FlagSet) func() CLIInputs { duckLakeDeltaCatalogEnabled := fs.Bool("ducklake-delta-catalog-enabled", true, "Attach a Delta Lake catalog during DuckLake worker boot (default true; use --ducklake-delta-catalog-enabled=false to disable; env: DUCKGRES_DUCKLAKE_DELTA_CATALOG_ENABLED)") duckLakeDeltaCatalogPath := fs.String("ducklake-delta-catalog-path", "", "Delta Lake catalog/table path to attach, defaults to sibling delta/ prefix at DuckLake object-store root (env: DUCKGRES_DUCKLAKE_DELTA_CATALOG_PATH)") duckLakeDefaultSpecVersion := fs.String("ducklake-default-spec-version", "", "Default DuckLake spec version for migration checks (env: DUCKGRES_DUCKLAKE_DEFAULT_SPEC_VERSION)") - icebergEnabled := fs.Bool("iceberg-enabled", false, "Attach a per-tenant Iceberg catalog (AWS S3 Tables) at session init (env: DUCKGRES_ICEBERG_ENABLED)") - icebergTableBucket := fs.String("iceberg-table-bucket", "", "Iceberg S3 Tables bucket ARN, e.g. arn:aws:s3tables:us-east-1::bucket/ (env: DUCKGRES_ICEBERG_TABLE_BUCKET)") - icebergRegion := fs.String("iceberg-region", "", "AWS region for the Iceberg table bucket (default: us-east-1) (env: DUCKGRES_ICEBERG_REGION)") + icebergEnabled := fs.Bool("iceberg-enabled", false, "Attach a per-tenant Iceberg catalog (Lakekeeper REST) at session init (env: DUCKGRES_ICEBERG_ENABLED)") + icebergRegion := fs.String("iceberg-region", "", "AWS region for S3 object access by Iceberg (default: us-east-1) (env: DUCKGRES_ICEBERG_REGION)") icebergNamespace := fs.String("iceberg-namespace", "", "Default Iceberg namespace (informational; default: main) (env: DUCKGRES_ICEBERG_NAMESPACE)") processMinWorkers := fs.Int("process-min-workers", 0, "Pre-warm worker count at startup for process workers (control-plane mode) (env: DUCKGRES_PROCESS_MIN_WORKERS)") processMaxWorkers := fs.Int("process-max-workers", 0, "Max process workers, 0=auto-derived (control-plane mode) (env: DUCKGRES_PROCESS_MAX_WORKERS)") @@ -111,7 +110,6 @@ func RegisterCLIInputsFlags(fs *flag.FlagSet) func() CLIInputs { cli.DuckLakeDeltaCatalogPath = *duckLakeDeltaCatalogPath cli.DuckLakeDefaultSpecVersion = *duckLakeDefaultSpecVersion cli.IcebergEnabled = *icebergEnabled - cli.IcebergTableBucket = *icebergTableBucket cli.IcebergRegion = *icebergRegion cli.IcebergNamespace = *icebergNamespace cli.ProcessMinWorkers = *processMinWorkers diff --git a/configresolve/resolve.go b/configresolve/resolve.go index a37a76d0..a9559b0e 100644 --- a/configresolve/resolve.go +++ b/configresolve/resolve.go @@ -49,7 +49,6 @@ type CLIInputs struct { DuckLakeDeltaCatalogPath string DuckLakeDefaultSpecVersion string IcebergEnabled bool - IcebergTableBucket string IcebergRegion string IcebergNamespace string ProcessMinWorkers int @@ -313,9 +312,6 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu if fileCfg.Iceberg.Enabled != nil { cfg.Iceberg.Enabled = *fileCfg.Iceberg.Enabled } - if fileCfg.Iceberg.TableBucket != "" { - cfg.Iceberg.TableBucket = fileCfg.Iceberg.TableBucket - } if fileCfg.Iceberg.Region != "" { cfg.Iceberg.Region = fileCfg.Iceberg.Region } @@ -610,9 +606,6 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu warn("Invalid DUCKGRES_ICEBERG_ENABLED: " + err.Error()) } } - if v := getenv("DUCKGRES_ICEBERG_TABLE_BUCKET"); v != "" { - cfg.Iceberg.TableBucket = v - } if v := getenv("DUCKGRES_ICEBERG_REGION"); v != "" { cfg.Iceberg.Region = v } @@ -1049,9 +1042,6 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu if cli.Set["iceberg-enabled"] { cfg.Iceberg.Enabled = cli.IcebergEnabled } - if cli.Set["iceberg-table-bucket"] { - cfg.Iceberg.TableBucket = cli.IcebergTableBucket - } if cli.Set["iceberg-region"] { cfg.Iceberg.Region = cli.IcebergRegion } diff --git a/controlplane/admin/api.go b/controlplane/admin/api.go index c9ca9b01..a255adf0 100644 --- a/controlplane/admin/api.go +++ b/controlplane/admin/api.go @@ -422,7 +422,6 @@ func managedWarehouseUpsertColumns() []string { "s3_delta_catalog_enabled", "s3_delta_catalog_path", "iceberg_enabled", - "iceberg_table_bucket_arn", "iceberg_region", "iceberg_namespace", "worker_identity_namespace", diff --git a/controlplane/configstore/iceberg_backend_test.go b/controlplane/configstore/iceberg_backend_test.go index dd08eb7e..4964afc3 100644 --- a/controlplane/configstore/iceberg_backend_test.go +++ b/controlplane/configstore/iceberg_backend_test.go @@ -4,13 +4,12 @@ import "testing" func TestManagedWarehouseIceberg_ResolvedBackend(t *testing.T) { cases := []struct { - name string - in ManagedWarehouseIceberg - want string + name string + in ManagedWarehouseIceberg + want string }{ {"empty defaults to lakekeeper", ManagedWarehouseIceberg{}, IcebergBackendLakekeeper}, {"explicit lakekeeper", ManagedWarehouseIceberg{Backend: IcebergBackendLakekeeper}, IcebergBackendLakekeeper}, - {"explicit s3_tables", ManagedWarehouseIceberg{Backend: IcebergBackendS3Tables}, IcebergBackendS3Tables}, {"unknown passthrough", ManagedWarehouseIceberg{Backend: "future"}, "future"}, } for _, c := range cases { diff --git a/controlplane/configstore/models.go b/controlplane/configstore/models.go index 4fe45e83..26fe7262 100644 --- a/controlplane/configstore/models.go +++ b/controlplane/configstore/models.go @@ -175,41 +175,33 @@ type ManagedWarehouseDuckLake struct { Enabled bool `json:"enabled"` } -// ManagedWarehouseIceberg captures per-org Iceberg catalog config. Two -// backends are supported, selected by Backend: +// ManagedWarehouseIceberg captures per-org Iceberg catalog config. The +// only supported backend is Lakekeeper: a per-org Lakekeeper instance +// vends the Iceberg REST catalog. The provisioner creates the Lakekeeper +// CR + a warehouse pointing at the org's existing S3 bucket (path +// /lakekeeper//) and persists the endpoint + +// OAuth2 client credentials back here. The worker activator reads these +// and emits a (TYPE ICEBERG, CLIENT_ID/CLIENT_SECRET/OAUTH2_SERVER_URI) +// DuckDB SECRET + ATTACH at session init. // -// - "lakekeeper" (default): a per-org Lakekeeper instance vends the -// Iceberg REST catalog. The provisioner creates the Lakekeeper CR + a -// warehouse pointing at the org's existing S3 bucket (path -// /lakekeeper//) and persists the endpoint + -// OAuth2 client credentials back here. The worker activator reads -// these and emits a (TYPE ICEBERG, CLIENT_ID/CLIENT_SECRET/ -// OAUTH2_SERVER_URI) DuckDB SECRET + ATTACH at session init. -// -// - "s3_tables" (legacy): provisioner controller sets spec.iceberg.enabled -// on the Duckling CR; the composition provisions a fresh S3 Tables -// bucket and writes TableBucketArn back here. Kept as a safety net / -// escape hatch — new orgs default to Lakekeeper. +// The Backend column is retained for forward-compat / observability; the +// legacy "s3_tables" value is no longer honored anywhere in the code path. type ManagedWarehouseIceberg struct { Enabled bool `json:"enabled"` - // Backend selects the catalog backend. Empty/unset is treated as - // "lakekeeper" by callers. + // Backend is retained for schema compat. Empty/unset is treated as + // "lakekeeper" by callers; any other value is also treated as + // "lakekeeper" since no other backend is implemented. Backend string `gorm:"size:32;default:'lakekeeper'" json:"backend"` - // Namespace is the default Iceberg namespace inside the catalog. Used - // for both backends. + // Namespace is the default Iceberg namespace inside the catalog. Namespace string `gorm:"size:255" json:"namespace"` - // Region applies to both backends (S3 region for S3 Tables; AWS region - // for the Lakekeeper warehouse storage profile). + // Region is the AWS region for the Lakekeeper warehouse storage profile. Region string `gorm:"size:64" json:"region"` - // S3 Tables fields (Backend == "s3_tables"). Empty otherwise. - TableBucketArn string `gorm:"size:512" json:"table_bucket_arn,omitempty"` - - // Lakekeeper fields (Backend == "lakekeeper"). Populated by the - // provisioner after the per-org Lakekeeper is ready. + // Lakekeeper fields. Populated by the provisioner after the per-org + // Lakekeeper is ready. LakekeeperEndpoint string `gorm:"size:512" json:"lakekeeper_endpoint,omitempty"` // LakekeeperWarehouse is the warehouse NAME (e.g. "org-acme"), not the @@ -235,7 +227,6 @@ type ManagedWarehouseIceberg struct { // IcebergBackend constants — string-typed to keep the GORM tag happy. const ( IcebergBackendLakekeeper = "lakekeeper" - IcebergBackendS3Tables = "s3_tables" ) // ManagedWarehouseTrino captures per-org opt-in for the customer-facing Trino diff --git a/controlplane/configstore/store.go b/controlplane/configstore/store.go index d3b78934..f475bb0c 100644 --- a/controlplane/configstore/store.go +++ b/controlplane/configstore/store.go @@ -113,10 +113,6 @@ func NewConfigStore(connStr string, pollInterval time.Duration) (*ConfigStore, e if err := migrateDeltaCatalogDefaultEnabled(db); err != nil { return nil, fmt.Errorf("migrate delta catalog default: %w", err) } - if err := migrateIcebergBackendBackfill(db); err != nil { - return nil, fmt.Errorf("migrate iceberg backend backfill: %w", err) - } - runtimeSchema, err := resolveRuntimeSchema(db) if err != nil { return nil, fmt.Errorf("resolve runtime schema: %w", err) @@ -870,52 +866,6 @@ func migrateDeltaCatalogDefaultEnabled(db *gorm.DB) error { }) } -// migrateIcebergBackendBackfill stamps iceberg_backend = 's3_tables' on any -// existing row whose iceberg_table_bucket_arn is populated but whose -// iceberg_backend is empty. Without this, ResolvedBackend() on those rows -// would return "lakekeeper" (the empty-default semantics) and the worker -// activator + controller reconcile loop would treat them as Lakekeeper -// orgs — silently breaking S3 Tables ATTACH AND firing redundant -// Lakekeeper provisioning attempts. -// -// New rows inserted after the GORM `default:'lakekeeper'` migration get -// the default at insert time, so this migration only matters for rows -// that predate the column. One-shot, tracked in duckgres_schema_migrations. -const icebergBackendBackfillMigrationName = "2026_05_iceberg_backend_backfill" - -func migrateIcebergBackendBackfill(db *gorm.DB) error { - var existing SchemaMigration - err := db.Where("name = ?", icebergBackendBackfillMigrationName).First(&existing).Error - if err == nil { - return nil - } - if !errors.Is(err, gorm.ErrRecordNotFound) { - return fmt.Errorf("check schema migration: %w", err) - } - return db.Transaction(func(tx *gorm.DB) error { - // Backfill rows that have a populated S3 Tables ARN but no - // explicit backend — these are pre-feature rows whose semantics - // would change without this stamp. - res := tx.Exec( - `UPDATE duckgres_managed_warehouses - SET iceberg_backend = 's3_tables' - WHERE COALESCE(iceberg_table_bucket_arn, '') <> '' - AND COALESCE(iceberg_backend, '') = ''`, - ) - if res.Error != nil { - return fmt.Errorf("backfill iceberg_backend: %w", res.Error) - } - if err := tx.Create(&SchemaMigration{ - Name: icebergBackendBackfillMigrationName, - AppliedAt: time.Now().UTC(), - }).Error; err != nil { - return fmt.Errorf("record schema migration: %w", err) - } - slog.Info("Backfilled iceberg_backend=s3_tables on pre-Lakekeeper rows.", - "rows", res.RowsAffected) - return nil - }) -} func migrateOrgUserPK(db *gorm.DB) error { // Check if the PK already has 2 columns (idempotent) diff --git a/controlplane/org_router.go b/controlplane/org_router.go index db8d911b..007bdbcb 100644 --- a/controlplane/org_router.go +++ b/controlplane/org_router.go @@ -14,7 +14,6 @@ import ( "github.com/posthog/duckgres/controlplane/configstore" "github.com/posthog/duckgres/controlplane/provisioner" "github.com/posthog/duckgres/server" - "github.com/posthog/duckgres/server/iceberg" ) // OrgStack holds the isolated worker pool and session manager for an org. @@ -225,9 +224,6 @@ func (tr *OrgRouter) IcebergConfigForOrg(orgID string) (server.IcebergConfig, bo LakekeeperClientID: src.LakekeeperClientID, LakekeeperOAuth2ServerURI: src.LakekeeperOAuth2ServerURI, } - if cfg.ResolvedBackend() == iceberg.BackendS3Tables { - cfg.TableBucket = src.TableBucketArn - } return cfg, true } diff --git a/controlplane/provisioner/controller.go b/controlplane/provisioner/controller.go index 4bb72e42..3a9d9a18 100644 --- a/controlplane/provisioner/controller.go +++ b/controlplane/provisioner/controller.go @@ -309,11 +309,11 @@ func (c *Controller) reconcileProvisioning(ctx context.Context, w *configstore.M updates["identity_state"] = configstore.ManagedWarehouseStateReady } - // Iceberg is opt-in per warehouse — only track its state when enabled. - // When the composition writes status.iceberg.tableBucketArn, the bucket - // (Workspace) is reconciled and we persist the ARN + region back to the - // configstore so the worker activator can feed it into IcebergConfig. - addIcebergStatusUpdates(updates, w, status) + // Iceberg readiness for the Lakekeeper backend is owned by + // reconcileLakekeeper, which writes iceberg_state=Ready directly once + // the per-org Lakekeeper warehouse is provisioned. Nothing here needs + // to propagate from the Crossplane Duckling status — the Lakekeeper + // provisioner is the source of truth. // Infrastructure is ready when all components are provisioned AND the // Crossplane Ready condition is True. The Ready condition ensures all @@ -440,70 +440,12 @@ func (c *Controller) reconcileReady(ctx context.Context, w *configstore.ManagedW } } - // Propagate the Duckling's status.iceberg.tableBucketArn back to the - // configstore even after the warehouse has transitioned to Ready. This - // covers the late-enable case: the iceberg block can be flipped on via - // the admin API long after the warehouse first became Ready, and the - // Crossplane composition will only emit the bucket Workspace + populate - // status.iceberg.tableBucketArn after that flip. Without this, the ARN - // would only land in the configstore if the warehouse was still in - // Provisioning when the composition completed — never on a re-enable. - if w.Iceberg.Enabled { - // TODO: this is the third c.duckling.Get-equivalent call in this - // function (after GetPgBouncerEnabled + GetIcebergEnabled). All - // three parse the same CR. Worth consolidating to a single fetch - // once another caller adds a fourth — for now the extra - // round-trips only hit warehouses where iceberg is enabled. - status, err := c.duckling.Get(ctx, w.OrgID) - if err != nil { - log.Warn("Failed to read Duckling status for iceberg propagation.", "error", err) - return - } - updates := map[string]interface{}{} - addIcebergStatusUpdates(updates, w, status) - if len(updates) > 0 { - if err := c.store.UpdateWarehouseState(w.OrgID, configstore.ManagedWarehouseStateReady, updates); err != nil { - log.Warn("Failed to persist iceberg status to configstore.", "error", err) - } else { - log.Info("Iceberg status persisted to configstore.", "updates", updates) - // Mirror the persisted updates onto the in-memory w so - // downstream reconcile steps (notably reconcileLakekeeper) - // see the fresh values without a re-read round-trip. - applyIcebergUpdatesToWarehouse(w, updates) - } - } - } - // Lakekeeper reconcile is independent of the Duckling state machine — // provisioning a Lakekeeper for an org doesn't depend on, and doesn't // block, the warehouse top-level state. c.reconcileLakekeeper(ctx, w) } -// applyIcebergUpdatesToWarehouse mirrors the column-update map onto the -// in-memory warehouse struct so subsequent reconcile steps in the same -// tick see the values that were just persisted, without a second store -// read. Mirrors the column → field mapping used by the configstore's -// UpdateWarehouseState GORM call (which writes by column name). -// -// Only the iceberg_* columns reconcileReady actually writes are handled -// here; if a future caller passes other column names through this -// function, extend the switch alongside. -func applyIcebergUpdatesToWarehouse(w *configstore.ManagedWarehouse, updates map[string]interface{}) { - for k, v := range updates { - switch k { - case "iceberg_table_bucket_arn": - w.Iceberg.TableBucketArn = v.(string) - case "iceberg_region": - w.Iceberg.Region = v.(string) - case "iceberg_namespace": - w.Iceberg.Namespace = v.(string) - case "iceberg_state": - w.IcebergState = v.(configstore.ManagedWarehouseProvisioningState) - } - } -} - // reconcileLakekeeper provisions a per-org Lakekeeper instance when the // warehouse selects the lakekeeper backend and isn't yet provisioned. Called // from both reconcileProvisioning (initial turn-up — required for cnpg-shard, @@ -552,32 +494,6 @@ func (c *Controller) reconcileLakekeeper(ctx context.Context, w *configstore.Man log.Info("Lakekeeper provisioning completed.") } -// addIcebergStatusUpdates copies the Duckling's reported iceberg status -// (ARN, region, namespace) into the configstore update map when fields -// differ. Idempotent. Called from both reconcileProvisioning (initial -// turn-up) and reconcileReady (late iceberg enable on an existing -// warehouse). Without the reconcileReady call site, a warehouse that -// became Ready before iceberg was opted in would never get its ARN -// propagated to the configstore — the worker activator would then run -// AttachIcebergCatalog with an empty TableBucket and skip the attach. -func addIcebergStatusUpdates(updates map[string]interface{}, w *configstore.ManagedWarehouse, status *DucklingStatus) { - if !w.Iceberg.Enabled || status.Iceberg.TableBucketArn == "" { - return - } - if w.Iceberg.TableBucketArn != status.Iceberg.TableBucketArn { - updates["iceberg_table_bucket_arn"] = status.Iceberg.TableBucketArn - } - if w.Iceberg.Region != status.Iceberg.Region && status.Iceberg.Region != "" { - updates["iceberg_region"] = status.Iceberg.Region - } - if w.Iceberg.Namespace != status.Iceberg.NamespaceName && status.Iceberg.NamespaceName != "" { - updates["iceberg_namespace"] = status.Iceberg.NamespaceName - } - if w.IcebergState != configstore.ManagedWarehouseStateReady { - updates["iceberg_state"] = configstore.ManagedWarehouseStateReady - } -} - func (c *Controller) reconcileDeleting(ctx context.Context, w *configstore.ManagedWarehouse) { log := slog.With("org", w.OrgID, "phase", "deleting") diff --git a/controlplane/provisioner/controller_lakekeeper_test.go b/controlplane/provisioner/controller_lakekeeper_test.go index 1e2b7734..ac149e9b 100644 --- a/controlplane/provisioner/controller_lakekeeper_test.go +++ b/controlplane/provisioner/controller_lakekeeper_test.go @@ -107,29 +107,6 @@ func TestReconcileLakekeeper_SkipsWhenIcebergDisabled(t *testing.T) { } } -func TestReconcileLakekeeper_SkipsWhenBackendIsS3Tables(t *testing.T) { - called := false - store := newFakeStore() - store.warehouses["acme"] = &configstore.ManagedWarehouse{ - OrgID: "acme", - State: configstore.ManagedWarehouseStateReady, - Iceberg: configstore.ManagedWarehouseIceberg{ - Enabled: true, - Backend: configstore.IcebergBackendS3Tables, - }, - } - c := NewControllerWithClient(store, nil, 0) - c.lakekeeperProvisioner = &LakekeeperProvisioner{} - c.lakekeeperInputs = func(_ context.Context, _ *configstore.ManagedWarehouse) (ProvisioningInputs, error) { - called = true - return ProvisioningInputs{}, nil - } - c.reconcileLakekeeper(context.Background(), store.warehouses["acme"]) - if called { - t.Errorf("inputs resolver should not be called when Backend=s3_tables") - } -} - func TestReconcileLakekeeper_SkipsWhenAlreadyProvisioned(t *testing.T) { called := false store := newFakeStore() diff --git a/controlplane/provisioner/controller_test.go b/controlplane/provisioner/controller_test.go index ea613f6e..3d9c1c93 100644 --- a/controlplane/provisioner/controller_test.go +++ b/controlplane/provisioner/controller_test.go @@ -92,8 +92,6 @@ func (s *fakeStore) UpdateWarehouseState(orgID string, expectedState configstore case "provisioning_started_at": t := v.(time.Time) w.ProvisioningStartedAt = &t - case "iceberg_table_bucket_arn": - w.Iceberg.TableBucketArn = v.(string) case "iceberg_region": w.Iceberg.Region = v.(string) case "iceberg_namespace": @@ -140,8 +138,6 @@ func (s *fakeStore) UpdateIcebergConfig(orgID string, updates map[string]interfa w.Iceberg.Namespace = v.(string) case "iceberg_region": w.Iceberg.Region = v.(string) - case "iceberg_table_bucket_arn": - w.Iceberg.TableBucketArn = v.(string) case "iceberg_state": w.IcebergState = v.(configstore.ManagedWarehouseProvisioningState) case "iceberg_lakekeeper_endpoint": @@ -466,68 +462,6 @@ func TestReconcileReadyPatchesCRWhenIcebergFlippedOff(t *testing.T) { } } -func TestReconcileReadyPropagatesIcebergStatusToConfigstore(t *testing.T) { - // Covers the late-iceberg-enable case: warehouse is already Ready, - // then iceberg gets opted in via admin API. The Crossplane composition - // eventually populates status.iceberg.tableBucketArn on the Duckling. - // reconcileReady must copy that back to the configstore — otherwise - // the worker activator's IcebergConfig.TableBucket stays empty and - // AttachIcebergCatalog skips the attach. - dc, fakeK8s := newFakeDucklingClient() - fs := newFakeStore() - fs.warehouses["org-iceberg-late"] = &configstore.ManagedWarehouse{ - OrgID: "org-iceberg-late", - State: configstore.ManagedWarehouseStateReady, - Iceberg: configstore.ManagedWarehouseIceberg{ - Enabled: true, - // TableBucketArn / Region / Namespace are intentionally empty — - // this is the configstore state right after an admin API PUT - // that just flipped iceberg.enabled=true. - }, - } - const tableBucketArn = "arn:aws:s3tables:us-east-1:123456789012:bucket/org-iceberg-late-iceberg" - cr := &unstructured.Unstructured{Object: map[string]interface{}{ - "apiVersion": "k8s.posthog.com/v1alpha1", - "kind": "Duckling", - "metadata": map[string]interface{}{ - "name": ducklingName("org-iceberg-late"), - "namespace": ducklingNamespace, - }, - "spec": map[string]interface{}{ - "metadataStore": map[string]interface{}{"type": "aurora"}, - "dataStore": map[string]interface{}{"type": "s3bucket"}, - "iceberg": map[string]interface{}{"enabled": true}, - }, - "status": map[string]interface{}{ - "iceberg": map[string]interface{}{ - "tableBucketArn": tableBucketArn, - "region": "us-east-1", - "namespaceName": "main", - }, - }, - }} - if _, err := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Create(context.Background(), cr, metav1.CreateOptions{}); err != nil { - t.Fatalf("seed CR: %v", err) - } - - ctrl := NewControllerWithClient(fs, dc, time.Second) - ctrl.reconcile(context.Background()) - - got := fs.warehouses["org-iceberg-late"] - if got.Iceberg.TableBucketArn != tableBucketArn { - t.Fatalf("expected configstore iceberg.table_bucket_arn=%q, got %q", tableBucketArn, got.Iceberg.TableBucketArn) - } - if got.Iceberg.Region != "us-east-1" { - t.Fatalf("expected iceberg.region=us-east-1, got %q", got.Iceberg.Region) - } - if got.Iceberg.Namespace != "main" { - t.Fatalf("expected iceberg.namespace=main, got %q", got.Iceberg.Namespace) - } - if got.IcebergState != configstore.ManagedWarehouseStateReady { - t.Fatalf("expected iceberg_state=ready, got %q", got.IcebergState) - } -} - func TestReconcileReadyNoDriftDoesNotPatch(t *testing.T) { dc, fakeK8s := newFakeDucklingClient() fs := newFakeStore() diff --git a/controlplane/provisioner/k8s_client.go b/controlplane/provisioner/k8s_client.go index f3d7f8db..5fbc81d0 100644 --- a/controlplane/provisioner/k8s_client.go +++ b/controlplane/provisioner/k8s_client.go @@ -44,15 +44,13 @@ type DucklingStatus struct { BucketName string S3Region string } - // Iceberg is populated when spec.iceberg.enabled=true and the - // composition has reconciled the per-tenant S3 Tables bucket. Empty - // when the tenant has not opted in. TableBucketArn arrives last - // (after the bucket is reconciled); the controller uses its presence - // as the trigger to flip iceberg_state to Ready in the configstore. + // Iceberg is populated when spec.iceberg.enabled=true. The + // composition provisions a per-org Lakekeeper instance; the + // Lakekeeper provisioner extension drives readiness off the + // Lakekeeper CR itself, so we only need namespace/region here. Iceberg struct { - TableBucketArn string - NamespaceName string - Region string + NamespaceName string + Region string } IAMRoleARN string ReadyCondition bool @@ -493,7 +491,6 @@ func parseDucklingStatus(cr *unstructured.Unstructured) (*DucklingStatus, error) // Parse status.iceberg (only populated when spec.iceberg.enabled=true) if ic, ok := status["iceberg"].(map[string]interface{}); ok { - ds.Iceberg.TableBucketArn = getNestedString(ic, "tableBucketArn") ds.Iceberg.NamespaceName = getNestedString(ic, "namespaceName") ds.Iceberg.Region = getNestedString(ic, "region") } diff --git a/controlplane/shared_worker_activator.go b/controlplane/shared_worker_activator.go index 7ddd4c2e..8d460e5e 100644 --- a/controlplane/shared_worker_activator.go +++ b/controlplane/shared_worker_activator.go @@ -19,7 +19,6 @@ import ( "github.com/posthog/duckgres/controlplane/provisioner" "github.com/posthog/duckgres/server" "github.com/posthog/duckgres/server/ducklake" - "github.com/posthog/duckgres/server/iceberg" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" @@ -580,15 +579,15 @@ func BuildTenantActivationPayload(ctx context.Context, clientset kubernetes.Inte } // buildIcebergConfig maps a stored ManagedWarehouseIceberg into the wire-level -// iceberg.Config that ships to workers. Per-backend fields are only populated -// when their backend is selected; the worker-side AttachIcebergCatalog -// dispatches on ResolvedBackend. Empty per-backend fields are treated as -// "provisioner hasn't filled this in yet" and the worker returns no-op for -// that org. +// IcebergConfig that ships to workers. Lakekeeper is the only backend, so the +// fields populated here all describe the per-tenant Lakekeeper REST catalog. +// Empty Lakekeeper fields are treated as "provisioner hasn't filled this in +// yet" and the worker returns no-op for that org. // -// For lakekeeper with a non-empty LakekeeperClientCredentials SecretRef, the -// OAuth2 client_secret is resolved via readSecretValue just before sending. -// Empty SecretRef is fine (PR1 allowall mode; PR3 wires OIDC SA-token auth). +// With a non-empty LakekeeperClientCredentials SecretRef, the OAuth2 +// client_secret is resolved via readSecretValue just before sending. Empty +// SecretRef is fine (allowall mode; OIDC SA-token auth supersedes this when +// configured). func (a *SharedWorkerActivator) buildIcebergConfig(ctx context.Context, orgID string, src *configstore.ManagedWarehouseIceberg) (server.IcebergConfig, error) { ic := server.IcebergConfig{ Enabled: src.Enabled, @@ -596,27 +595,18 @@ func (a *SharedWorkerActivator) buildIcebergConfig(ctx context.Context, orgID st Namespace: src.Namespace, Region: src.Region, } - // Populate only the fields the selected backend actually uses. Avoids - // leaking stale TableBucketArn from a pre-migration row into a - // lakekeeper activation payload (or vice versa). The worker-side - // dispatcher gates on ResolvedBackend anyway, but zeroing here is - // cheap defense-in-depth and keeps the payload free of orphaned - // credentials. - switch ic.ResolvedBackend() { - case iceberg.BackendLakekeeper: - ic.LakekeeperEndpoint = src.LakekeeperEndpoint - ic.LakekeeperWarehouse = src.LakekeeperWarehouse - ic.LakekeeperClientID = src.LakekeeperClientID - ic.LakekeeperOAuth2ServerURI = src.LakekeeperOAuth2ServerURI - if src.LakekeeperClientCredentials.Name != "" { - val, err := a.readSecretValue(ctx, src.LakekeeperClientCredentials) - if err != nil { - return server.IcebergConfig{}, fmt.Errorf("resolve lakekeeper client credentials for org %q: %w", orgID, err) - } - ic.LakekeeperClientSecret = val + // Lakekeeper is the only supported backend; populate its fields + // unconditionally. + ic.LakekeeperEndpoint = src.LakekeeperEndpoint + ic.LakekeeperWarehouse = src.LakekeeperWarehouse + ic.LakekeeperClientID = src.LakekeeperClientID + ic.LakekeeperOAuth2ServerURI = src.LakekeeperOAuth2ServerURI + if src.LakekeeperClientCredentials.Name != "" { + val, err := a.readSecretValue(ctx, src.LakekeeperClientCredentials) + if err != nil { + return server.IcebergConfig{}, fmt.Errorf("resolve lakekeeper client credentials for org %q: %w", orgID, err) } - case iceberg.BackendS3Tables: - ic.TableBucket = src.TableBucketArn + ic.LakekeeperClientSecret = val } return ic, nil } diff --git a/controlplane/shared_worker_activator_iceberg_test.go b/controlplane/shared_worker_activator_iceberg_test.go index 4af9362a..27709ece 100644 --- a/controlplane/shared_worker_activator_iceberg_test.go +++ b/controlplane/shared_worker_activator_iceberg_test.go @@ -13,32 +13,6 @@ import ( "k8s.io/client-go/kubernetes/fake" ) -func TestBuildIcebergConfig_S3TablesPath(t *testing.T) { - cs := fake.NewSimpleClientset() - a := &SharedWorkerActivator{clientset: cs, defaultNamespace: "duckgres"} - - src := &configstore.ManagedWarehouseIceberg{ - Enabled: true, - Backend: iceberg.BackendS3Tables, - Namespace: "main", - Region: "us-east-1", - TableBucketArn: "arn:aws:s3tables:us-east-1:123:bucket/acme", - } - ic, err := a.buildIcebergConfig(context.Background(), "acme", src) - if err != nil { - t.Fatalf("buildIcebergConfig: %v", err) - } - if ic.Backend != iceberg.BackendS3Tables { - t.Errorf("Backend = %q, want s3_tables", ic.Backend) - } - if ic.TableBucket != src.TableBucketArn { - t.Errorf("TableBucket = %q, want %q", ic.TableBucket, src.TableBucketArn) - } - if ic.LakekeeperClientSecret != "" { - t.Errorf("S3 Tables path should not populate LakekeeperClientSecret") - } -} - func TestBuildIcebergConfig_LakekeeperAllowall(t *testing.T) { cs := fake.NewSimpleClientset() a := &SharedWorkerActivator{clientset: cs, defaultNamespace: "duckgres"} diff --git a/duckdbservice/activation.go b/duckdbservice/activation.go index 0a83a657..e36222fe 100644 --- a/duckdbservice/activation.go +++ b/duckdbservice/activation.go @@ -313,7 +313,6 @@ func sameTenantActivationRuntime(current, next ActivationPayload) bool { a.CheckpointInterval == b.CheckpointInterval && ai.Enabled == bi.Enabled && ai.Backend == bi.Backend && - ai.TableBucket == bi.TableBucket && ai.Region == bi.Region && ai.Namespace == bi.Namespace && // Lakekeeper-side identity. Without this, a hot-idle worker diff --git a/duckdbservice/activation_test.go b/duckdbservice/activation_test.go index 75b5ea5b..3663b142 100644 --- a/duckdbservice/activation_test.go +++ b/duckdbservice/activation_test.go @@ -736,10 +736,11 @@ func TestReuseExistingActivationRefreshesIcebergAlongsideS3(t *testing.T) { S3SessionToken: "OLD_TOK", }, Iceberg: server.IcebergConfig{ - Enabled: true, - TableBucket: "arn:aws:s3tables:us-east-1:000000000000:bucket/analytics", - Region: "us-east-1", - Namespace: "main", + Enabled: true, + LakekeeperEndpoint: "http://lakekeeper-analytics.lakekeeper.svc:8181/catalog", + LakekeeperWarehouse: "org-analytics", + Region: "us-east-1", + Namespace: "main", }, }, db: mainDB, @@ -776,10 +777,11 @@ func TestReuseExistingActivationRefreshesIcebergAlongsideS3(t *testing.T) { S3SessionToken: "NEW_TOK", }, Iceberg: server.IcebergConfig{ - Enabled: true, - TableBucket: "arn:aws:s3tables:us-east-1:000000000000:bucket/analytics", - Region: "us-east-1", - Namespace: "main", + Enabled: true, + LakekeeperEndpoint: "http://lakekeeper-analytics.lakekeeper.svc:8181/catalog", + LakekeeperWarehouse: "org-analytics", + Region: "us-east-1", + Namespace: "main", }, } @@ -797,7 +799,7 @@ func TestReuseExistingActivationRefreshesIcebergAlongsideS3(t *testing.T) { t.Errorf("iceberg refresh got stale credentials: keyID=%q secret=%q token=%q, want NEW_AK/NEW_SK/NEW_TOK", icebergKeyID, icebergSecret, icebergToken) } - if icebergCfg.TableBucket != newPayload.Iceberg.TableBucket || icebergCfg.Region != newPayload.Iceberg.Region { + if icebergCfg.LakekeeperWarehouse != newPayload.Iceberg.LakekeeperWarehouse || icebergCfg.Region != newPayload.Iceberg.Region { t.Errorf("iceberg refresh got wrong config: %+v, want %+v", icebergCfg, newPayload.Iceberg) } } diff --git a/main_test.go b/main_test.go index 263b7b47..fcbdf4de 100644 --- a/main_test.go +++ b/main_test.go @@ -305,37 +305,23 @@ func TestResolveEffectiveConfigIceberg(t *testing.T) { t.Fatal("expected Iceberg.Enabled to default to false") } - // File config opts in. + // File config opts in; remaining knobs (region/namespace) flow through. fileEnabled := true resolved = configresolve.ResolveEffective(&FileConfig{ Iceberg: IcebergFileConfig{ - Enabled: &fileEnabled, - TableBucket: "arn:aws:s3tables:us-east-1:111:bucket/yaml", - Region: "us-east-1", - Namespace: "main", + Enabled: &fileEnabled, + Region: "us-east-1", + Namespace: "main", }, }, configresolve.CLIInputs{}, envFromMap(nil), nil) if !resolved.Server.Iceberg.Enabled { t.Fatal("expected YAML iceberg.enabled=true to enable Iceberg") } - if got, want := resolved.Server.Iceberg.TableBucket, "arn:aws:s3tables:us-east-1:111:bucket/yaml"; got != want { - t.Fatalf("expected YAML table bucket %q, got %q", want, got) + if got, want := resolved.Server.Iceberg.Region, "us-east-1"; got != want { + t.Fatalf("expected YAML iceberg region %q, got %q", want, got) } - - // Env overrides YAML; CLI overrides env. - resolved = configresolve.ResolveEffective(&FileConfig{ - Iceberg: IcebergFileConfig{ - Enabled: &fileEnabled, - TableBucket: "arn:aws:s3tables:us-east-1:111:bucket/yaml", - }, - }, configresolve.CLIInputs{ - Set: map[string]bool{"iceberg-table-bucket": true}, - IcebergTableBucket: "arn:aws:s3tables:us-east-1:333:bucket/cli", - }, envFromMap(map[string]string{ - "DUCKGRES_ICEBERG_TABLE_BUCKET": "arn:aws:s3tables:us-east-1:222:bucket/env", - }), nil) - if got, want := resolved.Server.Iceberg.TableBucket, "arn:aws:s3tables:us-east-1:333:bucket/cli"; got != want { - t.Fatalf("expected CLI table bucket %q to win, got %q", want, got) + if got, want := resolved.Server.Iceberg.Namespace, "main"; got != want { + t.Fatalf("expected YAML iceberg namespace %q, got %q", want, got) } } diff --git a/server/iceberg/config.go b/server/iceberg/config.go index 6ac04661..c645b736 100644 --- a/server/iceberg/config.go +++ b/server/iceberg/config.go @@ -3,46 +3,36 @@ // on github.com/duckdb/duckdb-go: helpers return strings. package iceberg -// Backend constants — string-typed for wire stability across the -// control-plane → worker activation payload. -const ( - BackendS3Tables = "s3_tables" - BackendLakekeeper = "lakekeeper" -) - -// Config configures Iceberg catalog attachment. Two backends are supported: -// -// - "s3_tables" (legacy): per-tenant AWS S3 Tables bucket addressed by ARN, -// with AWS SigV4 auth picked up from the worker pod's IRSA credential -// chain. Fields used: TableBucket, Region, Namespace. +// Backend is the only supported Iceberg backend now that the S3 Tables path +// has been removed. Retained as a constant so the activation payload + the +// configstore column have a stable name to write/check; future backends can +// extend the type-set without breaking the wire shape. +const BackendLakekeeper = "lakekeeper" + +// Config configures Iceberg catalog attachment. Lakekeeper-only: a per-tenant +// Lakekeeper instance vends an Iceberg REST catalog. Fields used: +// LakekeeperEndpoint, LakekeeperWarehouse, LakekeeperClientID, +// LakekeeperClientSecret, LakekeeperOAuth2ServerURI, Namespace. // -// - "lakekeeper" (default for new orgs): per-tenant Lakekeeper instance -// vends an Iceberg REST catalog. Fields used: LakekeeperEndpoint, -// LakekeeperWarehouse, LakekeeperClientID, LakekeeperClientSecret, -// LakekeeperOAuth2ServerURI, Namespace. -// -// Empty Backend is treated as "lakekeeper" by ResolvedBackend so rows -// migrated from earlier schemas behave correctly. +// Empty Backend is treated as "lakekeeper" by ResolvedBackend so rows migrated +// from earlier schemas behave correctly. type Config struct { // Enabled gates the ATTACH at worker session init. When false, no Iceberg // catalog is attached and the rest of the fields are ignored. Enabled bool - // Backend selects the catalog backend. See constants above. + // Backend retains its name on the wire (always "lakekeeper" or empty) so + // downstream consumers that already carry the field don't break; the + // dispatcher no longer branches on it. Backend string - // Namespace is the default Iceberg namespace within the catalog. Used by - // both backends. + // Namespace is the default Iceberg namespace within the catalog. Namespace string - // Region applies to both backends (S3 region for S3 Tables; AWS region - // for the Lakekeeper warehouse storage profile). + // Region is the AWS region for the Lakekeeper warehouse storage profile. Region string - // S3 Tables fields (Backend == "s3_tables"). - TableBucket string - - // Lakekeeper fields (Backend == "lakekeeper"). + // Lakekeeper fields. LakekeeperEndpoint string // e.g. http://lakekeeper-.lakekeeper.svc:8181/catalog LakekeeperWarehouse string // warehouse NAME, e.g. "org-acme" LakekeeperClientID string @@ -51,9 +41,7 @@ type Config struct { } // ResolvedBackend returns Backend with the empty-string default applied. +// Kept for callers that still inspect the field; always returns "lakekeeper". func (c Config) ResolvedBackend() string { - if c.Backend == "" { - return BackendLakekeeper - } - return c.Backend + return BackendLakekeeper } diff --git a/server/iceberg/dispatch_test.go b/server/iceberg/dispatch_test.go deleted file mode 100644 index 2e4a15bf..00000000 --- a/server/iceberg/dispatch_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package iceberg - -import ( - "strings" - "testing" -) - -// TestDispatcher_BackendSelection asserts that the SQL produced for a given -// Backend value targets the expected catalog backend. The full dispatcher -// lives in server.AttachIcebergCatalog, but the SQL emitted is the -// observable contract: if Build*Stmt emit the right SQL for the right -// backend, the dispatcher's branching is correct by construction. -// -// This catches "stale field" mistakes — e.g. a row with Backend="lakekeeper" -// but TableBucket still populated must produce Lakekeeper SQL, not -// S3-Tables SQL. -func TestDispatcher_BackendSelection(t *testing.T) { - cases := []struct { - name string - cfg Config - wantSQL string // a substring uniquely identifying the chosen branch - }{ - { - name: "explicit s3_tables", - cfg: Config{ - Backend: BackendS3Tables, - TableBucket: "arn:aws:s3tables:us-east-1:1:bucket/x", - }, - wantSQL: "ENDPOINT_TYPE 's3_tables'", - }, - { - name: "explicit lakekeeper", - cfg: Config{ - Backend: BackendLakekeeper, - LakekeeperEndpoint: "http://lk/catalog", - LakekeeperWarehouse: "org-x", - }, - wantSQL: "AUTHORIZATION_TYPE 'none'", - }, - { - name: "empty Backend defaults to lakekeeper even with stale TableBucket", - cfg: Config{ - Backend: "", - TableBucket: "arn:aws:s3tables:us-east-1:1:bucket/stale", - LakekeeperEndpoint: "http://lk/catalog", - LakekeeperWarehouse: "org-x", - }, - wantSQL: "AUTHORIZATION_TYPE 'none'", - }, - } - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - var attach string - switch tc.cfg.ResolvedBackend() { - case BackendLakekeeper: - attach = BuildLakekeeperAttachStmt(tc.cfg) - case BackendS3Tables: - attach = BuildIcebergAttachStmt(tc.cfg) - default: - t.Fatalf("unhandled backend %q", tc.cfg.ResolvedBackend()) - } - if !strings.Contains(attach, tc.wantSQL) { - t.Errorf("ATTACH SQL did not contain expected fragment:\n got: %s\nwant ⊃: %s", attach, tc.wantSQL) - } - // And the wrong-branch fragment must NOT appear. - var unwanted string - if tc.cfg.ResolvedBackend() == BackendLakekeeper { - unwanted = "ENDPOINT_TYPE 's3_tables'" - } else { - unwanted = "ACCESS_DELEGATION_MODE" - } - if strings.Contains(attach, unwanted) { - t.Errorf("ATTACH SQL contains wrong-branch fragment %q: %s", unwanted, attach) - } - }) - } -} diff --git a/server/iceberg/migration.go b/server/iceberg/migration.go index 92d74b27..155c086b 100644 --- a/server/iceberg/migration.go +++ b/server/iceberg/migration.go @@ -25,25 +25,20 @@ const CatalogName = "iceberg" // (the Postgres convention) is a safe, unshadowed default. const DefaultSchema = "public" -// BuildIcebergSecretStmt builds the CREATE SECRET statement that the -// DuckDB iceberg extension uses to sign AWS S3 Tables requests when an -// `ATTACH ... (TYPE iceberg, ENDPOINT_TYPE 's3_tables')` is opened. +// BuildIcebergSecretStmt builds the CREATE SECRET statement that lets the +// DuckDB iceberg extension sign warehouse-data S3 requests with the +// duckling's brokered per-org credentials. // -// DuckDB's TYPE ICEBERG secret is OAuth2-only — it has a single provider -// ('config') registered by OAuth2Authorization::CreateCatalogSecretFunction. -// Trying to pass AUTHORIZATION_TYPE/REGION on TYPE ICEBERG fails with -// "Unknown parameter ... with default provider 'config'". For s3_tables -// the iceberg extension internally signs with SigV4 and pulls credentials -// from any TYPE S3 secret in scope. +// DuckDB's TYPE ICEBERG secret is OAuth2-only (single 'config' provider +// registered by OAuth2Authorization::CreateCatalogSecretFunction), so the +// data-plane credentials have to ride on a TYPE S3 secret with PROVIDER +// config — the iceberg extension picks them up from any TYPE S3 secret in +// scope when it issues warehouse reads/writes. // -// The secret is always built with PROVIDER config and the supplied -// short-lived credentials inlined directly. This is the only supported -// auth model: the control plane assumes the per-tenant IAM role via STS -// and ships the resulting temporary credentials in the worker activation -// payload, identical to how the DuckLake S3 secret is built (see -// buildConfigSecret in server/server.go). The same role has both s3:* and -// s3tables:* permissions on the tenant's data and table buckets, so -// reusing the credentials here is correct. +// The control plane assumes the per-tenant IAM role via STS and ships the +// resulting short-lived credentials in the activation payload (identical to +// how the DuckLake S3 secret is built in server.buildConfigSecret); the same +// role has s3:* on the warehouse bucket, so reusing them here is correct. // // keyID and secret are required — callers must validate upstream and emit // a clear error if the activation payload is missing them when iceberg is @@ -68,24 +63,9 @@ func BuildIcebergSecretStmt(cfg Config, keyID, secret, sessionToken string) stri return stmt } -// BuildIcebergAttachStmt builds the ATTACH statement for the Iceberg -// extension catalog, addressing the per-tenant S3 Tables bucket ARN. -// -// Kept as the S3 Tables-specific builder. The dispatcher in -// server.AttachIcebergCatalog picks between this and -// BuildLakekeeperAttachStmt based on Config.ResolvedBackend. -func BuildIcebergAttachStmt(cfg Config) string { - return fmt.Sprintf( - "ATTACH '%s' AS %s (TYPE iceberg, ENDPOINT_TYPE 's3_tables')", - escapeSQLStringLiteral(cfg.TableBucket), - CatalogName, - ) -} - // LakekeeperSecretName is the DuckDB SECRET name used when ATTACHing a -// Lakekeeper REST catalog with OAuth2 client credentials. Distinct from -// iceberg_sigv4 so a worker that has both backends configured (post-migration) -// doesn't collide. +// Lakekeeper REST catalog with OAuth2 client credentials. Distinct from the +// data-plane iceberg_sigv4 (TYPE S3) so they coexist cleanly. const LakekeeperSecretName = "iceberg_oauth" // BuildLakekeeperSecretStmt builds a CREATE SECRET statement for the diff --git a/server/iceberg/migration_test.go b/server/iceberg/migration_test.go index 23b3a9a3..ce61cc30 100644 --- a/server/iceberg/migration_test.go +++ b/server/iceberg/migration_test.go @@ -2,27 +2,6 @@ package iceberg import "testing" -func TestBuildIcebergAttachStmt(t *testing.T) { - got := BuildIcebergAttachStmt(Config{ - TableBucket: "arn:aws:s3tables:us-east-1:123456789012:bucket/posthog-duckling-acme-iceberg", - }) - want := "ATTACH 'arn:aws:s3tables:us-east-1:123456789012:bucket/posthog-duckling-acme-iceberg' AS iceberg (TYPE iceberg, ENDPOINT_TYPE 's3_tables')" - if got != want { - t.Fatalf("BuildIcebergAttachStmt mismatch:\n got: %s\nwant: %s", got, want) - } -} - -func TestBuildIcebergAttachStmtEscapesSingleQuotes(t *testing.T) { - // Defensive: ARN won't contain single quotes in practice, but the - // helper escapes them anyway so an attacker-controlled config can't - // break out of the SQL string literal. - got := BuildIcebergAttachStmt(Config{TableBucket: "weird'name"}) - want := "ATTACH 'weird''name' AS iceberg (TYPE iceberg, ENDPOINT_TYPE 's3_tables')" - if got != want { - t.Fatalf("BuildIcebergAttachStmt did not escape quote:\n got: %s\nwant: %s", got, want) - } -} - func TestBuildIcebergSecretStmtWithExplicitCreds(t *testing.T) { got := BuildIcebergSecretStmt(Config{Region: "us-west-2"}, "AKIA_TEST", "shh", "tok123") want := "CREATE OR REPLACE SECRET iceberg_sigv4 (TYPE S3, PROVIDER config, KEY_ID 'AKIA_TEST', SECRET 'shh', REGION 'us-west-2', SESSION_TOKEN 'tok123')" @@ -114,11 +93,11 @@ func TestBuildLakekeeperAttachStmt_EscapesQuotes(t *testing.T) { } func TestResolvedBackend(t *testing.T) { + // Backend is lakekeeper-only now; any input resolves to that constant. cases := map[string]string{ - "": BackendLakekeeper, - BackendLakekeeper: BackendLakekeeper, - BackendS3Tables: BackendS3Tables, - "future": "future", + "": BackendLakekeeper, + BackendLakekeeper: BackendLakekeeper, + "future": BackendLakekeeper, } for in, want := range cases { if got := (Config{Backend: in}).ResolvedBackend(); got != want { diff --git a/server/iceberg_column_metadata_test.go b/server/iceberg_column_metadata_test.go index 86ee9832..9ac60a1d 100644 --- a/server/iceberg_column_metadata_test.go +++ b/server/iceberg_column_metadata_test.go @@ -22,12 +22,6 @@ func TestShouldLoadIcebergColumnMetadataOnlyForLakekeeper(t *testing.T) { }, false) { t.Fatal("expected Lakekeeper catalog to load Iceberg column metadata") } - if shouldLoadIcebergColumnMetadata(IcebergConfig{ - Enabled: true, - Backend: iceberg.BackendS3Tables, - }, false) { - t.Fatal("S3 Tables catalog should not use Lakekeeper REST metadata loading") - } if shouldLoadIcebergColumnMetadata(IcebergConfig{ Enabled: true, Backend: iceberg.BackendLakekeeper, diff --git a/server/iceberg_refresh_test.go b/server/iceberg_refresh_test.go index b1f3ad5e..7b74a6ac 100644 --- a/server/iceberg_refresh_test.go +++ b/server/iceberg_refresh_test.go @@ -35,9 +35,10 @@ func TestRefreshIcebergSecretRotatesCredentials(t *testing.T) { } ic := IcebergConfig{ - Enabled: true, - TableBucket: "arn:aws:s3tables:us-east-1:000000000000:bucket/refresh-test", - Region: "us-east-1", + Enabled: true, + LakekeeperEndpoint: "http://lakekeeper.invalid/catalog", + LakekeeperWarehouse: "org-refresh-test", + Region: "us-east-1", } if err := RefreshIcebergSecret(db, ic, nil, "AKIA_INITIAL", "initial-secret", "initial-token"); err != nil { @@ -59,32 +60,11 @@ func TestRefreshIcebergSecretNoOpWhenDisabled(t *testing.T) { } } -// TestRefreshIcebergSecretRejectsEmptyCredentials guards the invariant -// that "credentials are required when iceberg is enabled" applies to -// refresh too, not just initial attach. A silent fallback here would either -// re-introduce credential_chain (the bug fixed by PR #562) or emit an -// empty-cred config secret that fails opaquely at attach time. Applies to -// BOTH backends: Lakekeeper no longer vends (PackedPolicyTooLarge), so the -// worker rotates its own iceberg_sigv4 S3 secret for Lakekeeper too. -func TestRefreshIcebergSecretRejectsEmptyCredentials(t *testing.T) { - err := RefreshIcebergSecret(nil, IcebergConfig{ - Enabled: true, - Backend: iceberg.BackendS3Tables, - TableBucket: "arn:...", - }, nil, "", "", "") - if err == nil { - t.Fatal("expected error when iceberg enabled with empty credentials, got nil") - } - if !strings.Contains(err.Error(), "no AWS credentials") { - t.Fatalf("error message should name the missing-credentials cause, got: %v", err) - } -} - // TestRefreshIcebergSecretLakekeeperRequiresCreds: Lakekeeper no longer vends // credentials (its STS session policy overflowed AWS's packed-policy limit), // so the worker reads/writes S3 data with its own brokered creds and must -// rotate that secret on the STS schedule — same as S3 Tables. An empty-cred -// refresh is therefore an error, not a no-op. +// rotate that secret on the STS schedule. An empty-cred refresh is therefore +// an error, not a no-op. func TestRefreshIcebergSecretLakekeeperRequiresCreds(t *testing.T) { err := RefreshIcebergSecret(nil, IcebergConfig{ Enabled: true, diff --git a/server/server.go b/server/server.go index 1c583da4..3f6b5db4 100644 --- a/server/server.go +++ b/server/server.go @@ -1667,97 +1667,16 @@ func isDeltaCatalogEmptyError(err error) bool { return strings.Contains(err.Error(), "No files in log segment") } -// AttachIcebergCatalog attaches the per-tenant Iceberg catalog alongside -// DuckLake. Dispatches on Config.ResolvedBackend: -// -// - "s3_tables" (legacy): a per-tenant AWS S3 Tables bucket. Requires -// the keyID/secret/sessionToken trio of short-lived STS credentials. -// - "lakekeeper" (default): a per-tenant Lakekeeper REST catalog. -// The AWS credentials are ignored — Lakekeeper vends short-lived STS -// creds to DuckDB at table-load time via ACCESS_DELEGATION_MODE. -// -// Idempotent if the catalog is already attached. Fail-soft for the "fresh -// tenant, no namespaces yet" case so a worker activation isn't blocked. +// AttachIcebergCatalog attaches the per-tenant Iceberg catalog. Lakekeeper REST +// is the only backend; the AWS S3 Tables path was removed when S3 Tables +// stopped being a supported product. Idempotent if the catalog is already +// attached; fail-soft for the "fresh tenant, no namespaces yet" case so a +// worker activation isn't blocked. func AttachIcebergCatalog(db *sql.DB, ic IcebergConfig, sem chan struct{}, keyID, secret, sessionToken string) error { if !ic.Enabled { return nil } - switch ic.ResolvedBackend() { - case iceberg.BackendLakekeeper: - return attachLakekeeperCatalog(db, ic, sem, keyID, secret, sessionToken) - case iceberg.BackendS3Tables: - return attachS3TablesIcebergCatalog(db, ic, sem, keyID, secret, sessionToken) - default: - return fmt.Errorf("iceberg: unsupported backend %q", ic.Backend) - } -} - -// attachS3TablesIcebergCatalog is the legacy S3 Tables path. Preserves the -// previous AttachIcebergCatalog behavior exactly so existing orgs see no -// change. -func attachS3TablesIcebergCatalog(db *sql.DB, ic IcebergConfig, sem chan struct{}, keyID, secret, sessionToken string) error { - if ic.TableBucket == "" { - return nil - } - if keyID == "" || secret == "" { - return fmt.Errorf("iceberg catalog enabled (table_bucket=%q) but no AWS credentials in activation payload — control plane STS broker did not populate DuckLake S3 credentials", ic.TableBucket) - } - - select { - case sem <- struct{}{}: - defer func() { <-sem }() - case <-time.After(30 * time.Second): - return fmt.Errorf("timeout waiting for Iceberg catalog attachment lock") - } - - var count int - err := db.QueryRow( - "SELECT COUNT(*) FROM duckdb_databases() WHERE database_name = '" + iceberg.CatalogName + "'", - ).Scan(&count) - if err == nil && count > 0 { - return nil - } - - // httpfs + S3 secret must be ready BEFORE LoadExtensions("iceberg"); see the - // matching comment in attachLakekeeperCatalog. The iceberg extension's - // LOAD-time init falls back to AWS-SDK credential discovery when no TYPE S3 - // secret is present, which probes IMDS (blocked by the worker network - // policy) and hangs until the activate-tenant deadline. Iceberg + DuckLake - // tenants don't hit it because AttachDuckLake runs first and primes httpfs - // + a TYPE S3 secret; the iceberg-only path here must replicate that order. - if err := LoadExtensions(db, []string{"httpfs"}); err != nil { - return fmt.Errorf("load httpfs extension: %w", err) - } - - if _, err := db.Exec(iceberg.BuildIcebergSecretStmt(ic, keyID, secret, sessionToken)); err != nil { - return fmt.Errorf("create Iceberg secret: %w", err) - } - - if err := LoadExtensions(db, []string{"iceberg"}); err != nil { - return fmt.Errorf("load iceberg extension: %w", err) - } - - attachStmt := iceberg.BuildIcebergAttachStmt(ic) - slog.Info("Attaching Iceberg catalog.", "backend", "s3_tables", "table_bucket", ic.TableBucket, "region", ic.Region) - if _, err := db.Exec(attachStmt); err != nil { - if isIcebergCatalogEmptyError(err) { - slog.Info("Skipping Iceberg catalog attach: no namespaces at table bucket yet.", "table_bucket", ic.TableBucket) - return nil - } - return fmt.Errorf("failed to attach Iceberg catalog: %w", err) - } - if _, err := db.Exec("SHOW TABLES FROM " + iceberg.CatalogName); err != nil { - if isIcebergCatalogEmptyError(err) { - if _, derr := db.Exec("DETACH " + iceberg.CatalogName); derr != nil { - slog.Warn("Failed to detach empty Iceberg catalog after attach probe.", "error", derr) - } - slog.Info("Detached Iceberg catalog: no namespaces at table bucket yet.", "table_bucket", ic.TableBucket) - return nil - } - return fmt.Errorf("failed to probe Iceberg catalog: %w", err) - } - slog.Info("Attached Iceberg catalog successfully.", "backend", "s3_tables", "table_bucket", ic.TableBucket) - return nil + return attachLakekeeperCatalog(db, ic, sem, keyID, secret, sessionToken) } // attachLakekeeperCatalog attaches the per-tenant Lakekeeper REST catalog. @@ -2241,7 +2160,7 @@ func RefreshIcebergSecret(db *sql.DB, ic IcebergConfig, sem chan struct{}, keyID return fmt.Errorf("refresh iceberg secret: %w", err) } } - slog.Debug("Refreshed iceberg secret for hot-idle reuse.", "table_bucket", ic.TableBucket) + slog.Debug("Refreshed iceberg secret for hot-idle reuse.", "warehouse", ic.LakekeeperWarehouse) return nil } diff --git a/tests/k8s/iceberg_test.go b/tests/k8s/iceberg_test.go deleted file mode 100644 index 747ee7ec..00000000 --- a/tests/k8s/iceberg_test.go +++ /dev/null @@ -1,466 +0,0 @@ -//go:build k8s_integration - -package k8s_test - -import ( - "encoding/json" - "fmt" - "os" - "os/exec" - "strings" - "testing" - "time" -) - -// TestK8sIcebergRoundTrip exercises the per-tenant Iceberg-on-S3-Tables -// activation path against REAL AWS — kind cluster, real control plane, -// real worker pod, real ATTACH 'arn:aws:s3tables:...' (TYPE iceberg, -// ENDPOINT_TYPE 's3_tables') against actual S3 Tables. This is the only -// way to gain high confidence in the iceberg mode: every alternative -// (LocalStack community, moto, REST-catalog substitutes) exercises a -// DIFFERENT code path in the DuckDB iceberg extension. The 's3_tables' -// endpoint is derived from the ARN's region and goes straight to AWS; no -// environment flag overrides it. -// -// SCOPE — what this test covers and what it doesn't, and why. -// -// Covers (the wiring underneath the catalog SQL surface). This is what -// the iceberg integration test is *for* — every piece of glue between -// the control plane, the worker pod, and AWS: -// - control plane reads the tenant's S3 credentials secret, including -// the STS session_token (regression-tests the fix in PR #569 where -// ASIA…-prefixed temporary credentials were dropping the token and -// getting 403s from the iceberg REST endpoint) -// - worker activation loads the iceberg extension, creates the -// SigV4-bearing TYPE S3 secret, ATTACHes the per-tenant S3 Tables -// bucket using the OIDC-vended CI role, and runs the -// `SHOW TABLES FROM iceberg` probe that hits real S3 Tables APIs -// (ListNamespaces + ListTables under the hood) -// - the catalog ends up visible in `duckdb_databases()` from a -// subsequent client session — i.e. the activation actually -// completed and didn't silently DETACH -// -// Does NOT cover (blocked upstream — DuckDB iceberg extension bug): -// - `USE iceberg.` → "No catalog + schema named ... found" -// - `CREATE TABLE iceberg.ns.t` → "Schema with name "" not found" -// - `SELECT FROM iceberg.ns.t` → "schema main does not exist" -// - `INSERT INTO iceberg.ns.t` → same -// -// Reproduced against plain `duckdb-go` v2 (no duckgres) on both the -// stable (v11fea8ed) and core_nightly (v10e97957) iceberg extensions. -// `information_schema.schemata`, `duckdb_schemas()`, `SHOW TABLES FROM -// iceberg` and `SHOW ALL TABLES` all see the namespace+table, but the -// schema-by-name lookup path used by USE/CREATE/SELECT/INSERT disagrees -// and reports the schema as missing. So once a tenant's iceberg catalog -// is attached, the only thing currently usable through DuckDB SQL is -// metadata listing — actual table read/write requires going through -// PyIceberg/Spark/Athena/etc until the upstream bug is fixed. Expand -// this test once that resolves. -// -// Why the test doesn't pre-create a probe table for read-side coverage: -// AttachIcebergCatalog runs a `SHOW TABLES FROM iceberg` probe right -// after ATTACH; if it errors with a "no namespace / no such table" -// pattern, the activator treats the catalog as freshly empty and -// DETACHes it. With an empty-metadata table present in the namespace -// (the shape `aws s3tables create-table` produces — no data files -// yet), the worker's iceberg ext hits one of those probe errors and -// detaches, leaving `duckdb_databases()` count = 0. A workaround -// would have to either populate the table via Spark/PyIceberg in test -// setup (heavyweight) or relax the activator's detach heuristic -// (touches production code for one test). Until we resolve that -// trade-off, the test stays at activation-level coverage — which is -// where the wiring bugs we just spent days fixing actually live. -// -// This test is INTENTIONALLY NOT SKIPPABLE. If the required env vars -// aren't set in whatever CI lane runs the k8s integration suite, the -// test fails openly with a clear diagnostic. A silent skip would hide -// two failure modes that matter more than the test itself: -// -// 1. CI misconfiguration — a secret rotates, an env var name changes, -// the sandbox bucket gets renamed, and the test silently stops -// running. With a skip, nobody notices until someone actively -// looks at the test output; with a fatal, the next PR catches it. -// 2. A real iceberg regression that happens to coincide with an -// env-var gap — even worse, because the regression hides behind -// the same "skipped — missing env vars" line. -// -// Required env vars (test fails the whole job when any is empty): -// -// DUCKGRES_K8S_ICEBERG_TABLE_BUCKET_ARN — arn:aws:s3tables:::bucket/ -// DUCKGRES_K8S_ICEBERG_REGION — must match the ARN's region -// DUCKGRES_K8S_ICEBERG_DATA_BUCKET — real S3 bucket name for DuckLake parquet -// (DuckLake is attached alongside iceberg; -// empty ObjectStore would skip the attach -// but we want both code paths exercised) -// AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY — credentials with s3tables:* on the table -// bucket and s3:* on the data bucket -// AWS_SESSION_TOKEN — required for STS-vended creds (the OIDC -// assume-role path always returns one) -// -// Optional: -// -// DUCKGRES_K8S_ICEBERG_NAMESPACE — defaults to "main" -// -// CI sandbox bucket guidance: -// - Use a SINGLE persistent table bucket per CI environment; the test -// creates+drops a uniquely-named probe table on each run. Avoids the -// 10-bucket-per-region service quota and saves ~30s/run vs. -// create-bucket-per-run. -// - The IAM principal needs s3tables:CreateTable, GetTable, DeleteTable, -// ListTables, GetNamespace on the table bucket, plus s3:GetObject/ -// PutObject on the data bucket. -// - DO NOT reuse a production bucket. The test creates and drops tables; -// a leaked DROP would target whatever bucket the env var pointed at. -func TestK8sIcebergRoundTrip(t *testing.T) { - cfg := loadIcebergTestConfig(t) - - if err := seedIcebergTenantFixture(cfg); err != nil { - t.Fatalf("seed iceberg tenant fixture: %v", err) - } - - // Wait for the new tenant DB to be reachable — the control plane's - // configstore poll picks up the new org on its next tick. This - // triggers worker activation, which in turn runs AttachIcebergCatalog - // against the real S3 Tables bucket using the OIDC-vended creds. - if err := waitForTenantDBReady(icebergTenantName, icebergTenantPassword, initialDBReadyTimeout); err != nil { - t.Fatalf("iceberg tenant login not ready: %v", err) - } - - // Confirm the iceberg catalog actually attached on the worker. If - // the ATTACH failed (wrong region, missing s3tables permission, - // missing session_token for STS-vended creds) the activation path - // logs+returns rather than silently skipping, so the catalog either - // shows up here or the session never came up. - // - // A count == 1 result demonstrates the full activation pipeline - // worked end-to-end: control plane resolved the tenant config, - // looked up the S3 credentials secret (with session_token plumbing, - // the regression in PR #569), STS broker minted vended credentials - // where applicable, worker pod started, iceberg extension installed, - // TYPE S3 PROVIDER config secret created with SigV4 region, ATTACH - // hit S3 Tables, the post-attach `SHOW TABLES FROM iceberg` probe - // reached the listing endpoint without a detach-on-empty error, and - // a client session's flight call to `duckdb_databases()` was routed - // to the activated worker. - // - // Why we poll instead of a one-shot query: waitForTenantDBReady - // returns as soon as `SELECT 1` succeeds, which can race the worker - // activation's AttachIcebergCatalog step — the auth/SELECT path - // completes before the iceberg ext finishes its - // install+secret+ATTACH+probe sequence against AWS. A one-shot count - // query against that window returns 0 spuriously. Retrying until - // the catalog shows up (with a hard upper bound) catches genuine - // activation failures while tolerating the ordinary multi-second - // ATTACH latency. - if err := pollIcebergAttached(60 * time.Second); err != nil { - // Surface the worker + control-plane logs inline so the CI - // failure message contains everything needed to diagnose the - // activation failure without re-running anything. The bucket - // only attaches once per CI run, so a follow-up run can't - // reproduce the same state. - diag := captureIcebergActivationDiagnostics() - t.Fatalf("iceberg catalog not attached after activation: %v (ATTACH against %s probably failed)\n%s", err, cfg.tableBucketARN, diag) - } -} - -// captureIcebergActivationDiagnostics dumps the control-plane and -// worker pod logs (filtered to iceberg / attach lines first, then full -// tail) plus the live row from the warehouse table so the test failure -// in CI contains enough context to diagnose why activation didn't -// produce an attached catalog. The kind cluster gets torn down right -// after the test exits, so anything not surfaced here is gone. -func captureIcebergActivationDiagnostics() string { - const ns = "duckgres" - var b strings.Builder - b.WriteString("--- activation diagnostics ---\n") - - if pods, err := kubectlCommandOutput("-n", ns, "get", "pods", "-l", "app=duckgres-worker", "-o", "wide"); err != nil { - fmt.Fprintf(&b, "kubectl get pods (worker): %v\n", err) - } else { - fmt.Fprintf(&b, "worker pods:\n%s\n", pods) - } - - if cpLogs, err := kubectlCommandOutput("-n", ns, "logs", "deployment/duckgres-control-plane", "--tail=200"); err != nil { - fmt.Fprintf(&b, "control-plane logs: %v\n", err) - } else { - fmt.Fprintf(&b, "control-plane logs (last 200 lines):\n%s\n", cpLogs) - } - - if workerLogs, err := kubectlCommandOutput("-n", ns, "logs", "-l", "app=duckgres-worker", "--tail=200", "--prefix=true"); err != nil { - fmt.Fprintf(&b, "worker logs: %v\n", err) - } else { - fmt.Fprintf(&b, "worker logs (last 200 lines):\n%s\n", workerLogs) - } - - // iceberg_backend is included because an empty/'lakekeeper' value - // would silently skip the s3_tables ATTACH path — the regression - // this seed pin guards against. - row, err := queryRuntimeStoreText( - "SELECT org_id, state, iceberg_enabled, iceberg_backend, iceberg_table_bucket_arn, iceberg_region, iceberg_namespace, iceberg_state, iceberg_status_message, s3_state " + - "FROM duckgres_managed_warehouses WHERE org_id = '" + icebergTenantName + "'") - if err != nil { - fmt.Fprintf(&b, "warehouse row query: %v\n", err) - } else { - fmt.Fprintf(&b, "duckgres_managed_warehouses[%s] (iceberg cols):\n%s\n", icebergTenantName, row) - } - - return b.String() -} - -// pollIcebergAttached polls the iceberg tenant's session for the -// attached-catalog count until it reads 1 or the timeout elapses. See -// the call site for the race this paves over. -func pollIcebergAttached(timeout time.Duration) error { - const q = "SELECT COUNT(*) FROM duckdb_databases() WHERE database_name = 'iceberg'" - deadline := time.Now().Add(timeout) - for { - got, err := queryIntWithReconnectAs(icebergTenantName, icebergTenantPassword, q, timeout) - if err == nil && got == 1 { - return nil - } - if time.Now().After(deadline) { - if err != nil { - return fmt.Errorf("last query error: %w", err) - } - return fmt.Errorf("catalog still not attached (last count=%d)", got) - } - time.Sleep(2 * time.Second) - } -} - -const ( - icebergTenantName = "iceberg-test" - icebergTenantPassword = "postgres" - // bcrypt hash of "postgres", matching the existing tenant fixtures so - // the auth wiring is identical to analytics/billing. - icebergTenantPasswordHash = "$2a$10$TQyt73Vw91Q1d7YcE86EVuhms/0u4qBydMDyVvZYlqDwc3/VtQAbm" -) - -type icebergTestConfig struct { - tableBucketARN string - region string - namespace string - dataBucket string - accessKeyID string - secretKey string - sessionToken string -} - -// loadIcebergTestConfig reads the required env vars and fails the test -// loudly if any are missing. There is no skip path — see the -// TestK8sIcebergRoundTrip godoc for the rationale. -// -// Note that the env vars must be present *and non-empty*; an empty -// value is treated as missing. This matters when CI passes secrets -// through templated workflow files: a rotated-out secret typically -// renders as empty rather than absent, and an empty value here would -// silently fail the AWS call rather than the env check. -func loadIcebergTestConfig(t *testing.T) icebergTestConfig { - t.Helper() - required := map[string]string{ - "DUCKGRES_K8S_ICEBERG_TABLE_BUCKET_ARN": os.Getenv("DUCKGRES_K8S_ICEBERG_TABLE_BUCKET_ARN"), - "DUCKGRES_K8S_ICEBERG_REGION": os.Getenv("DUCKGRES_K8S_ICEBERG_REGION"), - "DUCKGRES_K8S_ICEBERG_DATA_BUCKET": os.Getenv("DUCKGRES_K8S_ICEBERG_DATA_BUCKET"), - "AWS_ACCESS_KEY_ID": os.Getenv("AWS_ACCESS_KEY_ID"), - "AWS_SECRET_ACCESS_KEY": os.Getenv("AWS_SECRET_ACCESS_KEY"), - } - var missing []string - for k, v := range required { - if strings.TrimSpace(v) == "" { - missing = append(missing, k) - } - } - if len(missing) > 0 { - t.Fatalf(`iceberg integration test cannot run — required env vars are unset or empty: %s. - -This test is intentionally NOT skippable: a silent skip would hide CI -misconfiguration (rotated secret, renamed bucket, dropped env var) and, -worse, would mask any real iceberg regression that happened to land at -the same time as the env-var gap. - -To wire the iceberg CI lane: - - provision a persistent sandbox S3 Tables bucket + companion data bucket - in your sandbox AWS account - - grant the CI IAM principal s3tables:* on the table bucket and - s3:GetObject/PutObject on the data bucket - - set all of the env vars above as CI secrets - -See TestK8sIcebergRoundTrip godoc for the full setup notes. Until the -iceberg lane is wired, this failure is the correct signal that work -remains.`, strings.Join(missing, ", ")) - } - ns := os.Getenv("DUCKGRES_K8S_ICEBERG_NAMESPACE") - if ns == "" { - ns = "main" - } - return icebergTestConfig{ - tableBucketARN: required["DUCKGRES_K8S_ICEBERG_TABLE_BUCKET_ARN"], - region: required["DUCKGRES_K8S_ICEBERG_REGION"], - namespace: ns, - dataBucket: required["DUCKGRES_K8S_ICEBERG_DATA_BUCKET"], - accessKeyID: required["AWS_ACCESS_KEY_ID"], - secretKey: required["AWS_SECRET_ACCESS_KEY"], - sessionToken: os.Getenv("AWS_SESSION_TOKEN"), - } -} - -// seedIcebergTenantFixture installs everything the iceberg tenant needs: -// - k8s secrets in the duckgres namespace (warehouse DB DSN, ducklake -// metadata DSN, S3 creds payload, runtime config). The S3 creds carry -// the REAL AWS keys — they're consumed both by DuckLake (against the -// real data bucket) and by the iceberg extension (against S3 Tables) -// because AttachIcebergCatalog reuses DuckLake.S3* per the comment in -// server/iceberg/migration.go. -// - A dedicated DuckLake metadata DB on the local Postgres so this -// tenant doesn't share metadata with the default 'local' fixture. -// - A row in duckgres_managed_warehouses with iceberg_* fields populated -// and state='ready', so the activator picks up the config without -// waiting on the provisioner controller (kind has no Duckling CR). -// - An org + an org-user. -func seedIcebergTenantFixture(cfg icebergTestConfig) error { - if err := ensurePostgresDatabase(duckLakeMetadataContainer, "ducklake", "ducklake_metadata_iceberg"); err != nil { - return fmt.Errorf("create iceberg ducklake metadata DB: %w", err) - } - - s3CredsPayload := map[string]string{ - "access_key_id": cfg.accessKeyID, - "secret_access_key": cfg.secretKey, - } - if cfg.sessionToken != "" { - s3CredsPayload["session_token"] = cfg.sessionToken - } - s3CredsJSON, err := json.Marshal(s3CredsPayload) - if err != nil { - return fmt.Errorf("marshal s3 creds payload: %w", err) - } - - secrets := map[string]map[string]string{ - "iceberg-test-warehouse-db": {"dsn": "duckgres"}, - "iceberg-test-metadata": {"dsn": "ducklake"}, - "iceberg-test-s3": {"credentials": string(s3CredsJSON)}, - "iceberg-test-runtime": {"duckgres.yaml": baseTenantRuntimeConfig()}, - } - for name, data := range secrets { - if err := upsertTenantIsolationSecret(name, data); err != nil { - return fmt.Errorf("upsert secret %s: %w", name, err) - } - } - - seed := buildIcebergConfigStoreSeed(cfg) - if err := applyConfigStoreSeedInline(seed); err != nil { - return fmt.Errorf("apply iceberg seed: %w", err) - } - return nil -} - -// buildIcebergConfigStoreSeed constructs the SQL that registers the -// iceberg-test org + warehouse + user. Mirrors the column set used by -// k8s/kind/config-store.seed.sql and tenant-isolation.seed.sql; diverges -// only where iceberg matters (iceberg_enabled, table bucket ARN/region, -// state='ready' so the activator doesn't wait on the provisioner), in -// the S3 endpoint (real AWS regional endpoint instead of MinIO), and in -// having s3_delta_catalog_enabled = false. -// -// Why disable Delta: ManagedWarehouseS3.DeltaCatalogEnabled defaults to -// true (GORM `default:true` on the column), and during activation the -// worker runs a `_delta_log/_last_checkpoint` probe via the DuckDB -// delta extension to discover whether the catalog exists. On this -// tenant the probe issues a GET that resolves to a URL whose bucket -// segment isn't the tenant's data bucket — first observed as a -// 403 against `https://s3.us-east-1.amazonaws.com/orgs/delta/_delta_log/_last_checkpoint` -// despite the IAM role granting s3:GetObject on the data bucket. The -// iceberg integration test doesn't need Delta (it tests -// iceberg-on-S3-Tables + DuckLake only), so we turn the probe off -// rather than chase the URL-construction bug here. Re-enable + chase -// the underlying delta path bug when this test grows a delta scenario. -func buildIcebergConfigStoreSeed(cfg icebergTestConfig) string { - return fmt.Sprintf(` -INSERT INTO duckgres_orgs (name, database_name, max_workers, memory_budget, idle_timeout_s, created_at, updated_at) -VALUES ('%s', '%s', 0, '', 0, NOW(), NOW()) -ON CONFLICT (name) DO UPDATE SET updated_at = NOW(); - -INSERT INTO duckgres_managed_warehouses ( - org_id, image, aurora_min_acu, aurora_max_acu, - warehouse_database_region, warehouse_database_endpoint, warehouse_database_port, - warehouse_database_database_name, warehouse_database_username, - metadata_store_kind, metadata_store_engine, metadata_store_region, - metadata_store_endpoint, metadata_store_port, metadata_store_database_name, metadata_store_username, - s3_provider, s3_region, s3_bucket, s3_path_prefix, s3_endpoint, s3_use_ssl, s3_url_style, - s3_delta_catalog_enabled, - iceberg_enabled, iceberg_backend, iceberg_table_bucket_arn, iceberg_region, iceberg_namespace, - worker_identity_namespace, worker_identity_service_account_name, worker_identity_iam_role_arn, - warehouse_database_credentials_namespace, warehouse_database_credentials_name, warehouse_database_credentials_key, - metadata_store_credentials_namespace, metadata_store_credentials_name, metadata_store_credentials_key, - s3_credentials_namespace, s3_credentials_name, s3_credentials_key, - runtime_config_namespace, runtime_config_name, runtime_config_key, - state, status_message, - warehouse_database_state, warehouse_database_status_message, - metadata_store_state, metadata_store_status_message, - s3_state, s3_status_message, - iceberg_state, iceberg_status_message, - identity_state, identity_status_message, - secrets_state, secrets_status_message, - ready_at, failed_at, created_at, updated_at -) VALUES ( - '%s', '', 0, 0, - '%s', 'local-warehouse-db', 5432, 'duckgres_local', 'duckgres', - 'dedicated_rds', 'postgres', '%s', - 'duckgres-local-ducklake-metadata', 5432, 'ducklake_metadata_iceberg', 'ducklake', - 'aws', '%s', '%s', 'orgs/iceberg-test/', - 's3.%s.amazonaws.com', true, 'vhost', - false, - true, 's3_tables', '%s', '%s', '%s', - 'duckgres', 'duckgres-local-worker', 'arn:aws:iam::000000000000:role/duckgres-iceberg-test', - 'duckgres', 'iceberg-test-warehouse-db', 'dsn', - 'duckgres', 'iceberg-test-metadata', 'dsn', - 'duckgres', 'iceberg-test-s3', 'credentials', - 'duckgres', 'iceberg-test-runtime', 'duckgres.yaml', - 'ready', 'iceberg integration test', - 'ready', '', 'ready', '', 'ready', '', - 'ready', 'iceberg bucket sandbox', - 'ready', '', 'ready', '', - NOW(), NULL, NOW(), NOW() -) ON CONFLICT (org_id) DO UPDATE SET - s3_region = EXCLUDED.s3_region, - s3_bucket = EXCLUDED.s3_bucket, - s3_endpoint = EXCLUDED.s3_endpoint, - s3_use_ssl = EXCLUDED.s3_use_ssl, - s3_url_style = EXCLUDED.s3_url_style, - s3_delta_catalog_enabled = EXCLUDED.s3_delta_catalog_enabled, - iceberg_enabled = EXCLUDED.iceberg_enabled, - iceberg_backend = EXCLUDED.iceberg_backend, - iceberg_table_bucket_arn = EXCLUDED.iceberg_table_bucket_arn, - iceberg_region = EXCLUDED.iceberg_region, - iceberg_namespace = EXCLUDED.iceberg_namespace, - iceberg_state = EXCLUDED.iceberg_state, - state = EXCLUDED.state, - updated_at = NOW(); - -INSERT INTO duckgres_org_users (username, password, org_id, created_at, updated_at) -VALUES ('%s', '%s', '%s', NOW(), NOW()) -ON CONFLICT (org_id, username) DO UPDATE SET password = EXCLUDED.password, updated_at = NOW(); -`, - icebergTenantName, icebergTenantName, - icebergTenantName, - cfg.region, cfg.region, - cfg.region, cfg.dataBucket, cfg.region, - cfg.tableBucketARN, cfg.region, cfg.namespace, - icebergTenantName, icebergTenantPasswordHash, icebergTenantName, - ) -} - -// applyConfigStoreSeedInline pipes a SQL string into the config-store -// container — mirrors applyConfigStoreSeedFixture but takes a string -// instead of a file, since the seed is parameterized by env-var values. -func applyConfigStoreSeedInline(sql string) error { - cmd := exec.Command( - "docker", "exec", "-i", configStoreContainer, - "psql", "-v", "ON_ERROR_STOP=1", "-U", "duckgres", "-d", "duckgres_config", - ) - cmd.Stdin = strings.NewReader(sql) - if out, err := cmd.CombinedOutput(); err != nil { - return fmt.Errorf("psql exec: %w: %s", err, strings.TrimSpace(string(out))) - } - return nil -} -