Kafka Connect: Add ConnectorMetrics instrumentation for observability#15961
Open
kumarpritam863 wants to merge 21 commits intoapache:mainfrom
Open
Kafka Connect: Add ConnectorMetrics instrumentation for observability#15961kumarpritam863 wants to merge 21 commits intoapache:mainfrom
kumarpritam863 wants to merge 21 commits intoapache:mainfrom
Conversation
Contributor
Author
|
@bryanck can you please check once. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds a
ConnectorMetricsfacade that instruments the entire write and commit pipeline with 13 counters and gauges across three tiers: operational essentials, write path visibility, and schema/table management.Today, the connector has zero metrics — operators monitor production deployments by parsing logs. This PR places instrumentation at every critical point in the data path so that when the Kafka dependency
moves to 4.0+ (KIP-KAFKA-15995
PluginMetricsAPI), all metrics automatically light up through the runtime's configured reporters (JMX, Prometheus, Datadog, etc.) with zero code changes.On Kafka 3.9.x (current), counters are tracked in-memory via
AtomicLongand accessible programmatically (e.g., for integration tests or custom health checks). TheConnectorMetrics.NOOPsingleton ensureszero overhead in tests.
Metrics catalog
Tier 1 — Operational Essentials
records-received-totalSinkWriter.save()— every batch received fromput()records-written-totalIcebergWriter.write()— each record successfully writtenrecords-dropped-totalrecord-conversion-errors-totalIcebergWriter.write()catch block — type mismatches, parse failurescommit-totalCoordinator.commit()— every commit cycle initiatedcommit-success-totalCoordinator.commit()— successfuldoCommit()completioncommit-failure-totalCoordinator.commit()— caught exception in commit cycleTier 2 — Write Path
data-files-written-totalIcebergWriter.flush()— fromWriteResult.dataFiles()delete-files-written-totalIcebergWriter.flush()— fromWriteResult.deleteFiles()active-writersSinkWriter.writersmap size — registered via supplierTier 3 — Schema & Table Management
schema-evolutions-totalIcebergWriter.convertToRow()— schema update detected and appliedtables-auto-created-totalIcebergWriterFactory.autoCreateTable()— after successful creationcommit-timeout-totalCoordinator.process()— commit timed out waiting for workersDesign decisions
ConnectorMetricsfacade — All instrumentation calls go through one class (recordWritten(table),commitStarted(), etc.). This decouples metric recording from the metric backend. WhenPluginMetricsbecomes available, only this class changes.
enabledguard on all methods — TheNOOPsingleton hasenabled=false, making every method a branch-predicted no-op. Enabled instances check once per call (branch onvolatile boolean). No overheadwhen disabled.
ConnectorMetrics.create()contains the exact try-catch pattern the KIP recommends (catch NoSuchMethodError | NoClassDefFoundError). When Kafka 4.0+ is available, metricsregister under
kafka.connect:type=plugins,connector=<name>,task=<id>automatically.iceberg.metrics.*properties needed.tableNameparameter on per-table methods — Even though table-scoped sensors aren't wired yet, the parameter is accepted now so call sites don't need to change whenPluginMetricsadds per-table tagsupport.
commitSucceeded(durationMs)andflushCompleted(tableName, durationMs)accept duration for future histogram/avg wiring.