Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.fluss.client.table.scanner.log.TypedLogScannerImpl;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.PartitionInfo;
import org.apache.fluss.metadata.SchemaGetter;
import org.apache.fluss.metadata.TableBucket;
Expand Down Expand Up @@ -124,6 +125,15 @@ public LogScanner createLogScanner() {
tableInfo.getTablePath(), limit));
}

if (recordBatchFilter != null
&& tableInfo.getTableConfig().getLogFormat() != LogFormat.ARROW) {
throw new UnsupportedOperationException(
String.format(
"Filter pushdown is only supported for ARROW log format. "
+ "Table: %s, current log format: %s",
tableInfo.getTablePath(), tableInfo.getTableConfig().getLogFormat()));
}

return new LogScannerImpl(
conn.getConfiguration(),
tableInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public class FlussConfigUtils {
ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
ConfigOptions.TABLE_DATALAKE_FRESHNESS.key(),
ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(),
ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION.key());
ConfigOptions.TABLE_AUTO_PARTITION_NUM_RETENTION.key(),
ConfigOptions.TABLE_STATISTICS_COLUMNS.key());
Comment thread
platinumhamburg marked this conversation as resolved.
}

public static boolean isTableStorageConfig(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ public static void validateStatisticsConfig(TableDescriptor tableDescriptor) {
return;
}

// Statistics columns are only supported for log tables (non-PK tables)
if (tableDescriptor.hasPrimaryKey()) {
throw new InvalidConfigException(
"Statistics columns are not supported for primary key tables. "
+ "Please remove the '"
+ ConfigOptions.TABLE_STATISTICS_COLUMNS.key()
+ "' property.");
}

RowType rowType = tableDescriptor.getSchema().getRowType();

// Wildcard means all supported columns - no validation needed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.TableConfig;
import org.apache.fluss.flink.FlinkConnectorOptions;
import org.apache.fluss.flink.lake.LakeFlinkCatalog;
import org.apache.fluss.flink.lake.LakeTableFactory;
Expand Down Expand Up @@ -62,6 +63,7 @@
import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
import static org.apache.fluss.config.ConfigOptions.TABLE_DELETE_BEHAVIOR;
import static org.apache.fluss.config.FlussConfigUtils.CLIENT_PREFIX;
import static org.apache.fluss.config.FlussConfigUtils.TABLE_PREFIX;
import static org.apache.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeyIndexes;
import static org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeys;
Expand Down Expand Up @@ -150,6 +152,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
toFlussTablePath(context.getObjectIdentifier()),
toFlussClientConfig(
context.getCatalogTable().getOptions(), context.getConfiguration()),
toFlussTableConfig(tableOptions),
tableOutputType,
primaryKeyIndexes,
bucketKeyIndexes,
Expand Down Expand Up @@ -273,6 +276,24 @@ private static Configuration toFlussClientConfig(
return flussConfig;
}

/**
 * Builds a Fluss {@link TableConfig} from the resolved Flink table options by copying every
 * declared connector option whose key carries the Fluss table prefix.
 *
 * <p>Only options listed in {@link FlinkConnectorOptions#TABLE_OPTIONS} are considered, and
 * each present value is stored in its string form. Iterating the known options avoids relying
 * on {@code ReadableConfig#toMap()}, which may not exist in all Flink versions.
 *
 * @param tableOptions the resolved Flink table options
 * @return a {@link TableConfig} holding the forwarded table-level settings
 */
private static TableConfig toFlussTableConfig(ReadableConfig tableOptions) {
    Configuration forwarded = new Configuration();
    for (ConfigOption<?> candidate : FlinkConnectorOptions.TABLE_OPTIONS) {
        String key = candidate.key();
        if (!key.startsWith(TABLE_PREFIX)) {
            continue;
        }
        // Store the value in string form for configuration storage.
        tableOptions
                .getOptional(candidate)
                .ifPresent(value -> forwarded.setString(key, value.toString()));
    }
    return new TableConfig(forwarded);
}

/**
 * Converts a Flink {@link ObjectIdentifier} into the equivalent Fluss {@link TablePath}
 * (database name + object name; the Flink catalog name is not part of the Fluss path).
 *
 * @param tablePath the Flink object identifier of the table
 * @return the corresponding Fluss table path
 */
private static TablePath toFlussTablePath(ObjectIdentifier tablePath) {
    String database = tablePath.getDatabaseName();
    String table = tablePath.getObjectName();
    return TablePath.of(database, table);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
isPartitioned,
flussRowType,
projectedFields,
null,
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
new BinlogDeserializationSchema(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
isPartitioned(),
flussRowType,
projectedFields,
null,
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
new ChangelogDeserializationSchema(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,16 @@ public class FlinkSource<OUT>
@Nullable private final LakeSource<LakeSplit> lakeSource;
private final LeaseContext leaseContext;

@Nullable private final Predicate logRecordBatchFilter;

public FlinkSource(
Configuration flussConf,
TablePath tablePath,
boolean hasPrimaryKey,
boolean isPartitioned,
RowType sourceOutputType,
@Nullable int[] projectedFields,
@Nullable Predicate logRecordBatchFilter,
OffsetsInitializer offsetsInitializer,
long scanPartitionDiscoveryIntervalMs,
FlussDeserializationSchema<OUT> deserializationSchema,
Expand All @@ -93,6 +96,7 @@ public FlinkSource(
isPartitioned,
sourceOutputType,
projectedFields,
logRecordBatchFilter,
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
deserializationSchema,
Expand All @@ -109,6 +113,7 @@ public FlinkSource(
boolean isPartitioned,
RowType sourceOutputType,
@Nullable int[] projectedFields,
@Nullable Predicate logRecordBatchFilter,
OffsetsInitializer offsetsInitializer,
long scanPartitionDiscoveryIntervalMs,
FlussDeserializationSchema<OUT> deserializationSchema,
Expand All @@ -122,6 +127,7 @@ public FlinkSource(
this.isPartitioned = isPartitioned;
this.sourceOutputType = sourceOutputType;
this.projectedFields = projectedFields;
this.logRecordBatchFilter = logRecordBatchFilter;
this.offsetsInitializer = offsetsInitializer;
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
this.deserializationSchema = deserializationSchema;
Expand Down Expand Up @@ -213,6 +219,7 @@ public SourceReader<OUT, SourceSplitBase> createReader(SourceReaderContext conte
sourceOutputType,
context,
projectedFields,
logRecordBatchFilter,
flinkSourceReaderMetrics,
recordEmitter,
lakeSource);
Expand Down
Loading