diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 60ff77aa03c52..07b7364d0042d 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -56,7 +56,10 @@ import org.apache.iotdb.session.Session; import org.apache.iotdb.session.pool.SessionPool; +import org.apache.thrift.TConfiguration; import org.apache.thrift.TException; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import java.io.File; @@ -1119,7 +1122,7 @@ public void shutdownForciblyAllDataNodes() { @Override public void ensureNodeStatus( - final List nodes, final List targetStatus) + final List nodes, final List targetStatusList) throws IllegalStateException { Throwable lastException = null; for (int i = 0; i < retryCount; i++) { @@ -1147,7 +1150,9 @@ public void ensureNodeStatus( + node.getClientRpcEndPoint().getPort(), node.getDataNodeId())); for (int j = 0; j < nodes.size(); j++) { - final String endpoint = nodes.get(j).getIpAndPortString(); + BaseNodeWrapper nodeWrapper = nodes.get(j); + String ipAndPortString = nodeWrapper.getIpAndPortString(); + final String endpoint = ipAndPortString; if (!nodeIds.containsKey(endpoint)) { // Node not exist // Notice: Never modify this line, since the NodeLocation might be modified in IT @@ -1155,12 +1160,27 @@ public void ensureNodeStatus( continue; } final String status = showClusterResp.getNodeStatus().get(nodeIds.get(endpoint)); - if (!targetStatus.get(j).getStatus().equals(status)) { + final NodeStatus targetStatus = targetStatusList.get(j); + if (!targetStatus.getStatus().equals(status)) { // Error status errorMessages.add( String.format( "Node %s is in status %s, but expected %s", - endpoint, status, targetStatus.get(j))); + endpoint, status, targetStatusList.get(j))); + continue; + } + if (nodeWrapper instanceof DataNodeWrapper && targetStatus.equals(NodeStatus.Running)) { + final String[] ipPort = nodeWrapper.getIpAndPortString().split(":"); + final String ip = ipPort[0]; + final int port = Integer.parseInt(ipPort[1]); + try (TSocket socket = new TSocket(new TConfiguration(), ip, port, 1000)) { + socket.open(); + } catch (final TTransportException e) { + errorMessages.add( + String.format( + "DataNode %s is not reachable: %s", + nodeWrapper.getIpAndPortString(), e.getMessage())); + } } } if (errorMessages.isEmpty()) {