Skip to content

Kafka Connect: Add pluggable RecordRouter interface with five built-in routing strategies#15959

Open
kumarpritam863 wants to merge 22 commits intoapache:mainfrom
kumarpritam863:feature/pluggable-record-router
Open

Kafka Connect: Add pluggable RecordRouter interface with five built-in routing strategies#15959
kumarpritam863 wants to merge 22 commits intoapache:mainfrom
kumarpritam863:feature/pluggable-record-router

Conversation

@kumarpritam863
Copy link
Copy Markdown
Contributor

@kumarpritam863 kumarpritam863 commented Apr 13, 2026

Summary

Introduces a pluggable RecordRouter interface that abstracts all record-to-table routing logic in the Kafka Connect sink, replacing the hardcoded static/dynamic if-else in SinkWriter. This unlocks custom
routing strategies without modifying connector internals.

Ships six built-in implementations:

  • StaticRouter — wraps existing static routing behavior (broadcast + regex filtering)
  • DynamicRouter — wraps existing dynamic routing behavior (field value as table name)
  • TopicNameRouter — maps Kafka topic names to Iceberg table identifiers with namespace prefixing, regex transforms, explicit topic-to-table overrides, and case control
  • CompositeFieldRouter — multi-dimensional routing using multiple record fields combined via a template engine
  • HeaderRouter — routes based on Kafka record header values with multi-value fan-out, explicit mappings, and configurable missing-header policies
  • FilterRouter — decorator that evaluates predicates (topic regex, field regex, header regex, field existence) before delegating to any other router

Motivation

The current routing is single-dimensional (one field determines the destination) and the two modes (static vs dynamic) are mutually exclusive. Users frequently need:

  • Topic-based routing — no workaround exists today
  • Multi-field routing — e.g., route by region + event_type to different tables
  • Header-based routing — standard in event-driven architectures where producers attach routing metadata
  • Record filtering — drop test data, PII, or records missing required fields before writing

The pluggable interface enables all of these as independent, composable implementations.

Key design decisions

  • RecordRouter.configure(Map<String, String>) takes raw props instead of IcebergSinkConfig to keep the interface decoupled — external implementations only depend on the router API
  • route() returns List<RouteTarget> to support fan-out (one record → multiple tables), which static routing already does in broadcast mode and HeaderRouter uses for multi-value headers
  • RouteTarget carries ignoreMissingTable to preserve the semantic difference between static routing (throw on missing) and dynamic/topic routing (NoOpWriter)
  • Built-in routers have a package-private configure(IcebergSinkConfig) overload to avoid the cost of re-creating config + JsonConverter from raw props
  • FilterRouter uses the decorator pattern — it wraps any other router and evaluates predicates before delegating, making filtering composable with any routing strategy
  • When iceberg.tables.router-class is unset, RecordRouterFactory falls back to StaticRouter/DynamicRouterzero behavioral change for existing users

New configuration

Property Type Default Description
iceberg.tables.router-class String null Fully-qualified RecordRouter class name. Overrides iceberg.tables and iceberg.tables.dynamic-enabled when set.

TopicNameRouter properties (prefix iceberg.tables.route.topic-name.)

Property Default Description
table-namespace null Namespace prepended to derived table name
table-map.<topic> Explicit topic→table overrides (bypass all transforms)
regex null Regex applied to topic name
regex-replacement $1 Replacement string for regex captures
lowercase true Lowercase the derived table name
ignore-missing-table true Use NoOpWriter for missing tables instead of throwing

CompositeFieldRouter properties (prefix iceberg.tables.route.composite.)

Property Default Description
fields required Comma-separated list of field paths (dot notation for nesting)
table-template ${0}.${1} Template for building table name. ${N} references fields by index.
table-namespace null Namespace prepended to the final table name
lowercase true Lowercase the derived table name
ignore-missing-table true Use NoOpWriter for missing tables
null-handling drop When a field is null: drop, literal, or default:<value>

HeaderRouter properties (prefix iceberg.tables.route.header.)

Property Default Description
name required Header name to extract
table-namespace null Namespace prepended to derived table name
table-map.<header-value> Explicit header-value→table overrides
regex null Regex applied to header value
regex-replacement $1 Replacement string for regex captures
lowercase true Lowercase the derived table name
ignore-missing-table true Use NoOpWriter for missing tables
on-missing-header drop When header is absent: drop, fail, or default:<table>
multi-value first Multiple headers with same name: first, last, or all (fan-out)

FilterRouter properties (prefix iceberg.tables.route.filter.)

Property Default Description
delegate-class required Fully-qualified class name of the router to delegate to after filtering
mode include include = only matching records pass. exclude = matching records dropped.
topic-regex null Regex that topic name must match
field null Record value field to evaluate (dot notation)
field-regex null Regex that extracted field value must match
header null Header name to evaluate
header-regex null Regex that extracted header value must match
field-exists null Field path that must exist (non-null) in the record

Example usage

# Simple topic routing: topic "orders" → table "analytics.orders"
iceberg.tables.router-class=org.apache.iceberg.connect.data.TopicNameRouter                                                                                                                                       
iceberg.tables.route.topic-name.table-namespace=analytics                                                                                                                                                         
                                                                                                                                                                                                                  
# Multi-field routing: {region: "us_east", event_type: "clicks"} → "warehouse.us_east_clicks"                                                                                                                     
iceberg.tables.router-class=org.apache.iceberg.connect.data.CompositeFieldRouter                                                                                                                                  
iceberg.tables.route.composite.fields=region,event_type                                                                                                                                                           
iceberg.tables.route.composite.table-template=${0}_${1}                                                                                                                                                           
iceberg.tables.route.composite.table-namespace=warehouse                                                                                                                                                          
                                                                                                                                                                                                                  
# Header routing: header "iceberg.table: db.orders" → routes to db.orders                                                                                                                                         
iceberg.tables.router-class=org.apache.iceberg.connect.data.HeaderRouter                                                                                                                                          
iceberg.tables.route.header.name=iceberg.table                                                                                                                                                                    
                                                                                                                                                                                                                  
# Filtered routing: only process prod events, then route by topic                                                                                                                                                 
iceberg.tables.router-class=org.apache.iceberg.connect.data.FilterRouter                                                                                                                                          
iceberg.tables.route.filter.delegate-class=org.apache.iceberg.connect.data.TopicNameRouter                                                                                                                        
iceberg.tables.route.filter.field=env                                                                                                                                                                             
iceberg.tables.route.filter.field-regex=prod                                                                                                                                                                      
iceberg.tables.route.filter.mode=include                                                                                                                                                                          
iceberg.tables.route.topic-name.table-namespace=warehouse                                                                                                                                                                                                                                                                                                                          

@kumarpritam863
Copy link
Copy Markdown
Contributor Author

@bryanck can you please check.

@kumarpritam863 kumarpritam863 changed the title Kafka Connect: Add pluggable RecordRouter interface with TopicNameRouter Kafka Connect: Add pluggable RecordRouter interface with five built-in routing strategies Apr 13, 2026
@kumarpritam863
Copy link
Copy Markdown
Contributor Author

@danielcweeks can you please review this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant