diff --git a/configresolve/cliflags.go b/configresolve/cliflags.go index 6fe2e5c9..aecf58fa 100644 --- a/configresolve/cliflags.go +++ b/configresolve/cliflags.go @@ -62,7 +62,7 @@ func RegisterCLIInputsFlags(fs *flag.FlagSet) func() CLIInputs { configStoreConn := fs.String("config-store", "", "PostgreSQL connection string for config store (env: DUCKGRES_CONFIG_STORE)") configPollInterval := fs.String("config-poll-interval", "", "How often to poll config store for changes (default: 30s) (env: DUCKGRES_CONFIG_POLL_INTERVAL)") internalSecret := fs.String("internal-secret", "", "Shared secret for API authentication (env: DUCKGRES_INTERNAL_SECRET)") - sniRoutingMode := fs.String("sni-routing-mode", "", "Hostname-based org routing: 'off' (default), 'passthrough' (validate managed SNI against requested db; use SNI only when db is empty; log legacy), 'enforce' (require managed SNI and same-org db). Multi-tenant only. (env: DUCKGRES_SNI_ROUTING_MODE)") + sniRoutingMode := fs.String("sni-routing-mode", "", "Hostname-based org routing: 'enforce' (default; require a managed SNI hostname that resolves to an org — the database name selects the catalog, not the org), 'passthrough' (warn on legacy hostnames), 'off' (no SNI handling; identity can no longer be resolved). Multi-tenant only. (env: DUCKGRES_SNI_ROUTING_MODE)") managedHostnameSuffixes := fs.String("managed-hostname-suffixes", "", "Comma-separated DNS suffixes (each starting with '.') for managed tenant hostnames, e.g. '.dw.us.postwh.com'. 
(env: DUCKGRES_MANAGED_HOSTNAME_SUFFIXES)") workerBackend := fs.String("worker-backend", "", "Worker backend: process (default) or remote for config-store-backed K8s multitenant mode (env: DUCKGRES_WORKER_BACKEND)") k8sWorkerImage := fs.String("k8s-worker-image", "", "Container image for K8s worker pods (env: DUCKGRES_K8S_WORKER_IMAGE)") diff --git a/configresolve/resolve.go b/configresolve/resolve.go index a37a76d0..d960a6f0 100644 --- a/configresolve/resolve.go +++ b/configresolve/resolve.go @@ -1125,6 +1125,13 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu if cli.Set["sni-routing-mode"] { sniRoutingMode = cli.SNIRoutingMode } + // Identity in multi-tenant mode is derived solely from the managed hostname + // (SNI) + username; the database name no longer routes. Default to enforcing + // managed SNI so unresolvable hostnames are rejected. (Consulted only on the + // configStore-backed multi-tenant path; standalone/process backends ignore it.) + if sniRoutingMode == "" { + sniRoutingMode = "enforce" + } if cli.Set["managed-hostname-suffixes"] { managedHostnameSuffixes = splitAndTrim(cli.ManagedHostnameSuffixes, ",") } diff --git a/controlplane/configstore/store.go b/controlplane/configstore/store.go index d3b78934..b64e2899 100644 --- a/controlplane/configstore/store.go +++ b/controlplane/configstore/store.go @@ -45,22 +45,42 @@ type Snapshot struct { QueryLog QueryLogConfig } +// Selectable catalog names. The startup `database` param now names the catalog +// a session defaults to rather than identifying the org — these are the only +// non-empty values a client may request. +const ( + catalogDuckLake = "ducklake" + catalogIceberg = "iceberg" +) + // PostgresConnectionResolution is the result of resolving and authenticating a // Postgres startup packet against one immutable config snapshot. 
+// +// Identity (OrgID) comes solely from the managed hostname (SNI) plus the +// username/password; the startup `database` param is treated as catalog +// selection, not identity. type PostgresConnectionResolution struct { - EffectiveDatabase string - OrgID string - SNIOrgID string - SNIDatabase string - SNIAliasUsed bool - UsedSNIDatabase bool - RequiresSNIOrgMatch bool - SNIResolved bool - DatabaseExists bool - HostnameMatches bool - Valid bool - Passthrough bool - DefaultCatalog string + // OrgID is the organization the connection belongs to, resolved from the + // managed hostname (SNI). Empty unless SNIResolved. + OrgID string + // SNIOrgID mirrors OrgID; kept distinct for log/observability parity. + SNIOrgID string + // SNIAliasUsed reports whether the hostname matched via hostname_alias. + SNIAliasUsed bool + // SNIResolved is true when the managed hostname resolved to a known org. + SNIResolved bool + // EffectiveCatalog is the catalog the session should default to, selected by + // the startup `database` param: "" (use the per-user/attached default), + // "ducklake", or "iceberg". + EffectiveCatalog string + // CatalogValid is false when the requested `database` is not a selectable + // catalog name (anything other than "", "ducklake", "iceberg"). + CatalogValid bool + // Valid is true when (OrgID, username, password) authenticated. + Valid bool + // Passthrough / DefaultCatalog are the per-user flags for the resolved user. + Passthrough bool + DefaultCatalog string } // ConfigStore manages configuration stored in a PostgreSQL database. 
@@ -359,9 +379,21 @@ func isDNSLabel(label string) bool { } func (cs *ConfigStore) ResolvePostgresConnection(startupDatabase, sniPrefix string, useManagedSNI bool, username, password string) PostgresConnectionResolution { - result := PostgresConnectionResolution{ - EffectiveDatabase: startupDatabase, - HostnameMatches: true, + result := PostgresConnectionResolution{} + + // The startup `database` param is now pure catalog selection, not identity. + // Valid values: "" (use the per-user/attached default), "ducklake", or + // "iceberg". Anything else fails closed — there is no logical-name masking, + // so an arbitrary name no longer routes anywhere. + switch strings.ToLower(strings.TrimSpace(startupDatabase)) { + case "": + result.CatalogValid = true + case catalogDuckLake: + result.EffectiveCatalog = catalogDuckLake + result.CatalogValid = true + case catalogIceberg: + result.EffectiveCatalog = catalogIceberg + result.CatalogValid = true } cs.mu.RLock() @@ -370,38 +402,26 @@ func (cs *ConfigStore) ResolvePostgresConnection(startupDatabase, sniPrefix stri return result } - if useManagedSNI { - result.RequiresSNIOrgMatch = true - result.SNIOrgID, result.SNIDatabase, result.SNIAliasUsed = resolveSNIPrefixFromSnapshot(cs.snapshot, sniPrefix) - result.SNIResolved = result.SNIOrgID != "" - if result.SNIDatabase == "" { - result.SNIDatabase = sniPrefix - } - if startupDatabase == "" { - result.EffectiveDatabase = result.SNIDatabase - result.UsedSNIDatabase = true - } - } - - if result.EffectiveDatabase == "" { + // Identity comes from the managed hostname (SNI) only. Without a managed, + // resolvable hostname there is no org to authenticate against — the database + // name is no longer consulted for routing. 
+ if !useManagedSNI { return result } - - orgID := cs.snapshot.DatabaseOrg[result.EffectiveDatabase] + orgID, _, aliasUsed := resolveSNIPrefixFromSnapshot(cs.snapshot, sniPrefix) if orgID == "" { return result } - result.DatabaseExists = true + result.SNIResolved = true + result.SNIAliasUsed = aliasUsed + result.SNIOrgID = orgID result.OrgID = orgID - if result.RequiresSNIOrgMatch && result.SNIOrgID != orgID { - result.HostnameMatches = false - return result - } - + // Authenticate the user within the resolved org. key := OrgUserKey{OrgID: orgID, Username: username} storedHash, ok := cs.snapshot.OrgUserPassword[key] if !ok { + // Timing-leak guard: still spend bcrypt time on unknown users. _ = bcrypt.CompareHashAndPassword([]byte("$2a$10$000000000000000000000000000000000000000000000000000000"), []byte(password)) return result } @@ -498,26 +518,6 @@ func (cs *ConfigStore) ValidateOrgUserAndGetPassthrough(orgID, username, passwor return true, cs.snapshot.OrgUserPassthrough[key] } -// FindAndValidateUser scans all orgs to find and authenticate a user by username/password. -// This is used for Flight SQL which doesn't have SNI-based org routing. -func (cs *ConfigStore) FindAndValidateUser(username, password string) (string, bool) { - cs.mu.RLock() - defer cs.mu.RUnlock() - if cs.snapshot == nil { - return "", false - } - for key, storedHash := range cs.snapshot.OrgUserPassword { - if key.Username == username { - if bcrypt.CompareHashAndPassword([]byte(storedHash), []byte(password)) == nil { - return key.OrgID, true - } - return "", false - } - } - _ = bcrypt.CompareHashAndPassword([]byte("$2a$10$000000000000000000000000000000000000000000000000000000"), []byte(password)) - return "", false -} - // HashPassword hashes a plaintext password using bcrypt. 
func HashPassword(password string) (string, error) { hash, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost) diff --git a/controlplane/configstore/store_test.go b/controlplane/configstore/store_test.go index 66e44aa5..5d1e0bf4 100644 --- a/controlplane/configstore/store_test.go +++ b/controlplane/configstore/store_test.go @@ -301,103 +301,90 @@ func TestResolvePostgresConnection(t *testing.T) { }, } - t.Run("explicit database must match managed SNI org", func(t *testing.T) { + t.Run("org resolved from SNI; ducklake catalog selected", func(t *testing.T) { got := cs.ResolvePostgresConnection( - "test_org_smoke_1778167994", + "ducklake", "test-org-smoke-1778167994", true, "root", "secret", ) - if got.EffectiveDatabase != "test_org_smoke_1778167994" || got.OrgID != "test-org-smoke-1778167994" { - t.Fatalf("effective route = (%q, %q), want explicit db/org", got.EffectiveDatabase, got.OrgID) + if !got.SNIResolved || got.OrgID != "test-org-smoke-1778167994" { + t.Fatalf("org = (resolved=%v, %q), want test org from SNI: %+v", got.SNIResolved, got.OrgID, got) } - if got.UsedSNIDatabase { - t.Fatalf("explicit database should take priority over SNI fallback") + if !got.CatalogValid || got.EffectiveCatalog != "ducklake" { + t.Fatalf("catalog = (valid=%v, %q), want ducklake: %+v", got.CatalogValid, got.EffectiveCatalog, got) } - if !got.DatabaseExists || !got.HostnameMatches || !got.Valid || !got.Passthrough { - t.Fatalf("unexpected result: %+v", got) + if !got.Valid || !got.Passthrough { + t.Fatalf("unexpected auth result: %+v", got) } }) - t.Run("empty database falls back to managed SNI", func(t *testing.T) { - got := cs.ResolvePostgresConnection( - "", - "test-org-smoke-1778167994", - true, - "root", - "secret", - ) - if got.EffectiveDatabase != "test_org_smoke_1778167994" || !got.UsedSNIDatabase { - t.Fatalf("SNI fallback result = (%q, used=%v), want test db/used", got.EffectiveDatabase, got.UsedSNIDatabase) + t.Run("iceberg catalog selected", func(t 
*testing.T) { + got := cs.ResolvePostgresConnection("iceberg", "test-org-smoke-1778167994", true, "root", "secret") + if !got.CatalogValid || got.EffectiveCatalog != "iceberg" { + t.Fatalf("catalog = (valid=%v, %q), want iceberg: %+v", got.CatalogValid, got.EffectiveCatalog, got) } - if !got.DatabaseExists || !got.HostnameMatches || !got.Valid { - t.Fatalf("unexpected result: %+v", got) + if !got.Valid { + t.Fatalf("expected valid auth: %+v", got) } }) - t.Run("two existing orgs mismatch is rejected before auth", func(t *testing.T) { - got := cs.ResolvePostgresConnection( - "test_org_smoke_1778167994", - "billing", - true, - "root", - "secret", - ) - if !got.DatabaseExists { - t.Fatalf("expected requested database to exist: %+v", got) + t.Run("empty database means use the default catalog", func(t *testing.T) { + got := cs.ResolvePostgresConnection("", "test-org-smoke-1778167994", true, "root", "secret") + if !got.CatalogValid || got.EffectiveCatalog != "" { + t.Fatalf("catalog = (valid=%v, %q), want empty/use-default: %+v", got.CatalogValid, got.EffectiveCatalog, got) } - if got.HostnameMatches { - t.Fatalf("expected SNI org billing to mismatch requested database org: %+v", got) - } - if got.Valid { - t.Fatalf("mismatched managed hostname must not authenticate: %+v", got) + if !got.Valid { + t.Fatalf("expected valid auth: %+v", got) } }) - t.Run("unknown managed SNI with explicit database is rejected before auth", func(t *testing.T) { - got := cs.ResolvePostgresConnection( - "test_org_smoke_1778167994", - "ghostorg", - true, - "root", - "secret", - ) - if !got.DatabaseExists { - t.Fatalf("expected requested database to exist: %+v", got) + t.Run("legacy database name is no longer a valid catalog", func(t *testing.T) { + // The org's old database_name is not "ducklake"/"iceberg", so it fails the + // catalog check even though SNI+auth would otherwise succeed. 
+ got := cs.ResolvePostgresConnection("test_org_smoke_1778167994", "test-org-smoke-1778167994", true, "root", "secret") + if got.CatalogValid { + t.Fatalf("legacy database name must not be a selectable catalog: %+v", got) } - if got.HostnameMatches { - t.Fatalf("expected unknown managed SNI to mismatch requested database org: %+v", got) + }) + + t.Run("unknown managed SNI does not resolve an org", func(t *testing.T) { + got := cs.ResolvePostgresConnection("ducklake", "ghostorg", true, "root", "secret") + if got.SNIResolved || got.OrgID != "" { + t.Fatalf("unknown SNI must not resolve an org: %+v", got) } if got.Valid { - t.Fatalf("unknown managed SNI must not authenticate: %+v", got) + t.Fatalf("unknown SNI must not authenticate: %+v", got) } }) - t.Run("unknown SNI fallback keeps database-style error target", func(t *testing.T) { - got := cs.ResolvePostgresConnection( - "", - "ghostorg", - true, - "root", - "secret", - ) - if got.EffectiveDatabase != "ghostorg" || got.DatabaseExists { - t.Fatalf("unknown SNI fallback = (%q, exists=%v), want ghostorg missing", got.EffectiveDatabase, got.DatabaseExists) + t.Run("identity requires managed SNI", func(t *testing.T) { + got := cs.ResolvePostgresConnection("ducklake", "test-org-smoke-1778167994", false, "root", "secret") + if got.SNIResolved || got.Valid { + t.Fatalf("without managed SNI there is no identity: %+v", got) + } + // Catalog validation is independent of identity. 
+ if !got.CatalogValid || got.EffectiveCatalog != "ducklake" { + t.Fatalf("catalog should still validate: %+v", got) + } + }) + + t.Run("wrong password fails auth but resolves org", func(t *testing.T) { + got := cs.ResolvePostgresConnection("ducklake", "test-org-smoke-1778167994", true, "root", "wrong") + if !got.SNIResolved || got.Valid { + t.Fatalf("expected resolved org but failed auth: %+v", got) } }) t.Run("valid user includes configured default catalog", func(t *testing.T) { - got := cs.ResolvePostgresConnection( - "billing_db", - "billing-alias", - true, - "root", - "secret", - ) + got := cs.ResolvePostgresConnection("", "billing-alias", true, "root", "secret") if !got.Valid { t.Fatalf("expected valid auth: %+v", got) } + if got.OrgID != "billing" { + t.Fatalf("OrgID = %q, want billing (via hostname alias)", got.OrgID) + } if got.DefaultCatalog != "iceberg" { t.Fatalf("DefaultCatalog = %q, want iceberg", got.DefaultCatalog) } diff --git a/controlplane/control.go b/controlplane/control.go index fb375169..7ea7cd8b 100644 --- a/controlplane/control.go +++ b/controlplane/control.go @@ -180,7 +180,6 @@ type ConfigStoreInterface interface { // window where the snapshot could swap between two separate calls. // passthrough is always false when valid is false. ValidateOrgUserAndGetPassthrough(orgID, username, password string) (valid, passthrough bool) - FindAndValidateUser(username, password string) (orgID string, ok bool) // for Flight SQL (no database param) // OrgWarehouseStatus reports an org's current warehouse provisioning state so // connection-time errors can distinguish "no such org" from "warehouse not // ready yet". Returns (state, orgExists). state is "" when the org has no @@ -703,9 +702,9 @@ func sessionCreationErrorResponse(err error) (code string, message string) { // SNI routing modes (values for ControlPlaneConfig.SNIRoutingMode). 
const ( - SNIRoutingOff = "off" // ignore SNI entirely (default) - SNIRoutingPassthrough = "passthrough" // validate managed SNI, fallback only when database is empty - SNIRoutingEnforce = "enforce" // require managed SNI and same-org database routing + SNIRoutingOff = "off" // ignore SNI entirely; identity can no longer be resolved + SNIRoutingPassthrough = "passthrough" // require managed SNI but warn on legacy hostnames + SNIRoutingEnforce = "enforce" // default: require a managed SNI hostname that resolves to an org ) type postgresSNIResolution struct { @@ -901,77 +900,61 @@ func (cp *ControlPlane) handleConnection(conn net.Conn) { password := string(bytes.TrimRight(body, "\x00")) - // Authenticate - // In multi-tenant mode, the database name maps to an org. - // User uniqueness is scoped to the org. + // Authenticate. + // In multi-tenant mode the org is resolved solely from the managed hostname + // (SNI); the user is authenticated within that org. The startup `database` + // param no longer identifies the org — it selects which attached catalog + // (ducklake/iceberg) the session defaults to. var ( - orgID string - passthroughUser bool - defaultCatalog string + orgID string + passthroughUser bool + defaultCatalog string + requestedCatalog string // "" | "ducklake" | "iceberg" (validated below) ) if cp.configStore != nil { - // Resolve the effective database name based on SNI routing mode. - // "off" or unknown - legacy: use the startup `database` param. - // "passthrough" - use startup `database` first; for managed SNI, - // require the hostname and database to resolve to the - // same org. If the startup database is empty, fall back - // to the SNI-derived database. Non-managed hostnames - // still fall back to legacy routing with a warning. - // "enforce" - require managed SNI and same-org validation, while - // still preferring explicit startup `database`. 
sni := tlsConn.ConnectionState().ServerName sniResolution := cp.resolvePostgresSNI(cp.cfg.SNIRoutingMode, sni) - switch cp.cfg.SNIRoutingMode { - case SNIRoutingEnforce: - if !sniResolution.isManaged { - hint := cp.managedHostnameHint() - slog.Warn("Postgres connection rejected: SNI does not match a managed hostname.", - "sni", sni, "expected", hint, "remote_addr", remoteAddr, "user", username, "application_name", applicationName) - _ = server.WriteErrorResponse(writer, "FATAL", "08006", - fmt.Sprintf("this server requires connecting via %s", hint)) - _ = writer.Flush() - return - } - case SNIRoutingPassthrough: - if !sniResolution.isManaged && sni == "" { - slog.Warn("Postgres client connected without SNI; please migrate to a managed hostname.", - "expected", cp.managedHostnameHint(), "remote_addr", remoteAddr, "database", database, "user", username, "application_name", applicationName) - } else if !sniResolution.isManaged { - slog.Warn("Postgres client using legacy hostname; please migrate to a managed hostname.", - "sni", sni, "expected", cp.managedHostnameHint(), "remote_addr", remoteAddr, "database", database, "user", username, "application_name", applicationName) - } - default: // SNIRoutingOff or unset — legacy behavior, no SNI handling + if cp.cfg.SNIRoutingMode != SNIRoutingEnforce && cp.cfg.SNIRoutingMode != SNIRoutingPassthrough { + // Identity now comes solely from the managed hostname. The legacy + // database→org routing is gone, so an org cannot be resolved without + // SNI routing enabled. Warn loudly — this is a misconfiguration. 
+ slog.Warn("Postgres connection: SNI routing disabled but identity now requires a managed hostname; set sni_routing_mode=enforce.", + "mode", cp.cfg.SNIRoutingMode, "remote_addr", remoteAddr, "user", username, "application_name", applicationName) + } + if !sniResolution.isManaged { + hint := cp.managedHostnameHint() + slog.Warn("Postgres connection rejected: SNI does not match a managed hostname.", + "sni", sni, "expected", hint, "remote_addr", remoteAddr, "user", username, "application_name", applicationName) + _ = server.WriteErrorResponse(writer, "FATAL", "08006", + fmt.Sprintf("this server requires connecting via %s", hint)) + _ = writer.Flush() + return } resolution := cp.configStore.ResolvePostgresConnection(database, sniResolution.sniPrefix, sniResolution.useManagedSNI, username, password) - if sniResolution.useManagedSNI && resolution.SNIResolved { + if resolution.SNIResolved { observeSNIRoutingResolution("postgres", resolution.SNIAliasUsed) } - effectiveDatabase := resolution.EffectiveDatabase - if effectiveDatabase == "" { - slog.Warn("Connection rejected: no database specified.", "remote_addr", remoteAddr) - _ = server.WriteErrorResponse(writer, "FATAL", "28000", "database name is required") + if !resolution.SNIResolved { + slog.Warn("Postgres connection rejected: managed hostname does not resolve to a known organization.", + "sni", sni, "sni_prefix", sniResolution.sniPrefix, "remote_addr", remoteAddr, "user", username, "application_name", applicationName) + _ = server.WriteErrorResponse(writer, "FATAL", "08006", + fmt.Sprintf("this server requires connecting via %s", cp.managedHostnameHint())) _ = writer.Flush() return } - if !resolution.DatabaseExists { - slog.Warn("Unknown database.", "database", effectiveDatabase, "remote_addr", remoteAddr) - _ = server.WriteErrorResponse(writer, "FATAL", "3D000", fmt.Sprintf("database %q does not exist", effectiveDatabase)) - _ = writer.Flush() - return - } - if !resolution.HostnameMatches { - slog.Warn("Postgres 
connection rejected: requested database does not match managed hostname.", - "sni", sni, "sni_prefix", sniResolution.sniPrefix, "sni_org", resolution.SNIOrgID, - "database", effectiveDatabase, "database_org", resolution.OrgID, "remote_addr", remoteAddr, - "user", username, "application_name", applicationName) - _ = server.WriteErrorResponse(writer, "FATAL", "28000", - "requested database does not match managed hostname") + if !resolution.CatalogValid { + // The startup `database` is now a catalog selector; only + // "ducklake"/"iceberg"/empty are valid. No logical-name masking. + slog.Warn("Postgres connection rejected: requested database is not a selectable catalog.", + "database", database, "org", resolution.OrgID, "remote_addr", remoteAddr, "user", username) + _ = server.WriteErrorResponse(writer, "FATAL", "3D000", + fmt.Sprintf("database %q does not exist (connect with \"ducklake\" or \"iceberg\")", database)) _ = writer.Flush() return } if !resolution.Valid { - slog.Warn("Authentication failed.", "user", username, "org", resolution.OrgID, "database", effectiveDatabase, "remote_addr", remoteAddr) + slog.Warn("Authentication failed.", "user", username, "org", resolution.OrgID, "database", database, "remote_addr", remoteAddr) banned := server.RecordFailedAuthAttempt(cp.rateLimiter, remoteAddr) if banned { slog.Warn("IP banned after too many failed auth attempts.", "remote_addr", remoteAddr) @@ -983,11 +966,10 @@ func (cp *ControlPlane) handleConnection(conn net.Conn) { orgID = resolution.OrgID passthroughUser = resolution.Passthrough defaultCatalog = resolution.DefaultCatalog - // From here on, `database` reflects the effective routing database. - // This is what gets passed to the worker as the logical database - // (drives the `current_database()` macro and pg_database view) so - // observability surfaces the actual routing decision. 
- database = effectiveDatabase + requestedCatalog = resolution.EffectiveCatalog + // `database` is finalized post-session to the real catalog the session + // defaults to (once worker attachment is known), so logs and the + // current_database() macro surface the actual catalog. } else { // Single-tenant: static users map if !server.ValidateUserPassword(cp.cfg.Users, username, password) { @@ -1126,47 +1108,87 @@ func (cp *ControlPlane) handleConnection(conn net.Conn) { } }() - // Passthrough users skip pg_catalog initialization and logical-catalog - // mapping — they bypass the PG compatibility layer entirely. They still - // need a default catalog, though: without one the worker session stays in - // DuckDB's empty in-memory catalog, so current_database() reports "memory" - // and unqualified DDL/DML never reaches the warehouse (see the passthrough - // branch below). - var duckLakeAttached bool + // Probe which catalogs the worker actually attached for this session, then + // resolve the real catalog the session defaults to. The startup `database` + // selected "ducklake"/"iceberg"/"" (default); fail closed (3D000) if the + // requested catalog isn't attached. 
+ attachCtx, attachCancel := context.WithTimeout(context.Background(), cp.cfg.SessionInitTimeout) + duckLakeAttached, dlErr := sessionmeta.HasAttachedCatalog(attachCtx, executor, physicalDuckLakeCatalog) + icebergAttached, icErr := sessionmeta.HasAttachedCatalog(attachCtx, executor, physicalIcebergCatalog) + attachCancel() + probeErr := dlErr + if probeErr == nil { + probeErr = icErr + } + if probeErr != nil { + slog.Error("Failed to detect attached catalogs.", "user", username, "org", orgID, "remote_addr", remoteAddr, "error", probeErr, "worker", workerID, "worker_pod", workerPod) + _ = server.WriteErrorResponse(writer, "FATAL", "XX000", "failed to detect attached catalogs") + _ = writer.Flush() + return + } + var effectiveCatalog string + if cp.configStore != nil { + var ok bool + effectiveCatalog, ok = resolveEffectiveCatalog(requestedCatalog, defaultCatalog, duckLakeAttached, icebergAttached) + if !ok { + slog.Warn("Postgres connection rejected: requested catalog is not available for this connection.", + "requested", requestedCatalog, "org", orgID, "ducklake_attached", duckLakeAttached, "iceberg_attached", icebergAttached, "remote_addr", remoteAddr, "user", username) + msg := "no catalog is available for this connection" + if requestedCatalog != "" { + msg = fmt.Sprintf("database %q does not exist", requestedCatalog) + } + _ = server.WriteErrorResponse(writer, "FATAL", "3D000", msg) + _ = writer.Flush() + return + } + } else { + // Single-tenant (process backend / static users): de-mask to the real + // attached catalog when present; otherwise keep the client's database name + // (plain DuckDB, no masking concern). No catalog-selection rejection here. 
+ switch { + case duckLakeAttached: + effectiveCatalog = physicalDuckLakeCatalog + case icebergAttached: + effectiveCatalog = physicalIcebergCatalog + default: + effectiveCatalog = database + } + } + // `database` now reflects the real catalog the session defaults to — this is + // what drives the current_database() macro/pg_database view and what logs and + // observability surface. + database = effectiveCatalog + + // Passthrough users skip pg_catalog initialization and the catalog USE + // rewriting — they bypass the PG compatibility layer entirely. They still + // need their selected catalog as the session default, though: without one the + // worker session stays in DuckDB's empty in-memory catalog (see the + // passthrough branch below). if !passthroughUser { initCtx, initCancel := context.WithTimeout(context.Background(), cp.cfg.SessionInitTimeout) - if err := sessionmeta.InitSessionDatabaseMetadata(initCtx, executor, database); err != nil { + if err := sessionmeta.InitSessionDatabaseMetadata(initCtx, executor, effectiveCatalog); err != nil { initCancel() slog.Error("Failed to initialize session database metadata.", "user", username, "org", orgID, "database", database, "remote_addr", remoteAddr, "error", err, "worker", workerID, "worker_pod", workerPod) _ = server.WriteErrorResponse(writer, "FATAL", "XX000", "failed to initialize session database metadata") _ = writer.Flush() return } - duckLakeAttached, err = sessionmeta.HasAttachedCatalog(initCtx, executor, "ducklake") initCancel() - if err != nil { - slog.Error("Failed to detect ducklake catalog attachment.", "user", username, "org", orgID, "database", database, "remote_addr", remoteAddr, "error", err, "worker", workerID, "worker_pod", workerPod) - _ = server.WriteErrorResponse(writer, "FATAL", "XX000", "failed to detect ducklake catalog attachment") - _ = writer.Flush() - return - } // Apply the effective connect-time session default AFTER metadata init. 
- // It must run here, not on the worker at session create: (1) - // InitSessionDatabaseMetadata's defer resets search_path to the ducklake - // default, so an earlier value is clobbered; and (2) running metadata init - // while the session default points at the iceberg REST catalog fails. - // Client-supplied search_path keeps the previous best-effort behavior; - // configured per-user catalog defaults fail closed because silently - // falling back would route the user to the wrong catalog. - if cmd, source := effectiveSessionDefaultCommand(clientSearchPath, defaultCatalog); cmd != "" { + // It must run here, not on the worker at session create: + // InitSessionDatabaseMetadata's defer resets the catalog/search_path, so an + // earlier value would be clobbered. A client-supplied search_path is + // best-effort; the configured catalog (Iceberg) fails closed because + // silently falling back would route the user to the wrong catalog. + if cmd, source := effectiveSessionDefaultCommand(clientSearchPath, effectiveCatalog); cmd != "" { spCtx, spCancel := context.WithTimeout(context.Background(), cp.cfg.SessionInitTimeout) _, err := executor.ExecContext(spCtx, cmd) spCancel() if err != nil { if source == sessionDefaultSourceConfiguredCatalog { - slog.Error("Failed to apply configured default catalog.", "user", username, "org", orgID, "catalog", defaultCatalog, "error", err) - _ = server.WriteErrorResponse(writer, "FATAL", "XX000", "failed to apply configured default catalog") + slog.Error("Failed to apply session default catalog.", "user", username, "org", orgID, "catalog", effectiveCatalog, "error", err) + _ = server.WriteErrorResponse(writer, "FATAL", "XX000", "failed to apply default catalog") _ = writer.Flush() return } @@ -1174,37 +1196,24 @@ func (cp *ControlPlane) handleConnection(conn net.Conn) { } } } else { - // Passthrough: no pg_catalog views and no logical-catalog rewriting, but - // the session must still land in the tenant's catalog instead of the - // 
empty in-memory one. Standalone passthrough does this via - // server.setDuckLakeDefault/setIcebergDefault; the remote-worker path - // has to issue the equivalent explicitly here. - initCtx, initCancel := context.WithTimeout(context.Background(), cp.cfg.SessionInitTimeout) - duckLakeAttached, err = sessionmeta.HasAttachedCatalog(initCtx, executor, physicalDuckLakeCatalog) - if err != nil { - initCancel() - slog.Error("Failed to detect ducklake catalog attachment.", "user", username, "org", orgID, "database", database, "remote_addr", remoteAddr, "error", err, "worker", workerID, "worker_pod", workerPod) - _ = server.WriteErrorResponse(writer, "FATAL", "XX000", "failed to detect ducklake catalog attachment") - _ = writer.Flush() - return - } - // Passthrough doesn't apply a client-supplied search_path (the - // non-passthrough best-effort path does). Surface it rather than - // dropping it silently — passthrough clients can issue SET search_path - // themselves once connected. + // Passthrough: no pg_catalog views and no rewriting, but the session must + // still land in its selected catalog instead of the empty in-memory one. + // Standalone passthrough does this via server.setDuckLakeDefault/ + // setIcebergDefault; the remote-worker path issues the equivalent here. 
if clientSearchPath != "" { slog.Warn("Ignoring client connect-time search_path for passthrough session.", "user", username, "org", orgID, "search_path", clientSearchPath, "remote_addr", remoteAddr) } - if cmd := passthroughSessionDefaultCatalogCommand(defaultCatalog, duckLakeAttached); cmd != "" { - if _, err := executor.ExecContext(initCtx, cmd); err != nil { - initCancel() + if cmd := passthroughSessionDefaultCatalogCommand(effectiveCatalog); cmd != "" { + initCtx, initCancel := context.WithTimeout(context.Background(), cp.cfg.SessionInitTimeout) + _, err := executor.ExecContext(initCtx, cmd) + initCancel() + if err != nil { slog.Error("Failed to apply passthrough session default catalog.", "user", username, "org", orgID, "command", cmd, "error", err, "worker", workerID, "worker_pod", workerPod) _ = server.WriteErrorResponse(writer, "FATAL", "XX000", "failed to apply default catalog") _ = writer.Flush() return } } - initCancel() } // Register the TCP connection so OnWorkerCrash can close it to unblock @@ -1218,11 +1227,11 @@ func (cp *ControlPlane) handleConnection(conn net.Conn) { server.SetConnectionIcebergConfig(cc, icebergCfg) } } - // Logical-catalog mapping (current_database masking + USE/qualified-name - // rewriting) is a non-passthrough feature; passthrough sessions talk raw - // DuckDB against the physical catalog name, so keep it disabled for them - // even though duckLakeAttached is now populated on the passthrough path. - server.SetLogicalCatalogMapping(cc, duckLakeAttached && !passthroughUser) + // Catalog USE rewriting (expanding bare `USE ducklake`/`USE iceberg` to the + // reliable two-part target) is a non-passthrough feature; passthrough sessions + // talk raw DuckDB, so keep it disabled for them. Enabled whenever either + // catalog is attached. 
+ server.SetCatalogUseRewrite(cc, (duckLakeAttached || icebergAttached) && !passthroughUser) server.SetPassthrough(cc, passthroughUser) if orgID != "" { observeOrgPgSessionAccepted(orgID, passthroughUser) @@ -1754,10 +1763,10 @@ func (cp *ControlPlane) drainAfterUpgrade() { // startFlightIngress creates and starts the Flight SQL ingress listener. // cpFlightCredentialValidator authenticates Flight SQL clients in -// multi-tenant mode. It implements both flightsqlingress.CredentialValidator -// (legacy: scan all orgs to find the user) and SNIAwareCredentialValidator -// (preferred: derive org from SNI, scope to that org's users only). -// Behavior is gated on cp.cfg.SNIRoutingMode just like the Postgres path. +// multi-tenant mode. Identity is derived solely from the managed hostname +// (SNI): the org is resolved from the SNI prefix and the user is authenticated +// within that org. Flight has no `database` param, so there is no catalog +// selection here — the per-user default catalog applies. 
type cpFlightCredentialValidator struct { cp *ControlPlane orgProvider *orgRoutedSessionProvider @@ -1770,53 +1779,28 @@ func (v *cpFlightCredentialValidator) ValidateCredentials(username, password str func (v *cpFlightCredentialValidator) ValidateCredentialsForSNI(sni, username, password string) bool { cp := v.cp sniPrefix, isManaged := cp.extractOrgFromSNI(sni) - - switch cp.cfg.SNIRoutingMode { - case SNIRoutingEnforce: - if !isManaged { - slog.Warn("Flight auth rejected: SNI does not match a managed hostname.", - "sni", sni, "expected", cp.managedHostnameHint(), "user", username) - return false - } - return v.authForSNIPrefix(sni, sniPrefix, username, password) - case SNIRoutingPassthrough: - if isManaged { - return v.authForSNIPrefix(sni, sniPrefix, username, password) - } - if sni == "" { - slog.Warn("Flight client connected without SNI; please migrate to a managed hostname.", - "expected", cp.managedHostnameHint(), "user", username) - } else { - slog.Warn("Flight client using legacy hostname; please migrate to a managed hostname.", - "sni", sni, "expected", cp.managedHostnameHint(), "user", username) - } - return v.authByScan(username, password) - default: // SNIRoutingOff or unset - return v.authByScan(username, password) + if !isManaged { + // A username alone can collide across orgs, so identity now requires a + // managed hostname — there is no username-scan fallback. + slog.Warn("Flight auth rejected: SNI does not match a managed hostname.", + "sni", sni, "expected", cp.managedHostnameHint(), "user", username) + return false } + return v.authForSNIPrefix(sni, sniPrefix, username, password) } -// authForSNIPrefix validates (username, password) against a single org -// resolved from the SNI-derived hostname prefix. Used by enforce / -// matched-passthrough. Translates the prefix through the hostname_alias map -// so callers reach the right org regardless of which form (alias vs. dbname) -// the client used. 
-// -// Alias precedence: if a prefix matches both an org's hostname_alias AND -// another org's database_name, the alias wins (DatabaseNameForSNIPrefix -// checks the alias map first). Operators must avoid that collision — the -// admin API enforces unique aliases and unique dbnames separately, but does -// not cross-validate that an alias isn't another org's dbname. +// authForSNIPrefix validates (username, password) against the single org the +// SNI-derived hostname prefix resolves to (via hostname_alias, database_name, +// or DNS-safe org name — see ConfigStore.ResolveSNIPrefix). func (v *cpFlightCredentialValidator) authForSNIPrefix(sni, sniPrefix, username, password string) bool { cp := v.cp - dbname := cp.configStore.DatabaseNameForSNIPrefix(sniPrefix) - observeSNIRoutingResolution("flight", dbname != sniPrefix) - orgID := cp.configStore.ResolveDatabase(dbname) + orgID, dbname := cp.configStore.ResolveSNIPrefix(sniPrefix) if orgID == "" { slog.Warn("Flight client SNI references unknown org.", - "sni", sni, "sni_prefix", sniPrefix, "sni_database", dbname, "user", username) + "sni", sni, "sni_prefix", sniPrefix, "user", username) return false } + observeSNIRoutingResolution("flight", dbname != sniPrefix) if !cp.configStore.ValidateOrgUser(orgID, username, password) { return false } @@ -1826,20 +1810,6 @@ func (v *cpFlightCredentialValidator) authForSNIPrefix(sni, sniPrefix, username, return true } -// authByScan is the legacy Flight auth path: scan all orgs to find a user -// matching (username, password). First match wins. 
-func (v *cpFlightCredentialValidator) authByScan(username, password string) bool { - cp := v.cp - orgID, ok := cp.configStore.FindAndValidateUser(username, password) - if !ok { - return false - } - v.orgProvider.mu.Lock() - v.orgProvider.userOrg[username] = orgID - v.orgProvider.mu.Unlock() - return true -} - func (cp *ControlPlane) startFlightIngress() { if cp.cfg.FlightPort <= 0 { return diff --git a/controlplane/session_search_path.go b/controlplane/session_search_path.go index 29241265..e0aa2c39 100644 --- a/controlplane/session_search_path.go +++ b/controlplane/session_search_path.go @@ -20,11 +20,22 @@ const ( // HasAttachedCatalog probe in control.go in sync. const physicalDuckLakeCatalog = "ducklake" -func effectiveSessionDefaultCommand(clientSearchPath, defaultCatalog string) (string, sessionSearchPathSource) { +// physicalIcebergCatalog is the name the per-tenant Iceberg catalog is attached +// as on the worker (the `ATTACH ... AS iceberg` during activation). +const physicalIcebergCatalog = iceberg.CatalogName + +// effectiveSessionDefaultCommand returns the connect-time command for a +// non-passthrough session, given the resolved real catalog the session defaults +// to (effectiveCatalog, one of "ducklake"/"iceberg"). A client-supplied +// search_path always wins. For DuckLake the catalog switch is owned by +// InitSessionDatabaseMetadata's defer (which also restores memory.main on the +// search_path so the pg_catalog compat macros stay resolvable), so this returns +// "" — re-issuing `USE ducklake` here would clobber that search_path. 
+func effectiveSessionDefaultCommand(clientSearchPath, effectiveCatalog string) (string, sessionSearchPathSource) { switch { case clientSearchPath != "": return fmt.Sprintf("SET search_path = '%s'", ensureMemoryMainInSearchPath(clientSearchPath)), sessionSearchPathSourceClient - case defaultCatalog == iceberg.CatalogName: + case effectiveCatalog == iceberg.CatalogName: return fmt.Sprintf("USE %s.%s", iceberg.CatalogName, iceberg.DefaultSchema), sessionDefaultSourceConfiguredCatalog default: return "", "" @@ -32,23 +43,67 @@ func effectiveSessionDefaultCommand(clientSearchPath, defaultCatalog string) (st } // passthroughSessionDefaultCatalogCommand returns the connect-time command that -// points a passthrough session at the tenant's catalog. Passthrough users skip -// InitSessionDatabaseMetadata (whose defer issues `USE ducklake` for the -// standard path), so without this the session stays in DuckDB's empty in-memory -// catalog — current_database() reports "memory" and unqualified DDL/DML never -// reaches the warehouse. Mirrors server.setIcebergDefault / setDuckLakeDefault -// used by the standalone passthrough path. -func passthroughSessionDefaultCatalogCommand(defaultCatalog string, duckLakeAttached bool) string { - switch { - case defaultCatalog == iceberg.CatalogName: +// points a passthrough session at the catalog it selected (effectiveCatalog). +// Passthrough users skip InitSessionDatabaseMetadata (whose defer issues the +// catalog `USE` for the standard path), so without this the session stays in +// DuckDB's empty in-memory catalog — current_database() reports "memory" and +// unqualified DDL/DML never reaches the warehouse. Mirrors +// server.setIcebergDefault / setDuckLakeDefault used by the standalone +// passthrough path. 
+func passthroughSessionDefaultCatalogCommand(effectiveCatalog string) string { + switch effectiveCatalog { + case iceberg.CatalogName: return fmt.Sprintf("USE %s.%s", iceberg.CatalogName, iceberg.DefaultSchema) - case duckLakeAttached: + case physicalDuckLakeCatalog: return "USE " + physicalDuckLakeCatalog default: return "" } } +// resolveEffectiveCatalog picks the real catalog a session should default to. +// requested is the validated startup selection ("" → use the per-user/attached +// default, "ducklake", or "iceberg"). defaultCatalog is the per-user configured +// default ("" or "iceberg"). duckLakeAttached/icebergAttached reflect what the +// worker actually attached for this session. The bool is false when the +// requested catalog isn't attached (caller should fail the connection 3D000) or +// nothing is attached at all. +func resolveEffectiveCatalog(requested, defaultCatalog string, duckLakeAttached, icebergAttached bool) (string, bool) { + switch requested { + case physicalDuckLakeCatalog: + if duckLakeAttached { + return physicalDuckLakeCatalog, true + } + return "", false + case iceberg.CatalogName: + if icebergAttached { + return iceberg.CatalogName, true + } + return "", false + } + // requested == "": fall back to the per-user configured default. If the user + // explicitly configured a default catalog, honor it strictly — fail closed if + // it isn't attached rather than silently routing to a different catalog (the + // connect path turns the false into a 3D000). This preserves the pre-rework + // fail-closed contract for configured catalogs. + if defaultCatalog == iceberg.CatalogName { + if icebergAttached { + return iceberg.CatalogName, true + } + return "", false + } + // No configured default: use whatever is attached (DuckLake preferred, then + // Iceberg for iceberg-only orgs). 
+ switch { + case duckLakeAttached: + return physicalDuckLakeCatalog, true + case icebergAttached: + return iceberg.CatalogName, true + default: + return "", false + } +} + func ensureMemoryMainInSearchPath(searchPath string) string { if strings.Contains(strings.ToLower(searchPath), "memory.main") { return searchPath diff --git a/controlplane/session_search_path_test.go b/controlplane/session_search_path_test.go index 768ad7f7..26066bfe 100644 --- a/controlplane/session_search_path_test.go +++ b/controlplane/session_search_path_test.go @@ -2,7 +2,7 @@ package controlplane import "testing" -func TestEffectiveSessionDefaultCommandUsesClientSearchPathBeforeConfiguredCatalog(t *testing.T) { +func TestEffectiveSessionDefaultCommandUsesClientSearchPathBeforeCatalog(t *testing.T) { got, source := effectiveSessionDefaultCommand("ducklake.main", "iceberg") if got != "SET search_path = 'ducklake.main,memory.main'" { t.Fatalf("command = %q, want SET search_path = 'ducklake.main,memory.main'", got) @@ -12,7 +12,7 @@ func TestEffectiveSessionDefaultCommandUsesClientSearchPathBeforeConfiguredCatal } } -func TestEffectiveSessionDefaultCommandUsesConfiguredIcebergCatalogWhenClientOmitted(t *testing.T) { +func TestEffectiveSessionDefaultCommandUsesIcebergCatalogWhenClientOmitted(t *testing.T) { got, source := effectiveSessionDefaultCommand("", "iceberg") if got != "USE iceberg.public" { t.Fatalf("command = %q, want USE iceberg.public", got) @@ -22,8 +22,10 @@ func TestEffectiveSessionDefaultCommandUsesConfiguredIcebergCatalogWhenClientOmi } } -func TestEffectiveSessionDefaultCommandReturnsEmptyWhenUnset(t *testing.T) { - got, source := effectiveSessionDefaultCommand("", "") +func TestEffectiveSessionDefaultCommandEmptyForDuckLake(t *testing.T) { + // DuckLake's catalog switch is owned by InitSessionDatabaseMetadata's defer, + // so the connect-time command for a ducklake session is empty. 
+ got, source := effectiveSessionDefaultCommand("", "ducklake") if got != "" { t.Fatalf("command = %q, want empty", got) } @@ -34,21 +36,49 @@ func TestEffectiveSessionDefaultCommandReturnsEmptyWhenUnset(t *testing.T) { func TestPassthroughSessionDefaultCatalogCommand(t *testing.T) { tests := []struct { - name string - defaultCatalog string - duckLakeAttached bool - want string + name string + effectiveCatalog string + want string }{ - {name: "ducklake attached defaults to ducklake", defaultCatalog: "", duckLakeAttached: true, want: "USE ducklake"}, - {name: "iceberg-default user prefers iceberg over ducklake", defaultCatalog: "iceberg", duckLakeAttached: true, want: "USE iceberg.public"}, - {name: "iceberg-default user with no ducklake", defaultCatalog: "iceberg", duckLakeAttached: false, want: "USE iceberg.public"}, - {name: "no ducklake and no configured catalog leaves session as-is", defaultCatalog: "", duckLakeAttached: false, want: ""}, + {name: "ducklake selected", effectiveCatalog: "ducklake", want: "USE ducklake"}, + {name: "iceberg selected", effectiveCatalog: "iceberg", want: "USE iceberg.public"}, + {name: "nothing resolved leaves session as-is", effectiveCatalog: "", want: ""}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := passthroughSessionDefaultCatalogCommand(tt.defaultCatalog, tt.duckLakeAttached); got != tt.want { + if got := passthroughSessionDefaultCatalogCommand(tt.effectiveCatalog); got != tt.want { t.Fatalf("command = %q, want %q", got, tt.want) } }) } } + +func TestResolveEffectiveCatalog(t *testing.T) { + tests := []struct { + name string + requested string + defaultCatalog string + duckLake bool + iceberg bool + want string + wantOK bool + }{ + {name: "explicit ducklake attached", requested: "ducklake", duckLake: true, iceberg: true, want: "ducklake", wantOK: true}, + {name: "explicit iceberg attached", requested: "iceberg", duckLake: true, iceberg: true, want: "iceberg", wantOK: true}, + {name: "explicit 
ducklake not attached", requested: "ducklake", duckLake: false, iceberg: true, want: "", wantOK: false}, + {name: "explicit iceberg not attached", requested: "iceberg", duckLake: true, iceberg: false, want: "", wantOK: false}, + {name: "default prefers ducklake", requested: "", duckLake: true, iceberg: true, want: "ducklake", wantOK: true}, + {name: "default honors per-user iceberg", requested: "", defaultCatalog: "iceberg", duckLake: true, iceberg: true, want: "iceberg", wantOK: true}, + {name: "configured iceberg default not attached fails closed", requested: "", defaultCatalog: "iceberg", duckLake: true, iceberg: false, want: "", wantOK: false}, + {name: "default falls back to iceberg-only", requested: "", duckLake: false, iceberg: true, want: "iceberg", wantOK: true}, + {name: "nothing attached fails", requested: "", duckLake: false, iceberg: false, want: "", wantOK: false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, ok := resolveEffectiveCatalog(tt.requested, tt.defaultCatalog, tt.duckLake, tt.iceberg) + if got != tt.want || ok != tt.wantOK { + t.Fatalf("resolveEffectiveCatalog = (%q, %v), want (%q, %v)", got, ok, tt.want, tt.wantOK) + } + }) + } +} diff --git a/controlplane/sni_kubernetes_test.go b/controlplane/sni_kubernetes_test.go index 283b6d79..edf2bf3c 100644 --- a/controlplane/sni_kubernetes_test.go +++ b/controlplane/sni_kubernetes_test.go @@ -102,14 +102,12 @@ type fakeConfigStore struct { resolveSNIPrefix func(string) (string, string) resolvePostgresConnection func(startupDatabase, sniPrefix string, useManagedSNI bool, username, password string) configstore.PostgresConnectionResolution validateOrgUser func(orgID, user, pass string) bool - findAndValidateUser func(user, pass string) (string, bool) resolveDatabaseCalls int databaseNameForSNIPrefixCalls int resolveSNIPrefixCalls int resolvePostgresConnectionCalls int validateOrgUserCalls int - findAndValidateUserCalls int } func (f *fakeConfigStore) 
ResolveDatabase(database string) string { @@ -147,13 +145,6 @@ func (f *fakeConfigStore) ValidateOrgUser(orgID, user, pass string) bool { } return f.validateOrgUser(orgID, user, pass) } -func (f *fakeConfigStore) FindAndValidateUser(user, pass string) (string, bool) { - f.findAndValidateUserCalls++ - if f.findAndValidateUser == nil { - return "", false - } - return f.findAndValidateUser(user, pass) -} func (f *fakeConfigStore) IsOrgUserPassthrough(string, string) bool { // SNI tests don't exercise passthrough; the real flag lookup is covered // elsewhere. Returning false keeps the existing assertions intact. @@ -270,7 +261,10 @@ func TestPostgresSNIUnknownModeIgnoresSNI(t *testing.T) { } } -func TestPostgresManagedHostnameMismatchSQLSTATE(t *testing.T) { +// A non-selectable database name (anything other than ducklake/iceberg/empty) +// is rejected with 3D000 — the database param is now catalog selection, not an +// org/identity routing key. +func TestPostgresInvalidCatalogSQLSTATE(t *testing.T) { store := &fakeConfigStore{ resolvePostgresConnection: func(startupDatabase, sniPrefix string, useManagedSNI bool, username, password string) configstore.PostgresConnectionResolution { if startupDatabase != "requested_db" || sniPrefix != "other-org" || !useManagedSNI || username != "root" || password != "secret" { @@ -278,16 +272,16 @@ func TestPostgresManagedHostnameMismatchSQLSTATE(t *testing.T) { startupDatabase, sniPrefix, useManagedSNI, username, password) } return configstore.PostgresConnectionResolution{ - EffectiveDatabase: "requested_db", - OrgID: "requested-org", - SNIOrgID: "other-org", - DatabaseExists: true, - HostnameMatches: false, + OrgID: "other-org", + SNIOrgID: "other-org", + SNIResolved: true, + CatalogValid: false, // "requested_db" is not ducklake/iceberg + Valid: true, } }, } cp := newSNIControlPlane(store) - cp.cfg.SNIRoutingMode = SNIRoutingPassthrough + cp.cfg.SNIRoutingMode = SNIRoutingEnforce cp.tlsConfig = testControlPlaneTLSConfig(t) cfg, err 
:= pgconn.ParseConfig("postgres://root:secret@127.0.0.1/requested_db?sslmode=require") @@ -307,17 +301,14 @@ func TestPostgresManagedHostnameMismatchSQLSTATE(t *testing.T) { conn, err := pgconn.ConnectConfig(context.Background(), cfg) if err == nil { _ = conn.Close(context.Background()) - t.Fatal("expected managed hostname mismatch to reject connection") + t.Fatal("expected invalid catalog to reject connection") } var pgErr *pgconn.PgError if !errors.As(err, &pgErr) { t.Fatalf("expected pg error; got: %T %v", err, err) } - if pgErr.Code != "28000" { - t.Fatalf("SQLSTATE = %q, want 28000", pgErr.Code) - } - if pgErr.Message != "requested database does not match managed hostname" { - t.Fatalf("message = %q", pgErr.Message) + if pgErr.Code != "3D000" { + t.Fatalf("SQLSTATE = %q, want 3D000", pgErr.Code) } } @@ -336,191 +327,72 @@ func testControlPlaneTLSConfig(t *testing.T) *tls.Config { return &tls.Config{Certificates: []tls.Certificate{cert}} } -// TestFlightValidatorOff: SNI ignored entirely. Both legacy and new -// hostnames go through FindAndValidateUser; ResolveDatabase / ValidateOrgUser -// are never called regardless of SNI. 
-func TestFlightValidatorOff(t *testing.T) { - store := &fakeConfigStore{ - findAndValidateUser: func(user, pass string) (string, bool) { - return "org-by-scan", user == "alice" && pass == "secret" - }, - } - v := newFlightValidator(t, SNIRoutingOff, store) - - cases := []struct { - name string - sni string - }{ - {"matching SNI", "acme.dw.us.postwh.com"}, - {"empty SNI", ""}, - {"unmanaged SNI", "duckgres-db.internal.ec2.us-east-1.dev.posthog.dev"}, - } - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - if !v.ValidateCredentialsForSNI(tc.sni, "alice", "secret") { - t.Fatalf("expected valid credentials to pass in off mode") - } - }) - } - if store.resolveDatabaseCalls != 0 || store.validateOrgUserCalls != 0 { - t.Fatalf("off mode must not consult ResolveDatabase / ValidateOrgUser; got %d / %d", - store.resolveDatabaseCalls, store.validateOrgUserCalls) - } - if store.findAndValidateUserCalls != len(cases) { - t.Fatalf("expected %d FindAndValidateUser calls, got %d", - len(cases), store.findAndValidateUserCalls) - } -} +// Flight identity is now SNI-only in every mode: the org is resolved from the +// managed hostname via ResolveSNIPrefix and the user is authenticated within +// that org. There is no username-scan fallback (a username can collide across +// orgs), so a non-managed hostname always fails. -// TestFlightValidatorPassthroughMatchedSNI: SNI matches, so we resolve and -// validate against a single org, never falling through to the scan. -func TestFlightValidatorPassthroughMatchedSNI(t *testing.T) { +// TestFlightValidatorMatchedSNI: SNI matches, so we resolve via ResolveSNIPrefix +// and validate against that single org. 
+func TestFlightValidatorMatchedSNI(t *testing.T) { store := &fakeConfigStore{ - resolveDatabase: func(name string) string { - if name == "acme" { - return "org-acme" + resolveSNIPrefix: func(prefix string) (string, string) { + if prefix == "acme" { + return "org-acme", "acme_db" } - return "" + return "", "" }, validateOrgUser: func(orgID, user, pass string) bool { return orgID == "org-acme" && user == "alice" && pass == "secret" }, - findAndValidateUser: func(string, string) (string, bool) { - t.Fatalf("FindAndValidateUser must not be called when SNI resolves an org") - return "", false - }, } - v := newFlightValidator(t, SNIRoutingPassthrough, store) + v := newFlightValidator(t, SNIRoutingEnforce, store) if !v.ValidateCredentialsForSNI("acme.dw.us.postwh.com", "alice", "secret") { t.Fatalf("expected SNI-resolved org with valid creds to pass") } - if store.resolveDatabaseCalls != 1 || store.validateOrgUserCalls != 1 { - t.Fatalf("expected one ResolveDatabase + one ValidateOrgUser; got %d / %d", - store.resolveDatabaseCalls, store.validateOrgUserCalls) + if store.resolveSNIPrefixCalls != 1 || store.validateOrgUserCalls != 1 { + t.Fatalf("expected one ResolveSNIPrefix + one ValidateOrgUser; got %d / %d", + store.resolveSNIPrefixCalls, store.validateOrgUserCalls) } if got := v.orgProvider.userOrg["alice"]; got != "org-acme" { t.Fatalf("expected userOrg['alice'] = org-acme; got %q", got) } } -// TestFlightValidatorPassthroughHostnameAliasResolves: SNI prefix is the -// hostname alias for an org whose dbname is something different. The -// validator must consult DatabaseNameForSNIPrefix to translate prefix → -// dbname before looking up the orgID. 
-func TestFlightValidatorPassthroughHostnameAliasResolves(t *testing.T) { - store := &fakeConfigStore{ - databaseNameForSNIPrefix: func(prefix string) string { - if prefix == "entirely-chief-wildcat" { - return "portola" // alias-translated dbname - } - return prefix - }, - resolveDatabase: func(name string) string { - if name == "portola" { - return "org-portola" - } - return "" - }, - validateOrgUser: func(orgID, user, pass string) bool { - return orgID == "org-portola" && user == "alice" && pass == "secret" - }, - findAndValidateUser: func(string, string) (string, bool) { - t.Fatalf("FindAndValidateUser must not be called when SNI alias resolves an org") - return "", false - }, - } - v := newFlightValidator(t, SNIRoutingPassthrough, store) - - if !v.ValidateCredentialsForSNI("entirely-chief-wildcat.dw.us.postwh.com", "alice", "secret") { - t.Fatalf("expected alias-resolved org with valid creds to pass") - } - if store.databaseNameForSNIPrefixCalls != 1 { - t.Fatalf("expected DatabaseNameForSNIPrefix to be consulted exactly once; got %d", store.databaseNameForSNIPrefixCalls) - } - if store.resolveDatabaseCalls != 1 { - t.Fatalf("expected one ResolveDatabase call (against translated dbname); got %d", store.resolveDatabaseCalls) - } - if got := v.orgProvider.userOrg["alice"]; got != "org-portola" { - t.Fatalf("expected userOrg['alice'] = org-portola; got %q", got) - } -} - -// TestFlightValidatorPassthroughUnknownOrg: SNI matches the suffix, but the -// resolved org name doesn't exist in the config store. Must return false -// WITHOUT falling through to the scan (a managed hostname is authoritative — -// silently routing to a different org would defeat the boundary). -func TestFlightValidatorPassthroughUnknownOrg(t *testing.T) { +// TestFlightValidatorUnknownOrg: SNI matches the suffix, but the prefix +// resolves to no org. Must return false. 
+func TestFlightValidatorUnknownOrg(t *testing.T) { store := &fakeConfigStore{ - resolveDatabase: func(string) string { return "" }, // unknown - findAndValidateUser: func(string, string) (string, bool) { - t.Fatalf("FindAndValidateUser must not be called for unknown SNI org") - return "", false + resolveSNIPrefix: func(string) (string, string) { return "", "" }, // unknown + validateOrgUser: func(string, string, string) bool { + t.Fatalf("ValidateOrgUser must not be called for unknown SNI org") + return false }, } - v := newFlightValidator(t, SNIRoutingPassthrough, store) + v := newFlightValidator(t, SNIRoutingEnforce, store) if v.ValidateCredentialsForSNI("ghostorg.dw.us.postwh.com", "alice", "secret") { t.Fatalf("unknown SNI org must not authenticate") } } -// TestFlightValidatorPassthroughLegacyHostname: SNI doesn't match a managed -// suffix → fall back to the scan path (with a warn log we don't assert here). -func TestFlightValidatorPassthroughLegacyHostname(t *testing.T) { - store := &fakeConfigStore{ - findAndValidateUser: func(user, pass string) (string, bool) { - return "org-from-scan", user == "alice" && pass == "secret" - }, - } - v := newFlightValidator(t, SNIRoutingPassthrough, store) - - if !v.ValidateCredentialsForSNI("duckgres-db.internal.ec2.us-east-1.dev.posthog.dev", "alice", "secret") { - t.Fatalf("legacy hostname should pass via scan in passthrough mode") - } - if store.findAndValidateUserCalls != 1 { - t.Fatalf("expected scan fallback; got %d FindAndValidateUser calls", store.findAndValidateUserCalls) - } - if got := v.orgProvider.userOrg["alice"]; got != "org-from-scan" { - t.Fatalf("expected userOrg['alice'] = org-from-scan; got %q", got) - } -} - -// TestFlightValidatorEnforceMatchedSNI: same as passthrough+matched. -func TestFlightValidatorEnforceMatchedSNI(t *testing.T) { +// TestFlightValidatorRejectsUnmanagedHostname: a non-managed hostname (or empty +// SNI) has no org and must fail — there is no username-scan fallback. 
+func TestFlightValidatorRejectsUnmanagedHostname(t *testing.T) { store := &fakeConfigStore{ - resolveDatabase: func(name string) string { - if name == "acme" { - return "org-acme" - } - return "" - }, - validateOrgUser: func(orgID, user, pass string) bool { - return orgID == "org-acme" && user == "alice" && pass == "secret" - }, - } - v := newFlightValidator(t, SNIRoutingEnforce, store) - if !v.ValidateCredentialsForSNI("acme.dw.us.postwh.com", "alice", "secret") { - t.Fatalf("expected enforce+matched to pass") - } -} - -// TestFlightValidatorEnforceLegacyHostnameRejected: the contract of enforce. -// Even with otherwise-valid credentials, a non-managed hostname must fail -// without hitting the scan. -func TestFlightValidatorEnforceLegacyHostnameRejected(t *testing.T) { - store := &fakeConfigStore{ - findAndValidateUser: func(string, string) (string, bool) { - t.Fatalf("FindAndValidateUser must not be called in enforce mode") - return "", false + resolveSNIPrefix: func(string) (string, string) { + t.Fatalf("ResolveSNIPrefix must not be called for unmanaged hostnames") + return "", "" }, } - v := newFlightValidator(t, SNIRoutingEnforce, store) - - if v.ValidateCredentialsForSNI("", "alice", "secret") { - t.Fatalf("enforce must reject empty SNI") - } - if v.ValidateCredentialsForSNI("duckgres-db.internal.ec2.us-east-1.dev.posthog.dev", "alice", "secret") { - t.Fatalf("enforce must reject legacy hostname") + for _, mode := range []string{SNIRoutingEnforce, SNIRoutingPassthrough, SNIRoutingOff} { + v := newFlightValidator(t, mode, store) + if v.ValidateCredentialsForSNI("", "alice", "secret") { + t.Fatalf("mode %q must reject empty SNI", mode) + } + if v.ValidateCredentialsForSNI("duckgres-db.internal.ec2.us-east-1.dev.posthog.dev", "alice", "secret") { + t.Fatalf("mode %q must reject legacy hostname", mode) + } } } diff --git a/k8s/kind/control-plane.yaml b/k8s/kind/control-plane.yaml index cab3fcd5..a1c3f1e7 100644 --- a/k8s/kind/control-plane.yaml +++ 
b/k8s/kind/control-plane.yaml @@ -68,7 +68,7 @@ spec: - "--flight-port" - "8815" - "--sni-routing-mode" - - "passthrough" + - "enforce" - "--managed-hostname-suffixes" - ".dw.test.local" - "--config" diff --git a/server/conn.go b/server/conn.go index d3151379..c73935d0 100644 --- a/server/conn.go +++ b/server/conn.go @@ -171,7 +171,7 @@ type clientConn struct { txStatus byte // current transaction status ('I', 'T', or 'E') passthrough bool // true for passthrough users (skip transpiler + pg_catalog) cursors map[string]*cursorState // server-side cursor emulation - logicalCatalogMapping bool // true when the session has an attached ducklake catalog and logical catalog masking is active + catalogUseRewrite bool // true when bare `USE ducklake`/`USE iceberg` should expand to the reliable two-part target tenantIcebergConfig IcebergConfig hasTenantIcebergConfig bool ctx context.Context // connection context, cancelled when connection is closed @@ -197,11 +197,23 @@ type clientConn struct { } // newTranspiler creates a transpiler configured for this connection. +// +// LogicalDatabaseName is pinned to the physical DuckLake catalog ("ducklake") +// rather than the client's connection database. duckgres no longer masks a +// logical database name onto the physical catalog; the only remaining job of +// the logical-catalog transform is to map `ducklake.public.*` → `ducklake.main.*` +// (DuckLake's real schema is `main`). Iceberg three-part refs and arbitrary +// names are left untouched. 
func (c *clientConn) newTranspiler(convertPlaceholders bool) *transpiler.Transpiler { + duckLakeMode := c.server.cfg.DuckLake.MetadataStore != "" || c.server.cfg.AlwaysDuckLake + logicalDatabaseName := "" + if duckLakeMode { + logicalDatabaseName = physicalDuckLakeCatalog + } return transpiler.New(transpiler.Config{ - DuckLakeMode: c.server.cfg.DuckLake.MetadataStore != "" || c.server.cfg.AlwaysDuckLake, - LogicalDatabaseName: c.database, - PhysicalCatalogName: "ducklake", + DuckLakeMode: duckLakeMode, + LogicalDatabaseName: logicalDatabaseName, + PhysicalCatalogName: physicalDuckLakeCatalog, ConvertPlaceholders: convertPlaceholders, }) } @@ -994,18 +1006,39 @@ func (c *clientConn) serve() error { initTimeout = DefaultSessionInitTimeout } initCtx, initCancel := context.WithTimeout(context.Background(), initTimeout) - if err := sessionmeta.InitSessionDatabaseMetadata(initCtx, c.executor, c.database); err != nil { + duckLakeAttached, err := sessionmeta.HasAttachedCatalog(initCtx, c.executor, physicalDuckLakeCatalog) + if err != nil { initCancel() - c.sendError("FATAL", "XX000", fmt.Sprintf("failed to initialize session database metadata: %v", err)) + c.sendError("FATAL", "XX000", fmt.Sprintf("failed to detect ducklake catalog attachment: %v", err)) return err } - duckLakeAttached, err := sessionmeta.HasAttachedCatalog(initCtx, c.executor, "ducklake") - initCancel() + icebergAttached, err := sessionmeta.HasAttachedCatalog(initCtx, c.executor, iceberg.CatalogName) if err != nil { - c.sendError("FATAL", "XX000", fmt.Sprintf("failed to detect ducklake catalog attachment: %v", err)) + initCancel() + c.sendError("FATAL", "XX000", fmt.Sprintf("failed to detect iceberg catalog attachment: %v", err)) return err } - c.logicalCatalogMapping = duckLakeAttached + // De-mask: current_database() and the pg_catalog surfaces should reflect + // the real attached catalog, not the client's connection database name. 
+ // Standalone has a single backing catalog, so honor whatever is attached. + catalog := c.database + switch { + case duckLakeAttached: + catalog = physicalDuckLakeCatalog + case icebergAttached: + catalog = iceberg.CatalogName + } + if err := sessionmeta.InitSessionDatabaseMetadata(initCtx, c.executor, catalog); err != nil { + initCancel() + c.sendError("FATAL", "XX000", fmt.Sprintf("failed to initialize session database metadata: %v", err)) + return err + } + initCancel() + // Keep c.database aligned with the real catalog so observability surfaces + // (pg_stat_activity.datname, logs) agree with current_database(). The + // control-plane path does the equivalent via NewClientConn(database=…). + c.database = catalog + c.catalogUseRewrite = duckLakeAttached || icebergAttached } // Send initial parameters @@ -1593,8 +1626,15 @@ func (c *clientConn) executeQueryDirect(query, cmdType string) error { return err } +// rewriteDirectQuery expands a bare `USE ducklake`/`USE iceberg` to its reliable +// two-part `catalog.schema` target. This is NOT logical-name masking — the +// catalog names are real; the rewrite only works around DuckDB's bare-catalog +// `USE` resolution (a bare `USE ducklake` issued while the session is in the +// iceberg catalog resolves `ducklake` as a *schema* within iceberg, landing on a +// bogus `iceberg.ducklake`). Any other `USE <target>` and all other statements are +// passed through unchanged. 
func (c *clientConn) rewriteDirectQuery(query string) string { - if c == nil || c.server == nil || c.passthrough || !c.logicalCatalogMapping || strings.TrimSpace(c.database) == "" { + if c == nil || c.server == nil || c.passthrough || !c.catalogUseRewrite { return query } @@ -1605,13 +1645,6 @@ func (c *clientConn) rewriteDirectQuery(query string) string { hasSemicolon := strings.HasSuffix(stripped, ";") trimmed := strings.TrimSpace(strings.TrimSuffix(stripped, ";")) - if strings.EqualFold(trimmed, "SHOW DATABASES") { - rewritten := "SELECT current_database() AS database_name" - if hasSemicolon { - rewritten += ";" - } - return rewritten - } if len(trimmed) < len("USE") || !strings.EqualFold(trimmed[:len("USE")], "USE") { return query @@ -1627,29 +1660,15 @@ func (c *clientConn) rewriteDirectQuery(query string) string { unquoted = strings.ReplaceAll(target[1:len(target)-1], `""`, `"`) } - // Rewrite a bare catalog `USE` to a two-part `catalog.schema` target. - // Two-part is required for reliable switching: a bare `USE ducklake` - // issued while the session is in the iceberg catalog resolves `ducklake` - // as a schema *within* iceberg (DuckDB prefers schema-in-current-catalog - // for a single identifier), landing on a bogus `iceberg.ducklake`. var target2part string switch { - case strings.EqualFold(unquoted, c.database) || strings.EqualFold(unquoted, physicalDuckLakeCatalog): - // `USE ` or `USE ducklake` -> the physical ducklake.main. - // Checked before the iceberg arm so that a customer whose logical DB is - // (improbably) named "iceberg" still reaches their DuckLake warehouse. + case strings.EqualFold(unquoted, physicalDuckLakeCatalog): + // `USE ducklake` -> ducklake.main (DuckLake's real schema is `main`). target2part = physicalDuckLakeCatalog + ".main" case strings.EqualFold(unquoted, iceberg.CatalogName): - // `USE iceberg` -> the guaranteed default schema. 
DuckDB can't `USE` - // a bare REST catalog (it targets .main, which it shadows), - // so we land on iceberg. (ensured by attachLakekeeperCatalog). - // - // NOT gated on cfg.Iceberg.Enabled: rewriteDirectQuery runs on the - // control-plane proxy conn (server = the CP, not the per-org worker), - // where cfg.Iceberg.Enabled is always false — gating there silently - // disabled the rewrite for every tenant. If the tenant doesn't have - // iceberg attached, `USE iceberg.public` simply errors at the worker, - // same as a bare `USE iceberg` would. + // `USE iceberg` -> iceberg.. DuckDB can't `USE` a bare REST + // catalog (it targets .main, which it shadows), so land on the + // guaranteed default schema (ensured by attachLakekeeperCatalog). target2part = iceberg.CatalogName + "." + iceberg.DefaultSchema default: return query diff --git a/server/direct_query_rewrite_test.go b/server/direct_query_rewrite_test.go index 7a8ad89f..61fb25a2 100644 --- a/server/direct_query_rewrite_test.go +++ b/server/direct_query_rewrite_test.go @@ -12,8 +12,8 @@ func TestRewriteDirectQuery(t *testing.T) { Iceberg: IcebergConfig{Enabled: true}, }, }, - database: "test", - logicalCatalogMapping: true, + database: "ducklake", + catalogUseRewrite: true, } tests := []struct { @@ -22,27 +22,22 @@ func TestRewriteDirectQuery(t *testing.T) { want string }{ { - name: "rewrites logical use command to two-part ducklake.main", - query: "USE test", + // `USE ducklake` while currently in the iceberg catalog would + // otherwise resolve to a bogus iceberg.ducklake — two-part fixes it. 
+ name: "rewrites bare ducklake to two-part ducklake.main", + query: "USE ducklake", want: "USE ducklake.main", }, { - name: "rewrites quoted logical use command to two-part ducklake.main", - query: `USE "test"`, + name: "rewrites quoted ducklake to two-part ducklake.main", + query: `USE "ducklake"`, want: "USE ducklake.main", }, { - name: "rewrites commented logical use command", - query: "/* switch */ USE test;", + name: "rewrites commented ducklake use", + query: "/* switch */ USE ducklake;", want: "USE ducklake.main;", }, - { - // `USE ducklake` while currently in the iceberg catalog would - // otherwise resolve to a bogus iceberg.ducklake — two-part fixes it. - name: "rewrites bare ducklake to two-part ducklake.main", - query: "USE ducklake", - want: "USE ducklake.main", - }, { name: "rewrites bare iceberg to its default schema", query: "USE iceberg", @@ -64,20 +59,22 @@ func TestRewriteDirectQuery(t *testing.T) { query: "USE memory", want: "USE memory", }, + { + // An arbitrary database name is no longer remapped onto a catalog. + name: "preserves use of an arbitrary name", + query: "USE analytics", + want: "USE analytics", + }, { name: "preserves non-use query", query: "SELECT current_database()", want: "SELECT current_database()", }, { - name: "rewrites show databases to logical catalog", + // SHOW DATABASES now lists the real attached catalogs — no rewrite. 
+ name: "preserves show databases", query: "SHOW DATABASES", - want: "SELECT current_database() AS database_name", - }, - { - name: "rewrites commented show databases with semicolon", - query: "/* list */ SHOW DATABASES;", - want: "SELECT current_database() AS database_name;", + want: "SHOW DATABASES", }, } @@ -100,61 +97,33 @@ func TestRewriteDirectQueryUseIcebergRewrittenEvenWhenCfgDisabled(t *testing.T) DuckLake: DuckLakeConfig{MetadataStore: "postgres:host=127.0.0.1 dbname=ducklake"}, Iceberg: IcebergConfig{Enabled: false}, }}, - database: "test", - logicalCatalogMapping: true, + database: "iceberg", + catalogUseRewrite: true, } if got, want := c.rewriteDirectQuery("USE iceberg"), "USE iceberg.public"; got != want { t.Fatalf("rewriteDirectQuery(USE iceberg) = %q, want %q", got, want) } } -// A customer whose logical DB name is literally "iceberg" must still reach -// their DuckLake warehouse on `USE iceberg`, not the Iceberg REST catalog. -func TestRewriteDirectQueryLogicalDBNamedIcebergGoesToDuckLake(t *testing.T) { - c := &clientConn{ - server: &Server{cfg: Config{ - DuckLake: DuckLakeConfig{MetadataStore: "postgres:host=127.0.0.1 dbname=ducklake"}, - Iceberg: IcebergConfig{Enabled: true}, - }}, - database: "iceberg", - logicalCatalogMapping: true, - } - if got, want := c.rewriteDirectQuery("USE iceberg"), "USE ducklake.main"; got != want { - t.Fatalf("rewriteDirectQuery(USE iceberg) with logical db 'iceberg' = %q, want %q", got, want) - } -} - -func TestRewriteDirectQueryMultitenantShowDatabases(t *testing.T) { - c := &clientConn{ - server: &Server{}, - database: "duckgres", - logicalCatalogMapping: true, - } - - if got, want := c.rewriteDirectQuery("SHOW DATABASES"), "SELECT current_database() AS database_name"; got != want { - t.Fatalf("rewriteDirectQuery(SHOW DATABASES) = %q, want %q", got, want) - } -} - -func TestRewriteDirectQueryPassthroughPreservesShowDatabases(t *testing.T) { +func TestRewriteDirectQueryPassthroughPreservesUse(t *testing.T) { c := 
&clientConn{ server: &Server{}, - database: "duckgres", + database: "ducklake", passthrough: true, } - if got, want := c.rewriteDirectQuery("SHOW DATABASES"), "SHOW DATABASES"; got != want { - t.Fatalf("rewriteDirectQuery(SHOW DATABASES) = %q, want %q", got, want) + if got, want := c.rewriteDirectQuery("USE ducklake"), "USE ducklake"; got != want { + t.Fatalf("rewriteDirectQuery(USE ducklake) = %q, want %q", got, want) } } -func TestRewriteDirectQueryPreservesShowDatabasesWithoutLogicalCatalogMapping(t *testing.T) { +func TestRewriteDirectQueryPreservesUseWithoutCatalogRewrite(t *testing.T) { c := &clientConn{ server: &Server{}, - database: "duckgres", + database: "ducklake", } - if got, want := c.rewriteDirectQuery("SHOW DATABASES"), "SHOW DATABASES"; got != want { - t.Fatalf("rewriteDirectQuery(SHOW DATABASES) = %q, want %q", got, want) + if got, want := c.rewriteDirectQuery("USE ducklake"), "USE ducklake"; got != want { + t.Fatalf("rewriteDirectQuery(USE ducklake) = %q, want %q", got, want) } } diff --git a/server/exports.go b/server/exports.go index 725b20ee..e7d8e11f 100644 --- a/server/exports.go +++ b/server/exports.go @@ -85,11 +85,13 @@ func CancelClientConn(cc *clientConn) { } } -// SetLogicalCatalogMapping records whether this session should rewrite DuckLake -// metadata surfaces to the client-visible logical database name. -func SetLogicalCatalogMapping(cc *clientConn, enabled bool) { +// SetCatalogUseRewrite records whether this session should expand a bare +// `USE ducklake`/`USE iceberg` into its reliable two-part target. This is not +// masking — the catalog names are real; it only works around DuckDB's +// bare-catalog `USE` resolution. 
+func SetCatalogUseRewrite(cc *clientConn, enabled bool) { if cc != nil { - cc.logicalCatalogMapping = enabled + cc.catalogUseRewrite = enabled } } diff --git a/server/sessionmeta/sessionmeta.go b/server/sessionmeta/sessionmeta.go index a61252a5..b65793b8 100644 --- a/server/sessionmeta/sessionmeta.go +++ b/server/sessionmeta/sessionmeta.go @@ -1,6 +1,10 @@ // Package sessionmeta installs session-local catalog/metadata overrides on // a duckgres connection (current_database, pg_database, information_schema -// views) so they reflect the client-visible database name on the PG wire. +// views) so they reflect the catalog the session defaults to on the PG wire. +// +// The catalog name passed in is the real, attached catalog (e.g. "ducklake" or +// "iceberg") the session uses — duckgres no longer masks a logical database +// name onto a physical catalog, so current_database() reports the truth. // // Pure helpers — no dependency on github.com/duckdb/duckdb-go. The control // plane and other duckdb-free callers use this package without linking @@ -17,20 +21,23 @@ import ( ) // InitSessionDatabaseMetadata installs session-local overrides for metadata -// surfaces that should reflect the client-visible database name on pgwire. -func InitSessionDatabaseMetadata(ctx context.Context, executor sqlcore.QueryExecutor, database string) error { +// surfaces (current_database, pg_database, information_schema views) so they +// reflect `catalog` — the real, attached catalog the session defaults to. The +// caller resolves `catalog` to "ducklake"/"iceberg" (the names the catalogs are +// actually attached as); there is no logical→physical masking. 
+func InitSessionDatabaseMetadata(ctx context.Context, executor sqlcore.QueryExecutor, catalog string) error { if executor == nil { return fmt.Errorf("session executor is required") } - database = strings.TrimSpace(database) - if database == "" { + catalog = strings.TrimSpace(catalog) + if catalog == "" { return nil } if _, err := executor.ExecContext(ctx, fmt.Sprintf( "CREATE OR REPLACE TEMP MACRO current_database() AS %s", - quoteSQLStringLiteral(database), + quoteSQLStringLiteral(catalog), )); err != nil { return fmt.Errorf("create current_database() macro: %w", err) } @@ -44,15 +51,18 @@ func InitSessionDatabaseMetadata(ctx context.Context, executor sqlcore.QueryExec return fmt.Errorf("switch to memory catalog: %w", err) } defer func() { + // Leave the session in a real catalog (we entered `memory` to install the + // compat views there). For DuckLake sessions, restore `ducklake` here; for + // Iceberg the caller issues `USE iceberg.public` after this returns. Keep + // memory.main on the search_path so the pg_catalog compat macros stay + // resolvable after the switch. if duckLakeAttached { _, _ = executor.ExecContext(context.Background(), "USE ducklake") - // USE ducklake resets search_path to ducklake.main, excluding memory.main - // where pg_catalog macros live. Restore it so macros remain resolvable. 
_, _ = executor.ExecContext(context.Background(), "SET search_path = 'main,memory.main'") } }() - if _, err := executor.ExecContext(ctx, buildSessionMetadataSQL(database)); err != nil { + if _, err := executor.ExecContext(ctx, buildSessionMetadataSQL(catalog)); err != nil { return fmt.Errorf("apply session metadata override: %w", err) } diff --git a/tests/integration/catalog_demask_test.go b/tests/integration/catalog_demask_test.go new file mode 100644 index 00000000..7eddd7df --- /dev/null +++ b/tests/integration/catalog_demask_test.go @@ -0,0 +1,158 @@ +package integration + +import ( + "database/sql" + "fmt" + "testing" + + _ "github.com/lib/pq" +) + +// openCatalogConn opens a connection with an arbitrary startup `database` name. +// duckgres no longer masks that name onto a physical catalog — in standalone +// DuckLake mode the session lands in the real `ducklake` catalog regardless of +// the name, and current_database() reports the truth. +func openCatalogConn(t *testing.T, dbName string) *sql.DB { + t.Helper() + connStr := fmt.Sprintf( + "host=127.0.0.1 port=%d user=testuser password=testpass dbname=%s sslmode=require", + testHarness.dgPort, dbName, + ) + db, err := sql.Open("postgres", connStr) + if err != nil { + t.Fatalf("sql.Open(%q): %v", dbName, err) + } + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + if err := db.Ping(); err != nil { + _ = db.Close() + t.Fatalf("Ping(%q): %v", dbName, err) + } + t.Cleanup(func() { _ = db.Close() }) + return db +} + +// TestCatalogIsNotMasked verifies the de-masking contract: the connection +// database name no longer drives current_database()/pg_catalog surfaces; they +// reflect the real attached catalog (`ducklake`). 
+func TestCatalogIsNotMasked(t *testing.T) { + if !testHarness.useDuckLake { + t.Skip("catalog de-masking is only observable in DuckLake mode") + } + + // Connect with a name that is NOT a real catalog — it must still de-mask to + // the real ducklake catalog rather than being honored as a logical name. + db := openCatalogConn(t, "anything_goes") + + t.Run("current_database reports the real catalog", func(t *testing.T) { + var current string + if err := db.QueryRow("SELECT current_database()").Scan(&current); err != nil { + t.Fatalf("SELECT current_database(): %v", err) + } + if current != "ducklake" { + t.Fatalf("current_database() = %q, want %q", current, "ducklake") + } + }) + + t.Run("pg_database reports the real catalog", func(t *testing.T) { + var datname string + if err := db.QueryRow( + "SELECT datname FROM pg_catalog.pg_database WHERE datname = current_database()", + ).Scan(&datname); err != nil { + t.Fatalf("pg_database lookup: %v", err) + } + if datname != "ducklake" { + t.Fatalf("datname = %q, want %q", datname, "ducklake") + } + }) + + t.Run("information_schema reports the real catalog", func(t *testing.T) { + // Use a dedicated, uniquely-named schema and drop it after the test so it + // doesn't pollute the shared DuckLake catalog that later catalog tests + // compare against (e.g. \dn schema-count parity). 
+ mustExec(t, db, "DROP SCHEMA IF EXISTS demask_ns CASCADE") + mustExec(t, db, "CREATE SCHEMA demask_ns") + t.Cleanup(func() { _, _ = db.Exec("DROP SCHEMA IF EXISTS demask_ns CASCADE") }) + mustExec(t, db, "CREATE OR REPLACE VIEW demask_ns.demask_view AS SELECT 1 AS id") + mustExec(t, db, "CREATE TABLE demask_ns.demask_tbl (id INTEGER)") + + checks := []struct { + name, query string + }{ + {"tables", `SELECT DISTINCT table_catalog FROM information_schema.tables WHERE table_schema='demask_ns' AND table_name='demask_tbl'`}, + {"columns", `SELECT DISTINCT table_catalog FROM information_schema.columns WHERE table_schema='demask_ns' AND table_name='demask_tbl'`}, + {"views", `SELECT DISTINCT table_catalog FROM information_schema.views WHERE table_schema='demask_ns' AND table_name='demask_view'`}, + {"schemata", `SELECT DISTINCT catalog_name FROM information_schema.schemata WHERE schema_name='public'`}, + } + for _, tc := range checks { + t.Run(tc.name, func(t *testing.T) { + var catalog string + if err := db.QueryRow(tc.query).Scan(&catalog); err != nil { + t.Fatalf("%s query: %v", tc.name, err) + } + if catalog != "ducklake" { + t.Fatalf("%s catalog = %q, want %q", tc.name, catalog, "ducklake") + } + }) + } + }) + + t.Run("internal ducklake metadata catalogs stay hidden", func(t *testing.T) { + var leaked int + if err := db.QueryRow(` + SELECT COUNT(*) FROM information_schema.schemata + WHERE catalog_name LIKE '__ducklake_metadata_%' + `).Scan(&leaked); err != nil { + t.Fatalf("count internal schemata rows: %v", err) + } + if leaked != 0 { + t.Fatalf("information_schema.schemata leaked %d internal DuckLake metadata rows", leaked) + } + }) +} + +// TestCatalogQualifiedNames verifies real-catalog three-part references work +// (with `public` mapping to DuckLake's `main` schema), and that a connection +// made with dbname=ducklake behaves identically. 
+func TestCatalogQualifiedNames(t *testing.T) { + if !testHarness.useDuckLake { + t.Skip("catalog mapping is only relevant in DuckLake mode") + } + + db := openCatalogConn(t, "ducklake") + // Dedicated schema + table names, dropped after the test so they don't + // pollute the shared DuckLake catalog later catalog tests compare against. + mustExec(t, db, "DROP SCHEMA IF EXISTS demask_ns CASCADE") + mustExec(t, db, "CREATE SCHEMA demask_ns") + t.Cleanup(func() { + _, _ = db.Exec("DROP SCHEMA IF EXISTS demask_ns CASCADE") + _, _ = db.Exec("DROP TABLE IF EXISTS ducklake.main.demask_qualified_pub") + }) + + // public -> main mapping for the real ducklake catalog. + mustExec(t, db, "CREATE TABLE ducklake.public.demask_qualified_pub (id INTEGER)") + mustExec(t, db, "INSERT INTO ducklake.public.demask_qualified_pub VALUES (1), (2)") + mustExec(t, db, "CREATE TABLE ducklake.demask_ns.qualified_bill (id INTEGER)") + mustExec(t, db, "INSERT INTO ducklake.demask_ns.qualified_bill VALUES (7)") + + var count int + if err := db.QueryRow("SELECT COUNT(*) FROM ducklake.public.demask_qualified_pub").Scan(&count); err != nil { + t.Fatalf("read ducklake.public table: %v", err) + } + if count != 2 { + t.Fatalf("ducklake.public row count = %d, want 2", count) + } + // public maps to main, so the same table is reachable via ducklake.main. 
+ if err := db.QueryRow("SELECT COUNT(*) FROM ducklake.main.demask_qualified_pub").Scan(&count); err != nil { + t.Fatalf("read ducklake.main table: %v", err) + } + if count != 2 { + t.Fatalf("ducklake.main row count = %d, want 2", count) + } + if err := db.QueryRow("SELECT COUNT(*) FROM ducklake.demask_ns.qualified_bill").Scan(&count); err != nil { + t.Fatalf("read ducklake.demask_ns table: %v", err) + } + if count != 1 { + t.Fatalf("ducklake.demask_ns row count = %d, want 1", count) + } +} diff --git a/tests/integration/logical_catalog_mapping_test.go b/tests/integration/logical_catalog_mapping_test.go deleted file mode 100644 index cfe1b775..00000000 --- a/tests/integration/logical_catalog_mapping_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package integration - -import "testing" - -func TestPgwireLogicalCatalogMapping(t *testing.T) { - if testHarness == nil || testHarness.DuckgresDB == nil { - t.Fatal("test harness is not initialized") - } - if !testHarness.useDuckLake { - t.Skip("logical catalog mapping requires DuckLake mode") - } - - db := testHarness.DuckgresDB - - mustExec(t, db, "DROP TABLE IF EXISTS ducklake.main.logical_catalog_mapping_test") - t.Cleanup(func() { - _, _ = db.Exec("DROP TABLE IF EXISTS ducklake.main.logical_catalog_mapping_test") - }) - - var currentDB string - if err := db.QueryRow("SELECT current_database()").Scan(&currentDB); err != nil { - t.Fatalf("query current_database(): %v", err) - } - if currentDB != "test" { - t.Fatalf("expected current_database() = %q, got %q", "test", currentDB) - } - - var datname string - if err := db.QueryRow("SELECT datname FROM pg_database WHERE datname = current_database()").Scan(&datname); err != nil { - t.Fatalf("query pg_database/current_database: %v", err) - } - if datname != "test" { - t.Fatalf("expected pg_database row %q, got %q", "test", datname) - } - - mustExec(t, db, "CREATE TABLE test.public.logical_catalog_mapping_test (id INTEGER)") - mustExec(t, db, `INSERT INTO 
"test"."public".logical_catalog_mapping_test VALUES (7)`) - - var legacyCount int - if err := db.QueryRow("SELECT COUNT(*) FROM ducklake.main.logical_catalog_mapping_test").Scan(&legacyCount); err != nil { - t.Fatalf("query ducklake compatibility table: %v", err) - } - if legacyCount != 1 { - t.Fatalf("expected ducklake compatibility count 1, got %d", legacyCount) - } - - var tableCatalog string - if err := db.QueryRow(` - SELECT table_catalog - FROM information_schema.tables - WHERE table_schema = 'public' AND table_name = 'logical_catalog_mapping_test' - `).Scan(&tableCatalog); err != nil { - t.Fatalf("query information_schema.tables: %v", err) - } - if tableCatalog != "test" { - t.Fatalf("expected information_schema.tables.table_catalog = %q, got %q", "test", tableCatalog) - } - - var schemaCatalog string - if err := db.QueryRow(` - SELECT catalog_name - FROM information_schema.schemata - WHERE schema_name = 'public' - ORDER BY catalog_name - LIMIT 1 - `).Scan(&schemaCatalog); err != nil { - t.Fatalf("query information_schema.schemata: %v", err) - } - if schemaCatalog != "test" { - t.Fatalf("expected information_schema.schemata.catalog_name = %q, got %q", "test", schemaCatalog) - } -} diff --git a/tests/integration/logical_database_catalog_mapping_test.go b/tests/integration/logical_database_catalog_mapping_test.go deleted file mode 100644 index ecb5b367..00000000 --- a/tests/integration/logical_database_catalog_mapping_test.go +++ /dev/null @@ -1,170 +0,0 @@ -package integration - -import ( - "database/sql" - "fmt" - "testing" - - _ "github.com/lib/pq" -) - -func TestPgwireLogicalDatabaseCatalogMapping(t *testing.T) { - if testHarness == nil || testHarness.DuckgresDB == nil { - t.Skip("integration harness is not available") - } - if !testHarness.useDuckLake { - t.Skip("PR1 logical catalog mapping requires DuckLake mode") - } - - db := openLogicalDatabaseConnection(t, "duckgres") - t.Cleanup(func() { _ = db.Close() }) - - const ( - logicalSchema = "bill" - 
logicalTable = "pr1_logical_catalog_users" - quotedLogicalTable = "QuotedLogicalUsers" - legacyTable = "pr1_ducklake_legacy_users" - metadataProbeTable = "pr1_metadata_probe" - ) - - _, _ = db.Exec(fmt.Sprintf(`DROP TABLE IF EXISTS ducklake.%s.%s`, logicalSchema, logicalTable)) - _, _ = db.Exec(fmt.Sprintf(`DROP TABLE IF EXISTS ducklake.main.%q`, quotedLogicalTable)) - _, _ = db.Exec(fmt.Sprintf(`DROP TABLE IF EXISTS ducklake.main.%s`, legacyTable)) - _, _ = db.Exec(fmt.Sprintf(`DROP TABLE IF EXISTS ducklake.main.%s`, metadataProbeTable)) - _, _ = db.Exec(fmt.Sprintf(`DROP SCHEMA IF EXISTS ducklake.%s CASCADE`, logicalSchema)) - - if _, err := db.Exec(fmt.Sprintf(`CREATE SCHEMA IF NOT EXISTS ducklake.%s`, logicalSchema)); err != nil { - t.Fatalf("create logical schema: %v", err) - } - t.Cleanup(func() { - _, _ = db.Exec(fmt.Sprintf(`DROP TABLE IF EXISTS ducklake.%s.%s`, logicalSchema, logicalTable)) - _, _ = db.Exec(fmt.Sprintf(`DROP TABLE IF EXISTS ducklake.main.%q`, quotedLogicalTable)) - _, _ = db.Exec(fmt.Sprintf(`DROP TABLE IF EXISTS ducklake.main.%s`, legacyTable)) - _, _ = db.Exec(fmt.Sprintf(`DROP TABLE IF EXISTS ducklake.main.%s`, metadataProbeTable)) - _, _ = db.Exec(fmt.Sprintf(`DROP SCHEMA IF EXISTS ducklake.%s CASCADE`, logicalSchema)) - }) - - t.Run("metadata reports logical database", func(t *testing.T) { - var currentDB string - if err := db.QueryRow("SELECT current_database()").Scan(&currentDB); err != nil { - t.Fatalf("query current_database(): %v", err) - } - if currentDB != "duckgres" { - t.Fatalf("current_database() = %q, want %q", currentDB, "duckgres") - } - - var datname string - if err := db.QueryRow("SELECT datname FROM pg_catalog.pg_database WHERE datname = current_database()").Scan(&datname); err != nil { - t.Fatalf("query pg_database/current_database: %v", err) - } - if datname != "duckgres" { - t.Fatalf("pg_database datname = %q, want %q", datname, "duckgres") - } - - if _, err := db.Exec(fmt.Sprintf(`CREATE TABLE duckgres.public.%s (id 
INTEGER)`, metadataProbeTable)); err != nil { - t.Fatalf("create metadata probe table: %v", err) - } - - var tableCatalog string - if err := db.QueryRow(fmt.Sprintf( - "SELECT table_catalog FROM information_schema.tables WHERE table_name = '%s' LIMIT 1", - metadataProbeTable, - )).Scan(&tableCatalog); err != nil { - t.Fatalf("query information_schema.tables: %v", err) - } - if tableCatalog != "duckgres" { - t.Fatalf("information_schema.tables table_catalog = %q, want %q", tableCatalog, "duckgres") - } - - var columnCatalog string - if err := db.QueryRow(fmt.Sprintf( - "SELECT table_catalog FROM information_schema.columns WHERE table_name = '%s' AND column_name = 'id' LIMIT 1", - metadataProbeTable, - )).Scan(&columnCatalog); err != nil { - t.Fatalf("query information_schema.columns: %v", err) - } - if columnCatalog != "duckgres" { - t.Fatalf("information_schema.columns table_catalog = %q, want %q", columnCatalog, "duckgres") - } - - var schemaCatalog string - if err := db.QueryRow("SELECT catalog_name FROM information_schema.schemata WHERE schema_name = 'public' LIMIT 1").Scan(&schemaCatalog); err != nil { - t.Fatalf("query information_schema.schemata: %v", err) - } - if schemaCatalog != "duckgres" { - t.Fatalf("information_schema.schemata catalog_name = %q, want %q", schemaCatalog, "duckgres") - } - }) - - t.Run("logical catalog unquoted execution works", func(t *testing.T) { - if _, err := db.Exec(fmt.Sprintf(`CREATE TABLE duckgres.%s.%s (id INTEGER, name TEXT)`, logicalSchema, logicalTable)); err != nil { - t.Fatalf("create logical catalog table: %v", err) - } - if _, err := db.Exec(fmt.Sprintf(`INSERT INTO duckgres.%s.%s VALUES (1, 'alice')`, logicalSchema, logicalTable)); err != nil { - t.Fatalf("insert logical catalog table: %v", err) - } - - var name string - if err := db.QueryRow(fmt.Sprintf(`SELECT name FROM duckgres.%s.%s WHERE id = 1`, logicalSchema, logicalTable)).Scan(&name); err != nil { - t.Fatalf("select logical catalog table: %v", err) - } - if 
name != "alice" { - t.Fatalf("logical catalog select returned %q, want %q", name, "alice") - } - }) - - t.Run("logical catalog quoted execution works", func(t *testing.T) { - if _, err := db.Exec(fmt.Sprintf(`CREATE TABLE "duckgres"."public"."%s" (id INTEGER)`, quotedLogicalTable)); err != nil { - t.Fatalf("create quoted logical catalog table: %v", err) - } - if _, err := db.Exec(fmt.Sprintf(`INSERT INTO "duckgres"."public"."%s" VALUES (7)`, quotedLogicalTable)); err != nil { - t.Fatalf("insert quoted logical catalog table: %v", err) - } - - var count int - if err := db.QueryRow(fmt.Sprintf(`SELECT COUNT(*) FROM "duckgres"."public"."%s"`, quotedLogicalTable)).Scan(&count); err != nil { - t.Fatalf("select quoted logical catalog table: %v", err) - } - if count != 1 { - t.Fatalf("quoted logical catalog row count = %d, want %d", count, 1) - } - }) - - t.Run("physical ducklake references remain valid", func(t *testing.T) { - if _, err := db.Exec(fmt.Sprintf(`CREATE TABLE ducklake.main.%s (id INTEGER)`, legacyTable)); err != nil { - t.Fatalf("create legacy ducklake table: %v", err) - } - if _, err := db.Exec(fmt.Sprintf(`INSERT INTO ducklake.main.%s VALUES (9)`, legacyTable)); err != nil { - t.Fatalf("insert legacy ducklake table: %v", err) - } - - var count int - if err := db.QueryRow(fmt.Sprintf(`SELECT COUNT(*) FROM ducklake.main.%s`, legacyTable)).Scan(&count); err != nil { - t.Fatalf("select legacy ducklake table: %v", err) - } - if count != 1 { - t.Fatalf("legacy ducklake row count = %d, want %d", count, 1) - } - }) -} - -func openLogicalDatabaseConnection(t *testing.T, database string) *sql.DB { - t.Helper() - - connStr := fmt.Sprintf( - "host=127.0.0.1 port=%d user=testuser password=testpass dbname=%s sslmode=require", - testHarness.dgPort, - database, - ) - db, err := sql.Open("postgres", connStr) - if err != nil { - t.Fatalf("open logical database connection: %v", err) - } - db.SetMaxOpenConns(1) - db.SetMaxIdleConns(1) - if err := db.Ping(); err != nil { - _ = 
db.Close() - t.Fatalf("ping logical database connection: %v", err) - } - return db -} diff --git a/tests/integration/logical_database_catalog_test.go b/tests/integration/logical_database_catalog_test.go deleted file mode 100644 index c84e315e..00000000 --- a/tests/integration/logical_database_catalog_test.go +++ /dev/null @@ -1,264 +0,0 @@ -package integration - -import ( - "database/sql" - "fmt" - "testing" - - _ "github.com/lib/pq" -) - -func openLogicalDatabaseConn(t *testing.T, dbName string) *sql.DB { - t.Helper() - - connStr := fmt.Sprintf( - "host=127.0.0.1 port=%d user=testuser password=testpass dbname=%s sslmode=require", - testHarness.dgPort, - dbName, - ) - db, err := sql.Open("postgres", connStr) - if err != nil { - t.Fatalf("sql.Open(%q): %v", dbName, err) - } - - db.SetMaxOpenConns(1) - db.SetMaxIdleConns(1) - - if err := db.Ping(); err != nil { - _ = db.Close() - t.Fatalf("Ping(%q): %v", dbName, err) - } - - t.Cleanup(func() { - _ = db.Close() - }) - - return db -} - -func TestLogicalDatabaseCatalogMetadata(t *testing.T) { - if !testHarness.useDuckLake { - t.Skip("logical catalog mapping is only relevant in DuckLake mode") - } - - db := openLogicalDatabaseConn(t, "duckgres_catalog") - mustExec(t, db, "CREATE SCHEMA IF NOT EXISTS bill") - mustExec(t, db, "CREATE OR REPLACE VIEW bill.logical_catalog_view AS SELECT 1 AS id") - - t.Run("current_database_reports_logical_name", func(t *testing.T) { - var current string - if err := db.QueryRow("SELECT current_database()").Scan(&current); err != nil { - t.Fatalf("SELECT current_database(): %v", err) - } - if current != "duckgres_catalog" { - t.Fatalf("current_database() = %q, want %q", current, "duckgres_catalog") - } - }) - - t.Run("pg_database_reports_logical_name", func(t *testing.T) { - var datname string - if err := db.QueryRow( - "SELECT datname FROM pg_catalog.pg_database WHERE datname = current_database()", - ).Scan(&datname); err != nil { - t.Fatalf("pg_database lookup: %v", err) - } - if datname != 
"duckgres_catalog" { - t.Fatalf("datname = %q, want %q", datname, "duckgres_catalog") - } - }) - - t.Run("information_schema_catalog_columns_report_logical_name", func(t *testing.T) { - checks := []struct { - name string - query string - }{ - { - name: "tables", - query: ` - SELECT DISTINCT table_catalog - FROM information_schema.tables - WHERE table_schema = 'public' AND table_name = 'users' - `, - }, - { - name: "columns", - query: ` - SELECT DISTINCT table_catalog - FROM information_schema.columns - WHERE table_schema = 'public' AND table_name = 'users' - `, - }, - { - name: "views", - query: ` - SELECT DISTINCT table_catalog - FROM information_schema.views - WHERE table_schema = 'bill' AND table_name = 'logical_catalog_view' - `, - }, - { - name: "schemata", - query: ` - SELECT DISTINCT catalog_name - FROM information_schema.schemata - WHERE schema_name = 'public' - `, - }, - } - - for _, tc := range checks { - t.Run(tc.name, func(t *testing.T) { - var catalog string - if err := db.QueryRow(tc.query).Scan(&catalog); err != nil { - t.Fatalf("%s query: %v", tc.name, err) - } - if catalog != "duckgres_catalog" { - t.Fatalf("%s catalog = %q, want %q", tc.name, catalog, "duckgres_catalog") - } - }) - } - }) - - t.Run("information_schema_schemata_hides_internal_ducklake_catalogs", func(t *testing.T) { - var leaked int - if err := db.QueryRow(` - SELECT COUNT(*) - FROM information_schema.schemata - WHERE catalog_name LIKE '__ducklake_metadata_%' - `).Scan(&leaked); err != nil { - t.Fatalf("count internal schemata rows: %v", err) - } - if leaked != 0 { - t.Fatalf("information_schema.schemata leaked %d internal DuckLake metadata rows", leaked) - } - }) - - t.Run("show_databases_reports_only_logical_name", func(t *testing.T) { - rows, err := db.Query("SHOW DATABASES") - if err != nil { - t.Fatalf("SHOW DATABASES: %v", err) - } - defer func() { - if err := rows.Close(); err != nil { - t.Fatalf("close SHOW DATABASES rows: %v", err) - } - }() - - var got []string - for 
rows.Next() { - var name string - if err := rows.Scan(&name); err != nil { - t.Fatalf("scan SHOW DATABASES row: %v", err) - } - got = append(got, name) - } - if err := rows.Err(); err != nil { - t.Fatalf("iterate SHOW DATABASES rows: %v", err) - } - - if len(got) != 1 || got[0] != "duckgres_catalog" { - t.Fatalf("SHOW DATABASES = %v, want [duckgres_catalog]", got) - } - }) -} - -func TestLogicalDatabaseCatalogQualifiedNames(t *testing.T) { - if !testHarness.useDuckLake { - t.Skip("logical catalog mapping is only relevant in DuckLake mode") - } - - t.Run("unquoted_logical_catalog_executes_and_ducklake_still_works", func(t *testing.T) { - db := openLogicalDatabaseConn(t, "duckgres_catalog") - mustExec(t, db, "DROP SCHEMA IF EXISTS bill CASCADE") - mustExec(t, db, "CREATE SCHEMA bill") - mustExec(t, db, "CREATE TABLE duckgres_catalog.bill.logical_catalog_table (id INTEGER)") - mustExec(t, db, "INSERT INTO duckgres_catalog.bill.logical_catalog_table VALUES (1), (2)") - - var count int - if err := db.QueryRow("SELECT COUNT(*) FROM duckgres_catalog.bill.logical_catalog_table").Scan(&count); err != nil { - t.Fatalf("logical catalog read: %v", err) - } - if count != 2 { - t.Fatalf("logical catalog row count = %d, want 2", count) - } - - if err := db.QueryRow("SELECT COUNT(*) FROM ducklake.bill.logical_catalog_table").Scan(&count); err != nil { - t.Fatalf("ducklake compatibility read: %v", err) - } - if count != 2 { - t.Fatalf("ducklake compatibility row count = %d, want 2", count) - } - }) - - t.Run("quoted_logical_catalog_executes", func(t *testing.T) { - db := openLogicalDatabaseConn(t, "DuckgresCatalog") - mustExec(t, db, "DROP SCHEMA IF EXISTS bill CASCADE") - mustExec(t, db, "CREATE SCHEMA bill") - mustExec(t, db, `CREATE TABLE "DuckgresCatalog"."bill"."quoted_catalog_table" (id INTEGER)`) - mustExec(t, db, `INSERT INTO "DuckgresCatalog"."bill"."quoted_catalog_table" VALUES (7)`) - - var count int - if err := db.QueryRow(`SELECT COUNT(*) FROM 
"DuckgresCatalog"."bill"."quoted_catalog_table"`).Scan(&count); err != nil { - t.Fatalf("quoted logical catalog read: %v", err) - } - if count != 1 { - t.Fatalf("quoted logical catalog row count = %d, want 1", count) - } - - if err := db.QueryRow("SELECT COUNT(*) FROM ducklake.bill.quoted_catalog_table").Scan(&count); err != nil { - t.Fatalf("ducklake quoted compatibility read: %v", err) - } - if count != 1 { - t.Fatalf("ducklake quoted compatibility row count = %d, want 1", count) - } - }) - - t.Run("dbt_style_temp_view_rename_uses_physical_catalog", func(t *testing.T) { - db := openLogicalDatabaseConn(t, "duckgres_catalog") - mustExec(t, db, "DROP SCHEMA IF EXISTS bill CASCADE") - mustExec(t, db, "CREATE SCHEMA bill") - mustExec(t, db, "CREATE TABLE duckgres_catalog.bill.customers (id INTEGER, name TEXT)") - mustExec(t, db, "INSERT INTO duckgres_catalog.bill.customers VALUES (1, 'alice')") - - mustExec(t, db, ` - CREATE VIEW "duckgres_catalog"."bill"."stg_customers__dbt_tmp" - AS ( - SELECT id, name - FROM "duckgres_catalog"."bill"."customers" - ) - `) - mustExec(t, db, `ALTER TABLE "duckgres_catalog"."bill"."stg_customers__dbt_tmp" RENAME TO "stg_customers"`) - - var count int - if err := db.QueryRow("SELECT COUNT(*) FROM ducklake.bill.stg_customers").Scan(&count); err != nil { - t.Fatalf("query renamed dbt-style view through ducklake: %v", err) - } - if count != 1 { - t.Fatalf("renamed dbt-style view row count = %d, want 1", count) - } - }) - - t.Run("dbt_style_create_table_as_uses_physical_catalog", func(t *testing.T) { - db := openLogicalDatabaseConn(t, "duckgres_catalog") - mustExec(t, db, "DROP SCHEMA IF EXISTS bill CASCADE") - mustExec(t, db, "CREATE SCHEMA bill") - mustExec(t, db, "CREATE TABLE duckgres_catalog.bill.stg_customers (id INTEGER, name TEXT)") - mustExec(t, db, "INSERT INTO duckgres_catalog.bill.stg_customers VALUES (1, 'alice')") - - mustExec(t, db, ` - CREATE TABLE "duckgres_catalog"."bill"."customer_lifetime_value__dbt_tmp" - AS ( - SELECT 
* - FROM "duckgres_catalog"."bill"."stg_customers" - ) - `) - - var count int - if err := db.QueryRow("SELECT COUNT(*) FROM ducklake.bill.customer_lifetime_value__dbt_tmp").Scan(&count); err != nil { - t.Fatalf("query dbt-style ctas table through ducklake: %v", err) - } - if count != 1 { - t.Fatalf("dbt-style ctas row count = %d, want 1", count) - } - }) -} diff --git a/tests/k8s/k8s_test.go b/tests/k8s/k8s_test.go index e47dd107..f83caf37 100644 --- a/tests/k8s/k8s_test.go +++ b/tests/k8s/k8s_test.go @@ -4,6 +4,7 @@ package k8s_test import ( "context" + "crypto/tls" "database/sql" "fmt" "log" @@ -17,6 +18,8 @@ import ( "testing" "time" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/stdlib" _ "github.com/lib/pq" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -822,35 +825,48 @@ func openDBConn() (*sql.DB, error) { return openDBConnAs("postgres", "postgres") } +// sniOrgPrefixForUser maps a test user to the managed-hostname label that +// resolves to its org. Identity is now established from the managed hostname + +// username (the database param no longer routes), so every connection presents a +// managed SNI. Tenants are seeded with org name == username; the "postgres" user +// belongs to org "local". +func sniOrgPrefixForUser(username string) string { + if username == "postgres" { + return "local" + } + return username +} + func openDBConnAs(username, password string) (*sql.DB, error) { if portForward == nil { return nil, fmt.Errorf("port-forward state is not initialized") } - databaseName := username - if username == "postgres" { - databaseName = "duckgres" - } pgPort := portForward.currentPort() if pgPort == 0 { return nil, fmt.Errorf("port-forward port is not initialized") } - // kubectl port-forward passes raw TCP bytes, so the client still needs - // SSL. lib/pq sslmode=require skips server cert verification by default, - // which works with self-signed certs. 
- connStr := fmt.Sprintf( - "host=127.0.0.1 port=%d user=%s password=%s dbname=%s sslmode=require connect_timeout=30", - pgPort, - username, - password, - databaseName, - ) - - db, err := sql.Open("postgres", connStr) + // kubectl port-forward passes raw TCP bytes, so the client still needs SSL. + // Override the TLS ServerName (SNI) to the org's managed hostname — that is + // the identity signal the control plane reads. The startup database is left + // empty so the session lands in the org's default attached catalog + // (ducklake, or iceberg for iceberg-only tenants). InsecureSkipVerify is fine + // for the kind cluster's self-signed cert (port-forward already breaks the + // canonical hostname). + cfg, err := pgx.ParseConfig(fmt.Sprintf( + "postgres://%s:%s@127.0.0.1:%d/?sslmode=require&connect_timeout=30", + url.QueryEscape(username), url.QueryEscape(password), pgPort, + )) if err != nil { return nil, err } + cfg.Database = "" // empty → control plane selects the org's default catalog + cfg.TLSConfig = &tls.Config{ + ServerName: sniOrgPrefixForUser(username) + sniManagedSuffix, + InsecureSkipVerify: true, // self-signed kind cert + } + db := stdlib.OpenDB(*cfg) db.SetMaxOpenConns(1) db.SetConnMaxLifetime(30 * time.Second) return db, nil diff --git a/tests/k8s/sni_test.go b/tests/k8s/sni_test.go index 0b0f8d85..e2f40d45 100644 --- a/tests/k8s/sni_test.go +++ b/tests/k8s/sni_test.go @@ -7,7 +7,6 @@ import ( "crypto/tls" "errors" "fmt" - "strings" "testing" "time" @@ -16,10 +15,14 @@ import ( ) // SNI integration tests rely on the kind manifest at k8s/kind/control-plane.yaml -// running the control plane with --sni-routing-mode=passthrough and +// running the control plane with --sni-routing-mode=enforce and // --managed-hostname-suffixes=.dw.test.local. The seeded org is "local" with // database name "duckgres" and user postgres/postgres // (k8s/kind/config-store.seed.sql). 
+// +// Identity is now established from the managed hostname (SNI) + username only; +// the startup `database` param selects the session catalog (ducklake/iceberg/ +// empty=default), so current_database() reports the real catalog, not the org. const ( sniManagedSuffix = ".dw.test.local" @@ -27,6 +30,7 @@ const ( sniSeedDatabaseName = "duckgres" sniSeedUser = "postgres" sniSeedPassword = "postgres" + sniSeedCatalog = "ducklake" // the default catalog for the "local" org sniBogusPrefix = "ignored-by-test" ) @@ -60,10 +64,9 @@ func connectWithSNI(ctx context.Context, sni, database, user, password string) ( return pgx.ConnectConfig(ctx, cfg) } -// TestSNI_MatchedHostnameUsesDatabaseParam: a managed SNI resolves to the same -// org as the requested database, and the startup database remains the routing -// key when present. -func TestSNI_MatchedHostnameUsesDatabaseParam(t *testing.T) { +// TestSNI_MatchedHostnameSelectsCatalog: a managed SNI resolves the org, and an +// explicit `database=ducklake` selects that catalog for the session. 
+func TestSNI_MatchedHostnameSelectsCatalog(t *testing.T) { if err := waitForDBReady(60 * time.Second); err != nil { t.Fatalf("waitForDBReady: %v", err) } @@ -72,10 +75,10 @@ func TestSNI_MatchedHostnameUsesDatabaseParam(t *testing.T) { defer cancel() conn, err := connectWithSNI(ctx, sniSeedOrgName+sniManagedSuffix, - sniSeedDatabaseName, sniSeedUser, sniSeedPassword) + sniSeedCatalog, sniSeedUser, sniSeedPassword) if err != nil { - t.Fatalf("expected managed SNI=%q with database param %q to authenticate; got: %v", - sniSeedOrgName+sniManagedSuffix, sniSeedDatabaseName, err) + t.Fatalf("expected managed SNI=%q with catalog %q to authenticate; got: %v", + sniSeedOrgName+sniManagedSuffix, sniSeedCatalog, err) } defer conn.Close(ctx) @@ -83,16 +86,15 @@ func TestSNI_MatchedHostnameUsesDatabaseParam(t *testing.T) { if err := conn.QueryRow(ctx, "SELECT current_database()").Scan(&current); err != nil { t.Fatalf("SELECT current_database(): %v", err) } - if current != sniSeedDatabaseName { - t.Fatalf("explicit database should be the routing database; got %q, want %q", - current, sniSeedDatabaseName) + if current != sniSeedCatalog { + t.Fatalf("current_database() should be the selected catalog; got %q, want %q", + current, sniSeedCatalog) } } -// TestSNI_MatchedHostnameUsesSNIWhenDatabaseParamEmpty: a client that omits -// the startup database can still route through a managed hostname that resolves -// to a configured org. -func TestSNI_MatchedHostnameUsesSNIWhenDatabaseParamEmpty(t *testing.T) { +// TestSNI_MatchedHostnameDefaultsCatalogWhenDatabaseEmpty: an empty startup +// database lands the session in the org's default attached catalog. 
+func TestSNI_MatchedHostnameDefaultsCatalogWhenDatabaseEmpty(t *testing.T) { if err := waitForDBReady(60 * time.Second); err != nil { t.Fatalf("waitForDBReady: %v", err) } @@ -112,16 +114,15 @@ func TestSNI_MatchedHostnameUsesSNIWhenDatabaseParamEmpty(t *testing.T) { if err := conn.QueryRow(ctx, "SELECT current_database()").Scan(&current); err != nil { t.Fatalf("SELECT current_database(): %v", err) } - if current != sniSeedDatabaseName { - t.Fatalf("resolved SNI org database should be the routing database; got %q, want %q", - current, sniSeedDatabaseName) + if current != sniSeedCatalog { + t.Fatalf("empty database should default to the org catalog; got %q, want %q", + current, sniSeedCatalog) } } -// TestSNI_MatchedHostnameRejectsDifferentDatabaseOrg: a managed hostname -// cannot be used as a generic valid hostname for a different requested -// database. The URL and startup database must resolve to the same org. -func TestSNI_MatchedHostnameRejectsDifferentDatabaseOrg(t *testing.T) { +// TestSNI_UnknownHostnameRejected: a managed-suffix hostname whose prefix +// resolves to no org is rejected — identity comes solely from the hostname. 
+func TestSNI_UnknownHostnameRejected(t *testing.T) { if err := waitForDBReady(60 * time.Second); err != nil { t.Fatalf("waitForDBReady: %v", err) } @@ -130,29 +131,23 @@ func TestSNI_MatchedHostnameRejectsDifferentDatabaseOrg(t *testing.T) { defer cancel() _, err := connectWithSNI(ctx, sniBogusPrefix+sniManagedSuffix, - sniSeedDatabaseName, sniSeedUser, sniSeedPassword) + sniSeedCatalog, sniSeedUser, sniSeedPassword) if err == nil { - t.Fatalf("expected managed SNI for a different org to be rejected") - } - if !strings.Contains(err.Error(), "does not match managed hostname") { - t.Fatalf("expected database/hostname mismatch error; got: %v", err) + t.Fatalf("expected unknown managed hostname to be rejected") } var pgErr *pgconn.PgError if !errors.As(err, &pgErr) { t.Fatalf("expected pg error; got: %T %v", err, err) } - if pgErr.Code != "28000" { - t.Fatalf("SQLSTATE = %q, want 28000", pgErr.Code) + if pgErr.Code != "08006" { + t.Fatalf("SQLSTATE = %q, want 08006", pgErr.Code) } } -// TestSNI_LegacyHostnameFallsThroughInPassthrough: with the kind setup in -// passthrough mode, an SNI that doesn't match a managed suffix falls back -// to the database startup param. The connection should succeed AND the -// control plane should emit a warn log identifying the legacy caller. -// (We can't easily assert the log line from here without log scraping; the -// happy path itself proves the fallback works.) -func TestSNI_LegacyHostnameFallsThroughInPassthrough(t *testing.T) { +// TestSNI_LegacyHostnameRejected: an unmanaged hostname (e.g. the raw +// port-forward host) has no org and is rejected under enforce — there is no +// database-param fallback. 
+func TestSNI_LegacyHostnameRejected(t *testing.T) { if err := waitForDBReady(60 * time.Second); err != nil { t.Fatalf("waitForDBReady: %v", err) } @@ -160,23 +155,40 @@ func TestSNI_LegacyHostnameFallsThroughInPassthrough(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - // Use 127.0.0.1 as the SNI (matches what lib/pq would send when - // connecting via port-forward without override). Definitely not on - // .dw.test.local, so it falls through to the legacy database-param - // path with a warn log. - conn, err := connectWithSNI(ctx, "127.0.0.1", - sniSeedDatabaseName, sniSeedUser, sniSeedPassword) - if err != nil { - t.Fatalf("legacy hostname should still authenticate via database-param fallback in passthrough mode; got: %v", err) + _, err := connectWithSNI(ctx, "127.0.0.1", + sniSeedCatalog, sniSeedUser, sniSeedPassword) + if err == nil { + t.Fatalf("expected unmanaged hostname to be rejected under enforce") } - defer conn.Close(ctx) + var pgErr *pgconn.PgError + if !errors.As(err, &pgErr) { + t.Fatalf("expected pg error; got: %T %v", err, err) + } + if pgErr.Code != "08006" { + t.Fatalf("SQLSTATE = %q, want 08006", pgErr.Code) + } +} - var current string - if err := conn.QueryRow(ctx, "SELECT current_database()").Scan(&current); err != nil { - t.Fatalf("SELECT current_database(): %v", err) +// TestSNI_InvalidCatalogRejected: a managed hostname authenticates, but an +// unknown database/catalog name fails with 3D000. 
+func TestSNI_InvalidCatalogRejected(t *testing.T) { + if err := waitForDBReady(60 * time.Second); err != nil { + t.Fatalf("waitForDBReady: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + _, err := connectWithSNI(ctx, sniSeedOrgName+sniManagedSuffix, + "not_a_catalog", sniSeedUser, sniSeedPassword) + if err == nil { + t.Fatalf("expected an invalid catalog name to be rejected") + } + var pgErr *pgconn.PgError + if !errors.As(err, &pgErr) { + t.Fatalf("expected pg error; got: %T %v", err, err) } - if current != sniSeedDatabaseName { - t.Fatalf("legacy fallback should land us in the param-named database; got %q, want %q", - current, sniSeedDatabaseName) + if pgErr.Code != "3D000" { + t.Fatalf("SQLSTATE = %q, want 3D000", pgErr.Code) } }