From c3948eaee31d59f359fe83fa17be67e89ad78ecb Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Tue, 12 May 2026 11:52:15 +0000 Subject: [PATCH] [SPARK-XXXXX][SQL][TESTS] Drop redundant BeforeAndAfterEach and SharedSparkSession mixins SharedSparkSession transitively extends BeforeAndAfterEach (via SharedSparkSessionBase), so explicit `with BeforeAndAfterEach` alongside SharedSparkSession is redundant. StreamTest extends SharedSparkSession, so explicit `with SharedSparkSession` after StreamTest is redundant. This change drops: - `with BeforeAndAfterEach` from 5 suites whose base already provides it: StreamingSourceEvolutionSuite, RocksDBTimestampEncoderOperationsSuite, ResolveChangelogTableNetChangesTestsBase, ResolveChangelogTablePostProcessingSuite, ResolveChangelogTableStreamingPostProcessingSuite. - `with SharedSparkSession` from 5 StreamTest-based suites: ForeachWriterSuite, FileStreamSourceTest, TextSocketStreamSuite, OperatorStateMetadataSuite, RocksDBCheckpointFailureInjectionSuite. Unused `BeforeAndAfterEach` / `SharedSparkSession` imports are also removed. Suites that mix in the older `org.scalatest.BeforeAndAfter` (singular) trait keep it - that's a distinct API, not redundant. Generated-by: Claude Code --- .../connector/ResolveChangelogTableNetChangesSuite.scala | 6 +----- .../ResolveChangelogTablePostProcessingSuite.scala | 6 +----- .../ResolveChangelogTableStreamingPostProcessingSuite.scala | 6 +----- .../execution/streaming/sources/ForeachWriterSuite.scala | 3 +-- .../execution/streaming/sources/TextSocketStreamSuite.scala | 3 +-- .../streaming/state/OperatorStateMetadataSuite.scala | 3 +-- .../state/RocksDBCheckpointFailureInjectionSuite.scala | 4 +--- .../state/RocksDBTimestampEncoderOperationsSuite.scala | 4 +--- .../apache/spark/sql/streaming/FileStreamSourceSuite.scala | 4 +--- .../sql/streaming/test/StreamingSourceEvolutionSuite.scala | 4 ++-- 10 files changed, 11 insertions(+), 32 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableNetChangesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableNetChangesSuite.scala index 33e97de957581..bd56fe80b5f82 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableNetChangesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableNetChangesSuite.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.connector import java.util.Collections -import org.scalatest.BeforeAndAfterEach - import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.catalog.{ @@ -54,9 +52,7 @@ import org.apache.spark.unsafe.types.UTF8String * netChanges sees them. Output assertions stay identical because both paths produce * the same `_change_type` labels at the netChanges input. */ -trait ResolveChangelogTableNetChangesTestsBase - extends SharedSparkSession - with BeforeAndAfterEach { +trait ResolveChangelogTableNetChangesTestsBase extends SharedSparkSession { /** * Value of the user-facing CDC option `computeUpdates` that this test run diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala index e0f426abe5d20..36bd86eeab471 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTablePostProcessingSuite.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.connector import java.util.Collections -import org.scalatest.BeforeAndAfterEach - import org.apache.spark.SparkRuntimeException import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.catalyst.InternalRow @@ -47,9 +45,7 @@ import org.apache.spark.unsafe.types.UTF8String * paths (carry-over removal, update detection) and then verifies that Spark's analyzer rule * correctly transforms the plan and produces the expected output. */ -class ResolveChangelogTablePostProcessingSuite - extends SharedSparkSession - with BeforeAndAfterEach { +class ResolveChangelogTablePostProcessingSuite extends SharedSparkSession { private val catalogName = "cdc_test_catalog" private val testTableName = "events" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableStreamingPostProcessingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableStreamingPostProcessingSuite.scala index 7b72568c460a0..51b17eee69db0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableStreamingPostProcessingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableStreamingPostProcessingSuite.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.connector import java.util.Collections -import org.scalatest.BeforeAndAfterEach - import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.expressions.Inline import org.apache.spark.sql.catalyst.plans.logical.{ @@ -46,9 +44,7 @@ import org.apache.spark.sql.types.LongType * -> [Project (update relabel)] * -> Project (drop helper columns) */ -class ResolveChangelogTableStreamingPostProcessingSuite - extends SharedSparkSession - with BeforeAndAfterEach { +class ResolveChangelogTableStreamingPostProcessingSuite extends SharedSparkSession { private val catalogName = "cdc_streaming_pp" private val testTableName = "events" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala index 8c657c2e07160..a33be8fc94152 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala @@ -28,10 +28,9 @@ import org.apache.spark.sql.ForeachWriter import org.apache.spark.sql.execution.streaming.runtime.MemoryStream import org.apache.spark.sql.functions.{count, timestamp_seconds, window} import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest} -import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.util.ArrayImplicits._ -class ForeachWriterSuite extends StreamTest with SharedSparkSession with BeforeAndAfter { +class ForeachWriterSuite extends StreamTest with BeforeAndAfter { import testImplicits._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala index 8b8ae57c82c4d..31b904c2e481b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala @@ -36,11 +36,10 @@ import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRe import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.execution.streaming.runtime.{ContinuousRecordPartitionOffset, LongOffset, StreamExecution} import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest} -import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CaseInsensitiveStringMap -class TextSocketStreamSuite extends StreamTest with SharedSparkSession { +class TextSocketStreamSuite extends StreamTest { override def afterEach(): Unit = { sqlContext.streams.active.foreach(_.stop()) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala index 01b9ae4dc82d4..5fa8d86c21af2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala @@ -29,9 +29,8 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.{OutputMode, RunningCountStatefulProcessor, StreamTest, TimeMode} import org.apache.spark.sql.streaming.OutputMode.{Complete, Update} -import org.apache.spark.sql.test.SharedSparkSession -class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession { +class OperatorStateMetadataSuite extends StreamTest { import testImplicits._ private lazy val hadoopConf = spark.sessionState.newHadoopConf() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala index b0220c3396261..a469ab36e0ab0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala @@ -30,15 +30,13 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS import org.apache.spark.sql.streaming._ import org.apache.spark.sql.streaming.OutputMode.Update -import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.tags.SlowSQLTest import org.apache.spark.util.Utils @SlowSQLTest /** Test suite to inject some failures in RocksDB checkpoint */ -class RocksDBCheckpointFailureInjectionSuite extends StreamTest - with SharedSparkSession { +class RocksDBCheckpointFailureInjectionSuite extends StreamTest { private val fileManagerClassName = classOf[FailureInjectionCheckpointFileManager].getName diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala index 6823cf7db4062..4179b0fafee5a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala @@ -22,7 +22,6 @@ import java.util.UUID import scala.util.Random import org.apache.hadoop.conf.Configuration -import org.scalatest.BeforeAndAfterEach import org.scalatest.matchers.should.Matchers import org.apache.spark.sql.catalyst.InternalRow @@ -40,8 +39,7 @@ import org.apache.spark.util.Utils * [[TimestampAsPrefixKeyStateEncoder]] and [[TimestampAsPostfixKeyStateEncoder]]. */ @ExtendedSQLTest -class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession - with BeforeAndAfterEach with Matchers { +class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession with Matchers { // Test schemas private val keySchema = StructType(Seq( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 83e6772d69dc6..4955a32c49b1a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -45,14 +45,12 @@ import org.apache.spark.sql.execution.streaming.sinks.{FileStreamSink, FileStrea import org.apache.spark.sql.execution.streaming.sources.MemorySink import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.util.StreamManualClock -import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.tags.SlowSQLTest import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils -abstract class FileStreamSourceTest - extends StreamTest with SharedSparkSession with PrivateMethodTester { +abstract class FileStreamSourceTest extends StreamTest with PrivateMethodTester { import testImplicits._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala index ec919f1f12429..0ee219171a45e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala @@ -22,7 +22,7 @@ import scala.concurrent.duration._ import org.apache.hadoop.fs.Path import org.mockito.ArgumentMatchers.{any, eq => meq} import org.mockito.Mockito._ -import org.scalatest.{BeforeAndAfterEach, Tag} +import org.scalatest.Tag import org.apache.spark.sql._ import org.apache.spark.sql.internal.SQLConf @@ -34,7 +34,7 @@ import org.apache.spark.util.Utils * Test suite for streaming source naming and validation. * Tests cover the naming API, validation rules, and resolution pipeline. */ -class StreamingSourceEvolutionSuite extends StreamTest with BeforeAndAfterEach { +class StreamingSourceEvolutionSuite extends StreamTest { private def newMetadataDir = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath