diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java index 5ed820c8cbe1..88e04ee401ad 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordUtils.java @@ -47,9 +47,12 @@ class RecordUtils { - @SuppressWarnings("unchecked") static Object extractFromRecordValue(Object recordValue, String fieldName) { - List fields = Splitter.on('.').splitToList(fieldName); + return extractFromRecordValue(recordValue, Splitter.on('.').splitToList(fieldName)); + } + + @SuppressWarnings("unchecked") + static Object extractFromRecordValue(Object recordValue, List fields) { if (recordValue instanceof Struct) { return valueFromStruct((Struct) recordValue, fields); } else if (recordValue instanceof Map) { diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java index 48a01881935b..d9f4b6091f27 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java @@ -30,6 +30,7 @@ import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.connect.IcebergSinkConfig; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.sink.SinkRecord; @@ -39,12 +40,16 @@ public class SinkWriter { private final IcebergWriterFactory writerFactory; private final Map writers; private final Map sourceOffsets; + // the route field is fixed per config, so split its dotted path once instead of per record + private final List routeFieldPath; public SinkWriter(Catalog catalog, IcebergSinkConfig config) { this.config = config; this.writerFactory = new IcebergWriterFactory(catalog, config); this.writers = Maps.newHashMap(); this.sourceOffsets = Maps.newHashMap(); + String routeField = config.tablesRouteField(); + this.routeFieldPath = routeField == null ? null : Splitter.on('.').splitToList(routeField); } public void close() { @@ -90,9 +95,7 @@ private void save(SinkRecord record) { } private void routeRecordStatically(SinkRecord record) { - String routeField = config.tablesRouteField(); - - if (routeField == null) { + if (routeFieldPath == null) { // route to all tables config .tables() @@ -102,7 +105,7 @@ private void routeRecordStatically(SinkRecord record) { }); } else { - String routeValue = extractRouteValue(record.value(), routeField); + String routeValue = extractRouteValue(record.value()); if (routeValue != null) { config .tables() @@ -118,21 +121,20 @@ private void routeRecordStatically(SinkRecord record) { } private void routeRecordDynamically(SinkRecord record) { - String routeField = config.tablesRouteField(); - Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing"); + Preconditions.checkNotNull(routeFieldPath, "Route field cannot be null with dynamic routing"); - String routeValue = extractRouteValue(record.value(), routeField); + String routeValue = extractRouteValue(record.value()); if (routeValue != null) { String tableName = routeValue.toLowerCase(Locale.ROOT); writerForTable(tableName, record, true).write(record); } } - private String extractRouteValue(Object recordValue, String routeField) { + private String extractRouteValue(Object recordValue) { if (recordValue == null) { return null; } - Object routeValue = RecordUtils.extractFromRecordValue(recordValue, routeField); + Object routeValue = RecordUtils.extractFromRecordValue(recordValue, routeFieldPath); return routeValue == null ? null : routeValue.toString(); }