From 9f949802147291c469b7f350166deba867834bfc Mon Sep 17 00:00:00 2001 From: alanchuang22-dev <2584829494@qq.com> Date: Mon, 8 Jun 2026 14:27:38 +0800 Subject: [PATCH 1/6] fix: optimize function `SystemPropertiesUtils.isRestarted()` --- .../conf/ConfigNodeStartupCheck.java | 14 ++- .../conf/SystemPropertiesUtils.java | 112 ++++++++++++++++-- 2 files changed, 117 insertions(+), 9 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java index 111f32bcdd374..179ae5bdfe9a7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java @@ -77,11 +77,21 @@ public void startUpCheck() throws StartupException, IOException, ConfigurationEx checkGlobalConfig(); createDirsIfNecessary(); checkRequestManager(); - if (SystemPropertiesUtils.isRestarted()) { - /* Always restore ConfigNodeId first */ + SystemPropertiesUtils.StartupState startupState = SystemPropertiesUtils.getStartupState(); + if (startupState == SystemPropertiesUtils.StartupState.RESTART) { + // Always restore ConfigNodeId first. CONF.setConfigNodeId(SystemPropertiesUtils.loadConfigNodeIdWhenRestarted()); SystemPropertiesUtils.checkSystemProperties(); + } else if (startupState != SystemPropertiesUtils.StartupState.FIRST_START) { + throw new StartupException( + "The local ConfigNode data is in " + + startupState + + " state. Please restore the missing local files or backup and clean " + + CONF.getSystemDir() + + " and " + + CONF.getConsensusDir() + + " before starting this ConfigNode."); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java index 2f2cea4535f57..3e5812268c101 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java @@ -61,6 +61,22 @@ public class SystemPropertiesUtils { private static final String SERIES_PARTITION_EXECUTOR_CLASS = "series_partition_executor_class"; private static final String TIME_PARTITION_ORIGIN = "time_partition_origin"; private static final String TIME_PARTITION_INTERVAL = "time_partition_interval"; + private static final String CONFIG_NODE_ID = "config_node_id"; + private static final String IS_SEED_CONFIG_NODE = "is_seed_config_node"; + + public enum StartupState { + FIRST_START, + RESTART, + PARTIAL_START_CONSENSUS_ONLY, + PARTIAL_START_SYSTEM_ONLY, + CORRUPTED_OR_INCONSISTENT + } + + private enum LocalStorageState { + EMPTY, + PRESENT, + UNREADABLE + } private SystemPropertiesUtils() { throw new IllegalStateException(ConfigNodeMessages.UTILITY_CLASS_SYSTEMPROPERTIESUTILS); @@ -76,10 +92,93 @@ public static void reinitializeStatics() { /** * Check if the ConfigNode is restarted. * - * @return True if confignode-system.properties file exist. + * @return True if both confignode-system.properties and local consensus state exist. */ public static boolean isRestarted() { - return !systemPropertiesHandler.isFirstStart(); + return getStartupState() == StartupState.RESTART; + } + + public static StartupState getStartupState() { + boolean hasSystemProperties = systemPropertiesHandler.fileExist(); + LocalStorageState consensusState = getConsensusStorageState(new File(conf.getConsensusDir())); + + if (consensusState == LocalStorageState.UNREADABLE) { + return StartupState.CORRUPTED_OR_INCONSISTENT; + } + + boolean hasConsensusState = consensusState == LocalStorageState.PRESENT; + if (!hasSystemProperties && !hasConsensusState) { + return StartupState.FIRST_START; + } + if (!hasSystemProperties) { + return StartupState.PARTIAL_START_CONSENSUS_ONLY; + } + if (!hasRequiredRestartSystemProperties()) { + return StartupState.CORRUPTED_OR_INCONSISTENT; + } + return hasConsensusState ? StartupState.RESTART : StartupState.PARTIAL_START_SYSTEM_ONLY; + } + + private static LocalStorageState getConsensusStorageState(File file) { + if (!file.exists()) { + return LocalStorageState.EMPTY; + } + if (file.isFile() || !file.isDirectory()) { + return LocalStorageState.PRESENT; + } + + File[] children = file.listFiles(); + if (children == null) { + LOGGER.warn("Cannot list ConfigNode consensus directory: {}", file.getAbsolutePath()); + return LocalStorageState.UNREADABLE; + } + + for (File child : children) { + LocalStorageState childState = getConsensusStorageState(child); + if (childState != LocalStorageState.EMPTY) { + return childState; + } + } + return LocalStorageState.EMPTY; + } + + private static boolean hasRequiredRestartSystemProperties() { + try { + Properties systemProperties = systemPropertiesHandler.read(); + return isNonNegativeInteger(systemProperties.getProperty(CONFIG_NODE_ID, null)) + && isBoolean(systemProperties.getProperty(IS_SEED_CONFIG_NODE, null)) + && isNotEmpty(systemProperties.getProperty(CN_INTERNAL_ADDRESS, null)) + && isValidPort(systemProperties.getProperty(CN_INTERNAL_PORT, null)) + && isValidPort(systemProperties.getProperty(CN_CONSENSUS_PORT, null)); + } catch (IOException | IllegalArgumentException e) { + LOGGER.warn("Cannot load ConfigNode system properties for restart state check.", e); + return false; + } + } + + private static boolean isNotEmpty(String value) { + return value != null && !value.trim().isEmpty(); + } + + private static boolean isBoolean(String value) { + return "true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value); + } + + private static boolean isNonNegativeInteger(String value) { + try { + return isNotEmpty(value) && Integer.parseInt(value) >= 0; + } catch (NumberFormatException e) { + return false; + } + } + + private static boolean isValidPort(String value) { + try { + int port = Integer.parseInt(value); + return port > 0 && port <= 65535; + } catch (NumberFormatException e) { + return false; + } } /** @@ -272,11 +371,10 @@ public static void storeSystemParameters() throws IOException { systemProperties.setProperty("commit_id", IoTDBConstant.BUILD_INFO); // Cluster configuration - systemProperties.setProperty("config_node_id", String.valueOf(conf.getConfigNodeId())); + systemProperties.setProperty(CONFIG_NODE_ID, String.valueOf(conf.getConfigNodeId())); LOGGER.info(ConfigNodeMessages.SYSTEMPROPERTIES_STORE_CONFIG_NODE_ID, conf.getConfigNodeId()); systemProperties.setProperty( - "is_seed_config_node", - String.valueOf(ConfigNodeDescriptor.getInstance().isSeedConfigNode())); + IS_SEED_CONFIG_NODE, String.valueOf(ConfigNodeDescriptor.getInstance().isSeedConfigNode())); LOGGER.info( ConfigNodeMessages.SYSTEMPROPERTIES_STORE_IS_SEED_CONFIG_NODE, ConfigNodeDescriptor.getInstance().isSeedConfigNode()); @@ -345,7 +443,7 @@ public static void storeConfigNodeList(List configNodes) th public static int loadConfigNodeIdWhenRestarted() throws IOException { Properties systemProperties = systemPropertiesHandler.read(); try { - return Integer.parseInt(systemProperties.getProperty("config_node_id", null)); + return Integer.parseInt(systemProperties.getProperty(CONFIG_NODE_ID, null)); } catch (NumberFormatException e) { throw new IOException( ConfigNodeMessages.THE_PARAMETER_CONFIG_NODE_ID_DOESN_T_EXIST_IN @@ -366,7 +464,7 @@ public static boolean isSeedConfigNode() { try { Properties systemProperties = systemPropertiesHandler.read(); boolean isSeedConfigNode = - Boolean.parseBoolean(systemProperties.getProperty("is_seed_config_node", null)); + Boolean.parseBoolean(systemProperties.getProperty(IS_SEED_CONFIG_NODE, null)); if (isSeedConfigNode) { return true; } else { From 69482c369e88dcc1f9636e7f1dd462a33e9bd4e4 Mon Sep 17 00:00:00 2001 From: alanchuang22-dev <2584829494@qq.com> Date: Mon, 8 Jun 2026 14:30:08 +0800 Subject: [PATCH 2/6] fix: add test for `SystemPropertiesUtils.isRestarted()` --- .../conf/SystemPropertiesUtilsTest.java | 157 ++++++++++++++++++ 1 file changed, 157 insertions(+) create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtilsTest.java diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtilsTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtilsTest.java new file mode 100644 index 0000000000000..3560487b30f04 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtilsTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.conf; + +import org.apache.iotdb.commons.utils.FileUtils; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Properties; + +public class SystemPropertiesUtilsTest { + + private final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf(); + + private String originalSystemDir; + private String originalConsensusDir; + private File testDir; + + @Before + public void setUp() throws IOException { + originalSystemDir = conf.getSystemDir(); + originalConsensusDir = conf.getConsensusDir(); + testDir = Files.createTempDirectory("SystemPropertiesUtilsTest").toFile(); + + conf.setSystemDir(new File(testDir, "system").getAbsolutePath()); + conf.setConsensusDir(new File(testDir, "consensus").getAbsolutePath()); + SystemPropertiesUtils.reinitializeStatics(); + } + + @After + public void tearDown() { + conf.setSystemDir(originalSystemDir); + conf.setConsensusDir(originalConsensusDir); + SystemPropertiesUtils.reinitializeStatics(); + FileUtils.deleteFileOrDirectory(testDir, true); + } + + @Test + public void testFirstStartState() { + Assert.assertEquals( + SystemPropertiesUtils.StartupState.FIRST_START, SystemPropertiesUtils.getStartupState()); + Assert.assertFalse(SystemPropertiesUtils.isRestarted()); + } + + @Test + public void testEmptyDirsAreStillFirstStart() { + Assert.assertTrue(new File(conf.getSystemDir()).mkdirs()); + Assert.assertTrue(new File(conf.getConsensusDir()).mkdirs()); + + Assert.assertEquals( + SystemPropertiesUtils.StartupState.FIRST_START, SystemPropertiesUtils.getStartupState()); + Assert.assertFalse(SystemPropertiesUtils.isRestarted()); + } + + @Test + public void testRestartState() throws IOException { + writeSystemProperties(createValidSystemProperties()); + createConsensusStateFile(); + + Assert.assertEquals( + SystemPropertiesUtils.StartupState.RESTART, SystemPropertiesUtils.getStartupState()); + Assert.assertTrue(SystemPropertiesUtils.isRestarted()); + } + + @Test + public void testPartialStartConsensusOnlyState() throws IOException { + createConsensusStateFile(); + + Assert.assertEquals( + SystemPropertiesUtils.StartupState.PARTIAL_START_CONSENSUS_ONLY, + SystemPropertiesUtils.getStartupState()); + Assert.assertFalse(SystemPropertiesUtils.isRestarted()); + } + + @Test + public void testPartialStartSystemOnlyState() throws IOException { + writeSystemProperties(createValidSystemProperties()); + Assert.assertTrue(new File(conf.getConsensusDir()).mkdirs()); + + Assert.assertEquals( + SystemPropertiesUtils.StartupState.PARTIAL_START_SYSTEM_ONLY, + SystemPropertiesUtils.getStartupState()); + Assert.assertFalse(SystemPropertiesUtils.isRestarted()); + } + + @Test + public void testCorruptedOrInconsistentState() throws IOException { + Properties properties = createValidSystemProperties(); + properties.remove("config_node_id"); + writeSystemProperties(properties); + createConsensusStateFile(); + + Assert.assertEquals( + SystemPropertiesUtils.StartupState.CORRUPTED_OR_INCONSISTENT, + SystemPropertiesUtils.getStartupState()); + Assert.assertFalse(SystemPropertiesUtils.isRestarted()); + } + + private Properties createValidSystemProperties() { + Properties properties = new Properties(); + properties.setProperty("config_node_id", "0"); + properties.setProperty("is_seed_config_node", "true"); + properties.setProperty("cn_internal_address", "127.0.0.1"); + properties.setProperty("cn_internal_port", "10710"); + properties.setProperty("cn_consensus_port", "10720"); + return properties; + } + + private void writeSystemProperties(Properties properties) throws IOException { + File systemFile = new File(conf.getSystemDir(), ConfigNodeConstant.SYSTEM_FILE_NAME); + Assert.assertTrue(systemFile.getParentFile().mkdirs()); + try (FileOutputStream fileOutputStream = new FileOutputStream(systemFile); + Writer writer = new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8)) { + properties.store(writer, ""); + } + } + + private void createConsensusStateFile() throws IOException { + File stateFile = + new File( + conf.getConsensusDir() + + File.separator + + "47474747-4747-4747-4747-000000000000" + + File.separator + + "current", + "raft-meta"); + Assert.assertTrue(stateFile.getParentFile().mkdirs()); + Assert.assertTrue(stateFile.createNewFile()); + } +} From e2f32eafda2e561f19ad2ccc5ecd30695bd69b9f Mon Sep 17 00:00:00 2001 From: alanchuang22-dev <2584829494@qq.com> Date: Mon, 8 Jun 2026 14:49:29 +0800 Subject: [PATCH 3/6] fix: optimize `ConsensusManager.start()` , `applyConfigNode()` and add exception --- .../manager/consensus/ConsensusManager.java | 16 +- .../confignode/manager/node/NodeManager.java | 15 +- .../iotdb/confignode/service/ConfigNode.java | 30 ++- .../consensus/ConsensusManagerTest.java | 178 ++++++++++++++++++ .../manager/node/NodeManagerTest.java | 105 +++++++++++ .../ConfigNodeStartupPersistenceTest.java | 149 +++++++++++++++ 6 files changed, 478 insertions(+), 15 deletions(-) create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManagerTest.java create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/ConfigNodeStartupPersistenceTest.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java index 8b4eeed5a1b58..072108556d1e0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.ConfigRegionId; import org.apache.iotdb.commons.consensus.ConsensusGroupId; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.conf.SystemPropertiesUtils; @@ -86,11 +87,19 @@ public ConsensusManager(IManager configManager, ConfigRegionStateMachine stateMa setConsensusLayer(stateMachine); } + @TestOnly + ConsensusManager(IManager configManager, IConsensus consensusImpl) { + this.configManager = configManager; + this.consensusImpl = consensusImpl; + } + public void start() throws IOException { consensusImpl.start(); - if (SystemPropertiesUtils.isRestarted()) { + SystemPropertiesUtils.StartupState startupState = SystemPropertiesUtils.getStartupState(); + if (startupState == SystemPropertiesUtils.StartupState.RESTART) { LOGGER.info(ManagerMessages.INIT_CONSENSUSMANAGER_SUCCESSFULLY_WHEN_RESTARTED); - } else if (ConfigNodeDescriptor.getInstance().isSeedConfigNode()) { + } else if (startupState == SystemPropertiesUtils.StartupState.FIRST_START + && ConfigNodeDescriptor.getInstance().isSeedConfigNode()) { // Create ConsensusGroup that contains only itself // if the current ConfigNode is Seed-ConfigNode try { @@ -105,7 +114,10 @@ public void start() throws IOException { ManagerMessages .SOMETHING_WRONG_HAPPENED_WHILE_CALLING_CONSENSUS_LAYER_S_CREATELOCALPEER_API, e); + throw new IOException("Failed to create local ConfigNode consensus peer.", e); } + } else if (startupState != SystemPropertiesUtils.StartupState.FIRST_START) { + throw new IOException("Cannot start ConfigNode consensus from " + startupState + " state."); } isInitialized = true; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 928db046980d0..6b807e05c7a04 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -889,21 +889,30 @@ public List getRegisteredConfigNodeInfo4Infor * @param configNodeLocation The new ConfigNode. * @param versionInfo The new ConfigNode's versionInfo. */ - public void applyConfigNode( + public TSStatus applyConfigNode( TConfigNodeLocation configNodeLocation, TNodeVersionInfo versionInfo) { ApplyConfigNodePlan applyConfigNodePlan = new ApplyConfigNodePlan(configNodeLocation); + TSStatus status; try { - getConsensusManager().write(applyConfigNodePlan); + status = getConsensusManager().write(applyConfigNodePlan); } catch (ConsensusException e) { LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage(e.getMessage()); + } + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(versionInfo, configNodeLocation.getConfigNodeId()); try { - getConsensusManager().write(updateVersionInfoPlan); + status = getConsensusManager().write(updateVersionInfoPlan); } catch (ConsensusException e) { LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage(e.getMessage()); } + return status; } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java index 037f138286a18..8897e243f3df4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java @@ -221,19 +221,10 @@ public void active() { // Generate the builtin admin users after initConsensusManager initBuiltinUsers(); - // Persistence system parameters after the consensusGroup is built, - // or the consensusGroup will not be initialized successfully otherwise. - SystemPropertiesUtils.storeSystemParameters(); - // Wait for ConfigNode-leader elected before applying itself waitForLeaderElected(); - // Seed-ConfigNode should apply itself when first start - configManager - .getNodeManager() - .applyConfigNode( - CONF.generateLocalConfigNodeLocationWithSpecifiedNodeId(SEED_CONFIG_NODE_ID), - new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO)); + applySeedConfigNodeAndStoreSystemParameters(); setUpMetricService(); // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure // that the external service is not provided until Seed-ConfigNode is fully initialized @@ -291,6 +282,25 @@ protected void initBuiltinUsers() { // nothing to do } + protected void applySeedConfigNodeAndStoreSystemParameters() + throws StartupException, IOException { + // Seed-ConfigNode should apply itself when first start. + TSStatus applyConfigNodeStatus = + configManager + .getNodeManager() + .applyConfigNode( + CONF.generateLocalConfigNodeLocationWithSpecifiedNodeId(SEED_CONFIG_NODE_ID), + new TNodeVersionInfo(IoTDBConstant.VERSION, IoTDBConstant.BUILD_INFO)); + if (applyConfigNodeStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new StartupException( + "Failed to apply Seed-ConfigNode when first startup: " + applyConfigNodeStatus); + } + + // Persist system parameters only after the consensus group is created, a leader exists and the + // Seed-ConfigNode has been applied through consensus. + SystemPropertiesUtils.storeSystemParameters(); + } + void processPid() { String pidFile = System.getProperty(IoTDBConstant.IOTDB_PIDFILE); if (pidFile != null) { diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManagerTest.java new file mode 100644 index 0000000000000..9e55678cf9e6c --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManagerTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.consensus; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.confignode.conf.ConfigNodeConfig; +import org.apache.iotdb.confignode.conf.ConfigNodeConstant; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.conf.SystemPropertiesUtils; +import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.consensus.IConsensus; +import org.apache.iotdb.consensus.common.Peer; +import org.apache.iotdb.consensus.exception.ConsensusException; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.List; +import java.util.Properties; + +public class ConsensusManagerTest { + + private final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf(); + + private String originalSystemDir; + private String originalConsensusDir; + private String originalInternalAddress; + private int originalInternalPort; + private int originalConsensusPort; + private TEndPoint originalSeedConfigNode; + private File testDir; + + @Before + public void setUp() throws IOException { + originalSystemDir = conf.getSystemDir(); + originalConsensusDir = conf.getConsensusDir(); + originalInternalAddress = conf.getInternalAddress(); + originalInternalPort = conf.getInternalPort(); + originalConsensusPort = conf.getConsensusPort(); + originalSeedConfigNode = conf.getSeedConfigNode(); + + testDir = Files.createTempDirectory("ConsensusManagerTest").toFile(); + conf.setSystemDir(new File(testDir, "system").getAbsolutePath()); + conf.setConsensusDir(new File(testDir, "consensus").getAbsolutePath()); + conf.setInternalAddress("127.0.0.1"); + conf.setInternalPort(10710); + conf.setConsensusPort(10720); + conf.setSeedConfigNode(new TEndPoint("127.0.0.1", 10710)); + SystemPropertiesUtils.reinitializeStatics(); + } + + @After + public void tearDown() { + conf.setSystemDir(originalSystemDir); + conf.setConsensusDir(originalConsensusDir); + conf.setInternalAddress(originalInternalAddress); + conf.setInternalPort(originalInternalPort); + conf.setConsensusPort(originalConsensusPort); + conf.setSeedConfigNode(originalSeedConfigNode); + SystemPropertiesUtils.reinitializeStatics(); + FileUtils.deleteFileOrDirectory(testDir, true); + } + + @Test + public void startShouldCreateSeedPeerOnFirstStart() throws Exception { + IConsensus consensus = Mockito.mock(IConsensus.class); + ConsensusManager consensusManager = + new ConsensusManager(Mockito.mock(IManager.class), consensus); + + consensusManager.start(); + + Mockito.verify(consensus).start(); + @SuppressWarnings("unchecked") + ArgumentCaptor> peerCaptor = ArgumentCaptor.forClass(List.class); + Mockito.verify(consensus) + .createLocalPeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), peerCaptor.capture()); + Assert.assertEquals(1, peerCaptor.getValue().size()); + Peer localPeer = peerCaptor.getValue().get(0); + Assert.assertEquals(0, localPeer.getNodeId()); + Assert.assertEquals(new TEndPoint("127.0.0.1", 10720), localPeer.getEndpoint()); + Assert.assertTrue(consensusManager.isInitialized()); + } + + @Test + public void startShouldFailWhenSeedPeerCreationFails() throws Exception { + IConsensus consensus = Mockito.mock(IConsensus.class); + Mockito.doThrow(new ConsensusException("create local peer failed")) + .when(consensus) + .createLocalPeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.anyList()); + ConsensusManager consensusManager = + new ConsensusManager(Mockito.mock(IManager.class), consensus); + + IOException exception = Assert.assertThrows(IOException.class, consensusManager::start); + + Assert.assertTrue(exception.getMessage().contains("Failed to create local")); + Mockito.verify(consensus).start(); + Mockito.verify(consensus) + .createLocalPeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.anyList()); + Assert.assertFalse(consensusManager.isInitialized()); + } + + @Test + public void startShouldNotCreatePeerWhenRestarted() throws Exception { + writeSystemProperties(); + createConsensusStateFile(); + IConsensus consensus = Mockito.mock(IConsensus.class); + ConsensusManager consensusManager = + new ConsensusManager(Mockito.mock(IManager.class), consensus); + + consensusManager.start(); + + Mockito.verify(consensus).start(); + Mockito.verify(consensus, Mockito.never()) + .createLocalPeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.anyList()); + Assert.assertTrue(consensusManager.isInitialized()); + } + + private void writeSystemProperties() throws IOException { + File systemFile = new File(conf.getSystemDir(), ConfigNodeConstant.SYSTEM_FILE_NAME); + Assert.assertTrue(systemFile.getParentFile().mkdirs()); + Properties properties = new Properties(); + properties.setProperty("config_node_id", "0"); + properties.setProperty("is_seed_config_node", "true"); + properties.setProperty("cn_internal_address", "127.0.0.1"); + properties.setProperty("cn_internal_port", "10710"); + properties.setProperty("cn_consensus_port", "10720"); + try (FileOutputStream fileOutputStream = new FileOutputStream(systemFile); + Writer writer = new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8)) { + properties.store(writer, ""); + } + } + + private void createConsensusStateFile() throws IOException { + File stateFile = + new File( + conf.getConsensusDir() + + File.separator + + "47474747-4747-4747-4747-000000000000" + + File.separator + + "current", + "raft-meta"); + Assert.assertTrue(stateFile.getParentFile().mkdirs()); + Assert.assertTrue(stateFile.createNewFile()); + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java new file mode 100644 index 0000000000000..07124042bca91 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.node; + +import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan; +import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan; +import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; +import org.apache.iotdb.confignode.persistence.node.NodeInfo; +import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo; +import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; + +public class NodeManagerTest { + + @Test + public void applyConfigNodeShouldReturnSuccessAfterBothConsensusWrites() throws Exception { + ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); + IManager configManager = Mockito.mock(IManager.class); + Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); + TSStatus success = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + Mockito.when(consensusManager.write(Mockito.isA(ApplyConfigNodePlan.class))) + .thenReturn(success); + Mockito.when(consensusManager.write(Mockito.isA(UpdateVersionInfoPlan.class))) + .thenReturn(success); + NodeManager nodeManager = new NodeManager(configManager, Mockito.mock(NodeInfo.class)); + + TSStatus status = nodeManager.applyConfigNode(configNodeLocation(), versionInfo()); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + InOrder inOrder = Mockito.inOrder(consensusManager); + inOrder.verify(consensusManager).write(Mockito.isA(ApplyConfigNodePlan.class)); + inOrder.verify(consensusManager).write(Mockito.isA(UpdateVersionInfoPlan.class)); + } + + @Test + public void applyConfigNodeShouldReturnFailureAndSkipVersionInfoWhenApplyFails() + throws Exception { + ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); + IManager configManager = Mockito.mock(IManager.class); + Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); + TSStatus failure = + new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()).setMessage("apply failed"); + Mockito.when(consensusManager.write(Mockito.isA(ApplyConfigNodePlan.class))) + .thenReturn(failure); + NodeManager nodeManager = new NodeManager(configManager, Mockito.mock(NodeInfo.class)); + + TSStatus status = nodeManager.applyConfigNode(configNodeLocation(), versionInfo()); + + Assert.assertEquals(failure.getCode(), status.getCode()); + Mockito.verify(consensusManager, Mockito.never()) + .write(Mockito.isA(UpdateVersionInfoPlan.class)); + } + + @Test + public void applyConfigNodeShouldReturnFailureAndSkipVersionInfoWhenConsensusThrows() + throws Exception { + ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); + IManager configManager = Mockito.mock(IManager.class); + Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); + Mockito.when(consensusManager.write(Mockito.isA(ApplyConfigNodePlan.class))) + .thenThrow(new ConsensusException("write failed")); + NodeManager nodeManager = new NodeManager(configManager, Mockito.mock(NodeInfo.class)); + + TSStatus status = nodeManager.applyConfigNode(configNodeLocation(), versionInfo()); + + Assert.assertEquals(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), status.getCode()); + Mockito.verify(consensusManager, Mockito.never()) + .write(Mockito.isA(UpdateVersionInfoPlan.class)); + } + + private TConfigNodeLocation configNodeLocation() { + return new TConfigNodeLocation( + 0, new TEndPoint("127.0.0.1", 10710), new TEndPoint("127.0.0.1", 10720)); + } + + private TNodeVersionInfo versionInfo() { + return new TNodeVersionInfo("test-version", "test-build"); + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/ConfigNodeStartupPersistenceTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/ConfigNodeStartupPersistenceTest.java new file mode 100644 index 0000000000000..848efd079a6fd --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/service/ConfigNodeStartupPersistenceTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.service; + +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.StartupException; +import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.confignode.conf.ConfigNodeConfig; +import org.apache.iotdb.confignode.conf.ConfigNodeConstant; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.conf.SystemPropertiesUtils; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.node.NodeManager; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; + +public class ConfigNodeStartupPersistenceTest { + + private final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf(); + + private String originalSystemDir; + private String originalConsensusDir; + private String originalInternalAddress; + private int originalInternalPort; + private int originalConsensusPort; + private int originalConfigNodeId; + private TEndPoint originalSeedConfigNode; + private File testDir; + + @Before + public void setUp() throws IOException { + originalSystemDir = conf.getSystemDir(); + originalConsensusDir = conf.getConsensusDir(); + originalInternalAddress = conf.getInternalAddress(); + originalInternalPort = conf.getInternalPort(); + originalConsensusPort = conf.getConsensusPort(); + originalConfigNodeId = conf.getConfigNodeId(); + originalSeedConfigNode = conf.getSeedConfigNode(); + + testDir = Files.createTempDirectory("ConfigNodeStartupPersistenceTest").toFile(); + conf.setSystemDir(new File(testDir, "system").getAbsolutePath()); + conf.setConsensusDir(new File(testDir, "consensus").getAbsolutePath()); + conf.setInternalAddress("127.0.0.1"); + conf.setInternalPort(10710); + conf.setConsensusPort(10720); + conf.setConfigNodeId(0); + conf.setSeedConfigNode(new TEndPoint("127.0.0.1", 10710)); + SystemPropertiesUtils.reinitializeStatics(); + Assert.assertTrue(new File(conf.getSystemDir()).mkdirs()); + createConsensusStateFile(); + } + + @After + public void tearDown() { + conf.setSystemDir(originalSystemDir); + conf.setConsensusDir(originalConsensusDir); + conf.setInternalAddress(originalInternalAddress); + conf.setInternalPort(originalInternalPort); + conf.setConsensusPort(originalConsensusPort); + conf.setConfigNodeId(originalConfigNodeId); + conf.setSeedConfigNode(originalSeedConfigNode); + SystemPropertiesUtils.reinitializeStatics(); + FileUtils.deleteFileOrDirectory(testDir, true); + } + + @Test + public void applySeedConfigNodeShouldNotStoreSystemPropertiesWhenApplyFails() throws Exception { + ConfigNode configNode = new ConfigNode(); + NodeManager nodeManager = Mockito.mock(NodeManager.class); + Mockito.when(nodeManager.applyConfigNode(Mockito.any(), Mockito.any())) + .thenReturn( + new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()) + .setMessage("apply failed")); + configNode.setConfigManager(mockConfigManager(nodeManager)); + + Assert.assertThrows( + StartupException.class, configNode::applySeedConfigNodeAndStoreSystemParameters); + + Assert.assertFalse(getSystemPropertiesFile().exists()); + Assert.assertEquals( + SystemPropertiesUtils.StartupState.PARTIAL_START_CONSENSUS_ONLY, + SystemPropertiesUtils.getStartupState()); + } + + @Test + public void applySeedConfigNodeShouldStoreSystemPropertiesAfterApplySucceeds() throws Exception { + ConfigNode configNode = new ConfigNode(); + NodeManager nodeManager = Mockito.mock(NodeManager.class); + Mockito.when(nodeManager.applyConfigNode(Mockito.any(), Mockito.any())) + .thenReturn(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + configNode.setConfigManager(mockConfigManager(nodeManager)); + + configNode.applySeedConfigNodeAndStoreSystemParameters(); + + Assert.assertTrue(getSystemPropertiesFile().exists()); + Assert.assertEquals( + SystemPropertiesUtils.StartupState.RESTART, SystemPropertiesUtils.getStartupState()); + } + + private ConfigManager mockConfigManager(NodeManager nodeManager) { + ConfigManager configManager = Mockito.mock(ConfigManager.class); + Mockito.when(configManager.getNodeManager()).thenReturn(nodeManager); + return configManager; + } + + private File getSystemPropertiesFile() { + return new File(conf.getSystemDir(), ConfigNodeConstant.SYSTEM_FILE_NAME); + } + + private void createConsensusStateFile() throws IOException { + File stateFile = + new File( + conf.getConsensusDir() + + File.separator + + "47474747-4747-4747-4747-000000000000" + + File.separator + + "current", + "raft-meta"); + Assert.assertTrue(stateFile.getParentFile().mkdirs()); + Assert.assertTrue(stateFile.createNewFile()); + } +} From cd352045b8dd7a6f2068f7d063a5f3a7c1aed9d5 Mon Sep 17 00:00:00 2001 From: alanchuang22-dev <2584829494@qq.com> Date: Mon, 8 Jun 2026 15:22:35 +0800 Subject: [PATCH 4/6] fix: optimize ainode restart --- .../ainode/iotdb/ainode/core/ai_node.py | 50 +++- iotdb-core/ainode/iotdb/ainode/core/config.py | 232 +++++++++--------- .../ainode/iotdb/ainode/core/rpc/client.py | 8 +- .../ainode/tests/test_ai_node_restart.py | 182 ++++++++++++++ .../manager/node/ClusterNodeStartUtils.java | 19 ++ .../confignode/manager/node/NodeManager.java | 35 ++- .../node/ClusterNodeStartUtilsTest.java | 84 +++++++ .../manager/node/NodeManagerTest.java | 53 ++++ 8 files changed, 542 insertions(+), 121 deletions(-) create mode 100644 iotdb-core/ainode/tests/test_ai_node_restart.py create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/ClusterNodeStartUtilsTest.java diff --git a/iotdb-core/ainode/iotdb/ainode/core/ai_node.py b/iotdb-core/ainode/iotdb/ainode/core/ai_node.py index 7ad0dce842f0d..653713ca61fb7 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/ai_node.py +++ b/iotdb-core/ainode/iotdb/ainode/core/ai_node.py @@ -17,6 +17,7 @@ # import os import signal +import shutil import threading from datetime import datetime @@ -83,6 +84,42 @@ def _generate_system_properties(ainode_id: int): } +def _backup_system_properties(system_properties_file: str): + if not os.path.exists(system_properties_file): + return None + backup_file = "{}.{}.bak".format( + system_properties_file, datetime.now().strftime("%Y%m%d%H%M%S%f") + ) + shutil.copy2(system_properties_file, backup_file) + logger.warning("Backed up AINode system properties to", backup_file) + return backup_file + + +def _write_system_properties(system_properties_file: str, system_properties): + tmp_file = system_properties_file + ".tmp" + try: + with open(tmp_file, "w") as f: + f.write("#" + str(datetime.now()) + "\n") + for key, value in system_properties.items(): + f.write(key + "=" + str(value) + "\n") + os.replace(tmp_file, system_properties_file) + except Exception: + if os.path.exists(tmp_file): + os.remove(tmp_file) + raise + + +def _verify_registered_ainode_id(system_properties_file: str): + ainode_id = AINodeDescriptor().get_config().get_ainode_id() + if ainode_id < 0: + _backup_system_properties(system_properties_file) + raise RuntimeError( + "AINode system.properties exists but does not contain a valid ainode_id. " + "Please restore the local system.properties or explicitly remove it before " + "registering a new AINode." + ) + + class AINode: def __init__(self): self._rpc_service = None @@ -110,10 +147,7 @@ def start(self): ) AINodeDescriptor().get_config().set_ainode_id(ainode_id) system_properties = _generate_system_properties(ainode_id) - with open(system_properties_file, "w") as f: - f.write("#" + str(datetime.now()) + "\n") - for key, value in system_properties.items(): - f.write(key + "=" + str(value) + "\n") + _write_system_properties(system_properties_file, system_properties) except Exception as e: logger.error( "IoTDB-AINode failed to register to IoTDB cluster: {}".format(e) @@ -122,6 +156,7 @@ def start(self): else: # If the system.properties file does exist, the AINode will just restart. try: + _verify_registered_ainode_id(system_properties_file) logger.info("IoTDB-AINode is restarting...") ClientManager().borrow_config_node_client().node_restart( AINodeDescriptor().get_config().get_cluster_name(), @@ -129,6 +164,13 @@ def start(self): _generate_version_info(), ) except Exception as e: + if AINodeDescriptor().get_config().get_ainode_id() >= 0: + try: + _backup_system_properties(system_properties_file) + except Exception as backup_error: + logger.warning( + "Failed to back up AINode system properties:", backup_error + ) logger.error("IoTDB-AINode failed to restart: {}".format(e)) raise e diff --git a/iotdb-core/ainode/iotdb/ainode/core/config.py b/iotdb-core/ainode/iotdb/ainode/core/config.py index 4995dda7bf337..d88c709461aca 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/config.py +++ b/iotdb-core/ainode/iotdb/ainode/core/config.py @@ -59,7 +59,7 @@ def __init__(self): self._build_info = AINODE_BUILD_INFO # Cluster configuration - self._ainode_id = 0 + self._ainode_id = -1 self._cluster_name = AINODE_CLUSTER_NAME self._ain_target_config_node_list: TEndPoint = AINODE_TARGET_CONFIG_NODE_LIST self._ain_rpc_address: str = AINODE_RPC_ADDRESS @@ -294,14 +294,6 @@ def __init__(self): logger.info("AINodeDescriptor is init successfully.") def _load_config_from_file(self) -> None: - system_properties_file = os.path.join( - self._config.get_ain_system_dir(), AINODE_SYSTEM_FILE_NAME - ) - if os.path.exists(system_properties_file): - system_configs = load_properties(system_properties_file) - if "ainode_id" in system_configs: - self._config.set_ainode_id(int(system_configs["ainode_id"])) - git_file = os.path.join(AINODE_CONF_DIRECTORY_NAME, AINODE_CONF_GIT_FILE_NAME) if os.path.exists(git_file): git_configs = load_properties(git_file) @@ -325,112 +317,126 @@ def _load_config_from_file(self) -> None: conf_file ) ) - return - - # noinspection PyBroadException - try: - file_configs = load_properties(conf_file) - - config_keys = file_configs.keys() - - if "ain_rpc_address" in config_keys: - self._config.set_ain_rpc_address(file_configs["ain_rpc_address"]) - - if "ain_rpc_port" in config_keys: - self._config.set_ain_rpc_port(int(file_configs["ain_rpc_port"])) - - if "ain_inference_batch_interval_in_ms" in config_keys: - self._config.set_ain_inference_batch_interval_in_ms( - int(file_configs["ain_inference_batch_interval_in_ms"]) - ) - - if "ain_inference_model_mem_usage_map" in config_keys: - self._config.set_ain_inference_model_mem_usage_map( - eval(file_configs["ain_inference_model_mem_usage_map"]) - ) - - if "ain_inference_memory_usage_ratio" in config_keys: - self._config.set_ain_inference_memory_usage_ratio( - float(file_configs["ain_inference_memory_usage_ratio"]) + else: + # noinspection PyBroadException + try: + file_configs = load_properties(conf_file) + + config_keys = file_configs.keys() + + if "ain_rpc_address" in config_keys: + self._config.set_ain_rpc_address(file_configs["ain_rpc_address"]) + + if "ain_rpc_port" in config_keys: + self._config.set_ain_rpc_port(int(file_configs["ain_rpc_port"])) + + if "ain_inference_batch_interval_in_ms" in config_keys: + self._config.set_ain_inference_batch_interval_in_ms( + int(file_configs["ain_inference_batch_interval_in_ms"]) + ) + + if "ain_inference_model_mem_usage_map" in config_keys: + self._config.set_ain_inference_model_mem_usage_map( + eval(file_configs["ain_inference_model_mem_usage_map"]) + ) + + if "ain_inference_memory_usage_ratio" in config_keys: + self._config.set_ain_inference_memory_usage_ratio( + float(file_configs["ain_inference_memory_usage_ratio"]) + ) + + if "ain_inference_extra_memory_ratio" in config_keys: + self._config.set_ain_inference_extra_memory_ratio( + float(file_configs["ain_inference_extra_memory_ratio"]) + ) + + if "ain_models_dir" in config_keys: + self._config.set_ain_models_dir(file_configs["ain_models_dir"]) + + if "ain_models_builtin_dir" in config_keys: + self._config.set_ain_models_builtin_dir( + file_configs["ain_models_builtin_dir"] + ) + + if "ain_system_dir" in config_keys: + self._config.set_ain_system_dir(file_configs["ain_system_dir"]) + + if "ain_seed_config_node" in config_keys: + self._config.set_ain_target_config_node_list( + file_configs["ain_seed_config_node"] + ) + + if "cluster_name" in config_keys: + self._config.set_cluster_name(file_configs["cluster_name"]) + + if "ain_thrift_compression_enabled" in config_keys: + self._config.set_ain_thrift_compression_enabled( + int(file_configs["ain_thrift_compression_enabled"]) + ) + + if "ain_cluster_ingress_ssl_enabled" in config_keys: + self._config.set_ain_cluster_ingress_ssl_enabled( + int(file_configs["ain_cluster_ingress_ssl_enabled"]) + ) + + if "ain_thrift_ssl_cert_file" in config_keys: + self._config.set_ain_thrift_ssl_cert_file( + file_configs["ain_thrift_ssl_cert_file"] + ) + + if "ain_thrift_ssl_key_file" in config_keys: + self._config.set_ain_thrift_ssl_key_file( + file_configs["ain_thrift_ssl_key_file"] + ) + + if "ain_logs_dir" in config_keys: + log_dir = file_configs["ain_logs_dir"] + self._config.set_ain_logs_dir(log_dir) + + if "ain_cluster_ingress_address" in config_keys: + self._config.set_ain_cluster_ingress_address( + file_configs["ain_cluster_ingress_address"] + ) + + if "ain_cluster_ingress_port" in config_keys: + self._config.set_ain_cluster_ingress_port( + int(file_configs["ain_cluster_ingress_port"]) + ) + + if "ain_cluster_ingress_username" in config_keys: + self._config.set_ain_cluster_ingress_username( + file_configs["ain_cluster_ingress_username"] + ) + + if "ain_cluster_ingress_password" in config_keys: + self._config.set_ain_cluster_ingress_password( + file_configs["ain_cluster_ingress_password"] + ) + + except BadNodeUrlException: + logger.warning("Cannot load AINode conf file, use default configuration.") + + except Exception as e: + logger.warning( + "Cannot load AINode conf file caused by: {}, use default configuration. ".format( + e + ) ) - if "ain_inference_extra_memory_ratio" in config_keys: - self._config.set_ain_inference_extra_memory_ratio( - float(file_configs["ain_inference_extra_memory_ratio"]) - ) - - if "ain_models_dir" in config_keys: - self._config.set_ain_models_dir(file_configs["ain_models_dir"]) - - if "ain_models_builtin_dir" in config_keys: - self._config.set_ain_models_builtin_dir( - file_configs["ain_models_builtin_dir"] - ) - - if "ain_system_dir" in config_keys: - self._config.set_ain_system_dir(file_configs["ain_system_dir"]) - - if "ain_seed_config_node" in config_keys: - self._config.set_ain_target_config_node_list( - file_configs["ain_seed_config_node"] - ) - - if "cluster_name" in config_keys: - self._config.set_cluster_name(file_configs["cluster_name"]) - - if "ain_thrift_compression_enabled" in config_keys: - self._config.set_ain_thrift_compression_enabled( - int(file_configs["ain_thrift_compression_enabled"]) - ) - - if "ain_cluster_ingress_ssl_enabled" in config_keys: - self._config.set_ain_cluster_ingress_ssl_enabled( - int(file_configs["ain_cluster_ingress_ssl_enabled"]) - ) - - if "ain_thrift_ssl_cert_file" in config_keys: - self._config.set_ain_thrift_ssl_cert_file( - file_configs["ain_thrift_ssl_cert_file"] - ) - - if "ain_thrift_ssl_key_file" in config_keys: - self._config.set_ain_thrift_ssl_key_file( - file_configs["ain_thrift_ssl_key_file"] - ) - - if "ain_logs_dir" in config_keys: - log_dir = file_configs["ain_logs_dir"] - self._config.set_ain_logs_dir(log_dir) - - if "ain_cluster_ingress_address" in config_keys: - self._config.set_ain_cluster_ingress_address( - file_configs["ain_cluster_ingress_address"] - ) - - if "ain_cluster_ingress_port" in config_keys: - self._config.set_ain_cluster_ingress_port( - int(file_configs["ain_cluster_ingress_port"]) - ) - - if "ain_cluster_ingress_username" in config_keys: - self._config.set_ain_cluster_ingress_username( - file_configs["ain_cluster_ingress_username"] - ) - - if "ain_cluster_ingress_password" in config_keys: - self._config.set_ain_cluster_ingress_password( - file_configs["ain_cluster_ingress_password"] - ) - - except BadNodeUrlException: - logger.warning("Cannot load AINode conf file, use default configuration.") - - except Exception as e: - logger.warning( - "Cannot load AINode conf file caused by: {}, use default configuration. ".format( - e - ) - ) + system_properties_file = os.path.join( + self._config.get_ain_system_dir(), AINODE_SYSTEM_FILE_NAME + ) + if os.path.exists(system_properties_file): + system_configs = load_properties(system_properties_file) + if "ainode_id" in system_configs: + try: + self._config.set_ainode_id(int(system_configs["ainode_id"])) + except ValueError: + logger.warning( + "Cannot load ainode_id from '{}', keep AINode unregistered.".format( + system_properties_file + ) + ) def get_config(self) -> AINodeConfig: return self._config diff --git a/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py b/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py index ea6362ef080af..af1f3825817f1 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py +++ b/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py @@ -201,7 +201,7 @@ def node_restart( cluster_name: str, configuration: TAINodeConfiguration, version_info: TNodeVersionInfo, - ) -> None: + ) -> TSStatus: req = TAINodeRestartReq( clusterName=cluster_name, aiNodeConfiguration=configuration, @@ -212,6 +212,12 @@ def node_restart( try: resp = self._client.restartAINode(req) if not self._update_config_node_leader(resp.status): + if resp.status.code != TSStatusCode.SUCCESS_STATUS.get_status_code(): + logger.warning( + "AINode restart is rejected by ConfigNode. " + "The local system.properties will be kept and AINode will not " + "register a new id automatically." + ) verify_success( resp.status, "An error occurs when calling node_restart()" ) diff --git a/iotdb-core/ainode/tests/test_ai_node_restart.py b/iotdb-core/ainode/tests/test_ai_node_restart.py new file mode 100644 index 0000000000000..4160818fb92b8 --- /dev/null +++ b/iotdb-core/ainode/tests/test_ai_node_restart.py @@ -0,0 +1,182 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import sys +import tempfile +import types +import unittest +from pathlib import Path +from unittest import mock + +rpc_handler_module = types.ModuleType("iotdb.ainode.core.rpc.handler") + + +class AINodeRPCServiceHandler: + def __init__(self, ainode): + self.ainode = ainode + + +rpc_handler_module.AINodeRPCServiceHandler = AINodeRPCServiceHandler +sys.modules["iotdb.ainode.core.rpc.handler"] = rpc_handler_module + +rpc_service_module = types.ModuleType("iotdb.ainode.core.rpc.service") + + +class AINodeRPCService: + exit_code = 0 + + def __init__(self, handler): + self.handler = handler + + def start(self): + pass + + def join(self, timeout=None): + pass + + +rpc_service_module.AINodeRPCService = AINodeRPCService +sys.modules["iotdb.ainode.core.rpc.service"] = rpc_service_module + +from iotdb.ainode.core import ai_node +from iotdb.ainode.core import config as config_module +from iotdb.thrift.common.ttypes import TEndPoint + + +class FakeConfig: + def __init__(self, system_dir: str, ainode_id: int): + self._system_dir = system_dir + self._ainode_id = ainode_id + + def get_ain_system_dir(self): + return self._system_dir + + def get_ainode_id(self): + return self._ainode_id + + def set_ainode_id(self, ainode_id): + self._ainode_id = ainode_id + + def get_cluster_name(self): + return "defaultCluster" + + def get_version_info(self): + return "test-version" + + def get_build_info(self): + return "test-build" + + def get_ain_rpc_address(self): + return "127.0.0.1" + + def get_ain_rpc_port(self): + return 10810 + + def get_ain_target_config_node_list(self): + return TEndPoint("127.0.0.1", 10710) + + +class FakeDescriptor: + def __init__(self, config): + self._config = config + + def get_config(self): + return self._config + + +class AINodeRestartTest(unittest.TestCase): + def test_descriptor_loads_system_properties_after_configured_system_dir(self): + with tempfile.TemporaryDirectory() as temp_dir: + base_dir = Path(temp_dir) + conf_dir = base_dir / "conf" + default_system_dir = base_dir / "default-system" + configured_system_dir = base_dir / "configured-system" + conf_dir.mkdir() + configured_system_dir.mkdir() + (conf_dir / "iotdb-ainode.properties").write_text( + "ain_system_dir={}\n".format(configured_system_dir), + encoding="utf-8", + ) + (configured_system_dir / "system.properties").write_text( + "ainode_id=7\n", encoding="utf-8" + ) + + with mock.patch.object( + config_module, "AINODE_CONF_DIRECTORY_NAME", str(conf_dir) + ), mock.patch.object( + config_module, "AINODE_SYSTEM_DIR", str(default_system_dir) + ): + descriptor = config_module.AINodeDescriptor.__wrapped__() + + self.assertEqual(7, descriptor.get_config().get_ainode_id()) + + def test_start_with_invalid_local_system_properties_backs_up_and_does_not_connect(self): + with tempfile.TemporaryDirectory() as temp_dir: + system_dir = Path(temp_dir) + system_properties = system_dir / "system.properties" + system_properties.write_text("cluster_name=defaultCluster\n", encoding="utf-8") + config = FakeConfig(str(system_dir), -1) + + with mock.patch.object( + ai_node, "AINodeDescriptor", return_value=FakeDescriptor(config) + ), mock.patch.object( + ai_node, + "ClientManager", + side_effect=AssertionError("ConfigNode client should not be created"), + ): + with self.assertRaises(RuntimeError): + ai_node.AINode().start() + + self.assertEqual(-1, config.get_ainode_id()) + self.assertTrue(system_properties.exists()) + self.assertEqual( + 1, len(list(system_dir.glob("system.properties.*.bak"))) + ) + + def test_restart_failure_backs_up_old_system_properties_and_does_not_register(self): + with tempfile.TemporaryDirectory() as temp_dir: + system_dir = Path(temp_dir) + system_properties = system_dir / "system.properties" + system_properties.write_text("ainode_id=3\n", encoding="utf-8") + config = FakeConfig(str(system_dir), 3) + client = mock.Mock() + client.node_restart.side_effect = RuntimeError("restart rejected") + manager = mock.Mock() + manager.borrow_config_node_client.return_value = client + + with mock.patch.object( + ai_node, "AINodeDescriptor", return_value=FakeDescriptor(config) + ), mock.patch.object( + ai_node, "ClientManager", return_value=manager + ), mock.patch.object( + ai_node, "_generate_configuration", return_value=object() + ), mock.patch.object( + ai_node, "_generate_version_info", return_value=object() + ): + with self.assertRaises(RuntimeError): + ai_node.AINode().start() + + self.assertEqual(3, config.get_ainode_id()) + self.assertEqual("ainode_id=3\n", system_properties.read_text("utf-8")) + self.assertEqual( + 1, len(list(system_dir.glob("system.properties.*.bak"))) + ) + client.node_register.assert_not_called() + + +if __name__ == "__main__": + unittest.main() diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/ClusterNodeStartUtils.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/ClusterNodeStartUtils.java index 60c806db32859..0f7a2cc5bfc61 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/ClusterNodeStartUtils.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/ClusterNodeStartUtils.java @@ -309,6 +309,16 @@ public static TSStatus confirmNodeRestart( } } break; + case AINode: + if (nodeLocation instanceof TAINodeLocation) { + updatedTEndPoints = + checkUpdatedTEndPointOfAINode( + (TAINodeLocation) nodeLocation, (TAINodeLocation) matchedNodeLocation); + if (!updatedTEndPoints.isEmpty()) { + acceptRestart = false; + } + } + break; case DataNode: default: if (nodeLocation instanceof TDataNodeLocation) { @@ -528,4 +538,13 @@ public static Set checkUpdatedTEndPointOfDataNode( } return updatedTEndPoints; } + + public static Set checkUpdatedTEndPointOfAINode( + TAINodeLocation restartLocation, TAINodeLocation recordLocation) { + Set updatedTEndPoints = new HashSet<>(); + if (!recordLocation.getInternalEndPoint().equals(restartLocation.getInternalEndPoint())) { + updatedTEndPoints.add(0); + } + return updatedTEndPoints; + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 6b807e05c7a04..3a84e77c7953f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -585,24 +585,53 @@ public TSStatus removeAINode() { public TAINodeRestartResp updateAINodeIfNecessary(TAINodeRestartReq req) { int nodeId = req.getAiNodeConfiguration().getLocation().getAiNodeId(); TAINodeConfiguration aiNodeConfiguration = getRegisteredAINode(nodeId); + if (aiNodeConfiguration == null) { + return new TAINodeRestartResp() + .setStatus( + new TSStatus(TSStatusCode.REJECT_NODE_START.getStatusCode()) + .setMessage("Reject AINode restart because the AINode is not registered.")) + .setConfigNodeList(getRegisteredConfigNodes()); + } if (!req.getAiNodeConfiguration().equals(aiNodeConfiguration)) { // Update AINodeConfiguration when modified during restart UpdateAINodePlan updateAINodePlan = new UpdateAINodePlan(req.getAiNodeConfiguration()); + TSStatus status; try { - getConsensusManager().write(updateAINodePlan); + status = getConsensusManager().write(updateAINodePlan); } catch (ConsensusException e) { LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + return new TAINodeRestartResp() + .setStatus( + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage(e.getMessage())) + .setConfigNodeList(getRegisteredConfigNodes()); + } + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return new TAINodeRestartResp() + .setStatus(status) + .setConfigNodeList(getRegisteredConfigNodes()); } } TNodeVersionInfo versionInfo = nodeInfo.getVersionInfo(nodeId); - if (!req.getVersionInfo().equals(versionInfo)) { + if (req.getVersionInfo() != null && !req.getVersionInfo().equals(versionInfo)) { // Update versionInfo when modified during restart UpdateVersionInfoPlan updateVersionInfoPlan = new UpdateVersionInfoPlan(req.getVersionInfo(), nodeId); + TSStatus status; try { - getConsensusManager().write(updateVersionInfoPlan); + status = getConsensusManager().write(updateVersionInfoPlan); } catch (ConsensusException e) { LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + return new TAINodeRestartResp() + .setStatus( + new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) + .setMessage(e.getMessage())) + .setConfigNodeList(getRegisteredConfigNodes()); + } + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return new TAINodeRestartResp() + .setStatus(status) + .setConfigNodeList(getRegisteredConfigNodes()); } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/ClusterNodeStartUtilsTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/ClusterNodeStartUtilsTest.java new file mode 100644 index 0000000000000..788e106a144d2 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/ClusterNodeStartUtilsTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.node; + +import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TAINodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TNodeResource; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.cluster.NodeType; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Collections; + +public class ClusterNodeStartUtilsTest { + + @Test + public void confirmNodeRestartShouldRejectAINodeEndpointChange() { + ConfigManager configManager = Mockito.mock(ConfigManager.class); + NodeManager nodeManager = Mockito.mock(NodeManager.class); + Mockito.when(configManager.getNodeManager()).thenReturn(nodeManager); + Mockito.when(nodeManager.getRegisteredAINodes()) + .thenReturn(Collections.singletonList(aiNodeConfiguration(3, "127.0.0.1", 10810))); + + TSStatus status = + ClusterNodeStartUtils.confirmNodeRestart( + NodeType.AINode, + ConfigNodeDescriptor.getInstance().getConf().getClusterName(), + null, + 3, + new TAINodeLocation(3, new TEndPoint("127.0.0.1", 10811)), + configManager); + + Assert.assertEquals(TSStatusCode.REJECT_NODE_START.getStatusCode(), status.getCode()); + } + + @Test + public void confirmNodeRestartShouldAcceptSameAINodeEndpoint() { + ConfigManager configManager = Mockito.mock(ConfigManager.class); + NodeManager nodeManager = Mockito.mock(NodeManager.class); + Mockito.when(configManager.getNodeManager()).thenReturn(nodeManager); + Mockito.when(nodeManager.getRegisteredAINodes()) + .thenReturn(Collections.singletonList(aiNodeConfiguration(3, "127.0.0.1", 10810))); + + TSStatus status = + ClusterNodeStartUtils.confirmNodeRestart( + NodeType.AINode, + ConfigNodeDescriptor.getInstance().getConf().getClusterName(), + null, + 3, + new TAINodeLocation(3, new TEndPoint("127.0.0.1", 10810)), + configManager); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + } + + private TAINodeConfiguration aiNodeConfiguration(int aiNodeId, String ip, int port) { + return new TAINodeConfiguration( + new TAINodeLocation(aiNodeId, new TEndPoint(ip, port)), new TNodeResource(1, 1024)); + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java index 07124042bca91..8059107e1b49e 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/node/NodeManagerTest.java @@ -19,14 +19,20 @@ package org.apache.iotdb.confignode.manager.node; +import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TAINodeLocation; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TNodeResource; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.confignode.consensus.request.write.ainode.UpdateAINodePlan; import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan; import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.persistence.node.NodeInfo; +import org.apache.iotdb.confignode.rpc.thrift.TAINodeRestartReq; +import org.apache.iotdb.confignode.rpc.thrift.TAINodeRestartResp; import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.TSStatusCode; @@ -94,11 +100,58 @@ public void applyConfigNodeShouldReturnFailureAndSkipVersionInfoWhenConsensusThr .write(Mockito.isA(UpdateVersionInfoPlan.class)); } + @Test + public void updateAINodeIfNecessaryShouldRejectUnknownAINode() { + IManager configManager = Mockito.mock(IManager.class); + NodeInfo nodeInfo = Mockito.mock(NodeInfo.class); + Mockito.when(nodeInfo.getRegisteredAINode(3)).thenReturn(null); + NodeManager nodeManager = new NodeManager(configManager, nodeInfo); + + TAINodeRestartResp resp = + nodeManager.updateAINodeIfNecessary(aiNodeRestartReq(3, "127.0.0.1", 10810)); + + Assert.assertEquals(TSStatusCode.REJECT_NODE_START.getStatusCode(), resp.getStatus().getCode()); + } + + @Test + public void updateAINodeIfNecessaryShouldReturnFailureWhenAINodeUpdateConsensusThrows() + throws Exception { + ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); + IManager configManager = Mockito.mock(IManager.class); + NodeInfo nodeInfo = Mockito.mock(NodeInfo.class); + Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); + Mockito.when(nodeInfo.getRegisteredAINode(3)) + .thenReturn(aiNodeConfiguration(3, "127.0.0.1", 10810)); + Mockito.when(consensusManager.write(Mockito.isA(UpdateAINodePlan.class))) + .thenThrow(new ConsensusException("update failed")); + NodeManager nodeManager = new NodeManager(configManager, nodeInfo); + + TAINodeRestartResp resp = + nodeManager.updateAINodeIfNecessary(aiNodeRestartReq(3, "127.0.0.1", 10811)); + + Assert.assertEquals( + TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), resp.getStatus().getCode()); + Mockito.verify(consensusManager, Mockito.never()) + .write(Mockito.isA(UpdateVersionInfoPlan.class)); + } + private TConfigNodeLocation configNodeLocation() { return new TConfigNodeLocation( 0, new TEndPoint("127.0.0.1", 10710), new TEndPoint("127.0.0.1", 10720)); } + private TAINodeRestartReq aiNodeRestartReq(int aiNodeId, String ip, int port) { + TAINodeRestartReq req = + new TAINodeRestartReq("defaultCluster", aiNodeConfiguration(aiNodeId, ip, port)); + req.setVersionInfo(versionInfo()); + return req; + } + + private TAINodeConfiguration aiNodeConfiguration(int aiNodeId, String ip, int port) { + return new TAINodeConfiguration( + new TAINodeLocation(aiNodeId, new TEndPoint(ip, port)), new TNodeResource(1, 1024)); + } + private TNodeVersionInfo versionInfo() { return new TNodeVersionInfo("test-version", "test-build"); } From 2a1504fe161b4c85ecd2d889f229bc7aa1bdebbe Mon Sep 17 00:00:00 2001 From: alanchuang22-dev <2584829494@qq.com> Date: Wed, 10 Jun 2026 10:33:56 +0800 Subject: [PATCH 5/6] fix: fix code format --- .../ainode/iotdb/ainode/core/ai_node.py | 2 +- iotdb-core/ainode/iotdb/ainode/core/config.py | 4 +- .../ainode/iotdb/ainode/core/rpc/client.py | 5 +- .../ainode/tests/test_ai_node_restart.py | 16 +-- .../manager/consensus/ConsensusManager.java | 28 ++++- .../consensus/ConsensusManagerTest.java | 111 +++++++++++++++++- 6 files changed, 147 insertions(+), 19 deletions(-) diff --git a/iotdb-core/ainode/iotdb/ainode/core/ai_node.py b/iotdb-core/ainode/iotdb/ainode/core/ai_node.py index 653713ca61fb7..249a55f6e3609 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/ai_node.py +++ b/iotdb-core/ainode/iotdb/ainode/core/ai_node.py @@ -16,8 +16,8 @@ # under the License. # import os -import signal import shutil +import signal import threading from datetime import datetime diff --git a/iotdb-core/ainode/iotdb/ainode/core/config.py b/iotdb-core/ainode/iotdb/ainode/core/config.py index d88c709461aca..55541e1a3a000 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/config.py +++ b/iotdb-core/ainode/iotdb/ainode/core/config.py @@ -414,7 +414,9 @@ def _load_config_from_file(self) -> None: ) except BadNodeUrlException: - logger.warning("Cannot load AINode conf file, use default configuration.") + logger.warning( + "Cannot load AINode conf file, use default configuration." + ) except Exception as e: logger.warning( diff --git a/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py b/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py index af1f3825817f1..b62701d4a77d3 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py +++ b/iotdb-core/ainode/iotdb/ainode/core/rpc/client.py @@ -212,7 +212,10 @@ def node_restart( try: resp = self._client.restartAINode(req) if not self._update_config_node_leader(resp.status): - if resp.status.code != TSStatusCode.SUCCESS_STATUS.get_status_code(): + if ( + resp.status.code + != TSStatusCode.SUCCESS_STATUS.get_status_code() + ): logger.warning( "AINode restart is rejected by ConfigNode. " "The local system.properties will be kept and AINode will not " diff --git a/iotdb-core/ainode/tests/test_ai_node_restart.py b/iotdb-core/ainode/tests/test_ai_node_restart.py index 4160818fb92b8..87b78a97b7113 100644 --- a/iotdb-core/ainode/tests/test_ai_node_restart.py +++ b/iotdb-core/ainode/tests/test_ai_node_restart.py @@ -124,11 +124,15 @@ def test_descriptor_loads_system_properties_after_configured_system_dir(self): self.assertEqual(7, descriptor.get_config().get_ainode_id()) - def test_start_with_invalid_local_system_properties_backs_up_and_does_not_connect(self): + def test_start_with_invalid_local_system_properties_backs_up_and_does_not_connect( + self, + ): with tempfile.TemporaryDirectory() as temp_dir: system_dir = Path(temp_dir) system_properties = system_dir / "system.properties" - system_properties.write_text("cluster_name=defaultCluster\n", encoding="utf-8") + system_properties.write_text( + "cluster_name=defaultCluster\n", encoding="utf-8" + ) config = FakeConfig(str(system_dir), -1) with mock.patch.object( @@ -143,9 +147,7 @@ def test_start_with_invalid_local_system_properties_backs_up_and_does_not_connec self.assertEqual(-1, config.get_ainode_id()) self.assertTrue(system_properties.exists()) - self.assertEqual( - 1, len(list(system_dir.glob("system.properties.*.bak"))) - ) + self.assertEqual(1, len(list(system_dir.glob("system.properties.*.bak")))) def test_restart_failure_backs_up_old_system_properties_and_does_not_register(self): with tempfile.TemporaryDirectory() as temp_dir: @@ -172,9 +174,7 @@ def test_restart_failure_backs_up_old_system_properties_and_does_not_register(se self.assertEqual(3, config.get_ainode_id()) self.assertEqual("ainode_id=3\n", system_properties.read_text("utf-8")) - self.assertEqual( - 1, len(list(system_dir.glob("system.properties.*.bak"))) - ) + self.assertEqual(1, len(list(system_dir.glob("system.properties.*.bak")))) client.node_register.assert_not_called() diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java index 072108556d1e0..82c72f0a0d0aa 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java @@ -45,6 +45,9 @@ import org.apache.iotdb.consensus.config.ConsensusConfig; import org.apache.iotdb.consensus.config.RatisConfig; import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; +import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException; +import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.rpc.TSStatusCode; @@ -108,7 +111,8 @@ public void start() throws IOException { new TConfigNodeLocation( SEED_CONFIG_NODE_ID, new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort()), - new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort())))); + new TEndPoint(CONF.getInternalAddress(), CONF.getConsensusPort()))), + false); } catch (ConsensusException e) { LOGGER.error( ManagerMessages @@ -291,6 +295,12 @@ private void upgrade() { */ public void createPeerForConsensusGroup(List configNodeLocations) throws ConsensusException { + createPeerForConsensusGroup(configNodeLocations, true); + } + + private void createPeerForConsensusGroup( + List configNodeLocations, boolean ignoreAlreadyCreated) + throws ConsensusException { LOGGER.info(ManagerMessages.CREATEPEERFORCONSENSUSGROUP, configNodeLocations); List peerList = new ArrayList<>(); @@ -301,7 +311,14 @@ public void createPeerForConsensusGroup(List configNodeLoca configNodeLocation.getConfigNodeId(), configNodeLocation.getConsensusEndPoint())); } - consensusImpl.createLocalPeer(DEFAULT_CONSENSUS_GROUP_ID, peerList); + try { + consensusImpl.createLocalPeer(DEFAULT_CONSENSUS_GROUP_ID, peerList); + } catch (ConsensusGroupAlreadyExistException e) { + if (!ignoreAlreadyCreated) { + throw e; + } + LOGGER.info("ConfigNode local peer has already been created: {}", e.getMessage()); + } } /** @@ -318,6 +335,9 @@ public void addConfigNodePeer(TConfigNodeLocation configNodeLocation) throws Add DEFAULT_CONSENSUS_GROUP_ID, configNodeLocation.getConfigNodeId(), configNodeLocation.getConsensusEndPoint())); + } catch (PeerAlreadyInConsensusGroupException e) { + LOGGER.info( + "ConfigNode peer {} has already been added: {}", configNodeLocation, e.getMessage()); } catch (ConsensusException e) { throw new AddPeerException(configNodeLocation); } @@ -339,6 +359,10 @@ public boolean removeConfigNodePeer(TConfigNodeLocation configNodeLocation) { configNodeLocation.getConfigNodeId(), configNodeLocation.getConsensusEndPoint())); return true; + } catch (PeerNotInConsensusGroupException e) { + LOGGER.info( + "ConfigNode peer {} has already been removed: {}", configNodeLocation, e.getMessage()); + return true; } catch (ConsensusException e) { return false; } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManagerTest.java index 9e55678cf9e6c..5b90c69e6fcac 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManagerTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManagerTest.java @@ -19,16 +19,21 @@ package org.apache.iotdb.confignode.manager.consensus; +import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeConstant; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.conf.SystemPropertiesUtils; +import org.apache.iotdb.confignode.exception.AddPeerException; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.common.Peer; import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; +import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException; +import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException; import org.junit.After; import org.junit.Assert; @@ -44,6 +49,7 @@ import java.io.Writer; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.util.Collections; import java.util.List; import java.util.Properties; @@ -93,8 +99,7 @@ public void tearDown() { @Test public void startShouldCreateSeedPeerOnFirstStart() throws Exception { IConsensus consensus = Mockito.mock(IConsensus.class); - ConsensusManager consensusManager = - new ConsensusManager(Mockito.mock(IManager.class), consensus); + ConsensusManager consensusManager = newConsensusManager(consensus); consensusManager.start(); @@ -118,8 +123,27 @@ public void startShouldFailWhenSeedPeerCreationFails() throws Exception { .when(consensus) .createLocalPeer( Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.anyList()); - ConsensusManager consensusManager = - new ConsensusManager(Mockito.mock(IManager.class), consensus); + ConsensusManager consensusManager = newConsensusManager(consensus); + + IOException exception = Assert.assertThrows(IOException.class, consensusManager::start); + + Assert.assertTrue(exception.getMessage().contains("Failed to create local")); + Mockito.verify(consensus).start(); + Mockito.verify(consensus) + .createLocalPeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.anyList()); + Assert.assertFalse(consensusManager.isInitialized()); + } + + @Test + public void startShouldFailWhenSeedPeerAlreadyExistsOnFirstStart() throws Exception { + IConsensus consensus = Mockito.mock(IConsensus.class); + Mockito.doThrow( + new ConsensusGroupAlreadyExistException(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID)) + .when(consensus) + .createLocalPeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.anyList()); + ConsensusManager consensusManager = newConsensusManager(consensus); IOException exception = Assert.assertThrows(IOException.class, consensusManager::start); @@ -136,8 +160,7 @@ public void startShouldNotCreatePeerWhenRestarted() throws Exception { writeSystemProperties(); createConsensusStateFile(); IConsensus consensus = Mockito.mock(IConsensus.class); - ConsensusManager consensusManager = - new ConsensusManager(Mockito.mock(IManager.class), consensus); + ConsensusManager consensusManager = newConsensusManager(consensus); consensusManager.start(); @@ -148,6 +171,82 @@ public void startShouldNotCreatePeerWhenRestarted() throws Exception { Assert.assertTrue(consensusManager.isInitialized()); } + @Test + public void createPeerForConsensusGroupShouldIgnoreAlreadyCreatedLocalPeer() throws Exception { + final IConsensus consensus = Mockito.mock(IConsensus.class); + Mockito.doThrow( + new ConsensusGroupAlreadyExistException(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID)) + .when(consensus) + .createLocalPeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.anyList()); + + newConsensusManager(consensus) + .createPeerForConsensusGroup(Collections.singletonList(newConfigNodeLocation(1))); + + Mockito.verify(consensus) + .createLocalPeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.anyList()); + } + + @Test + public void addConfigNodePeerShouldIgnoreAlreadyAddedPeer() throws Exception { + final IConsensus consensus = Mockito.mock(IConsensus.class); + Mockito.doThrow( + new PeerAlreadyInConsensusGroupException( + ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID, + new Peer( + ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID, + 1, + new TEndPoint("127.0.0.1", 10720)))) + .when(consensus) + .addRemotePeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.any(Peer.class)); + + newConsensusManager(consensus).addConfigNodePeer(newConfigNodeLocation(1)); + + Mockito.verify(consensus) + .addRemotePeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.any(Peer.class)); + } + + @Test + public void addConfigNodePeerShouldKeepFailingForOtherConsensusErrors() throws Exception { + final IConsensus consensus = Mockito.mock(IConsensus.class); + Mockito.doThrow(new ConsensusException("reconfiguration failed")) + .when(consensus) + .addRemotePeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.any(Peer.class)); + + Assert.assertThrows( + AddPeerException.class, + () -> newConsensusManager(consensus).addConfigNodePeer(newConfigNodeLocation(1))); + } + + @Test + public void removeConfigNodePeerShouldIgnoreAlreadyRemovedPeer() throws Exception { + final IConsensus consensus = Mockito.mock(IConsensus.class); + Mockito.doThrow( + new PeerNotInConsensusGroupException( + ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID, "127.0.0.1:10720")) + .when(consensus) + .removeRemotePeer( + Mockito.eq(ConsensusManager.DEFAULT_CONSENSUS_GROUP_ID), Mockito.any(Peer.class)); + + Assert.assertTrue( + newConsensusManager(consensus).removeConfigNodePeer(newConfigNodeLocation(1))); + } + + private ConsensusManager newConsensusManager(final IConsensus consensus) { + return new ConsensusManager(Mockito.mock(IManager.class), consensus); + } + + private static TConfigNodeLocation newConfigNodeLocation(final int configNodeId) { + return new TConfigNodeLocation( + configNodeId, + new TEndPoint("127.0.0.1", 10710 + configNodeId), + new TEndPoint("127.0.0.1", 10720 + configNodeId)); + } + private void writeSystemProperties() throws IOException { File systemFile = new File(conf.getSystemDir(), ConfigNodeConstant.SYSTEM_FILE_NAME); Assert.assertTrue(systemFile.getParentFile().mkdirs()); From db5fd4784a5d78cc1c83fb2fb17fda749e1acf6f Mon Sep 17 00:00:00 2001 From: alanchuang22-dev <2584829494@qq.com> Date: Wed, 10 Jun 2026 13:17:05 +0800 Subject: [PATCH 6/6] fix: replace `eval` with `ast.literal_eval` --- iotdb-core/ainode/iotdb/ainode/core/config.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/iotdb-core/ainode/iotdb/ainode/core/config.py b/iotdb-core/ainode/iotdb/ainode/core/config.py index 55541e1a3a000..89d4366869aff 100644 --- a/iotdb-core/ainode/iotdb/ainode/core/config.py +++ b/iotdb-core/ainode/iotdb/ainode/core/config.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. # +import ast import os import re @@ -337,7 +338,9 @@ def _load_config_from_file(self) -> None: if "ain_inference_model_mem_usage_map" in config_keys: self._config.set_ain_inference_model_mem_usage_map( - eval(file_configs["ain_inference_model_mem_usage_map"]) + ast.literal_eval( + file_configs["ain_inference_model_mem_usage_map"] + ) ) if "ain_inference_memory_usage_ratio" in config_keys: