kafka_consumer: collect Kafka Connect connector metrics and configurations#24013

Draft
piochelepiotr wants to merge 17 commits into master from pwolski/kafka-consumer-connect-monitoring

piochelepiotr commented Jun 11, 2026

Contributor

What does this PR do?

Adds Kafka Connect monitoring to the `kafka_consumer` integration via a new `connectors.py` module (`KafkaConnectCollector`). When `kafka_connect_url` is configured and `enable_cluster_monitoring` is enabled, the agent queries the Kafka Connect REST API and emits connector health metrics and configuration events.

New metrics (all tagged `kafka_cluster_id`, `connect_url`):

| Metric | Description |
| --- | --- |
| `kafka.connector.count` | Total connectors per Connect worker |
| `kafka.connector.running` | 1 if RUNNING, 0 otherwise; tagged with `connector`, `connector_type`, `connector_class`, `connector_state` |
| `kafka.connector.task.count` | Configured task count per connector |
| `kafka.connector.tasks` | Task count by state; tagged with `task_state` (running/failed/paused/unassigned) |
| `kafka.connector.task.running` | Per-task 0/1, gated by `kafka_connect_collect_task_metrics: true` (default off) |
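
As an illustration of how the metrics above could be derived from one expanded `/connectors` response, here is a minimal sketch. The function name `connector_metrics`, the tuple layout, the lowercase tag values, and the choice to read the configured task count from `info.tasks` are assumptions made for this example, not the PR's actual code.

```python
from collections import Counter


def connector_metrics(connectors: dict) -> list[tuple[str, int, list[str]]]:
    """Turn an expanded /connectors response into (metric, value, tags) tuples.

    Hypothetical helper: the real collector may structure this differently.
    """
    points = [("kafka.connector.count", len(connectors), [])]
    for name, body in connectors.items():
        info = body.get("info", {})
        status = body.get("status", {})
        state = status.get("connector", {}).get("state", "UNKNOWN")
        tags = [
            f"connector:{name}",
            f"connector_type:{info.get('type', 'unknown')}",
            f"connector_class:{info.get('config', {}).get('connector.class', 'unknown')}",
            f"connector_state:{state.lower()}",
        ]
        # 1 if the connector itself is RUNNING, 0 for any other state.
        points.append(("kafka.connector.running", int(state == "RUNNING"), tags))
        # Assumption: "configured task count" is the length of info.tasks.
        points.append(("kafka.connector.task.count", len(info.get("tasks", [])), [f"connector:{name}"]))
        # Bucket tasks by whatever states the worker reports.
        by_state = Counter(t.get("state", "UNKNOWN").lower() for t in status.get("tasks", []))
        for task_state, count in sorted(by_state.items()):
            points.append(("kafka.connector.tasks", count, [f"connector:{name}", f"task_state:{task_state}"]))
    return points
```

Counting task states with a `Counter` rather than a fixed list means states such as `RESTARTING` get their own `task_state` tag without code changes.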

Configuration events (`data-streams-message`, `config_type: connector`): emitted when connector config changes or the 1-hour TTL expires, using the same hash-based dedup pattern as broker/topic configs. Sensitive config values (passwords, secrets, JAAS configs) are redacted before emission.
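
The hash-based dedup with a TTL described above can be sketched roughly as follows. The names `config_hash`, `should_emit`, and `CONFIG_TTL_SECONDS`, the cache layout, and the use of SHA-256 are assumptions for illustration; only the 1-hour TTL and the exclusion of `collection_timestamp` from the hashed content come from this PR.

```python
import hashlib
import json
import time
from typing import Optional

CONFIG_TTL_SECONDS = 3600  # the 1-hour TTL mentioned in the description


def config_hash(event: dict) -> str:
    """Hash the event content, ignoring the timestamp that changes every run."""
    stable = {k: v for k, v in event.items() if k != "collection_timestamp"}
    payload = json.dumps(stable, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def should_emit(cache: dict, key: str, event: dict, now: Optional[float] = None) -> bool:
    """Return True when the config changed or the cached entry is older than the TTL."""
    now = time.time() if now is None else now
    digest = config_hash(event)
    cached = cache.get(key)
    if cached is not None:
        old_digest, sent_at = cached
        if old_digest == digest and now - sent_at < CONFIG_TTL_SECONDS:
            return False  # unchanged and still fresh: skip the event
    cache[key] = (digest, now)
    return True
```

Keying the cache per Connect URL and connector name (for example `f"{url}|{name}"`, a hypothetical format) avoids collisions when several Connect endpoints expose connectors with the same name.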

Plugin inventory events (`config_type: connector_plugins`): emitted on the `kafka_configs_refresh_interval` cadence.

Connectivity status in heartbeat: instead of a service check, whether each Connect API endpoint was reachable is reported in the cluster monitoring heartbeat event (`connect_api_status` field).
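
A minimal sketch of how per-endpoint reachability might flow into the heartbeat, assuming the collector returns a mapping of endpoint to a boolean. The helper names `collect` and `build_heartbeat`, the injected `fetch` callable, and the heartbeat payload shape are hypothetical; only the `connect_api_status` field name comes from this PR.

```python
from typing import Callable, Optional


def collect(endpoints: list[str], fetch: Callable[[str], dict]) -> dict[str, bool]:
    """Probe each Connect endpoint and record whether the call succeeded."""
    status = {}
    for url in endpoints:
        try:
            fetch(url)
            status[url] = True
        except Exception:
            # Any failure (network, auth, bad response) marks the endpoint unreachable.
            status[url] = False
    return status


def build_heartbeat(cluster_id: str, connect_status: Optional[dict[str, bool]]) -> dict:
    """Attach connect_api_status only when Connect monitoring is configured."""
    event = {"kafka_cluster_id": cluster_id}
    if connect_status is not None:
        event["connect_api_status"] = connect_status
    return event
```

Using `None` for "not configured" keeps it distinct from an empty or all-`False` result, which signals a configured endpoint that could not be reached.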

Authentication methods supported:

  • No auth (default)
  • Basic auth (`kafka_connect_username` / `kafka_connect_password`)
  • TLS client certificates (`kafka_connect_tls_cert` / `kafka_connect_tls_key`)
  • Custom CA verification (`kafka_connect_tls_ca_cert`)
  • OAuth/OIDC (`kafka_connect_oauth_token_provider` — identical shape to `schema_registry_oauth_token_provider`)
  • Confluent Cloud Connect (`kafka_connect_confluent_cloud_environment_id` / `kafka_connect_confluent_cloud_cluster_id`)
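
To show how these options could translate into HTTP request settings, here is a hedged sketch. The function `connect_request_kwargs` is hypothetical; the keyword names `auth`, `cert`, and `verify` follow the `requests` library conventions, and the OAuth and Confluent Cloud options are left out for brevity.

```python
def connect_request_kwargs(instance: dict) -> dict:
    """Build requests-style keyword arguments from the instance config."""
    kwargs = {}

    # Basic auth: both username and password must be present.
    user = instance.get("kafka_connect_username")
    password = instance.get("kafka_connect_password")
    if user and password:
        kwargs["auth"] = (user, password)

    # TLS client certificate, optionally with a separate private key file.
    cert = instance.get("kafka_connect_tls_cert")
    key = instance.get("kafka_connect_tls_key")
    if cert:
        kwargs["cert"] = (cert, key) if key else cert

    # Custom CA bundle used to verify the Connect server certificate.
    ca_cert = instance.get("kafka_connect_tls_ca_cert")
    if ca_cert:
        kwargs["verify"] = ca_cert

    return kwargs
```

With no auth options set, the function returns an empty dict, which matches the "No auth (default)" case.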

Dashboard: a new integration dashboard asset is included — Kafka Connect - Connector Monitoring (DSM).

Implementation notes:

  • Collection is gated by `enable_cluster_monitoring` (same as schema registry and other DSM-only metrics)
  • Uses a dedicated `requests.Session` (not the shared check HTTP client) so Connect auth doesn't conflict with schema registry auth
  • Single `GET /connectors?expand=info&expand=status` call per worker — no N+1 requests
  • Confluent Cloud Connect uses the same REST collection logic via `https://api.confluent.cloud/connect/v1/environments/{env}/clusters/{cluster}`
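
The URL construction and single-call collection from the notes above might look like the sketch below. All three helper names are hypothetical, and the fallback for list responses (which older workers without `expand` support return) is only one possible way to handle that case.

```python
def confluent_cloud_connect_url(env_id: str, cluster_id: str, base_url: str = "https://api.confluent.cloud") -> str:
    """Build the Confluent Cloud Connect base URL for one environment and cluster."""
    return f"{base_url.rstrip('/')}/connect/v1/environments/{env_id}/clusters/{cluster_id}"


def connectors_endpoint(connect_url: str) -> str:
    """One request returns info and status for every connector (no N+1 calls)."""
    return f"{connect_url.rstrip('/')}/connectors?expand=info&expand=status"


def normalize_connectors(payload) -> dict:
    """Accept both the expanded dict form and a plain list of connector names."""
    if isinstance(payload, dict):
        return payload
    # Assumption: with a bare name list, keep the names and leave details empty.
    return {name: {} for name in payload}
```

Because the Confluent Cloud URL is just another Connect base URL, `connectors_endpoint(confluent_cloud_connect_url(env, cluster))` can reuse the same collection path as a self-hosted worker.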

Motivation

Kafka Connect is widely deployed alongside Kafka for CDC, streaming ETL, and data replication. Currently there is no native Datadog integration for monitoring connector health, so users have to rely on JMX or custom scripts. The primary pain point is knowing when a connector or task has failed and data flow is interrupted.

Review checklist (to be filled by reviewers)

  • Feature or bugfix MUST have appropriate tests (unit, integration, e2e)
  • Add `qa/required` if this PR needs QA validation, or `qa/skip-qa` if it does not. Exactly one of the two is required.
  • If you need to backport this PR to another branch, you can add the `backport/` label to the PR and it will automatically open a backport PR once this one is merged

…tions

Adds a new KafkaConnectCollector (connectors.py) that queries the Kafka
Connect REST API and Amazon MSK Connect to surface connector health,
task status, and configuration into DSM.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
piochelepiotr and others added 6 commits June 11, 2026 12:20
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Gate connector collection on enable_cluster_monitoring flag
- Fix dedup hash: exclude collection_timestamp from hashed content
- Scope cache keys per-URL to avoid collisions across multiple Connect endpoints
- Remove kafka_connect_aws_msk_cluster_arn option (boto3 API doesn't support ARN filtering)
- Add MSK can_connect service check with aws_region tag
- Add isinstance guard for older Kafka Connect workers returning list responses
- Fix type hints and improve exception messages
- Add 15 unit tests covering dedup, metrics, service checks, and compat
- Regenerate config_models and conf.yaml.example after spec.yaml changes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…t display name

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… in alphabetical position)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
dd-octo-sts Bot commented Jun 11, 2026

Contributor

Validation Report

All 21 validations passed.

| Validation | Description | Status |
| --- | --- | --- |
| agent-reqs | Verify check versions match the Agent requirements file | Passed |
| ci | Validate CI configuration and code coverage settings | Passed |
| codeowners | Validate every integration has a CODEOWNERS entry | Passed |
| config | Validate default configuration files against spec.yaml | Passed |
| dep | Verify dependency pins are consistent and Agent-compatible | Passed |
| http | Validate integrations use the HTTP wrapper correctly | Passed |
| imports | Validate check imports do not use deprecated modules | Passed |
| integration-style | Validate check code style conventions | Passed |
| jmx-metrics | Validate JMX metrics definition files and config | Passed |
| labeler | Validate PR labeler config matches integration directories | Passed |
| legacy-signature | Validate no integration uses the legacy Agent check signature | Passed |
| license-headers | Validate Python files have proper license headers | Passed |
| licenses | Validate third-party license attribution list | Passed |
| metadata | Validate metadata.csv metric definitions | Passed |
| models | Validate configuration data models match spec.yaml | Passed |
| openmetrics | Validate OpenMetrics integrations disable the metric limit | Passed |
| package | Validate Python package metadata and naming | Passed |
| qa-label | Validate the pull request declares whether it needs QA for the next Agent release | Passed |
| readmes | Validate README files have required sections | Passed |
| saved-views | Validate saved view JSON file structure and fields | Passed |
| version | Validate version consistency between package and changelog | Passed |

piochelepiotr and others added 10 commits June 11, 2026 13:29
…ics, add connectivity to heartbeat

- Replace connector.can_connect service check with connectivity status in the
  cluster monitoring heartbeat (connect_api_status field)
- Consolidate 4 task-state gauges (tasks_running/failed/paused/unassigned) into
  a single connector.tasks metric tagged with task_state
- collect() now returns dict[str, bool] (endpoint → connected)
- Heartbeat is sent after connector collection so it includes connect_api_status
- Update tests to match new API

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add tests covering _configure_session TLS/auth paths, _collect_rest success path,
_collect_plugins, _get_items_to_fetch/_mark_items_fetched cache logic, _fetch_oidc_token,
_refresh_oauth_token, MSK collect path with mocked boto3, _get_tags, and all exception paths.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add a _collect_confluent_cloud() method that constructs the Confluent Cloud
Connect REST API URL (api.confluent.cloud/connect/v1/environments/{env}/clusters/{cluster})
and delegates to the existing _collect_rest() path, which is already compatible
with the Confluent Cloud API response format.

New config params:
  kafka_connect_confluent_cloud_environment_id
  kafka_connect_confluent_cloud_cluster_id
  kafka_connect_confluent_cloud_url (optional, defaults to https://api.confluent.cloud)

Authenticate using kafka_connect_username (Cloud API key ID) and
kafka_connect_password (Cloud API key secret).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Drop the MSK Connect (boto3) collection path — it uses a different API,
returns fewer metrics, and is better handled in a follow-up PR. Also
fixes the caller-side guard in kafka_consumer.py which was missing the
Confluent Cloud fields, causing CC-only configs to silently produce no
metrics.

Also simplifies a convoluted OIDC test assertion flagged by ruff.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Fix dashboard "Connectors by State" paused/failed series: use count:
  aggregator instead of sum: (value is always 0 for non-running connectors)
- Fix "Connectors by Type" toplist to count all connectors, not just running
- Fix e.g., comma in spec.yaml/conf.yaml.example (documentation style)
- OAuth token failure now returns endpoints as False so heartbeat reports
  connectivity failure instead of silently omitting connect_api_status
- Task state bucketing now dynamic: covers RESTARTING/DESTROYED in addition
  to RUNNING/FAILED/PAUSED/UNASSIGNED
- Wire up CONNECTOR_CONFIG_CACHE_MAX_SIZE in _get_events_to_send to bound
  the config-event cache
- Type _parse_connect_urls value parameter

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add credential redaction in _truncate_connector_config (SENSITIVE_KEY_PATTERN)
- Fix OAuth failure on Confluent Cloud-only setup: include CC key in failure dict
- Fix connect_status None sentinel: distinguish "not configured" from "crashed"
- Normalize task_state casing to lowercase in connector.task.running metric
- Add max_cache_size param to _get_events_to_send for consistency with _mark_items_fetched
- Rename self.CONFIGS_REFRESH_INTERVAL/JITTER to lowercase _configs_refresh_interval/_jitter
- Add type hints to KafkaConnectCollector.__init__
- Parameterize _parse_connect_urls signature with typed generics
- Parametrize TLS and Confluent Cloud URL construction tests
- Add test for OAuth failure on CC-only deployment
- Fix changelog entry: remove MSK Connect claim, add Confluent Cloud
- Backtick URL in kafka_connect_confluent_cloud_url description
- Add requests as explicit pyproject.toml dependency
- Remove demo docker-compose-connect-demo.yaml
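
The credential redaction mentioned in the first bullet could look roughly like this sketch. The constant name `SENSITIVE_KEY_PATTERN` comes from the commit message, but the regular expression, the placeholder string, and the `redact_config` helper are assumptions for illustration.

```python
import re

# Hypothetical pattern: the actual expression used in the PR is not shown here.
SENSITIVE_KEY_PATTERN = re.compile(r"password|secret|token|jaas|credential", re.IGNORECASE)

REDACTED = "********"


def redact_config(config: dict) -> dict:
    """Return a copy of the connector config with sensitive values masked."""
    return {
        key: REDACTED if SENSITIVE_KEY_PATTERN.search(key) else value
        for key, value in config.items()
    }
```

Matching on key names rather than values keeps the check cheap, at the cost of missing secrets stored under unremarkable keys.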

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Move _get_items_to_fetch, _mark_items_fetched, _get_events_to_send,
_get_tags, and _original_cluster_id_field into a new EventCacheMixin so
ClusterMetadataCollector and KafkaConnectCollector share one copy.

Also removes the dashboard JSON (shipped too early, no product sign-off)
and renames CONFIGS_REFRESH_INTERVAL/JITTER to private _configs_refresh_*
to match the convention already used in KafkaConnectCollector.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…_override description

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ectCollector

Use the same internal HTTP client (check.http) that schema registry uses,
passing auth/TLS/OAuth as per-request kwargs via ChainMap override rather
than managing a separate requests.Session. Drop the requests dependency.
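
The per-request override idea can be illustrated with the standard-library `ChainMap`. This is a generic sketch: `per_request_options` and the option dictionaries are hypothetical and do not reflect the internals of the shared check HTTP client.

```python
from collections import ChainMap


def per_request_options(shared_options: dict, overrides: dict) -> dict:
    """Layer Connect-specific settings over the shared client options.

    Lookups hit `overrides` first and fall back to `shared_options`,
    and the shared dict is never mutated.
    """
    return dict(ChainMap(overrides, shared_options))
```

Since the shared options stay untouched, schema registry requests made with the same client keep their own auth settings.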

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>