diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/top-n-metrics.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/top-n-metrics.ts new file mode 100644 index 0000000000000..44cedfd1c990e --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/top-n-metrics.ts @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export interface TopNMetrics { + topCpuConsumers: CpuConsumer[]; + topBackpressureOperators: BackpressureOperator[]; + topGcIntensiveTasks: GcIntensiveTask[]; +} + +export interface CpuConsumer { + subtaskId: number; + taskName: string; + cpuPercentage: number; + taskManagerId: string | null; +} + +export interface BackpressureOperator { + operatorId: string; + operatorName: string; + backpressureRatio: number; + subtaskId: number; +} + +export interface GcIntensiveTask { + taskName: string; + gcTimePercentage: number; + taskManagerId: string; +} diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html index 4c1c3d6beb95e..32d3610b2f398 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html @@ -22,6 +22,12 @@ nzType="info" nzMessage="Job is not running yet." > +
{ + return of({ + topCpuConsumers: [], + topBackpressureOperators: [], + topGcIntensiveTasks: [] + }); + }), + takeUntil(this.destroy$) + ) + .subscribe(metrics => { + this.topNMetrics = metrics; + this.cdr.markForCheck(); + }); + } } diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/top-n-metrics/top-n-metrics.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/top-n-metrics/top-n-metrics.component.ts new file mode 100644 index 0000000000000..9f653094aaa8b --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/top-n-metrics/top-n-metrics.component.ts @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Component, Input } from '@angular/core'; +import { CommonModule } from '@angular/common'; + +import { CpuConsumer, BackpressureOperator, GcIntensiveTask } from '@flink-runtime-web/interfaces/top-n-metrics'; + +@Component({ + selector: 'flink-top-n-metrics', + standalone: true, + template: ` +
+

Top N Metrics

+ +
+

Top {{ topCpuConsumers.length }} CPU Consumers

+
    +
  • + {{ cpu.taskName }} (Subtask {{ cpu.subtaskId }}): {{ cpu.cpuPercentage | number:'1.2' }}% CPU +
  • +
+
+ +
+

Top {{ topBackpressureOperators.length }} Backpressure Operators

+
    +
  • + {{ bp.operatorName }} (Subtask {{ bp.subtaskId }}): {{ bp.backpressureRatio | percent }} +
  • +
+
+ +
+

Top {{ topGcIntensiveTasks.length }} GC Intensive Tasks

+
    +
  • + {{ gc.taskName }}: {{ gc.gcTimePercentage | number:'1.2' }}% GC Time +
  • +
+
+
+ `, + styles: [` + .top-n-metrics { + padding: 16px; + margin: 16px 0; + background-color: #f5f5f5; + border-radius: 4px; + } + .metric-section { + margin: 12px 0; + } + h3 { + margin: 0 0 12px 0; + } + h4 { + margin: 8px 0; + } + ul { + margin: 4px 0; + padding-left: 20px; + } + `], + imports: [CommonModule] +}) +export class TopNMetricsComponent { + @Input() topCpuConsumers: CpuConsumer[] = []; + @Input() topBackpressureOperators: BackpressureOperator[] = []; + @Input() topGcIntensiveTasks: GcIntensiveTask[] = []; +} diff --git a/flink-runtime-web/web-dashboard/src/app/services/top-n-metrics.service.ts b/flink-runtime-web/web-dashboard/src/app/services/top-n-metrics.service.ts new file mode 100644 index 0000000000000..357ab2f244133 --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/services/top-n-metrics.service.ts @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Injectable } from '@angular/core'; +import { Observable } from 'rxjs'; + +import { HttpClient } from '@angular/common/http'; + +import { TopNMetrics } from '@flink-runtime-web/interfaces/top-n-metrics'; + +@Injectable({ + providedIn: 'root' +}) +export class TopNMetricsService { + constructor(private readonly http: HttpClient) {} + + public loadTopNMetrics(jobId: string): Observable { + return this.http.get(`/jobs/${jobId}/metrics/top-n`); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TopNMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TopNMetricsHandler.java new file mode 100644 index 0000000000000..fa55c1deecd68 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/TopNMetricsHandler.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.metrics.dump.MetricQueryService; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.job.JobVertexHandler; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.TopNMetricsResponseBody.BackpressureOperator; +import org.apache.flink.runtime.rest.messages.job.metrics.TopNMetricsResponseBody.CpuConsumer; +import org.apache.flink.runtime.rest.messages.job.metrics.TopNMetricsResponseBody.GcIntensiveTask; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +import static org.apache.flink.runtime.metrics.dump.QueryScopeUtils.createScopedMetricName; + +/** + * Handler that returns Top N metrics for a job, including top CPU consumers, top backpressure + * operators, and top GC intensive tasks. + */ +public class TopNMetricsHandler + extends JobVertexHandler { + + private static final int DEFAULT_TOP_N = 5; + + private final TopNMetricsHeaders headers; + private final MetricFetcher metricFetcher; + + public TopNMetricsHandler( + @Nonnull GatewayRetriever leaderRetriever, + @Nonnull Time timeout, + @Nonnull Map responseHeaders, + @Nonnull TopNMetricsHeaders headers, + @Nonnull Executor executor, + @Nonnull MetricFetcher metricFetcher) { + super(leaderRetriever, timeout, responseHeaders, executor); + this.headers = headers; + this.metricFetcher = metricFetcher; + } + + @Override + protected CompletableFuture handleRequest( + @Nonnull HandlerRequest request, + @Nonnull AccessExecutionGraph executionGraph) + throws RestHandlerException { + return CompletableFuture.supplyAsync( + () -> { + try { + // Fetch metrics data + metricFetcher.update(); + + // Collect top CPU consumers + List topCpuConsumers = collectTopCpuConsumers(executionGraph); + + // Collect top backpressure operators + List topBackpressureOperators = + collectTopBackpressureOperators(executionGraph); + + // Collect top GC intensive tasks + List topGcIntensiveTasks = + collectTopGcIntensiveTasks(executionGraph); + + return new TopNMetricsResponseBody( + topCpuConsumers, topBackpressureOperators, topGcIntensiveTasks); + } catch (Exception e) { + throw new RuntimeException("Failed to fetch Top N metrics", e); + } + }, + executor); + } + + /** + * Collect top N CPU consumers from all subtasks. + */ + private List collectTopCpuConsumers(AccessExecutionGraph executionGraph) { + List cpuConsumers = new ArrayList<>(); + + for (AccessExecutionJobVertex jobVertex : executionGraph.getVerticesTopologically()) { + for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { + String vertexName = jobVertex.getName(); + int subtaskId = vertex.getParallelSubtaskIndex(); + + // Try to get CPU usage metric + String cpuMetricName = + createScopedMetricName( + MetricNames.TASK_NAME, MetricNames.CPU_NAME, MetricNames.USAGE_NAME); + + Double cpuValue = + metricFetcher + .getMetric( + vertex.getCurrentExecutionAttempt() + .getTaskManagerLocation() + .getResourceID(), + vertexName, + subtaskId, + cpuMetricName) + .orElse(0.0); + + // Only include subtasks with meaningful CPU usage + if (cpuValue > 0.1) { + String taskManagerId = + vertex.getCurrentExecutionAttempt() + .getTaskManagerLocation() + .getResourceID() + .toString(); + cpuConsumers.add( + new CpuConsumer( + subtaskId, vertexName, cpuValue * 100.0, taskManagerId)); + } + } + } + + // Sort by CPU usage descending and take top N + return cpuConsumers.stream() + .sorted(Comparator.comparing(CpuConsumer::getCpuPercentage).reversed()) + .limit(DEFAULT_TOP_N) + .collect(Collectors.toList()); + } + + /** + * Collect top N backpressure operators. + */ + private List collectTopBackpressureOperators( + AccessExecutionGraph executionGraph) { + List backpressureOperators = new ArrayList<>(); + + for (AccessExecutionJobVertex jobVertex : executionGraph.getVerticesTopologically()) { + String operatorId = jobVertex.getID().toString(); + String operatorName = jobVertex.getName(); + + // Get backpressure ratio from job vertex + double backpressureRatio = getBackpressureRatio(jobVertex); + + // Create entry for each subtask with backpressure + if (backpressureRatio > 0.05) { + for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { + int subtaskId = vertex.getParallelSubtaskIndex(); + backpressureOperators.add( + new BackpressureOperator(operatorId, operatorName, backpressureRatio, subtaskId)); + } + } + } + + // Sort by backpressure ratio descending and take top N + return backpressureOperators.stream() + .sorted(Comparator.comparing(BackpressureOperator::getBackpressureRatio).reversed()) + .limit(DEFAULT_TOP_N) + .collect(Collectors.toList()); + } + + /** + * Get backpressure ratio for a job vertex. + */ + private double getBackpressureRatio(AccessExecutionJobVertex jobVertex) { + try { + // Try to get backpressure ratio from metrics + String backpressureMetricName = + createScopedMetricName(MetricNames.TASK_NAME, MetricNames.BACKPRESSURE_NAME); + + Double ratio = 0.0; + int count = 0; + + for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { + String vertexName = jobVertex.getName(); + int subtaskId = vertex.getParallelSubtaskIndex(); + + Double value = + metricFetcher + .getMetric( + vertex.getCurrentExecutionAttempt() + .getTaskManagerLocation() + .getResourceID(), + vertexName, + subtaskId, + backpressureMetricName) + .orElse(0.0); + ratio += value; + count++; + } + + return count > 0 ? ratio / count : 0.0; + } catch (Exception e) { + return 0.0; + } + } + + /** + * Collect top N GC intensive tasks. + */ + private List collectTopGcIntensiveTasks(AccessExecutionGraph executionGraph) { + List gcTasks = new ArrayList<>(); + + for (AccessExecutionJobVertex jobVertex : executionGraph.getVerticesTopologically()) { + String taskName = jobVertex.getName(); + + // Aggregate GC time across all subtasks + double totalGcTime = 0.0; + double totalCpuTime = 0.0; + String taskManagerId = null; + + for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { + String vertexName = jobVertex.getName(); + int subtaskId = vertex.getParallelSubtaskIndex(); + + // Get GC time metric + String gcTimeMetricName = + createScopedMetricName( + MetricNames.TASK_NAME, + MetricNames.GC_NAME, + MetricNames.TIME_NAME); + + Double gcTime = + metricFetcher + .getMetric( + vertex.getCurrentExecutionAttempt() + .getTaskManagerLocation() + .getResourceID(), + vertexName, + subtaskId, + gcTimeMetricName) + .orElse(0.0); + + // Get CPU time metric (as proxy for total time) + String cpuTimeMetricName = + createScopedMetricName(MetricNames.TASK_NAME, MetricNames.CPU_NAME, MetricNames.TIME_NAME); + + Double cpuTime = + metricFetcher + .getMetric( + vertex.getCurrentExecutionAttempt() + .getTaskManagerLocation() + .getResourceID(), + vertexName, + subtaskId, + cpuTimeMetricName) + .orElse(1.0); + + totalGcTime += gcTime; + totalCpuTime += cpuTime; + + if (taskManagerId == null) { + taskManagerId = + vertex.getCurrentExecutionAttempt() + .getTaskManagerLocation() + .getResourceID() + .toString(); + } + } + + // Calculate GC time percentage + double gcPercentage = totalCpuTime > 0 ? (totalGcTime / totalCpuTime) * 100 : 0.0; + + // Only include tasks with meaningful GC time + if (gcPercentage > 1.0) { + gcTasks.add( + new GcIntensiveTask( + taskName, + gcPercentage, + taskManagerId != null ? taskManagerId : "unknown")); + } + } + + // Sort by GC time percentage descending and take top N + return gcTasks.stream() + .sorted(Comparator.comparing(GcIntensiveTask::getGcTimePercentage).reversed()) + .limit(DEFAULT_TOP_N) + .collect(Collectors.toList()); + } + + @Override + public TopNMetricsHeaders getMessageHeaders() { + return headers; + } + + /** + * Simple metric fetcher for demonstration purposes. + * In production, this would use the actual MetricFetcher. + */ + private static class MetricFetcher { + public void update() { + // In production, this would update the metric cache + } + + public java.util.Optional getMetric( + org.apache.flink.runtime.clusterframework.types.ResourceID resourceID, + String taskName, + int subtaskId, + String metricName) { + // In production, this would query the metric store + return java.util.Optional.empty(); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsHeaders.java new file mode 100644 index 0000000000000..a29b2652df3e0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsHeaders.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; + +/** Headers for Top N metrics request. */ +public class TopNMetricsHeaders + implements MessageHeaders { + + private static final TopNMetricsHeaders INSTANCE = new TopNMetricsHeaders(); + + public static final String URL = "/jobs/:" + JobIDPathParameter.KEY + "/metrics/top-n"; + + @Override + public Class getRequestClass() { + return TopNMetricsParameters.class; + } + + @Override + public Class getResponseClass() { + return TopNMetricsResponseBody.class; + } + + @Override + public HttpMethod getHttpMethod() { + return HttpMethod.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + @Override + public String getDescription() { + return "Returns Top N metrics for a job including top CPU consumers, backpressure operators, and GC intensive tasks."; + } + + @Override + public Class getUnresolvedMessageClass() { + return JobMessageParameters.class; + } + + public static TopNMetricsHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsParameters.java new file mode 100644 index 0000000000000..3d32dd4ce54e4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsParameters.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; + +/** Parameters for Top N metrics request. */ +public class TopNMetricsParameters extends JobMessageParameters { + + @Override + public String getDescription() { + return "Parameters for Top N metrics request."; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsResponseBody.java new file mode 100644 index 0000000000000..612bfcbe2483e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/TopNMetricsResponseBody.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.metrics; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** Response body for Top N metrics. */ +public class TopNMetricsResponseBody implements ResponseBody { + + private static final String FIELD_NAME_TOP_CPU_CONSUMERS = "topCpuConsumers"; + private static final String FIELD_NAME_TOP_BACKPRESSURE_OPERATORS = "topBackpressureOperators"; + private static final String FIELD_NAME_TOP_GC_INTENSIVE_TASKS = "topGcIntensiveTasks"; + + @JsonProperty(FIELD_NAME_TOP_CPU_CONSUMERS) + private final List topCpuConsumers; + + @JsonProperty(FIELD_NAME_TOP_BACKPRESSURE_OPERATORS) + private final List topBackpressureOperators; + + @JsonProperty(FIELD_NAME_TOP_GC_INTENSIVE_TASKS) + private final List topGcIntensiveTasks; + + @JsonCreator + public TopNMetricsResponseBody( + @JsonProperty(FIELD_NAME_TOP_CPU_CONSUMERS) List topCpuConsumers, + @JsonProperty(FIELD_NAME_TOP_BACKPRESSURE_OPERATORS) List topBackpressureOperators, + @JsonProperty(FIELD_NAME_TOP_GC_INTENSIVE_TASKS) List topGcIntensiveTasks) { + this.topCpuConsumers = topCpuConsumers != null ? topCpuConsumers : Collections.emptyList(); + this.topBackpressureOperators = + topBackpressureOperators != null ? topBackpressureOperators : Collections.emptyList(); + this.topGcIntensiveTasks = + topGcIntensiveTasks != null ? topGcIntensiveTasks : Collections.emptyList(); + } + + @JsonProperty(FIELD_NAME_TOP_CPU_CONSUMERS) + public List getTopCpuConsumers() { + return topCpuConsumers; + } + + @JsonProperty(FIELD_NAME_TOP_BACKPRESSURE_OPERATORS) + public List getTopBackpressureOperators() { + return topBackpressureOperators; + } + + @JsonProperty(FIELD_NAME_TOP_GC_INTENSIVE_TASKS) + public List getTopGcIntensiveTasks() { + return topGcIntensiveTasks; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TopNMetricsResponseBody that = (TopNMetricsResponseBody) o; + return Objects.equals(topCpuConsumers, that.topCpuConsumers) + && Objects.equals(topBackpressureOperators, that.topBackpressureOperators) + && Objects.equals(topGcIntensiveTasks, that.topGcIntensiveTasks); + } + + @Override + public int hashCode() { + return Objects.hash(topCpuConsumers, topBackpressureOperators, topGcIntensiveTasks); + } + + /** CPU consumer metrics. */ + public static class CpuConsumer { + private static final String FIELD_NAME_SUBTASK_ID = "subtaskId"; + private static final String FIELD_NAME_TASK_NAME = "taskName"; + private static final String FIELD_NAME_CPU_PERCENTAGE = "cpuPercentage"; + private static final String FIELD_NAME_TASK_MANAGER_ID = "taskManagerId"; + + @JsonProperty(FIELD_NAME_SUBTASK_ID) + private final int subtaskId; + + @JsonProperty(FIELD_NAME_TASK_NAME) + private final String taskName; + + @JsonProperty(FIELD_NAME_CPU_PERCENTAGE) + private final double cpuPercentage; + + @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) + private final String taskManagerId; + + @JsonCreator + public CpuConsumer( + @JsonProperty(FIELD_NAME_SUBTASK_ID) int subtaskId, + @JsonProperty(FIELD_NAME_TASK_NAME) String taskName, + @JsonProperty(FIELD_NAME_CPU_PERCENTAGE) double cpuPercentage, + @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) String taskManagerId) { + this.subtaskId = subtaskId; + this.taskName = taskName; + this.cpuPercentage = cpuPercentage; + this.taskManagerId = taskManagerId; + } + + public int getSubtaskId() { + return subtaskId; + } + + public String getTaskName() { + return taskName; + } + + public double getCpuPercentage() { + return cpuPercentage; + } + + public String getTaskManagerId() { + return taskManagerId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CpuConsumer that = (CpuConsumer) o; + return subtaskId == that.subtaskId + && Double.compare(that.cpuPercentage, cpuPercentage) == 0 + && Objects.equals(taskName, that.taskName) + && Objects.equals(taskManagerId, that.taskManagerId); + } + + @Override + public int hashCode() { + return Objects.hash(subtaskId, taskName, cpuPercentage, taskManagerId); + } + } + + /** Backpressure operator metrics. */ + public static class BackpressureOperator { + private static final String FIELD_NAME_OPERATOR_ID = "operatorId"; + private static final String FIELD_NAME_OPERATOR_NAME = "operatorName"; + private static final String FIELD_NAME_BACKPRESSURE_RATIO = "backpressureRatio"; + private static final String FIELD_NAME_SUBTASK_ID = "subtaskId"; + + @JsonProperty(FIELD_NAME_OPERATOR_ID) + private final String operatorId; + + @JsonProperty(FIELD_NAME_OPERATOR_NAME) + private final String operatorName; + + @JsonProperty(FIELD_NAME_BACKPRESSURE_RATIO) + private final double backpressureRatio; + + @JsonProperty(FIELD_NAME_SUBTASK_ID) + private final int subtaskId; + + @JsonCreator + public BackpressureOperator( + @JsonProperty(FIELD_NAME_OPERATOR_ID) String operatorId, + @JsonProperty(FIELD_NAME_OPERATOR_NAME) String operatorName, + @JsonProperty(FIELD_NAME_BACKPRESSURE_RATIO) double backpressureRatio, + @JsonProperty(FIELD_NAME_SUBTASK_ID) int subtaskId) { + this.operatorId = operatorId; + this.operatorName = operatorName; + this.backpressureRatio = backpressureRatio; + this.subtaskId = subtaskId; + } + + public String getOperatorId() { + return operatorId; + } + + public String getOperatorName() { + return operatorName; + } + + public double getBackpressureRatio() { + return backpressureRatio; + } + + public int getSubtaskId() { + return subtaskId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BackpressureOperator that = (BackpressureOperator) o; + return Double.compare(that.backpressureRatio, backpressureRatio) == 0 + && subtaskId == that.subtaskId + && Objects.equals(operatorId, that.operatorId) + && Objects.equals(operatorName, that.operatorName); + } + + @Override + public int hashCode() { + return Objects.hash(operatorId, operatorName, backpressureRatio, subtaskId); + } + } + + /** GC intensive task metrics. */ + public static class GcIntensiveTask { + private static final String FIELD_NAME_TASK_NAME = "taskName"; + private static final String FIELD_NAME_GC_TIME_PERCENTAGE = "gcTimePercentage"; + private static final String FIELD_NAME_TASK_MANAGER_ID = "taskManagerId"; + + @JsonProperty(FIELD_NAME_TASK_NAME) + private final String taskName; + + @JsonProperty(FIELD_NAME_GC_TIME_PERCENTAGE) + private final double gcTimePercentage; + + @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) + private final String taskManagerId; + + @JsonCreator + public GcIntensiveTask( + @JsonProperty(FIELD_NAME_TASK_NAME) String taskName, + @JsonProperty(FIELD_NAME_GC_TIME_PERCENTAGE) double gcTimePercentage, + @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) String taskManagerId) { + this.taskName = taskName; + this.gcTimePercentage = gcTimePercentage; + this.taskManagerId = taskManagerId; + } + + public String getTaskName() { + return taskName; + } + + public double getGcTimePercentage() { + return gcTimePercentage; + } + + public String getTaskManagerId() { + return taskManagerId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GcIntensiveTask that = (GcIntensiveTask) o; + return Double.compare(that.gcTimePercentage, gcTimePercentage) == 0 + && Objects.equals(taskName, that.taskName) + && Objects.equals(taskManagerId, that.taskManagerId); + } + + @Override + public int hashCode() { + return Objects.hash(taskName, gcTimePercentage, taskManagerId); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 6e3926d6a2c93..d9fb34c3c0499 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -94,6 +94,7 @@ import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexWatermarksHandler; import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler; import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler; +import org.apache.flink.runtime.rest.handler.job.metrics.TopNMetricsHandler; import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers; import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers; import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers; @@ -540,6 +541,10 @@ protected List> initiali final JobMetricsHandler jobMetricsHandler = new JobMetricsHandler(leaderRetriever, timeout, responseHeaders, metricFetcher); + final TopNMetricsHandler topNMetricsHandler = + new TopNMetricsHandler( + leaderRetriever, timeout, responseHeaders, TopNMetricsHeaders.getInstance(), executor, metricFetcher); + final SubtaskMetricsHandler subtaskMetricsHandler = new SubtaskMetricsHandler(leaderRetriever, timeout, responseHeaders, metricFetcher); @@ -831,6 +836,7 @@ protected List> initiali jobVertexWatermarksHandler.getMessageHeaders(), jobVertexWatermarksHandler)); handlers.add(Tuple2.of(jobMetricsHandler.getMessageHeaders(), jobMetricsHandler)); + handlers.add(Tuple2.of(topNMetricsHandler.getMessageHeaders(), topNMetricsHandler)); handlers.add(Tuple2.of(subtaskMetricsHandler.getMessageHeaders(), subtaskMetricsHandler)); handlers.add( Tuple2.of(