kafka_consumer: collect Kafka Connect connector metrics and configurations#24013
Draft
piochelepiotr wants to merge 17 commits into
Draft
kafka_consumer: collect Kafka Connect connector metrics and configurations#24013piochelepiotr wants to merge 17 commits into
piochelepiotr wants to merge 17 commits into
Conversation
…tions Adds a new KafkaConnectCollector (connectors.py) that queries the Kafka Connect REST API and Amazon MSK Connect to surface connector health, task status, and configuration into DSM. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This comment has been minimized.
This comment has been minimized.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Gate connector collection on enable_cluster_monitoring flag - Fix dedup hash: exclude collection_timestamp from hashed content - Scope cache keys per-URL to avoid collisions across multiple Connect endpoints - Remove kafka_connect_aws_msk_cluster_arn option (boto3 API doesn't support ARN filtering) - Add MSK can_connect service check with aws_region tag - Add isinstance guard for older Kafka Connect workers returning list responses - Fix type hints and improve exception messages - Add 15 unit tests covering dedup, metrics, service checks, and compat - Regenerate config_models and conf.yaml.example after spec.yaml changes Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…t display name Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… in alphabetical position) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Contributor
Validation ReportAll 21 validations passed. Show details
|
…ics, add connectivity to heartbeat - Replace connector.can_connect service check with connectivity status in the cluster monitoring heartbeat (connect_api_status field) - Consolidate 4 task-state gauges (tasks_running/failed/paused/unassigned) into a single connector.tasks metric tagged with task_state - collect() now returns dict[str, bool] (endpoint → connected) - Heartbeat is sent after connector collection so it includes connect_api_status - Update tests to match new API Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add tests covering _configure_session TLS/auth paths, _collect_rest success path, _collect_plugins, _get_items_to_fetch/_mark_items_fetched cache logic, _fetch_oidc_token, _refresh_oauth_token, MSK collect path with mocked boto3, _get_tags, and all exception paths. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add a _collect_confluent_cloud() method that constructs the Confluent Cloud
Connect REST API URL (api.confluent.cloud/connect/v1/environments/{env}/clusters/{cluster})
and delegates to the existing _collect_rest() path, which is already compatible
with the Confluent Cloud API response format.
New config params:
kafka_connect_confluent_cloud_environment_id
kafka_connect_confluent_cloud_cluster_id
kafka_connect_confluent_cloud_url (optional, defaults to https://api.confluent.cloud)
Authenticate using kafka_connect_username (Cloud API key ID) and
kafka_connect_password (Cloud API key secret).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Drop the MSK Connect (boto3) collection path — it uses a different API, returns fewer metrics, and is better handled in a follow-up PR. Also fixes the caller-side guard in kafka_consumer.py which was missing the Confluent Cloud fields, causing CC-only configs to silently produce no metrics. Also simplifies a convoluted OIDC test assertion flagged by ruff. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Fix dashboard "Connectors by State" paused/failed series: use count: aggregator instead of sum: (value is always 0 for non-running connectors) - Fix "Connectors by Type" toplist to count all connectors, not just running - Fix e.g., comma in spec.yaml/conf.yaml.example (documentation style) - OAuth token failure now returns endpoints as False so heartbeat reports connectivity failure instead of silently omitting connect_api_status - Task state bucketing now dynamic: covers RESTARTING/DESTROYED in addition to RUNNING/FAILED/PAUSED/UNASSIGNED - Wire up CONNECTOR_CONFIG_CACHE_MAX_SIZE in _get_events_to_send to bound the config-event cache - Type _parse_connect_urls value parameter Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add credential redaction in _truncate_connector_config (SENSITIVE_KEY_PATTERN) - Fix OAuth failure on Confluent Cloud-only setup: include CC key in failure dict - Fix connect_status None sentinel: distinguish "not configured" from "crashed" - Normalize task_state casing to lowercase in connector.task.running metric - Add max_cache_size param to _get_events_to_send for consistency with _mark_items_fetched - Rename self.CONFIGS_REFRESH_INTERVAL/JITTER to lowercase _configs_refresh_interval/_jitter - Add type hints to KafkaConnectCollector.__init__ - Parameterize _parse_connect_urls signature with typed generics - Parametrize TLS and Confluent Cloud URL construction tests - Add test for OAuth failure on CC-only deployment - Fix changelog entry: remove MSK Connect claim, add Confluent Cloud - Backtick URL in kafka_connect_confluent_cloud_url description - Add requests as explicit pyproject.toml dependency - Remove demo docker-compose-connect-demo.yaml Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Move _get_items_to_fetch, _mark_items_fetched, _get_events_to_send, _get_tags, and _original_cluster_id_field into a new EventCacheMixin so ClusterMetadataCollector and KafkaConnectCollector share one copy. Also removes the dashboard JSON (shipped too early, no product sign-off) and renames CONFIGS_REFRESH_INTERVAL/JITTER to private _configs_refresh_* to match the convention already used in KafkaConnectCollector. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…_override description Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ectCollector Use the same internal HTTP client (check.http) that schema registry uses, passing auth/TLS/OAuth as per-request kwargs via ChainMap override rather than managing a separate requests.Session. Drop the requests dependency. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What does this PR do?
Adds Kafka Connect monitoring to the `kafka_consumer` integration via a new `connectors.py` module (`KafkaConnectCollector`). When `kafka_connect_url` is configured and `enable_cluster_monitoring` is enabled, the agent queries the Kafka Connect REST API and emits connector health metrics and configuration events.
New metrics (all tagged `kafka_cluster_id`, `connect_url`):
Configuration events (`data-streams-message`, `config_type: connector`): emitted when connector config changes or the 1-hour TTL expires, using the same hash-based dedup pattern as broker/topic configs. Sensitive config values (passwords, secrets, JAAS configs) are redacted before emission.
Plugin inventory events (`config_type: connector_plugins`): emitted on the `kafka_configs_refresh_interval` cadence.
Connectivity status in heartbeat: instead of a service check, whether each Connect API endpoint was reachable is reported in the cluster monitoring heartbeat event (`connect_api_status` field).
Authentication methods supported:
Dashboard: a new integration dashboard asset is included — Kafka Connect - Connector Monitoring (DSM).
Implementation notes:
Motivation
Kafka Connect is widely deployed alongside Kafka for CDC, streaming ETL, and data replication. Currently there is no native Datadog integration for monitoring connector health — users have to rely on JMX or custom scripts. The primary pain point is knowing when a connector or task has failed so data flow is interrupted.
Review checklist (to be filled by reviewers)