Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.request.IConsensusRequest;
import org.apache.iotdb.commons.utils.LogThrottler;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
Expand Down Expand Up @@ -81,6 +82,8 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev
/** Variables for {@link ConfigNode} Simple Consensus. */
private LogWriter simpleLogWriter;

private final LogThrottler simpleConsensusWalFlushFailureLogThrottler = new LogThrottler();

private File simpleLogFile;
private int startIndex;
private int endIndex;
Expand Down Expand Up @@ -470,9 +473,14 @@ private void flushWALForSimpleConsensus() {
if (simpleLogWriter != null) {
try {
simpleLogWriter.force();
simpleConsensusWalFlushFailureLogThrottler.reset();
} catch (IOException e) {
LOGGER.error(
ConfigNodeMessages.CAN_T_FORCE_LOGWRITER_FOR_CONFIGNODE_FLUSHWALFORSIMPLECONSENSUS, e);
if (simpleConsensusWalFlushFailureLogThrottler.shouldLog(
LogThrottler.getFailureSignature(e))) {
LOGGER.error(
ConfigNodeMessages.CAN_T_FORCE_LOGWRITER_FOR_CONFIGNODE_FLUSHWALFORSIMPLECONSENSUS,
e);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.utils.LogThrottler;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.ProcedureManager;
Expand Down Expand Up @@ -52,6 +53,7 @@
PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes();

private final ConfigManager configManager;
private final LogThrottler syncFailureLogThrottler = new LogThrottler();

private Future<?> metaSyncFuture;

Expand Down Expand Up @@ -95,10 +97,13 @@
isLastPipeSyncSuccessful = false;

if (configManager.getPipeManager().getPipeTaskCoordinator().isLocked()) {
LOGGER.warn(
ManagerMessages.PIPETASKCOORDINATORLOCK_IS_HELD_BY_ANOTHER_THREAD_SKIP_THIS_ROUND_OF_2);
if (syncFailureLogThrottler.shouldLog("coordinator-lock-held", "locked")) {

Check failure on line 100 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "locked" 3 times.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6w7wlKchQzNLXleyJm&open=AZ6w7wlKchQzNLXleyJm&pullRequest=17889
LOGGER.warn(
ManagerMessages.PIPETASKCOORDINATORLOCK_IS_HELD_BY_ANOTHER_THREAD_SKIP_THIS_ROUND_OF_2);
}
return;
}
syncFailureLogThrottler.reset("coordinator-lock-held");

final ProcedureManager procedureManager = configManager.getProcedureManager();

Expand Down Expand Up @@ -129,27 +134,38 @@
procedureManager.pipeHandleMetaChangeWithBlock(true, false);
if (handleMetaChangeStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
successfulSync = true;
syncFailureLogThrottler.reset("handle-pipe-meta-change");
} else {
LOGGER.warn(
ManagerMessages.FAILED_TO_HANDLE_PIPE_META_CHANGE_RESULT_STATUS,
handleMetaChangeStatus);
if (syncFailureLogThrottler.shouldLog(
"handle-pipe-meta-change", getStatusFailureSignature(handleMetaChangeStatus))) {
LOGGER.warn(
ManagerMessages.FAILED_TO_HANDLE_PIPE_META_CHANGE_RESULT_STATUS,
handleMetaChangeStatus);
}
}
}

if (successfulSync) {
syncFailureLogThrottler.reset();
LOGGER.info(
ManagerMessages.AFTER_THIS_SUCCESSFUL_SYNC_IF_PIPETASKINFO_IS_EMPTY_DURING_THIS);
isLastPipeSyncSuccessful = true;
}
} else {
LOGGER.warn(ManagerMessages.FAILED_TO_SYNC_PIPE_META_RESULT_STATUS, metaSyncStatus);
if (syncFailureLogThrottler.shouldLog(
"pipe-meta-sync", getStatusFailureSignature(metaSyncStatus))) {
LOGGER.warn(ManagerMessages.FAILED_TO_SYNC_PIPE_META_RESULT_STATUS, metaSyncStatus);
}
return;
}
syncFailureLogThrottler.reset("pipe-meta-sync");
}

public synchronized void stop() {
if (metaSyncFuture != null) {
metaSyncFuture.cancel(false);
metaSyncFuture = null;
syncFailureLogThrottler.reset();
LOGGER.info(ManagerMessages.PIPEMETASYNCER_IS_STOPPED_SUCCESSFULLY);
}
}
Expand All @@ -158,9 +174,12 @@
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
LOGGER.warn(ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_AUTO_RESTART_PIPE_TASK);
if (syncFailureLogThrottler.shouldLog("auto-restart-lock", "locked")) {
LOGGER.warn(ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_AUTO_RESTART_PIPE_TASK);
}
return false;
}
syncFailureLogThrottler.reset("auto-restart-lock");
try {
return pipeTaskInfo.get().autoRestart();
} finally {
Expand All @@ -175,23 +194,34 @@
.getEnv()
.getRegionMaintainHandler()
.checkAndRepairConsensusPipes();
syncFailureLogThrottler.reset("check-and-repair-consensus-pipes");
} catch (Exception e) {
LOGGER.warn(ManagerMessages.FAILED_TO_CHECK_AND_REPAIR_CONSENSUS_PIPES, e);
if (syncFailureLogThrottler.shouldLog(
"check-and-repair-consensus-pipes", LogThrottler.getFailureSignature(e))) {
LOGGER.warn(ManagerMessages.FAILED_TO_CHECK_AND_REPAIR_CONSENSUS_PIPES, e);
}
}
}

private boolean handleSuccessfulRestartWithLock() {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
LOGGER.warn(ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_HANDLING_SUCCESSFUL_RESTART);
if (syncFailureLogThrottler.shouldLog("handle-successful-restart-lock", "locked")) {
LOGGER.warn(ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_HANDLING_SUCCESSFUL_RESTART);
}
return false;
}
syncFailureLogThrottler.reset("handle-successful-restart-lock");
try {
pipeTaskInfo.get().handleSuccessfulRestart();
return true;
} finally {
configManager.getPipeManager().getPipeTaskCoordinator().unlock();
}
}

private static String getStatusFailureSignature(TSStatus status) {
return status.getCode() + ":" + status.getMessage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.utils.LogThrottler;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class PipeHeartbeatScheduler {

private final ConfigManager configManager;
private final PipeHeartbeatParser pipeHeartbeatParser;
private final LogThrottler heartbeatFailureLogThrottler = new LogThrottler();

private Future<?> heartbeatFuture;

Expand All @@ -81,14 +83,18 @@ public synchronized void start() {

private synchronized void heartbeat() {
if (!configManager.getPipeManager().getPipeTaskCoordinator().hasAnyPipe()) {
heartbeatFailureLogThrottler.reset();
return;
}

if (configManager.getPipeManager().getPipeTaskCoordinator().isLocked()) {
LOGGER.warn(
ManagerMessages.PIPETASKCOORDINATORLOCK_IS_HELD_BY_ANOTHER_THREAD_SKIP_THIS_ROUND_OF);
if (heartbeatFailureLogThrottler.shouldLog("coordinator-lock-held", "locked")) {
LOGGER.warn(
ManagerMessages.PIPETASKCOORDINATORLOCK_IS_HELD_BY_ANOTHER_THREAD_SKIP_THIS_ROUND_OF);
}
return;
}
heartbeatFailureLogThrottler.reset("coordinator-lock-held");

// Data node heartbeat
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
Expand Down Expand Up @@ -129,15 +135,20 @@ private synchronized void heartbeat() {
null,
configNodeResp.getPipeRemainingEventCountList(),
configNodeResp.getPipeRemainingTimeList()));
heartbeatFailureLogThrottler.reset("config-node-pipe-meta-collect");
} catch (final Exception e) {
LOGGER.warn(ManagerMessages.FAILED_TO_COLLECT_PIPE_META_LIST_FROM_CONFIG_NODE_TASK, e);
if (heartbeatFailureLogThrottler.shouldLog(
"config-node-pipe-meta-collect", LogThrottler.getFailureSignature(e))) {
LOGGER.warn(ManagerMessages.FAILED_TO_COLLECT_PIPE_META_LIST_FROM_CONFIG_NODE_TASK, e);
}
}
}

public synchronized void stop() {
if (IS_SEPERATED_PIPE_HEARTBEAT_ENABLED && heartbeatFuture != null) {
heartbeatFuture.cancel(false);
heartbeatFuture = null;
heartbeatFailureLogThrottler.reset();
LOGGER.info(ManagerMessages.PIPEHEARTBEAT_IS_STOPPED_SUCCESSFULLY);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.commons.utils.LogThrottler;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.ProcedureManager;
Expand All @@ -49,6 +50,7 @@ public class SubscriptionMetaSyncer {
SubscriptionConfig.getInstance().getSubscriptionMetaSyncerSyncIntervalMinutes();

private final ConfigManager configManager;
private final LogThrottler syncFailureLogThrottler = new LogThrottler();

private Future<?> metaSyncFuture;

Expand Down Expand Up @@ -84,37 +86,53 @@ private synchronized void sync() {
isLastSubscriptionSyncSuccessful = false;

if (configManager.getSubscriptionManager().getSubscriptionCoordinator().isLocked()) {
LOGGER.warn(
ManagerMessages.SUBSCRIPTIONCOORDINATORLOCK_IS_HELD_BY_ANOTHER_THREAD_SKIP_THIS_ROUND_OF);
if (syncFailureLogThrottler.shouldLog("coordinator-lock-held", "locked")) {
LOGGER.warn(
ManagerMessages
.SUBSCRIPTIONCOORDINATORLOCK_IS_HELD_BY_ANOTHER_THREAD_SKIP_THIS_ROUND_OF);
}
return;
}
syncFailureLogThrottler.reset("coordinator-lock-held");

final ProcedureManager procedureManager = configManager.getProcedureManager();

// sync topic meta firstly
// TODO: consider drop the topic which is subscribed by consumers
final TSStatus topicMetaSyncStatus = procedureManager.topicMetaSync();
if (topicMetaSyncStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(ManagerMessages.FAILED_TO_SYNC_TOPIC_META_RESULT_STATUS, topicMetaSyncStatus);
if (syncFailureLogThrottler.shouldLog(
"topic-meta-sync", getStatusFailureSignature(topicMetaSyncStatus))) {
LOGGER.warn(ManagerMessages.FAILED_TO_SYNC_TOPIC_META_RESULT_STATUS, topicMetaSyncStatus);
}
return;
}
syncFailureLogThrottler.reset("topic-meta-sync");

// sync consumer meta if syncing topic meta successfully
final TSStatus consumerGroupMetaSyncStatus = procedureManager.consumerGroupMetaSync();
if (consumerGroupMetaSyncStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn(
ManagerMessages.FAILED_TO_SYNC_CONSUMER_GROUP_META_RESULT_STATUS,
consumerGroupMetaSyncStatus);
if (syncFailureLogThrottler.shouldLog(
"consumer-group-meta-sync", getStatusFailureSignature(consumerGroupMetaSyncStatus))) {
LOGGER.warn(
ManagerMessages.FAILED_TO_SYNC_CONSUMER_GROUP_META_RESULT_STATUS,
consumerGroupMetaSyncStatus);
}
return;
}
syncFailureLogThrottler.reset("consumer-group-meta-sync");

// sync commit progress if syncing consumer group meta successfully
final TSStatus commitProgressSyncStatus = procedureManager.commitProgressSync();
if (commitProgressSyncStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn("Failed to sync commit progress. Result status: {}.", commitProgressSyncStatus);
if (syncFailureLogThrottler.shouldLog(
"commit-progress-sync", getStatusFailureSignature(commitProgressSyncStatus))) {
LOGGER.warn("Failed to sync commit progress. Result status: {}.", commitProgressSyncStatus);
}
return;
}

syncFailureLogThrottler.reset();
LOGGER.info(
ManagerMessages.AFTER_THIS_SUCCESSFUL_SYNC_IF_SUBSCRIPTIONINFO_IS_EMPTY_DURING_THIS);
isLastSubscriptionSyncSuccessful = true;
Expand All @@ -124,7 +142,12 @@ public synchronized void stop() {
if (metaSyncFuture != null) {
metaSyncFuture.cancel(false);
metaSyncFuture = null;
syncFailureLogThrottler.reset();
LOGGER.info(ManagerMessages.SUBSCRIPTIONMETASYNCER_IS_STOPPED_SUCCESSFULLY);
}
}

private static String getStatusFailureSignature(TSStatus status) {
return status.getCode() + ":" + status.getMessage();
}
}
8 changes: 8 additions & 0 deletions iotdb-core/datanode/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,14 @@
<ignoredDependency>org.apache.iotdb:isession</ignoredDependency>
<ignoredDependency>at.yawk.lz4:lz4-java</ignoredDependency>
</ignoredDependencies>
<ignoredUsedUndeclaredDependencies>
<ignoredUsedUndeclaredDependency>ch.qos.logback:logback-classic</ignoredUsedUndeclaredDependency>
<ignoredUsedUndeclaredDependency>ch.qos.logback:logback-core</ignoredUsedUndeclaredDependency>
</ignoredUsedUndeclaredDependencies>
<ignoredNonTestScopedDependencies>
<ignoredNonTestScopedDependency>ch.qos.logback:logback-classic</ignoredNonTestScopedDependency>
<ignoredNonTestScopedDependency>ch.qos.logback:logback-core</ignoredNonTestScopedDependency>
</ignoredNonTestScopedDependencies>
</configuration>
</plugin>
<plugin>
Expand Down
Loading
Loading