Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.Locale;
import java.util.Optional;

import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
import org.apache.hadoop.hive.metastore.client.builder.GetTablesRequestBuilder;
Expand Down Expand Up @@ -821,6 +822,7 @@ public void testGetTablesExt() throws Exception {
count = 300;
tProps.put("TBLNAME", "test_limit");
tProps.put("TABLECOUNT", count);
tProps.remove("CAPABILITIES"); // CAPABILITIES are already appended to PROPERTIES
tables = createTables(tProps);
assertEquals("Unexpected number of tables created", count, tables.size());

Expand Down Expand Up @@ -934,7 +936,8 @@ public void testGetPartitionsByNames() throws Exception {
properties.append("transactional_properties=insert_only");
tProps.put("TBLNAME", tblName);
tProps.put("PROPERTIES", properties.toString());
setHMSClient("createTable", new String[] {"HIVEMANAGEDINSERTWRITE,HIVEFULLACIDWRITE"});
tProps.put("TBLTYPE", type);
setHMSClient("createTable", new String[] {"HIVEMANAGEDINSERTWRITE", "HIVEFULLACIDWRITE"});
table = createTableWithCapabilities(tProps);
resetHMSClient();

Expand Down Expand Up @@ -1772,7 +1775,7 @@ private List<String> createTables(Map<String, Object> props) throws Exception {
String tblName = (String)props.get("TBLNAME");
List<String> caps = (List<String>)props.get("CAPABILITIES");
StringBuilder table_params = new StringBuilder();
table_params.append((String)props.get("PROPERTIES"));
Optional.ofNullable(props.get("PROPERTIES")).ifPresent(table_params::append);
if (caps != null)
table_params.append(CAPABILITIES_KEY).append("=").append(String.join(",", caps));
props.put("PROPERTIES", table_params.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,10 @@ public void init(AtomicBoolean stop) throws Exception {
checkInterval = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
boolean isSupportAcid = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_SUPPORT_ACID);
optimizers = Arrays.stream(MetastoreConf.getTrimmedStringsVar(conf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS))
.filter(e -> isSupportAcid || !e.equalsIgnoreCase(MetastoreConf.ACID_TABLE_OPTIMIZER_CLASS))
.map(this::instantiateTableOptimizer).toList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ public class MetastoreConf {
"metastore.authentication.ldap.userMembershipKey";
public static final String METASTORE_RETRYING_HANDLER_CLASS =
"org.apache.hadoop.hive.metastore.RetryingHMSHandler";
public static final String ACID_TABLE_OPTIMIZER_CLASS =
"org.apache.hadoop.hive.ql.txn.compactor.AcidTableOptimizer";
public static final String ICEBERG_TABLE_OPTIMIZER_CLASS =
"org.apache.iceberg.mr.hive.compaction.IcebergTableOptimizer";

private static final Map<String, ConfVars> metaConfs = new HashMap<>();
private static volatile URL hiveSiteURL = null;
Expand Down Expand Up @@ -663,8 +667,7 @@ public enum ConfVars {
"Enable table caching in the initiator. Currently the cache is cleaned after each cycle."),
COMPACTOR_INITIATOR_TABLE_OPTIMIZERS("compactor.table.optimizers",
"hive.compactor.table.optimizers",
"org.apache.hadoop.hive.ql.txn.compactor.AcidTableOptimizer," +
"org.apache.iceberg.mr.hive.compaction.IcebergTableOptimizer",
ACID_TABLE_OPTIMIZER_CLASS + "," + ICEBERG_TABLE_OPTIMIZER_CLASS,
"Comma separated list of table optimizers executed by compaction Initiator."),
COMPACTOR_WORKER_THREADS("metastore.compactor.worker.threads",
"hive.compactor.worker.threads", 0,
Expand Down Expand Up @@ -2012,6 +2015,8 @@ public enum ConfVars {
"The maximum non-native tables allowed per table type during collecting the summary."),
METADATA_SUMMARY_NONNATIVE_THREADS("hive.metatool.summary.nonnative.threads", "hive.metatool.summary.nonnative.threads", 20,
"Number of threads to be allocated for MetaToolTaskMetadataSummary for collecting the non-native table's summary."),
METASTORE_SUPPORT_ACID("metastore.support.acid", "hive.metastore.support.acid", true,
"Whether to support acid functionality in Hive metastore server."),

// These are all values that we put here just for testing
STR_TEST_ENTRY("test.str", "hive.test.str", "defaultval", "comment"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public class MaterializationsRebuildLockCleanerTask implements MetastoreTaskThre

@Override
public long runFrequency(TimeUnit unit) {
return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, unit) / 2;
return MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_SUPPORT_ACID) ?
MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, unit) / 2 : 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -614,6 +615,14 @@ private static final Path getDefaultPath(IHMSHandler hmsHandler, Database db, St

}

private void validateIfAcidTablePermitted(Table table) throws MetaException {
if (!MetastoreConf.getBoolVar(hmsHandler.getConf(), ConfVars.METASTORE_SUPPORT_ACID) &&
TxnUtils.isTransactionalTable(table)) {
throw new MetaException("ACID tables are not permitted when the "
+ ConfVars.METASTORE_SUPPORT_ACID.getHiveName() + " property is set to false");
}
}

@Override
public Table transformCreateTable(Table table, List<String> processorCapabilities, String processorId) throws MetaException {
if (!defaultCatalog.equalsIgnoreCase(table.getCatName())) {
Expand All @@ -624,9 +633,10 @@ public Table transformCreateTable(Table table, List<String> processorCapabilitie
Table newTable = new Table(table);
LOG.info("Starting translation for CreateTable for processor " + processorId + " with " + processorCapabilities
+ " on table " + newTable.getTableName());
Map<String, String> params = table.getParameters();
Map<String, String> params = newTable.getParameters();
if (params == null) {
params = new HashMap<>();
newTable.setParameters(params);
}
String tableType = newTable.getTableType();
String dbName = table.getDbName();
Expand All @@ -647,7 +657,6 @@ public Table transformCreateTable(Table table, List<String> processorCapabilitie
params.put(HiveMetaHook.EXTERNAL, "TRUE");
params.put(EXTERNAL_TABLE_PURGE, "TRUE");
params.put(HiveMetaHook.TRANSLATED_TO_EXTERNAL, "TRUE");
newTable.setParameters(params);
LOG.info("Modified table params are:" + params.toString());

if (getLocation(table) == null) {
Expand All @@ -663,6 +672,7 @@ public Table transformCreateTable(Table table, List<String> processorCapabilitie
// should we check tbl directory existence?
}
} else { // ACID table
validateIfAcidTablePermitted(newTable);
// if the property 'EXTERNAL_TABLES_ONLY'='true' is set on the database, then creating managed/ACID tables are prohibited. See HIVE-25724 for more details.
if (db.getParameters().containsKey(EXTERNALTABLESONLY) &&
db.getParameters().get(EXTERNALTABLESONLY).equalsIgnoreCase("true")) {
Expand All @@ -673,8 +683,8 @@ public Table transformCreateTable(Table table, List<String> processorCapabilitie
throw new MetaException("Processor has no capabilities, cannot create an ACID table.");
}

newTable = validateTablePaths(table);
if (MetaStoreUtils.isInsertOnlyTableParam(table.getParameters())) { // MICRO_MANAGED Tables
validateTablePaths(newTable);
if (MetaStoreUtils.isInsertOnlyTableParam(newTable.getParameters())) { // MICRO_MANAGED Tables
if (processorCapabilities.contains(HIVEMANAGEDINSERTWRITE)) {
LOG.debug("Processor has required capabilities to be able to create INSERT-only tables");
return newTable;
Expand All @@ -694,7 +704,8 @@ public Table transformCreateTable(Table table, List<String> processorCapabilitie
}
} else if (TableType.EXTERNAL_TABLE.name().equals(tableType)) {
LOG.debug("Table to be created is of type " + tableType);
newTable = validateTablePaths(table);
params.put(HiveMetaHook.EXTERNAL, "TRUE");
validateTablePaths(newTable);
}
LOG.info("Transformer returning table:" + newTable.toString());
return newTable;
Expand Down Expand Up @@ -734,7 +745,7 @@ public Table transformAlterTable(Table oldTable, Table newTable, List<String> pr
LOG.info("Starting translation for Alter table for processor " + processorId + " with " + processorCapabilities
+ " on table " + newTable.getTableName());


validateIfAcidTablePermitted(newTable);
if (tableLocationChanged(oldTable, newTable)) {
validateTablePaths(newTable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,13 @@ public List<MetaStoreThread> getCompactorThreads() throws Exception {
compactors.add(initiator);
}
if (MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON)) {
MetaStoreThread cleaner = instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Cleaner");
compactors.add(cleaner);
if (MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.METASTORE_SUPPORT_ACID)) {
MetaStoreThread cleaner = instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Cleaner");
compactors.add(cleaner);
} else {
HiveMetaStore.LOG.warn("Compactor Cleaner is turned On. But, automatic compaction cleaner will not run " +
"when the {} property is set to false.", MetastoreConf.ConfVars.METASTORE_SUPPORT_ACID.getHiveName());
}
}
return compactors;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.cronutils.utils.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
Expand Down Expand Up @@ -107,20 +106,19 @@ public void takeLeadership(LeaderElection election) throws Exception {
} else {
tasks = new ArrayList<>(getRemoteOnlyTasks());
}
int poolSize = Math.min(MetastoreConf.getIntVar(configuration,
MetastoreConf.ConfVars.THREAD_POOL_SIZE), tasks.size());
metastoreTaskThreadPool = Executors.newScheduledThreadPool(poolSize, threadFactory);
for (MetastoreTaskThread task : tasks) {
tasks.forEach(task -> {
task.setConf(configuration);
task.enforceMutex(election.enforceMutex());
long freq = task.runFrequency(TimeUnit.MILLISECONDS);
if (freq > 0) {
if (task.runFrequency(TimeUnit.MILLISECONDS) > 0) {
runningTasks.add(task);
metastoreTaskThreadPool.scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS);
}
}

});
int poolSize = Math.min(MetastoreConf.getIntVar(configuration,
MetastoreConf.ConfVars.THREAD_POOL_SIZE), runningTasks.size());
metastoreTaskThreadPool = Executors.newScheduledThreadPool(poolSize, threadFactory);
runningTasks.forEach(task -> {
long freq = task.runFrequency(TimeUnit.MILLISECONDS);
metastoreTaskThreadPool.scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS);
HiveMetaStore.LOG.info("Scheduling for " + task.getClass().getCanonicalName() + " service.");
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public class AcidMetricLogger implements MetastoreTaskThread {

@Override
public long runFrequency(TimeUnit timeUnit) {
return MetastoreConf
.getTimeVar(conf, MetastoreConf.ConfVars.COMPACTOR_ACID_METRICS_LOGGER_FREQUENCY, timeUnit);
return MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_SUPPORT_ACID) ?
MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.COMPACTOR_ACID_METRICS_LOGGER_FREQUENCY, timeUnit) : 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public class AcidMetricService implements MetastoreTaskThread {

@Override
public long runFrequency(TimeUnit unit) {
return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_CHECK_INTERVAL, unit);
return MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_SUPPORT_ACID) ?
MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_CHECK_INTERVAL, unit) : 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public Configuration getConf() {

@Override
public long runFrequency(TimeUnit unit) {
return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.ACID_HOUSEKEEPER_SERVICE_INTERVAL, unit);
return MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_SUPPORT_ACID) ?
MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.ACID_HOUSEKEEPER_SERVICE_INTERVAL, unit) : 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public class AcidOpenTxnsCounterService implements MetastoreTaskThread {

@Override
public long runFrequency(TimeUnit unit) {
return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.COUNT_OPEN_TXNS_INTERVAL, unit);
return MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_SUPPORT_ACID) ?
MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.COUNT_OPEN_TXNS_INTERVAL, unit) : 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public Configuration getConf() {

@Override
public long runFrequency(TimeUnit unit) {
return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.ACID_TXN_CLEANER_INTERVAL, unit);
return MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_SUPPORT_ACID) ?
MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.ACID_TXN_CLEANER_INTERVAL, unit) : 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ public CompactionHouseKeeperService() {

@Override
protected void initTasks(){
tasks = ImmutableMap.<FailableRunnable<MetaException>, String>builder()
.put(txnHandler::removeDuplicateCompletedTxnComponents,
"Cleaning duplicate COMPLETED_TXN_COMPONENTS entries")
.put(txnHandler::purgeCompactionHistory, "Cleaning obsolete compaction history entries")
.build();
ImmutableMap.Builder<FailableRunnable<MetaException>, String> taskBuilder =
ImmutableMap.<FailableRunnable<MetaException>, String>builder()
.put(txnHandler::purgeCompactionHistory, "Cleaning obsolete compaction history entries");
if (MetastoreConf.getBoolVar(getConf(), MetastoreConf.ConfVars.METASTORE_SUPPORT_ACID)) {
taskBuilder.put(txnHandler::removeDuplicateCompletedTxnComponents,
"Cleaning duplicate COMPLETED_TXN_COMPONENTS entries");
}
tasks = taskBuilder.build();
}

@Override
Expand Down
Loading
Loading