HBASE-29796 Allow sleepForRetry replication config to be overridden by replication peers #7577
base: master
This squashed commit combines 8 commits:
- Allow peers to override sleep config
- Dynamic config update
- Always get value
- Use protobuf instead of string
- Add to test
- Add shell command
- Use builder instead
- Update UI to include sleep
The previous commit incorrectly added methods (getStartPosition, getRecoveredQueueStartPos, terminate) that don't exist in upstream master. These were from the old branch base and should not be included.
Since we have a Configuration object in ReplicationPeerConfig, what about just creating a combined Configuration instance and using it when creating the ReplicationSource? That way you can change sleepForRetry directly through the configuration.
Just to confirm, you're suggesting I use the existing Configuration in ReplicationPeerConfig, along these lines?
Configuration combinedConf = new Configuration(globalConf);
// Override any values in globalConf with the existing value in peerConfig
peerConfig.getConfiguration().forEach(combinedConf::set);
// set this.conf = combinedConf in the ReplicationSource
Set peer-specific override via
Is that the approach you are looking for?
Yes. More specifically, we can change the code in ReplicationSourceManager.createSource. HBase has a CompoundConfiguration class that can merge multiple Configurations together. You can check the ReplicationPeerConfigUtil.getPeerClusterConfiguration method for an example of its usage. Thanks.
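The layering idea discussed above can be sketched in plain Java. This is only an illustration under stated assumptions: it uses `java.util.Map` rather than HBase's `Configuration` or `CompoundConfiguration`, and the class and method names (`LayeredConfSketch`, `combine`, `getLong`) are hypothetical, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: plain maps model the global and per-peer settings.
class LayeredConfSketch {
  /** Returns a new map with the peer entries layered over the global entries. */
  static Map<String, String> combine(Map<String, String> global, Map<String, String> peer) {
    Map<String, String> combined = new HashMap<>(global);
    combined.putAll(peer); // peer-level values win on key collisions
    return combined;
  }

  /** Reads a long setting, falling back to defaultValue when the key is absent. */
  static long getLong(Map<String, String> conf, String key, long defaultValue) {
    String value = conf.get(key);
    return value == null ? defaultValue : Long.parseLong(value);
  }

  public static void main(String[] args) {
    Map<String, String> global = new HashMap<>();
    global.put("replication.source.sleepforretries", "1000");

    Map<String, String> peer = new HashMap<>();
    peer.put("replication.source.sleepforretries", "2000");

    Map<String, String> combined = combine(global, peer);
    // The peer override shadows the global value, so this prints 2000.
    System.out.println(getLong(combined, "replication.source.sleepforretries", 1000L));
  }
}
```

With this shape, a peer that does not set the key simply inherits the global value, and no sentinel such as 0 is needed.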
Pull request overview
This pull request adds support for configuring the replication source sleepForRetries parameter on a per-peer basis, with fallback to the global configuration when not set. Previously, this parameter was only configurable globally via the replication.source.sleepforretries property.
- Adds sleepForRetries field to ReplicationPeerConfig with builder support and protobuf serialization
- Implements getSleepForRetries() method in ReplicationSource with fallback logic to global config when peer value is 0
- Adds shell command set_peer_sleep_for_retries for managing the per-peer configuration
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto | Adds sleep_for_retries field to ReplicationPeer protobuf message |
| hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java | Adds sleepForRetries field to data model with getter/setter and includes in toString() |
| hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java | Adds setSleepForRetries() method to builder interface |
| hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java | Adds conversion logic for sleepForRetries between protobuf and Java objects |
| hbase-client/src/test/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigTestUtil.java | Updates test utilities to include sleepForRetries in config generation and assertions |
| hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java | Implements getSleepForRetries() with fallback logic and updates all usage of sleepForRetries to call this method |
| hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java | Adds getSleepForRetries() method to interface |
| hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java | Updates to use source.getSleepForRetries() instead of local field |
| hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java | Updates to use source.getSleepForRetries() instead of local field |
| hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java | Updates to use source.getSleepForRetries() instead of local field |
| hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java | Implements getSleepForRetries() in test dummy class |
| hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java | Updates mocks to return 0L for getSleepForRetries() |
| hbase-shell/src/main/ruby/shell/commands/set_peer_sleep_for_retries.rb | Adds new shell command for setting per-peer sleep for retries |
| hbase-shell/src/main/ruby/hbase/replication_admin.rb | Adds set_peer_sleep_for_retries() method implementation |
| hbase-shell/src/main/ruby/shell.rb | Registers new set_peer_sleep_for_retries command |
| hbase-shell/src/test/ruby/hbase/replication_admin_test.rb | Adds test for set_peer_sleep_for_retries command |
public boolean isSerial() {
  return serial;
}
Copilot AI, Dec 29, 2025
The public getter method getSleepForRetries() is missing Javadoc documentation. For consistency with other public methods in this class and to help users understand the purpose and behavior of this configuration property, add Javadoc that explains:
- What this value represents (sleep time between retries in milliseconds)
- The default value (0, which means use global configuration)
- When it's used (during replication retries)
/**
 * Returns the per-peer sleep time between replication retries, in milliseconds.
 * <p>
 * A value of {@code 0} means that this peer does not define its own retry sleep and the
 * global replication configuration should be used instead.
 * This value is consulted when performing replication retries for this peer.
 *
 * @return sleep time between replication retries in milliseconds, or {@code 0} to use the
 *         global configuration
 */
public long getSleepForRetries() {
  long peerSleepForRetries = replicationPeer.getPeerConfig().getSleepForRetries();
  if (peerSleepForRetries > 0) {
    return peerSleepForRetries;
  }
  return this.conf.getLong("replication.source.sleepforretries", 1000);
}
Copilot AI, Dec 29, 2025
The fallback logic is inconsistent with the existing getCurrentBandwidth() pattern. The bandwidth implementation (lines 519-523) treats 0 as "use default", but this implementation treats values > 0 as valid peer settings and 0 as "use default". This creates an inconsistency where setting sleepForRetries to 0 means "use global default" but for bandwidth, 0 means "use default". Consider using the same pattern as bandwidth (checking != 0 instead of > 0) for consistency, or add explicit documentation about this behavior difference.
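To make the difference between the two checks concrete, here is a minimal sketch comparing a `> 0` test with a `!= 0` test. The class and method names are hypothetical and the code does not touch any HBase class; it only models the comparison. Both variants agree for 0 and for positive values, and differ only when the peer value is negative.

```java
// Hypothetical helper that isolates the two sentinel checks under discussion.
class SentinelCheckSketch {
  /** Variant in this PR: only a positive peer value overrides the global value. */
  static long resolveGreaterThanZero(long peerValue, long globalValue) {
    return peerValue > 0 ? peerValue : globalValue;
  }

  /** Variant the comment attributes to bandwidth: any non-zero peer value overrides. */
  static long resolveNotEqualZero(long peerValue, long globalValue) {
    return peerValue != 0 ? peerValue : globalValue;
  }

  public static void main(String[] args) {
    long global = 1000L;
    for (long peer : new long[] { 2000L, 0L, -1L }) {
      System.out.println("peer=" + peer
        + " >0 check -> " + resolveGreaterThanZero(peer, global)
        + ", !=0 check -> " + resolveNotEqualZero(peer, global));
    }
  }
}
```

For a peer value of -1, the `> 0` variant falls back to the global value while the `!= 0` variant would return -1, which is one argument for keeping `> 0` for a sleep duration.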
Examples:
  # set sleep time to 2 seconds (2000ms) between retries for a peer
  hbase> set_peer_sleep_for_retries '1', 2000
  # unset sleep time for a peer to use the global default configured in server-side
Copilot AI, Dec 29, 2025
The comment states "unset sleep time for a peer to use the global default configured in server-side" when passing 0, but this behavior differs from the bandwidth configuration pattern shown in the same file. In the bandwidth test (lines 609-622), 0 is also used as the initial/default value, but the bandwidth implementation treats 0 as "use default bandwidth". For consistency and clarity, consider documenting this explicitly in the help text that 0 has special meaning as a sentinel value for "use global default", and ensure users understand that setting it to 0 is not the same as setting it to the actual configured global default value.
Examples:
  # set sleep time to 2 seconds (2000ms) between retries for a peer
  hbase> set_peer_sleep_for_retries '1', 2000
  # unset sleep time for a peer to use the global default configured in server-side
A value of 0 is treated specially as a sentinel meaning "use the global default
sleep time configured on the server side" rather than an explicit sleep time.
Examples:
  # set sleep time to 2 seconds (2000ms) between retries for a peer
  hbase> set_peer_sleep_for_retries '1', 2000
  # unset the peer-specific sleep time so that this peer uses the global default
  # configured on the server side (0 is a sentinel and does not change that default)
public long getSleepForRetries() {
  long peerSleepForRetries = replicationPeer.getPeerConfig().getSleepForRetries();
  if (peerSleepForRetries > 0) {
    return peerSleepForRetries;
  }
  return this.conf.getLong("replication.source.sleepforretries", 1000);
}
Copilot AI, Dec 29, 2025
Missing test coverage for the new getSleepForRetries() method and its fallback logic. While the existing tests mock getSleepForRetries() to return 0L, there's no test that verifies:
- When peer config has a positive value (e.g., 2000), it returns that value
- When peer config is 0, it falls back to the global configuration value
- The integration between peer-level and global configuration
Consider adding a dedicated unit test in TestReplicationSource.java that verifies this fallback behavior, similar to how bandwidth is tested in other parts of the codebase.
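The cases listed above could be exercised roughly as follows. This is a sketch under assumptions, not the test added in the PR: it avoids JUnit and HBase so that it runs on its own, `FakePeerConfig`, `resolve` and `check` are hypothetical names, and a plain map stands in for the global configuration.

```java
import java.util.HashMap;
import java.util.Map;

class SleepFallbackTestSketch {
  static final String KEY = "replication.source.sleepforretries";

  /** Hypothetical stand-in for the peer config; holds only the field under test. */
  static final class FakePeerConfig {
    final long sleepForRetries;

    FakePeerConfig(long sleepForRetries) {
      this.sleepForRetries = sleepForRetries;
    }
  }

  /** Models the fallback quoted above: a positive peer value wins, else the global value. */
  static long resolve(FakePeerConfig peer, Map<String, String> globalConf) {
    if (peer.sleepForRetries > 0) {
      return peer.sleepForRetries;
    }
    String global = globalConf.get(KEY);
    return global == null ? 1000L : Long.parseLong(global);
  }

  static void check(boolean condition, String message) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }

  public static void main(String[] args) {
    Map<String, String> globalConf = new HashMap<>();
    // Neither peer nor global value set: the built-in default of 1000 applies.
    check(resolve(new FakePeerConfig(0), globalConf) == 1000L, "built-in default");

    globalConf.put(KEY, "500");
    // Positive peer value overrides the global value.
    check(resolve(new FakePeerConfig(2000), globalConf) == 2000L, "peer override");
    // Peer value of 0 falls back to the global value.
    check(resolve(new FakePeerConfig(0), globalConf) == 500L, "global fallback");

    System.out.println("all checks passed");
  }
}
```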
 * Get the sleep time for retries. Check peer config first, if set use it, otherwise fall back to
 * global configuration.
 * @return sleep time in milliseconds
 */
Copilot AI, Dec 29, 2025
This method overrides ReplicationSourceInterface.getSleepForRetries; it is advisable to add an Override annotation.
 */
 */
@Override
  this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
decorateConf();
// 1 second
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
nit: do you think sleepForRetries would change dynamically after initialization? IIRC it is only reloaded via refreshSources (e.g. via updateReplicationPeerConfig), so this value would only be created once per peer configuration or refresh.
So, can we keep this variable and just call getSleepForRetries() once within ReplicationSource.java?
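A minimal sketch of the "resolve once, keep the field" approach suggested here, assuming the value only needs to change when the source is re-initialized. All names are hypothetical (`CachedSleepSketch`, `init`), an `AtomicLong` stands in for the peer config, and calling `init()` again stands in for a refresh of the source.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of a source that resolves sleepForRetries once at init time.
class CachedSleepSketch {
  private final AtomicLong peerSetting; // stand-in for the peer config value
  private final long globalSetting;     // stand-in for the global config value
  private long sleepForRetries;         // cached result, read on every retry

  CachedSleepSketch(AtomicLong peerSetting, long globalSetting) {
    this.peerSetting = peerSetting;
    this.globalSetting = globalSetting;
    init();
  }

  /** Resolves the effective value once; re-run only when the source is refreshed. */
  void init() {
    long peerValue = peerSetting.get();
    this.sleepForRetries = peerValue > 0 ? peerValue : globalSetting;
  }

  long getSleepForRetries() {
    return sleepForRetries;
  }

  public static void main(String[] args) {
    AtomicLong peer = new AtomicLong(0L);
    CachedSleepSketch source = new CachedSleepSketch(peer, 1000L);
    System.out.println(source.getSleepForRetries()); // 1000: peer unset, global used

    peer.set(2000L);
    System.out.println(source.getSleepForRetries()); // still 1000: cached value

    source.init(); // models a refresh after the peer config update
    System.out.println(source.getSleepForRetries()); // 2000: picked up on refresh
  }
}
```

The trade-off is that a peer config change is only visible after a refresh, which matches the behavior described in the comment.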
🎊 +1 overall
This message was automatically generated.
LGTM, one minor comment.
...test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
taklwu left a comment
one more comment.
@Override
public long getSleepForRetriesForTesting() {
  return sleepForRetries;
}

@Override
public Map<String, ReplicationSourceShipper> getWorkerThreadsForTesting() {
  return workerThreads;
}
}
nit: I don't know if we should have the *ForTesting in any interface in the scope of the IA.Private, but at least to align with other lines, we can keep the getSleepForRetries (no ForTesting) and only have the getWorkerThreadsForTesting in the ReplicationSource.
@Override
public long getSleepForRetriesForTesting() {
  return sleepForRetries;
}
@Override
public Map<String, ReplicationSourceShipper> getWorkerThreadsForTesting() {
  return workerThreads;
}
}
@Override
public long getSleepForRetries() {
  return sleepForRetries;
}
}
Just to clarify here, are you looking to just remove getWorkerThreadsForTesting from the interface? We'd still need the functionality here to be able to do the unit tests.
I put up a change with what I interpreted this as. Please let me know if that's what you're looking for.
To clarify why I added it to the interface: most of the methods return ReplicationSourceInterface and therefore to use those methods in the test I'd need to either cast to the specific implementation of ReplicationSource or add the methods to the interface. I went for the latter option, but I could change it to the former if you'd prefer.
Yeah, it's mostly because manager.getSources() returns a list of ReplicationSourceInterface.
Using a cast, as below, was what I had in mind, and you got my point as well.
Map<String, ReplicationSourceShipper> workers =
((ReplicationSource) source).getWorkerThreadsForTesting();
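The cast-based approach can be sketched end to end as follows. This is a self-contained illustration with hypothetical types (`SketchSourceInterface`, `SketchSource`) and string values in place of the real shipper objects; it only shows how a test can reach a test-only accessor without adding it to the interface.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CastInTestSketch {
  public static void main(String[] args) {
    // A manager-like API hands back the interface type only.
    List<SketchSourceInterface> sources = Arrays.<SketchSourceInterface>asList(new SketchSource());

    for (SketchSourceInterface source : sources) {
      // Production-facing accessor stays on the interface.
      System.out.println(source.getSleepForRetries());
      // Test-only accessor is reached by casting to the concrete class.
      if (source instanceof SketchSource) {
        Map<String, String> workers = ((SketchSource) source).getWorkerThreadsForTesting();
        System.out.println(workers.keySet());
      }
    }
  }
}

// Hypothetical interface: exposes only what production callers need.
interface SketchSourceInterface {
  long getSleepForRetries();
}

// Hypothetical implementation: keeps the *ForTesting accessor off the interface.
class SketchSource implements SketchSourceInterface {
  private final Map<String, String> workerThreads = new HashMap<>();

  SketchSource() {
    workerThreads.put("wal-group-1", "shipper"); // placeholder worker entry
  }

  @Override
  public long getSleepForRetries() {
    return 1000L;
  }

  Map<String, String> getWorkerThreadsForTesting() {
    return workerThreads;
  }
}
```

The `instanceof` guard is optional in a test that controls which implementation is created, but it keeps the cast from failing if a dummy source is mixed into the list.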
Currently, sleepForRetries (the sleep time between retry attempts during replication) is only configurable globally via the replication.source.sleepforretries configuration property. This makes it impossible to tune behavior for individual replication peers that may have different requirements.
This change would add support for configuring sleepForRetries on a per-peer basis, with fallback to the global configuration when not set. It also adds UI support for displaying the field and shell support for editing the value.
This is related to #7578 because this will not cleanly merge into branch-2