From d66d51cdeda38934f98848c66893e93c119dec2b Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 10 Jun 2026 18:02:29 +0800 Subject: [PATCH 1/2] Fix duplicate scheduling in procedure execution --- .../task/PipeTaskCoordinatorLock.java | 15 +-- .../iotdb/confignode/procedure/Procedure.java | 10 ++ .../procedure/ProcedureExecutor.java | 107 +++++++++--------- .../procedure/impl/StateMachineProcedure.java | 3 +- .../procedure/impl/cq/CreateCQProcedure.java | 2 +- .../impl/node/AbstractNodeProcedure.java | 1 + .../plugin/CreatePipePluginProcedure.java | 2 +- .../pipe/plugin/DropPipePluginProcedure.java | 2 +- .../PipeHandleLeaderChangeProcedure.java | 2 +- .../PipeHandleMetaChangeProcedure.java | 2 +- .../impl/pipe/task/DropPipeProcedureV2.java | 2 +- .../impl/pipe/task/StartPipeProcedureV2.java | 2 +- .../impl/pipe/task/StopPipeProcedureV2.java | 2 +- .../AlterEncodingCompressorProcedure.java | 2 +- .../AlterTimeSeriesDataTypeProcedure.java | 2 +- .../impl/schema/DeleteDatabaseProcedure.java | 2 +- .../schema/DeleteLogicalViewProcedure.java | 2 +- .../schema/DeleteTimeSeriesProcedure.java | 2 +- ...bscriptionHandleLeaderChangeProcedure.java | 2 +- .../impl/trigger/CreateTriggerProcedure.java | 2 +- .../impl/trigger/DropTriggerProcedure.java | 2 +- .../procedure/scheduler/LockQueue.java | 4 + .../task/PipeTaskCoordinatorLockTest.java | 60 ++++++++++ .../confignode/procedure/TestLockRegime.java | 19 ++++ .../procedure/TestProcedureExecutor.java | 52 +++++++++ .../PipeHandleLeaderChangeProcedureTest.java | 47 ++++++++ 26 files changed, 269 insertions(+), 81 deletions(-) create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java index 0f788435394f2..3a6fe1f698ac2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java @@ -46,17 +46,10 @@ public void lock() { LOGGER.debug( ManagerMessages.PIPETASKCOORDINATOR_LOCK_WAITING_FOR_THREAD, Thread.currentThread().getName()); - try { - semaphore.acquire(); - LOGGER.debug( - ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD, - Thread.currentThread().getName()); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.error( - ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD, - Thread.currentThread().getName()); - } + semaphore.acquireUninterruptibly(); + LOGGER.debug( + ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD, + Thread.currentThread().getName()); } public boolean tryLock() { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java index 29a241b16d09d..24a8adc713ffb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** @@ -62,6 +63,7 @@ public abstract class Procedure implements Comparable> { private volatile long lastUpdate; private final AtomicReference result = new AtomicReference<>(); + private final AtomicBoolean executing = new AtomicBoolean(false); private volatile boolean locked = false; private boolean lockedWhenLoading = false; @@ -256,6 +258,14 @@ protected boolean isYieldAfterExecution(Env env) { } // -------------------------Internal methods - called by the procedureExecutor------------------ + final boolean tryAcquireExecution() { + return executing.compareAndSet(false, true); + } + + final void releaseExecution() { + executing.set(false); + } + /** * Internal method called by the ProcedureExecutor that starts the user-level code execute(). * diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index 82afea3859fd4..16a9c72626dd3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.i18n.ProcedureMessages; -import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler; import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler; @@ -42,7 +41,6 @@ import java.util.Deque; import java.util.HashSet; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -328,45 +326,44 @@ private void executeProcedure(Procedure proc) { LOG.warn(ProcedureMessages.ROLLBACK_STACK_IS_NULL_FOR, proc.getProcId()); return; } - ProcedureLockState lockState = null; - try { - do { - if (!rootProcStack.acquire()) { - if (rootProcStack.setRollback()) { - lockState = executeRootStackRollback(rootProcId, rootProcStack); - switch (lockState) { + ProcedureLockState lockState; + do { + if (!rootProcStack.acquire()) { + if (rootProcStack.setRollback()) { + lockState = executeRootStackRollback(rootProcId, rootProcStack); + switch (lockState) { + case LOCK_ACQUIRED: + break; + case LOCK_EVENT_WAIT: + LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_ROLLBACK, proc); + rootProcStack.unsetRollback(); + break; + case LOCK_YIELD_WAIT: + rootProcStack.unsetRollback(); + scheduler.yield(proc); + break; + default: + throw new UnsupportedOperationException(); + } + } else { + if (!proc.wasExecuted()) { + switch (executeRollback(proc)) { case LOCK_ACQUIRED: break; case LOCK_EVENT_WAIT: - LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_ROLLBACK, proc); - rootProcStack.unsetRollback(); + LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_CAN_T_ROLLBACK_CHILD_RUNNING_FOR, proc); break; case LOCK_YIELD_WAIT: - rootProcStack.unsetRollback(); scheduler.yield(proc); break; default: throw new UnsupportedOperationException(); } - } else { - if (!proc.wasExecuted()) { - switch (executeRollback(proc)) { - case LOCK_ACQUIRED: - break; - case LOCK_EVENT_WAIT: - LOG.info( - ProcedureMessages.LOCK_EVENT_WAIT_CAN_T_ROLLBACK_CHILD_RUNNING_FOR, proc); - break; - case LOCK_YIELD_WAIT: - scheduler.yield(proc); - break; - default: - throw new UnsupportedOperationException(); - } - } } - break; } + break; + } + try { lockState = acquireLock(proc); switch (lockState) { case LOCK_ACQUIRED: @@ -379,29 +376,23 @@ private void executeProcedure(Procedure proc) { default: throw new UnsupportedOperationException(); } + } finally { rootProcStack.release(); + } - if (proc.isSuccess()) { - // update metrics on finishing the procedure - proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); - LOG.debug(ProcedureMessages.FINISHED_IN_MS_SUCCESSFULLY, proc, proc.elapsedTime()); - if (proc.getProcId() == rootProcId) { - rootProcedureCleanup(proc); - } else { - executeCompletionCleanup(proc); - } - return; + if (proc.isSuccess()) { + // update metrics on finishing the procedure + proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); + LOG.debug(ProcedureMessages.FINISHED_IN_MS_SUCCESSFULLY, proc, proc.elapsedTime()); + if (proc.getProcId() == rootProcId) { + rootProcedureCleanup(proc); + } else { + executeCompletionCleanup(proc); } - - } while (rootProcStack.isFailed()); - } finally { - // Only after procedure has completed execution can it be allowed to be rescheduled to prevent - // data races - if (Objects.equals(lockState, ProcedureLockState.LOCK_EVENT_WAIT)) { - LOG.info(ProcedureMessages.PROCEDUREID_WAIT_FOR_LOCK, proc.getProcId()); - ((ConfigNodeProcedureEnv) this.environment).getNodeLock().waitProcedure(proc); + return; } - } + + } while (rootProcStack.isFailed()); } /** @@ -414,6 +405,7 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure p if (proc.getState() != ProcedureState.RUNNABLE) { LOG.error( "The executing procedure should in RUNNABLE state, but it's not. Procedure is {}", proc); + releaseLock(proc, false); return; } boolean reExecute; @@ -787,6 +779,13 @@ public void run() { Thread.sleep(1000); continue; } + boolean executionAcquired = false; + while (isRunning() && !(executionAcquired = procedure.tryAcquireExecution())) { + Thread.sleep(10); + } + if (!executionAcquired) { + continue; + } this.activeProcedure.set(procedure); activeExecutorCount.incrementAndGet(); startTime.set(System.currentTimeMillis()); @@ -795,13 +794,14 @@ public void run() { executeProcedure(procedure); } finally { PROCEDURE_EXECUTION_CONTEXT.remove(); + procedure.releaseExecution(); + activeExecutorCount.decrementAndGet(); + LOG.trace( + "Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get()); + lastUpdated = System.currentTimeMillis(); + startTime.set(lastUpdated); } - activeExecutorCount.decrementAndGet(); - LOG.trace( - "Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get()); this.activeProcedure.set(null); - lastUpdated = System.currentTimeMillis(); - startTime.set(lastUpdated); } } catch (Exception e) { @@ -812,6 +812,7 @@ public void run() { this.activeProcedure.get(), e); } + this.activeProcedure.set(null); } finally { LOG.info(ProcedureMessages.PROCEDURE_WORKER_TERMINATED, getName()); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java index c3c9167e6ef82..f7275a152d7e7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java @@ -195,7 +195,8 @@ private void addNextStateAndCalculateCycles() { nextState); } } - if (getStateId(getCurrentState()) == stateToBeAdded) { + final TState currentState = getCurrentState(); + if (currentState != null && getStateId(currentState) == stateToBeAdded) { cycles++; } else { cycles = 0; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java index 490f723d2e6b0..240e84f6316f8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java @@ -299,7 +299,7 @@ public boolean equals(Object o) { } CreateCQProcedure that = (CreateCQProcedure) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && isGeneratedByPipe == that.isGeneratedByPipe && firstExecutionTime == that.firstExecutionTime diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java index b141027917366..9a088560a5b38 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/AbstractNodeProcedure.java @@ -50,6 +50,7 @@ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProced LOG.info( "procedureId {} acquire lock failed, will wait for lock after finishing execution.", getProcId()); + configNodeProcedureEnv.getNodeLock().waitProcedure(this); return ProcedureLockState.LOCK_EVENT_WAIT; } finally { configNodeProcedureEnv.getSchedulerLock().unlock(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java index ddc658bdf4d2d..b1583c74c6b7e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java @@ -327,7 +327,7 @@ public boolean equals(Object that) { if (that instanceof CreatePipePluginProcedure) { CreatePipePluginProcedure thatProcedure = (CreatePipePluginProcedure) that; return thatProcedure.getProcId() == getProcId() - && thatProcedure.getCurrentState().equals(getCurrentState()) + && Objects.equals(thatProcedure.getCurrentState(), getCurrentState()) && thatProcedure.getCycles() == getCycles() && thatProcedure.pipePluginMeta.equals(pipePluginMeta); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java index 771ab6230bff5..ff1caad7ad4d1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java @@ -296,7 +296,7 @@ public boolean equals(Object that) { if (that instanceof DropPipePluginProcedure) { final DropPipePluginProcedure thatProcedure = (DropPipePluginProcedure) that; return thatProcedure.getProcId() == getProcId() - && thatProcedure.getCurrentState().equals(this.getCurrentState()) + && Objects.equals(thatProcedure.getCurrentState(), this.getCurrentState()) && thatProcedure.getCycles() == this.getCycles() && (thatProcedure.pluginName).equals(pluginName); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java index 2c5b9566fce61..b2ef752854fd1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java @@ -185,7 +185,7 @@ public boolean equals(Object o) { } PipeHandleLeaderChangeProcedure that = (PipeHandleLeaderChangeProcedure) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && this.regionGroupToOldAndNewLeaderPairMap.equals( that.regionGroupToOldAndNewLeaderPairMap); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java index a0dec36735265..76bf432d45407 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java @@ -190,7 +190,7 @@ public boolean equals(Object o) { } PipeHandleMetaChangeProcedure that = (PipeHandleMetaChangeProcedure) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && needWriteConsensusOnConfigNodes == that.needWriteConsensusOnConfigNodes && needPushPipeMetaToDataNodes == that.needPushPipeMetaToDataNodes; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java index 4ba6906785101..530ca4c1956c6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java @@ -169,7 +169,7 @@ public boolean equals(Object o) { } DropPipeProcedureV2 that = (DropPipeProcedureV2) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && pipeName.equals(that.pipeName); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java index 251a4f0e3af0f..7c9bff96f8fe6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java @@ -188,7 +188,7 @@ public boolean equals(Object o) { } StartPipeProcedureV2 that = (StartPipeProcedureV2) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && pipeName.equals(that.pipeName); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java index e2ec41f3b8336..4410be01d3fdd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java @@ -191,7 +191,7 @@ public boolean equals(Object o) { } StopPipeProcedureV2 that = (StopPipeProcedureV2) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && isStoppedByRuntimeExceptionBeforeStop == that.isStoppedByRuntimeExceptionBeforeStop && pipeName.equals(that.pipeName); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java index 1ad60f6f852ca..f168f6dd6835b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java @@ -311,7 +311,7 @@ public boolean equals(final Object o) { } final AlterEncodingCompressorProcedure that = (AlterEncodingCompressorProcedure) o; return this.getProcId() == that.getProcId() - && this.getCurrentState().equals(that.getCurrentState()) + && Objects.equals(this.getCurrentState(), that.getCurrentState()) && this.getCycles() == getCycles() && Objects.equals(this.queryId, that.queryId) && this.isGeneratedByPipe == that.isGeneratedByPipe diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java index 26ea988f98e72..3a4431047c218 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterTimeSeriesDataTypeProcedure.java @@ -407,7 +407,7 @@ public boolean equals(final Object o) { } final AlterTimeSeriesDataTypeProcedure that = (AlterTimeSeriesDataTypeProcedure) o; return this.getProcId() == that.getProcId() - && this.getCurrentState().equals(that.getCurrentState()) + && Objects.equals(this.getCurrentState(), that.getCurrentState()) && this.getCycles() == getCycles() && this.isGeneratedByPipe == that.isGeneratedByPipe && this.measurementPath.equals(that.measurementPath) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java index 0b3eca82dceb4..7b8de7821739a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java @@ -315,7 +315,7 @@ public boolean equals(final Object that) { if (that instanceof DeleteDatabaseProcedure) { final DeleteDatabaseProcedure thatProc = (DeleteDatabaseProcedure) that; return thatProc.getProcId() == this.getProcId() - && thatProc.getCurrentState().equals(this.getCurrentState()) + && Objects.equals(thatProc.getCurrentState(), this.getCurrentState()) && thatProc.getCycles() == this.getCycles() && thatProc.isGeneratedByPipe == this.isGeneratedByPipe && thatProc.deleteDatabaseSchema.equals(this.getDeleteDatabaseSchema()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java index 4f63e96840c20..5846c79b5219f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteLogicalViewProcedure.java @@ -313,7 +313,7 @@ public boolean equals(final Object o) { } final DeleteLogicalViewProcedure that = (DeleteLogicalViewProcedure) o; return this.getProcId() == that.getProcId() - && this.getCurrentState().equals(that.getCurrentState()) + && Objects.equals(this.getCurrentState(), that.getCurrentState()) && this.getCycles() == that.getCycles() && isGeneratedByPipe == that.isGeneratedByPipe && patternTree.equals(that.patternTree); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java index 0b5e45b5ca1f5..1cd5efe639cd8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java @@ -392,7 +392,7 @@ public boolean equals(final Object o) { } final DeleteTimeSeriesProcedure that = (DeleteTimeSeriesProcedure) o; return this.getProcId() == that.getProcId() - && this.getCurrentState().equals(that.getCurrentState()) + && Objects.equals(this.getCurrentState(), that.getCurrentState()) && this.getCycles() == getCycles() && this.isGeneratedByPipe == that.isGeneratedByPipe && this.patternTree.equals(that.patternTree); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java index 5337b719d207f..222b66a944bff 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/runtime/SubscriptionHandleLeaderChangeProcedure.java @@ -265,7 +265,7 @@ public boolean equals(final Object o) { final SubscriptionHandleLeaderChangeProcedure that = (SubscriptionHandleLeaderChangeProcedure) o; return getProcId() == that.getProcId() - && getCurrentState().equals(that.getCurrentState()) + && Objects.equals(getCurrentState(), that.getCurrentState()) && getCycles() == that.getCycles() && runtimeVersion == that.runtimeVersion && regionGroupToOldAndNewLeaderPairMap.equals(that.regionGroupToOldAndNewLeaderPairMap); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java index 8406eee97b380..dd297bb4d2235 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/CreateTriggerProcedure.java @@ -312,7 +312,7 @@ public boolean equals(Object that) { if (that instanceof CreateTriggerProcedure) { CreateTriggerProcedure thatProc = (CreateTriggerProcedure) that; return thatProc.getProcId() == this.getProcId() - && thatProc.getCurrentState().equals(this.getCurrentState()) + && Objects.equals(thatProc.getCurrentState(), this.getCurrentState()) && thatProc.getCycles() == this.getCycles() && thatProc.isGeneratedByPipe == this.isGeneratedByPipe && thatProc.triggerInformation.equals(this.triggerInformation); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java index 126197cbfe653..a3f2c51baa2e1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/trigger/DropTriggerProcedure.java @@ -180,7 +180,7 @@ public boolean equals(Object that) { if (that instanceof DropTriggerProcedure) { DropTriggerProcedure thatProc = (DropTriggerProcedure) that; return thatProc.getProcId() == this.getProcId() - && thatProc.getCurrentState().equals(this.getCurrentState()) + && Objects.equals(thatProc.getCurrentState(), this.getCurrentState()) && thatProc.getCycles() == this.getCycles() && thatProc.isGeneratedByPipe == this.isGeneratedByPipe && (thatProc.triggerName).equals(this.triggerName); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java index 832e339c0aede..895c173dbc85a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/scheduler/LockQueue.java @@ -46,6 +46,10 @@ public boolean releaseLock(Procedure procedure) { } public void waitProcedure(Procedure procedure) { + if (deque.stream() + .anyMatch(waitingProcedure -> waitingProcedure.getProcId() == procedure.getProcId())) { + return; + } deque.addLast(procedure); } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java new file mode 100644 index 0000000000000..74d5d821d76c3 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLockTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.pipe.coordinator.task; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class PipeTaskCoordinatorLockTest { + + @Test + public void testInterruptedThreadDoesNotAcquireWithoutPermit() throws Exception { + PipeTaskCoordinatorLock lock = new PipeTaskCoordinatorLock(); + lock.lock(); + + CountDownLatch waiting = new CountDownLatch(1); + AtomicBoolean acquired = new AtomicBoolean(false); + Thread thread = + new Thread( + () -> { + Thread.currentThread().interrupt(); + waiting.countDown(); + lock.lock(); + acquired.set(true); + lock.unlock(); + }); + thread.start(); + + Assert.assertTrue(waiting.await(3, TimeUnit.SECONDS)); + TimeUnit.MILLISECONDS.sleep(200); + Assert.assertFalse(acquired.get()); + + lock.unlock(); + thread.join(TimeUnit.SECONDS.toMillis(3)); + + Assert.assertFalse(thread.isAlive()); + Assert.assertTrue(acquired.get()); + Assert.assertFalse(lock.isLocked()); + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java index 500a51e9e3d07..85b0dfb8c9fb4 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestLockRegime.java @@ -19,7 +19,10 @@ package org.apache.iotdb.confignode.procedure; +import org.apache.iotdb.confignode.procedure.entity.NoopProcedure; import org.apache.iotdb.confignode.procedure.entity.SimpleLockProcedure; +import org.apache.iotdb.confignode.procedure.scheduler.LockQueue; +import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler; import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil; import org.junit.Assert; @@ -43,4 +46,20 @@ public void testAcquireLock() { this.procExecutor, procIdList.stream().mapToLong(Long::longValue).toArray()); Assert.assertEquals(env.lockAcquireSeq.toString(), env.executeSeq.toString()); } + + @Test + public void testLockQueueDoesNotWakeDuplicateProcedure() { + LockQueue lockQueue = new LockQueue(); + SimpleProcedureScheduler scheduler = new SimpleProcedureScheduler(); + scheduler.start(); + + NoopProcedure procedure = new NoopProcedure(); + procedure.setProcId(1); + lockQueue.waitProcedure(procedure); + lockQueue.waitProcedure(procedure); + + Assert.assertEquals(1, lockQueue.wakeWaitingProcedures(scheduler)); + Assert.assertEquals(1, scheduler.size()); + scheduler.stop(); + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java index dce7f2ba5dc48..0b4e716dc3b05 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java @@ -23,11 +23,13 @@ import org.apache.iotdb.confignode.procedure.entity.NoopProcedure; import org.apache.iotdb.confignode.procedure.entity.StuckProcedure; import org.apache.iotdb.confignode.procedure.env.TestProcEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -96,6 +98,23 @@ public void testWorkerThreadStuck() throws InterruptedException { ProcedureTestUtil.waitForProcedure(procExecutor, busyProcId2); } + @Test + public void testDuplicatedSchedulingDoesNotExecuteConcurrently() throws InterruptedException { + BlockingProcedure blockingProcedure = new BlockingProcedure(); + long procId = procExecutor.submitProcedure(blockingProcedure); + + Assert.assertTrue(blockingProcedure.awaitExecution(30, TimeUnit.SECONDS)); + + procExecutor.getScheduler().addFront(blockingProcedure); + boolean duplicated = blockingProcedure.awaitExecution(3, TimeUnit.SECONDS); + + blockingProcedure.releaseExecutions(duplicated ? 2 : 1); + ProcedureTestUtil.waitForProcedure(procExecutor, procId); + + Assert.assertFalse(duplicated); + Assert.assertEquals(1, blockingProcedure.getExecutionCount()); + } + private int waitThreadCount(final int expectedThreads) { long startTime = System.currentTimeMillis(); while (procExecutor.isRunning() @@ -107,4 +126,37 @@ private int waitThreadCount(final int expectedThreads) { } return procExecutor.getWorkerThreadCount(); } + + private static class BlockingProcedure extends Procedure { + + private final Semaphore entered = new Semaphore(0); + private final Semaphore finish = new Semaphore(0); + private final AtomicInteger executionCount = new AtomicInteger(); + + @Override + protected Procedure[] execute(TestProcEnv env) throws InterruptedException { + executionCount.incrementAndGet(); + entered.release(); + finish.acquire(); + return null; + } + + @Override + protected void rollback(TestProcEnv env) + throws IOException, InterruptedException, ProcedureException { + // No state to roll back. + } + + private boolean awaitExecution(long timeout, TimeUnit unit) throws InterruptedException { + return entered.tryAcquire(timeout, unit); + } + + private void releaseExecutions(int permits) { + finish.release(permits); + } + + private int getExecutionCount() { + return executionCount.get(); + } + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java index 75c0963a27fab..b2ec615fbcd5b 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedureTest.java @@ -106,4 +106,51 @@ public void deserializeOldFormatConfigRegionTest() { fail(); } } + + @Test + public void completedProcedureEqualsTest() { + Map> leaderMap = new HashMap<>(); + leaderMap.put(new TConsensusGroupId(TConsensusGroupType.DataRegion, 1), new Pair<>(1, 2)); + + try { + PipeHandleLeaderChangeProcedure proc = deserializeCompletedProcedure(leaderMap); + PipeHandleLeaderChangeProcedure proc2 = deserializeCompletedProcedure(leaderMap); + + assertEquals(proc, proc2); + assertEquals(proc.hashCode(), proc2.hashCode()); + } catch (Exception e) { + fail(); + } + } + + private PipeHandleLeaderChangeProcedure deserializeCompletedProcedure( + Map> leaderMap) throws Exception { + PublicBAOS byteArrayOutputStream = new PublicBAOS(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + + outputStream.writeShort(ProcedureType.PIPE_HANDLE_LEADER_CHANGE_PROCEDURE.getTypeCode()); + outputStream.writeLong(Procedure.NO_PROC_ID); + outputStream.writeInt(ProcedureState.SUCCESS.ordinal()); + outputStream.writeLong(0L); + outputStream.writeLong(0L); + outputStream.writeLong(Procedure.NO_PROC_ID); + outputStream.writeLong(Procedure.NO_TIMEOUT); + outputStream.writeInt(-1); + outputStream.write((byte) 0); + outputStream.writeInt(-1); + outputStream.write((byte) 0); + outputStream.writeInt(1); + outputStream.writeInt(Integer.MIN_VALUE); + outputStream.write((byte) 0); + outputStream.writeInt(leaderMap.size()); + for (Map.Entry> entry : leaderMap.entrySet()) { + outputStream.writeInt(entry.getKey().getId()); + outputStream.writeInt(entry.getValue().getLeft()); + outputStream.writeInt(entry.getValue().getRight()); + } + + ByteBuffer buffer = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + return (PipeHandleLeaderChangeProcedure) ProcedureFactory.getInstance().create(buffer); + } } From f6e2115b404459305e9cd427ee3b8b706bb48ca8 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 10 Jun 2026 19:07:31 +0800 Subject: [PATCH 2/2] Fix delayed procedure deduplication and semaphore release --- .../procedure/TimeoutExecutorThread.java | 32 ++++++++-- .../procedure/TestProcedureExecutor.java | 40 +++++++++++++ .../table/DataNodeTableCache.java | 10 ++-- .../table/DataNodeTableCacheTest.java | 58 +++++++++++++++++++ 4 files changed, 132 insertions(+), 8 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java index 3e62cdc1d4f30..cfd7bf023b06d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java @@ -43,11 +43,13 @@ public TimeoutExecutorThread( } public void add(Procedure procedure) { - queue.add(new ProcedureDelayContainer<>(procedure)); + ProcedureDelayContainer delayTask = new ProcedureDelayContainer<>(procedure); + queue.remove(delayTask); + queue.add(delayTask); } public boolean remove(Procedure procedure) { - return queue.remove(new ProcedureDelayContainer<>(procedure)); + return queue.remove(new ProcedureDelayContainer<>(procedure)) || procedure.isFinished(); } private ProcedureDelayContainer takeQuietly() { @@ -68,10 +70,15 @@ public void run() { } Procedure procedure = delayTask.getProcedure(); if (procedure instanceof InternalProcedure) { + if (procedure.isFinished()) { + continue; + } InternalProcedure internal = (InternalProcedure) procedure; internal.periodicExecute(executor.getEnvironment()); - procedure.updateTimestamp(); - queue.add(delayTask); + if (!procedure.isFinished()) { + procedure.updateTimestamp(); + queue.add(delayTask); + } } else { if (procedure.setTimeoutFailure(executor.getEnvironment())) { long rootProcId = executor.getRootProcedureId(procedure); @@ -104,6 +111,23 @@ public Procedure getProcedure() { return procedure; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ProcedureDelayContainer)) { + return false; + } + ProcedureDelayContainer that = (ProcedureDelayContainer) o; + return procedure == that.procedure; + } + + @Override + public int hashCode() { + return System.identityHashCode(procedure); + } + @Override public long getDelay(TimeUnit unit) { long delay = procedure.getTimeoutTimestamp() - System.currentTimeMillis(); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java index 0b4e716dc3b05..ba5f635507a4a 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureExecutor.java @@ -24,6 +24,7 @@ import org.apache.iotdb.confignode.procedure.entity.StuckProcedure; import org.apache.iotdb.confignode.procedure.env.TestProcEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.state.ProcedureState; import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil; import org.junit.Assert; @@ -115,6 +116,20 @@ public void testDuplicatedSchedulingDoesNotExecuteConcurrently() throws Interrup Assert.assertEquals(1, blockingProcedure.getExecutionCount()); } + @Test + public void testInternalProcedureCanBeDeduplicatedAndRemoved() throws InterruptedException { + CompletingInternalProcedure internalProcedure = new CompletingInternalProcedure(); + + procExecutor.addInternalProcedure(internalProcedure); + procExecutor.addInternalProcedure(internalProcedure); + + Assert.assertTrue(internalProcedure.awaitExecution(30, TimeUnit.SECONDS)); + Assert.assertFalse(internalProcedure.awaitExecution(300, TimeUnit.MILLISECONDS)); + Assert.assertEquals(1, internalProcedure.getExecutionCount()); + + Assert.assertTrue(procExecutor.removeInternalProcedure(internalProcedure)); + } + private int waitThreadCount(final int expectedThreads) { long startTime = System.currentTimeMillis(); while (procExecutor.isRunning() @@ -159,4 +174,29 @@ private int getExecutionCount() { return executionCount.get(); } } + + private static class CompletingInternalProcedure extends InternalProcedure { + + private final Semaphore entered = new Semaphore(0); + private final AtomicInteger executionCount = new AtomicInteger(); + + private CompletingInternalProcedure() { + super(0); + } + + @Override + protected void periodicExecute(TestProcEnv env) { + executionCount.incrementAndGet(); + entered.release(); + setState(ProcedureState.SUCCESS); + } + + private boolean awaitExecution(long timeout, TimeUnit unit) throws InterruptedException { + return entered.tryAcquire(timeout, unit); + } + + private int getExecutionCount() { + return executionCount.get(); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java index f545a9bda237d..d8d8f4fe19b95 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java @@ -374,8 +374,10 @@ private Map> mayGetTableInPreUpdateMap( private Map> getTablesInConfigNode( final Map> tableInput) { Map> result = Collections.emptyMap(); + boolean acquired = false; try { fetchTableSemaphore.acquire(); + acquired = true; final TFetchTableResp resp = ClusterConfigTaskExecutor.getInstance() .fetchTables( @@ -388,11 +390,11 @@ private Map> getTablesInConfigNode( } catch (final InterruptedException e) { Thread.currentThread().interrupt(); LOGGER.warn(DataNodeSchemaMessages.INTERRUPTED_ACQUIRE_SEMAPHORE_GET_TABLES); - } catch (final Exception e) { - fetchTableSemaphore.release(); - throw e; + } finally { + if (acquired) { + fetchTableSemaphore.release(); + } } - fetchTableSemaphore.release(); return result; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java new file mode 100644 index 0000000000000..2d6114363a509 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.schemaengine.table; + +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.concurrent.Semaphore; + +public class DataNodeTableCacheTest { + + private static final String DATABASE = "interrupted_fetch_database"; + + @Test + public void interruptedFetchDoesNotLeakSemaphorePermit() throws Exception { + final DataNodeTableCache cache = DataNodeTableCache.getInstance(); + cache.invalid(DATABASE); + try { + final Semaphore fetchTableSemaphore = getFetchTableSemaphore(cache); + final int permitsBeforeFetch = fetchTableSemaphore.availablePermits(); + + Thread.currentThread().interrupt(); + try { + Assert.assertFalse(cache.isDatabaseExist(DATABASE)); + } finally { + Thread.interrupted(); + } + + Assert.assertEquals(permitsBeforeFetch, fetchTableSemaphore.availablePermits()); + } finally { + cache.invalid(DATABASE); + } + } + + private Semaphore getFetchTableSemaphore(final DataNodeTableCache cache) throws Exception { + final Field field = DataNodeTableCache.class.getDeclaredField("fetchTableSemaphore"); + field.setAccessible(true); + return (Semaphore) field.get(cache); + } +}