From 69395a08ee6cb1c91d6fbc4d06984484ede079e0 Mon Sep 17 00:00:00 2001 From: shreenidhiSaigaonkar Date: Tue, 2 Jun 2026 21:03:49 +0530 Subject: [PATCH] HIVE-29644: HMS hang/deadlock during ACID replication: compaction enqueue incorrectly runs inside replTableWriteIdState transaction - ReplTableWriteIdStateFunction enqueued major compactions inline (via CompactFunction) inside the long @Transactional(POOL_TX) replTableWriteIdState call, holding the NEXT_COMPACTION_QUEUE_ID row lock across all partitions while re-acquiring the CompactionScheduler mutex; this could AB-BA deadlock with a concurrent compact() caller (initiator or another replication job) across the POOL_TX/POOL_MUTEX connection pools. - replTableWriteIdState now only applies the write-id state and returns whether compaction is needed; TransactionHandler.repl_tbl_writeid_state schedules the per-partition major compactions afterwards via ReplAbortedWriteCompactionScheduler, each in its own compact() transaction (restoring the pre-HIVE-27481 behaviour, same as manual ALTER TABLE COMPACT). --- .../metastore/handler/TransactionHandler.java | 5 +- .../hadoop/hive/metastore/txn/TxnHandler.java | 4 +- .../hadoop/hive/metastore/txn/TxnStore.java | 2 +- .../ReplAbortedWriteCompactionScheduler.java | 48 +++++++++++++++++++ .../ReplTableWriteIdStateFunction.java | 30 +++--------- 5 files changed, 61 insertions(+), 28 deletions(-) create mode 100644 standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/ReplAbortedWriteCompactionScheduler.java diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/TransactionHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/TransactionHandler.java index 087a258920d1..161eba555751 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/TransactionHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/TransactionHandler.java @@ -100,6 +100,7 @@ import org.apache.hadoop.hive.metastore.messaging.EventMessage; import org.apache.hadoop.hive.metastore.txn.CompactionMetricsDataConverter; import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo; +import org.apache.hadoop.hive.metastore.txn.jdbc.functions.ReplAbortedWriteCompactionScheduler; import org.apache.hadoop.hive.metastore.utils.FileUtils; import org.apache.hadoop.hive.metastore.utils.FilterUtils; import org.apache.thrift.TException; @@ -249,7 +250,9 @@ public void commit_txn(CommitTxnRequest rqst) throws TException { @Override public void repl_tbl_writeid_state(ReplTblWriteIdStateRequest rqst) throws TException { - getTxnHandler().replTableWriteIdState(rqst); + if (getTxnHandler().replTableWriteIdState(rqst)) { + ReplAbortedWriteCompactionScheduler.scheduleMajorCompactions(getTxnHandler(), rqst); + } } @Override diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index a847e01aca4d..ba77ed08efcf 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -661,8 +661,8 @@ public long getLatestTxnIdInConflict(long txnid) throws MetaException { * @throws MetaException */ @Override - public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaException { - new ReplTableWriteIdStateFunction(rqst, mutexAPI, transactionalListeners).execute(jdbcResource); + public boolean replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaException { + return new ReplTableWriteIdStateFunction(rqst, transactionalListeners).execute(jdbcResource); } /** diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 22e6c279fc84..8785d46b9fc0 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -268,7 +268,7 @@ void commitTxn(CommitTxnRequest rqst) @SqlRetry(lockInternally = true) @Transactional(POOL_TX) @RetrySemantics.Idempotent("No-op if already replicated the writeid state") - void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaException; + boolean replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaException; @Transactional(POOL_TX) void updateTransactionStatistics(UpdateTransactionalStatsRequest req) throws MetaException; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/ReplAbortedWriteCompactionScheduler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/ReplAbortedWriteCompactionScheduler.java new file mode 100644 index 000000000000..ce4463ba1b35 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/ReplAbortedWriteCompactionScheduler.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.txn.jdbc.functions; + +import org.apache.hadoop.hive.metastore.api.CompactionRequest; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest; +import org.apache.hadoop.hive.metastore.txn.TxnStore; + +/** + * Schedules major compactions after replication write-id state is committed. + * Each compaction is enqueued via {@link TxnStore#compact(CompactionRequest)}, which runs in its + * own transaction, avoiding cross-pool deadlocks when multiple partitions are processed. + */ +public final class ReplAbortedWriteCompactionScheduler { + + private ReplAbortedWriteCompactionScheduler() {} + + public static void scheduleMajorCompactions(TxnStore txnStore, ReplTblWriteIdStateRequest rqst) + throws MetaException { + CompactionRequest compactRqst = new CompactionRequest(rqst.getDbName(), rqst.getTableName(), + CompactionType.MAJOR); + if (rqst.isSetPartNames()) { + for (String partName : rqst.getPartNames()) { + compactRqst.setPartitionname(partName); + txnStore.compact(compactRqst); + } + } else { + txnStore.compact(compactRqst); + } + } +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/ReplTableWriteIdStateFunction.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/ReplTableWriteIdStateFunction.java index b47cf7fdcfd9..b0c82c55d363 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/ReplTableWriteIdStateFunction.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/ReplTableWriteIdStateFunction.java @@ -20,14 +20,11 @@ import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener; -import org.apache.hadoop.hive.metastore.api.CompactionRequest; -import org.apache.hadoop.hive.metastore.api.CompactionType; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.TxnErrorMsg; -import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource; import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionalFunction; import org.slf4j.Logger; @@ -43,22 +40,21 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -public class ReplTableWriteIdStateFunction implements TransactionalFunction { +public class ReplTableWriteIdStateFunction implements TransactionalFunction { private static final Logger LOG = LoggerFactory.getLogger(ReplTableWriteIdStateFunction.class); private final ReplTblWriteIdStateRequest rqst; - private final TxnStore.MutexAPI mutexAPI; private final List transactionalListeners; - public ReplTableWriteIdStateFunction(ReplTblWriteIdStateRequest rqst, TxnStore.MutexAPI mutexAPI, List transactionalListeners) { + public ReplTableWriteIdStateFunction(ReplTblWriteIdStateRequest rqst, + List transactionalListeners) { this.rqst = rqst; - this.mutexAPI = mutexAPI; this.transactionalListeners = transactionalListeners; } @Override - public Void execute(MultiDataSourceJdbcResource jdbcResource) throws MetaException { + public Boolean execute(MultiDataSourceJdbcResource jdbcResource) throws MetaException { long openTxnTimeOutMillis = MetastoreConf.getTimeVar(jdbcResource.getConf(), MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS); String dbName = rqst.getDbName().toLowerCase(); @@ -79,7 +75,7 @@ public Void execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExcepti if (found) { LOG.info("Idempotent flow: WriteId state <{}> is already applied for the table: {}.{}", validWriteIdList, dbName, tblName); - return null; + return false; } // Get the abortedWriteIds which are already sorted in ascending order. @@ -131,21 +127,7 @@ public Void execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExcepti .addValue("tableName", tblName) .addValue("nextWriteId", nextWriteId)); LOG.info("WriteId state <{}> is applied for the table: {}.{}", validWriteIdList, dbName, tblName); - - // Schedule Major compaction on all the partitions/table to clean aborted data - if (numAbortedWrites > 0) { - CompactionRequest compactRqst = new CompactionRequest(rqst.getDbName(), rqst.getTableName(), - CompactionType.MAJOR); - if (rqst.isSetPartNames()) { - for (String partName : rqst.getPartNames()) { - compactRqst.setPartitionname(partName); - new CompactFunction(compactRqst, openTxnTimeOutMillis, mutexAPI).execute(jdbcResource); - } - } else { - new CompactFunction(compactRqst, openTxnTimeOutMillis, mutexAPI).execute(jdbcResource); - } - } - return null; + return numAbortedWrites > 0; } private List getAbortedWriteIds(ValidWriteIdList validWriteIdList) {