diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
index b37c7ed3e16b..e63b48eda335 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
@@ -54,6 +54,7 @@
 import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
 import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveTableKeysRequest;
@@ -76,7 +77,6 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService {
   // from the same table and can send deletion requests for same snapshot
   // multiple times.
   private static final int SNAPSHOT_DELETING_CORE_POOL_SIZE = 1;
-  private static final int MIN_ERR_LIMIT_PER_TASK = 1000;
 
   private final ClientId clientId = ClientId.randomId();
   private final OzoneManager ozoneManager;
@@ -210,8 +210,8 @@ public BackgroundTaskResult call() throws InterruptedException {
               renameKeys.add(HddsProtos.KeyValue.newBuilder().setKey(renameEntry.getKey())
                   .setValue(renameEntry.getValue()).build());
             }
-            submitSnapshotMoveDeletedKeys(snapInfo, deletedKeys, renameKeys, deletedDirs);
-            remaining -= moveCount;
+            int submitted = submitSnapshotMoveDeletedKeysWithBatching(snapInfo, deletedKeys, renameKeys, deletedDirs);
+            remaining -= submitted;
           } else {
             snapshotsToBePurged.add(snapInfo.getTableKey());
           }
@@ -247,39 +247,205 @@ private void submitSnapshotPurgeRequest(List purgeSnapshotKeys) {
     }
   }
 
-  private void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo,
-                                             List deletedKeys,
-                                             List renamedList,
-                                             List dirsToMove) {
-
-    SnapshotMoveTableKeysRequest.Builder moveDeletedKeysBuilder = SnapshotMoveTableKeysRequest.newBuilder()
+  /**
+   * Builds one SnapshotMoveTableKeys request from the given entries and submits it.
+   * A null or unsuccessful response and a ServiceException are all logged and reported as
+   * failure; this method does not rethrow them.
+   *
+   * @param snapInfo The snapshot being processed
+   * @param deletedKeys List of deleted keys to move
+   * @param renamedList List of renamed keys
+   * @param dirsToMove List of deleted directories to move
+   * @return true if submission was successful, false otherwise
+   */
+  private boolean submitSingleSnapshotMoveBatch(SnapshotInfo snapInfo,
+                                                List<SnapshotMoveKeyInfos> deletedKeys,
+                                                List<HddsProtos.KeyValue> renamedList,
+                                                List<SnapshotMoveKeyInfos> dirsToMove) {
+    SnapshotMoveTableKeysRequest.Builder moveDeletedKeys = SnapshotMoveTableKeysRequest.newBuilder()
         .setFromSnapshotID(toProtobuf(snapInfo.getSnapshotId()));
 
-    SnapshotMoveTableKeysRequest moveDeletedKeys = moveDeletedKeysBuilder
-        .addAllDeletedKeys(deletedKeys)
-        .addAllRenamedKeys(renamedList)
-        .addAllDeletedDirs(dirsToMove)
-        .build();
-    if (isBufferLimitCrossed(ratisByteLimit, 0, moveDeletedKeys.getSerializedSize())) {
-      int remaining = MIN_ERR_LIMIT_PER_TASK;
-      deletedKeys = deletedKeys.subList(0, Math.min(remaining, deletedKeys.size()));
-      remaining -= deletedKeys.size();
-      renamedList = renamedList.subList(0, Math.min(remaining, renamedList.size()));
-      remaining -= renamedList.size();
-      dirsToMove = dirsToMove.subList(0, Math.min(remaining, dirsToMove.size()));
-      moveDeletedKeys = moveDeletedKeysBuilder
-          .addAllDeletedKeys(deletedKeys)
-          .addAllRenamedKeys(renamedList)
-          .addAllDeletedDirs(dirsToMove)
-          .build();
+    if (!deletedKeys.isEmpty()) {
+      moveDeletedKeys.addAllDeletedKeys(deletedKeys);
+    }
+
+    if (!renamedList.isEmpty()) {
+      moveDeletedKeys.addAllRenamedKeys(renamedList);
+    }
+
+    if (!dirsToMove.isEmpty()) {
+      moveDeletedKeys.addAllDeletedDirs(dirsToMove);
     }
 
     OMRequest omRequest = OMRequest.newBuilder()
         .setCmdType(Type.SnapshotMoveTableKeys)
-        .setSnapshotMoveTableKeysRequest(moveDeletedKeys)
+        .setSnapshotMoveTableKeysRequest(moveDeletedKeys.build())
         .setClientId(clientId.toString())
         .build();
-    submitOMRequest(omRequest);
+
+    try {
+      OzoneManagerProtocolProtos.OMResponse response = submitRequest(omRequest);
+      if (response == null || !response.getSuccess()) {
+        LOG.error("SnapshotMoveTableKeys request failed. Will retry in the next run.");
+        return false;
+      }
+      return true;
+    } catch (ServiceException e) {
+      LOG.error("SnapshotMoveTableKeys request failed. Will retry in the next run", e);
+      return false;
+    }
+  }
+
+  /**
+   * Returns an upper bound on the bytes one entry adds to a repeated message field.
+   * Protobuf writes each embedded message as a field tag, a varint length and the payload,
+   * so the payload size alone understates what the entry costs on the wire.
+   *
+   * @param payloadBytes serialized size of the entry itself
+   * @return payload size plus the worst-case framing assumed here
+   */
+  private static long framedEntrySize(int payloadBytes) {
+    // Assumes field numbers below 2048, whose tags take at most two bytes.
+    final int maxTagBytes = 2;
+    int lengthVarintBytes = 1;
+    for (int rest = payloadBytes >>> 7; rest != 0; rest >>>= 7) {
+      lengthVarintBytes++;
+    }
+    return (long) maxTagBytes + lengthVarintBytes + payloadBytes;
+  }
+
+  /**
+   * Submits the entries accumulated so far as one batch and, on success, empties the three
+   * lists so the caller can start filling the next batch.
+   *
+   * @param snapInfo The snapshot being processed
+   * @param finalBatch whether this is the last batch; only affects the debug message
+   * @param batchNumber number of this batch; only used in the debug message
+   * @param batchBytes estimated request size; only used in the debug message
+   * @param batchDeletedKeys deleted keys of this batch; cleared on success
+   * @param batchRenamedKeys renamed keys of this batch; cleared on success
+   * @param batchDeletedDirs deleted directories of this batch; cleared on success
+   * @return the number of entries submitted, or -1 if the submission failed
+   */
+  private int flushSnapshotMoveBatch(SnapshotInfo snapInfo, boolean finalBatch, int batchNumber, long batchBytes,
+      List<SnapshotMoveKeyInfos> batchDeletedKeys, List<HddsProtos.KeyValue> batchRenamedKeys,
+      List<SnapshotMoveKeyInfos> batchDeletedDirs) {
+    LOG.debug("Submitting {}batch {} for snapshot {} with {} deletedKeys, {} renamedKeys, {} deletedDirs, " +
+        "size: {} bytes", finalBatch ? "final " : "", batchNumber, snapInfo.getTableKey(),
+        batchDeletedKeys.size(), batchRenamedKeys.size(), batchDeletedDirs.size(), batchBytes);
+
+    if (!submitSingleSnapshotMoveBatch(snapInfo, batchDeletedKeys, batchRenamedKeys, batchDeletedDirs)) {
+      return -1;
+    }
+
+    int submitted = batchDeletedKeys.size() + batchRenamedKeys.size() + batchDeletedDirs.size();
+    batchDeletedKeys.clear();
+    batchRenamedKeys.clear();
+    batchDeletedDirs.clear();
+    return submitted;
+  }
+
+  /**
+   * Submits snapshot move requests with batching to respect the Ratis buffer limit.
+   * Entries are added in the order deleted keys, renamed keys, deleted directories; whenever the
+   * next entry would push the estimated request size past the limit, the current batch is
+   * submitted first. An entry that exceeds the limit on its own is still sent, in a batch of
+   * its own. Processing stops at the first failed batch.
+   *
+   * @param snapInfo The snapshot being processed
+   * @param deletedKeys List of deleted keys to move
+   * @param renamedList List of renamed keys
+   * @param dirsToMove List of deleted directories to move
+   * @return The number of entries successfully submitted
+   */
+  private int submitSnapshotMoveDeletedKeysWithBatching(SnapshotInfo snapInfo,
+                                                        List<SnapshotMoveKeyInfos> deletedKeys,
+                                                        List<HddsProtos.KeyValue> renamedList,
+                                                        List<SnapshotMoveKeyInfos> dirsToMove) {
+    List<SnapshotMoveKeyInfos> currentDeletedKeys = new ArrayList<>();
+    List<HddsProtos.KeyValue> currentRenamedKeys = new ArrayList<>();
+    List<SnapshotMoveKeyInfos> currentDeletedDirs = new ArrayList<>();
+    int totalSubmitted = 0;
+    int batchCount = 0;
+
+    SnapshotMoveTableKeysRequest emptyRequest = SnapshotMoveTableKeysRequest.newBuilder()
+        .setFromSnapshotID(toProtobuf(snapInfo.getSnapshotId()))
+        .build();
+    OMRequest baseRequest = OMRequest.newBuilder()
+        .setCmdType(Type.SnapshotMoveTableKeys)
+        .setSnapshotMoveTableKeysRequest(emptyRequest)
+        .setClientId(clientId.toString())
+        .build();
+    // The length prefix of the nested move request is a varint that grows with the payload;
+    // reserve the extra bytes a large payload can need beyond the near-empty request measured here.
+    final int nestedLengthSlack = 4;
+    long baseOverhead = (long) baseRequest.getSerializedSize() + nestedLengthSlack;
+    long batchBytes = baseOverhead;
+
+    for (SnapshotMoveKeyInfos key : deletedKeys) {
+      long entryBytes = framedEntrySize(key.getSerializedSize());
+      // If adding this key would exceed the limit, flush the current batch first
+      if (batchBytes + entryBytes > ratisByteLimit && !currentDeletedKeys.isEmpty()) {
+        int flushed = flushSnapshotMoveBatch(snapInfo, false, ++batchCount, batchBytes,
+            currentDeletedKeys, currentRenamedKeys, currentDeletedDirs);
+        if (flushed < 0) {
+          return totalSubmitted;
+        }
+        totalSubmitted += flushed;
+        batchBytes = baseOverhead;
+      }
+      currentDeletedKeys.add(key);
+      batchBytes += entryBytes;
+    }
+
+    for (HddsProtos.KeyValue renameKey : renamedList) {
+      long entryBytes = framedEntrySize(renameKey.getSerializedSize());
+      // If adding this key would exceed the limit, flush the current batch first
+      if (batchBytes + entryBytes > ratisByteLimit &&
+          (!currentDeletedKeys.isEmpty() || !currentRenamedKeys.isEmpty())) {
+        int flushed = flushSnapshotMoveBatch(snapInfo, false, ++batchCount, batchBytes,
+            currentDeletedKeys, currentRenamedKeys, currentDeletedDirs);
+        if (flushed < 0) {
+          return totalSubmitted;
+        }
+        totalSubmitted += flushed;
+        batchBytes = baseOverhead;
+      }
+      currentRenamedKeys.add(renameKey);
+      batchBytes += entryBytes;
+    }
+
+    for (SnapshotMoveKeyInfos dir : dirsToMove) {
+      long entryBytes = framedEntrySize(dir.getSerializedSize());
+      // If adding this dir would exceed the limit, flush the current batch first
+      if (batchBytes + entryBytes > ratisByteLimit &&
+          (!currentDeletedKeys.isEmpty() || !currentRenamedKeys.isEmpty() || !currentDeletedDirs.isEmpty())) {
+        int flushed = flushSnapshotMoveBatch(snapInfo, false, ++batchCount, batchBytes,
+            currentDeletedKeys, currentRenamedKeys, currentDeletedDirs);
+        if (flushed < 0) {
+          return totalSubmitted;
+        }
+        totalSubmitted += flushed;
+        batchBytes = baseOverhead;
+      }
+      currentDeletedDirs.add(dir);
+      batchBytes += entryBytes;
+    }
+
+    // Submit the final batch if any
+    if (!currentDeletedKeys.isEmpty() || !currentRenamedKeys.isEmpty() || !currentDeletedDirs.isEmpty()) {
+      int flushed = flushSnapshotMoveBatch(snapInfo, true, ++batchCount, batchBytes,
+          currentDeletedKeys, currentRenamedKeys, currentDeletedDirs);
+      if (flushed < 0) {
+        return totalSubmitted;
+      }
+      totalSubmitted += flushed;
+    }
+
+    LOG.debug("Successfully submitted {} total entries in {} batches for snapshot {}", totalSubmitted, batchCount,
+        snapInfo.getTableKey());
+
+    return totalSubmitted;
   }
 
   private void submitOMRequest(OMRequest omRequest) {