Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@

class RecordUtils {

@SuppressWarnings("unchecked")
static Object extractFromRecordValue(Object recordValue, String fieldName) {
List<String> fields = Splitter.on('.').splitToList(fieldName);
return extractFromRecordValue(recordValue, Splitter.on('.').splitToList(fieldName));
}

@SuppressWarnings("unchecked")
static Object extractFromRecordValue(Object recordValue, List<String> fields) {
if (recordValue instanceof Struct) {
return valueFromStruct((Struct) recordValue, fields);
} else if (recordValue instanceof Map) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
Expand All @@ -39,12 +40,16 @@ public class SinkWriter {
private final IcebergWriterFactory writerFactory;
private final Map<String, RecordWriter> writers;
private final Map<TopicPartition, Offset> sourceOffsets;
// the route field is fixed per config, so split its dotted path once instead of per record
private final List<String> routeFieldPath;

public SinkWriter(Catalog catalog, IcebergSinkConfig config) {
this.config = config;
this.writerFactory = new IcebergWriterFactory(catalog, config);
this.writers = Maps.newHashMap();
this.sourceOffsets = Maps.newHashMap();
String routeField = config.tablesRouteField();
this.routeFieldPath = routeField == null ? null : Splitter.on('.').splitToList(routeField);
}

public void close() {
Expand Down Expand Up @@ -90,9 +95,7 @@ private void save(SinkRecord record) {
}

private void routeRecordStatically(SinkRecord record) {
String routeField = config.tablesRouteField();

if (routeField == null) {
if (routeFieldPath == null) {
// route to all tables
config
.tables()
Expand All @@ -102,7 +105,7 @@ private void routeRecordStatically(SinkRecord record) {
});

} else {
String routeValue = extractRouteValue(record.value(), routeField);
String routeValue = extractRouteValue(record.value());
if (routeValue != null) {
config
.tables()
Expand All @@ -118,21 +121,20 @@ private void routeRecordStatically(SinkRecord record) {
}

private void routeRecordDynamically(SinkRecord record) {
String routeField = config.tablesRouteField();
Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing");
Preconditions.checkNotNull(routeFieldPath, "Route field cannot be null with dynamic routing");

String routeValue = extractRouteValue(record.value(), routeField);
String routeValue = extractRouteValue(record.value());
if (routeValue != null) {
String tableName = routeValue.toLowerCase(Locale.ROOT);
writerForTable(tableName, record, true).write(record);
}
}

private String extractRouteValue(Object recordValue, String routeField) {
private String extractRouteValue(Object recordValue) {
if (recordValue == null) {
return null;
}
Object routeValue = RecordUtils.extractFromRecordValue(recordValue, routeField);
Object routeValue = RecordUtils.extractFromRecordValue(recordValue, routeFieldPath);
return routeValue == null ? null : routeValue.toString();
}

Expand Down