From 64c4f42c8ab5d112c6e599955a1f1d7779bdfdea Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Tue, 9 Jun 2026 17:09:13 +0800 Subject: [PATCH 1/2] Optimize pipe logging with shared PipePeriodicalLogReducer Move PipePeriodicalLogReducer to node-commons and route ConfigNode and DataNode pipe logs through PipeLogger to rate-limit repetitive messages. --- .../confignode/i18n/ManagerMessages.java | 2 + .../confignode/i18n/ManagerMessages.java | 2 + .../runtime/PipeConfigNodeRuntimeAgent.java | 17 ++-- .../agent/task/PipeConfigNodeSubtask.java | 20 ++-- .../coordinator/runtime/PipeMetaSyncer.java | 26 ++++-- .../heartbeat/PipeHeartbeatParser.java | 38 +++++--- .../heartbeat/PipeHeartbeatScheduler.java | 7 +- .../coordinator/task/PipeTaskCoordinator.java | 8 +- .../task/PipeTaskCoordinatorLock.java | 10 +- .../persistence/pipe/PipeTaskInfo.java | 9 +- .../PipeHandleLeaderChangeProcedure.java | 36 +++++-- .../PipeHandleMetaChangeProcedure.java | 33 +++++-- .../pipe/runtime/PipeMetaSyncProcedure.java | 6 +- .../apache/iotdb/db/conf/IoTDBDescriptor.java | 2 +- .../runtime/PipeDataNodeRuntimeAgent.java | 14 ++- .../thrift/IoTDBDataNodeReceiver.java | 2 +- .../log/PipePeriodicalLogReducer.java | 93 ------------------- .../commons/pipe/resource/log/PipeLogger.java | 11 ++- .../log/PipePeriodicalLogReducer.java | 76 +++++++++++++++ 19 files changed, 248 insertions(+), 164 deletions(-) delete mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java create mode 100644 iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducer.java diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java index 5dbfe15157668..5ee5d570dcc17 100644 --- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java +++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ManagerMessages.java @@ -67,6 +67,8 @@ public final class ManagerMessages { "Detected completion of pipe {}, static meta: {}, remove it."; public static final String DETECT_PIPERUNTIMECRITICALEXCEPTION_FROM_AGENT_STOP_PIPE = "Detect PipeRuntimeCriticalException {} from agent, stop pipe {}."; + public static final String DETECT_PIPERUNTIMESINKCRITICALEXCEPTION_FROM_AGENT_STOP_PIPE = + "Detect PipeRuntimeSinkCriticalException {} from agent, stop pipe {}."; public static final String ENABLE_SEPARATION_OF_POWERS_IS_NOT_SUPPORTED = "Enable separation of powers is not supported"; public static final String ENDEXECUTECQ_TIME_RANGE_IS_CURRENT_TIME_IS = diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java index e7321e5376c7c..f499f9ff15eec 100644 --- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java +++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ManagerMessages.java @@ -67,6 +67,8 @@ public final class ManagerMessages { "Detected completion of pipe {}, static meta: {}, remove it."; public static final String DETECT_PIPERUNTIMECRITICALEXCEPTION_FROM_AGENT_STOP_PIPE = "Detect PipeRuntimeCriticalException {} from agent, stop pipe {}."; + public static final String DETECT_PIPERUNTIMESINKCRITICALEXCEPTION_FROM_AGENT_STOP_PIPE = + "检测到 agent 上报 PipeRuntimeSinkCriticalException {},停止 pipe {}。"; public static final String ENABLE_SEPARATION_OF_POWERS_IS_NOT_SUPPORTED = "不支持启用权力分离"; public static final String ENDEXECUTECQ_TIME_RANGE_IS_CURRENT_TIME_IS = "[EndExecuteCQ] {}, time range is [{}, {}), current time is {}"; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java index ca98f4ec9e892..4726b50d899c3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java @@ -26,6 +26,8 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; +import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer; import org.apache.iotdb.commons.service.IService; import org.apache.iotdb.commons.service.ServiceType; import org.apache.iotdb.confignode.i18n.ManagerMessages; @@ -56,6 +58,7 @@ public class PipeConfigNodeRuntimeAgent implements IService { @Override public synchronized void start() { PipeConfig.getInstance().printAllConfigs(); + PipeLogger.setLogger(PipePeriodicalLogReducer::log); // PipeTasks will not be started here and will be started by "HandleLeaderChange" // procedure when the consensus layer notify leader ready @@ -142,19 +145,21 @@ public void report(final EnrichedEvent event, final PipeRuntimeException pipeRun if (event.getPipeTaskMeta() != null) { report(event.getPipeTaskMeta(), pipeRuntimeException); } else { - LOGGER.warn( - ManagerMessages.ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A_NULL_PIPETASKMETA, - pipeRuntimeException); + PipeLogger.log( + LOGGER::warn, + pipeRuntimeException, + ManagerMessages.ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A_NULL_PIPETASKMETA); } } private void report( final PipeTaskMeta pipeTaskMeta, final PipeRuntimeException pipeRuntimeException) { - LOGGER.warn( + PipeLogger.log( + LOGGER::warn, + pipeRuntimeException, ManagerMessages.REPORT_PIPERUNTIMEEXCEPTION_TO_LOCAL_PIPETASKMETA_EXCEPTION_MESSAGE, pipeTaskMeta, - pipeRuntimeException.getMessage(), - pipeRuntimeException); + pipeRuntimeException.getMessage()); pipeTaskMeta.trackExceptionMessage(pipeRuntimeException); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java index fd1a849063062..59b8113235f33 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java @@ -32,6 +32,7 @@ import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.confignode.i18n.ManagerMessages; import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent; import org.apache.iotdb.confignode.manager.pipe.metric.sink.PipeConfigRegionSinkMetrics; @@ -105,10 +106,11 @@ private void initSource(final Map sourceAttributes) throws Excep try { source.close(); } catch (Exception closeException) { - LOGGER.warn( + PipeLogger.log( + LOGGER::warn, + closeException, ManagerMessages.FAILED_TO_CLOSE_EXTRACTOR_AFTER_FAILED_TO_INITIALIZE_EXTRACTOR - + "Ignore this exception.", - closeException); + + "Ignore this exception."); } throw e; } @@ -154,9 +156,11 @@ private void initSink(final Map sinkAttributes) throws Exception try { outputPipeSink.close(); } catch (final Exception closeException) { - LOGGER.warn( + PipeLogger.log( + LOGGER::warn, + closeException, ManagerMessages.FAILED_TO_CLOSE_SINK_AFTER_FAILED_TO_INITIALIZE_IT_IGNORE, - closeException); + closeException.getMessage()); } throw e; } @@ -208,19 +212,19 @@ public void close() { try { source.close(); } catch (final Exception e) { - LOGGER.info(ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEEXTRACTOR, e); + PipeLogger.log(LOGGER::info, e, ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEEXTRACTOR); } try { processor.close(); } catch (final Exception e) { - LOGGER.info(ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEPROCESSOR, e); + PipeLogger.log(LOGGER::info, e, ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEPROCESSOR); } try { outputPipeSink.close(); } catch (final Exception e) { - LOGGER.info(ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPECONNECTOR, e); + PipeLogger.log(LOGGER::info, e, ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPECONNECTOR); } finally { // Should be after connector.close() super.close(); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java index 941d6d6d06cec..0d348e16417e6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.confignode.i18n.ManagerMessages; import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.ProcedureManager; @@ -95,7 +96,8 @@ private synchronized void sync() { isLastPipeSyncSuccessful = false; if (configManager.getPipeManager().getPipeTaskCoordinator().isLocked()) { - LOGGER.warn( + PipeLogger.log( + LOGGER::warn, ManagerMessages.PIPETASKCOORDINATORLOCK_IS_HELD_BY_ANOTHER_THREAD_SKIP_THIS_ROUND_OF_2); return; } @@ -109,7 +111,9 @@ private synchronized void sync() { == PipeConfig.getInstance().getPipeMetaSyncerAutoRestartPipeCheckIntervalRound()) { somePipesNeedRestarting = autoRestartWithLock(); if (somePipesNeedRestarting) { - LOGGER.info(ManagerMessages.SOME_PIPES_NEED_RESTARTING_WILL_RESTART_THEM_AFTER_THIS_SYNC); + PipeLogger.log( + LOGGER::info, + ManagerMessages.SOME_PIPES_NEED_RESTARTING_WILL_RESTART_THEM_AFTER_THIS_SYNC); } pipeAutoRestartRoundCounter.set(0); } @@ -130,19 +134,22 @@ private synchronized void sync() { if (handleMetaChangeStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { successfulSync = true; } else { - LOGGER.warn( + PipeLogger.log( + LOGGER::warn, ManagerMessages.FAILED_TO_HANDLE_PIPE_META_CHANGE_RESULT_STATUS, handleMetaChangeStatus); } } if (successfulSync) { - LOGGER.info( + PipeLogger.log( + LOGGER::info, ManagerMessages.AFTER_THIS_SUCCESSFUL_SYNC_IF_PIPETASKINFO_IS_EMPTY_DURING_THIS); isLastPipeSyncSuccessful = true; } } else { - LOGGER.warn(ManagerMessages.FAILED_TO_SYNC_PIPE_META_RESULT_STATUS, metaSyncStatus); + PipeLogger.log( + LOGGER::warn, ManagerMessages.FAILED_TO_SYNC_PIPE_META_RESULT_STATUS, metaSyncStatus); } } @@ -158,7 +165,8 @@ private boolean autoRestartWithLock() { final AtomicReference pipeTaskInfo = configManager.getPipeManager().getPipeTaskCoordinator().tryLock(); if (pipeTaskInfo == null) { - LOGGER.warn(ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_AUTO_RESTART_PIPE_TASK); + PipeLogger.log( + LOGGER::warn, ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_AUTO_RESTART_PIPE_TASK); return false; } try { @@ -176,7 +184,7 @@ private void checkAndRepairConsensusPipes() { .getRegionMaintainHandler() .checkAndRepairConsensusPipes(); } catch (Exception e) { - LOGGER.warn(ManagerMessages.FAILED_TO_CHECK_AND_REPAIR_CONSENSUS_PIPES, e); + PipeLogger.log(LOGGER::warn, e, ManagerMessages.FAILED_TO_CHECK_AND_REPAIR_CONSENSUS_PIPES); } } @@ -184,7 +192,9 @@ private boolean handleSuccessfulRestartWithLock() { final AtomicReference pipeTaskInfo = configManager.getPipeManager().getPipeTaskCoordinator().tryLock(); if (pipeTaskInfo == null) { - LOGGER.warn(ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_HANDLING_SUCCESSFUL_RESTART); + PipeLogger.log( + LOGGER::warn, + ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_HANDLING_SUCCESSFUL_RESTART); return false; } try { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java index 597120ffe1cf5..99e776a7e9c8e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java @@ -30,6 +30,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp; import org.apache.iotdb.confignode.i18n.ManagerMessages; import org.apache.iotdb.confignode.manager.ConfigManager; @@ -93,7 +94,8 @@ synchronized void parseHeartbeat(final int nodeId, final PipeHeartbeat pipeHeart final AtomicReference pipeTaskInfo = configManager.getPipeManager().getPipeTaskCoordinator().tryLock(); if (pipeTaskInfo == null) { - LOGGER.warn( + PipeLogger.log( + LOGGER::warn, ManagerMessages.FAILED_TO_ACQUIRE_LOCK_WHEN_PARSEHEARTBEAT_FROM_NODE_ID, nodeId); return; @@ -127,8 +129,10 @@ private int getExpectedHeartbeatNodeCount() { configManager.getNodeManager().getRegisteredDataNodeCount() + (PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled() ? 1 : 0); if (expectedNodeCount <= 0) { - LOGGER.warn( - ManagerMessages.EXPECTED_PIPE_HEARTBEAT_NODE_COUNT_IS_FALLBACK_TO_1, expectedNodeCount); + PipeLogger.log( + LOGGER::warn, + ManagerMessages.EXPECTED_PIPE_HEARTBEAT_NODE_COUNT_IS_FALLBACK_TO_1, + expectedNodeCount); return 1; } return expectedNodeCount; @@ -142,7 +146,8 @@ private void parseHeartbeatAndSaveMetaChangeLocally( final PipeStaticMeta staticMeta = pipeMetaFromCoordinator.getStaticMeta(); final PipeMeta pipeMetaFromAgent = pipeHeartbeat.getPipeMeta(staticMeta); if (pipeMetaFromAgent == null) { - LOGGER.info( + PipeLogger.log( + LOGGER::info, ManagerMessages.PIPERUNTIMECOORDINATOR_MEETS_ERROR_IN_UPDATING_PIPEMETAKEEPER + "pipeMetaFromAgent is null, pipeMetaFromCoordinator: {}", pipeMetaFromCoordinator); @@ -157,7 +162,8 @@ private void parseHeartbeatAndSaveMetaChangeLocally( if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) { temporaryMeta.markDataNodeCompleted(nodeId); - LOGGER.info( + PipeLogger.log( + LOGGER::info, "Detected historical pipe completion report from DataNode {} for pipe {}. remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}", nodeId, staticMeta.getPipeName(), @@ -169,14 +175,16 @@ private void parseHeartbeatAndSaveMetaChangeLocally( configManager.getNodeManager().getRegisteredDataNodeLocations().keySet(); uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds()); if (uncompletedDataNodeIds.isEmpty()) { - LOGGER.info( + PipeLogger.log( + LOGGER::info, "All DataNodes reported historical pipe {} completed. globalRemainingEventCount: {}, globalRemainingTime: {}, staticMeta: {}", staticMeta.getPipeName(), temporaryMeta.getGlobalRemainingEvents(), temporaryMeta.getGlobalRemainingTime(), staticMeta); pipeTaskInfo.get().removePipeMeta(staticMeta.getPipeName()); - LOGGER.info( + PipeLogger.log( + LOGGER::info, ManagerMessages.DETECTED_COMPLETION_OF_PIPE_STATIC_META_REMOVE_IT, staticMeta.getPipeName(), staticMeta); @@ -267,7 +275,9 @@ private void parseHeartbeatAndSaveMetaChangeLocally( needWriteConsensusOnConfigNodes.set(true); needPushPipeMetaToDataNodes.set(false); - LOGGER.warn( + PipeLogger.log( + LOGGER::warn, + exception, ManagerMessages.DETECT_PIPERUNTIMECRITICALEXCEPTION_FROM_AGENT_STOP_PIPE, exception, pipeName); @@ -296,11 +306,13 @@ private void parseHeartbeatAndSaveMetaChangeLocally( needWriteConsensusOnConfigNodes.set(true); needPushPipeMetaToDataNodes.set(false); - LOGGER.warn( - String.format( - "Detect PipeRuntimeConnectorCriticalException %s " - + "from agent, stop pipe %s.", - exception, pipeName)); + PipeLogger.log( + LOGGER::warn, + exception, + ManagerMessages + .DETECT_PIPERUNTIMESINKCRITICALEXCEPTION_FROM_AGENT_STOP_PIPE, + exception, + pipeName); }); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java index bbdaee8e12c3e..91d36f4e4416e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager; import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext; @@ -85,7 +86,8 @@ private synchronized void heartbeat() { } if (configManager.getPipeManager().getPipeTaskCoordinator().isLocked()) { - LOGGER.warn( + PipeLogger.log( + LOGGER::warn, ManagerMessages.PIPETASKCOORDINATORLOCK_IS_HELD_BY_ANOTHER_THREAD_SKIP_THIS_ROUND_OF); return; } @@ -130,7 +132,8 @@ private synchronized void heartbeat() { configNodeResp.getPipeRemainingEventCountList(), configNodeResp.getPipeRemainingTimeList())); } catch (final Exception e) { - LOGGER.warn(ManagerMessages.FAILED_TO_COLLECT_PIPE_META_LIST_FROM_CONFIG_NODE_TASK, e); + PipeLogger.log( + LOGGER::warn, e, ManagerMessages.FAILED_TO_COLLECT_PIPE_META_LIST_FROM_CONFIG_NODE_TASK); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java index 1d4d10006b625..490c3f0389237 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2; import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp; import org.apache.iotdb.confignode.i18n.ConfigNodeMessages; @@ -215,7 +216,10 @@ public TShowPipeResp showPipes(final TShowPipeReq req) { .filter(req.whereClause, req.pipeName, req.isTableModel, req.userName) .convertToTShowPipeResp(); } catch (final ConsensusException e) { - LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_READ_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e); + PipeLogger.log( + LOGGER::warn, + e, + ConfigNodeMessages.FAILED_IN_THE_READ_API_EXECUTING_THE_CONSENSUS_LAYER_DUE); final TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); res.setMessage(e.getMessage()); return new PipeTableResp(res, Collections.emptyList()).convertToTShowPipeResp(); @@ -227,7 +231,7 @@ public TGetAllPipeInfoResp getAllPipeInfo() { return ((PipeTableResp) configManager.getConsensusManager().read(new ShowPipePlanV2())) .convertToTGetAllPipeInfoResp(); } catch (IOException | ConsensusException e) { - LOGGER.warn(ManagerMessages.FAILED_TO_GET_ALL_PIPE_INFO, e); + PipeLogger.log(LOGGER::warn, e, ManagerMessages.FAILED_TO_GET_ALL_PIPE_INFO); return new TGetAllPipeInfoResp( new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()), Collections.emptyList()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java index 0f788435394f2..15991cfc90eeb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java @@ -19,6 +19,7 @@ package org.apache.iotdb.confignode.manager.pipe.coordinator.task; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.confignode.i18n.ManagerMessages; import org.slf4j.Logger; @@ -53,7 +54,8 @@ public void lock() { Thread.currentThread().getName()); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); - LOGGER.error( + PipeLogger.log( + LOGGER::error, ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD, Thread.currentThread().getName()); } @@ -70,14 +72,16 @@ public boolean tryLock() { Thread.currentThread().getName()); return true; } else { - LOGGER.info( + PipeLogger.log( + LOGGER::info, ManagerMessages.PIPETASKCOORDINATOR_LOCK_FAILED_TO_ACQUIRE_BY_THREAD_BECAUSE_OF_TIMEOUT, Thread.currentThread().getName()); return false; } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - LOGGER.error( + PipeLogger.log( + LOGGER::error, ManagerMessages.INTERRUPTED_WHILE_WAITING_FOR_PIPETASKCOORDINATOR_LOCK_CURRENT_THREAD, Thread.currentThread().getName()); return false; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index d411708035359..3c2e331b0a164 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -39,6 +39,7 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.snapshot.SnapshotProcessor; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan; @@ -636,7 +637,8 @@ private TSStatus handleLeaderChangeInternal(final PipeHandleLeaderChangePlan pla // external source pipe tasks are not balanced here since non-leaders // don't know about RegionLeader Map and will be balanced in the meta // sync procedure - LOGGER.info( + PipeLogger.log( + LOGGER::info, ConfigNodeMessages.PIPE_IS_USING_EXTERNAL_SOURCE_SKIP_REGION, pipeMeta.getStaticMeta().getPipeName(), plan.getConsensusGroupId2NewLeaderIdMap()); @@ -905,7 +907,10 @@ private boolean autoRestartInternal() { }); if (needRestart.get()) { - LOGGER.info(ConfigNodeMessages.PIPEMETASYNCER_IS_TRYING_TO_RESTART_THE_PIPES, pipeToRestart); + PipeLogger.log( + LOGGER::info, + ConfigNodeMessages.PIPEMETASYNCER_IS_TRYING_TO_RESTART_THE_PIPES, + pipeToRestart); } return needRestart.get(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java index 2c5b9566fce61..936539233755c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan; import org.apache.iotdb.confignode.i18n.ConfigNodeMessages; import org.apache.iotdb.confignode.i18n.ProcedureMessages; @@ -70,7 +71,8 @@ protected PipeTaskOperation getOperation() { @Override public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) { - LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMVALIDATETASK); + PipeLogger.log( + LOGGER::info, ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMVALIDATETASK); // Nothing needs to be checked return true; @@ -78,14 +80,18 @@ public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) { @Override public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) { - LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMCALCULATEINFOFORTASK); + PipeLogger.log( + LOGGER::info, + ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMCALCULATEINFOFORTASK); // Nothing needs to be calculated } @Override public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) { - LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMHANDLEONCONFIGNODES); + PipeLogger.log( + LOGGER::info, + ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMHANDLEONCONFIGNODES); final Map newConsensusGroupIdToLeaderConsensusIdMap = new HashMap<>(); @@ -100,7 +106,10 @@ public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) { try { response = env.getConfigManager().getConsensusManager().write(pipeHandleLeaderChangePlan); } catch (ConsensusException e) { - LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e); + PipeLogger.log( + LOGGER::warn, + e, + ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE); response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); response.setMessage(e.getMessage()); } @@ -111,35 +120,44 @@ public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) { @Override public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) { - LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMHANDLEONDATANODES); + PipeLogger.log( + LOGGER::info, + ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_EXECUTEFROMHANDLEONDATANODES); pushPipeMetaToDataNodesIgnoreException(env); } @Override public void rollbackFromValidateTask(ConfigNodeProcedureEnv env) { - LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMVALIDATETASK); + PipeLogger.log( + LOGGER::info, ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMVALIDATETASK); // Nothing to do } @Override public void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) { - LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMCALCULATEINFOFORTASK); + PipeLogger.log( + LOGGER::info, + ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMCALCULATEINFOFORTASK); // Nothing to do } @Override public void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) { - LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMHANDLEONCONFIGNODES); + PipeLogger.log( + LOGGER::info, + ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMHANDLEONCONFIGNODES); // Nothing to do } @Override public void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) { - LOGGER.info(ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMCREATEONDATANODES); + PipeLogger.log( + LOGGER::info, + ProcedureMessages.PIPEHANDLELEADERCHANGEPROCEDURE_ROLLBACKFROMCREATEONDATANODES); // Nothing to do } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java index a0dec36735265..394f7e2fa9f91 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan; import org.apache.iotdb.confignode.i18n.ConfigNodeMessages; import org.apache.iotdb.confignode.i18n.ProcedureMessages; @@ -80,7 +81,8 @@ protected PipeTaskOperation getOperation() { @Override public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) { - LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMVALIDATETASK); + PipeLogger.log( + LOGGER::info, ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMVALIDATETASK); // Do nothing return true; @@ -88,14 +90,17 @@ public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) { @Override public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) { - LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMCALCULATEINFOFORTASK); + PipeLogger.log( + LOGGER::info, + ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMCALCULATEINFOFORTASK); // Do nothing } @Override public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) { - LOGGER.info( + PipeLogger.log( + LOGGER::info, ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMWRITECONFIGNODECONSENSUS); if (!needWriteConsensusOnConfigNodes) { @@ -114,7 +119,10 @@ public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) { .getConsensusManager() .write(new PipeHandleMetaChangePlan(pipeMetaList)); } catch (ConsensusException e) { - LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e); + PipeLogger.log( + LOGGER::warn, + e, + ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE); response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); response.setMessage(e.getMessage()); } @@ -125,7 +133,8 @@ public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) { @Override public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) { - LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMHANDLEONDATANODES); + PipeLogger.log( + LOGGER::info, ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_EXECUTEFROMHANDLEONDATANODES); if (!needPushPipeMetaToDataNodes) { return; @@ -136,21 +145,25 @@ public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) { @Override public void rollbackFromValidateTask(ConfigNodeProcedureEnv env) { - LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMVALIDATETASK); + PipeLogger.log( + LOGGER::info, ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMVALIDATETASK); // Do nothing } @Override public void rollbackFromCalculateInfoForTask(ConfigNodeProcedureEnv env) { - LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMCALCULATEINFOFORTASK); + PipeLogger.log( + LOGGER::info, + ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMCALCULATEINFOFORTASK); // Do nothing } @Override public void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) { - LOGGER.info( + PipeLogger.log( + LOGGER::info, ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMWRITECONFIGNODECONSENSUS); // Do nothing @@ -158,7 +171,9 @@ public void rollbackFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) { @Override public void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) { - LOGGER.info(ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMOPERATEONDATANODES); + PipeLogger.log( + LOGGER::info, + ProcedureMessages.PIPEHANDLEMETACHANGEPROCEDURE_ROLLBACKFROMOPERATEONDATANODES); // Do nothing } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java index 33b909eae274e..072555146c1ca 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan; import org.apache.iotdb.confignode.i18n.ConfigNodeMessages; import org.apache.iotdb.confignode.i18n.ProcedureMessages; @@ -187,7 +188,10 @@ public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) { .getConsensusManager() .write(new PipeHandleMetaChangePlan(pipeMetaList)); } catch (ConsensusException e) { - LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e); + PipeLogger.log( + LOGGER::warn, + e, + ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE); response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); response.setMessage(e.getMessage()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index df4e42a87d692..860ced9e46ef6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.exception.BadNodeUrlException; import org.apache.iotdb.commons.memory.MemoryManager; import org.apache.iotdb.commons.pipe.config.PipeDescriptor; +import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer; import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.utils.NodeUrlUtils; @@ -37,7 +38,6 @@ import org.apache.iotdb.consensus.config.IoTConsensusV2Config; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.i18n.DataNodeMiscMessages; -import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.LastCacheLoadStrategy; import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter; import org.apache.iotdb.db.storageengine.StorageEngine; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java index 5e75199e38899..3cdf92d53925b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java @@ -35,6 +35,7 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; +import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer; import org.apache.iotdb.commons.service.IService; import org.apache.iotdb.commons.service.ServiceType; import org.apache.iotdb.commons.utils.TestOnly; @@ -42,7 +43,6 @@ import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner; -import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer; import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; @@ -214,16 +214,20 @@ public void report(EnrichedEvent event, PipeRuntimeException pipeRuntimeExceptio if (event.getPipeTaskMeta() != null) { report(event.getPipeTaskMeta(), pipeRuntimeException); } else { - LOGGER.warn(DataNodePipeMessages.ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A, pipeRuntimeException); + PipeLogger.log( + LOGGER::warn, + pipeRuntimeException, + DataNodePipeMessages.ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A); } } public void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException pipeRuntimeException) { - LOGGER.warn( + PipeLogger.log( + LOGGER::warn, + pipeRuntimeException, DataNodePipeMessages.REPORT_PIPERUNTIMEEXCEPTION_TO_LOCAL_PIPETASKMETA_EXCEPTION_MESSAGE, pipeTaskMeta, - pipeRuntimeException.getMessage(), - pipeRuntimeException); + pipeRuntimeException.getMessage()); // Quick stop all pipes locally if critical exception occurs, // no need to wait for the next heartbeat cycle. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index e6c8ddefc997e..f4cb3b281e993 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -36,6 +36,7 @@ import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver; import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; +import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer; import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest; import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqHandler; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; @@ -61,7 +62,6 @@ import org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementDataTypeConvertExecutionVisitor; import org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementToBatchVisitor; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; -import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java deleted file mode 100644 index 946450192c5f7..0000000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.pipe.resource.log; - -import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.db.i18n.DataNodePipeMessages; -import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; -import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; - -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import org.apache.tsfile.utils.RamUsageEstimator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; - -public class PipePeriodicalLogReducer { - private static final Logger LOGGER = LoggerFactory.getLogger(PipePeriodicalLogReducer.class); - private static final PipeMemoryBlock block; - protected static final Cache loggerCache; - - static { - // Never close because it's static - block = - PipeDataNodeResourceManager.memory() - .tryAllocate(PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes()); - loggerCache = - Caffeine.newBuilder() - .expireAfterWrite( - PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(), TimeUnit.SECONDS) - .weigher( - (k, v) -> - Math.toIntExact( - RamUsageEstimator.sizeOf((String) k) - + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY)) - .maximumWeight(block.getMemoryUsageInBytes()) - .build(); - } - - public static boolean log( - final Consumer loggerFunction, final String rawMessage, final Object... formatter) { - final String loggerMessage = String.format(rawMessage, formatter); - if (!loggerCache.asMap().containsKey(loggerMessage)) { - loggerCache.put(loggerMessage, loggerMessage); - loggerFunction.accept(loggerMessage); - return true; - } - return false; - } - - public static void update() { - loggerCache - .policy() - .expireAfterWrite() - .ifPresent( - time -> - time.setExpiresAfter( - PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(), - TimeUnit.SECONDS)); - PipeDataNodeResourceManager.memory() - .resize(block, PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes(), false); - LOGGER.info( - DataNodePipeMessages.PIPEPERIODICALLOGREDUCER_IS_ALLOCATED_TO_BYTES, - block.getMemoryUsageInBytes()); - loggerCache - .policy() - .eviction() - .ifPresent(eviction -> eviction.setMaximum(block.getMemoryUsageInBytes())); - } - - private PipePeriodicalLogReducer() { - // static - } -} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java index 70b494da03270..95e2d3e28ebe7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipeLogger.java @@ -19,6 +19,8 @@ package org.apache.iotdb.commons.pipe.resource.log; +import org.slf4j.helpers.MessageFormatter; + import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.util.function.Consumer; @@ -26,7 +28,7 @@ public class PipeLogger { private static PipePeriodicalLogger logger = (loggerFunction, rawMessage, formatter) -> - loggerFunction.accept(String.format(rawMessage, formatter)); + loggerFunction.accept(formatMessage(rawMessage, formatter)); public static void log( final Consumer loggerFunction, final String rawMessage, final Object... formatter) { @@ -51,6 +53,13 @@ private PipeLogger() { // static } + static String formatMessage(final String rawMessage, final Object... formatter) { + if (rawMessage.contains("{}")) { + return MessageFormatter.arrayFormat(rawMessage, formatter).getMessage(); + } + return String.format(rawMessage, formatter); + } + @FunctionalInterface public interface PipePeriodicalLogger { void log(final Consumer loggerFunction, final String rawMessage, final Object... args); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducer.java new file mode 100644 index 0000000000000..30074f90ebf9b --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducer.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.resource.log; + +import org.apache.iotdb.commons.pipe.config.PipeConfig; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +public class PipePeriodicalLogReducer { + + protected static final Cache LOGGER_CACHE = + Caffeine.newBuilder() + .expireAfterWrite( + PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(), TimeUnit.SECONDS) + .weigher(PipePeriodicalLogReducer::estimateSize) + .maximumWeight(PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes()) + .build(); + + private static int estimateSize(final String key, final String value) { + return Math.toIntExact( + RamUsageEstimator.sizeOf(key) + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY); + } + + public static boolean log( + final Consumer loggerFunction, final String rawMessage, final Object... formatter) { + final String loggerMessage = PipeLogger.formatMessage(rawMessage, formatter); + if (!LOGGER_CACHE.asMap().containsKey(loggerMessage)) { + LOGGER_CACHE.put(loggerMessage, loggerMessage); + loggerFunction.accept(loggerMessage); + return true; + } + return false; + } + + public static void update() { + update(PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes()); + } + + public static void update(final long maxWeight) { + LOGGER_CACHE + .policy() + .expireAfterWrite() + .ifPresent( + time -> + time.setExpiresAfter( + PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(), + TimeUnit.SECONDS)); + LOGGER_CACHE.policy().eviction().ifPresent(eviction -> eviction.setMaximum(maxWeight)); + } + + private PipePeriodicalLogReducer() { + // static + } +} From fa7bfef21d7a1af55e14ab254c17bfad9e6ea24b Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 10 Jun 2026 09:59:08 +0800 Subject: [PATCH 2/2] Hot-reload PipePeriodicalLogReducer settings on ConfigNode Apply pipe log reducer cache updates during ConfigNode configuration reload so pipe_periodical_log_min_interval_seconds and pipe_logger_cache_max_size_in_bytes take effect without restart. Co-authored-by: Cursor --- .../iotdb/confignode/conf/ConfigNodeDescriptor.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index b6bf74edb31ba..bca9e0708accb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -25,6 +25,8 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.conf.TrimProperties; import org.apache.iotdb.commons.exception.BadNodeUrlException; +import org.apache.iotdb.commons.pipe.config.PipeDescriptor; +import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer; import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.confignode.i18n.ConfigNodeMessages; @@ -811,12 +813,18 @@ public boolean isSeedConfigNode() { } } - public void loadHotModifiedProps(TrimProperties properties) { + public void loadHotModifiedProps(TrimProperties properties) throws IOException { ConfigurationFileUtils.updateAppliedProperties(properties, true); Optional.ofNullable(properties.getProperty(IoTDBConstant.CLUSTER_NAME)) .ifPresent(conf::setClusterName); Optional.ofNullable(properties.getProperty("enable_topology_probing")) .ifPresent(v -> conf.setEnableTopologyProbing(Boolean.parseBoolean(v))); + loadPipeHotModifiedProp(properties); + } + + private void loadPipeHotModifiedProp(TrimProperties properties) throws IOException { + PipeDescriptor.loadPipeProps(commonDescriptor.getConfig(), properties, true); + PipePeriodicalLogReducer.update(); } public static ConfigNodeDescriptor getInstance() {