From 6ae218117195f26a4a4a520f36a1b037257e37cb Mon Sep 17 00:00:00 2001
From: Emil Ejbyfeldt
Date: Fri, 27 Feb 2026 07:06:00 +0100
Subject: [PATCH] SPARK-55742: Support TransformingEncoder inside RowEncoder

When a RowEncoder field is encoded with a TransformingEncoder, pass
dataTypeForClass(tag.runtimeClass) to ValidateExternalType as its second
argument instead of field.enc.dataType. Fields using any other encoder
keep passing field.enc.dataType.

Add an ExpressionEncoderSuite test that runs testDataTransformingEnc on
a RowEncoder whose single field uses longEncForTimestamp.
---
Review notes (ignored by git am):
 - Line structure was restored from a whitespace-collapsed copy of this
   patch. Indentation and blank context lines were reconstructed from the
   hunk line counts; if context does not match, apply with
   `git apply --ignore-whitespace` -- TODO confirm against the tree.
 - `case other =>` was changed to `case _ =>` (the binding was unused).
   The post-image blob id on the SerializerBuildHelper.scala index line
   was not recomputed.

 .../spark/sql/catalyst/SerializerBuildHelper.scala    |  9 +++++++--
 .../catalyst/encoders/ExpressionEncoderSuite.scala    | 11 ++++++++++-
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
index b8b2406a58130..69b4b044e1e5e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.{expressions => exprs}
 import org.apache.spark.sql.catalyst.DeserializerBuildHelper.expressionWithNullSafety
 import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, AgnosticExpressionPathEncoder, Codec, JavaSerializationCodec, KryoSerializationCodec}
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, GeographyEncoder, GeometryEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, OptionEncoder, PrimitiveLeafEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder}
-import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{externalDataTypeFor, isNativeEncoder, lenientExternalDataTypeFor}
+import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{dataTypeForClass, externalDataTypeFor, isNativeEncoder, lenientExternalDataTypeFor}
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, CheckOverflow, CreateNamedStruct, Expression, IsNull, KnownNotNull, Literal, UnsafeArrayData}
 import org.apache.spark.sql.catalyst.expressions.objects._
 import org.apache.spark.sql.catalyst.util.{ArrayData, CharVarcharCodegenUtils, DateTimeUtils, GenericArrayData, IntervalUtils, STUtils}
@@ -421,11 +421,16 @@ object SerializerBuildHelper {
 
     case AgnosticEncoders.RowEncoder(fields) =>
       val serializedFields = fields.zipWithIndex.map { case (field, index) =>
+        val expected = field.enc match {
+          case AgnosticEncoders.TransformingEncoder(tag, _, _, _) =>
+            dataTypeForClass(tag.runtimeClass)
+          case _ => field.enc.dataType
+        }
         val fieldValue = createSerializer(
           field.enc,
           ValidateExternalType(
             GetExternalRowField(input, index, field.name),
-            field.enc.dataType,
+            expected,
             lenientExternalDataTypeFor(field.enc)))
 
         val convertedField = if (field.nullable) {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index 287b99d10d659..1e4b5c98fd188 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.{Encoder, Encoders, Row}
 import org.apache.spark.sql.catalyst.{FooClassWithEnum, FooEnum, OptionalData, PrimitiveData, ScalaReflection, ScroogeLikeExample}
 import org.apache.spark.sql.catalyst.analysis.AnalysisTest
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, EncoderField, IterableEncoder, MapEncoder, OptionEncoder, PrimitiveIntEncoder, ProductEncoder, TimestampEncoder, TransformingEncoder}
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, EncoderField, IterableEncoder, MapEncoder, OptionEncoder, PrimitiveIntEncoder, ProductEncoder, RowEncoder => AgnosticRowEncoder, TimestampEncoder, TransformingEncoder}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NaNvl}
 import org.apache.spark.sql.catalyst.plans.CodegenInterpretedPlanTest
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -829,6 +829,15 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes
     testDataTransformingEnc(enc, data)
   }
 
+
+  test("SPARK-55742: Transforming encoder inside row encoder") {
+    val enc = AgnosticRowEncoder(Seq(
+      EncoderField("time", longEncForTimestamp, nullable = false, Metadata.empty)
+    ))
+    val data = Seq(Row(V(0L)), Row(V(1L)))
+    testDataTransformingEnc(enc, data)
+  }
+
   // Scala / Java big decimals ----------------------------------------------------------
 
   encodeDecodeTest(BigDecimal("9".repeat(20) + "." + "9".repeat(18)),