Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -532,9 +532,20 @@ public Frame decorrelateRel(LogicalFilter rel) {
unsupportedCorConditions);
assert unsupportedCorConditions.isEmpty();

final RexNode remainingCondition =
RexNode remainingCondition =
RexUtil.composeConjunction(rexBuilder, nonCorConditions, false);

// Re-index the remaining (non-correlated) condition against the rewritten input.
// The child may have shifted its row type during decorrelation (e.g. an Aggregate
// injects correlated columns into its group key), so RexInputRefs in HAVING /
// Filter predicates that survive in nonCorConditions must be remapped through
// frame.oldToNewOutputs. Otherwise they silently point at the wrong column.
if (remainingCondition != null) {
remainingCondition =
adjustInputRefs(
remainingCondition, frame.oldToNewOutputs, frame.r.getRowType());
}

// Using LogicalFilter.create instead of RelBuilder.filter to create Filter
// because RelBuilder.filter method does not have VariablesSet arg.
final RelNode newFilter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,35 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalProject(e=[$1], f=[$2])
+- LogicalFilter(condition=[true])
+- LogicalTableScan(table=[[default_catalog, default_database, r]])
]]>
</Resource>
</TestCase>
<TestCase name="testExistsWithCorrelatedOnWhere_Aggregate_LocalWhere">
<Resource name="sql">
<![CDATA[SELECT * FROM l WHERE EXISTS (SELECT 1 FROM r WHERE l.a = r.d AND r.e > 10 GROUP BY r.f)]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalFilter(condition=[EXISTS({
LogicalAggregate(group=[{0}])
LogicalProject(f=[$2])
LogicalFilter(condition=[AND(=($cor0.a, $0), >($1, 10))])
LogicalTableScan(table=[[default_catalog, default_database, r]])
})], variablesSet=[[$cor0]])
+- LogicalTableScan(table=[[default_catalog, default_database, l]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
:- LogicalTableScan(table=[[default_catalog, default_database, l]])
+- LogicalProject(d=[$1])
+- LogicalAggregate(group=[{0, 1}])
+- LogicalProject(f=[$2], d=[$0])
+- LogicalFilter(condition=[>($1, 10)])
+- LogicalTableScan(table=[[default_catalog, default_database, r]])
]]>
</Resource>
</TestCase>
Expand Down Expand Up @@ -446,6 +475,130 @@ LogicalProject(a=[$0], b=[$1])
+- LogicalProject(d=[$1])
+- LogicalFilter(condition=[true])
+- LogicalTableScan(table=[[default_catalog, default_database, y]])
]]>
</Resource>
</TestCase>
<TestCase name="testExistsWithCorrelatedOnWhere_Having1">
<Resource name="sql">
<![CDATA[SELECT * FROM l WHERE EXISTS (SELECT 1 FROM r WHERE l.a = r.d GROUP BY r.f HAVING SUM(r.e) >= 3)]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalFilter(condition=[EXISTS({
LogicalFilter(condition=[>=($1, 3)])
LogicalAggregate(group=[{0}], agg#0=[SUM($1)])
LogicalProject(f=[$2], e=[$1])
LogicalFilter(condition=[=($cor0.a, $0)])
LogicalTableScan(table=[[default_catalog, default_database, r]])
})], variablesSet=[[$cor0]])
+- LogicalTableScan(table=[[default_catalog, default_database, l]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
:- LogicalTableScan(table=[[default_catalog, default_database, l]])
+- LogicalProject(d=[$1])
+- LogicalFilter(condition=[>=($2, 3)])
+- LogicalAggregate(group=[{0, 1}], agg#0=[SUM($2)])
+- LogicalProject(f=[$2], d=[$0], e=[$1])
+- LogicalFilter(condition=[true])
+- LogicalTableScan(table=[[default_catalog, default_database, r]])
]]>
</Resource>
</TestCase>
<TestCase name="testExistsWithCorrelatedOnWhere_Having2">
<Resource name="sql">
<![CDATA[SELECT * FROM l WHERE EXISTS (SELECT 1 FROM r WHERE l.a = r.d GROUP BY r.f HAVING SUM(r.e) >= 3 AND MAX(r.e) < 100)]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalFilter(condition=[EXISTS({
LogicalFilter(condition=[AND(>=($1, 3), <($2, 100))])
LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[MAX($1)])
LogicalProject(f=[$2], e=[$1])
LogicalFilter(condition=[=($cor0.a, $0)])
LogicalTableScan(table=[[default_catalog, default_database, r]])
})], variablesSet=[[$cor0]])
+- LogicalTableScan(table=[[default_catalog, default_database, l]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
:- LogicalTableScan(table=[[default_catalog, default_database, l]])
+- LogicalProject(d=[$1])
+- LogicalFilter(condition=[AND(>=($2, 3), <($3, 100))])
+- LogicalAggregate(group=[{0, 1}], agg#0=[SUM($2)], agg#1=[MAX($2)])
+- LogicalProject(f=[$2], d=[$0], e=[$1])
+- LogicalFilter(condition=[true])
+- LogicalTableScan(table=[[default_catalog, default_database, r]])
]]>
</Resource>
</TestCase>
<TestCase name="testExistsWithCorrelatedOnWhere_Having3">
<Resource name="sql">
<![CDATA[SELECT * FROM l WHERE EXISTS (SELECT 1 FROM r WHERE l.a = r.d GROUP BY r.f HAVING SUM(r.e) >= 3 AND COUNT(*) > 1)]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalFilter(condition=[EXISTS({
LogicalFilter(condition=[AND(>=($1, 3), >($2, 1))])
LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[COUNT()])
LogicalProject(f=[$2], e=[$1])
LogicalFilter(condition=[=($cor0.a, $0)])
LogicalTableScan(table=[[default_catalog, default_database, r]])
})], variablesSet=[[$cor0]])
+- LogicalTableScan(table=[[default_catalog, default_database, l]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
:- LogicalTableScan(table=[[default_catalog, default_database, l]])
+- LogicalProject(d=[$1])
+- LogicalFilter(condition=[AND(>=($2, 3), >($3, 1))])
+- LogicalAggregate(group=[{0, 1}], agg#0=[SUM($2)], agg#1=[COUNT()])
+- LogicalProject(f=[$2], d=[$0], e=[$1])
+- LogicalFilter(condition=[true])
+- LogicalTableScan(table=[[default_catalog, default_database, r]])
]]>
</Resource>
</TestCase>
<TestCase name="testExistsWithCorrelatedOnWhere_Having4">
<Resource name="sql">
<![CDATA[SELECT * FROM l WHERE EXISTS (SELECT 1 FROM r WHERE l.a = r.d AND l.b = r.e GROUP BY r.f HAVING COUNT(r.d) >= 2)]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalFilter(condition=[EXISTS({
LogicalFilter(condition=[>=($1, 2)])
LogicalAggregate(group=[{0}], agg#0=[COUNT($1)])
LogicalProject(f=[$2], d=[$0])
LogicalFilter(condition=[AND(=($cor0.a, $0), =($cor0.b, $1))])
LogicalTableScan(table=[[default_catalog, default_database, r]])
})], variablesSet=[[$cor0]])
+- LogicalTableScan(table=[[default_catalog, default_database, l]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2])
+- LogicalJoin(condition=[AND(=($0, $3), =($1, $4))], joinType=[semi])
:- LogicalTableScan(table=[[default_catalog, default_database, l]])
+- LogicalProject(d=[$1], e=[$2])
+- LogicalFilter(condition=[>=($3, 2)])
+- LogicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($1)])
+- LogicalProject(f=[$2], d=[$0], e=[$1])
+- LogicalFilter(condition=[true])
+- LogicalTableScan(table=[[default_catalog, default_database, r]])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1323,6 +1323,51 @@ class SubQuerySemiJoinTest extends SubQueryTestBase {
util.verifyRelPlanNotExpected(sqlQuery, "joinType=[semi]")
}

@Test
def testExistsWithCorrelatedOnWhere_Having1(): Unit = {
  // EXISTS sub-query with a correlated WHERE and a HAVING over one aggregate result.
  // Regression coverage for SubQueryDecorrelator: decorrelation injects the correlated
  // column into the Aggregate's group key, so the non-correlated HAVING predicate has to
  // be re-indexed against the rewritten Aggregate.
  val outerQuery = "SELECT * FROM l WHERE EXISTS "
  val subQuery =
    "(SELECT 1 FROM r WHERE l.a = r.d GROUP BY r.f HAVING SUM(r.e) >= 3)"
  util.verifyRelPlan(outerQuery + subQuery)
}

@Test
def testExistsWithCorrelatedOnWhere_Having2(): Unit = {
  // HAVING is a conjunction that references two aggregate results (SUM and MAX).
  val outerQuery = "SELECT * FROM l WHERE EXISTS "
  val subQuery =
    "(SELECT 1 FROM r WHERE l.a = r.d GROUP BY r.f HAVING SUM(r.e) >= 3 AND MAX(r.e) < 100)"
  util.verifyRelPlan(outerQuery + subQuery)
}

@Test
def testExistsWithCorrelatedOnWhere_Having3(): Unit = {
  // HAVING combines a reference to a SUM aggregate with a COUNT(*) check.
  val outerQuery = "SELECT * FROM l WHERE EXISTS "
  val subQuery =
    "(SELECT 1 FROM r WHERE l.a = r.d GROUP BY r.f HAVING SUM(r.e) >= 3 AND COUNT(*) > 1)"
  util.verifyRelPlan(outerQuery + subQuery)
}

@Test
def testExistsWithCorrelatedOnWhere_Having4(): Unit = {
  // Two correlated WHERE equalities together with a HAVING over an aggregate result.
  val outerQuery = "SELECT * FROM l WHERE EXISTS "
  val subQuery =
    "(SELECT 1 FROM r WHERE l.a = r.d AND l.b = r.e GROUP BY r.f HAVING COUNT(r.d) >= 2)"
  util.verifyRelPlan(outerQuery + subQuery)
}

@Test
def testExistsWithCorrelatedOnWhere_Aggregate_LocalWhere(): Unit = {
  // Correlated and local WHERE predicates together, without any HAVING clause.
  // Protects against an over-eager fix: the local predicate `r.e > 10` sits below the
  // Aggregate, so its RexInputRef must stay stable through decorrelation.
  val outerQuery = "SELECT * FROM l WHERE EXISTS "
  val subQuery =
    "(SELECT 1 FROM r WHERE l.a = r.d AND r.e > 10 GROUP BY r.f)"
  util.verifyRelPlan(outerQuery + subQuery)
}

@Test
def testExistsWithCorrelatedOnWhere_UnsupportedAggregate1(): Unit = {
util.addTableSource[(Int, Long)]("l1", 'a, 'b)
Expand Down