diff --git a/server/conn.go b/server/conn.go index d3151379..2711b99d 100644 --- a/server/conn.go +++ b/server/conn.go @@ -1448,7 +1448,9 @@ func (c *clientConn) handleQuery(body []byte) error { runExec := func() (ExecResult, error) { execResult, err := c.executor.ExecContext(ctx, query) if err != nil { - fallbackResult, handled, fallbackErr := c.execCompatibilityFallback(ctx, query, err) + fallbackResult, handled, fallbackErr := c.execCompatibilityFallback(ctx, query, err, func(fallbackQuery string) (ExecResult, error) { + return c.executor.ExecContext(ctx, fallbackQuery) + }) if handled { return fallbackResult, fallbackErr } @@ -2155,15 +2157,11 @@ func (c *clientConn) executeSingleStatement(query string) (errSent bool, fatalEr runExec := func() (ExecResult, error) { execResult, err := c.executor.ExecContext(ctx, executedQuery) if err != nil { - if isAlterTableNotTableError(err) { - if alteredQuery, ok := transpiler.ConvertAlterTableToAlterView(executedQuery); ok { - return c.executor.ExecContext(ctx, alteredQuery) - } - } - if isDropTableOnViewError(err) { - if alteredQuery, ok := transpiler.ConvertDropTableToDropView(executedQuery); ok { - return c.executor.ExecContext(ctx, alteredQuery) - } + fallbackResult, handled, fallbackErr := c.execCompatibilityFallback(ctx, executedQuery, err, func(fallbackQuery string) (ExecResult, error) { + return c.executor.ExecContext(ctx, fallbackQuery) + }) + if handled { + return fallbackResult, fallbackErr } } return execResult, err @@ -5723,17 +5721,10 @@ func (c *clientConn) handleExecute(body []byte) { runExec := func() (ExecResult, error) { result, err := c.executor.Exec(convertedQuery, args...) if err != nil { - // Retry ALTER TABLE as ALTER VIEW if target is a view - if isAlterTableNotTableError(err) { - if alteredQuery, ok := transpiler.ConvertAlterTableToAlterView(convertedQuery); ok { - return c.executor.Exec(alteredQuery, args...) - } - } - // Retry DROP TABLE as DROP VIEW if target is a view - if isDropTableOnViewError(err) { - if alteredQuery, ok := transpiler.ConvertDropTableToDropView(convertedQuery); ok { - return c.executor.Exec(alteredQuery, args...) - } + if fallbackResult, handled, fallbackErr := c.execCompatibilityFallback(queryCtx, convertedQuery, err, func(fallbackQuery string) (ExecResult, error) { + return c.executor.Exec(fallbackQuery, args...) + }); handled { + return fallbackResult, fallbackErr } } return result, err diff --git a/server/conn_test.go b/server/conn_test.go index 876f7089..ab5c0e9f 100644 --- a/server/conn_test.go +++ b/server/conn_test.go @@ -1392,7 +1392,7 @@ func (e *queryErrorExecutor) PingContext(context.Context) error { func (e *queryErrorExecutor) Close() error { return nil } type abortedSelectRecoveryExecutor struct { - queryCalls int + queryCalls int noopProfiling rollbackCalls int } @@ -1434,9 +1434,9 @@ func (e *abortedSelectRecoveryExecutor) Close() error { return nil } type abortedExecAlterViewRecoveryExecutor struct { originalQuery string noopProfiling - rewritten string - execCalls []string - execCtxCalls []string + rewritten string + execCalls []string + execCtxCalls []string } func (e *abortedExecAlterViewRecoveryExecutor) execResult(query string, callIndex int) (ExecResult, error) { @@ -1487,7 +1487,7 @@ func (e *abortedExecAlterViewRecoveryExecutor) Close() error { return nil } type abortedAlterViewRecoveryExecutor struct { execContextQueries []string noopProfiling - execQueries []string + execQueries []string } func (e *abortedAlterViewRecoveryExecutor) QueryContext(context.Context, string, ...any) (RowSet, error) { @@ -1630,11 +1630,19 @@ func TestHandleQueryEmitsCorrectSQLSTATEOnDuckDBErrors(t *testing.T) { // is found or the field is absent. func extractErrorResponseField(t *testing.T, buf []byte, field byte) string { t.Helper() + if value, ok := errorResponseField(buf, field); ok { + return value + } + t.Fatalf("no ErrorResponse frame with field %q in output (%d bytes)", field, len(buf)) + return "" +} + +func errorResponseField(buf []byte, field byte) (string, bool) { for i := 0; i+5 <= len(buf); { msgType := buf[i] length := int(binary.BigEndian.Uint32(buf[i+1 : i+5])) if length < 4 || i+1+length > len(buf) { - return "" + return "", false } body := buf[i+5 : i+1+length] if msgType == 'E' { @@ -1645,19 +1653,18 @@ func extractErrorResponseField(t *testing.T, buf []byte, field byte) string { } end := bytes.IndexByte(body[j+1:], 0) if end < 0 { - return "" + return "", false } if code == field { - return string(body[j+1 : j+1+end]) + return string(body[j+1 : j+1+end]), true } j += 1 + end + 1 } - t.Fatalf("ErrorResponse found but field %q missing", field) + return "", false } i += 1 + length } - t.Fatalf("no ErrorResponse frame in output (%d bytes)", len(buf)) - return "" + return "", false } func TestExecuteSingleStatementRecoversAbortedAutocommitAlterViewFallback(t *testing.T) { @@ -1864,6 +1871,92 @@ func TestHandleExecuteAbortedRecoveryPreservesAlterViewFallback(t *testing.T) { } } +func TestHandleExecuteUsesCompatibilityFallbackForIcebergDropSchemaCascade(t *testing.T) { + clientSide, serverSide := net.Pipe() + defer func() { _ = clientSide.Close() }() + defer func() { _ = serverSide.Close() }() + + const query = "DROP SCHEMA IF EXISTS fivetran_testing_schema_abc CASCADE" + executor := &extendedDropSchemaCascadeFallbackExecutor{} + + var out bytes.Buffer + c := &clientConn{ + server: &Server{activeQueries: make(map[BackendKey]context.CancelFunc)}, + conn: clientSide, + reader: bufio.NewReader(clientSide), + writer: bufio.NewWriter(&out), + ctx: context.Background(), + cancel: func() {}, + txStatus: txStatusIdle, + executor: executor, + portals: map[string]*portal{ + "p": { + stmt: &preparedStmt{ + query: query, + convertedQuery: query, + }, + }, + }, + } + + var body bytes.Buffer + body.WriteString("p") + body.WriteByte(0) + if err := binary.Write(&body, binary.BigEndian, int32(0)); err != nil { + t.Fatalf("encode execute body: %v", err) + } + + c.handleExecute(body.Bytes()) + + if code, ok := errorResponseField(out.Bytes(), 'C'); ok { + t.Fatalf("unexpected ErrorResponse SQLSTATE %q", code) + } + if !slices.Equal(executor.execCalls, []string{query}) { + t.Fatalf("Exec calls = %v, want original query only", executor.execCalls) + } + wantExecContext := []string{`DROP SCHEMA IF EXISTS "iceberg"."fivetran_testing_schema_abc"`} + if !slices.Equal(executor.execContextCalls, wantExecContext) { + t.Fatalf("ExecContext calls = %v, want %v", executor.execContextCalls, wantExecContext) + } +} + +type extendedDropSchemaCascadeFallbackExecutor struct { + noopProfiling + execCalls []string + execContextCalls []string +} + +func (e *extendedDropSchemaCascadeFallbackExecutor) QueryContext(_ context.Context, query string, _ ...any) (RowSet, error) { + if strings.Contains(query, "duckdb_settings()") { + return &icebergDropSchemaCascadeRows{values: []string{`"iceberg"."public",memory.main`}}, nil + } + return &icebergDropSchemaCascadeRows{values: nil}, nil +} + +func (e *extendedDropSchemaCascadeFallbackExecutor) ExecContext(_ context.Context, query string, _ ...any) (ExecResult, error) { + e.execContextCalls = append(e.execContextCalls, strings.TrimSpace(query)) + return &fakeExecResult{}, nil +} + +func (e *extendedDropSchemaCascadeFallbackExecutor) Query(string, ...any) (RowSet, error) { + return nil, errors.New("not implemented") +} + +func (e *extendedDropSchemaCascadeFallbackExecutor) Exec(query string, _ ...any) (ExecResult, error) { + e.execCalls = append(e.execCalls, strings.TrimSpace(query)) + return nil, errors.New("flight execute update: rpc error: code = InvalidArgument desc = failed to execute update: Not implemented Error: DROP SCHEMA CASCADE is not supported for Iceberg schemas currently") +} + +func (e *extendedDropSchemaCascadeFallbackExecutor) ConnContext(context.Context) (RawConn, error) { + return nil, errors.New("not implemented") +} + +func (e *extendedDropSchemaCascadeFallbackExecutor) PingContext(context.Context) error { + return errors.New("not implemented") +} + +func (e *extendedDropSchemaCascadeFallbackExecutor) Close() error { return nil } + func TestHandleExecuteRecoversAbortedAutocommitAlterViewFallback(t *testing.T) { clientSide, serverSide := net.Pipe() defer func() { _ = clientSide.Close() }() diff --git a/server/exec_fallback.go b/server/exec_fallback.go index 0f54b0a5..48e60c20 100644 --- a/server/exec_fallback.go +++ b/server/exec_fallback.go @@ -7,17 +7,19 @@ import ( "github.com/posthog/duckgres/transpiler" ) -func (c *clientConn) execCompatibilityFallback(ctx context.Context, query string, execErr error) (ExecResult, bool, error) { +type execFallbackRunner func(string) (ExecResult, error) + +func (c *clientConn) execCompatibilityFallback(ctx context.Context, query string, execErr error, exec execFallbackRunner) (ExecResult, bool, error) { if isAlterTableNotTableError(execErr) { if alteredQuery, ok := transpiler.ConvertAlterTableToAlterView(query); ok { - result, err := c.executor.ExecContext(ctx, alteredQuery) + result, err := exec(alteredQuery) return result, true, err } } if isDropTableOnViewError(execErr) { if alteredQuery, ok := transpiler.ConvertDropTableToDropView(query); ok { - result, err := c.executor.ExecContext(ctx, alteredQuery) + result, err := exec(alteredQuery) return result, true, err } } diff --git a/server/exec_fallback_test.go b/server/exec_fallback_test.go index c42d6829..b14224c3 100644 --- a/server/exec_fallback_test.go +++ b/server/exec_fallback_test.go @@ -11,7 +11,9 @@ func TestExecCompatibilityFallbackReturnsIcebergFallbackError(t *testing.T) { execErr := errors.New("Not implemented Error: DROP SCHEMA CASCADE is not supported for Iceberg schemas currently") c := &clientConn{executor: &failingFallbackExecutor{}} - _, handled, err := c.execCompatibilityFallback(context.Background(), "DROP SCHEMA IF EXISTS stripe CASCADE", execErr) + _, handled, err := c.execCompatibilityFallback(context.Background(), "DROP SCHEMA IF EXISTS stripe CASCADE", execErr, func(string) (ExecResult, error) { + return nil, errors.New("unexpected fallback exec") + }) if !handled { t.Fatal("expected Iceberg fallback to handle unsupported DROP SCHEMA CASCADE error") } diff --git a/server/sessionmeta/sessionmeta.go b/server/sessionmeta/sessionmeta.go index a61252a5..69b60a7d 100644 --- a/server/sessionmeta/sessionmeta.go +++ b/server/sessionmeta/sessionmeta.go @@ -252,6 +252,16 @@ func buildSessionInformationSchemaColumnsViewSQL() string { AND c.column_name = '__' AND UPPER(c.data_type) = 'UNKNOWN' ) + AND NOT ( + c.table_catalog = 'iceberg' + AND EXISTS ( + SELECT 1 + FROM main.__duckgres_iceberg_column_metadata im + WHERE im.table_schema = c.table_schema + AND im.table_name = c.table_name + AND im.column_name = c.column_name + ) + ) UNION ALL SELECT 'iceberg' AS source_catalog, diff --git a/server/sessionmeta/sessionmeta_test.go b/server/sessionmeta/sessionmeta_test.go index 5ab1bd9b..97962571 100644 --- a/server/sessionmeta/sessionmeta_test.go +++ b/server/sessionmeta/sessionmeta_test.go @@ -2,10 +2,12 @@ package sessionmeta import ( "context" + "database/sql" "errors" "strings" "testing" + _ "github.com/duckdb/duckdb-go/v2" "github.com/posthog/duckgres/server/sqlcore" ) @@ -153,3 +155,84 @@ func TestInformationSchemaColumnsCompatLoadedIcebergColumnsKeepIcebergCatalog(t t.Fatalf("loaded Iceberg columns should not use current_database() as table_catalog in:\n%s", got) } } + +func TestInformationSchemaColumnsCompatPrefersLoadedIcebergMetadata(t *testing.T) { + db, err := sql.Open("duckdb", ":memory:") + if err != nil { + t.Fatalf("open duckdb: %v", err) + } + defer func() { _ = db.Close() }() + + stmts := []string{ + `ATTACH ':memory:' AS iceberg`, + `CREATE SCHEMA iceberg.stripe`, + `CREATE TABLE iceberg.stripe.account (requirements_currently_due INTEGER)`, + sessionColumnMetadataTableSQL(), + sessionIcebergColumnMetadataTableSQL(), + `INSERT INTO main.__duckgres_iceberg_column_metadata ( + table_schema, + table_name, + column_name, + ordinal_position, + is_nullable, + data_type, + character_maximum_length, + character_octet_length, + numeric_precision, + numeric_scale, + datetime_precision + ) VALUES ( + 'stripe', + 'account', + 'requirements_currently_due', + 1, + 'YES', + 'STRUCT(currently_due VARCHAR[])', + NULL, + NULL, + NULL, + NULL, + NULL + )`, + buildSessionInformationSchemaColumnsViewSQL(), + } + for _, stmt := range stmts { + if _, err := db.Exec(stmt); err != nil { + t.Fatalf("exec %q: %v", stmt, err) + } + } + + var dataType string + err = db.QueryRow(` + SELECT data_type + FROM main.information_schema_columns_compat + WHERE table_catalog = 'iceberg' + AND table_schema = 'stripe' + AND table_name = 'account' + AND column_name = 'requirements_currently_due' + `).Scan(&dataType) + if err != nil { + t.Fatalf("query compat data_type: %v", err) + } + if dataType != "json" { + t.Fatalf("data_type = %q, want json from loaded Iceberg metadata", dataType) + } +} + +func TestInformationSchemaColumnsCompatSuppressesNativeIcebergDuplicatesExplicitly(t *testing.T) { + got := buildSessionInformationSchemaColumnsViewSQL() + for _, want := range []string{ + "AND NOT (", + "FROM main.__duckgres_iceberg_column_metadata im", + "WHERE im.table_schema = c.table_schema", + "AND im.table_name = c.table_name", + "AND im.column_name = c.column_name", + } { + if !strings.Contains(got, want) { + t.Fatalf("columns compat SQL should explicitly suppress native Iceberg duplicates; missing %q in:\n%s", want, got) + } + } + if strings.Contains(got, "source_priority") { + t.Fatalf("columns compat SQL should not rely on hidden source_priority ranking in:\n%s", got) + } +}