From 5ffcae907290086a80e494619a190a5156f30023 Mon Sep 17 00:00:00 2001 From: Jiwon Park Date: Thu, 28 May 2026 15:12:32 +0900 Subject: [PATCH 1/2] Spark 4.1: bind parameters in IcebergSparkSqlExtensionsParser Spark 4.1 routes parameterized queries through ParserInterface.parsePlanWithParameters, whose default implementation drops the parameter context and falls back to parsePlan. The Iceberg extensions parser delegates non-Iceberg SQL to the Spark parser but did not override parsePlanWithParameters, so positional and named parameters were left unbound (UNBOUND_SQL_PARAMETER) for any SQL routed through the Iceberg session extensions on Spark Connect. Override parsePlanWithParameters to delegate with the parameter context intact, mirroring parsePlan. Signed-off-by: Jiwon Park --- .../IcebergSparkSqlExtensionsParser.scala | 17 +++++++ .../iceberg/spark/TestExtendedParser.java | 48 +++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala index ac127f754a91..709c1d28a576 100644 --- a/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala +++ b/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.RewriteViewCommands import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.parser.ParameterContext import org.apache.spark.sql.catalyst.parser.ParserInterface import 
org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.NonReservedContext import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.QuotedIdentifierContext @@ -125,6 +126,22 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) } } + /** + * Parse a string to a LogicalPlan, binding the given parameters. + */ + override def parsePlanWithParameters( + sqlText: String, + parameterContext: ParameterContext): LogicalPlan = { + val sqlTextAfterSubstitution = substitutor.substitute(sqlText) + if (isIcebergCommand(sqlTextAfterSubstitution)) { + parse(sqlTextAfterSubstitution) { parser => astBuilder.visit(parser.singleStatement()) } + .asInstanceOf[LogicalPlan] + } else { + RewriteViewCommands(SparkSession.active) + .apply(delegate.parsePlanWithParameters(sqlText, parameterContext)) + } + } + private def isIcebergCommand(sqlText: String): Boolean = { val normalized = sqlText .toLowerCase(Locale.ROOT) diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/TestExtendedParser.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/TestExtendedParser.java index ef4f0090292c..36e7314473aa 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/TestExtendedParser.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/TestExtendedParser.java @@ -20,6 +20,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -27,14 +29,19 @@ import java.lang.reflect.Field; import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.iceberg.NullOrder; import org.apache.iceberg.SortDirection; import 
org.apache.iceberg.expressions.Term; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.parser.AbstractSqlParser; import org.apache.spark.sql.catalyst.parser.AstBuilder; +import org.apache.spark.sql.catalyst.parser.ParameterContext; import org.apache.spark.sql.catalyst.parser.ParserInterface; import org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -184,6 +191,47 @@ public void testParseSortOrderFindsExtendedParserInParentClassField() throws Exc verify(icebergParser).parseSortOrder("id ASC NULLS FIRST"); } + /** Tests that non-Iceberg SQL delegates with the parameter context intact. */ + @Test + public void testParsePlanWithParametersDelegatesForNonIcebergSql() throws Exception { + ParserInterface delegate = mock(ParserInterface.class); + ParameterContext context = mock(ParameterContext.class); + LogicalPlan plan = new OneRowRelation(); + when(delegate.parsePlanWithParameters(anyString(), any(ParameterContext.class))) + .thenReturn(plan); + + IcebergSparkSqlExtensionsParser parser = new IcebergSparkSqlExtensionsParser(delegate); + + parser.parsePlanWithParameters("SELECT 1 WHERE 1 = ?", context); + + verify(delegate).parsePlanWithParameters("SELECT 1 WHERE 1 = ?", context); + } + + /** Tests that a positional parameter binds through a real Iceberg-extended parser. */ + @Test + public void testParsePlanWithParametersBindsPositionalParameter() throws Exception { + IcebergSparkSqlExtensionsParser parser = new IcebergSparkSqlExtensionsParser(originalParser); + setSessionStateParser(spark.sessionState(), parser); + + List<Row> rows = spark.sql("SELECT ? 
AS id", new Object[] {42}).collectAsList(); + + assertThat(rows).hasSize(1); + assertThat(rows.get(0).get(0)).isEqualTo(42); + } + + /** Tests that a named parameter binds through a real Iceberg-extended parser. */ + @Test + public void testParsePlanWithParametersBindsNamedParameter() throws Exception { + IcebergSparkSqlExtensionsParser parser = new IcebergSparkSqlExtensionsParser(originalParser); + setSessionStateParser(spark.sessionState(), parser); + + Map<String, Object> args = Collections.singletonMap("id", 42); + List<Row> rows = spark.sql("SELECT :id AS id", args).collectAsList(); + + assertThat(rows).hasSize(1); + assertThat(rows.get(0).get(0)).isEqualTo(42); + } + private static void setSessionStateParser(Object sessionState, ParserInterface parser) throws Exception { Class clazz = sessionState.getClass(); From 5411fdff89f653321374a0693048763604fb19a4 Mon Sep 17 00:00:00 2001 From: Jiwon Park Date: Mon, 1 Jun 2026 14:39:49 +0900 Subject: [PATCH 2/2] Spark 4.1: Document why parameterContext is not propagated for Iceberg DDL Iceberg DDL grammars do not accept parameter markers (`?` / `:name`), so the parameterContext is intentionally dropped on the Iceberg-command branch of parsePlanWithParameters. Add a comment per review feedback. 
Signed-off-by: Jiwon Park --- .../parser/extensions/IcebergSparkSqlExtensionsParser.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala index 709c1d28a576..22c28dce1558 100644 --- a/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala +++ b/spark/v4.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala @@ -134,6 +134,8 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) parameterContext: ParameterContext): LogicalPlan = { val sqlTextAfterSubstitution = substitutor.substitute(sqlText) if (isIcebergCommand(sqlTextAfterSubstitution)) { + // Iceberg DDL grammars do not accept parameter markers (`?` / `:name`), so the + // parameterContext is intentionally not propagated on this path. parse(sqlTextAfterSubstitution) { parser => astBuilder.visit(parser.singleStatement()) } .asInstanceOf[LogicalPlan] } else {