diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java index e42e0ccac90..c362f45cbf1 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java @@ -18,7 +18,11 @@ package org.apache.hadoop.ozone.recon.scm; import static java.util.Comparator.comparingLong; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLEANUP; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLOSE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.DELETE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.FINALIZE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.QUASI_CLOSE; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; @@ -190,16 +194,16 @@ public void checkAndAddNewContainerBatch( * targeted sync path use this method to avoid divergence in the count exposed * to the Recon Node API. * - *

If the container was recorded without a pipeline (null pipeline at - * {@code addNewContainer} time) the count decrement is safely skipped. + *

If the container was recorded without a pipeline, the count decrement is + * safely skipped. * * @param containerID container to advance from OPEN to CLOSING - * @param containerInfo already-fetched {@code ContainerInfo} for the container - * (avoids a redundant lookup inside this method) + * @param containerInfo already-fetched ContainerInfo for the container * @throws IOException if the state update fails - * @throws InvalidStateTransitionException if the container is not in OPEN state + * @throws InvalidStateTransitionException if the state transition is invalid */ - void transitionOpenToClosing(ContainerID containerID, ContainerInfo containerInfo) + void transitionOpenToClosing(ContainerID containerID, + ContainerInfo containerInfo) throws IOException, InvalidStateTransitionException { PipelineID pipelineID = containerInfo.getPipelineID(); if (pipelineID != null) { @@ -210,28 +214,102 @@ void transitionOpenToClosing(ContainerID containerID, ContainerInfo containerInf pipelineToOpenContainer.put(pipelineID, curCnt - 1); } } - updateContainerState(containerID, FINALIZE); // OPEN → CLOSING + updateContainerState(containerID, FINALIZE); } /** - * Check if an OPEN container should move to CLOSING based on a healthy - * non-OPEN DN replica report. + * Check if Recon's container lifecycle state can be corrected based on a DN + * replica report and SCM's authoritative state. + * + *

OPEN uses the DN replica state as the signal: only a healthy non-OPEN + * replica can move Recon to CLOSING. For CLOSING, the DN report is only a + * wake-up signal and Recon queries SCM before advancing. For DELETED, Recon + * recovers only on a live terminal replica report and only when SCM still + * reports the container as live. + * + * @param containerID containerID to check + * @param state replica state reported by a DataNode */ private void checkContainerStateAndUpdate(ContainerID containerID, - ContainerReplicaProto.State replicaState) + ContainerReplicaProto.State state) throws IOException, InvalidStateTransitionException { ContainerInfo containerInfo = getContainer(containerID); HddsProtos.LifeCycleState reconState = containerInfo.getState(); - if (reconState != HddsProtos.LifeCycleState.OPEN - || replicaState == ContainerReplicaProto.State.OPEN - || !isHealthy(replicaState)) { + if (reconState == HddsProtos.LifeCycleState.OPEN) { + if (state.equals(ContainerReplicaProto.State.OPEN) || !isHealthy(state)) { + return; + } + LOG.info("Container {} has state OPEN, but given state is {}.", + containerID, state); + transitionOpenToClosing(containerID, containerInfo); + return; + } + + if (reconState == HddsProtos.LifeCycleState.CLOSING) { + reconcileClosingContainerFromScm(containerID); + return; + } + + if (reconState == HddsProtos.LifeCycleState.DELETED) { + recoverDeletedContainerFromScm(containerID, state); + } + } + + private void reconcileClosingContainerFromScm(ContainerID containerID) + throws IOException, InvalidStateTransitionException { + HddsProtos.LifeCycleState scmState = getScmContainerState(containerID); + + if (scmState == HddsProtos.LifeCycleState.QUASI_CLOSED) { + updateContainerState(containerID, QUASI_CLOSE); + LOG.info("Container {} advanced to QUASI_CLOSED in Recon " + + "based on SCM state.", containerID); + } else if (scmState == HddsProtos.LifeCycleState.CLOSED) { + updateContainerState(containerID, CLOSE); + LOG.info("Container {} advanced to CLOSED in Recon based on SCM state.", + containerID); + } else if (scmState == HddsProtos.LifeCycleState.DELETING + || scmState == HddsProtos.LifeCycleState.DELETED) { + updateContainerState(containerID, CLOSE); + updateContainerState(containerID, DELETE); + if (scmState == HddsProtos.LifeCycleState.DELETED) { + updateContainerState(containerID, CLEANUP); + } + LOG.info("Container {} advanced to {} in Recon based on SCM state.", + containerID, scmState); + } + } + + private void recoverDeletedContainerFromScm(ContainerID containerID, + ContainerReplicaProto.State replicaState) throws IOException { + if (replicaState != ContainerReplicaProto.State.CLOSED + && replicaState != ContainerReplicaProto.State.QUASI_CLOSED) { + return; + } + + ContainerWithPipeline scmContainer = + scmClient.getContainerWithPipeline(containerID.getId()); + HddsProtos.LifeCycleState scmState = + scmContainer.getContainerInfo().getState(); + if (scmState != HddsProtos.LifeCycleState.CLOSED + && scmState != HddsProtos.LifeCycleState.QUASI_CLOSED) { + LOG.info("Container {} is DELETED in Recon and DN reported {}, " + + "but SCM state is {}. Skipping recovery.", + containerID, replicaState, scmState); return; } - LOG.info("Container {} is OPEN in Recon but DN reports replica state {}. " - + "Moving to CLOSING.", containerID, replicaState); - transitionOpenToClosing(containerID, containerInfo); + deleteContainer(containerID); + addNewContainer(scmContainer); + LOG.info("Recovered container {} from DELETED in Recon to {} based on " + + "DN report {} and SCM state {}.", + containerID, scmState, replicaState, scmState); + } + + private HddsProtos.LifeCycleState getScmContainerState(ContainerID containerID) + throws IOException { + return scmClient.getContainerWithPipeline(containerID.getId()) + .getContainerInfo().getState(); } private boolean isHealthy(ContainerReplicaProto.State replicaState) { @@ -249,7 +327,8 @@ private boolean isHealthy(ContainerReplicaProto.State replicaState) { * the container is still recorded in the state manager without pipeline * tracking so that it is not permanently absent from Recon. * - * @param containerWithPipeline containerInfo with pipeline info (pipeline may be null) + * @param containerWithPipeline containerInfo with pipeline info + * (pipeline may be null) * @throws IOException on Error. */ public void addNewContainer(ContainerWithPipeline containerWithPipeline) @@ -263,21 +342,23 @@ public void addNewContainer(ContainerWithPipeline containerWithPipeline) PipelineID pipelineID = pipeline.getId(); // Check if the pipeline is present in Recon; add it if not. if (reconPipelineManager.addPipeline(pipeline)) { - LOG.info("Added new pipeline {} to Recon pipeline metadata from SCM.", pipelineID); + LOG.info("Added new pipeline {} to Recon pipeline metadata from " + + "SCM.", pipelineID); } getContainerStateManager().addContainer(containerInfo.getProtobuf()); - pipelineManager.addContainerToPipeline(pipelineID, containerInfo.containerID()); + pipelineManager.addContainerToPipeline(pipelineID, + containerInfo.containerID()); // Update open container count on all datanodes on this pipeline. pipelineToOpenContainer.put(pipelineID, pipelineToOpenContainer.getOrDefault(pipelineID, 0) + 1); - LOG.info("Successfully added OPEN container {} with pipeline {} to Recon.", - containerInfo.containerID(), pipelineID); + LOG.info("Successfully added OPEN container {} with pipeline {} to " + + "Recon.", containerInfo.containerID(), pipelineID); } else { // Pipeline not available (cleaned up in SCM). Record the container // without pipeline tracking so it is not permanently absent from Recon. getContainerStateManager().addContainer(containerInfo.getProtobuf()); LOG.warn("Added OPEN container {} to Recon without pipeline " - + "(pipeline was null — likely cleaned up on SCM side). " + + "(pipeline was null - likely cleaned up on SCM side). " + "Pipeline tracking unavailable for this container.", containerInfo.containerID()); } @@ -287,7 +368,8 @@ public void addNewContainer(ContainerWithPipeline containerWithPipeline) containerInfo.containerID(), containerInfo.getState()); } } catch (IOException ex) { - LOG.info("Exception while adding container {}.", containerInfo.containerID(), ex); + LOG.info("Exception while adding container {}.", + containerInfo.containerID(), ex); PipelineID pipelineID = containerInfo.getPipelineID(); if (pipelineID != null) { pipelineManager.removeContainerFromPipeline( diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java index 1d871b9974b..4a0f1ffbd55 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconContainerManager.java @@ -20,12 +20,16 @@ import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSING; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETING; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.QUASI_CLOSED; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.OPEN; import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.LinkedList; @@ -181,6 +185,173 @@ public void testUpdateContainerStateFromOpen() throws Exception { datanodeDetails); assertEquals(CLOSING, getContainerManager().getContainer(containerID).getState()); + assertFalse(getContainerManager().getPipelineToOpenContainer() + .containsKey(containerWithPipeline.getPipeline().getId())); + } + + @Test + public void testOpenContainerNotUpdatedFromUnhealthyReplicaReports() + throws Exception { + for (State replicaState : new State[] { + State.UNHEALTHY, State.INVALID, State.DELETED}) { + ContainerWithPipeline containerWithPipeline = + getTestContainer(120L + replicaState.ordinal(), LifeCycleState.OPEN); + ContainerID containerID = + containerWithPipeline.getContainerInfo().containerID(); + getContainerManager().addNewContainer(containerWithPipeline); + + getContainerManager().checkAndAddNewContainer(containerID, replicaState, + randomDatanodeDetails()); + + assertEquals(LifeCycleState.OPEN, + getContainerManager().getContainer(containerID).getState()); + assertTrue(getContainerManager().getPipelineToOpenContainer() + .containsKey(containerWithPipeline.getPipeline().getId())); + } + } + + @Test + public void testClosingContainerAdvancesToQuasiClosedFromScm() + throws Exception { + assertClosingContainerAdvancesToScmState(101L, QUASI_CLOSED, + QUASI_CLOSED); + } + + @Test + public void testClosingContainerAdvancesToClosedFromScm() throws Exception { + assertClosingContainerAdvancesToScmState(102L, CLOSED, CLOSED); + } + + @Test + public void testClosingContainerAdvancesToDeletingFromScm() + throws Exception { + assertClosingContainerAdvancesToScmState(103L, DELETING, DELETING); + } + + @Test + public void testClosingContainerAdvancesToDeletedFromScm() throws Exception { + assertClosingContainerAdvancesToScmState(104L, DELETED, DELETED); + } + + @Test + public void testClosingContainerReconcilesFromScmEvenForUnhealthyReplica() + throws Exception { + ContainerWithPipeline closingContainer = getTestContainer(105L, CLOSING); + ContainerID containerID = closingContainer.getContainerInfo().containerID(); + getContainerManager().addNewContainer(closingContainer); + + when(getContainerManager().getScmClient() + .getContainerWithPipeline(containerID.getId())) + .thenReturn(getTestContainer(105L, CLOSED)); + + getContainerManager().checkAndAddNewContainer(containerID, State.UNHEALTHY, + randomDatanodeDetails()); + + assertEquals(CLOSED, + getContainerManager().getContainer(containerID).getState()); + } + + @Test + public void testRecoverDeletedContainerToClosedFromDnReport() + throws Exception { + assertDeletedContainerRecoversFromScm(106L, State.CLOSED, CLOSED); + } + + @Test + public void testRecoverDeletedContainerToQuasiClosedFromDnReport() + throws Exception { + assertDeletedContainerRecoversFromScm(107L, State.QUASI_CLOSED, + QUASI_CLOSED); + } + + @Test + public void testDeletedContainerNotRecoveredFromOpenReplicaReport() + throws Exception { + ContainerWithPipeline deletedContainer = getTestContainer(108L, DELETED); + ContainerID containerID = deletedContainer.getContainerInfo().containerID(); + getContainerManager().addNewContainer(deletedContainer); + + getContainerManager().checkAndAddNewContainer(containerID, State.OPEN, + randomDatanodeDetails()); + + assertEquals(DELETED, + getContainerManager().getContainer(containerID).getState()); + } + + @Test + public void testDeletedContainerNotRecoveredWhenScmIsNotLive() + throws Exception { + assertDeletedContainerNotRecoveredFromScm(109L, State.CLOSED, + LifeCycleState.OPEN); + assertDeletedContainerNotRecoveredFromScm(110L, State.CLOSED, DELETING); + assertDeletedContainerNotRecoveredFromScm(111L, State.QUASI_CLOSED, + DELETED); + } + + @Test + public void testOtherReconStatesDoNotInferDnReplicaTransition() + throws Exception { + ContainerWithPipeline closedContainer = getTestContainer(112L, CLOSED); + ContainerID containerID = closedContainer.getContainerInfo().containerID(); + getContainerManager().addNewContainer(closedContainer); + + getContainerManager().checkAndAddNewContainer(containerID, + State.QUASI_CLOSED, randomDatanodeDetails()); + + assertEquals(CLOSED, + getContainerManager().getContainer(containerID).getState()); + } + + private void assertClosingContainerAdvancesToScmState(long id, + LifeCycleState scmState, LifeCycleState expectedReconState) + throws Exception { + ContainerWithPipeline closingContainer = getTestContainer(id, CLOSING); + ContainerID containerID = closingContainer.getContainerInfo().containerID(); + getContainerManager().addNewContainer(closingContainer); + + when(getContainerManager().getScmClient() + .getContainerWithPipeline(containerID.getId())) + .thenReturn(getTestContainer(id, scmState)); + + getContainerManager().checkAndAddNewContainer(containerID, State.CLOSED, + randomDatanodeDetails()); + + assertEquals(expectedReconState, + getContainerManager().getContainer(containerID).getState()); + } + + private void assertDeletedContainerRecoversFromScm(long id, + State replicaState, LifeCycleState scmState) throws Exception { + ContainerWithPipeline deletedContainer = getTestContainer(id, DELETED); + ContainerID containerID = deletedContainer.getContainerInfo().containerID(); + getContainerManager().addNewContainer(deletedContainer); + + when(getContainerManager().getScmClient() + .getContainerWithPipeline(containerID.getId())) + .thenReturn(getTestContainer(id, scmState)); + + getContainerManager().checkAndAddNewContainer(containerID, replicaState, + randomDatanodeDetails()); + + assertEquals(scmState, + getContainerManager().getContainer(containerID).getState()); + } + + private void assertDeletedContainerNotRecoveredFromScm(long id, + State replicaState, LifeCycleState scmState) throws Exception { + ContainerWithPipeline deletedContainer = getTestContainer(id, DELETED); + ContainerID containerID = deletedContainer.getContainerInfo().containerID(); + getContainerManager().addNewContainer(deletedContainer); + + when(getContainerManager().getScmClient() + .getContainerWithPipeline(containerID.getId())) + .thenReturn(getTestContainer(id, scmState)); + + getContainerManager().checkAndAddNewContainer(containerID, replicaState, + randomDatanodeDetails()); + + assertEquals(DELETED, + getContainerManager().getContainer(containerID).getState()); } ContainerInfo newContainerInfo(long containerId, Pipeline pipeline) {