[SPARK-55728][SS] Introduce RocksDB conf for file checksum threadpool size #54529
gnanda wants to merge 2 commits into apache:master from
| .booleanConf
| .createWithDefault(true)
|
| val STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE =
We need to use this conf for the HDFS provider too. See: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L540
| .createWithDefault(true)
|
| val STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE =
| buildConf("spark.sql.streaming.stateStore.rocksdb.fileChecksumThreadPoolSize")
Since the conf will apply to both providers, we should also remove RocksDB from the conf name.
| "RocksDB state store checkpoints (e.g. main file and checksum file).")
| .version("4.1.0")
| .intConf
| .checkValue(x => x == 1 || (x > 0 && x % 2 == 0), "Must be 1 or a positive even number")
Actually, we should also support disabling the thread pool and running the requests on the current thread instead. Maybe when this conf is set to 0?
This would need an extra code change in ChecksumCheckpointFileManager.
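A minimal sketch of what "0 disables the thread pool" could look like, assuming the file manager only needs an `ExecutionContext` to submit work to. The object and method names (`ChecksumExecutionSketch`, `executionContextFor`, `runningThreadName`) are hypothetical and are not taken from `ChecksumCheckpointFileManager`; the real change may need more than this, since the main file and the checksum file are written concurrently today.

```scala
import java.util.concurrent.{Executor, Executors}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

// Hypothetical helper for illustration only; not the actual Spark code.
object ChecksumExecutionSketch {
  // Executor that runs every task directly on the thread that submits it.
  private val callerThreadExecutor: Executor = new Executor {
    override def execute(r: Runnable): Unit = r.run()
  }

  /** Assumption: 0 means "no thread pool", so work runs on the caller's thread. */
  def executionContextFor(numThreads: Int): ExecutionContext = {
    require(numThreads >= 0, "numThreads must be >= 0")
    if (numThreads == 0) {
      ExecutionContext.fromExecutor(callerThreadExecutor)
    } else {
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(numThreads))
    }
  }

  /** Runs a trivial task and returns the name of the thread that executed it. */
  def runningThreadName(numThreads: Int): String = {
    val ec = executionContextFor(numThreads)
    try {
      Await.result(Future(Thread.currentThread().getName)(ec), 10.seconds)
    } finally {
      // Only the pooled variant owns threads that need to be shut down.
      ec match {
        case pool: ExecutionContextExecutorService => pool.shutdown()
        case _ => ()
      }
    }
  }
}
```

With this shape, callers keep submitting `Future`s the same way regardless of the configured size; only the executor behind the `ExecutionContext` changes.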
| "RocksDB state store checkpoints (e.g. main file and checksum file).")
| .version("4.1.0")
| .intConf
| .checkValue(x => x == 1 || (x > 0 && x % 2 == 0), "Must be 1 or a positive even number")
We don't need the even-number validation here.
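To see which values the relaxed check would newly accept, here is a small sketch comparing the predicate from the diff with a hypothetical replacement. The replacement (`x >= 0`, with 0 reserved for "no thread pool") is an assumption based on the review comments, not the final code.

```scala
// Illustration only; names are hypothetical.
object ThreadPoolSizeValidationSketch {
  // Predicate copied from the diff under review.
  val current: Int => Boolean = x => x == 1 || (x > 0 && x % 2 == 0)

  // Assumed relaxed predicate: any non-negative value is allowed.
  val relaxed: Int => Boolean = x => x >= 0

  // Values in [-1, 6] whose validity differs between the two predicates.
  val changed: Seq[Int] = (-1 to 6).filter(x => current(x) != relaxed(x))
}
```

In this range, `changed` contains 0, 3, and 5: the values the current check rejects and the assumed relaxed check accepts.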
| .internal()
| .doc("Number of threads used to compute file checksums concurrently when uploading " +
| "RocksDB state store checkpoints (e.g. main file and checksum file).")
| .version("4.1.0")
Spark master is now at 4.2.0, so the version here should be updated.
| buildConf("spark.sql.streaming.stateStore.rocksdb.fileChecksumThreadPoolSize")
| .internal()
| .doc("Number of threads used to compute file checksums concurrently when uploading " +
| "RocksDB state store checkpoints (e.g. main file and checksum file).")
Let's add a warning to the doc that reducing the value below the default may have a performance impact.
| val skipCreationIfFileMissingChecksum: Boolean)
| extends CheckpointFileManager with Logging {
| assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1 for the main file" +
| assert(numThreads == 1 || numThreads % 2 == 0,
As discussed, we should support disabling the threadpool. Also, we can remove this even number check.
| // To avoid blocking, we need 2 threads per fm caller (one for main file, one for checksum file).
| // Since this fm is used by both query task and maintenance thread, the recommended default is
| // 2 * 2 = 4 threads. A value of 1 disables concurrency (sequential execution).
| protected val fileChecksumThreadPoolSize: Option[Int] = Some(conf.fileChecksumThreadPoolSize)
We can log a warning if the conf value isn't the recommended default of 4.
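One way the suggested warning could be structured is sketched below. It is kept free of Spark's `Logging` trait so that it stands alone: the helper returns the message and leaves the actual `logWarning` call to the caller. The object name, method name, and message wording are all hypothetical.

```scala
// Illustration only; not the actual state store provider code.
object ThreadPoolSizeWarningSketch {
  // 2 callers (query task, maintenance thread) x 2 files (main, checksum),
  // following the comment in the diff above.
  val RecommendedThreads: Int = 4

  /** Returns a warning message when the configured size differs from the recommended default. */
  def warningFor(configured: Int): Option[String] = {
    if (configured == RecommendedThreads) {
      None
    } else {
      Some(s"File checksum thread pool size is set to $configured; the recommended " +
        s"default is $RecommendedThreads. Other values may affect performance.")
    }
  }
}
```

A provider could then call something like `warningFor(size).foreach(msg => logWarning(msg))` once at initialization.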
| /** Whether file checksum generation and verification is enabled. */
| val checkpointFileChecksumEnabled: Boolean = sqlConf.checkpointFileChecksumEnabled
|
| /** Number of threads used to compute file checksums concurrently for RocksDB state store. */
nit: remove RocksDB from the comment and correct the description.
| }
| }
|
| test("RocksDB: fileChecksumThreadPoolSize propagates to ChecksumCheckpointFileManager") {
We can move these test cases to StateStoreSuite, so they are reused for both the RocksDB and HDFS state stores.
| sqlConf.setConfString(
| SQLConf.STREAMING_CHECKPOINT_FILE_CHECKSUM_ENABLED.key, "true")
| sqlConf.setConfString(
| SQLConf.STATE_STORE_ROCKSDB_FILE_CHECKSUM_THREAD_POOL_SIZE.key, "1")
Looks like this test case and the one above can be folded into one test case. We can then iterate over the values 1 and 6 within the test.
| }
| }
|
| Seq("0", "3").foreach { invalidValue =>
I would move this Seq iteration inside the test function, so we use one test case instead of 3. This is already a large suite.
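The folding suggested in the last two comments can be sketched without ScalaTest as a single test body that loops over both the valid and the invalid values. `setPoolSize` is a hypothetical stand-in for setting the conf; it uses the predicate from the diff under review, so the set of invalid values would change if the validation is relaxed as requested earlier in the review.

```scala
import scala.util.Try

// Illustration only; in the real suite this body would live inside one test(...) block.
object FoldedTestSketch {
  // Hypothetical stand-in for setting the conf, using the predicate from the diff.
  def setPoolSize(value: String): Int = {
    val x = value.toInt
    require(x == 1 || (x > 0 && x % 2 == 0), "Must be 1 or a positive even number")
    x
  }

  // One test body covering every value, instead of one generated test case per value.
  def runFoldedTest(): Unit = {
    Seq("1", "6").foreach { valid =>
      assert(setPoolSize(valid) == valid.toInt, s"expected $valid to be accepted")
    }
    Seq("0", "3").foreach { invalid =>
      assert(Try(setPoolSize(invalid)).isFailure, s"expected $invalid to be rejected")
    }
  }
}
```

The trade-off is that a failure reports one test name for all values, which is why each assertion carries the offending value in its message.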
What changes were proposed in this pull request?
Introduce a conf for the file checksum thread pool size, which is currently hardcoded. Also make small changes to support running with a single thread if desired.
Why are the changes needed?
To make the file checksum thread pool size configurable instead of hardcoded.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit tests
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code 2.1.58