Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,10 @@ public void lock() {
LOGGER.debug(
ManagerMessages.PIPETASKCOORDINATOR_LOCK_WAITING_FOR_THREAD,
Thread.currentThread().getName());
try {
semaphore.acquire();
LOGGER.debug(
ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD,
Thread.currentThread().getName());
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error(
ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD,
Thread.currentThread().getName());
}
semaphore.acquireUninterruptibly();
LOGGER.debug(
ManagerMessages.PIPETASKCOORDINATOR_LOCK_ACQUIRED_BY_THREAD,
Thread.currentThread().getName());
}

public boolean tryLock() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand All @@ -62,6 +63,7 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
private volatile long lastUpdate;

private final AtomicReference<byte[]> result = new AtomicReference<>();
private final AtomicBoolean executing = new AtomicBoolean(false);
private volatile boolean locked = false;
private boolean lockedWhenLoading = false;

Expand Down Expand Up @@ -256,6 +258,14 @@ protected boolean isYieldAfterExecution(Env env) {
}

// -------------------------Internal methods - called by the procedureExecutor------------------
final boolean tryAcquireExecution() {
return executing.compareAndSet(false, true);
}

final void releaseExecution() {
executing.set(false);
}

/**
* Internal method called by the ProcedureExecutor that starts the user-level code execute().
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.i18n.ProcedureMessages;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
Expand All @@ -42,7 +41,6 @@
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -328,45 +326,44 @@
LOG.warn(ProcedureMessages.ROLLBACK_STACK_IS_NULL_FOR, proc.getProcId());
return;
}
ProcedureLockState lockState = null;
try {
do {
if (!rootProcStack.acquire()) {
if (rootProcStack.setRollback()) {
lockState = executeRootStackRollback(rootProcId, rootProcStack);
switch (lockState) {
ProcedureLockState lockState;
do {
if (!rootProcStack.acquire()) {
if (rootProcStack.setRollback()) {
lockState = executeRootStackRollback(rootProcId, rootProcStack);
switch (lockState) {
case LOCK_ACQUIRED:
break;
case LOCK_EVENT_WAIT:
LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_ROLLBACK, proc);
rootProcStack.unsetRollback();
break;
case LOCK_YIELD_WAIT:
rootProcStack.unsetRollback();
scheduler.yield(proc);
break;
default:
throw new UnsupportedOperationException();
}
} else {
if (!proc.wasExecuted()) {
switch (executeRollback(proc)) {
case LOCK_ACQUIRED:
break;
case LOCK_EVENT_WAIT:
LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_ROLLBACK, proc);
rootProcStack.unsetRollback();
LOG.info(ProcedureMessages.LOCK_EVENT_WAIT_CAN_T_ROLLBACK_CHILD_RUNNING_FOR, proc);
break;
case LOCK_YIELD_WAIT:
rootProcStack.unsetRollback();
scheduler.yield(proc);
break;
default:
throw new UnsupportedOperationException();
}
} else {
if (!proc.wasExecuted()) {
switch (executeRollback(proc)) {
case LOCK_ACQUIRED:
break;
case LOCK_EVENT_WAIT:
LOG.info(
ProcedureMessages.LOCK_EVENT_WAIT_CAN_T_ROLLBACK_CHILD_RUNNING_FOR, proc);
break;
case LOCK_YIELD_WAIT:
scheduler.yield(proc);
break;
default:
throw new UnsupportedOperationException();
}
}
}
break;
}
break;
}
try {
lockState = acquireLock(proc);
switch (lockState) {
case LOCK_ACQUIRED:
Expand All @@ -379,29 +376,23 @@
default:
throw new UnsupportedOperationException();
}
} finally {
rootProcStack.release();
}

if (proc.isSuccess()) {
// update metrics on finishing the procedure
proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
LOG.debug(ProcedureMessages.FINISHED_IN_MS_SUCCESSFULLY, proc, proc.elapsedTime());
if (proc.getProcId() == rootProcId) {
rootProcedureCleanup(proc);
} else {
executeCompletionCleanup(proc);
}
return;
if (proc.isSuccess()) {
// update metrics on finishing the procedure
proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true);
LOG.debug(ProcedureMessages.FINISHED_IN_MS_SUCCESSFULLY, proc, proc.elapsedTime());
if (proc.getProcId() == rootProcId) {
rootProcedureCleanup(proc);
} else {
executeCompletionCleanup(proc);
}

} while (rootProcStack.isFailed());
} finally {
// Only after procedure has completed execution can it be allowed to be rescheduled to prevent
// data races
if (Objects.equals(lockState, ProcedureLockState.LOCK_EVENT_WAIT)) {
LOG.info(ProcedureMessages.PROCEDUREID_WAIT_FOR_LOCK, proc.getProcId());
((ConfigNodeProcedureEnv) this.environment).getNodeLock().waitProcedure(proc);
return;
}
}

} while (rootProcStack.isFailed());
}

/**
Expand All @@ -414,6 +405,7 @@
if (proc.getState() != ProcedureState.RUNNABLE) {
LOG.error(
"The executing procedure should in RUNNABLE state, but it's not. Procedure is {}", proc);
releaseLock(proc, false);
return;
}
boolean reExecute;
Expand Down Expand Up @@ -781,12 +773,19 @@
public void run() {
long lastUpdated = System.currentTimeMillis();
try {
while (isRunning() && keepAlive(lastUpdated)) {

Check warning on line 776 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Reduce the total number of break and continue statements in this loop to use at most one.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6xDlUhfFqTS2wWbRPc&open=AZ6xDlUhfFqTS2wWbRPc&pullRequest=17902
Procedure<Env> procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (procedure == null) {
Thread.sleep(1000);
continue;
}
boolean executionAcquired = false;
while (isRunning() && !(executionAcquired = procedure.tryAcquireExecution())) {
Thread.sleep(10);
}
if (!executionAcquired) {
continue;
}
this.activeProcedure.set(procedure);
activeExecutorCount.incrementAndGet();
startTime.set(System.currentTimeMillis());
Expand All @@ -795,13 +794,14 @@
executeProcedure(procedure);
} finally {
PROCEDURE_EXECUTION_CONTEXT.remove();
procedure.releaseExecution();
activeExecutorCount.decrementAndGet();
LOG.trace(
"Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get());
lastUpdated = System.currentTimeMillis();
startTime.set(lastUpdated);
}
activeExecutorCount.decrementAndGet();
LOG.trace(
"Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get());
this.activeProcedure.set(null);
lastUpdated = System.currentTimeMillis();
startTime.set(lastUpdated);
}

} catch (Exception e) {
Expand All @@ -812,6 +812,7 @@
this.activeProcedure.get(),
e);
}
this.activeProcedure.set(null);
} finally {
LOG.info(ProcedureMessages.PROCEDURE_WORKER_TERMINATED, getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@
}

public void add(Procedure<Env> procedure) {
queue.add(new ProcedureDelayContainer<>(procedure));
ProcedureDelayContainer<Env> delayTask = new ProcedureDelayContainer<>(procedure);
queue.remove(delayTask);

Check warning on line 47 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Do something with the "boolean" value returned by "remove".

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6xQoIAsClnBOWNLWo4&open=AZ6xQoIAsClnBOWNLWo4&pullRequest=17902
queue.add(delayTask);
}

public boolean remove(Procedure<Env> procedure) {
return queue.remove(new ProcedureDelayContainer<>(procedure));
return queue.remove(new ProcedureDelayContainer<>(procedure)) || procedure.isFinished();
}

private ProcedureDelayContainer<Env> takeQuietly() {
Expand All @@ -60,18 +62,23 @@
}

@Override
public void run() {

Check failure on line 65 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 19 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6xQoIAsClnBOWNLWo5&open=AZ6xQoIAsClnBOWNLWo5&pullRequest=17902
while (executor.isRunning()) {

Check warning on line 66 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Reduce the total number of break and continue statements in this loop to use at most one.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6xQoIAsClnBOWNLWo3&open=AZ6xQoIAsClnBOWNLWo3&pullRequest=17902
ProcedureDelayContainer<Env> delayTask = takeQuietly();
if (delayTask == null) {
continue;
}
Procedure<Env> procedure = delayTask.getProcedure();
if (procedure instanceof InternalProcedure) {
if (procedure.isFinished()) {
continue;
}
InternalProcedure internal = (InternalProcedure) procedure;
internal.periodicExecute(executor.getEnvironment());
procedure.updateTimestamp();
queue.add(delayTask);
if (!procedure.isFinished()) {
procedure.updateTimestamp();
queue.add(delayTask);
}
} else {
if (procedure.setTimeoutFailure(executor.getEnvironment())) {
long rootProcId = executor.getRootProcedureId(procedure);
Expand Down Expand Up @@ -104,6 +111,23 @@
return procedure;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ProcedureDelayContainer)) {
return false;
}
ProcedureDelayContainer<?> that = (ProcedureDelayContainer<?>) o;
return procedure == that.procedure;
}

@Override
public int hashCode() {
return System.identityHashCode(procedure);
}

@Override
public long getDelay(TimeUnit unit) {
long delay = procedure.getTimeoutTimestamp() - System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ private void addNextStateAndCalculateCycles() {
nextState);
}
}
if (getStateId(getCurrentState()) == stateToBeAdded) {
final TState currentState = getCurrentState();
if (currentState != null && getStateId(currentState) == stateToBeAdded) {
cycles++;
} else {
cycles = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public boolean equals(Object o) {
}
CreateCQProcedure that = (CreateCQProcedure) o;
return getProcId() == that.getProcId()
&& getCurrentState().equals(that.getCurrentState())
&& Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& isGeneratedByPipe == that.isGeneratedByPipe
&& firstExecutionTime == that.firstExecutionTime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ protected ProcedureLockState acquireLock(ConfigNodeProcedureEnv configNodeProced
LOG.info(
"procedureId {} acquire lock failed, will wait for lock after finishing execution.",
getProcId());
configNodeProcedureEnv.getNodeLock().waitProcedure(this);
return ProcedureLockState.LOCK_EVENT_WAIT;
} finally {
configNodeProcedureEnv.getSchedulerLock().unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public boolean equals(Object that) {
if (that instanceof CreatePipePluginProcedure) {
CreatePipePluginProcedure thatProcedure = (CreatePipePluginProcedure) that;
return thatProcedure.getProcId() == getProcId()
&& thatProcedure.getCurrentState().equals(getCurrentState())
&& Objects.equals(thatProcedure.getCurrentState(), getCurrentState())
&& thatProcedure.getCycles() == getCycles()
&& thatProcedure.pipePluginMeta.equals(pipePluginMeta);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public boolean equals(Object that) {
if (that instanceof DropPipePluginProcedure) {
final DropPipePluginProcedure thatProcedure = (DropPipePluginProcedure) that;
return thatProcedure.getProcId() == getProcId()
&& thatProcedure.getCurrentState().equals(this.getCurrentState())
&& Objects.equals(thatProcedure.getCurrentState(), this.getCurrentState())
&& thatProcedure.getCycles() == this.getCycles()
&& (thatProcedure.pluginName).equals(pluginName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public boolean equals(Object o) {
}
PipeHandleLeaderChangeProcedure that = (PipeHandleLeaderChangeProcedure) o;
return getProcId() == that.getProcId()
&& getCurrentState().equals(that.getCurrentState())
&& Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& this.regionGroupToOldAndNewLeaderPairMap.equals(
that.regionGroupToOldAndNewLeaderPairMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public boolean equals(Object o) {
}
PipeHandleMetaChangeProcedure that = (PipeHandleMetaChangeProcedure) o;
return getProcId() == that.getProcId()
&& getCurrentState().equals(that.getCurrentState())
&& Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& needWriteConsensusOnConfigNodes == that.needWriteConsensusOnConfigNodes
&& needPushPipeMetaToDataNodes == that.needPushPipeMetaToDataNodes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public boolean equals(Object o) {
}
DropPipeProcedureV2 that = (DropPipeProcedureV2) o;
return getProcId() == that.getProcId()
&& getCurrentState().equals(that.getCurrentState())
&& Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& pipeName.equals(that.pipeName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public boolean equals(Object o) {
}
StartPipeProcedureV2 that = (StartPipeProcedureV2) o;
return getProcId() == that.getProcId()
&& getCurrentState().equals(that.getCurrentState())
&& Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& pipeName.equals(that.pipeName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public boolean equals(Object o) {
}
StopPipeProcedureV2 that = (StopPipeProcedureV2) o;
return getProcId() == that.getProcId()
&& getCurrentState().equals(that.getCurrentState())
&& Objects.equals(getCurrentState(), that.getCurrentState())
&& getCycles() == that.getCycles()
&& isStoppedByRuntimeExceptionBeforeStop == that.isStoppedByRuntimeExceptionBeforeStop
&& pipeName.equals(that.pipeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public boolean equals(final Object o) {
}
final AlterEncodingCompressorProcedure that = (AlterEncodingCompressorProcedure) o;
return this.getProcId() == that.getProcId()
&& this.getCurrentState().equals(that.getCurrentState())
&& Objects.equals(this.getCurrentState(), that.getCurrentState())
&& this.getCycles() == getCycles()
&& Objects.equals(this.queryId, that.queryId)
&& this.isGeneratedByPipe == that.isGeneratedByPipe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ public boolean equals(final Object o) {
}
final AlterTimeSeriesDataTypeProcedure that = (AlterTimeSeriesDataTypeProcedure) o;
return this.getProcId() == that.getProcId()
&& this.getCurrentState().equals(that.getCurrentState())
&& Objects.equals(this.getCurrentState(), that.getCurrentState())
&& this.getCycles() == getCycles()
&& this.isGeneratedByPipe == that.isGeneratedByPipe
&& this.measurementPath.equals(that.measurementPath)
Expand Down
Loading
Loading