diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rex/RexUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rex/RexUtil.java
index bddc6a15239ad..a629af209bc6e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rex/RexUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rex/RexUtil.java
@@ -79,10 +79,7 @@
* because of current Calcite way of inferring constants from IS NOT DISTINCT FROM clashes with
* filter push down.
*
- *
Lines 402 ~ 404, Use Calcite 1.32.0 behavior for {@link RexUtil#gatherConstraints(Class,
- * RexNode, Map, Set, RexBuilder)}.
- *
- *
FLINK modifications (backport of CALCITE-6764): Line 2481~2485
+ *
FLINK modifications (backport of CALCITE-6764): Line 2489~2494
*/
public class RexUtil {
@@ -401,9 +398,7 @@ private static void gatherConstraints(
final RexNode right;
switch (predicate.getKind()) {
case EQUALS:
- // FLINK BEGIN MODIFICATION
- // case IS_NOT_DISTINCT_FROM:
- // FLINK END MODIFICATION
+ case IS_NOT_DISTINCT_FROM:
left = ((RexCall) predicate).getOperands().get(0);
right = ((RexCall) predicate).getOperands().get(1);
break;
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFunctionPlanTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFunctionPlanTest.xml
index c4442ef5dde99..041111d053893 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFunctionPlanTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DynamicFunctionPlanTest.xml
@@ -60,10 +60,11 @@ LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)], EXPR$3=[COUNT()])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml
index be2b0e58f2905..f08d5635de70e 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml
@@ -490,19 +490,20 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], a0=[$6], b0=[$7],
@@ -1158,19 +1159,20 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], f=[$5], a0=[$6], b0=[$7],
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml
index 897e52cf7f783..dc0d9c3e721b3 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml
@@ -138,14 +138,15 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
-
-
-
-
-
-
-
-
-
-
-
@@ -311,14 +272,15 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
@@ -280,43 +282,6 @@ Calc(select=[c, g])
+- Exchange(distribution=[hash[e, d]])
+- Calc(select=[d, e, g])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[d, e, f, g, h])
-]]>
-
-
-
-
-
-
-
-
-
-
-
@@ -416,20 +381,22 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
@@ -656,20 +623,22 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml
index 249f17770908a..b125f43544167 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml
@@ -241,20 +241,22 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
@@ -280,43 +282,6 @@ Calc(select=[c, g])
+- Exchange(distribution=[hash[e, d]])
+- Calc(select=[d, e, g])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[d, e, f, g, h])
-]]>
-
-
-
-
-
-
-
-
-
-
-
@@ -416,20 +381,22 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
@@ -656,20 +623,22 @@ LogicalProject(a=[$0], b=[$1], d=[$2], e=[$3])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
index 0e508ef7691de..aa0a43e59b279 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml
@@ -517,12 +517,10 @@ LogicalProject(a=[$0], c=[$2], c0=[$9])
@@ -767,10 +765,10 @@ LogicalProject(a=[$0], b=[$1], c=[$9])
@@ -467,15 +469,15 @@ LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3])
@@ -1423,18 +1427,20 @@ LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3])
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.scala
index ec3ada5a2fb6f..fd3bc8c2b1816 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.scala
@@ -124,4 +124,11 @@ class BroadcastHashJoinTest extends JoinTestBase {
.hasMessageContaining("Cannot generate a valid execution plan for the given query")
.isInstanceOf[TableException]
}
+
+ @Test
+ override def testInnerJoinWithJoinConditionPushDown(): Unit = {
+ assertThatThrownBy(() => super.testInnerJoinWithJoinConditionPushDown())
+ .hasMessageContaining("Cannot generate a valid execution plan for the given query")
+ .isInstanceOf[TableException]
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.scala
index e7ad20d02c49b..0370b2c4c0cc0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashJoinTest.scala
@@ -96,4 +96,11 @@ class ShuffledHashJoinTest extends JoinTestBase {
.hasMessageContaining("Cannot generate a valid execution plan for the given query")
.isInstanceOf[TableException]
}
+
+ @Test
+ override def testInnerJoinWithJoinConditionPushDown(): Unit = {
+ assertThatThrownBy(() => super.testInnerJoinWithJoinConditionPushDown())
+ .hasMessageContaining("Cannot generate a valid execution plan for the given query")
+ .isInstanceOf[TableException]
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.scala
index e10732166ee5e..dffbe8e8b8223 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.scala
@@ -94,4 +94,10 @@ class SortMergeJoinTest extends JoinTestBase {
.isInstanceOf[TableException]
}
+ @Test
+ override def testInnerJoinWithJoinConditionPushDown(): Unit = {
+ assertThatThrownBy(() => super.testInnerJoinWithJoinConditionPushDown())
+ .hasMessageContaining("Cannot generate a valid execution plan for the given query")
+ .isInstanceOf[TableException]
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala
index d0e50a00da6ec..be8b8d791d042 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/DecimalITCase.scala
@@ -179,9 +179,6 @@ class DecimalITCase extends BatchTestBase {
@Test
def testJoin1(): Unit = {
- tEnv.getConfig
- .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
-
checkQuery(
Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
s1r(d"1", d"1", 1, 1.0),
@@ -193,9 +190,6 @@ class DecimalITCase extends BatchTestBase {
@Test
def testJoin2(): Unit = {
- tEnv.getConfig
- .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
-
checkQuery(
Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
s1r(d"1", d"1", 1, 1.0),
@@ -207,9 +201,6 @@ class DecimalITCase extends BatchTestBase {
@Test
def testJoin3(): Unit = {
- tEnv.getConfig
- .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
-
checkQuery(
Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
s1r(d"1", d"1", 1, 1.0),
@@ -222,9 +213,6 @@ class DecimalITCase extends BatchTestBase {
@Test
def testJoin4(): Unit = {
- tEnv.getConfig
- .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
-
checkQuery(
Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
s1r(d"1", d"1", 1, 1.0),
@@ -236,9 +224,6 @@ class DecimalITCase extends BatchTestBase {
@Test
def testJoin5(): Unit = {
- tEnv.getConfig
- .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
-
checkQuery(
Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
s1r(d"1", d"1", 1, 1.0),
@@ -250,9 +235,6 @@ class DecimalITCase extends BatchTestBase {
@Test
def testJoin6(): Unit = {
- tEnv.getConfig
- .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
-
checkQuery(
Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
s1r(d"1", d"1", 1, 1.0),
@@ -264,8 +246,6 @@ class DecimalITCase extends BatchTestBase {
@Test
def testJoin7(): Unit = {
- tEnv.getConfig
- .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin, NestedLoopJoin")
checkQuery(
Seq(DECIMAL(8, 2), DECIMAL(8, 4), INT, DOUBLE),
s1r(d"1", d"1", 1, 1.0),