-
Notifications
You must be signed in to change notification settings - Fork 15.1k
KAFKA-20372: Fix flaky test in NamedTopologyIntegrationTest #21968
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1631,6 +1631,38 @@ Map<TaskId, Task> allTasks() { | |
| return ret; | ||
| } | ||
|
|
||
| // VisibleForTesting | ||
| boolean hasAnyTaskForTopology(final String topologyName) { | ||
| return allTasks().keySet().stream().anyMatch(taskId -> topologyName.equals(taskId.topologyName())); | ||
| } | ||
|
|
||
| /** | ||
| * Returns {@code true} if every task for the given topology is initialized and in | ||
| * {@link State#RUNNING}. | ||
| * | ||
| * <p>If there are no tasks for the given topology, this method returns {@code true}. | ||
| * | ||
| * @param topologyName the topology name | ||
| * @return {@code true} if all matching tasks are initialized and in {@link State#RUNNING}, | ||
| * or if there are no matching tasks; {@code false} otherwise | ||
| */ | ||
| // VisibleForTesting | ||
| boolean allTasksRunningForTopology(final String topologyName) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
| final Map<TaskId, Task> allTasks = allTasks(); | ||
| final Set<TaskId> initializedTaskIds = tasks.allInitializedTaskIds(); | ||
|
|
||
| for (final Map.Entry<TaskId, Task> entry : allTasks.entrySet()) { | ||
| final TaskId taskId = entry.getKey(); | ||
| if (topologyName.equals(taskId.topologyName())) { | ||
| if (!initializedTaskIds.contains(taskId) || entry.getValue().state() != State.RUNNING) { | ||
| return false; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return true; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if there are no tasks, this allTasksRunning would still return true, which is ok in the current test usage, but I wonder if it may be confusing. Should we add a java doc to call out the behaviour?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right! |
||
| } | ||
|
|
||
| /** | ||
| * Returns tasks owned by the stream thread. | ||
| * This does not return any tasks currently owned by the state updater. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -443,4 +443,18 @@ public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLagsForTopology( | |
| .collect(Collectors.toList()))); | ||
| return allLocalStorePartitionLags(allTopologyTasks); | ||
| } | ||
|
|
||
| // VisibleForTesting | ||
| public boolean hasAnyLocalTaskForTopology(final String topologyName) { | ||
| synchronized (threads) { | ||
| return threads.stream().anyMatch(thread -> thread.hasAnyTaskForTopology(topologyName)); | ||
| } | ||
| } | ||
|
|
||
| // VisibleForTesting | ||
| public boolean allLocalTasksRunningForTopology(final String topologyName) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To align with ( |
||
| synchronized (threads) { | ||
| return threads.stream().allMatch(thread -> thread.allTasksRunningForTopology(topologyName)); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto