Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
package org.apache.hadoop.ozone.recon.scm;

import static java.util.Comparator.comparingLong;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLEANUP;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.CLOSE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.DELETE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.FINALIZE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent.QUASI_CLOSE;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
Expand Down Expand Up @@ -190,16 +194,16 @@ public void checkAndAddNewContainerBatch(
* targeted sync path use this method to avoid divergence in the count exposed
* to the Recon Node API.
*
* <p>If the container was recorded without a pipeline (null pipeline at
* {@code addNewContainer} time) the count decrement is safely skipped.
* <p>If the container was recorded without a pipeline, the count decrement is
* safely skipped.
*
* @param containerID container to advance from OPEN to CLOSING
* @param containerInfo already-fetched {@code ContainerInfo} for the container
* (avoids a redundant lookup inside this method)
* @param containerInfo already-fetched ContainerInfo for the container
* @throws IOException if the state update fails
* @throws InvalidStateTransitionException if the container is not in OPEN state
* @throws InvalidStateTransitionException if the state transition is invalid
*/
void transitionOpenToClosing(ContainerID containerID, ContainerInfo containerInfo)
void transitionOpenToClosing(ContainerID containerID,
ContainerInfo containerInfo)
throws IOException, InvalidStateTransitionException {
PipelineID pipelineID = containerInfo.getPipelineID();
if (pipelineID != null) {
Expand All @@ -210,28 +214,102 @@ void transitionOpenToClosing(ContainerID containerID, ContainerInfo containerInf
pipelineToOpenContainer.put(pipelineID, curCnt - 1);
}
}
updateContainerState(containerID, FINALIZE); // OPEN → CLOSING
updateContainerState(containerID, FINALIZE);
}

/**
* Check if an OPEN container should move to CLOSING based on a healthy
* non-OPEN DN replica report.
* Check if Recon's container lifecycle state can be corrected based on a DN
* replica report and SCM's authoritative state.
*
* <p>OPEN uses the DN replica state as the signal: only a healthy non-OPEN
* replica can move Recon to CLOSING. For CLOSING, the DN report is only a
* wake-up signal and Recon queries SCM before advancing. For DELETED, Recon
* recovers only on a live terminal replica report and only when SCM still
* reports the container as live.
*
* @param containerID containerID to check
* @param state replica state reported by a DataNode
*/
private void checkContainerStateAndUpdate(ContainerID containerID,
ContainerReplicaProto.State replicaState)
ContainerReplicaProto.State state)
throws IOException, InvalidStateTransitionException {
ContainerInfo containerInfo = getContainer(containerID);
HddsProtos.LifeCycleState reconState = containerInfo.getState();

if (reconState != HddsProtos.LifeCycleState.OPEN
|| replicaState == ContainerReplicaProto.State.OPEN
|| !isHealthy(replicaState)) {
if (reconState == HddsProtos.LifeCycleState.OPEN) {
if (state.equals(ContainerReplicaProto.State.OPEN) || !isHealthy(state)) {
return;
}
LOG.info("Container {} has state OPEN, but given state is {}.",
containerID, state);
transitionOpenToClosing(containerID, containerInfo);
return;
}

if (reconState == HddsProtos.LifeCycleState.CLOSING) {
reconcileClosingContainerFromScm(containerID);
return;
}

if (reconState == HddsProtos.LifeCycleState.DELETED) {
recoverDeletedContainerFromScm(containerID, state);
}
}

private void reconcileClosingContainerFromScm(ContainerID containerID)
throws IOException, InvalidStateTransitionException {
HddsProtos.LifeCycleState scmState = getScmContainerState(containerID);

if (scmState == HddsProtos.LifeCycleState.QUASI_CLOSED) {
updateContainerState(containerID, QUASI_CLOSE);
LOG.info("Container {} advanced to QUASI_CLOSED in Recon "
+ "based on SCM state.", containerID);
} else if (scmState == HddsProtos.LifeCycleState.CLOSED) {
updateContainerState(containerID, CLOSE);
LOG.info("Container {} advanced to CLOSED in Recon based on SCM state.",
containerID);
} else if (scmState == HddsProtos.LifeCycleState.DELETING
|| scmState == HddsProtos.LifeCycleState.DELETED) {
updateContainerState(containerID, CLOSE);
updateContainerState(containerID, DELETE);
if (scmState == HddsProtos.LifeCycleState.DELETED) {
updateContainerState(containerID, CLEANUP);
}
LOG.info("Container {} advanced to {} in Recon based on SCM state.",
containerID, scmState);
}
}

private void recoverDeletedContainerFromScm(ContainerID containerID,
ContainerReplicaProto.State replicaState) throws IOException {
if (replicaState != ContainerReplicaProto.State.CLOSED
&& replicaState != ContainerReplicaProto.State.QUASI_CLOSED) {
return;
}

ContainerWithPipeline scmContainer =
scmClient.getContainerWithPipeline(containerID.getId());
HddsProtos.LifeCycleState scmState =
scmContainer.getContainerInfo().getState();
if (scmState != HddsProtos.LifeCycleState.CLOSED
&& scmState != HddsProtos.LifeCycleState.QUASI_CLOSED) {
LOG.info("Container {} is DELETED in Recon and DN reported {}, "
+ "but SCM state is {}. Skipping recovery.",
containerID, replicaState, scmState);
return;
}

LOG.info("Container {} is OPEN in Recon but DN reports replica state {}. "
+ "Moving to CLOSING.", containerID, replicaState);
transitionOpenToClosing(containerID, containerInfo);
deleteContainer(containerID);
addNewContainer(scmContainer);
LOG.info("Recovered container {} from DELETED in Recon to {} based on "
+ "DN report {} and SCM state {}.",
containerID, scmState, replicaState, scmState);
}

private HddsProtos.LifeCycleState getScmContainerState(ContainerID containerID)
throws IOException {
return scmClient.getContainerWithPipeline(containerID.getId())
.getContainerInfo().getState();
}

private boolean isHealthy(ContainerReplicaProto.State replicaState) {
Expand All @@ -249,7 +327,8 @@ private boolean isHealthy(ContainerReplicaProto.State replicaState) {
* the container is still recorded in the state manager without pipeline
* tracking so that it is not permanently absent from Recon.
*
* @param containerWithPipeline containerInfo with pipeline info (pipeline may be null)
* @param containerWithPipeline containerInfo with pipeline info
* (pipeline may be null)
* @throws IOException on Error.
*/
public void addNewContainer(ContainerWithPipeline containerWithPipeline)
Expand All @@ -263,21 +342,23 @@ public void addNewContainer(ContainerWithPipeline containerWithPipeline)
PipelineID pipelineID = pipeline.getId();
// Check if the pipeline is present in Recon; add it if not.
if (reconPipelineManager.addPipeline(pipeline)) {
LOG.info("Added new pipeline {} to Recon pipeline metadata from SCM.", pipelineID);
LOG.info("Added new pipeline {} to Recon pipeline metadata from "
+ "SCM.", pipelineID);
}
getContainerStateManager().addContainer(containerInfo.getProtobuf());
pipelineManager.addContainerToPipeline(pipelineID, containerInfo.containerID());
pipelineManager.addContainerToPipeline(pipelineID,
containerInfo.containerID());
// Update open container count on all datanodes on this pipeline.
pipelineToOpenContainer.put(pipelineID,
pipelineToOpenContainer.getOrDefault(pipelineID, 0) + 1);
LOG.info("Successfully added OPEN container {} with pipeline {} to Recon.",
containerInfo.containerID(), pipelineID);
LOG.info("Successfully added OPEN container {} with pipeline {} to "
+ "Recon.", containerInfo.containerID(), pipelineID);
} else {
// Pipeline not available (cleaned up in SCM). Record the container
// without pipeline tracking so it is not permanently absent from Recon.
getContainerStateManager().addContainer(containerInfo.getProtobuf());
LOG.warn("Added OPEN container {} to Recon without pipeline "
+ "(pipeline was null likely cleaned up on SCM side). "
+ "(pipeline was null - likely cleaned up on SCM side). "
+ "Pipeline tracking unavailable for this container.",
containerInfo.containerID());
}
Expand All @@ -287,7 +368,8 @@ public void addNewContainer(ContainerWithPipeline containerWithPipeline)
containerInfo.containerID(), containerInfo.getState());
}
} catch (IOException ex) {
LOG.info("Exception while adding container {}.", containerInfo.containerID(), ex);
LOG.info("Exception while adding container {}.",
containerInfo.containerID(), ex);
PipelineID pipelineID = containerInfo.getPipelineID();
if (pipelineID != null) {
pipelineManager.removeContainerFromPipeline(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSING;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETING;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.QUASI_CLOSED;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.OPEN;
import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.LinkedList;
Expand Down Expand Up @@ -181,6 +185,173 @@ public void testUpdateContainerStateFromOpen() throws Exception {
datanodeDetails);
assertEquals(CLOSING,
getContainerManager().getContainer(containerID).getState());
assertFalse(getContainerManager().getPipelineToOpenContainer()
.containsKey(containerWithPipeline.getPipeline().getId()));
}

@Test
public void testOpenContainerNotUpdatedFromUnhealthyReplicaReports()
throws Exception {
for (State replicaState : new State[] {
State.UNHEALTHY, State.INVALID, State.DELETED}) {
ContainerWithPipeline containerWithPipeline =
getTestContainer(120L + replicaState.ordinal(), LifeCycleState.OPEN);
ContainerID containerID =
containerWithPipeline.getContainerInfo().containerID();
getContainerManager().addNewContainer(containerWithPipeline);

getContainerManager().checkAndAddNewContainer(containerID, replicaState,
randomDatanodeDetails());

assertEquals(LifeCycleState.OPEN,
getContainerManager().getContainer(containerID).getState());
assertTrue(getContainerManager().getPipelineToOpenContainer()
.containsKey(containerWithPipeline.getPipeline().getId()));
}
}

@Test
public void testClosingContainerAdvancesToQuasiClosedFromScm()
throws Exception {
assertClosingContainerAdvancesToScmState(101L, QUASI_CLOSED,
QUASI_CLOSED);
}

@Test
public void testClosingContainerAdvancesToClosedFromScm() throws Exception {
assertClosingContainerAdvancesToScmState(102L, CLOSED, CLOSED);
}

@Test
public void testClosingContainerAdvancesToDeletingFromScm()
throws Exception {
assertClosingContainerAdvancesToScmState(103L, DELETING, DELETING);
}

@Test
public void testClosingContainerAdvancesToDeletedFromScm() throws Exception {
assertClosingContainerAdvancesToScmState(104L, DELETED, DELETED);
}

@Test
public void testClosingContainerReconcilesFromScmEvenForUnhealthyReplica()
throws Exception {
ContainerWithPipeline closingContainer = getTestContainer(105L, CLOSING);
ContainerID containerID = closingContainer.getContainerInfo().containerID();
getContainerManager().addNewContainer(closingContainer);

when(getContainerManager().getScmClient()
.getContainerWithPipeline(containerID.getId()))
.thenReturn(getTestContainer(105L, CLOSED));

getContainerManager().checkAndAddNewContainer(containerID, State.UNHEALTHY,
randomDatanodeDetails());

assertEquals(CLOSED,
getContainerManager().getContainer(containerID).getState());
}

@Test
public void testRecoverDeletedContainerToClosedFromDnReport()
throws Exception {
assertDeletedContainerRecoversFromScm(106L, State.CLOSED, CLOSED);
}

@Test
public void testRecoverDeletedContainerToQuasiClosedFromDnReport()
throws Exception {
assertDeletedContainerRecoversFromScm(107L, State.QUASI_CLOSED,
QUASI_CLOSED);
}

@Test
public void testDeletedContainerNotRecoveredFromOpenReplicaReport()
throws Exception {
ContainerWithPipeline deletedContainer = getTestContainer(108L, DELETED);
ContainerID containerID = deletedContainer.getContainerInfo().containerID();
getContainerManager().addNewContainer(deletedContainer);

getContainerManager().checkAndAddNewContainer(containerID, State.OPEN,
randomDatanodeDetails());

assertEquals(DELETED,
getContainerManager().getContainer(containerID).getState());
}

@Test
public void testDeletedContainerNotRecoveredWhenScmIsNotLive()
throws Exception {
assertDeletedContainerNotRecoveredFromScm(109L, State.CLOSED,
LifeCycleState.OPEN);
assertDeletedContainerNotRecoveredFromScm(110L, State.CLOSED, DELETING);
assertDeletedContainerNotRecoveredFromScm(111L, State.QUASI_CLOSED,
DELETED);
}

@Test
public void testOtherReconStatesDoNotInferDnReplicaTransition()
throws Exception {
ContainerWithPipeline closedContainer = getTestContainer(112L, CLOSED);
ContainerID containerID = closedContainer.getContainerInfo().containerID();
getContainerManager().addNewContainer(closedContainer);

getContainerManager().checkAndAddNewContainer(containerID,
State.QUASI_CLOSED, randomDatanodeDetails());

assertEquals(CLOSED,
getContainerManager().getContainer(containerID).getState());
}

private void assertClosingContainerAdvancesToScmState(long id,
LifeCycleState scmState, LifeCycleState expectedReconState)
throws Exception {
ContainerWithPipeline closingContainer = getTestContainer(id, CLOSING);
ContainerID containerID = closingContainer.getContainerInfo().containerID();
getContainerManager().addNewContainer(closingContainer);

when(getContainerManager().getScmClient()
.getContainerWithPipeline(containerID.getId()))
.thenReturn(getTestContainer(id, scmState));

getContainerManager().checkAndAddNewContainer(containerID, State.CLOSED,
randomDatanodeDetails());

assertEquals(expectedReconState,
getContainerManager().getContainer(containerID).getState());
}

private void assertDeletedContainerRecoversFromScm(long id,
State replicaState, LifeCycleState scmState) throws Exception {
ContainerWithPipeline deletedContainer = getTestContainer(id, DELETED);
ContainerID containerID = deletedContainer.getContainerInfo().containerID();
getContainerManager().addNewContainer(deletedContainer);

when(getContainerManager().getScmClient()
.getContainerWithPipeline(containerID.getId()))
.thenReturn(getTestContainer(id, scmState));

getContainerManager().checkAndAddNewContainer(containerID, replicaState,
randomDatanodeDetails());

assertEquals(scmState,
getContainerManager().getContainer(containerID).getState());
}

private void assertDeletedContainerNotRecoveredFromScm(long id,
State replicaState, LifeCycleState scmState) throws Exception {
ContainerWithPipeline deletedContainer = getTestContainer(id, DELETED);
ContainerID containerID = deletedContainer.getContainerInfo().containerID();
getContainerManager().addNewContainer(deletedContainer);

when(getContainerManager().getScmClient()
.getContainerWithPipeline(containerID.getId()))
.thenReturn(getTestContainer(id, scmState));

getContainerManager().checkAndAddNewContainer(containerID, replicaState,
randomDatanodeDetails());

assertEquals(DELETED,
getContainerManager().getContainer(containerID).getState());
}

ContainerInfo newContainerInfo(long containerId, Pipeline pipeline) {
Expand Down