Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/workflows/pipe-it.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
-DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \
-pl integration-test \
Expand Down Expand Up @@ -188,6 +189,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
-DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \
-Dfailsafe.includesFile="$RUNNER_TEMP/it-shard.txt" \
Expand Down Expand Up @@ -295,6 +297,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
-DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \
-Dfailsafe.includesFile="$RUNNER_TEMP/it-shard.txt" \
Expand Down Expand Up @@ -402,6 +405,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
-DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \
-Dfailsafe.includesFile="$RUNNER_TEMP/it-shard.txt" \
Expand Down Expand Up @@ -491,6 +495,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
-DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \
-pl integration-test \
Expand Down Expand Up @@ -577,6 +582,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
-DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \
-pl integration-test \
Expand Down Expand Up @@ -663,6 +669,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
-DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \
-pl integration-test \
Expand Down Expand Up @@ -749,6 +756,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
-DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \
-pl integration-test \
Expand Down Expand Up @@ -852,6 +860,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
-DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \
-Dfailsafe.includesFile="$RUNNER_TEMP/it-shard.txt" \
Expand Down Expand Up @@ -958,6 +967,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
-DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \
-Dfailsafe.includesFile="$RUNNER_TEMP/it-shard.txt" \
Expand Down Expand Up @@ -1047,6 +1057,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
-DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }},${{ matrix.cluster3 }} \
-pl integration-test \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public static BaseEnv getEnv(final int index) throws IndexOutOfBoundsException {
/** Create several environments according to the specific number. */
public static void createEnv(final int num) {
// Not judge EnvType for individual test convenience
envList.clear();
final long startTime = System.currentTimeMillis();
for (int i = 0; i < num; ++i) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@

public abstract class AbstractEnv implements BaseEnv {
private static final Logger logger = IoTDBTestLogger.logger;
private static final int DEFAULT_CLUSTER_READY_RETRY_COUNT = 30;
private static final String CLUSTER_READY_RETRY_COUNT_PROPERTY =
"integrationTest.clusterReadyRetryCount";

private final Random rand = new Random();
protected List<ConfigNodeWrapper> configNodeWrapperList = Collections.emptyList();
Expand All @@ -105,7 +108,7 @@ public abstract class AbstractEnv implements BaseEnv {
protected String testMethodName = null;
protected int index = 0;
protected long startTime;
protected int retryCount = 30;
protected int retryCount = getDefaultClusterReadyRetryCount();
private IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
private List<String> configNodeKillPoints = new ArrayList<>();
private List<String> dataNodeKillPoints = new ArrayList<>();
Expand All @@ -128,6 +131,12 @@ protected AbstractEnv(final long startTime) {
this.clusterConfig = new MppClusterConfig();
}

private static int getDefaultClusterReadyRetryCount() {
final int configuredRetryCount =
Integer.getInteger(CLUSTER_READY_RETRY_COUNT_PROPERTY, DEFAULT_CLUSTER_READY_RETRY_COUNT);
return configuredRetryCount > 0 ? configuredRetryCount : DEFAULT_CLUSTER_READY_RETRY_COUNT;
}

@Override
public ClusterConfig getConfig() {
return clusterConfig;
Expand Down Expand Up @@ -421,12 +430,14 @@ public void checkClusterStatus(
processStatusMap.clear();

showClusterResp = client.showCluster();
showClusterStatus = showClusterResp.getStatus();
actualNodeSize = showClusterResp.getNodeStatusSize();
lastNodeStatus = showClusterResp.getNodeStatus();

// Check resp status
if (showClusterResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
passed = false;
showClusterPassed = false;
showClusterStatus = showClusterResp.getStatus();
}

// Check the number of nodes
Expand All @@ -436,15 +447,13 @@ public void checkClusterStatus(
+ aiNodeWrapperList.size()) {
passed = false;
nodeSizePassed = false;
actualNodeSize = showClusterResp.getNodeStatusSize();
}

// Check the status of nodes
if (passed) {
passed = nodeStatusCheck.test(showClusterResp.getNodeStatus());
if (!passed) {
nodeStatusPassed = false;
lastNodeStatus = showClusterResp.getNodeStatus();
}
}

Expand Down Expand Up @@ -530,8 +539,93 @@ public void checkClusterStatus(
}
}

dumpTestJVMSnapshotQuietly("cluster status check failed");
throw new AssertionError(
String.format("After %d times retry, the cluster can't work!", retryCount));
buildClusterStatusFailureMessage(
showClusterPassed,
nodeSizePassed,
nodeStatusPassed,
processStatusPassed,
showClusterStatus,
actualNodeSize,
lastNodeStatus,
processStatusMap,
lastException));
}

private String buildClusterStatusFailureMessage(
final boolean showClusterPassed,
final boolean nodeSizePassed,
final boolean nodeStatusPassed,
final boolean processStatusPassed,
final TSStatus showClusterStatus,
final int actualNodeSize,
final Map<Integer, String> lastNodeStatus,
final Map<AbstractNodeWrapper, Integer> processStatusMap,
final Exception lastException) {
final StringBuilder builder =
new StringBuilder(
String.format("After %d times retry, the cluster status check failed", retryCount));
builder
.append(": showClusterPassed=")
.append(showClusterPassed)
.append(", nodeSizePassed=")
.append(nodeSizePassed)
.append(", nodeStatusPassed=")
.append(nodeStatusPassed)
.append(", processStatusPassed=")
.append(processStatusPassed)
.append(", expectedNodeSize=")
.append(
configNodeWrapperList.size() + dataNodeWrapperList.size() + aiNodeWrapperList.size())
.append(", actualNodeSize=")
.append(actualNodeSize);
if (showClusterStatus != null) {
builder.append(", showClusterStatus=").append(showClusterStatus);
}
if (lastNodeStatus != null) {
builder.append(", lastNodeStatus=").append(lastNodeStatus);
}
if (!processStatusMap.isEmpty()) {
builder.append(", processStatus=").append(formatProcessStatus(processStatusMap));
}
if (lastException != null) {
builder
.append(", lastException=")
.append(lastException.getClass().getName())
.append(": ")
.append(lastException.getMessage());
}
builder.append(", logDirs=").append(getClusterLogDirs());
return builder.toString();
}

private Map<String, Integer> formatProcessStatus(
final Map<AbstractNodeWrapper, Integer> processStatusMap) {
final Map<String, Integer> result = new LinkedHashMap<>();
processStatusMap.forEach(
(nodeWrapper, statusCode) -> result.put(nodeWrapper.getId(), statusCode));
return result;
}

private List<String> getClusterLogDirs() {
final List<AbstractNodeWrapper> allNodeWrappers = new ArrayList<>();
allNodeWrappers.addAll(configNodeWrapperList);
allNodeWrappers.addAll(dataNodeWrapperList);
allNodeWrappers.addAll(aiNodeWrapperList);
return allNodeWrappers.stream()
.map(AbstractNodeWrapper::getLogDirPath)
.distinct()
.collect(Collectors.toList());
}

private void dumpTestJVMSnapshotQuietly(final String reason) {
try {
logger.info("Dumping test JVM snapshots because {}.", reason);
dumpTestJVMSnapshot();
} catch (final Exception e) {
logger.warn("Failed to dump test JVM snapshots after {}", reason, e);
}
}

private void handleProcessStatus(Map<AbstractNodeWrapper, Integer> processStatusMap) {
Expand Down Expand Up @@ -976,6 +1070,7 @@ protected void testJDBCConnection() {
.collect(Collectors.toList());
final RequestDelegate<Void> testDelegate =
new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT, this);
final Map<String, String> lastConnectionFailures = Collections.synchronizedMap(new HashMap<>());
for (final DataNodeWrapper dataNode : dataNodeWrapperList) {
final String dataNodeEndpoint = dataNode.getIpAndPortString();
testDelegate.addRequest(
Expand All @@ -994,6 +1089,8 @@ protected void testJDBCConnection() {
return null;
} catch (final Exception e) {
lastException = e;
lastConnectionFailures.put(
dataNodeEndpoint, e.getClass().getName() + ": " + e.getMessage());
TimeUnit.SECONDS.sleep(1L);
}
}
Expand All @@ -1007,8 +1104,11 @@ protected void testJDBCConnection() {
testDelegate.requestAll();
} catch (final Exception e) {
logger.error("exception in test Cluster with RPC, message: {}", e.getMessage(), e);
dumpTestJVMSnapshotQuietly("JDBC connection check failed");
throw new AssertionError(
String.format("After %d times retry, the cluster can't work!", retryCount));
String.format(
"After %d times retry, JDBC connections to DataNodes are not ready. endpoints=%s, lastConnectionFailures=%s, logDirs=%s",
retryCount, endpoints, lastConnectionFailures, getClusterLogDirs()));
}
}

Expand Down
Loading