[improve][broker]Reduce the lock range of SimpleCache to enhance performance#25293
[improve][broker]Reduce the lock range of SimpleCache to enhance performance#25293poorbarcode wants to merge 12 commits intoapache:masterfrom
Conversation
There was a problem hiding this comment.
@poorbarcode Is it better just fix SimpleCache so get() does not run valueSupplier under a global synchronized lock? It will avoids per-thread TableView growth/leak risk.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #25293 +/- ##
============================================
+ Coverage 72.25% 72.77% +0.52%
- Complexity 33963 34267 +304
============================================
Files 1954 1954
Lines 154639 154757 +118
Branches 17698 17709 +11
============================================
+ Hits 111732 112623 +891
+ Misses 33884 33096 -788
- Partials 9023 9038 +15
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
Outdated
Show resolved
Hide resolved
Denovo1998
left a comment
There was a problem hiding this comment.
Can we add targeted tests for the new future lifecycle here?
The risks in this change lie not in the expected path, but in the ownership and expiration transitions:
- the cache entry is removed or expired before the future completes
- the future completes successfully after expiration and should close the reader exactly once
- the future completes exceptionally and should not try to close anything
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
Outdated
Show resolved
Hide resolved
lhotari
left a comment
There was a problem hiding this comment.
Please check the comment about using ConcurrentHasMap and moving locking to ExpirableValue class to avoid race conditions with expiration.
pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
This PR reduces the lock contention in SimpleCache by moving the blocking reader creation out of the synchronized block. Previously, the get() method held the global lock while synchronously creating and waiting for a reader, which blocked all other threads. Now, the cache stores CompletableFuture<Reader<T>> instead of Reader<T>, and the lock is only held to insert/retrieve the future — the actual wait() call happens outside the lock.
Changes:
SimpleCachegains a newgetWithCacheInfo()method that exposes theExpirableValuewrapper, allowing callers to update the deadline after async operations complete outside the lock.TableViewnow cachesCompletableFuture<Reader<T>>and waits on the future outside the synchronized block, with updated expiration logic to handle incomplete futures.- A new test
TableViewTest.testFailedCreateReadervalidates behavior when reader creation fails.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
SimpleCache.java |
Adds getWithCacheInfo() method and makes ExpirableValue and its members public. |
TableView.java |
Changes cache type to CompletableFuture<Reader<T>>, moves blocking wait outside the lock, adds expiration handling for futures, and extracts closeReader() helper. |
TableViewTest.java |
New test for failed reader creation scenario. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TableViewTest.java
Show resolved
Hide resolved
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TableViewTest.java
Show resolved
Hide resolved
7ef90da to
9b800b2
Compare
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TableViewTest.java
Show resolved
Hide resolved
| this.readers = new SimpleCache<>(executor, | ||
| Math.max(clientOperationTimeoutMs + 30 * 1000, CACHE_EXPIRE_TIMEOUT_MS), | ||
| CACHE_EXPIRE_CHECK_FREQUENCY_MS); |
There was a problem hiding this comment.
Is it risky to close the reader immediately after the cache expires? If the recovery task takes longer than the expiration period, it will consistently fail with a ReaderAlreadyClosedException.
| try { | ||
| return wait(cachedReaderFuture.value, "create reader"); | ||
| } catch (Exception e) { | ||
| readers.remove(ns); |
There was a problem hiding this comment.
It's better to compare the value here to avoid removing a new, valid entry for the same namespace
| if (value.tryExpire()) { | ||
| keys.add(key); | ||
| } |
There was a problem hiding this comment.
It's better to catch exception for this block, to avoid any exception happened from cancel() method caused expiration processing stops silently
|
|
||
| private void closeReader(Reader<T> reader) { | ||
| reader.closeAsync().exceptionally(ex -> { | ||
| log.warn("Failed to close reader {}", ex.getMessage()); |
There was a problem hiding this comment.
| log.warn("Failed to close reader {}", ex.getMessage()); | |
| log.warn("Failed to close reader ", ex); |
Motivation
#23062 improved the behaviour of initialising the Transaction Buffer: Synchronise the reader creation, read loop and the following process on its result. Maintain only one reader for each namespace. The reader is now not closed unless there is no snapshot read request in 1 minute.
However, SimpleCache is a global lock, and the lock force is too strong, causing all
pulsar-transaction-snapshot-recoverthreads to get stuck in the creation of the first reader. When system resources are insufficient, the problem will be magnified infinitelyQ1: The creation of the reader that reads
__transaction_buffer_snapshotand the reading of existing messages are both executed synchronously. Is it necessary to change it to be completed asynchronously?A1: It is not necessary, since the initialisation of all TransactionBuffers under the same namespace requires waiting for the reader to complete the processing of all messages, whether it is asynchronous or not is not important
Q2. Will these synchronisation operations cause the thread
pulsar-transaction-snapshot-recoverto get stuck and affect other functionsA2: No. The function of this thread is quite simple: 1. Initialise Transaction Buffer. 2. After the Transaction Buffer recovery is completed, handle the accumulated transaction write operations (subsequent writes will no longer use this thread).
So the stuck functions actually all need to wait for the reader's messages to be processed, and thus, no other impacts have been caused
Modifications
No longer wait for the initialisation of the reader to complete within the lock code block
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: x