Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,13 @@ private static void extractEqualityConditions(
final SqlKind kind = call.getOperator().getKind();

if (kind != SqlKind.EQUALS) {
for (final RexNode operand : call.getOperands()) {
extractEqualityConditions(
operand, inputOffsets, inputFieldCounts, joinAttributeMap);
// Only conjunctions (AND) can contain equality conditions that are valid for multijoin.
// All other condition types are deferred to the postJoinFilter.
if (kind == SqlKind.AND) {
for (final RexNode operand : call.getOperands()) {
extractEqualityConditions(
operand, inputOffsets, inputFieldCounts, joinAttributeMap);
}
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ public List<TableTestProgram> programs() {
MultiJoinTestPrograms.MULTI_JOIN_LEFT_OUTER_WITH_NULL_KEYS,
MultiJoinTestPrograms.MULTI_JOIN_NULL_SAFE_JOIN_WITH_NULL_KEYS,
MultiJoinTestPrograms.MULTI_JOIN_MIXED_CHANGELOG_MODES,
MultiJoinTestPrograms
.MULTI_JOIN_WITH_TIME_ATTRIBUTES_IN_CONDITIONS_MATERIALIZATION);
MultiJoinTestPrograms.MULTI_JOIN_WITH_TIME_ATTRIBUTES_IN_CONDITIONS_MATERIALIZATION,
MultiJoinTestPrograms.MULTI_JOIN_TWO_WAY_INNER_JOIN_WITH_WHERE_IN,
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN_MULTI_KEY_TYPES,
MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_MIXED_JOIN_MULTI_KEY_TYPES_SHUFFLED);
}
}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.runtime.stream.sql
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.api.config.OptimizerConfigOptions
import org.apache.flink.table.planner.expressions.utils.FuncWithOpen
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.table.planner.runtime.utils._
Expand Down Expand Up @@ -623,6 +624,22 @@ class JoinITCase(miniBatch: MiniBatchMode, state: StateBackendMode, enableAsyncS
assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
}

@TestTemplate
def testInnerMultiJoinWithEqualPk(): Unit = {
tEnv.getConfig.getConfiguration
.setString(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED.key(), "true")
val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1"
val query2 = "SELECT SUM(b2) AS b2, b1 FROM B group by b1"
val query = s"SELECT a1, b1 FROM ($query1) JOIN ($query2) ON a1 = b1"

val sink = new TestingRetractSink
tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)
env.execute()

val expected = Seq("1,1", "2,2", "3,3")
assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
}

@TestTemplate
def testInnerJoinWithPk(): Unit = {
val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.operators.join.stream;

import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
Expand Down Expand Up @@ -609,6 +610,10 @@ private Integer processRecords(
}
}

private boolean isHeapBackend() {
return getKeyedStateBackend() instanceof HeapKeyedStateBackend;
}

private static RowData newJoinedRowData(int depth, RowData joinedRowData, RowData record) {
RowData newJoinedRowData;
if (depth == 0) {
Expand Down Expand Up @@ -808,6 +813,11 @@ private void initializeStateHandlers() {
"Keyed state store not found when initializing keyed state store handlers.");
}

boolean prohibitReuseRow = isHeapBackend();
if (prohibitReuseRow) {
this.keyExtractor.requiresKeyDeepCopy();
}

this.stateHandlers = new ArrayList<>(inputSpecs.size());
for (int i = 0; i < inputSpecs.size(); i++) {
MultiJoinStateView stateView;
Expand Down
Loading