[FLINK-39392][table] Support conditional traits for PTFs#27886
[FLINK-39392][table] Support conditional traits for PTFs#27886gustavodemorais wants to merge 5 commits intoapache:masterfrom
Conversation
twalthr
left a comment
There was a problem hiding this comment.
Thank you for this PR @gustavodemorais. Overall I'm +1 for this change. However, we need to clearly define the boundaries, when static arguments are fully resolved and a trait condition has no effect anymore. Some locations look currently very hacky, we should take another look. Also we need Table API support which is not covered by this PR, at least not in tests.
| | Parameter | Required | Description | | ||
| |:-------------|:---------|:------------| | ||
| | `input` | Yes | The input table. Must include `PARTITION BY` for parallel execution. Accepts insert-only, retract, and upsert tables. | | ||
| | `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located for parallel execution. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, providing `PARTITION BY` is recommended for better performance. | |
There was a problem hiding this comment.
| | `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located for parallel execution. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, providing `PARTITION BY` is recommended for better performance. | | |
| | `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located for parallel execution (set semantics). Without `PARTITION BY`, each row is processed independently (row semantics). Accepts insert-only, retract, and upsert tables. | |
|
|
||
| #### Without PARTITION BY | ||
|
|
||
| ```sql |
There was a problem hiding this comment.
Let's remove all PARTITION BY examples for now. A default TO_CHANGELOG example should always be without PARTITION BY. They do not provide any benefit but rather add exchange overhead.
| Row.class, | ||
| false, | ||
| EnumSet.of( | ||
| StaticArgumentTrait.TABLE, |
There was a problem hiding this comment.
Just to make things explicit, I would add StateicArgumentTrait.ROW_SEMANTIC here as well.
| * .addTraitWhen(not(hasPartitionBy()), ROW_SEMANTIC_TABLE); | ||
| * }</pre> | ||
| */ | ||
| public StaticArgument addTraitWhen( |
There was a problem hiding this comment.
| public StaticArgument addTraitWhen( | |
| public StaticArgument withConditionalTrait( |
There was a problem hiding this comment.
I would swap the parameter order then: withConditionalTrait(trait, condition)
| private final List<ConditionalTrait> conditionalTraits; | ||
|
|
||
| /** A trait that is conditionally added based on a {@link TraitCondition}. */ | ||
| private static final class ConditionalTrait implements Serializable { |
There was a problem hiding this comment.
nit: classes to the bottom of the file
| private static final class ConditionalTrait implements Serializable { | |
| private static final class ConditionalTrait { |
| } | ||
|
|
||
| /** True when the named boolean argument is provided and its value is {@code true}. */ | ||
| static TraitCondition argIsTrue(final String name) { |
There was a problem hiding this comment.
generialize the is true and is false to:
static <T> TraitCondition argIsEqualTo(T obj) {ctx.getScalarArgument(name, obj.getClass) == obj}
| } | ||
|
|
||
| final int timeColumn = inputTimeColumns.get(tableArgCall.getInputIndex()); | ||
| final org.apache.flink.table.types.inference.TraitContext traitCtx = |
There was a problem hiding this comment.
pay attention to full imports, seems Claude loves to do this
| final org.apache.flink.table.types.inference.TraitContext traitCtx = | |
| final TraitContext traitCtx = |
There was a problem hiding this comment.
same comment as above. resolve the static arg as early as possible to not reconstruct TraitContext multiple times
| .noneMatch( | ||
| arg -> | ||
| arg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE) | ||
| || arg.hasConditionalTrait( |
There was a problem hiding this comment.
we should do this on the actual resulting trait
| if (operand.getKind() == SqlKind.DEFAULT || !(operand instanceof RexLiteral)) { | ||
| return Optional.empty(); | ||
| } | ||
| return Optional.ofNullable(((RexLiteral) operand).getValueAs(clazz)); |
There was a problem hiding this comment.
this is too simple, it should follow the same rules as CallContext does. Otherwise it won't be possible e.g. to get Instant.class or other literals.
| final boolean hasPartitionBy = partitionKeys.length > 0; | ||
| final boolean reportedAsSet = tableCharacteristic.semantics == Semantics.SET; | ||
| final boolean setIsConditional = | ||
| staticArg.hasConditionalTrait(StaticArgumentTrait.SET_SEMANTIC_TABLE); |
There was a problem hiding this comment.
too fragile. determine the effective StaticArgument first and then execute this logic.
What is the purpose of the change
We'd like to make PTF traits configurable so we can have multiple versions depending on how they are configured by the user.
The first suggestion: introduce a declarative addTraitWhen API on StaticArgument that allows table argument traits to vary based on the SQL call context (e.g., whether PARTITION BY is provided or scalar argument values). This replaces the static
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)Documentation