Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,13 @@ public boolean prepFail(TStore<T> zs, ZooReaderWriter zk, ServiceLockPath zLockM
return state;
}

public void deleteLocks(ZooReaderWriter zk, ServiceLock.ServiceLockPath path, String txidStr)
/**
* Removes locks at specific serviceLockPaths
* <p>
* When removing fate table locks, the txIdStr value must be the fate txId in hex format. See
* {@link FateTxId#toHexString(String)}
*/
public void deleteLocks(ZooReaderWriter zk, ServiceLock.ServiceLockPath path, String txIdStr)
throws KeeperException, InterruptedException {
// delete any locks assoc w/ fate operation
List<String> lockedIds = zk.getChildren(path.toString());
Expand All @@ -530,7 +536,7 @@ public void deleteLocks(ZooReaderWriter zk, ServiceLock.ServiceLockPath path, St
String lockPath = path + "/" + id + "/" + node;
byte[] data = zk.getData(path + "/" + id + "/" + node);
String[] lda = new String(data, UTF_8).split(":");
if (lda[1].equals(txidStr)) {
if (lda[1].equals(txIdStr)) {
zk.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
}
}
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/fate/FateTxId.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.accumulo.core.util.FastFormat;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

public class FateTxId {
Expand Down Expand Up @@ -51,6 +52,18 @@ public static long fromString(String fmtTid) {
return Long.parseLong(getHex(fmtTid), 16);
}

/**
* Returns the hex value of the FateTxId from a formatted fate transaction
*
* @param fmtTid formatted fate transaction
* @return hex value of long txId
*/
@VisibleForTesting
public static String toHexString(String fmtTid) {
Preconditions.checkArgument(isFormatedTid(fmtTid));
return getHex(fmtTid);
}

/**
* Formats transaction ids in a consistent way that is useful for logging and persisting.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ public int execute(final String fullCommand, final CommandLine cl, final Shell s
} else if (cl.hasOption(delete.getOpt())) {
String[] txids = cl.getOptionValues(delete.getOpt());
validateArgs(txids);
failedCommand = deleteTx(shellState.getWriter(), admin, zs, zk, managerLockPath, txids);
failedCommand =
deleteTx(shellState.getWriter(), admin, zs, zk, managerLockPath, tableLocksPath, txids);
} else if (cl.hasOption(list.getOpt())) {
printTx(shellState, admin, zs, zk, tableLocksPath, cl.getOptionValues(list.getOpt()), cl);
} else if (cl.hasOption(print.getOpt())) {
Expand Down Expand Up @@ -237,11 +238,11 @@ protected void printTx(Shell shellState, AdminUtil<FateCommand> admin, ZooStore<
}

protected boolean deleteTx(PrintWriter out, AdminUtil<FateCommand> admin,
ZooStore<FateCommand> zs, ZooReaderWriter zk, ServiceLockPath zLockManagerPath, String[] args)
throws InterruptedException, KeeperException {
ZooStore<FateCommand> zs, ZooReaderWriter zk, ServiceLockPath zLockManagerPath,
ServiceLockPath zTableLocksPath, String[] args) throws InterruptedException, KeeperException {
for (int i = 1; i < args.length; i++) {
if (admin.prepDelete(zs, zk, zLockManagerPath, args[i])) {
admin.deleteLocks(zk, zLockManagerPath, args[i]);
admin.deleteLocks(zk, zTableLocksPath, args[i]);
} else {
out.printf("Could not delete transaction: %s%n", args[i]);
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ String dumpTx(ZooStore<FateCommand> zs, String[] args) {
@Override
protected boolean deleteTx(PrintWriter out, AdminUtil<FateCommand> admin,
ZooStore<FateCommand> zs, ZooReaderWriter zk, ServiceLockPath zLockManagerPath,
String[] args) throws InterruptedException, KeeperException {
ServiceLockPath zTableLocksPath, String[] args)
throws InterruptedException, KeeperException {
deleteCalled = true;
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.AdminUtil;
import org.apache.accumulo.core.fate.AgeOffStore;
import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.ZooStore;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.util.UtilWaitThread;
Expand All @@ -65,6 +67,7 @@
import org.apache.accumulo.manager.tableOps.TraceRepo;
import org.apache.accumulo.manager.tableOps.Utils;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.util.Admin;
import org.apache.accumulo.test.util.Wait;
import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -404,6 +407,51 @@ public void testCancelWhileInCall() throws Exception {
assertFalse(fate.cancel(txid));
}

/**
* Test that verifies that fate table locks are created and deleted from the correct ZooKeeper
* path
*/
@Test
public void testTableLockDeleteUsesCorrectZKPath() throws Exception {

ConfigurationCopy config = new ConfigurationCopy();
config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
AdminUtil<Admin> admin = new AdminUtil<>(true);

callStarted = new CountDownLatch(1);
finishCall = new CountDownLatch(1);

long txId = fate.startTransaction();
String formattedTxId = FateTxId.formatTid(txId);
LOG.debug("Starting test testDeleteUsesCorrectZKPath {}", formattedTxId);
assertEquals(NEW, getTxStatus(zk, txId));
fate.seedTransaction("TestOperation", txId, new TestOperation(NS, TID), false,
"Test Delete Op");
assertEquals(SUBMITTED, getTxStatus(zk, txId));
fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
// Wait for the transaction runner to be in progress
Wait.waitFor(() -> IN_PROGRESS == getTxStatus(zk, txId));

assertFalse(fate.cancel(txId));

var tableLocksPath = ServiceLock.path(ZK_ROOT + Constants.ZTABLE_LOCKS);
assertFalse(zk.getChildren(tableLocksPath.toString()).isEmpty(),
"Table locks at " + tableLocksPath + "do not exist");
for (var tableName : zk.getChildren(tableLocksPath.toString())) {
for (var tableLock : zk.getChildren(
ServiceLock.path(ZK_ROOT + Constants.ZTABLE_LOCKS + "/" + tableName).toString())) {
LOG.debug("Found table {} with fate lock {}", tableName, tableLock);
}
}

admin.deleteLocks(zk, tableLocksPath, FateTxId.toHexString(formattedTxId));
for (var tableName : zk.getChildren(tableLocksPath.toString())) {
var fateTableLocks = tableLocksPath + "/" + tableName;
assertTrue(zk.getChildren(fateTableLocks).isEmpty(), " table fate locks are still present");
}
}

@Test
public void testRepoFails() throws Exception {
/*
Expand Down