diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/top-n-metrics.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/top-n-metrics.ts
new file mode 100644
index 0000000000000..44cedfd1c990e
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/top-n-metrics.ts
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Response payload of the job-level "top N" metrics request.
+ *
+ * Carries one list per category; each list may be empty.
+ */
+export interface TopNMetrics {
+  /** Subtask entries reported for CPU usage. */
+  topCpuConsumers: CpuConsumer[];
+
+  /** Subtask entries reported for backpressure. */
+  topBackpressureOperators: BackpressureOperator[];
+
+  /** Task entries reported for garbage-collection time. */
+  topGcIntensiveTasks: GcIntensiveTask[];
+}
+
+/** One subtask entry in the CPU consumers list. */
+export interface CpuConsumer {
+  /** Index of the parallel subtask. */
+  subtaskId: number;
+
+  /** Name of the task the subtask belongs to. */
+  taskName: string;
+
+  /** CPU usage, already expressed as a percentage value. */
+  cpuPercentage: number;
+
+  /** Identifier of the hosting task manager, or `null` when none is reported. */
+  taskManagerId: string | null;
+}
+
+/** One subtask entry in the backpressure list. */
+export interface BackpressureOperator {
+  /** Identifier of the operator. */
+  operatorId: string;
+
+  /** Display name of the operator. */
+  operatorName: string;
+
+  /** Backpressure ratio; rendered through Angular's `percent` pipe, so presumably a 0..1 fraction. */
+  backpressureRatio: number;
+
+  /** Index of the parallel subtask. */
+  subtaskId: number;
+}
+
+/** One task entry in the GC-intensive list. */
+export interface GcIntensiveTask {
+  /** Name of the task. */
+  taskName: string;
+
+  /** GC time, already expressed as a percentage value. */
+  gcTimePercentage: number;
+
+  /** Identifier of a task manager associated with the task. */
+  taskManagerId: string;
+}
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html
index 4c1c3d6beb95e..32d3610b2f398 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html
@@ -22,6 +22,12 @@
nzType="info"
nzMessage="Job is not running yet."
>
+ 0"
+ [topCpuConsumers]="topNMetrics.topCpuConsumers"
+ [topBackpressureOperators]="topNMetrics.topBackpressureOperators"
+ [topGcIntensiveTasks]="topNMetrics.topGcIntensiveTasks"
+>
{
+ return of({
+ topCpuConsumers: [],
+ topBackpressureOperators: [],
+ topGcIntensiveTasks: []
+ });
+ }),
+ takeUntil(this.destroy$)
+ )
+ .subscribe(metrics => {
+ this.topNMetrics = metrics;
+ this.cdr.markForCheck();
+ });
+ }
}
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/top-n-metrics/top-n-metrics.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/top-n-metrics/top-n-metrics.component.ts
new file mode 100644
index 0000000000000..9f653094aaa8b
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/top-n-metrics/top-n-metrics.component.ts
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Component, Input } from '@angular/core';
+import { CommonModule } from '@angular/common';
+
+import { CpuConsumer, BackpressureOperator, GcIntensiveTask } from '@flink-runtime-web/interfaces/top-n-metrics';
+
+/**
+ * Standalone presentational component (selector `flink-top-n-metrics`).
+ *
+ * It renders three sections from its inputs: CPU consumers, backpressure
+ * operators and GC-intensive tasks. Each section heading shows the length of
+ * the corresponding list. The component holds no logic beyond its inputs.
+ */
+@Component({
+ selector: 'flink-top-n-metrics',
+ standalone: true,
+ template: `
+
+
Top N Metrics
+
+
+
Top {{ topCpuConsumers.length }} CPU Consumers
+
+ -
+ {{ cpu.taskName }} (Subtask {{ cpu.subtaskId }}): {{ cpu.cpuPercentage | number:'1.2' }}% CPU
+
+
+
+
+
+
Top {{ topBackpressureOperators.length }} Backpressure Operators
+
+ -
+ {{ bp.operatorName }} (Subtask {{ bp.subtaskId }}): {{ bp.backpressureRatio | percent }}
+
+
+
+
+
+
Top {{ topGcIntensiveTasks.length }} GC Intensive Tasks
+
+ -
+ {{ gc.taskName }}: {{ gc.gcTimePercentage | number:'1.2' }}% GC Time
+
+
+
+
+ `,
+ styles: [`
+ .top-n-metrics {
+ padding: 16px;
+ margin: 16px 0;
+ background-color: #f5f5f5;
+ border-radius: 4px;
+ }
+ .metric-section {
+ margin: 12px 0;
+ }
+ h3 {
+ margin: 0 0 12px 0;
+ }
+ h4 {
+ margin: 8px 0;
+ }
+ ul {
+ margin: 4px 0;
+ padding-left: 20px;
+ }
+ `],
+ imports: [CommonModule]
+})
+export class TopNMetricsComponent {
+ /** Entries for the CPU section; defaults to an empty list. */
+ @Input() topCpuConsumers: CpuConsumer[] = [];
+ /** Entries for the backpressure section; defaults to an empty list. */
+ @Input() topBackpressureOperators: BackpressureOperator[] = [];
+ /** Entries for the GC section; defaults to an empty list. */
+ @Input() topGcIntensiveTasks: GcIntensiveTask[] = [];
+}
diff --git a/flink-runtime-web/web-dashboard/src/app/services/top-n-metrics.service.ts b/flink-runtime-web/web-dashboard/src/app/services/top-n-metrics.service.ts
new file mode 100644
index 0000000000000..357ab2f244133
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/src/app/services/top-n-metrics.service.ts
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { Injectable } from '@angular/core';
+import { Observable } from 'rxjs';
+
+import { HttpClient } from '@angular/common/http';
+
+import { TopNMetrics } from '@flink-runtime-web/interfaces/top-n-metrics';
+
+/**
+ * Root-provided service that fetches the "top N" metrics of a job over HTTP.
+ */
+@Injectable({
+  providedIn: 'root'
+})
+export class TopNMetricsService {
+  constructor(private readonly http: HttpClient) {}
+
+  /**
+   * Requests the top N metrics for one job.
+   *
+   * @param jobId Identifier of the job, inserted into the request path as-is.
+   * @returns A cold observable that issues `GET /jobs/{jobId}/metrics/top-n` on
+   *   subscription and emits the response body typed as {@link TopNMetrics}.
+   *   HTTP errors are not handled here; the subscriber must handle them.
+   */
+  public loadTopNMetrics(jobId: string): Observable<TopNMetrics> {
+    // NOTE(review): the path is absolute from the origin; confirm whether it
+    // should be prefixed with the dashboard's configured REST base URL.
+    return this.http.get<TopNMetrics>(`/jobs/${jobId}/metrics/top-n`);
+  }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TopNMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TopNMetricsHandler.java
new file mode 100644
index 0000000000000..fa55c1deecd68
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TopNMetricsHandler.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.job.JobVertexHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.TopNMetricsResponseBody.BackpressureOperator;
+import org.apache.flink.runtime.rest.messages.job.metrics.TopNMetricsResponseBody.CpuConsumer;
+import org.apache.flink.runtime.rest.messages.job.metrics.TopNMetricsResponseBody.GcIntensiveTask;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.runtime.metrics.dump.QueryScopeUtils.createScopedMetricName;
+
+/**
+ * Handler that returns Top N metrics for a job, including top CPU consumers, top backpressure
+ * operators, and top GC intensive tasks.
+ */
+public class TopNMetricsHandler
+ extends JobVertexHandler {
+
+ private static final int DEFAULT_TOP_N = 5;
+
+ private final TopNMetricsHeaders headers;
+ private final MetricFetcher metricFetcher;
+
+ /**
+  * Creates the handler.
+  *
+  * @param leaderRetriever retriever for the gateway, forwarded to the super class
+  * @param timeout request timeout, forwarded to the super class
+  * @param responseHeaders HTTP headers added to every response, forwarded to the super class
+  * @param headers message headers describing this endpoint; returned by {@link
+  *     #getMessageHeaders()}
+  * @param executor executor forwarded to the super class
+  * @param metricFetcher source of metric values queried while building the response
+  */
+ public TopNMetricsHandler(
+         @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+         @Nonnull Time timeout,
+         @Nonnull Map<String, String> responseHeaders,
+         @Nonnull TopNMetricsHeaders headers,
+         @Nonnull Executor executor,
+         @Nonnull MetricFetcher metricFetcher) {
+     super(leaderRetriever, timeout, responseHeaders, executor);
+     this.headers = headers;
+     // NOTE(review): MetricFetcher resolves to the private nested placeholder class of this
+     // handler, so callers outside this class presumably cannot supply an instance; confirm
+     // which fetcher type is intended.
+     this.metricFetcher = metricFetcher;
+ }
+
+ @Override
+ protected CompletableFuture handleRequest(
+ @Nonnull HandlerRequest request,
+ @Nonnull AccessExecutionGraph executionGraph)
+ throws RestHandlerException {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ // Fetch metrics data
+ metricFetcher.update();
+
+ // Collect top CPU consumers
+ List topCpuConsumers = collectTopCpuConsumers(executionGraph);
+
+ // Collect top backpressure operators
+ List topBackpressureOperators =
+ collectTopBackpressureOperators(executionGraph);
+
+ // Collect top GC intensive tasks
+ List topGcIntensiveTasks =
+ collectTopGcIntensiveTasks(executionGraph);
+
+ return new TopNMetricsResponseBody(
+ topCpuConsumers, topBackpressureOperators, topGcIntensiveTasks);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to fetch Top N metrics", e);
+ }
+ },
+ executor);
+ }
+
+ /**
+ * Collect top N CPU consumers from all subtasks.
+ */
+ private List collectTopCpuConsumers(AccessExecutionGraph executionGraph) {
+ List cpuConsumers = new ArrayList<>();
+
+ for (AccessExecutionJobVertex jobVertex : executionGraph.getVerticesTopologically()) {
+ for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
+ String vertexName = jobVertex.getName();
+ int subtaskId = vertex.getParallelSubtaskIndex();
+
+ // Try to get CPU usage metric
+ String cpuMetricName =
+ createScopedMetricName(
+ MetricNames.TASK_NAME, MetricNames.CPU_NAME, MetricNames.USAGE_NAME);
+
+ Double cpuValue =
+ metricFetcher
+ .getMetric(
+ vertex.getCurrentExecutionAttempt()
+ .getTaskManagerLocation()
+ .getResourceID(),
+ vertexName,
+ subtaskId,
+ cpuMetricName)
+ .orElse(0.0);
+
+ // Only include subtasks with meaningful CPU usage
+ if (cpuValue > 0.1) {
+ String taskManagerId =
+ vertex.getCurrentExecutionAttempt()
+ .getTaskManagerLocation()
+ .getResourceID()
+ .toString();
+ cpuConsumers.add(
+ new CpuConsumer(
+ subtaskId, vertexName, cpuValue * 100.0, taskManagerId));
+ }
+ }
+ }
+
+ // Sort by CPU usage descending and take top N
+ return cpuConsumers.stream()
+ .sorted(Comparator.comparing(CpuConsumer::getCpuPercentage).reversed())
+ .limit(DEFAULT_TOP_N)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Collect top N backpressure operators.
+ */
+ private List collectTopBackpressureOperators(
+ AccessExecutionGraph executionGraph) {
+ List backpressureOperators = new ArrayList<>();
+
+ for (AccessExecutionJobVertex jobVertex : executionGraph.getVerticesTopologically()) {
+ String operatorId = jobVertex.getID().toString();
+ String operatorName = jobVertex.getName();
+
+ // Get backpressure ratio from job vertex
+ double backpressureRatio = getBackpressureRatio(jobVertex);
+
+ // Create entry for each subtask with backpressure
+ if (backpressureRatio > 0.05) {
+ for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
+ int subtaskId = vertex.getParallelSubtaskIndex();
+ backpressureOperators.add(
+ new BackpressureOperator(operatorId, operatorName, backpressureRatio, subtaskId));
+ }
+ }
+ }
+
+ // Sort by backpressure ratio descending and take top N
+ return backpressureOperators.stream()
+ .sorted(Comparator.comparing(BackpressureOperator::getBackpressureRatio).reversed())
+ .limit(DEFAULT_TOP_N)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Get backpressure ratio for a job vertex.
+ */
+ private double getBackpressureRatio(AccessExecutionJobVertex jobVertex) {
+ try {
+ // Try to get backpressure ratio from metrics
+ String backpressureMetricName =
+ createScopedMetricName(MetricNames.TASK_NAME, MetricNames.BACKPRESSURE_NAME);
+
+ Double ratio = 0.0;
+ int count = 0;
+
+ for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
+ String vertexName = jobVertex.getName();
+ int subtaskId = vertex.getParallelSubtaskIndex();
+
+ Double value =
+ metricFetcher
+ .getMetric(
+ vertex.getCurrentExecutionAttempt()
+ .getTaskManagerLocation()
+ .getResourceID(),
+ vertexName,
+ subtaskId,
+ backpressureMetricName)
+ .orElse(0.0);
+ ratio += value;
+ count++;
+ }
+
+ return count > 0 ? ratio / count : 0.0;
+ } catch (Exception e) {
+ return 0.0;
+ }
+ }
+
+ /**
+ * Collect top N GC intensive tasks.
+ */
+ private List collectTopGcIntensiveTasks(AccessExecutionGraph executionGraph) {
+ List gcTasks = new ArrayList<>();
+
+ for (AccessExecutionJobVertex jobVertex : executionGraph.getVerticesTopologically()) {
+ String taskName = jobVertex.getName();
+
+ // Aggregate GC time across all subtasks
+ double totalGcTime = 0.0;
+ double totalCpuTime = 0.0;
+ String taskManagerId = null;
+
+ for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
+ String vertexName = jobVertex.getName();
+ int subtaskId = vertex.getParallelSubtaskIndex();
+
+ // Get GC time metric
+ String gcTimeMetricName =
+ createScopedMetricName(
+ MetricNames.TASK_NAME,
+ MetricNames.GC_NAME,
+ MetricNames.TIME_NAME);
+
+ Double gcTime =
+ metricFetcher
+ .getMetric(
+ vertex.getCurrentExecutionAttempt()
+ .getTaskManagerLocation()
+ .getResourceID(),
+ vertexName,
+ subtaskId,
+ gcTimeMetricName)
+ .orElse(0.0);
+
+ // Get CPU time metric (as proxy for total time)
+ String cpuTimeMetricName =
+ createScopedMetricName(MetricNames.TASK_NAME, MetricNames.CPU_NAME, MetricNames.TIME_NAME);
+
+ Double cpuTime =
+ metricFetcher
+ .getMetric(
+ vertex.getCurrentExecutionAttempt()
+ .getTaskManagerLocation()
+ .getResourceID(),
+ vertexName,
+ subtaskId,
+ cpuTimeMetricName)
+ .orElse(1.0);
+
+ totalGcTime += gcTime;
+ totalCpuTime += cpuTime;
+
+ if (taskManagerId == null) {
+ taskManagerId =
+ vertex.getCurrentExecutionAttempt()
+ .getTaskManagerLocation()
+ .getResourceID()
+ .toString();
+ }
+ }
+
+ // Calculate GC time percentage
+ double gcPercentage = totalCpuTime > 0 ? (totalGcTime / totalCpuTime) * 100 : 0.0;
+
+ // Only include tasks with meaningful GC time
+ if (gcPercentage > 1.0) {
+ gcTasks.add(
+ new GcIntensiveTask(
+ taskName,
+ gcPercentage,
+ taskManagerId != null ? taskManagerId : "unknown"));
+ }
+ }
+
+ // Sort by GC time percentage descending and take top N
+ return gcTasks.stream()
+ .sorted(Comparator.comparing(GcIntensiveTask::getGcTimePercentage).reversed())
+ .limit(DEFAULT_TOP_N)
+ .collect(Collectors.toList());
+ }
+
+ /** Returns the message headers instance passed to the constructor. */
+ @Override
+ public TopNMetricsHeaders getMessageHeaders() {
+     return this.headers;
+ }
+
+ /**
+  * Placeholder metric source for demonstration purposes.
+  *
+  * <p>{@link #update()} does nothing and {@link #getMetric} always returns an empty optional,
+  * so every caller in this handler falls back to its default value.
+  *
+  * <p>NOTE(review): this nested class shares its simple name with the metric fetcher that
+  * other handlers receive; confirm which type the constructor is supposed to accept.
+  */
+ private static class MetricFetcher {
+     /** No-op placeholder. */
+     public void update() {
+         // In production, this would update the metric cache
+     }
+
+     /**
+      * Looks up a single metric value.
+      *
+      * @param resourceID ID of the task manager hosting the subtask (unused by the placeholder)
+      * @param taskName name of the task (unused by the placeholder)
+      * @param subtaskId index of the subtask (unused by the placeholder)
+      * @param metricName scoped metric name (unused by the placeholder)
+      * @return always an empty optional
+      */
+     public java.util.Optional<Double> getMetric(
+             org.apache.flink.runtime.clusterframework.types.ResourceID resourceID,
+             String taskName,
+             int subtaskId,
+             String metricName) {
+         // In production, this would query the metric store
+         return java.util.Optional.empty();
+     }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsHeaders.java
new file mode 100644
index 0000000000000..a29b2652df3e0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsHeaders.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/** Headers for Top N metrics request. */
+public class TopNMetricsHeaders
+ implements MessageHeaders {
+
+ private static final TopNMetricsHeaders INSTANCE = new TopNMetricsHeaders();
+
+ public static final String URL = "/jobs/:" + JobIDPathParameter.KEY + "/metrics/top-n";
+
+ @Override
+ public Class getRequestClass() {
+ return TopNMetricsParameters.class;
+ }
+
+ @Override
+ public Class getResponseClass() {
+ return TopNMetricsResponseBody.class;
+ }
+
+ @Override
+ public HttpMethod getHttpMethod() {
+ return HttpMethod.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ @Override
+ public String getDescription() {
+ return "Returns Top N metrics for a job including top CPU consumers, backpressure operators, and GC intensive tasks.";
+ }
+
+ @Override
+ public Class getUnresolvedMessageClass() {
+ return JobMessageParameters.class;
+ }
+
+ public static TopNMetricsHeaders getInstance() {
+ return INSTANCE;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsParameters.java
new file mode 100644
index 0000000000000..3d32dd4ce54e4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsParameters.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+
+/** Parameters for Top N metrics request. */
+public class TopNMetricsParameters extends JobMessageParameters {
+
+ @Override
+ public String getDescription() {
+ return "Parameters for Top N metrics request.";
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsResponseBody.java
new file mode 100644
index 0000000000000..612bfcbe2483e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsResponseBody.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/** Response body for Top N metrics. */
+public class TopNMetricsResponseBody implements ResponseBody {
+
+ private static final String FIELD_NAME_TOP_CPU_CONSUMERS = "topCpuConsumers";
+ private static final String FIELD_NAME_TOP_BACKPRESSURE_OPERATORS = "topBackpressureOperators";
+ private static final String FIELD_NAME_TOP_GC_INTENSIVE_TASKS = "topGcIntensiveTasks";
+
+ @JsonProperty(FIELD_NAME_TOP_CPU_CONSUMERS)
+ private final List topCpuConsumers;
+
+ @JsonProperty(FIELD_NAME_TOP_BACKPRESSURE_OPERATORS)
+ private final List topBackpressureOperators;
+
+ @JsonProperty(FIELD_NAME_TOP_GC_INTENSIVE_TASKS)
+ private final List topGcIntensiveTasks;
+
+ /**
+  * Creates the response body; a {@code null} list is replaced by an empty list.
+  *
+  * @param topCpuConsumers CPU consumer entries, may be {@code null}
+  * @param topBackpressureOperators backpressure entries, may be {@code null}
+  * @param topGcIntensiveTasks GC-intensive entries, may be {@code null}
+  */
+ @JsonCreator
+ public TopNMetricsResponseBody(
+         @JsonProperty(FIELD_NAME_TOP_CPU_CONSUMERS) List topCpuConsumers,
+         @JsonProperty(FIELD_NAME_TOP_BACKPRESSURE_OPERATORS) List topBackpressureOperators,
+         @JsonProperty(FIELD_NAME_TOP_GC_INTENSIVE_TASKS) List topGcIntensiveTasks) {
+     if (topCpuConsumers == null) {
+         this.topCpuConsumers = Collections.emptyList();
+     } else {
+         this.topCpuConsumers = topCpuConsumers;
+     }
+
+     if (topBackpressureOperators == null) {
+         this.topBackpressureOperators = Collections.emptyList();
+     } else {
+         this.topBackpressureOperators = topBackpressureOperators;
+     }
+
+     if (topGcIntensiveTasks == null) {
+         this.topGcIntensiveTasks = Collections.emptyList();
+     } else {
+         this.topGcIntensiveTasks = topGcIntensiveTasks;
+     }
+ }
+
+ /** Returns the CPU consumer entries; never {@code null}. */
+ @JsonProperty(FIELD_NAME_TOP_CPU_CONSUMERS)
+ public List getTopCpuConsumers() {
+     return this.topCpuConsumers;
+ }
+
+ /** Returns the backpressure entries; never {@code null}. */
+ @JsonProperty(FIELD_NAME_TOP_BACKPRESSURE_OPERATORS)
+ public List getTopBackpressureOperators() {
+     return this.topBackpressureOperators;
+ }
+
+ /** Returns the GC-intensive entries; never {@code null}. */
+ @JsonProperty(FIELD_NAME_TOP_GC_INTENSIVE_TASKS)
+ public List getTopGcIntensiveTasks() {
+     return this.topGcIntensiveTasks;
+ }
+
+ /** Two response bodies are equal when all three lists are equal. */
+ @Override
+ public boolean equals(Object o) {
+     if (o == this) {
+         return true;
+     }
+     if (o == null || o.getClass() != getClass()) {
+         return false;
+     }
+     final TopNMetricsResponseBody other = (TopNMetricsResponseBody) o;
+     if (!Objects.equals(topCpuConsumers, other.topCpuConsumers)) {
+         return false;
+     }
+     if (!Objects.equals(topBackpressureOperators, other.topBackpressureOperators)) {
+         return false;
+     }
+     return Objects.equals(topGcIntensiveTasks, other.topGcIntensiveTasks);
+ }
+
+ /** Hash over the same three lists that {@link #equals(Object)} compares. */
+ @Override
+ public int hashCode() {
+     final int combined =
+             Objects.hash(topCpuConsumers, topBackpressureOperators, topGcIntensiveTasks);
+     return combined;
+ }
+
+ /** Immutable CPU usage entry for a single subtask. */
+ public static class CpuConsumer {
+     private static final String FIELD_NAME_SUBTASK_ID = "subtaskId";
+     private static final String FIELD_NAME_TASK_NAME = "taskName";
+     private static final String FIELD_NAME_CPU_PERCENTAGE = "cpuPercentage";
+     private static final String FIELD_NAME_TASK_MANAGER_ID = "taskManagerId";
+
+     @JsonProperty(FIELD_NAME_SUBTASK_ID)
+     private final int subtaskId;
+
+     @JsonProperty(FIELD_NAME_TASK_NAME)
+     private final String taskName;
+
+     @JsonProperty(FIELD_NAME_CPU_PERCENTAGE)
+     private final double cpuPercentage;
+
+     @JsonProperty(FIELD_NAME_TASK_MANAGER_ID)
+     private final String taskManagerId;
+
+     /**
+      * Creates an entry; the values are stored as given.
+      *
+      * @param subtaskId index of the subtask
+      * @param taskName name of the task
+      * @param cpuPercentage CPU usage as a percentage value
+      * @param taskManagerId ID of the hosting task manager
+      */
+     @JsonCreator
+     public CpuConsumer(
+             @JsonProperty(FIELD_NAME_SUBTASK_ID) int subtaskId,
+             @JsonProperty(FIELD_NAME_TASK_NAME) String taskName,
+             @JsonProperty(FIELD_NAME_CPU_PERCENTAGE) double cpuPercentage,
+             @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) String taskManagerId) {
+         this.subtaskId = subtaskId;
+         this.taskName = taskName;
+         this.cpuPercentage = cpuPercentage;
+         this.taskManagerId = taskManagerId;
+     }
+
+     /** Returns the subtask index. */
+     public int getSubtaskId() {
+         return this.subtaskId;
+     }
+
+     /** Returns the task name. */
+     public String getTaskName() {
+         return this.taskName;
+     }
+
+     /** Returns the CPU usage percentage. */
+     public double getCpuPercentage() {
+         return this.cpuPercentage;
+     }
+
+     /** Returns the task manager ID. */
+     public String getTaskManagerId() {
+         return this.taskManagerId;
+     }
+
+     /** Field-wise equality over all four fields. */
+     @Override
+     public boolean equals(Object o) {
+         if (o == this) {
+             return true;
+         }
+         if (o == null || o.getClass() != getClass()) {
+             return false;
+         }
+         final CpuConsumer other = (CpuConsumer) o;
+         if (subtaskId != other.subtaskId) {
+             return false;
+         }
+         if (Double.compare(other.cpuPercentage, cpuPercentage) != 0) {
+             return false;
+         }
+         return Objects.equals(taskName, other.taskName)
+                 && Objects.equals(taskManagerId, other.taskManagerId);
+     }
+
+     /** Hash over all four fields. */
+     @Override
+     public int hashCode() {
+         return Objects.hash(subtaskId, taskName, cpuPercentage, taskManagerId);
+     }
+ }
+
+ /** Backpressure operator metrics. */
+ public static class BackpressureOperator {
+ private static final String FIELD_NAME_OPERATOR_ID = "operatorId";
+ private static final String FIELD_NAME_OPERATOR_NAME = "operatorName";
+ private static final String FIELD_NAME_BACKPRESSURE_RATIO = "backpressureRatio";
+ private static final String FIELD_NAME_SUBTASK_ID = "subtaskId";
+
+ @JsonProperty(FIELD_NAME_OPERATOR_ID)
+ private final String operatorId;
+
+ @JsonProperty(FIELD_NAME_OPERATOR_NAME)
+ private final String operatorName;
+
+ @JsonProperty(FIELD_NAME_BACKPRESSURE_RATIO)
+ private final double backpressureRatio;
+
+ @JsonProperty(FIELD_NAME_SUBTASK_ID)
+ private final int subtaskId;
+
+ @JsonCreator
+ public BackpressureOperator(
+ @JsonProperty(FIELD_NAME_OPERATOR_ID) String operatorId,
+ @JsonProperty(FIELD_NAME_OPERATOR_NAME) String operatorName,
+ @JsonProperty(FIELD_NAME_BACKPRESSURE_RATIO) double backpressureRatio,
+ @JsonProperty(FIELD_NAME_SUBTASK_ID) int subtaskId) {
+ this.operatorId = operatorId;
+ this.operatorName = operatorName;
+ this.backpressureRatio = backpressureRatio;
+ this.subtaskId = subtaskId;
+ }
+
+ public String getOperatorId() {
+ return operatorId;
+ }
+
+ public String getOperatorName() {
+ return operatorName;
+ }
+
+ public double getBackpressureRatio() {
+ return backpressureRatio;
+ }
+
+ public int getSubtaskId() {
+ return subtaskId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BackpressureOperator that = (BackpressureOperator) o;
+ return Double.compare(that.backpressureRatio, backpressureRatio) == 0
+ && subtaskId == that.subtaskId
+ && Objects.equals(operatorId, that.operatorId)
+ && Objects.equals(operatorName, that.operatorName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(operatorId, operatorName, backpressureRatio, subtaskId);
+ }
+ }
+
+ /** Immutable GC time entry for a task. */
+ public static class GcIntensiveTask {
+     private static final String FIELD_NAME_TASK_NAME = "taskName";
+     private static final String FIELD_NAME_GC_TIME_PERCENTAGE = "gcTimePercentage";
+     private static final String FIELD_NAME_TASK_MANAGER_ID = "taskManagerId";
+
+     @JsonProperty(FIELD_NAME_TASK_NAME)
+     private final String taskName;
+
+     @JsonProperty(FIELD_NAME_GC_TIME_PERCENTAGE)
+     private final double gcTimePercentage;
+
+     @JsonProperty(FIELD_NAME_TASK_MANAGER_ID)
+     private final String taskManagerId;
+
+     /**
+      * Creates an entry; the values are stored as given.
+      *
+      * @param taskName name of the task
+      * @param gcTimePercentage GC time as a percentage value
+      * @param taskManagerId ID of a task manager associated with the task
+      */
+     @JsonCreator
+     public GcIntensiveTask(
+             @JsonProperty(FIELD_NAME_TASK_NAME) String taskName,
+             @JsonProperty(FIELD_NAME_GC_TIME_PERCENTAGE) double gcTimePercentage,
+             @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) String taskManagerId) {
+         this.taskName = taskName;
+         this.gcTimePercentage = gcTimePercentage;
+         this.taskManagerId = taskManagerId;
+     }
+
+     /** Returns the task name. */
+     public String getTaskName() {
+         return this.taskName;
+     }
+
+     /** Returns the GC time percentage. */
+     public double getGcTimePercentage() {
+         return this.gcTimePercentage;
+     }
+
+     /** Returns the task manager ID. */
+     public String getTaskManagerId() {
+         return this.taskManagerId;
+     }
+
+     /** Field-wise equality over all three fields. */
+     @Override
+     public boolean equals(Object o) {
+         if (o == this) {
+             return true;
+         }
+         if (o == null || o.getClass() != getClass()) {
+             return false;
+         }
+         final GcIntensiveTask other = (GcIntensiveTask) o;
+         if (Double.compare(other.gcTimePercentage, gcTimePercentage) != 0) {
+             return false;
+         }
+         return Objects.equals(taskName, other.taskName)
+                 && Objects.equals(taskManagerId, other.taskManagerId);
+     }
+
+     /** Hash over all three fields. */
+     @Override
+     public int hashCode() {
+         return Objects.hash(taskName, gcTimePercentage, taskManagerId);
+     }
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 6e3926d6a2c93..d9fb34c3c0499 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -94,6 +94,7 @@
import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexWatermarksHandler;
import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.TopNMetricsHandler;
import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers;
import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers;
import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers;
@@ -540,6 +541,10 @@ protected List> initiali
final JobMetricsHandler jobMetricsHandler =
new JobMetricsHandler(leaderRetriever, timeout, responseHeaders, metricFetcher);
+ final TopNMetricsHandler topNMetricsHandler =
+ new TopNMetricsHandler(
+ leaderRetriever, timeout, responseHeaders, TopNMetricsHeaders.getInstance(), executor, metricFetcher);
+
final SubtaskMetricsHandler subtaskMetricsHandler =
new SubtaskMetricsHandler(leaderRetriever, timeout, responseHeaders, metricFetcher);
@@ -831,6 +836,7 @@ protected List> initiali
jobVertexWatermarksHandler.getMessageHeaders(),
jobVertexWatermarksHandler));
handlers.add(Tuple2.of(jobMetricsHandler.getMessageHeaders(), jobMetricsHandler));
+ handlers.add(Tuple2.of(topNMetricsHandler.getMessageHeaders(), topNMetricsHandler));
handlers.add(Tuple2.of(subtaskMetricsHandler.getMessageHeaders(), subtaskMetricsHandler));
handlers.add(
Tuple2.of(