Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,23 @@ public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTop

assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));

TestUtils.waitForCondition(
() -> streams.allLocalTasksRunningForTopology(TOPOLOGY_1),
() -> "Not all local tasks for topology " + TOPOLOGY_1
+ " are initialized and in RUNNING state before remove. "
+ "streamsState=" + streams.state()
+ ", localThreads=" + streams.metadataForLocalThreads()
);
streams.removeNamedTopology(TOPOLOGY_1, true).all().get();

TestUtils.waitForCondition(
() -> !streams.hasAnyLocalTaskForTopology(TOPOLOGY_1),
() -> "Topology " + TOPOLOGY_1
+ " still has local tasks after remove. "
+ "streamsState=" + streams.state()
+ ", localThreads=" + streams.metadataForLocalThreads()
);
streams.cleanUpNamedTopology(TOPOLOGY_1);

CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("-changelog") || t.contains("-repartition")).forEach(t -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2078,6 +2078,17 @@ TaskManager taskManager() {
return taskManager;
}

// VisibleForTesting
public boolean hasAnyTaskForTopology(final String topologyName) {
return taskManager.hasAnyTaskForTopology(topologyName);
}


// VisibleForTesting
public boolean allTasksRunningForTopology(final String topologyName) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

return taskManager.allTasksRunningForTopology(topologyName);
}

int currentNumIterations() {
return numIterations;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1631,6 +1631,38 @@ Map<TaskId, Task> allTasks() {
return ret;
}

// VisibleForTesting
boolean hasAnyTaskForTopology(final String topologyName) {
return allTasks().keySet().stream().anyMatch(taskId -> topologyName.equals(taskId.topologyName()));
}

/**
* Returns {@code true} if every task for the given topology is initialized and in
* {@link State#RUNNING}.
*
* <p>If there are no tasks for the given topology, this method returns {@code true}.
*
* @param topologyName the topology name
* @return {@code true} if all matching tasks are initialized and in {@link State#RUNNING},
* or if there are no matching tasks; {@code false} otherwise
*/
// VisibleForTesting
boolean allTasksRunningForTopology(final String topologyName) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

final Map<TaskId, Task> allTasks = allTasks();
final Set<TaskId> initializedTaskIds = tasks.allInitializedTaskIds();

for (final Map.Entry<TaskId, Task> entry : allTasks.entrySet()) {
final TaskId taskId = entry.getKey();
if (topologyName.equals(taskId.topologyName())) {
if (!initializedTaskIds.contains(taskId) || entry.getValue().state() != State.RUNNING) {
return false;
}
}
}

return true;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there are no tasks, this allTasksRunning would still return true, which is ok in the current test usage, but I wonder if it may be confusing. Should we add a java doc to call out the behaviour?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right!
I added java doc!

}

/**
* Returns tasks owned by the stream thread.
* This does not return any tasks currently owned by the state updater.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,4 +443,18 @@ public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLagsForTopology(
.collect(Collectors.toList())));
return allLocalStorePartitionLags(allTopologyTasks);
}

// VisibleForTesting
public boolean hasAnyLocalTaskForTopology(final String topologyName) {
synchronized (threads) {
return threads.stream().anyMatch(thread -> thread.hasAnyTaskForTopology(topologyName));
}
}

// VisibleForTesting
public boolean allLocalTasksRunningForTopology(final String topologyName) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To align with (isNamedTopologyPaused(...)) and better reflect the logic, would areAllLocalTasksRunningForTopology be a better name here?

synchronized (threads) {
return threads.stream().allMatch(thread -> thread.allTasksRunningForTopology(topologyName));
}
}
}
Loading