Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public final class ManagerMessages {
"Detected completion of pipe {}, static meta: {}, remove it.";
public static final String DETECT_PIPERUNTIMECRITICALEXCEPTION_FROM_AGENT_STOP_PIPE =
"Detect PipeRuntimeCriticalException {} from agent, stop pipe {}.";
public static final String DETECT_PIPERUNTIMESINKCRITICALEXCEPTION_FROM_AGENT_STOP_PIPE =
"Detect PipeRuntimeSinkCriticalException {} from agent, stop pipe {}.";
public static final String ENABLE_SEPARATION_OF_POWERS_IS_NOT_SUPPORTED =
"Enable separation of powers is not supported";
public static final String ENDEXECUTECQ_TIME_RANGE_IS_CURRENT_TIME_IS =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public final class ManagerMessages {
"Detected completion of pipe {}, static meta: {}, remove it.";
public static final String DETECT_PIPERUNTIMECRITICALEXCEPTION_FROM_AGENT_STOP_PIPE =
"Detect PipeRuntimeCriticalException {} from agent, stop pipe {}.";
public static final String DETECT_PIPERUNTIMESINKCRITICALEXCEPTION_FROM_AGENT_STOP_PIPE =
"检测到 agent 上报 PipeRuntimeSinkCriticalException {},停止 pipe {}。";
public static final String ENABLE_SEPARATION_OF_POWERS_IS_NOT_SUPPORTED = "不支持启用权力分离";
public static final String ENDEXECUTECQ_TIME_RANGE_IS_CURRENT_TIME_IS =
"[EndExecuteCQ] {}, time range is [{}, {}), current time is {}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.conf.TrimProperties;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.pipe.config.PipeDescriptor;
import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
Expand Down Expand Up @@ -811,12 +813,18 @@ public boolean isSeedConfigNode() {
}
}

public void loadHotModifiedProps(TrimProperties properties) {
public void loadHotModifiedProps(TrimProperties properties) throws IOException {
ConfigurationFileUtils.updateAppliedProperties(properties, true);
Optional.ofNullable(properties.getProperty(IoTDBConstant.CLUSTER_NAME))
.ifPresent(conf::setClusterName);
Optional.ofNullable(properties.getProperty("enable_topology_probing"))
.ifPresent(v -> conf.setEnableTopologyProbing(Boolean.parseBoolean(v)));
loadPipeHotModifiedProp(properties);
}

private void loadPipeHotModifiedProp(TrimProperties properties) throws IOException {
PipeDescriptor.loadPipeProps(commonDescriptor.getConfig(), properties, true);
PipePeriodicalLogReducer.update();
}

public static ConfigNodeDescriptor getInstance() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
Expand Down Expand Up @@ -56,6 +58,7 @@ public class PipeConfigNodeRuntimeAgent implements IService {
@Override
public synchronized void start() {
PipeConfig.getInstance().printAllConfigs();
PipeLogger.setLogger(PipePeriodicalLogReducer::log);

// PipeTasks will not be started here and will be started by "HandleLeaderChange"
// procedure when the consensus layer notify leader ready
Expand Down Expand Up @@ -142,19 +145,21 @@ public void report(final EnrichedEvent event, final PipeRuntimeException pipeRun
if (event.getPipeTaskMeta() != null) {
report(event.getPipeTaskMeta(), pipeRuntimeException);
} else {
LOGGER.warn(
ManagerMessages.ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A_NULL_PIPETASKMETA,
pipeRuntimeException);
PipeLogger.log(
LOGGER::warn,
pipeRuntimeException,
ManagerMessages.ATTEMPT_TO_REPORT_PIPE_EXCEPTION_TO_A_NULL_PIPETASKMETA);
}
}

private void report(
final PipeTaskMeta pipeTaskMeta, final PipeRuntimeException pipeRuntimeException) {
LOGGER.warn(
PipeLogger.log(
LOGGER::warn,
pipeRuntimeException,
ManagerMessages.REPORT_PIPERUNTIMEEXCEPTION_TO_LOCAL_PIPETASKMETA_EXCEPTION_MESSAGE,
pipeTaskMeta,
pipeRuntimeException.getMessage(),
pipeRuntimeException);
pipeRuntimeException.getMessage());

pipeTaskMeta.trackExceptionMessage(pipeRuntimeException);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSourceRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import org.apache.iotdb.confignode.manager.pipe.metric.sink.PipeConfigRegionSinkMetrics;
Expand Down Expand Up @@ -105,10 +106,11 @@ private void initSource(final Map<String, String> sourceAttributes) throws Excep
try {
source.close();
} catch (Exception closeException) {
LOGGER.warn(
PipeLogger.log(
LOGGER::warn,
closeException,
ManagerMessages.FAILED_TO_CLOSE_EXTRACTOR_AFTER_FAILED_TO_INITIALIZE_EXTRACTOR
+ "Ignore this exception.",
closeException);
+ "Ignore this exception.");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this missed in i18n

}
throw e;
}
Expand Down Expand Up @@ -154,9 +156,11 @@ private void initSink(final Map<String, String> sinkAttributes) throws Exception
try {
outputPipeSink.close();
} catch (final Exception closeException) {
LOGGER.warn(
PipeLogger.log(
LOGGER::warn,
closeException,
ManagerMessages.FAILED_TO_CLOSE_SINK_AFTER_FAILED_TO_INITIALIZE_IT_IGNORE,
closeException);
closeException.getMessage());
}
throw e;
}
Expand Down Expand Up @@ -208,19 +212,19 @@ public void close() {
try {
source.close();
} catch (final Exception e) {
LOGGER.info(ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEEXTRACTOR, e);
PipeLogger.log(LOGGER::info, e, ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEEXTRACTOR);
}

try {
processor.close();
} catch (final Exception e) {
LOGGER.info(ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEPROCESSOR, e);
PipeLogger.log(LOGGER::info, e, ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPEPROCESSOR);
}

try {
outputPipeSink.close();
} catch (final Exception e) {
LOGGER.info(ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPECONNECTOR, e);
PipeLogger.log(LOGGER::info, e, ManagerMessages.ERROR_OCCURRED_DURING_CLOSING_PIPECONNECTOR);
} finally {
// Should be after connector.close()
super.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.ProcedureManager;
Expand Down Expand Up @@ -95,7 +96,8 @@ private synchronized void sync() {
isLastPipeSyncSuccessful = false;

if (configManager.getPipeManager().getPipeTaskCoordinator().isLocked()) {
LOGGER.warn(
PipeLogger.log(
LOGGER::warn,
ManagerMessages.PIPETASKCOORDINATORLOCK_IS_HELD_BY_ANOTHER_THREAD_SKIP_THIS_ROUND_OF_2);
return;
}
Expand All @@ -109,7 +111,9 @@ private synchronized void sync() {
== PipeConfig.getInstance().getPipeMetaSyncerAutoRestartPipeCheckIntervalRound()) {
somePipesNeedRestarting = autoRestartWithLock();
if (somePipesNeedRestarting) {
LOGGER.info(ManagerMessages.SOME_PIPES_NEED_RESTARTING_WILL_RESTART_THEM_AFTER_THIS_SYNC);
PipeLogger.log(
LOGGER::info,
ManagerMessages.SOME_PIPES_NEED_RESTARTING_WILL_RESTART_THEM_AFTER_THIS_SYNC);
}
pipeAutoRestartRoundCounter.set(0);
}
Expand All @@ -130,19 +134,22 @@ private synchronized void sync() {
if (handleMetaChangeStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
successfulSync = true;
} else {
LOGGER.warn(
PipeLogger.log(
LOGGER::warn,
ManagerMessages.FAILED_TO_HANDLE_PIPE_META_CHANGE_RESULT_STATUS,
handleMetaChangeStatus);
}
}

if (successfulSync) {
LOGGER.info(
PipeLogger.log(
LOGGER::info,
ManagerMessages.AFTER_THIS_SUCCESSFUL_SYNC_IF_PIPETASKINFO_IS_EMPTY_DURING_THIS);
isLastPipeSyncSuccessful = true;
}
} else {
LOGGER.warn(ManagerMessages.FAILED_TO_SYNC_PIPE_META_RESULT_STATUS, metaSyncStatus);
PipeLogger.log(
LOGGER::warn, ManagerMessages.FAILED_TO_SYNC_PIPE_META_RESULT_STATUS, metaSyncStatus);
}
}

Expand All @@ -158,7 +165,8 @@ private boolean autoRestartWithLock() {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
LOGGER.warn(ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_AUTO_RESTART_PIPE_TASK);
PipeLogger.log(
LOGGER::warn, ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_AUTO_RESTART_PIPE_TASK);
return false;
}
try {
Expand All @@ -176,15 +184,17 @@ private void checkAndRepairConsensusPipes() {
.getRegionMaintainHandler()
.checkAndRepairConsensusPipes();
} catch (Exception e) {
LOGGER.warn(ManagerMessages.FAILED_TO_CHECK_AND_REPAIR_CONSENSUS_PIPES, e);
PipeLogger.log(LOGGER::warn, e, ManagerMessages.FAILED_TO_CHECK_AND_REPAIR_CONSENSUS_PIPES);
}
}

private boolean handleSuccessfulRestartWithLock() {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
LOGGER.warn(ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_HANDLING_SUCCESSFUL_RESTART);
PipeLogger.log(
LOGGER::warn,
ManagerMessages.FAILED_TO_ACQUIRE_PIPE_LOCK_FOR_HANDLING_SUCCESSFUL_RESTART);
return false;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.i18n.ManagerMessages;
import org.apache.iotdb.confignode.manager.ConfigManager;
Expand Down Expand Up @@ -93,7 +94,8 @@ synchronized void parseHeartbeat(final int nodeId, final PipeHeartbeat pipeHeart
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
LOGGER.warn(
PipeLogger.log(
LOGGER::warn,
ManagerMessages.FAILED_TO_ACQUIRE_LOCK_WHEN_PARSEHEARTBEAT_FROM_NODE_ID,
nodeId);
return;
Expand Down Expand Up @@ -127,8 +129,10 @@ private int getExpectedHeartbeatNodeCount() {
configManager.getNodeManager().getRegisteredDataNodeCount()
+ (PipeConfig.getInstance().isSeperatedPipeHeartbeatEnabled() ? 1 : 0);
if (expectedNodeCount <= 0) {
LOGGER.warn(
ManagerMessages.EXPECTED_PIPE_HEARTBEAT_NODE_COUNT_IS_FALLBACK_TO_1, expectedNodeCount);
PipeLogger.log(
LOGGER::warn,
ManagerMessages.EXPECTED_PIPE_HEARTBEAT_NODE_COUNT_IS_FALLBACK_TO_1,
expectedNodeCount);
return 1;
}
return expectedNodeCount;
Expand All @@ -142,7 +146,8 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
final PipeStaticMeta staticMeta = pipeMetaFromCoordinator.getStaticMeta();
final PipeMeta pipeMetaFromAgent = pipeHeartbeat.getPipeMeta(staticMeta);
if (pipeMetaFromAgent == null) {
LOGGER.info(
PipeLogger.log(
LOGGER::info,
ManagerMessages.PIPERUNTIMECOORDINATOR_MEETS_ERROR_IN_UPDATING_PIPEMETAKEEPER
+ "pipeMetaFromAgent is null, pipeMetaFromCoordinator: {}",
pipeMetaFromCoordinator);
Expand All @@ -157,7 +162,8 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
if (Boolean.TRUE.equals(isPipeCompletedFromAgent)) {

temporaryMeta.markDataNodeCompleted(nodeId);
LOGGER.info(
PipeLogger.log(
LOGGER::info,
"Detected historical pipe completion report from DataNode {} for pipe {}. remainingEventCount: {}, remainingTime: {}, completedDataNodes: {}",
nodeId,
staticMeta.getPipeName(),
Expand All @@ -169,14 +175,16 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
uncompletedDataNodeIds.removeAll(temporaryMeta.getCompletedDataNodeIds());
if (uncompletedDataNodeIds.isEmpty()) {
LOGGER.info(
PipeLogger.log(
LOGGER::info,
"All DataNodes reported historical pipe {} completed. globalRemainingEventCount: {}, globalRemainingTime: {}, staticMeta: {}",
staticMeta.getPipeName(),
temporaryMeta.getGlobalRemainingEvents(),
temporaryMeta.getGlobalRemainingTime(),
staticMeta);
pipeTaskInfo.get().removePipeMeta(staticMeta.getPipeName());
LOGGER.info(
PipeLogger.log(
LOGGER::info,
ManagerMessages.DETECTED_COMPLETION_OF_PIPE_STATIC_META_REMOVE_IT,
staticMeta.getPipeName(),
staticMeta);
Expand Down Expand Up @@ -267,7 +275,9 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
needWriteConsensusOnConfigNodes.set(true);
needPushPipeMetaToDataNodes.set(false);

LOGGER.warn(
PipeLogger.log(
LOGGER::warn,
exception,
ManagerMessages.DETECT_PIPERUNTIMECRITICALEXCEPTION_FROM_AGENT_STOP_PIPE,
exception,
pipeName);
Expand Down Expand Up @@ -296,11 +306,13 @@ private void parseHeartbeatAndSaveMetaChangeLocally(
needWriteConsensusOnConfigNodes.set(true);
needPushPipeMetaToDataNodes.set(false);

LOGGER.warn(
String.format(
"Detect PipeRuntimeConnectorCriticalException %s "
+ "from agent, stop pipe %s.",
exception, pipeName));
PipeLogger.log(
LOGGER::warn,
exception,
ManagerMessages
.DETECT_PIPERUNTIMESINKCRITICALEXCEPTION_FROM_AGENT_STOP_PIPE,
exception,
pipeName);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
Expand Down Expand Up @@ -85,7 +86,8 @@ private synchronized void heartbeat() {
}

if (configManager.getPipeManager().getPipeTaskCoordinator().isLocked()) {
LOGGER.warn(
PipeLogger.log(
LOGGER::warn,
ManagerMessages.PIPETASKCOORDINATORLOCK_IS_HELD_BY_ANOTHER_THREAD_SKIP_THIS_ROUND_OF);
return;
}
Expand Down Expand Up @@ -130,7 +132,8 @@ private synchronized void heartbeat() {
configNodeResp.getPipeRemainingEventCountList(),
configNodeResp.getPipeRemainingTimeList()));
} catch (final Exception e) {
LOGGER.warn(ManagerMessages.FAILED_TO_COLLECT_PIPE_META_LIST_FROM_CONFIG_NODE_TASK, e);
PipeLogger.log(
LOGGER::warn, e, ManagerMessages.FAILED_TO_COLLECT_PIPE_META_LIST_FROM_CONFIG_NODE_TASK);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
Expand Down Expand Up @@ -215,7 +216,10 @@ public TShowPipeResp showPipes(final TShowPipeReq req) {
.filter(req.whereClause, req.pipeName, req.isTableModel, req.userName)
.convertToTShowPipeResp();
} catch (final ConsensusException e) {
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_READ_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e);
PipeLogger.log(
LOGGER::warn,
e,
ConfigNodeMessages.FAILED_IN_THE_READ_API_EXECUTING_THE_CONSENSUS_LAYER_DUE);
final TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
return new PipeTableResp(res, Collections.emptyList()).convertToTShowPipeResp();
Expand All @@ -227,7 +231,7 @@ public TGetAllPipeInfoResp getAllPipeInfo() {
return ((PipeTableResp) configManager.getConsensusManager().read(new ShowPipePlanV2()))
.convertToTGetAllPipeInfoResp();
} catch (IOException | ConsensusException e) {
LOGGER.warn(ManagerMessages.FAILED_TO_GET_ALL_PIPE_INFO, e);
PipeLogger.log(LOGGER::warn, e, ManagerMessages.FAILED_TO_GET_ALL_PIPE_INFO);
return new TGetAllPipeInfoResp(
new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage()),
Collections.emptyList());
Expand Down
Loading
Loading