Skip to content

Kafka Connect: Add ConnectorMetrics instrumentation for observability#15961

Open
kumarpritam863 wants to merge 21 commits intoapache:mainfrom
kumarpritam863:feature/connector-metrics
Open

Kafka Connect: Add ConnectorMetrics instrumentation for observability#15961
kumarpritam863 wants to merge 21 commits intoapache:mainfrom
kumarpritam863:feature/connector-metrics

Conversation

@kumarpritam863
Copy link
Copy Markdown
Contributor

Summary

Adds a ConnectorMetrics facade that instruments the entire write and commit pipeline with 13 counters and gauges across three tiers: operational essentials, write path visibility, and schema/table management.

Today, the connector has zero metrics — operators monitor production deployments by parsing logs. This PR places instrumentation at every critical point in the data path so that when the Kafka dependency
moves to 4.0+ (KIP-KAFKA-15995 PluginMetrics API), all metrics automatically light up through the runtime's configured reporters (JMX, Prometheus, Datadog, etc.) with zero code changes.

On Kafka 3.9.x (current), counters are tracked in-memory via AtomicLong and accessible programmatically (e.g., for integration tests or custom health checks). The ConnectorMetrics.NOOP singleton ensures
zero overhead in tests.

Metrics catalog

Tier 1 — Operational Essentials

Metric Type Instrumentation Point
records-received-total Counter SinkWriter.save() — every batch received from put()
records-written-total Counter IcebergWriter.write() — each record successfully written
records-dropped-total Counter Reserved for routing drops (tombstones, empty route targets)
record-conversion-errors-total Counter IcebergWriter.write() catch block — type mismatches, parse failures
commit-total Counter Coordinator.commit() — every commit cycle initiated
commit-success-total Counter Coordinator.commit() — successful doCommit() completion
commit-failure-total Counter Coordinator.commit() — caught exception in commit cycle

Tier 2 — Write Path

Metric Type Instrumentation Point
data-files-written-total Counter IcebergWriter.flush() — from WriteResult.dataFiles()
delete-files-written-total Counter IcebergWriter.flush() — from WriteResult.deleteFiles()
active-writers Gauge SinkWriter.writers map size — registered via supplier

Tier 3 — Schema & Table Management

Metric Type Instrumentation Point
schema-evolutions-total Counter IcebergWriter.convertToRow() — schema update detected and applied
tables-auto-created-total Counter IcebergWriterFactory.autoCreateTable() — after successful creation
commit-timeout-total Counter Coordinator.process() — commit timed out waiting for workers

Design decisions

  • ConnectorMetrics facade — All instrumentation calls go through one class (recordWritten(table), commitStarted(), etc.). This decouples metric recording from the metric backend. When PluginMetrics
    becomes available, only this class changes.
  • enabled guard on all methods — The NOOP singleton has enabled=false, making every method a branch-predicted no-op. Enabled instances check once per call (branch on volatile boolean). No overhead
    when disabled.
  • KIP-KAFKA-15995 readyConnectorMetrics.create() contains the exact try-catch pattern the KIP recommends (catch NoSuchMethodError | NoClassDefFoundError). When Kafka 4.0+ is available, metrics
    register under kafka.connect:type=plugins,connector=<name>,task=<id> automatically.
  • No new connector config — The KIP handles all metric infrastructure (reporters, JMX registration, cleanup). No iceberg.metrics.* properties needed.
  • tableName parameter on per-table methods — Even though table-scoped sensors aren't wired yet, the parameter is accepted now so call sites don't need to change when PluginMetrics adds per-table tag
    support.
  • Duration tracking accepted but not storedcommitSucceeded(durationMs) and flushCompleted(tableName, durationMs) accept duration for future histogram/avg wiring.

@kumarpritam863
Copy link
Copy Markdown
Contributor Author

@bryanck can you please check once.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant