Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
Expand All @@ -57,9 +56,6 @@ public class PartitionManagementTask implements MetastoreTaskThread {
public static final String DISCOVER_PARTITIONS_TBLPROPERTY = "discover.partitions";
public static final String PARTITION_RETENTION_PERIOD_TBLPROPERTY = "partition.retention.period";
private static final Lock lock = new ReentrantLock();
// these are just for testing
private static int completedAttempts;
private static int skippedAttempts;

private Configuration conf;

Expand Down Expand Up @@ -87,7 +83,6 @@ private static boolean partitionDiscoveryEnabled(Map<String, String> params) {
@Override
public void run() {
if (lock.tryLock()) {
skippedAttempts = 0;
String qualifiedTableName = null;
IMetaStoreClient msc = null;
try {
Expand Down Expand Up @@ -136,10 +131,8 @@ public void run() {
}
lock.unlock();
}
completedAttempts++;
} else {
skippedAttempts++;
LOG.info("Lock is held by some other partition discovery task. Skipping this attempt..#{}", skippedAttempts);
LOG.info("Lock is held by some other partition discovery task. Skipping this attempt.");
}
}

Expand Down Expand Up @@ -200,13 +193,4 @@ public void run() {
}
}

/**
 * Test-only accessor for the static skipped-attempts counter.
 *
 * <p>{@code run()} increments the counter each time {@code lock.tryLock()} fails and resets it to
 * zero whenever the lock is acquired. The counter is a plain static {@code int}; this accessor
 * adds no synchronization of its own.
 *
 * @return the current value of the skipped-attempts counter
 */
@VisibleForTesting
public static int getSkippedAttempts() {
return skippedAttempts;
}

/**
 * Test-only accessor for the static completed-attempts counter.
 *
 * <p>{@code run()} increments the counter at the end of the branch taken when
 * {@code lock.tryLock()} succeeds, after the lock has been released. The counter is a plain
 * static {@code int}; this accessor adds no synchronization of its own.
 * NOTE(review): part of {@code run()} is not visible here, so whether an attempt that ends in an
 * error is also counted should be confirmed against the full method.
 *
 * @return the current value of the completed-attempts counter
 */
@VisibleForTesting
public static int getCompletedAttempts() {
return completedAttempts;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.logging.log4j.core.test.appender.ListAppender;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -473,7 +478,7 @@ public void testPartitionDiscoveryTablePattern() throws TException, IOException
}

@Test
public void testPartitionDiscoveryTransactionalTable()
public void testPartitionDiscoveryTransactionalTableConcurrent()
throws TException, IOException, InterruptedException, ExecutionException {
String dbName = "db6";
String tableName = "tbl6";
Expand Down Expand Up @@ -503,47 +508,72 @@ public void testPartitionDiscoveryTransactionalTable()
TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY);
client.alter_table(dbName, tableName, table);

runPartitionManagementTask(conf);
partitions = client.listPartitions(dbName, tableName, (short) -1);
assertEquals(5, partitions.size());

// only one partition discovery task is running, there will be no skipped attempts
assertEquals(0, PartitionManagementTask.getSkippedAttempts());

// delete a partition from fs, and submit 3 tasks at the same time each of them trying to acquire X lock on the
// same table, only one of them will run other attempts will be skipped
boolean deleted = fs.delete(newPart1.getParent(), true);
assertTrue(deleted);
assertEquals(4, fs.listStatus(tablePath).length);

// 3 tasks are submitted at the same time, only one will eventually lock the table and only one get to run at a time
// This is to simulate, skipping partition discovery task attempt when previous attempt is still incomplete
PartitionManagementTask partitionDiscoveryTask1 = new PartitionManagementTask();
partitionDiscoveryTask1.setConf(conf);
PartitionManagementTask partitionDiscoveryTask2 = new PartitionManagementTask();
partitionDiscoveryTask2.setConf(conf);
PartitionManagementTask partitionDiscoveryTask3 = new PartitionManagementTask();
partitionDiscoveryTask3.setConf(conf);
List<PartitionManagementTask> tasks = Lists
.newArrayList(partitionDiscoveryTask1, partitionDiscoveryTask2, partitionDiscoveryTask3);
ExecutorService executorService = Executors.newFixedThreadPool(3);
int successBefore = PartitionManagementTask.getCompletedAttempts();
int skippedBefore = PartitionManagementTask.getSkippedAttempts();
List<Future<?>> futures = new ArrayList<>();
for (PartitionManagementTask task : tasks) {
futures.add(executorService.submit(task));
}
for (Future<?> future : futures) {
future.get();
final String appenderName = "testPartitionDiscoveryTransactionalTableConcurrentAppender";
LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false);
LoggerConfig rootLoggerConfig = loggerContext.getConfiguration().getLoggerConfig("");
ListAppender skipAppender = new ListAppender(appenderName);
skipAppender.start();
rootLoggerConfig.addAppender(skipAppender, Level.INFO, null);
try {
runPartitionManagementTask(conf);
partitions = client.listPartitions(dbName, tableName, (short) -1);
assertEquals(5, partitions.size());

// only one partition discovery task is running, there will be no skipped attempts
assertEquals(0, countSkipMessages(skipAppender));
assertEquals(1, countDiscoveryEntries(skipAppender));

// delete a partition from fs, and submit 3 tasks at the same time each of them trying to acquire X lock on the
// same table, only one of them will run other attempts will be skipped
boolean deleted = fs.delete(newPart1.getParent(), true);
assertTrue(deleted);
assertEquals(4, fs.listStatus(tablePath).length);

// 3 tasks are submitted at the same time, only one will eventually lock the table and only one
// get to run at a time. This is to simulate, skipping partition discovery task attempt when
// previous attempt is still incomplete
PartitionManagementTask partitionDiscoveryTask1 = new PartitionManagementTask();
partitionDiscoveryTask1.setConf(conf);
PartitionManagementTask partitionDiscoveryTask2 = new PartitionManagementTask();
partitionDiscoveryTask2.setConf(conf);
PartitionManagementTask partitionDiscoveryTask3 = new PartitionManagementTask();
partitionDiscoveryTask3.setConf(conf);
List<PartitionManagementTask> tasks = Lists
.newArrayList(partitionDiscoveryTask1, partitionDiscoveryTask2, partitionDiscoveryTask3);
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Future<?>> futures = new ArrayList<>();
for (PartitionManagementTask task : tasks) {
futures.add(executorService.submit(task));
}
for (Future<?> future : futures) {
future.get();
}
long skips = countSkipMessages(skipAppender);
long discoveries = countDiscoveryEntries(skipAppender);
assertEquals(4, skips + discoveries);
assertTrue("at least one more task should have entered the work path during the race", discoveries >= 2);
} finally {
rootLoggerConfig.removeAppender(appenderName);
skipAppender.stop();
}
int successAfter = PartitionManagementTask.getCompletedAttempts();
int skippedAfter = PartitionManagementTask.getSkippedAttempts();
assertEquals(1, successAfter - successBefore);
assertEquals(2, skippedAfter - skippedBefore);
partitions = client.listPartitions(dbName, tableName, (short) -1);
assertEquals(4, partitions.size());
}

/**
 * Counts the captured log events whose formatted message is exactly the notice logged when a
 * partition discovery attempt is skipped because the lock is already held.
 *
 * @param appender list appender whose recorded events are scanned
 * @return number of recorded events carrying exactly that formatted message
 */
private static long countSkipMessages(ListAppender appender) {
  final String skipNotice = "Lock is held by some other partition discovery task. Skipping this attempt.";
  return appender.getEvents().stream()
      .map(event -> event.getMessage().getFormattedMessage())
      // Compare literal-first so a null formatted message is a non-match rather than an NPE.
      .filter(skipNotice::equals)
      .count();
}

/**
 * Counts the captured log events whose formatted message is exactly
 * {@code "Found 1 candidate tables for partition discovery"}.
 *
 * @param appender list appender whose recorded events are scanned
 * @return number of recorded events carrying exactly that formatted message
 */
private static long countDiscoveryEntries(ListAppender appender) {
  final String discoveryNotice = "Found 1 candidate tables for partition discovery";
  return appender.getEvents().stream()
      .map(event -> event.getMessage().getFormattedMessage())
      // Compare literal-first so a null formatted message is a non-match rather than an NPE.
      .filter(discoveryNotice::equals)
      .count();
}

@Test
public void testPartitionRetention() throws TException, IOException, InterruptedException {
String dbName = "db7";
Expand Down
Loading