Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions cmd/duckgres-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ func main() {
duckLakeDeltaCatalogEnabled := flag.Bool("ducklake-delta-catalog-enabled", true, "Attach a Delta Lake catalog during DuckLake worker boot (default true; use --ducklake-delta-catalog-enabled=false to disable; env: DUCKGRES_DUCKLAKE_DELTA_CATALOG_ENABLED)")
duckLakeDeltaCatalogPath := flag.String("ducklake-delta-catalog-path", "", "Delta Lake catalog/table path to attach (env: DUCKGRES_DUCKLAKE_DELTA_CATALOG_PATH)")
duckLakeDefaultSpecVersion := flag.String("ducklake-default-spec-version", "", "Default DuckLake spec version for migration checks (env: DUCKGRES_DUCKLAKE_DEFAULT_SPEC_VERSION)")
icebergEnabled := flag.Bool("iceberg-enabled", false, "Attach a per-tenant Iceberg catalog (AWS S3 Tables) at session init (env: DUCKGRES_ICEBERG_ENABLED)")
icebergTableBucket := flag.String("iceberg-table-bucket", "", "Iceberg S3 Tables bucket ARN (env: DUCKGRES_ICEBERG_TABLE_BUCKET)")
icebergRegion := flag.String("iceberg-region", "", "AWS region for the Iceberg table bucket (env: DUCKGRES_ICEBERG_REGION)")
icebergEnabled := flag.Bool("iceberg-enabled", false, "Attach a per-tenant Iceberg catalog (Lakekeeper REST) at session init (env: DUCKGRES_ICEBERG_ENABLED)")
icebergRegion := flag.String("iceberg-region", "", "AWS region for S3 object access by Iceberg (env: DUCKGRES_ICEBERG_REGION)")
icebergNamespace := flag.String("iceberg-namespace", "", "Default Iceberg namespace (env: DUCKGRES_ICEBERG_NAMESPACE)")

// Query log
Expand Down Expand Up @@ -208,7 +207,6 @@ func main() {
DuckLakeDeltaCatalogPath: *duckLakeDeltaCatalogPath,
DuckLakeDefaultSpecVersion: *duckLakeDefaultSpecVersion,
IcebergEnabled: *icebergEnabled,
IcebergTableBucket: *icebergTableBucket,
IcebergRegion: *icebergRegion,
IcebergNamespace: *icebergNamespace,
QueryLog: *queryLog,
Expand Down
7 changes: 3 additions & 4 deletions configloader/file_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,9 @@ type RateLimitFileConfig struct {
// instance into Iceberg catalog attachment. In multi-tenant mode the
// equivalent values come from the per-warehouse configstore row, not YAML.
type IcebergFileConfig struct {
Enabled *bool `yaml:"enabled"`
TableBucket string `yaml:"table_bucket"`
Region string `yaml:"region"`
Namespace string `yaml:"namespace"`
Enabled *bool `yaml:"enabled"`
Region string `yaml:"region"`
Namespace string `yaml:"namespace"`
}

type DuckLakeFileConfig struct {
Expand Down
6 changes: 2 additions & 4 deletions configresolve/cliflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ func RegisterCLIInputsFlags(fs *flag.FlagSet) func() CLIInputs {
duckLakeDeltaCatalogEnabled := fs.Bool("ducklake-delta-catalog-enabled", true, "Attach a Delta Lake catalog during DuckLake worker boot (default true; use --ducklake-delta-catalog-enabled=false to disable; env: DUCKGRES_DUCKLAKE_DELTA_CATALOG_ENABLED)")
duckLakeDeltaCatalogPath := fs.String("ducklake-delta-catalog-path", "", "Delta Lake catalog/table path to attach, defaults to sibling delta/ prefix at DuckLake object-store root (env: DUCKGRES_DUCKLAKE_DELTA_CATALOG_PATH)")
duckLakeDefaultSpecVersion := fs.String("ducklake-default-spec-version", "", "Default DuckLake spec version for migration checks (env: DUCKGRES_DUCKLAKE_DEFAULT_SPEC_VERSION)")
icebergEnabled := fs.Bool("iceberg-enabled", false, "Attach a per-tenant Iceberg catalog (AWS S3 Tables) at session init (env: DUCKGRES_ICEBERG_ENABLED)")
icebergTableBucket := fs.String("iceberg-table-bucket", "", "Iceberg S3 Tables bucket ARN, e.g. arn:aws:s3tables:us-east-1:<acct>:bucket/<name> (env: DUCKGRES_ICEBERG_TABLE_BUCKET)")
icebergRegion := fs.String("iceberg-region", "", "AWS region for the Iceberg table bucket (default: us-east-1) (env: DUCKGRES_ICEBERG_REGION)")
icebergEnabled := fs.Bool("iceberg-enabled", false, "Attach a per-tenant Iceberg catalog (Lakekeeper REST) at session init (env: DUCKGRES_ICEBERG_ENABLED)")
icebergRegion := fs.String("iceberg-region", "", "AWS region for S3 object access by Iceberg (default: us-east-1) (env: DUCKGRES_ICEBERG_REGION)")
icebergNamespace := fs.String("iceberg-namespace", "", "Default Iceberg namespace (informational; default: main) (env: DUCKGRES_ICEBERG_NAMESPACE)")
processMinWorkers := fs.Int("process-min-workers", 0, "Pre-warm worker count at startup for process workers (control-plane mode) (env: DUCKGRES_PROCESS_MIN_WORKERS)")
processMaxWorkers := fs.Int("process-max-workers", 0, "Max process workers, 0=auto-derived (control-plane mode) (env: DUCKGRES_PROCESS_MAX_WORKERS)")
Expand Down Expand Up @@ -111,7 +110,6 @@ func RegisterCLIInputsFlags(fs *flag.FlagSet) func() CLIInputs {
cli.DuckLakeDeltaCatalogPath = *duckLakeDeltaCatalogPath
cli.DuckLakeDefaultSpecVersion = *duckLakeDefaultSpecVersion
cli.IcebergEnabled = *icebergEnabled
cli.IcebergTableBucket = *icebergTableBucket
cli.IcebergRegion = *icebergRegion
cli.IcebergNamespace = *icebergNamespace
cli.ProcessMinWorkers = *processMinWorkers
Expand Down
10 changes: 0 additions & 10 deletions configresolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ type CLIInputs struct {
DuckLakeDeltaCatalogPath string
DuckLakeDefaultSpecVersion string
IcebergEnabled bool
IcebergTableBucket string
IcebergRegion string
IcebergNamespace string
ProcessMinWorkers int
Expand Down Expand Up @@ -313,9 +312,6 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu
if fileCfg.Iceberg.Enabled != nil {
cfg.Iceberg.Enabled = *fileCfg.Iceberg.Enabled
}
if fileCfg.Iceberg.TableBucket != "" {
cfg.Iceberg.TableBucket = fileCfg.Iceberg.TableBucket
}
if fileCfg.Iceberg.Region != "" {
cfg.Iceberg.Region = fileCfg.Iceberg.Region
}
Expand Down Expand Up @@ -610,9 +606,6 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu
warn("Invalid DUCKGRES_ICEBERG_ENABLED: " + err.Error())
}
}
if v := getenv("DUCKGRES_ICEBERG_TABLE_BUCKET"); v != "" {
cfg.Iceberg.TableBucket = v
}
if v := getenv("DUCKGRES_ICEBERG_REGION"); v != "" {
cfg.Iceberg.Region = v
}
Expand Down Expand Up @@ -1049,9 +1042,6 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu
if cli.Set["iceberg-enabled"] {
cfg.Iceberg.Enabled = cli.IcebergEnabled
}
if cli.Set["iceberg-table-bucket"] {
cfg.Iceberg.TableBucket = cli.IcebergTableBucket
}
if cli.Set["iceberg-region"] {
cfg.Iceberg.Region = cli.IcebergRegion
}
Expand Down
1 change: 0 additions & 1 deletion controlplane/admin/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,6 @@ func managedWarehouseUpsertColumns() []string {
"s3_delta_catalog_enabled",
"s3_delta_catalog_path",
"iceberg_enabled",
"iceberg_table_bucket_arn",
"iceberg_region",
"iceberg_namespace",
"worker_identity_namespace",
Expand Down
7 changes: 3 additions & 4 deletions controlplane/configstore/iceberg_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ import "testing"

func TestManagedWarehouseIceberg_ResolvedBackend(t *testing.T) {
cases := []struct {
name string
in ManagedWarehouseIceberg
want string
name string
in ManagedWarehouseIceberg
want string
}{
{"empty defaults to lakekeeper", ManagedWarehouseIceberg{}, IcebergBackendLakekeeper},
{"explicit lakekeeper", ManagedWarehouseIceberg{Backend: IcebergBackendLakekeeper}, IcebergBackendLakekeeper},
{"explicit s3_tables", ManagedWarehouseIceberg{Backend: IcebergBackendS3Tables}, IcebergBackendS3Tables},
{"unknown passthrough", ManagedWarehouseIceberg{Backend: "future"}, "future"},
}
for _, c := range cases {
Expand Down
43 changes: 17 additions & 26 deletions controlplane/configstore/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,41 +175,33 @@ type ManagedWarehouseDuckLake struct {
Enabled bool `json:"enabled"`
}

// ManagedWarehouseIceberg captures per-org Iceberg catalog config. Two
// backends are supported, selected by Backend:
// ManagedWarehouseIceberg captures per-org Iceberg catalog config. The
// only supported backend is Lakekeeper: a per-org Lakekeeper instance
// vends the Iceberg REST catalog. The provisioner creates the Lakekeeper
// CR + a warehouse pointing at the org's existing S3 bucket (path
// <s3.path-prefix>/lakekeeper/<orgid>/) and persists the endpoint +
// OAuth2 client credentials back here. The worker activator reads these
// and emits a (TYPE ICEBERG, CLIENT_ID/CLIENT_SECRET/OAUTH2_SERVER_URI)
// DuckDB SECRET + ATTACH at session init.
//
// - "lakekeeper" (default): a per-org Lakekeeper instance vends the
// Iceberg REST catalog. The provisioner creates the Lakekeeper CR + a
// warehouse pointing at the org's existing S3 bucket (path
// <s3.path-prefix>/lakekeeper/<orgid>/) and persists the endpoint +
// OAuth2 client credentials back here. The worker activator reads
// these and emits a (TYPE ICEBERG, CLIENT_ID/CLIENT_SECRET/
// OAUTH2_SERVER_URI) DuckDB SECRET + ATTACH at session init.
//
// - "s3_tables" (legacy): provisioner controller sets spec.iceberg.enabled
// on the Duckling CR; the composition provisions a fresh S3 Tables
// bucket and writes TableBucketArn back here. Kept as a safety net /
// escape hatch — new orgs default to Lakekeeper.
// The Backend column is retained for forward-compat / observability; the
// legacy "s3_tables" value is no longer honored anywhere in the code path.
type ManagedWarehouseIceberg struct {
Enabled bool `json:"enabled"`

// Backend selects the catalog backend. Empty/unset is treated as
// "lakekeeper" by callers.
// Backend is retained for schema compat. Empty/unset is treated as
// "lakekeeper" by callers; any other value is also treated as
// "lakekeeper" since no other backend is implemented.
Backend string `gorm:"size:32;default:'lakekeeper'" json:"backend"`

// Namespace is the default Iceberg namespace inside the catalog. Used
// for both backends.
// Namespace is the default Iceberg namespace inside the catalog.
Namespace string `gorm:"size:255" json:"namespace"`

// Region applies to both backends (S3 region for S3 Tables; AWS region
// for the Lakekeeper warehouse storage profile).
// Region is the AWS region for the Lakekeeper warehouse storage profile.
Region string `gorm:"size:64" json:"region"`

// S3 Tables fields (Backend == "s3_tables"). Empty otherwise.
TableBucketArn string `gorm:"size:512" json:"table_bucket_arn,omitempty"`

// Lakekeeper fields (Backend == "lakekeeper"). Populated by the
// provisioner after the per-org Lakekeeper is ready.
// Lakekeeper fields. Populated by the provisioner after the per-org
// Lakekeeper is ready.
LakekeeperEndpoint string `gorm:"size:512" json:"lakekeeper_endpoint,omitempty"`

// LakekeeperWarehouse is the warehouse NAME (e.g. "org-acme"), not the
Expand All @@ -235,7 +227,6 @@ type ManagedWarehouseIceberg struct {
// IcebergBackend constants — string-typed to keep the GORM tag happy.
const (
IcebergBackendLakekeeper = "lakekeeper"
IcebergBackendS3Tables = "s3_tables"
)

// ManagedWarehouseTrino captures per-org opt-in for the customer-facing Trino
Expand Down
50 changes: 0 additions & 50 deletions controlplane/configstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,6 @@ func NewConfigStore(connStr string, pollInterval time.Duration) (*ConfigStore, e
if err := migrateDeltaCatalogDefaultEnabled(db); err != nil {
return nil, fmt.Errorf("migrate delta catalog default: %w", err)
}
if err := migrateIcebergBackendBackfill(db); err != nil {
return nil, fmt.Errorf("migrate iceberg backend backfill: %w", err)
}

runtimeSchema, err := resolveRuntimeSchema(db)
if err != nil {
return nil, fmt.Errorf("resolve runtime schema: %w", err)
Expand Down Expand Up @@ -870,52 +866,6 @@ func migrateDeltaCatalogDefaultEnabled(db *gorm.DB) error {
})
}

// migrateIcebergBackendBackfill stamps iceberg_backend = 's3_tables' on any
// existing row whose iceberg_table_bucket_arn is populated but whose
// iceberg_backend is empty. Without this, ResolvedBackend() on those rows
// would return "lakekeeper" (the empty-default semantics) and the worker
// activator + controller reconcile loop would treat them as Lakekeeper
// orgs — silently breaking S3 Tables ATTACH AND firing redundant
// Lakekeeper provisioning attempts.
//
// New rows inserted after the GORM `default:'lakekeeper'` migration get
// the default at insert time, so this migration only matters for rows
// that predate the column. One-shot, tracked in duckgres_schema_migrations.
const icebergBackendBackfillMigrationName = "2026_05_iceberg_backend_backfill"

func migrateIcebergBackendBackfill(db *gorm.DB) error {
var existing SchemaMigration
err := db.Where("name = ?", icebergBackendBackfillMigrationName).First(&existing).Error
if err == nil {
return nil
}
if !errors.Is(err, gorm.ErrRecordNotFound) {
return fmt.Errorf("check schema migration: %w", err)
}
return db.Transaction(func(tx *gorm.DB) error {
// Backfill rows that have a populated S3 Tables ARN but no
// explicit backend — these are pre-feature rows whose semantics
// would change without this stamp.
res := tx.Exec(
`UPDATE duckgres_managed_warehouses
SET iceberg_backend = 's3_tables'
WHERE COALESCE(iceberg_table_bucket_arn, '') <> ''
AND COALESCE(iceberg_backend, '') = ''`,
)
if res.Error != nil {
return fmt.Errorf("backfill iceberg_backend: %w", res.Error)
}
if err := tx.Create(&SchemaMigration{
Name: icebergBackendBackfillMigrationName,
AppliedAt: time.Now().UTC(),
}).Error; err != nil {
return fmt.Errorf("record schema migration: %w", err)
}
slog.Info("Backfilled iceberg_backend=s3_tables on pre-Lakekeeper rows.",
"rows", res.RowsAffected)
return nil
})
}

func migrateOrgUserPK(db *gorm.DB) error {
// Check if the PK already has 2 columns (idempotent)
Expand Down
4 changes: 0 additions & 4 deletions controlplane/org_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/posthog/duckgres/controlplane/configstore"
"github.com/posthog/duckgres/controlplane/provisioner"
"github.com/posthog/duckgres/server"
"github.com/posthog/duckgres/server/iceberg"
)

// OrgStack holds the isolated worker pool and session manager for an org.
Expand Down Expand Up @@ -225,9 +224,6 @@ func (tr *OrgRouter) IcebergConfigForOrg(orgID string) (server.IcebergConfig, bo
LakekeeperClientID: src.LakekeeperClientID,
LakekeeperOAuth2ServerURI: src.LakekeeperOAuth2ServerURI,
}
if cfg.ResolvedBackend() == iceberg.BackendS3Tables {
cfg.TableBucket = src.TableBucketArn
}
return cfg, true
}

Expand Down
94 changes: 5 additions & 89 deletions controlplane/provisioner/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,11 @@ func (c *Controller) reconcileProvisioning(ctx context.Context, w *configstore.M
updates["identity_state"] = configstore.ManagedWarehouseStateReady
}

// Iceberg is opt-in per warehouse — only track its state when enabled.
// When the composition writes status.iceberg.tableBucketArn, the bucket
// (Workspace) is reconciled and we persist the ARN + region back to the
// configstore so the worker activator can feed it into IcebergConfig.
addIcebergStatusUpdates(updates, w, status)
// Iceberg readiness for the Lakekeeper backend is owned by
// reconcileLakekeeper, which writes iceberg_state=Ready directly once
// the per-org Lakekeeper warehouse is provisioned. Nothing here needs
// to propagate from the Crossplane Duckling status — the Lakekeeper
// provisioner is the source of truth.

// Infrastructure is ready when all components are provisioned AND the
// Crossplane Ready condition is True. The Ready condition ensures all
Expand Down Expand Up @@ -440,70 +440,12 @@ func (c *Controller) reconcileReady(ctx context.Context, w *configstore.ManagedW
}
}

// Propagate the Duckling's status.iceberg.tableBucketArn back to the
// configstore even after the warehouse has transitioned to Ready. This
// covers the late-enable case: the iceberg block can be flipped on via
// the admin API long after the warehouse first became Ready, and the
// Crossplane composition will only emit the bucket Workspace + populate
// status.iceberg.tableBucketArn after that flip. Without this, the ARN
// would only land in the configstore if the warehouse was still in
// Provisioning when the composition completed — never on a re-enable.
if w.Iceberg.Enabled {
// TODO: this is the third c.duckling.Get-equivalent call in this
// function (after GetPgBouncerEnabled + GetIcebergEnabled). All
// three parse the same CR. Worth consolidating to a single fetch
// once another caller adds a fourth — for now the extra
// round-trips only hit warehouses where iceberg is enabled.
status, err := c.duckling.Get(ctx, w.OrgID)
if err != nil {
log.Warn("Failed to read Duckling status for iceberg propagation.", "error", err)
return
}
updates := map[string]interface{}{}
addIcebergStatusUpdates(updates, w, status)
if len(updates) > 0 {
if err := c.store.UpdateWarehouseState(w.OrgID, configstore.ManagedWarehouseStateReady, updates); err != nil {
log.Warn("Failed to persist iceberg status to configstore.", "error", err)
} else {
log.Info("Iceberg status persisted to configstore.", "updates", updates)
// Mirror the persisted updates onto the in-memory w so
// downstream reconcile steps (notably reconcileLakekeeper)
// see the fresh values without a re-read round-trip.
applyIcebergUpdatesToWarehouse(w, updates)
}
}
}

// Lakekeeper reconcile is independent of the Duckling state machine —
// provisioning a Lakekeeper for an org doesn't depend on, and doesn't
// block, the warehouse top-level state.
c.reconcileLakekeeper(ctx, w)
}

// applyIcebergUpdatesToWarehouse mirrors the column-update map onto the
// in-memory warehouse struct so subsequent reconcile steps in the same
// tick see the values that were just persisted, without a second store
// read. Mirrors the column → field mapping used by the configstore's
// UpdateWarehouseState GORM call (which writes by column name).
//
// Only the iceberg_* columns reconcileReady actually writes are handled
// here; if a future caller passes other column names through this
// function, extend the switch alongside.
func applyIcebergUpdatesToWarehouse(w *configstore.ManagedWarehouse, updates map[string]interface{}) {
for k, v := range updates {
switch k {
case "iceberg_table_bucket_arn":
w.Iceberg.TableBucketArn = v.(string)
case "iceberg_region":
w.Iceberg.Region = v.(string)
case "iceberg_namespace":
w.Iceberg.Namespace = v.(string)
case "iceberg_state":
w.IcebergState = v.(configstore.ManagedWarehouseProvisioningState)
}
}
}

// reconcileLakekeeper provisions a per-org Lakekeeper instance when the
// warehouse selects the lakekeeper backend and isn't yet provisioned. Called
// from both reconcileProvisioning (initial turn-up — required for cnpg-shard,
Expand Down Expand Up @@ -552,32 +494,6 @@ func (c *Controller) reconcileLakekeeper(ctx context.Context, w *configstore.Man
log.Info("Lakekeeper provisioning completed.")
}

// addIcebergStatusUpdates copies the Duckling's reported iceberg status
// (ARN, region, namespace) into the configstore update map when fields
// differ. Idempotent. Called from both reconcileProvisioning (initial
// turn-up) and reconcileReady (late iceberg enable on an existing
// warehouse). Without the reconcileReady call site, a warehouse that
// became Ready before iceberg was opted in would never get its ARN
// propagated to the configstore — the worker activator would then run
// AttachIcebergCatalog with an empty TableBucket and skip the attach.
func addIcebergStatusUpdates(updates map[string]interface{}, w *configstore.ManagedWarehouse, status *DucklingStatus) {
if !w.Iceberg.Enabled || status.Iceberg.TableBucketArn == "" {
return
}
if w.Iceberg.TableBucketArn != status.Iceberg.TableBucketArn {
updates["iceberg_table_bucket_arn"] = status.Iceberg.TableBucketArn
}
if w.Iceberg.Region != status.Iceberg.Region && status.Iceberg.Region != "" {
updates["iceberg_region"] = status.Iceberg.Region
}
if w.Iceberg.Namespace != status.Iceberg.NamespaceName && status.Iceberg.NamespaceName != "" {
updates["iceberg_namespace"] = status.Iceberg.NamespaceName
}
if w.IcebergState != configstore.ManagedWarehouseStateReady {
updates["iceberg_state"] = configstore.ManagedWarehouseStateReady
}
}

func (c *Controller) reconcileDeleting(ctx context.Context, w *configstore.ManagedWarehouse) {
log := slog.With("org", w.OrgID, "phase", "deleting")

Expand Down
Loading
Loading