Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks;
import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveTableKeysRequest;
Expand All @@ -76,7 +77,6 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService {
// from the same table and can send deletion requests for same snapshot
// multiple times.
private static final int SNAPSHOT_DELETING_CORE_POOL_SIZE = 1;
private static final int MIN_ERR_LIMIT_PER_TASK = 1000;
private final ClientId clientId = ClientId.randomId();

private final OzoneManager ozoneManager;
Expand Down Expand Up @@ -210,8 +210,8 @@ public BackgroundTaskResult call() throws InterruptedException {
renameKeys.add(HddsProtos.KeyValue.newBuilder().setKey(renameEntry.getKey())
.setValue(renameEntry.getValue()).build());
}
submitSnapshotMoveDeletedKeys(snapInfo, deletedKeys, renameKeys, deletedDirs);
remaining -= moveCount;
int submitted = submitSnapshotMoveDeletedKeysWithBatching(snapInfo, deletedKeys, renameKeys, deletedDirs);
remaining -= submitted;
} else {
snapshotsToBePurged.add(snapInfo.getTableKey());
}
Expand Down Expand Up @@ -247,39 +247,179 @@ private void submitSnapshotPurgeRequest(List<String> purgeSnapshotKeys) {
}
}

private void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo,
List<SnapshotMoveKeyInfos> deletedKeys,
List<HddsProtos.KeyValue> renamedList,
List<SnapshotMoveKeyInfos> dirsToMove) {

SnapshotMoveTableKeysRequest.Builder moveDeletedKeysBuilder = SnapshotMoveTableKeysRequest.newBuilder()
/**
* Submits a single batch of snapshot move requests.
*
* @param snapInfo The snapshot being processed
* @param deletedKeys List of deleted keys to move
* @param renamedList List of renamed keys
* @param dirsToMove List of deleted directories to move
* @return true if submission was successful, false otherwise
*/
private boolean submitSingleSnapshotMoveBatch(SnapshotInfo snapInfo,
List<SnapshotMoveKeyInfos> deletedKeys,
List<HddsProtos.KeyValue> renamedList,
List<SnapshotMoveKeyInfos> dirsToMove) {
SnapshotMoveTableKeysRequest.Builder moveDeletedKeys = SnapshotMoveTableKeysRequest.newBuilder()
.setFromSnapshotID(toProtobuf(snapInfo.getSnapshotId()));

SnapshotMoveTableKeysRequest moveDeletedKeys = moveDeletedKeysBuilder
.addAllDeletedKeys(deletedKeys)
.addAllRenamedKeys(renamedList)
.addAllDeletedDirs(dirsToMove)
.build();
if (isBufferLimitCrossed(ratisByteLimit, 0, moveDeletedKeys.getSerializedSize())) {
int remaining = MIN_ERR_LIMIT_PER_TASK;
deletedKeys = deletedKeys.subList(0, Math.min(remaining, deletedKeys.size()));
remaining -= deletedKeys.size();
renamedList = renamedList.subList(0, Math.min(remaining, renamedList.size()));
remaining -= renamedList.size();
dirsToMove = dirsToMove.subList(0, Math.min(remaining, dirsToMove.size()));
moveDeletedKeys = moveDeletedKeysBuilder
.addAllDeletedKeys(deletedKeys)
.addAllRenamedKeys(renamedList)
.addAllDeletedDirs(dirsToMove)
.build();
if (!deletedKeys.isEmpty()) {
moveDeletedKeys.addAllDeletedKeys(deletedKeys);
}

if (!renamedList.isEmpty()) {
moveDeletedKeys.addAllRenamedKeys(renamedList);
}

if (!dirsToMove.isEmpty()) {
moveDeletedKeys.addAllDeletedDirs(dirsToMove);
}

OMRequest omRequest = OMRequest.newBuilder()
.setCmdType(Type.SnapshotMoveTableKeys)
.setSnapshotMoveTableKeysRequest(moveDeletedKeys)
.setSnapshotMoveTableKeysRequest(moveDeletedKeys.build())
.setClientId(clientId.toString())
.build();
submitOMRequest(omRequest);

try {
OzoneManagerProtocolProtos.OMResponse response = submitRequest(omRequest);
if (response == null || !response.getSuccess()) {
LOG.error("SnapshotMoveTableKeys request failed. Will retry in the next run.");
return false;
}
return true;
} catch (ServiceException e) {
LOG.error("SnapshotMoveTableKeys request failed. Will retry in the next run", e);
return false;
}
}

/**
* Submits snapshot move requests with batching to respect the Ratis buffer limit.
* This method progressively builds batches while checking size limits before adding entries.
*
* @param snapInfo The snapshot being processed
* @param deletedKeys List of deleted keys to move
* @param renamedList List of renamed keys
* @param dirsToMove List of deleted directories to move
* @return The number of entries successfully submitted
*/
private int submitSnapshotMoveDeletedKeysWithBatching(SnapshotInfo snapInfo,
List<SnapshotMoveKeyInfos> deletedKeys,
List<HddsProtos.KeyValue> renamedList,
List<SnapshotMoveKeyInfos> dirsToMove) {
List<SnapshotMoveKeyInfos> currentDeletedKeys = new ArrayList<>();
List<HddsProtos.KeyValue> currentRenamedKeys = new ArrayList<>();
List<SnapshotMoveKeyInfos> currentDeletedDirs = new ArrayList<>();
int totalSubmitted = 0;
int batchCount = 0;

SnapshotMoveTableKeysRequest emptyRequest = SnapshotMoveTableKeysRequest.newBuilder()
.setFromSnapshotID(toProtobuf(snapInfo.getSnapshotId()))
.build();
OMRequest baseRequest = OMRequest.newBuilder()
.setCmdType(Type.SnapshotMoveTableKeys)
.setSnapshotMoveTableKeysRequest(emptyRequest)
.setClientId(clientId.toString())
.build();
int baseOverhead = baseRequest.getSerializedSize();
long batchBytes = baseOverhead;

for (SnapshotMoveKeyInfos key : deletedKeys) {
int keySize = key.getSerializedSize();

// If adding this key would exceed the limit, flush the current batch first
if (batchBytes + keySize > ratisByteLimit && !currentDeletedKeys.isEmpty()) {
batchCount++;
LOG.debug("Submitting batch {} for snapshot {} with {} deletedKeys, {} renamedKeys, {} deletedDirs, " +
"size: {} bytes", batchCount, snapInfo.getTableKey(), currentDeletedKeys.size(),
currentRenamedKeys.size(), currentDeletedDirs.size(), batchBytes);

if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys, currentRenamedKeys, currentDeletedDirs)) {
return totalSubmitted;
}

totalSubmitted += currentDeletedKeys.size();
currentDeletedKeys.clear();
currentRenamedKeys.clear();
currentDeletedDirs.clear();
batchBytes = baseOverhead;
}

currentDeletedKeys.add(key);
batchBytes += keySize;
}

for (HddsProtos.KeyValue renameKey : renamedList) {
int keySize = renameKey.getSerializedSize();

// If adding this key would exceed the limit, flush the current batch first
if (batchBytes + keySize > ratisByteLimit &&
(!currentDeletedKeys.isEmpty() || !currentRenamedKeys.isEmpty())) {
batchCount++;
LOG.debug("Submitting batch {} for snapshot {} with {} deletedKeys, {} renamedKeys, {} deletedDirs, " +
"size: {} bytes", batchCount, snapInfo.getTableKey(), currentDeletedKeys.size(),
currentRenamedKeys.size(), currentDeletedDirs.size(), batchBytes);

if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys, currentRenamedKeys, currentDeletedDirs)) {
return totalSubmitted;
}

totalSubmitted += currentDeletedKeys.size() + currentRenamedKeys.size();
currentDeletedKeys.clear();
currentRenamedKeys.clear();
currentDeletedDirs.clear();
batchBytes = baseOverhead;
}

currentRenamedKeys.add(renameKey);
batchBytes += keySize;
}

for (SnapshotMoveKeyInfos dir : dirsToMove) {
int dirSize = dir.getSerializedSize();

// If adding this dir would exceed the limit, flush the current batch first
if (batchBytes + dirSize > ratisByteLimit &&
(!currentDeletedKeys.isEmpty() || !currentRenamedKeys.isEmpty() || !currentDeletedDirs.isEmpty())) {
batchCount++;
LOG.debug("Submitting batch {} for snapshot {} with {} deletedKeys, {} renamedKeys, {} deletedDirs, " +
"size: {} bytes", batchCount, snapInfo.getTableKey(), currentDeletedKeys.size(),
currentRenamedKeys.size(), currentDeletedDirs.size(), batchBytes);

if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys, currentRenamedKeys, currentDeletedDirs)) {
return totalSubmitted;
}

totalSubmitted += currentDeletedKeys.size() + currentRenamedKeys.size() + currentDeletedDirs.size();
currentDeletedKeys.clear();
currentRenamedKeys.clear();
currentDeletedDirs.clear();
batchBytes = baseOverhead;
}

currentDeletedDirs.add(dir);
batchBytes += dirSize;
}

// Submit the final batch if any
if (!currentDeletedKeys.isEmpty() || !currentRenamedKeys.isEmpty() || !currentDeletedDirs.isEmpty()) {
batchCount++;
LOG.debug("Submitting final batch {} for snapshot {} with {} deletedKeys, {} renamedKeys, {} deletedDirs, " +
"size: {} bytes", batchCount, snapInfo.getTableKey(), currentDeletedKeys.size(),
currentRenamedKeys.size(), currentDeletedDirs.size(), batchBytes);

if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys, currentRenamedKeys, currentDeletedDirs)) {
return totalSubmitted;
}

totalSubmitted += currentDeletedKeys.size() + currentRenamedKeys.size() + currentDeletedDirs.size();
}

LOG.debug("Successfully submitted {} total entries in {} batches for snapshot {}", totalSubmitted, batchCount,
snapInfo.getTableKey());

return totalSubmitted;
}

private void submitOMRequest(OMRequest omRequest) {
Expand Down