diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java index 840b38802690..e157a430033c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java @@ -86,7 +86,7 @@ public class SCMStateMachine extends BaseStateMachine { private List installingSecretKeys = null; private AtomicLong currentLeaderTerm = new AtomicLong(-1L); - private AtomicBoolean refreshedAfterLeaderReady = new AtomicBoolean(); + private AtomicBoolean isStateMachineReady = new AtomicBoolean(); public SCMStateMachine(final StorageContainerManager scm, SCMHADBTransactionBuffer buffer) { @@ -164,7 +164,7 @@ public CompletableFuture applyTransaction( // After previous term transactions are applied, still in safe mode, // perform refreshAndValidate to update the safemode rule state. - if (scm.isInSafeMode() && refreshedAfterLeaderReady.get()) { + if (scm.isInSafeMode() && isStateMachineReady.get()) { scm.getScmSafeModeManager().refreshAndValidate(); } final TermIndex appliedTermIndex = TermIndex.valueOf(trx.getLogEntry()); @@ -285,6 +285,14 @@ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, currentLeaderTerm.set(scm.getScmHAManager().getRatisServer().getDivision() .getInfo().getCurrentTerm()); + if (isStateMachineReady.compareAndSet(false, true)) { + // refresh and validate safe mode rules if it can exit safe mode + // if being leader, all previous term transactions have been applied + // if other states, just refresh safe mode rules, and transaction keeps flushing from leader + // and does not depend on pending transactions. + scm.getScmSafeModeManager().refreshAndValidate(); + } + if (!groupMemberId.getPeerId().equals(newLeaderId)) { LOG.info("leader changed, yet current SCM is still follower."); return; @@ -355,21 +363,17 @@ public void notifyTermIndexUpdated(long term, long index) { } if (currentLeaderTerm.get() == term) { - // Means all transactions before this term have been applied. // This means after a restart, all pending transactions have been applied. - // Perform - // 1. Refresh Safemode rules state. - // 2. Start DN Rpc server. - if (!refreshedAfterLeaderReady.get()) { - refreshedAfterLeaderReady.set(true); + if (isStateMachineReady.compareAndSet(false, true)) { + // Refresh Safemode rules state if not already done. scm.getScmSafeModeManager().refreshAndValidate(); } currentLeaderTerm.set(-1L); } } - public boolean isRefreshedAfterLeaderReady() { - return refreshedAfterLeaderReady.get(); + public boolean getIsStateMachineReady() { + return isStateMachineReady.get(); } @Override diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/StateMachineReadyRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/StateMachineReadyRule.java index d5724979f30f..8c6762b10877 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/StateMachineReadyRule.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/StateMachineReadyRule.java @@ -41,7 +41,7 @@ protected TypedEvent getEventType() { @Override protected boolean validate() { if (null != scmStateMachine) { - return scmStateMachine.isRefreshedAfterLeaderReady(); + return scmStateMachine.getIsStateMachineReady(); } // if no HA, always return true. return true; @@ -58,7 +58,7 @@ protected void cleanup() { @Override public String getStatusText() { return String.format("Refreshed SCM State Machine after leader ready: %s", - scmStateMachine != null ? scmStateMachine.isRefreshedAfterLeaderReady() : "NA"); + scmStateMachine != null ? scmStateMachine.getIsStateMachineReady() : "NA"); } @Override diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java index c05c66b3cc12..fdf38a7a67ca 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java @@ -54,8 +54,11 @@ import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.ha.SCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub; +import org.apache.hadoop.hdds.scm.ha.SCMRatisServer; import org.apache.hadoop.hdds.scm.ha.SCMServiceManager; +import org.apache.hadoop.hdds.scm.ha.SCMStateMachine; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl; import org.apache.hadoop.hdds.scm.node.NodeManager; @@ -67,6 +70,7 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher; import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.ozone.test.GenericTestUtils; import org.junit.jupiter.api.AfterEach; @@ -177,6 +181,16 @@ public void testSafeModeExitRule() throws Exception { } ContainerManager containerManager = mock(ContainerManager.class); when(containerManager.getContainers(ReplicationType.RATIS)).thenReturn(containers); + + StorageContainerManager mockScmManager = mock(StorageContainerManager.class); + SCMHAManager mockScmhaManager = mock(SCMHAManager.class); + when(mockScmManager.getScmHAManager()).thenReturn(mockScmhaManager); + SCMRatisServer mockScmRatisServer = mock(SCMRatisServer.class); + when(mockScmhaManager.getRatisServer()).thenReturn(mockScmRatisServer); + SCMStateMachine mockScmStateMachine = mock(SCMStateMachine.class); + when(mockScmRatisServer.getSCMStateMachine()).thenReturn(mockScmStateMachine); + when((mockScmStateMachine.getIsStateMachineReady())).thenReturn(true); + scmContext = new SCMContext.Builder().setSCM(mockScmManager).build(); scmSafeModeManager = new SCMSafeModeManager(config, null, null, containerManager, serviceManager, queue, scmContext); scmSafeModeManager.start(); @@ -207,6 +221,7 @@ public void testSafeModeExitRule() throws Exception { testContainerThreshold(containers.subList(75, 100), 1.0); assertEquals(100, scmSafeModeManager.getSafeModeMetrics() .getCurrentContainersWithOneReplicaReportedCount().value()); + scmSafeModeManager.validateSafeModeExitRules(StateMachineReadyRule.class.getSimpleName()); GenericTestUtils.waitFor(() -> !scmSafeModeManager.getInSafeMode(), 100, 1000 * 5); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeSCMHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeSCMHA.java new file mode 100644 index 000000000000..bc8ea3a4c8ff --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeSCMHA.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.safemode; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.io.IOException; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.ha.SCMStateMachine; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; +import org.apache.hadoop.ozone.TestDataUtil; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneVolume; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Tests safemode with SCM HA setup. + */ +public class TestSafeModeSCMHA { + private static final String OM_SERVICE_ID = "om-service-test1"; + private static final String SCM_SERVICE_ID = "scm-service-test1"; + private static final int NUM_OF_OMS = 1; + private static final int NUM_OF_SCMS = 3; + + private MiniOzoneHAClusterImpl cluster = null; + private OzoneConfiguration conf; + + @BeforeEach + public void init() throws Exception { + conf = new OzoneConfiguration(); + cluster = MiniOzoneCluster.newHABuilder(conf) + .setOMServiceId(OM_SERVICE_ID) + .setSCMServiceId(SCM_SERVICE_ID).setNumOfOzoneManagers(NUM_OF_OMS) + .setNumOfStorageContainerManagers(NUM_OF_SCMS).setNumOfActiveSCMs(3) + .build(); + cluster.waitForClusterToBeReady(); + } + + @AfterEach + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testFollowerRestartExitSafeMode() throws Exception { + try (OzoneClient client = cluster.newClient()) { + createTestData(client); + } + + StorageContainerManager followerScm = null; + StorageContainerManager leaderScm = null; + for (StorageContainerManager scm : cluster.getStorageContainerManagers()) { + if (!scm.checkLeader()) { + followerScm = scm; + } else { + leaderScm = scm; + } + } + + assertNotNull(followerScm); + assertNotNull(leaderScm); + // wait for sync between leader and follower + SCMStateMachine leaderScmStateMachine = leaderScm.getScmHAManager().getRatisServer().getSCMStateMachine(); + SCMStateMachine followerScmStateMachine = followerScm.getScmHAManager().getRatisServer().getSCMStateMachine(); + GenericTestUtils.waitFor(() -> leaderScmStateMachine.getLastAppliedTermIndex().getIndex() + == followerScmStateMachine.getLastAppliedTermIndex().getIndex(),1000, 60000); + + // wait for follower to exit safe mode + StorageContainerManager newFollowerScm = cluster.restartStorageContainerManager(followerScm, false); + GenericTestUtils.waitFor(() -> !newFollowerScm.isInSafeMode(), 1000, 60000); + } + + private void createTestData(OzoneClient client) throws IOException { + ObjectStore objectStore = client.getObjectStore(); + objectStore.createVolume("testvolume"); + OzoneVolume volume = objectStore.getVolume("testvolume"); + volume.createBucket("testbucket"); + + OzoneBucket bucket = volume.getBucket("testbucket"); + + TestDataUtil.createKey(bucket, "testkey123", + RatisReplicationConfig.getInstance(THREE), "Hello".getBytes(UTF_8)); + } +}