From c0bf0a070ac6611db3c3f84567d654c92654a338 Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Tue, 19 May 2026 16:07:45 -0400 Subject: [PATCH 1/4] [FLINK-39711][runtime] Expose maxExceptions on exception message parameters The REST endpoint GET /jobs/:jobid/exceptions accepts an optional maxExceptions query parameter, but Java clients of Flink's typed REST API could not set it: the corresponding MessageQueryParameter fields on JobExceptionsMessageParameters and ApplicationExceptionsMessageParameters were declared private. This forced downstream Java clients (e.g. flink-kubernetes-operator) to fetch the full job exception history on every observation tick, wasting network bytes, JobManager-side serialization time, and operator JVM memory for long-running jobs with large exception histories. --- .../application/ApplicationExceptionsMessageParameters.java | 2 +- .../rest/messages/job/JobExceptionsMessageParameters.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParameters.java index cee07b8b20966..62db2c9351c30 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParameters.java @@ -30,7 +30,7 @@ /** {@link MessageParameters} for {@link ApplicationExceptionsHandler}. */ public class ApplicationExceptionsMessageParameters extends ApplicationMessageParameters { - private final UpperLimitExceptionParameter upperLimitExceptionParameter = + public final UpperLimitExceptionParameter upperLimitExceptionParameter = new UpperLimitExceptionParameter(); @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParameters.java index 46e1d02716db4..679badb27860c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParameters.java @@ -30,10 +30,10 @@ /** {@link MessageParameters} for {@link JobExceptionsHandler}. */ public class JobExceptionsMessageParameters extends JobMessageParameters { - private final UpperLimitExceptionParameter upperLimitExceptionParameter = + public final UpperLimitExceptionParameter upperLimitExceptionParameter = new UpperLimitExceptionParameter(); - private final FailureLabelFilterParameter failureLabelExceptionParameter = + public final FailureLabelFilterParameter failureLabelExceptionParameter = new FailureLabelFilterParameter(); @Override From cada42cdefd20746d99aa4ee6058e45da27ff52e Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Tue, 19 May 2026 16:28:33 -0400 Subject: [PATCH 2/4] [FLINK-39711][runtime] Add tests for exception message parameters URL rendering Covers the newly accessible upperLimitExceptionParameter and failureLabelExceptionParameter fields on JobExceptionsMessageParameters, and upperLimitExceptionParameter on ApplicationExceptionsMessageParameters, by asserting end-to-end URL rendering via MessageParameters.resolveUrl. Also verifies that unresolved optional parameters are omitted from the rendered URL. --- ...cationExceptionsMessageParametersTest.java | 58 ++++++++++++++++ .../JobExceptionsMessageParametersTest.java | 69 +++++++++++++++++++ 2 files changed, 127 insertions(+) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParametersTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParametersTest.java diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParametersTest.java new file mode 100644 index 0000000000000..4a2e13d9fb910 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParametersTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.application; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.runtime.rest.messages.MessageParameters; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ApplicationExceptionsMessageParameters}. */ +class ApplicationExceptionsMessageParametersTest { + + @Test + void testMaxExceptionsQueryParameterIsRendered() throws Exception { + ApplicationID applicationId = ApplicationID.generate(); + ApplicationExceptionsMessageParameters parameters = + new ApplicationExceptionsMessageParameters(); + parameters.applicationPathParameter.resolve(applicationId); + parameters.upperLimitExceptionParameter.resolveFromString("20"); + + String resolvedUrl = + MessageParameters.resolveUrl(ApplicationExceptionsHeaders.URL, parameters); + + assertThat(resolvedUrl) + .isEqualTo("/applications/" + applicationId + "/exceptions?maxExceptions=20"); + } + + @Test + void testMaxExceptionsQueryParameterIsOmittedWhenUnresolved() { + ApplicationID applicationId = ApplicationID.generate(); + ApplicationExceptionsMessageParameters parameters = + new ApplicationExceptionsMessageParameters(); + parameters.applicationPathParameter.resolve(applicationId); + + String resolvedUrl = + MessageParameters.resolveUrl(ApplicationExceptionsHeaders.URL, parameters); + + assertThat(resolvedUrl).isEqualTo("/applications/" + applicationId + "/exceptions"); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParametersTest.java new file mode 100644 index 0000000000000..a03514d876932 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParametersTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; +import org.apache.flink.runtime.rest.messages.MessageParameters; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link JobExceptionsMessageParameters}. */ +class JobExceptionsMessageParametersTest { + + @Test + void testMaxExceptionsQueryParameterIsRendered() throws Exception { + JobID jobId = new JobID(); + JobExceptionsMessageParameters parameters = new JobExceptionsMessageParameters(); + parameters.jobPathParameter.resolve(jobId); + parameters.upperLimitExceptionParameter.resolveFromString("20"); + + String resolvedUrl = + MessageParameters.resolveUrl(JobExceptionsHeaders.URL, parameters); + + assertThat(resolvedUrl).isEqualTo("/jobs/" + jobId + "/exceptions?maxExceptions=20"); + } + + @Test + void testFailureLabelFilterQueryParameterIsRendered() throws Exception { + JobID jobId = new JobID(); + JobExceptionsMessageParameters parameters = new JobExceptionsMessageParameters(); + parameters.jobPathParameter.resolve(jobId); + parameters.failureLabelExceptionParameter.resolveFromString("type:system"); + + String resolvedUrl = + MessageParameters.resolveUrl(JobExceptionsHeaders.URL, parameters); + + assertThat(resolvedUrl).contains("failureLabelFilter="); + } + + @Test + void testOptionalQueryParametersAreOmittedWhenUnresolved() { + JobID jobId = new JobID(); + JobExceptionsMessageParameters parameters = new JobExceptionsMessageParameters(); + parameters.jobPathParameter.resolve(jobId); + + String resolvedUrl = + MessageParameters.resolveUrl(JobExceptionsHeaders.URL, parameters); + + assertThat(resolvedUrl).isEqualTo("/jobs/" + jobId + "/exceptions"); + } +} From f893d666d3e450c9d06d34e7953d3e185c9aabba Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Thu, 21 May 2026 15:58:16 -0400 Subject: [PATCH 3/4] Fix lint formatting in JobExceptionsMessageParametersTest --- .../messages/job/JobExceptionsMessageParametersTest.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParametersTest.java index a03514d876932..b30006282731f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParametersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParametersTest.java @@ -36,8 +36,7 @@ void testMaxExceptionsQueryParameterIsRendered() throws Exception { parameters.jobPathParameter.resolve(jobId); parameters.upperLimitExceptionParameter.resolveFromString("20"); - String resolvedUrl = - MessageParameters.resolveUrl(JobExceptionsHeaders.URL, parameters); + String resolvedUrl = MessageParameters.resolveUrl(JobExceptionsHeaders.URL, parameters); assertThat(resolvedUrl).isEqualTo("/jobs/" + jobId + "/exceptions?maxExceptions=20"); } @@ -49,8 +48,7 @@ void testFailureLabelFilterQueryParameterIsRendered() throws Exception { parameters.jobPathParameter.resolve(jobId); parameters.failureLabelExceptionParameter.resolveFromString("type:system"); - String resolvedUrl = - MessageParameters.resolveUrl(JobExceptionsHeaders.URL, parameters); + String resolvedUrl = MessageParameters.resolveUrl(JobExceptionsHeaders.URL, parameters); assertThat(resolvedUrl).contains("failureLabelFilter="); } @@ -61,8 +59,7 @@ void testOptionalQueryParametersAreOmittedWhenUnresolved() { JobExceptionsMessageParameters parameters = new JobExceptionsMessageParameters(); parameters.jobPathParameter.resolve(jobId); - String resolvedUrl = - MessageParameters.resolveUrl(JobExceptionsHeaders.URL, parameters); + String resolvedUrl = MessageParameters.resolveUrl(JobExceptionsHeaders.URL, parameters); assertThat(resolvedUrl).isEqualTo("/jobs/" + jobId + "/exceptions"); } From 041eca08f15c84087eabdcd1339f3cfda13bd394 Mon Sep 17 00:00:00 2001 From: Jubin Soni Date: Sat, 23 May 2026 11:14:19 -0400 Subject: [PATCH 4/4] [FLINK-39711][runtime] Scope back to job-side maxExceptions per review Addresses review feedback on the FLINK-39711 PR: - Revert visibility change on ApplicationExceptionsMessageParameters and remove its test. The application-side parameter is silently discarded by ApplicationExceptionsHandler today; the full server-side fix is being handled in #28226 (FLINK-39730), so exposing the field here would land an incomplete state where clients can set a parameter the server ignores. - Revert visibility change on JobExceptionsMessageParameters.failureLabelExceptionParameter and remove testFailureLabelFilterQueryParameterIsRendered. failureLabelFilter exposure wasn't part of the JIRA scope; it was added for parity but surfaces a pre-existing FailureLabel.toString rendering quirk that deserves its own ticket. - Change test value in testMaxExceptionsQueryParameterIsRendered from 20 to 5 so the value is distinguishable from MAX_NUMBER_EXCEPTION_TO_REPORT (the server-side default), making test intent unambiguous. --- ...pplicationExceptionsMessageParameters.java | 2 +- .../job/JobExceptionsMessageParameters.java | 2 +- ...cationExceptionsMessageParametersTest.java | 58 ------------------- .../JobExceptionsMessageParametersTest.java | 16 +---- 4 files changed, 4 insertions(+), 74 deletions(-) delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParametersTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParameters.java index 62db2c9351c30..cee07b8b20966 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParameters.java @@ -30,7 +30,7 @@ /** {@link MessageParameters} for {@link ApplicationExceptionsHandler}. */ public class ApplicationExceptionsMessageParameters extends ApplicationMessageParameters { - public final UpperLimitExceptionParameter upperLimitExceptionParameter = + private final UpperLimitExceptionParameter upperLimitExceptionParameter = new UpperLimitExceptionParameter(); @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParameters.java index 679badb27860c..0bc350dfdbf41 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParameters.java @@ -33,7 +33,7 @@ public class JobExceptionsMessageParameters extends JobMessageParameters { public final UpperLimitExceptionParameter upperLimitExceptionParameter = new UpperLimitExceptionParameter(); - public final FailureLabelFilterParameter failureLabelExceptionParameter = + private final FailureLabelFilterParameter failureLabelExceptionParameter = new FailureLabelFilterParameter(); @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParametersTest.java deleted file mode 100644 index 4a2e13d9fb910..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/application/ApplicationExceptionsMessageParametersTest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.messages.application; - -import org.apache.flink.api.common.ApplicationID; -import org.apache.flink.runtime.rest.messages.MessageParameters; - -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -/** Tests for {@link ApplicationExceptionsMessageParameters}. */ -class ApplicationExceptionsMessageParametersTest { - - @Test - void testMaxExceptionsQueryParameterIsRendered() throws Exception { - ApplicationID applicationId = ApplicationID.generate(); - ApplicationExceptionsMessageParameters parameters = - new ApplicationExceptionsMessageParameters(); - parameters.applicationPathParameter.resolve(applicationId); - parameters.upperLimitExceptionParameter.resolveFromString("20"); - - String resolvedUrl = - MessageParameters.resolveUrl(ApplicationExceptionsHeaders.URL, parameters); - - assertThat(resolvedUrl) - .isEqualTo("/applications/" + applicationId + "/exceptions?maxExceptions=20"); - } - - @Test - void testMaxExceptionsQueryParameterIsOmittedWhenUnresolved() { - ApplicationID applicationId = ApplicationID.generate(); - ApplicationExceptionsMessageParameters parameters = - new ApplicationExceptionsMessageParameters(); - parameters.applicationPathParameter.resolve(applicationId); - - String resolvedUrl = - MessageParameters.resolveUrl(ApplicationExceptionsHeaders.URL, parameters); - - assertThat(resolvedUrl).isEqualTo("/applications/" + applicationId + "/exceptions"); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParametersTest.java index b30006282731f..86219a0a2a2f0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParametersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExceptionsMessageParametersTest.java @@ -34,23 +34,11 @@ void testMaxExceptionsQueryParameterIsRendered() throws Exception { JobID jobId = new JobID(); JobExceptionsMessageParameters parameters = new JobExceptionsMessageParameters(); parameters.jobPathParameter.resolve(jobId); - parameters.upperLimitExceptionParameter.resolveFromString("20"); + parameters.upperLimitExceptionParameter.resolveFromString("5"); String resolvedUrl = MessageParameters.resolveUrl(JobExceptionsHeaders.URL, parameters); - assertThat(resolvedUrl).isEqualTo("/jobs/" + jobId + "/exceptions?maxExceptions=20"); - } - - @Test - void testFailureLabelFilterQueryParameterIsRendered() throws Exception { - JobID jobId = new JobID(); - JobExceptionsMessageParameters parameters = new JobExceptionsMessageParameters(); - parameters.jobPathParameter.resolve(jobId); - parameters.failureLabelExceptionParameter.resolveFromString("type:system"); - - String resolvedUrl = MessageParameters.resolveUrl(JobExceptionsHeaders.URL, parameters); - - assertThat(resolvedUrl).contains("failureLabelFilter="); + assertThat(resolvedUrl).isEqualTo("/jobs/" + jobId + "/exceptions?maxExceptions=5"); } @Test