Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package org.apache.spark.sql.connector

import java.util.Collections

import org.scalatest.BeforeAndAfterEach

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{
Expand Down Expand Up @@ -54,9 +52,7 @@ import org.apache.spark.unsafe.types.UTF8String
* netChanges sees them. Output assertions stay identical because both paths produce
* the same `_change_type` labels at the netChanges input.
*/
trait ResolveChangelogTableNetChangesTestsBase
extends SharedSparkSession
with BeforeAndAfterEach {
trait ResolveChangelogTableNetChangesTestsBase extends SharedSparkSession {

/**
* Value of the user-facing CDC option `computeUpdates` that this test run
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package org.apache.spark.sql.connector

import java.util.Collections

import org.scalatest.BeforeAndAfterEach

import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.InternalRow
Expand All @@ -47,9 +45,7 @@ import org.apache.spark.unsafe.types.UTF8String
* paths (carry-over removal, update detection) and then verifies that Spark's analyzer rule
* correctly transforms the plan and produces the expected output.
*/
class ResolveChangelogTablePostProcessingSuite
extends SharedSparkSession
with BeforeAndAfterEach {
class ResolveChangelogTablePostProcessingSuite extends SharedSparkSession {

private val catalogName = "cdc_test_catalog"
private val testTableName = "events"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package org.apache.spark.sql.connector

import java.util.Collections

import org.scalatest.BeforeAndAfterEach

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.Inline
import org.apache.spark.sql.catalyst.plans.logical.{
Expand All @@ -46,9 +44,7 @@ import org.apache.spark.sql.types.LongType
* -> [Project (update relabel)]
* -> Project (drop helper columns)
*/
class ResolveChangelogTableStreamingPostProcessingSuite
extends SharedSparkSession
with BeforeAndAfterEach {
class ResolveChangelogTableStreamingPostProcessingSuite extends SharedSparkSession {

private val catalogName = "cdc_streaming_pp"
private val testTableName = "events"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
import org.apache.spark.sql.functions.{count, timestamp_seconds, window}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.ArrayImplicits._

class ForeachWriterSuite extends StreamTest with SharedSparkSession with BeforeAndAfter {
class ForeachWriterSuite extends StreamTest with BeforeAndAfter {

import testImplicits._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRe
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.execution.streaming.runtime.{ContinuousRecordPartitionOffset, LongOffset, StreamExecution}
import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class TextSocketStreamSuite extends StreamTest with SharedSparkSession {
class TextSocketStreamSuite extends StreamTest {

override def afterEach(): Unit = {
sqlContext.streams.active.foreach(_.stop())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{OutputMode, RunningCountStatefulProcessor, StreamTest, TimeMode}
import org.apache.spark.sql.streaming.OutputMode.{Complete, Update}
import org.apache.spark.sql.test.SharedSparkSession

class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
class OperatorStateMetadataSuite extends StreamTest {
import testImplicits._

private lazy val hadoopConf = spark.sessionState.newHadoopConf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,13 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.OutputMode.Update
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.tags.SlowSQLTest
import org.apache.spark.util.Utils


@SlowSQLTest
/** Test suite to inject some failures in RocksDB checkpoint */
class RocksDBCheckpointFailureInjectionSuite extends StreamTest
with SharedSparkSession {
class RocksDBCheckpointFailureInjectionSuite extends StreamTest {

private val fileManagerClassName = classOf[FailureInjectionCheckpointFileManager].getName

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import java.util.UUID
import scala.util.Random

import org.apache.hadoop.conf.Configuration
import org.scalatest.BeforeAndAfterEach
import org.scalatest.matchers.should.Matchers

import org.apache.spark.sql.catalyst.InternalRow
Expand All @@ -40,8 +39,7 @@ import org.apache.spark.util.Utils
* [[TimestampAsPrefixKeyStateEncoder]] and [[TimestampAsPostfixKeyStateEncoder]].
*/
@ExtendedSQLTest
class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
with BeforeAndAfterEach with Matchers {
class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession with Matchers {

// Test schemas
private val keySchema = StructType(Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,12 @@ import org.apache.spark.sql.execution.streaming.sinks.{FileStreamSink, FileStrea
import org.apache.spark.sql.execution.streaming.sources.MemorySink
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.tags.SlowSQLTest
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils

abstract class FileStreamSourceTest
extends StreamTest with SharedSparkSession with PrivateMethodTester {
abstract class FileStreamSourceTest extends StreamTest with PrivateMethodTester {

import testImplicits._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.concurrent.duration._
import org.apache.hadoop.fs.Path
import org.mockito.ArgumentMatchers.{any, eq => meq}
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterEach, Tag}
import org.scalatest.Tag

import org.apache.spark.sql._
import org.apache.spark.sql.internal.SQLConf
Expand All @@ -34,7 +34,7 @@ import org.apache.spark.util.Utils
* Test suite for streaming source naming and validation.
* Tests cover the naming API, validation rules, and resolution pipeline.
*/
class StreamingSourceEvolutionSuite extends StreamTest with BeforeAndAfterEach {
class StreamingSourceEvolutionSuite extends StreamTest {

private def newMetadataDir =
Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
Expand Down