[SPARK-55728][SS] Introduce RocksDB conf for file checksum threadpool size #54529

Open
gnanda wants to merge 2 commits into apache:master from gnanda:stack/SPARK-55728

gnanda commented Feb 27, 2026

What changes were proposed in this pull request?

Introduce a conf for the currently hardcoded file checksum thread pool size. Also includes small changes to support a single thread if desired.

Why are the changes needed?

The file checksum thread pool size is currently hardcoded; adding a conf makes it configurable.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code 2.1.58


.createWithDefault(true)

val STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE =
buildConf("spark.sql.streaming.stateStore.rocksdb.fileChecksumThreadPoolSize")

Hence we should also remove RocksDB from the conf name

"RocksDB state store checkpoints (e.g. main file and checksum file).")
.version("4.1.0")
.intConf
.checkValue(x => x == 1 || (x > 0 && x % 2 == 0), "Must be 1 or a positive even number")

Actually, we should also support disabling the thread pool and just using the current thread for the requests. Maybe when this conf is set to 0?

This would need an extra code change in ChecksumCheckpointFileManager.
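
One way the suggestion above could look, as a minimal standalone sketch rather than the PR's actual change: a value of 0 selects an execution context that runs each task on the submitting thread, and any positive value creates a fixed-size pool. `ChecksumExecution`, `CallerThreadContext`, and `create` are hypothetical names chosen for illustration; how ChecksumCheckpointFileManager actually wires its threads is not shown in this thread.

```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// Hypothetical helper, not the PR's code: 0 means "no pool, run on the caller's thread".
object ChecksumExecution {

  /** Runs each submitted task immediately on the thread that submits it. */
  private object CallerThreadContext extends ExecutionContext {
    override def execute(runnable: Runnable): Unit = runnable.run()
    override def reportFailure(cause: Throwable): Unit = throw cause
  }

  /** Returns the context to use, plus a cleanup function for any pool that was created. */
  def create(numThreads: Int): (ExecutionContext, () => Unit) = {
    require(numThreads >= 0, "numThreads must be >= 0 (0 disables the thread pool)")
    if (numThreads == 0) {
      // No pool to shut down, so cleanup is a no-op.
      (CallerThreadContext, () => ())
    } else {
      val pool = Executors.newFixedThreadPool(numThreads)
      (ExecutionContext.fromExecutorService(pool), () => pool.shutdown())
    }
  }
}
```

With this shape, a `Future` started against `ChecksumExecution.create(0)._1` completes before the submitting call returns, so no even-number constraint is needed for the sequential case.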

"RocksDB state store checkpoints (e.g. main file and checksum file).")
.version("4.1.0")
.intConf
.checkValue(x => x == 1 || (x > 0 && x % 2 == 0), "Must be 1 or a positive even number")

We don't need the even-number validation here.

.internal()
.doc("Number of threads used to compute file checksums concurrently when uploading " +
"RocksDB state store checkpoints (e.g. main file and checksum file).")
.version("4.1.0")

Spark master is now 4.2.0.

buildConf("spark.sql.streaming.stateStore.rocksdb.fileChecksumThreadPoolSize")
.internal()
.doc("Number of threads used to compute file checksums concurrently when uploading " +
"RocksDB state store checkpoints (e.g. main file and checksum file).")

Let's add a warning that reducing the value below the default may have a performance impact.

  val skipCreationIfFileMissingChecksum: Boolean)
  extends CheckpointFileManager with Logging {
- assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1 for the main file" +
+ assert(numThreads == 1 || numThreads % 2 == 0,

As discussed, we should support disabling the threadpool. Also, we can remove this even number check.

// To avoid blocking, we need 2 threads per fm caller (one for main file, one for checksum file).
// Since this fm is used by both query task and maintenance thread, the recommended default is
// 2 * 2 = 4 threads. A value of 1 disables concurrency (sequential execution).
protected val fileChecksumThreadPoolSize: Option[Int] = Some(conf.fileChecksumThreadPoolSize)

We can log a warning if the conf value isn't the recommended default value of 4.
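
A rough sketch of that check, kept free of Spark dependencies so it runs on its own. `ChecksumPoolSizeCheck`, `RecommendedThreads`, and `warningFor` are hypothetical names, and the message wording is an assumption; only the recommended value of 4 (2 callers times 2 files) comes from the code comment under review.

```scala
// Hypothetical helper; names and message text are assumptions, not the PR's code.
object ChecksumPoolSizeCheck {
  // 2 callers (query task + maintenance thread) * 2 files each (main + checksum).
  val RecommendedThreads = 4

  /** Returns a warning message when the configured size differs from the recommended default. */
  def warningFor(configured: Int): Option[String] =
    if (configured == RecommendedThreads) {
      None
    } else {
      Some(s"File checksum thread pool size is $configured; the recommended default is " +
        s"$RecommendedThreads. Smaller values may slow down checkpoint uploads.")
    }
}
```

Inside a class that mixes in Spark's `Logging` trait, the result could be passed along with something like `ChecksumPoolSizeCheck.warningFor(n).foreach(msg => logWarning(msg))`.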

/** Whether file checksum generation and verification is enabled. */
val checkpointFileChecksumEnabled: Boolean = sqlConf.checkpointFileChecksumEnabled

/** Number of threads used to compute file checksums concurrently for RocksDB state store. */

nit: remove RocksDB from the comment, and correct the description.

}
}

test("RocksDB: fileChecksumThreadPoolSize propagates to ChecksumCheckpointFileManager") {

We can move these test cases to StateStoreSuite, so they are reused for both the RocksDB and HDFS state stores.

sqlConf.setConfString(
SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key, "true")
sqlConf.setConfString(
SQLConf.STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "1")

Looks like this test case and the one above can be folded into one test case. Then we can pass in the values 1 and 6 within the test.

}
}

Seq("0", "3").foreach { invalidValue =>

I would move this Seq iteration inside the test function, so we use one test case instead of 3. This is already a large suite.

3 participants