diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
index fa9d5e2e9dd6..fbece9c199a7 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
@@ -37,7 +37,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -57,9 +56,6 @@ public class PartitionManagementTask implements MetastoreTaskThread {
   public static final String DISCOVER_PARTITIONS_TBLPROPERTY = "discover.partitions";
   public static final String PARTITION_RETENTION_PERIOD_TBLPROPERTY = "partition.retention.period";
   private static final Lock lock = new ReentrantLock();
-  // these are just for testing
-  private static int completedAttempts;
-  private static int skippedAttempts;
 
   private Configuration conf;
 
@@ -87,7 +83,6 @@ private static boolean partitionDiscoveryEnabled(Map params) {
   @Override
   public void run() {
     if (lock.tryLock()) {
-      skippedAttempts = 0;
       String qualifiedTableName = null;
       IMetaStoreClient msc = null;
       try {
@@ -136,10 +131,8 @@ public void run() {
         }
         lock.unlock();
       }
-      completedAttempts++;
     } else {
-      skippedAttempts++;
-      LOG.info("Lock is held by some other partition discovery task. Skipping this attempt..#{}", skippedAttempts);
+      LOG.info("Lock is held by some other partition discovery task. Skipping this attempt.");
     }
   }
 
@@ -200,13 +193,4 @@ public void run() {
     }
   }
 
-  @VisibleForTesting
-  public static int getSkippedAttempts() {
-    return skippedAttempts;
-  }
-
-  @VisibleForTesting
-  public static int getCompletedAttempts() {
-    return completedAttempts;
-  }
 }
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
index e2fd7bf9cc51..55e86e3a5323 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
@@ -54,6 +54,11 @@
 import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.test.appender.ListAppender;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.Assert;
@@ -473,7 +478,7 @@ public void testPartitionDiscoveryTablePattern() throws TException, IOException
   }
 
   @Test
-  public void testPartitionDiscoveryTransactionalTable()
+  public void testPartitionDiscoveryTransactionalTableConcurrent()
     throws TException, IOException, InterruptedException, ExecutionException {
     String dbName = "db6";
     String tableName = "tbl6";
@@ -503,47 +508,77 @@ public void testPartitionDiscoveryTransactionalTable()
       TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY);
     client.alter_table(dbName, tableName, table);
 
-    runPartitionManagementTask(conf);
-    partitions = client.listPartitions(dbName, tableName, (short) -1);
-    assertEquals(5, partitions.size());
-
-    // only one partition discovery task is running, there will be no skipped attempts
-    assertEquals(0, PartitionManagementTask.getSkippedAttempts());
-
-    // delete a partition from fs, and submit 3 tasks at the same time each of them trying to acquire X lock on the
-    // same table, only one of them will run other attempts will be skipped
-    boolean deleted = fs.delete(newPart1.getParent(), true);
-    assertTrue(deleted);
-    assertEquals(4, fs.listStatus(tablePath).length);
-
-    // 3 tasks are submitted at the same time, only one will eventually lock the table and only one get to run at a time
-    // This is to simulate, skipping partition discovery task attempt when previous attempt is still incomplete
-    PartitionManagementTask partitionDiscoveryTask1 = new PartitionManagementTask();
-    partitionDiscoveryTask1.setConf(conf);
-    PartitionManagementTask partitionDiscoveryTask2 = new PartitionManagementTask();
-    partitionDiscoveryTask2.setConf(conf);
-    PartitionManagementTask partitionDiscoveryTask3 = new PartitionManagementTask();
-    partitionDiscoveryTask3.setConf(conf);
-    List<PartitionManagementTask> tasks = Lists
-      .newArrayList(partitionDiscoveryTask1, partitionDiscoveryTask2, partitionDiscoveryTask3);
-    ExecutorService executorService = Executors.newFixedThreadPool(3);
-    int successBefore = PartitionManagementTask.getCompletedAttempts();
-    int skippedBefore = PartitionManagementTask.getSkippedAttempts();
-    List<Future<?>> futures = new ArrayList<>();
-    for (PartitionManagementTask task : tasks) {
-      futures.add(executorService.submit(task));
-    }
-    for (Future<?> future : futures) {
-      future.get();
+    final String appenderName = "testPartitionDiscoveryTransactionalTableConcurrentAppender";
+    LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false);
+    LoggerConfig rootLoggerConfig = loggerContext.getConfiguration().getLoggerConfig("");
+    ListAppender skipAppender = new ListAppender(appenderName);
+    skipAppender.start();
+    rootLoggerConfig.addAppender(skipAppender, Level.INFO, null);
+    try {
+      runPartitionManagementTask(conf);
+      partitions = client.listPartitions(dbName, tableName, (short) -1);
+      assertEquals(5, partitions.size());
+
+      // only one partition discovery task is running, there will be no skipped attempts
+      assertEquals(0, countSkipMessages(skipAppender));
+      assertEquals(1, countDiscoveryEntries(skipAppender));
+
+      // delete a partition from fs, and submit 3 tasks at the same time each of them trying to acquire X lock on the
+      // same table, only one of them will run other attempts will be skipped
+      boolean deleted = fs.delete(newPart1.getParent(), true);
+      assertTrue(deleted);
+      assertEquals(4, fs.listStatus(tablePath).length);
+
+      // 3 tasks are submitted at the same time, only one will eventually lock the table and only one
+      // get to run at a time. This is to simulate, skipping partition discovery task attempt when
+      // previous attempt is still incomplete
+      PartitionManagementTask partitionDiscoveryTask1 = new PartitionManagementTask();
+      partitionDiscoveryTask1.setConf(conf);
+      PartitionManagementTask partitionDiscoveryTask2 = new PartitionManagementTask();
+      partitionDiscoveryTask2.setConf(conf);
+      PartitionManagementTask partitionDiscoveryTask3 = new PartitionManagementTask();
+      partitionDiscoveryTask3.setConf(conf);
+      List<PartitionManagementTask> tasks = Lists
+        .newArrayList(partitionDiscoveryTask1, partitionDiscoveryTask2, partitionDiscoveryTask3);
+      ExecutorService executorService = Executors.newFixedThreadPool(3);
+      try {
+        List<Future<?>> futures = new ArrayList<>();
+        for (PartitionManagementTask task : tasks) {
+          futures.add(executorService.submit(task));
+        }
+        for (Future<?> future : futures) {
+          future.get();
+        }
+      } finally {
+        // release the pool threads even when a task fails; the executor is not used past this point
+        executorService.shutdown();
+      }
+      long skips = countSkipMessages(skipAppender);
+      long discoveries = countDiscoveryEntries(skipAppender);
+      assertEquals(4, skips + discoveries);
+      assertTrue("at least one more task should have entered the work path during the race", discoveries >= 2);
+    } finally {
+      rootLoggerConfig.removeAppender(appenderName);
+      skipAppender.stop();
     }
-    int successAfter = PartitionManagementTask.getCompletedAttempts();
-    int skippedAfter = PartitionManagementTask.getSkippedAttempts();
-    assertEquals(1, successAfter - successBefore);
-    assertEquals(2, skippedAfter - skippedBefore);
     partitions = client.listPartitions(dbName, tableName, (short) -1);
     assertEquals(4, partitions.size());
   }
 
+  private static long countSkipMessages(ListAppender appender) {
+    return appender.getEvents().stream()
+        .map(e -> e.getMessage().getFormattedMessage())
+        .filter(m -> m.equals("Lock is held by some other partition discovery task. Skipping this attempt."))
+        .count();
+  }
+
+  private static long countDiscoveryEntries(ListAppender appender) {
+    return appender.getEvents().stream()
+        .map(e -> e.getMessage().getFormattedMessage())
+        .filter(m -> m.equals("Found 1 candidate tables for partition discovery"))
+        .count();
+  }
+
   @Test
   public void testPartitionRetention() throws TException, IOException, InterruptedException {
     String dbName = "db7";