From 87b4477cb0eb71de17bffcf55f076af239bee5ec Mon Sep 17 00:00:00 2001 From: Paulo Vital Date: Mon, 11 May 2026 10:58:09 +0200 Subject: [PATCH 1/2] refactor(agent): ServerlessAgent with Span Filter Introduce a new ServerlessAgent abstract base class to eliminate code duplication across serverless platforms and add support for the Span Filtering feature. This refactoring follows the Template Method pattern, allowing platform-specific customization through abstract methods while maintaining a single source of truth for the common serverless agent behavior. Signed-off-by: Paulo Vital --- src/instana/agent/aws_eks_fargate.py | 104 +----- src/instana/agent/aws_fargate.py | 102 +----- src/instana/agent/aws_lambda.py | 105 +----- src/instana/agent/base.py | 119 +++++- src/instana/agent/google_cloud_run.py | 118 ++---- src/instana/agent/host.py | 102 +----- src/instana/agent/serverless.py | 346 ++++++++++++++++++ .../kafka/confluent_kafka_python.py | 6 +- .../instrumentation/kafka/kafka_python.py | 6 +- 9 files changed, 574 insertions(+), 434 deletions(-) create mode 100644 src/instana/agent/serverless.py diff --git a/src/instana/agent/aws_eks_fargate.py b/src/instana/agent/aws_eks_fargate.py index a88a08b1..bdacde27 100644 --- a/src/instana/agent/aws_eks_fargate.py +++ b/src/instana/agent/aws_eks_fargate.py @@ -1,105 +1,39 @@ -# (c) Copyright IBM Corp. 2023 +# (c) Copyright IBM Corp. 2023, 2026 """ The Instana agent (for AWS EKS Fargate) that manages monitoring state and reporting that data. 
""" -from instana.agent.base import BaseAgent +from instana.agent.serverless import ServerlessAgent from instana.collector.aws_eks_fargate import EKSFargateCollector from instana.collector.helpers.eks.process import get_pod_name -from instana.log import logger from instana.options import EKSFargateOptions -from instana.util import to_json -from instana.version import VERSION -class EKSFargateAgent(BaseAgent): - """In-process agent for AWS Fargate""" - - def __init__(self): - super(EKSFargateAgent, self).__init__() +class EKSFargateAgent(ServerlessAgent): + """In-process agent for AWS EKS Fargate""" + def _initialize_platform(self) -> None: + """Initialize EKS Fargate specific options and pod name.""" self.options = EKSFargateOptions() - self.collector = None - self.report_headers = None - self._can_send = False self.podname = get_pod_name() - # Update log level (if INSTANA_LOG_LEVEL was set) - self.update_log_level() - - logger.info( - "Stan is on the EKS Pod on AWS Fargate scene. Starting Instana instrumentation version: %s", - VERSION, - ) - - if self._validate_options(): - self._can_send = True - self.collector = EKSFargateCollector(self) - self.collector.start() - else: - logger.warning( - "Required INSTANA_AGENT_KEY and/or INSTANA_ENDPOINT_URL environment variables not set. " - "We will not be able to monitor this Pod." - ) - - def can_send(self): - """ - Are we in a state where we can send data? - @return: Boolean - """ - return self._can_send - - def get_from_structure(self): - """ - Retrieves the From data that is reported alongside monitoring data. 
- @return: dict() - """ - - return {"hl": True, "cp": "k8s", "e": self.podname} + def _create_collector(self): + """Create EKS Fargate collector.""" + return EKSFargateCollector(self) - def report_data_payload(self, payload): - """ - Used to report metrics and span data to the endpoint URL in self.options.endpoint_url - """ - response = None - try: - if self.report_headers is None: - # Prepare request headers - self.report_headers = dict() - self.report_headers["Content-Type"] = "application/json" - self.report_headers["X-Instana-Host"] = self.podname - self.report_headers["X-Instana-Key"] = self.options.agent_key + def _get_entity_id(self) -> str: + """Get Kubernetes pod name.""" + return self.podname - response = self.client.post( - self.__data_bundle_url(), - data=to_json(payload), - headers=self.report_headers, - timeout=self.options.timeout, - verify=self.options.ssl_verify, - proxies=self.options.endpoint_proxy, - ) + def _get_cloud_provider(self) -> str: + """Kubernetes cloud provider.""" + return "k8s" - if not 200 <= response.status_code < 300: - logger.info( - "report_data_payload: Instana responded with status code %s", - response.status_code, - ) - except Exception as exc: - logger.debug("report_data_payload: connection error (%s)", type(exc)) - return response + def _get_platform_name(self) -> str: + """Platform name for logging.""" + return "EKS Pod on AWS Fargate" - def _validate_options(self): - """ - Validate that the options used by this Agent are valid. e.g. can we report data? - """ - return ( - self.options.endpoint_url is not None and self.options.agent_key is not None - ) - def __data_bundle_url(self): - """ - URL for posting metrics to the host agent. Only valid when announced. 
- """ - return f"{self.options.endpoint_url}/bundle" +# Made with Bob diff --git a/src/instana/agent/aws_fargate.py b/src/instana/agent/aws_fargate.py index 9aabc757..74a8347a 100644 --- a/src/instana/agent/aws_fargate.py +++ b/src/instana/agent/aws_fargate.py @@ -1,4 +1,4 @@ -# (c) Copyright IBM Corp. 2021 +# (c) Copyright IBM Corp. 2021, 2026 # (c) Copyright Instana Inc. 2020 """ @@ -6,99 +6,33 @@ monitoring state and reporting that data. """ +from instana.agent.serverless import ServerlessAgent from instana.collector.aws_fargate import AWSFargateCollector from instana.options import AWSFargateOptions -from ..log import logger -from ..util import to_json -from ..version import VERSION -from .base import BaseAgent - -class AWSFargateAgent(BaseAgent): +class AWSFargateAgent(ServerlessAgent): """In-process agent for AWS Fargate""" - def __init__(self): - super(AWSFargateAgent, self).__init__() - + def _initialize_platform(self) -> None: + """Initialize AWS Fargate specific options.""" self.options = AWSFargateOptions() - self.collector = None - self.report_headers = None - self._can_send = False - - # Update log level (if INSTANA_LOG_LEVEL was set) - self.update_log_level() - - logger.info( - "Stan is on the AWS Fargate scene. Starting Instana instrumentation version: %s", - VERSION, - ) - - if self._validate_options(): - self._can_send = True - self.collector = AWSFargateCollector(self) - self.collector.start() - else: - logger.warning( - "Required INSTANA_AGENT_KEY and/or INSTANA_ENDPOINT_URL environment variables not set. " - "We will not be able monitor this AWS Fargate cluster." - ) - - def can_send(self): - """ - Are we in a state where we can send data? - @return: Boolean - """ - return self._can_send - def get_from_structure(self): - """ - Retrieves the From data that is reported alongside monitoring data. 
- @return: dict() - """ - return {"hl": True, "cp": "aws", "e": self.collector.get_fq_arn()} + def _create_collector(self): + """Create AWS Fargate collector.""" + return AWSFargateCollector(self) - def report_data_payload(self, payload): - """ - Used to report metrics and span data to the endpoint URL in self.options.endpoint_url - """ - response = None - try: - if self.report_headers is None: - # Prepare request headers - self.report_headers = dict() - self.report_headers["Content-Type"] = "application/json" - self.report_headers["X-Instana-Host"] = self.collector.get_fq_arn() - self.report_headers["X-Instana-Key"] = self.options.agent_key + def _get_entity_id(self) -> str: + """Get Fargate task ARN.""" + return self.collector.get_fq_arn() - response = self.client.post( - self.__data_bundle_url(), - data=to_json(payload), - headers=self.report_headers, - timeout=self.options.timeout, - verify=self.options.ssl_verify, - proxies=self.options.endpoint_proxy, - ) + def _get_cloud_provider(self) -> str: + """AWS cloud provider.""" + return "aws" - if not 200 <= response.status_code < 300: - logger.info( - "report_data_payload: Instana responded with status code %s", - response.status_code, - ) - except Exception as exc: - logger.debug("report_data_payload: connection error (%s)", type(exc)) - return response + def _get_platform_name(self) -> str: + """Platform name for logging.""" + return "AWS Fargate" - def _validate_options(self): - """ - Validate that the options used by this Agent are valid. e.g. can we report data? - """ - return ( - self.options.endpoint_url is not None and self.options.agent_key is not None - ) - def __data_bundle_url(self): - """ - URL for posting metrics to the host agent. Only valid when announced. 
- """ - return f"{self.options.endpoint_url}/bundle" +# Made with Bob diff --git a/src/instana/agent/aws_lambda.py b/src/instana/agent/aws_lambda.py index 140275ab..06ae153f 100644 --- a/src/instana/agent/aws_lambda.py +++ b/src/instana/agent/aws_lambda.py @@ -1,4 +1,4 @@ -# (c) Copyright IBM Corp. 2021 +# (c) Copyright IBM Corp. 2021, 2026 # (c) Copyright Instana Inc. 2020 """ @@ -6,102 +6,33 @@ monitoring state and reporting that data. """ -from typing import Any, Dict -from instana.agent.base import BaseAgent +from instana.agent.serverless import ServerlessAgent from instana.collector.aws_lambda import AWSLambdaCollector -from instana.log import logger from instana.options import AWSLambdaOptions -from instana.util import to_json -from instana.version import VERSION -class AWSLambdaAgent(BaseAgent): +class AWSLambdaAgent(ServerlessAgent): """In-process Agent for AWS Lambda""" - def __init__(self) -> None: - super(AWSLambdaAgent, self).__init__() - - self.collector = None + def _initialize_platform(self) -> None: + """Initialize AWS Lambda specific options.""" self.options = AWSLambdaOptions() - self.report_headers = None - self._can_send = False - - # Update log level from what Options detected - self.update_log_level() - - logger.info( - f"Stan is on the AWS Lambda scene. Starting Instana instrumentation version: {VERSION}", - ) - - if self._validate_options(): - self._can_send = True - self.collector = AWSLambdaCollector(self) - self.collector.start() - else: - logger.warning( - "Required INSTANA_AGENT_KEY and/or INSTANA_ENDPOINT_URL environment variables not set. " - "We will not be able monitor this function." - ) - - def can_send(self) -> bool: - """ - Are we in a state where we can send data? - @return: Boolean - """ - return self._can_send - - def get_from_structure(self) -> Dict[str, Any]: - """ - Retrieves the From data that is reported alongside monitoring data. 
- @return: dict() - """ - return {"hl": True, "cp": "aws", "e": self.collector.get_fq_arn()} - def report_data_payload(self, payload): - """ - Used to report metrics and span data to the endpoint URL in self.options.endpoint_url - """ - response = None - try: - if self.report_headers is None: - # Prepare request headers - self.report_headers = dict() - self.report_headers["Content-Type"] = "application/json" - self.report_headers["X-Instana-Host"] = self.collector.get_fq_arn() - self.report_headers["X-Instana-Key"] = self.options.agent_key + def _create_collector(self): + """Create AWS Lambda collector.""" + return AWSLambdaCollector(self) - response = self.client.post( - self.__data_bundle_url(), - data=to_json(payload), - headers=self.report_headers, - timeout=self.options.timeout, - verify=self.options.ssl_verify, - proxies=self.options.endpoint_proxy, - ) + def _get_entity_id(self) -> str: + """Get Lambda function ARN.""" + return self.collector.get_fq_arn() - if 200 <= response.status_code < 300: - logger.debug( - "report_data_payload: Instana responded with status code %s", - response.status_code, - ) - else: - logger.info( - "report_data_payload: Instana responded with status code %s", - response.status_code, - ) - except Exception as exc: - logger.debug("report_data_payload: connection error (%s)", type(exc)) + def _get_cloud_provider(self) -> str: + """AWS cloud provider.""" + return "aws" - return response + def _get_platform_name(self) -> str: + """Platform name for logging.""" + return "AWS Lambda" - def _validate_options(self) -> bool: - """ - Validate that the options used by this Agent are valid. e.g. can we report data? - """ - return self.options.endpoint_url and self.options.agent_key - def __data_bundle_url(self) -> str: - """ - URL for posting metrics to the host agent. Only valid when announced. 
- """ - return f"{self.options.endpoint_url}/bundle" +# Made with Bob diff --git a/src/instana/agent/base.py b/src/instana/agent/base.py index 08e68f06..541d7205 100644 --- a/src/instana/agent/base.py +++ b/src/instana/agent/base.py @@ -1,4 +1,4 @@ -# (c) Copyright IBM Corp. 2021 +# (c) Copyright IBM Corp. 2021, 2026 # (c) Copyright Instana Inc. 2020 """ @@ -6,10 +6,15 @@ """ import logging +from typing import TYPE_CHECKING, Any import requests from instana.log import logger +from instana.util.span_utils import matches_rule + +if TYPE_CHECKING: + from instana.span.span import InstanaSpan class BaseAgent(object): @@ -18,10 +23,10 @@ class BaseAgent(object): client = None options = None - def __init__(self): + def __init__(self) -> None: self.client = requests.Session() - def update_log_level(self): + def update_log_level(self) -> None: """Uses the value in to update the global logger""" if self.options is None or self.options.log_level not in [ logging.DEBUG, @@ -33,3 +38,111 @@ def update_log_level(self): return logger.setLevel(self.options.log_level) + + def filter_spans(self, spans: list["InstanaSpan"]) -> list["InstanaSpan"]: + """ + Filters span list using hierarchical filtering rules. 
+ + Args: + spans: List of Spans + + Returns: + List of Spans that pass the filtering rules + """ + filtered_spans = [] + + for span in spans: + if self._is_span_missing_required_attributes(span): + filtered_spans.append(span) + continue + + service_name = "" + + # Set the service name + for span_value in span.data: + if isinstance(span.data[span_value], dict): + service_name = span_value + + # Skip if no valid service name found + if not service_name: + filtered_spans.append(span) + continue + + # Set span attributes for filtering + attributes_to_check = { + "type": service_name, + "kind": getattr(span, "k", None), + } + + # Add operation specifiers to the attributes + for key, value in span.data[service_name].items(): + attributes_to_check[f"{service_name}.{key}"] = value + + # Check if the span needs to be ignored + if self._is_endpoint_ignored(attributes_to_check): + continue + + filtered_spans.append(span) + + return filtered_spans + + def _is_endpoint_ignored(self, span_attributes: dict[str, Any]) -> bool: + """ + Check if a span should be ignored based on filtering rules. 
+ + Include rules have precedence over exclude rules: + - If an include rule matches, the span is NOT ignored (returns False) + - If no include rules exist or none match, check exclude rules + - If an exclude rule matches, the span IS ignored (returns True) + - If no rules match, the span is NOT ignored (returns False) + + Args: + span_attributes: Dictionary of span attributes to check + + Returns: + True if span should be filtered out, False otherwise + """ + if not span_attributes or not isinstance(span_attributes, dict): + return False + + filters = self.options.span_filters + if not filters: + return False + + # Include rules have highest precedence - if matched, span is kept + include_rules = filters.get("include", []) + if self._matches_rules(include_rules, span_attributes): + return False + + # Check exclude rules only if no include rule matched + exclude_rules = filters.get("exclude", []) + return bool(self._matches_rules(exclude_rules, span_attributes)) + + def _matches_rules(self, rules: list[dict], span_attributes: dict) -> bool: + """ + Check if span matches any provided rule. + + Args: + rules: List of dictionaries containing filter rules + span_attributes: Dictionary of span attributes to check + + Returns: + True if any rule matches, False otherwise + """ + return any( + matches_rule(rule.get("attributes", []), span_attributes) for rule in rules + ) + + def _is_span_missing_required_attributes(self, span: "InstanaSpan") -> bool: + """ + Checks if a span is missing required attributes for filtering. 
+ + Args: + span: InstanaSpan + + Returns: + True if span is missing required attributes, False otherwise + """ + has_name_attribute = hasattr(span, "n") or hasattr(span, "name") + has_data_attribute = hasattr(span, "data") + return not has_name_attribute or not has_data_attribute diff --git a/src/instana/agent/google_cloud_run.py b/src/instana/agent/google_cloud_run.py index 4bfddfae..8f706e62 100644 --- a/src/instana/agent/google_cloud_run.py +++ b/src/instana/agent/google_cloud_run.py @@ -1,4 +1,4 @@ -# (c) Copyright IBM Corp. 2021 +# (c) Copyright IBM Corp. 2021, 2026 # (c) Copyright Instana Inc. 2021 """ @@ -6,99 +6,51 @@ monitoring state and reporting that data. """ -from instana.options import GCROptions +from instana.agent.serverless import ServerlessAgent from instana.collector.google_cloud_run import GCRCollector -from instana.log import logger -from instana.util import to_json -from instana.agent.base import BaseAgent -from instana.version import VERSION +from instana.options import GCROptions -class GCRAgent(BaseAgent): +class GCRAgent(ServerlessAgent): """In-process agent for Google Cloud Run""" - def __init__(self, service, configuration, revision): - super(GCRAgent, self).__init__() - - self.options = GCROptions() - self.collector = None - self.report_headers = None - self._can_send = False - - # Update log level (if INSTANA_LOG_LEVEL was set) - self.update_log_level() + def __init__(self, service: str, configuration: str, revision: str) -> None: + """ + Initialize with GCR-specific parameters. - logger.info( - "Stan is on the AWS Fargate scene. 
Starting Instana instrumentation version: %s", - VERSION, - ) + Args: + service: GCR service name + configuration: GCR configuration name + revision: GCR revision name + """ + self._service = service + self._configuration = configuration + self._revision = revision + super().__init__() - if self._validate_options(): - self._can_send = True - self.collector = GCRCollector(self, service, configuration, revision) - self.collector.start() - else: - logger.warning( - "Required INSTANA_AGENT_KEY and/or INSTANA_ENDPOINT_URL environment variables not set. " - "We will not be able monitor this GCR cluster." - ) + def _initialize_platform(self) -> None: + """Initialize Google Cloud Run specific options.""" + self.options = GCROptions() - def can_send(self): - """ - Are we in a state where we can send data? - @return: Boolean - """ - return self._can_send + def _create_collector(self): + """Create GCR collector with service parameters.""" + return GCRCollector(self, self._service, self._configuration, self._revision) - def get_from_structure(self): - """ - Retrieves the From data that is reported alongside monitoring data. 
- @return: dict() - """ - return {"hl": True, "cp": "gcp", "e": self.collector.get_instance_id()} + def _get_entity_id(self) -> str: + """Get GCR instance ID.""" + return self.collector.get_instance_id() - def report_data_payload(self, payload): - """ - Used to report metrics and span data to the endpoint URL in self.options.endpoint_url - """ - response = None - try: - if self.report_headers is None: - # Prepare request headers - self.report_headers = { - "Content-Type": "application/json", - "X-Instana-Host": f"gcp:cloud-run:revision:{self.collector.revision}", - "X-Instana-Key": self.options.agent_key, - } + def _get_cloud_provider(self) -> str: + """Google Cloud Platform provider.""" + return "gcp" - response = self.client.post( - self.__data_bundle_url(), - data=to_json(payload), - headers=self.report_headers, - timeout=self.options.timeout, - verify=self.options.ssl_verify, - proxies=self.options.endpoint_proxy, - ) + def _get_platform_name(self) -> str: + """Platform name for logging.""" + return "Google Cloud Run" - if response.status_code >= 400: - logger.info( - "report_data_payload: Instana responded with status code %s", - response.status_code, - ) - except Exception as exc: - logger.debug("report_data_payload: connection error (%s)", type(exc)) - return response + def _get_instana_host_header(self) -> str: + """GCR uses custom formatted header.""" + return f"gcp:cloud-run:revision:{self.collector.revision}" - def _validate_options(self): - """ - Validate that the options used by this Agent are valid. e.g. can we report data? - """ - return ( - self.options.endpoint_url is not None and self.options.agent_key is not None - ) - def __data_bundle_url(self): - """ - URL for posting metrics to the host agent. Only valid when announced. 
- """ - return f"{self.options.endpoint_url}/bundle" +# Made with Bob diff --git a/src/instana/agent/host.py b/src/instana/agent/host.py index 72689059..1bc7a3fc 100644 --- a/src/instana/agent/host.py +++ b/src/instana/agent/host.py @@ -1,4 +1,4 @@ -# (c) Copyright IBM Corp. 2021 +# (c) Copyright IBM Corp. 2021, 2026 # (c) Copyright Instana Inc. 2020 """ @@ -9,7 +9,7 @@ import json import os from datetime import datetime -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Any, Optional, Union import requests import urllib3 @@ -22,7 +22,6 @@ from instana.options import StandardOptions from instana.util import to_json from instana.util.runtime import get_py_source, log_runtime_env_info -from instana.util.span_utils import matches_rule from instana.version import VERSION if TYPE_CHECKING: @@ -33,7 +32,7 @@ class AnnounceData(object): """The Announce Payload""" pid = 0 - agentUuid = "" + agent_uuid = "" def __init__(self, **kwds): self.__dict__.update(kwds) @@ -127,7 +126,7 @@ def can_send(self) -> bool: def set_from( self, - res_data: Dict[str, Any], + res_data: dict[str, Any], ) -> None: """ Sets the source identifiers given to use by the Instana Host agent. @@ -140,17 +139,17 @@ def set_from( if "pid" in res_data and "agentUuid" in res_data: self.announce_data = AnnounceData( pid=res_data["pid"], - agentUuid=res_data["agentUuid"], + agent_uuid=res_data["agentUuid"], # Map JSON key to Python field ) else: logger.debug(f"Missing required keys in announce response: {res_data}") - def get_from_structure(self) -> Dict[str, str]: + def get_from_structure(self) -> dict[str, str]: """ Retrieves the From data that is reported alongside monitoring data. 
@return: dict() """ - return {"e": self.announce_data.pid, "h": self.announce_data.agentUuid} + return {"e": self.announce_data.pid, "h": self.announce_data.agent_uuid} def is_agent_listening( self, @@ -182,7 +181,7 @@ def is_agent_listening( def announce( self, discovery: "Discovery", - ) -> Optional[Dict[str, Any]]: + ) -> Optional[dict[str, Any]]: """ With the passed in Discovery class, attempt to announce to the host agent. """ @@ -241,7 +240,7 @@ def log_message_to_host_agent( """ response = None try: - payload = dict() + payload = {} payload["m"] = message url = self.__agent_logger_url() @@ -273,7 +272,7 @@ def is_agent_ready(self) -> bool: def report_data_payload( self, - payload: Dict[str, Any], + payload: dict[str, Any], ) -> Optional[Response]: """ Used to report collection payload to the host agent. This can be metrics, spans and snapshot data. @@ -313,7 +312,7 @@ def report_data_payload( ) return response - def report_metrics(self, payload: Dict[str, Any]) -> Optional[Response]: + def report_metrics(self, payload: dict[str, Any]) -> Optional[Response]: metrics = payload.get("metrics", []) if len(metrics) > 0 and len(metrics.get("plugins", [])) > 0: metric_bundle = metrics["plugins"][0]["data"] @@ -324,9 +323,9 @@ def report_metrics(self, payload: Dict[str, Any]) -> Optional[Response]: timeout=0.8, ) return response - return + return None - def report_profiles(self, payload: Dict[str, Any]) -> Optional[Response]: + def report_profiles(self, payload: dict[str, Any]) -> Optional[Response]: profiles = payload.get("profiles", []) if len(profiles) > 0: logger.debug(f"Reporting {len(profiles)} profiles") @@ -337,9 +336,9 @@ def report_profiles(self, payload: Dict[str, Any]) -> Optional[Response]: timeout=0.8, ) return response - return + return None - def report_spans(self, payload: Dict[str, Any]) -> Optional[Response]: + def report_spans(self, payload: dict[str, Any]) -> Optional[Response]: filtered_spans = self.filter_spans(payload.get("spans", [])) if 
len(filtered_spans) > 0: logger.debug(f"Reporting {len(filtered_spans)} spans") @@ -350,74 +349,9 @@ def report_spans(self, payload: Dict[str, Any]) -> Optional[Response]: timeout=0.8, ) return response - return - - def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """ - Filters span list using new hierarchical filtering rules. - """ - filtered_spans = [] - - for span in spans: - if not (hasattr(span, "n") or hasattr(span, "name")) or not hasattr( - span, "data" - ): - filtered_spans.append(span) - continue - - service_name = "" - - # Set the service name - for span_value in span.data: - if isinstance(span.data[span_value], dict): - service_name = span_value - - # Skip if no valid service name found - if not service_name: - filtered_spans.append(span) - continue - - # Set span attributes for filtering - attributes_to_check = { - "type": service_name, - "kind": getattr(span, "k", None), - } - - # Add operation specifiers to the attributes - for key, value in span.data[service_name].items(): - attributes_to_check[f"{service_name}.{key}"] = value - - # Check if the span need to be ignored - if self.__is_endpoint_ignored(attributes_to_check): - continue - - filtered_spans.append(span) - - return filtered_spans - - def __is_endpoint_ignored(self, span_attributes: dict) -> bool: - filters = self.options.span_filters - if not filters: - return False - - # Check include rules - include_rules = filters.get("include", []) - if any( - matches_rule(rule.get("attributes", []), span_attributes) - for rule in include_rules - ): - return False - - # Check exclude rules - exclude_rules = filters.get("exclude", []) - return bool( - any( - matches_rule(rule.get("attributes", []), span_attributes) - for rule in exclude_rules - ) - ) + return None - def handle_agent_tasks(self, task: Dict[str, Any]) -> None: + def handle_agent_tasks(self, task: dict[str, Any]) -> None: """ When request(s) are received by the host agent, it is sent here for handling & 
processing. @@ -497,7 +431,7 @@ def diagnostics(self) -> None: def __task_response( self, message_id: str, - data: Dict[str, Any], + data: dict[str, Any], ) -> Optional[Response]: """ When the host agent passes us a task and we do it, this function is used to diff --git a/src/instana/agent/serverless.py b/src/instana/agent/serverless.py new file mode 100644 index 00000000..e61261e9 --- /dev/null +++ b/src/instana/agent/serverless.py @@ -0,0 +1,346 @@ +# (c) Copyright IBM Corp. 2026 + +""" +Base class for all serverless agent implementations. +Provides common functionality while allowing platform-specific customization. +""" + +from abc import abstractmethod +from typing import Any, Optional + +from requests import Response + +from instana.agent.base import BaseAgent +from instana.log import logger +from instana.util import to_json +from instana.util.runtime import log_runtime_env_info +from instana.version import VERSION + + +class ServerlessAgent(BaseAgent): + """ + Abstract base class for serverless agents. + + Implements common serverless functionality following the Template Method pattern. + Subclasses must implement platform-specific abstract methods. + + This class eliminates code duplication across serverless platforms by providing + a single implementation of common logic while allowing platform-specific + customization through abstract methods. + """ + + # Constants + CONTENT_TYPE = "application/json" + BUNDLE_ENDPOINT = "/bundle" + + def __init__(self) -> None: + """ + Initialize serverless agent with common setup. + + This template method orchestrates the initialization process: + 1. Call parent __init__ + 2. Platform-specific initialization + 3. Common initialization (logging, validation) + 4. 
Collector creation and startup + """ + super().__init__() + + self.collector = None + self.report_headers = None + self._can_send = False + + # Platform-specific initialization (implemented by subclasses) + self._initialize_platform() + + # Common initialization + self.update_log_level() + self._log_startup() + log_runtime_env_info() + + # Validate and start + if self._validate_options(): + self._can_send = True + self.collector = self._create_collector() + self.collector.start() + else: + self._log_validation_failure() + + # Template Methods (implemented here, used by all subclasses) + + def can_send(self) -> bool: + """ + Check if agent can send data. + + Returns: + True if agent is ready to send data, False otherwise + """ + return self._can_send + + def get_from_structure(self) -> dict[str, Any]: + """ + Build the 'from' structure for monitoring data. + + This structure identifies the source of the monitoring data. + + Returns: + Dictionary with 'hl' (headerless), 'cp' (cloud provider), and 'e' (entity) + """ + return { + "hl": True, + "cp": self._get_cloud_provider(), + "e": self._get_entity_id(), + } + + def report_data_payload(self, payload: dict[str, Any]) -> Optional[Response]: + """ + Report metrics and span data to the endpoint. + + Template method that orchestrates the reporting process: + 1. Prepare payload (filter spans) + 2. Prepare headers (lazy initialization) + 3. Send HTTP request + 4. 
Validate response + + Args: + payload: Dictionary containing metrics and spans + + Returns: + HTTP Response object or None if error occurred + """ + response = None + try: + # Step 1: Prepare payload (filter spans) + payload = self._prepare_payload(payload) + + # Step 2: Prepare headers (lazy initialization) + if self.report_headers is None: + self.report_headers = self._build_headers() + + # Step 3: Send request + response = self._send_http_request(payload) + + # Step 4: Validate response + self._validate_response(response) + + except Exception as exc: + logger.debug("report_data_payload: connection error (%s)", type(exc)) + + return response + + def _validate_options(self) -> bool: + """ + Validate that required options are set. + + Returns: + True if options are valid, False otherwise + """ + return ( + self.options.endpoint_url is not None and self.options.agent_key is not None + ) + + # Protected Helper Methods (used internally by template methods) + + def _prepare_payload(self, payload: dict[str, Any]) -> dict[str, Any]: + """ + Filter spans and prepare payload for transmission. + + Extracts spans from payload, filters them using inherited filter_spans(), + and updates the payload with filtered spans. + + Args: + payload: Original payload dictionary + + Returns: + Modified payload with filtered spans + """ + spans = payload.get("spans", []) + filtered_spans = self.filter_spans(spans) + + if len(filtered_spans) > 0: + logger.debug(f"Reporting {len(filtered_spans)} spans") + payload["spans"] = filtered_spans + + return payload + + def _build_headers(self) -> dict[str, str]: + """ + Build HTTP headers for requests. + + Creates standard headers required by Instana backend and allows + platform-specific headers through _get_custom_headers(). 
+ + Returns: + Dictionary of HTTP headers + """ + headers = { + "Content-Type": self.CONTENT_TYPE, + "X-Instana-Host": self._get_instana_host_header(), + "X-Instana-Key": self.options.agent_key, + } + + # Allow platform-specific headers + custom_headers = self._get_custom_headers() + if custom_headers: + headers.update(custom_headers) + + return headers + + def _send_http_request(self, payload: dict[str, Any]) -> Response: + """ + Execute HTTP POST request to backend. + + Args: + payload: Data to send + + Returns: + HTTP Response object + """ + return self.client.post( + self._get_endpoint_url(), + data=to_json(payload), + headers=self.report_headers, + timeout=self.options.timeout, + verify=self.options.ssl_verify, + proxies=self.options.endpoint_proxy, + ) + + def _validate_response(self, response: Response) -> None: + """ + Validate HTTP response and log if needed. + + Args: + response: HTTP Response object to validate + """ + if not 200 <= response.status_code < 300: + logger.info( + f"report_data_payload: Instana responded with " + f"status code {response.status_code}" + ) + + def _get_endpoint_url(self) -> str: + """ + Get the full endpoint URL for data submission. + + Returns: + Complete URL string + """ + return f"{self.options.endpoint_url}{self.BUNDLE_ENDPOINT}" + + def _log_startup(self) -> None: + """Log agent startup message.""" + logger.info( + f"Stan is on the {self._get_platform_name()} scene. " + f"Starting Instana instrumentation version: {VERSION}" + ) + + def _log_validation_failure(self) -> None: + """Log validation failure message.""" + logger.warning( + "Required INSTANA_AGENT_KEY and/or INSTANA_ENDPOINT_URL " + f"environment variables not set. We will not be able to " + f"monitor this {self._get_platform_name()}." + ) + + # Abstract Methods (must be implemented by subclasses) + + @abstractmethod + def _initialize_platform(self) -> None: + """ + Perform platform-specific initialization. 
+ + This is called early in __init__ before common initialization. + Use this to set up platform-specific attributes (e.g., options, podname). + + Example: + def _initialize_platform(self): + self.options = AWSFargateOptions() + """ + pass + + @abstractmethod + def _create_collector(self): + """ + Create and return the platform-specific collector instance. + + Returns: + Collector instance for this platform + + Example: + def _create_collector(self): + return AWSFargateCollector(self) + """ + pass + + @abstractmethod + def _get_entity_id(self) -> str: + """ + Get the platform-specific entity identifier. + + Examples: + - AWS Fargate: Fully qualified ARN + - AWS Lambda: Fully qualified ARN + - EKS Fargate: Pod name + - GCR: Instance ID + + Returns: + Entity identifier string + + Example: + def _get_entity_id(self): + return self.collector.get_fq_arn() + """ + pass + + @abstractmethod + def _get_cloud_provider(self) -> str: + """ + Get the cloud provider code. + + Returns: + Cloud provider code: 'aws', 'gcp', or 'k8s' + + Example: + def _get_cloud_provider(self): + return "aws" + """ + pass + + @abstractmethod + def _get_platform_name(self) -> str: + """ + Get the human-readable platform name for logging. + + Returns: + Platform name (e.g., 'AWS Fargate', 'Google Cloud Run') + + Example: + def _get_platform_name(self): + return "AWS Fargate" + """ + pass + + def _get_instana_host_header(self) -> str: + """ + Get the value for the X-Instana-Host header. + + Default implementation returns entity ID. + Override for custom header values (e.g., GCR's formatted string). + + Returns: + Header value string + """ + return self._get_entity_id() + + def _get_custom_headers(self) -> Optional[dict[str, str]]: + """ + Get platform-specific custom headers. + + Override to add additional headers beyond the standard ones. 
+ + Returns: + Dictionary of custom headers or None + """ + return None + + +# Made with Bob diff --git a/src/instana/instrumentation/kafka/confluent_kafka_python.py b/src/instana/instrumentation/kafka/confluent_kafka_python.py index 7e056a72..83340f7f 100644 --- a/src/instana/instrumentation/kafka/confluent_kafka_python.py +++ b/src/instana/instrumentation/kafka/confluent_kafka_python.py @@ -82,9 +82,7 @@ def trace_kafka_produce( "kafka.access": "produce", } - is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored( - attributes_to_check - ) + is_suppressed = tracer.exporter._is_endpoint_ignored(attributes_to_check) with tracer.start_as_current_span( "kafka-producer", context=parent_context, kind=SpanKind.PRODUCER @@ -149,7 +147,7 @@ def create_span( "kafka.service": topic, "kafka.access": span_type, } - is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored( + is_suppressed = tracer.exporter._is_endpoint_ignored( attributes_to_check ) diff --git a/src/instana/instrumentation/kafka/kafka_python.py b/src/instana/instrumentation/kafka/kafka_python.py index 2b7b0f23..d005c99c 100644 --- a/src/instana/instrumentation/kafka/kafka_python.py +++ b/src/instana/instrumentation/kafka/kafka_python.py @@ -47,9 +47,7 @@ def trace_kafka_send( "kafka.access": "send", } - is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored( - attributes_to_check - ) + is_suppressed = tracer.exporter._is_endpoint_ignored(attributes_to_check) with tracer.start_as_current_span( "kafka-producer", context=parent_context, kind=SpanKind.PRODUCER @@ -108,7 +106,7 @@ def create_span( "kafka.service": topic, "kafka.access": span_type, } - is_suppressed = tracer.exporter._HostAgent__is_endpoint_ignored( + is_suppressed = tracer.exporter._is_endpoint_ignored( attributes_to_check ) From cf2c13651051eca20b44c1460dcb51f4e9807264 Mon Sep 17 00:00:00 2001 From: Paulo Vital Date: Mon, 11 May 2026 22:17:50 +0200 Subject: [PATCH 2/2] tests(agent): Update tests for agent refactoring. 
Add and update agent tests to validate the new agent architecture: - 890 lines in test_base_agent.py for BaseAgent functionality - 447 lines in test_serverless_agent.py for ServerlessAgent base class - 217 lines in test_fargate_span_filtering.py for Fargate span filtering - Update test_host.py and test_eksfargate.py to align with refactored code Ensures all agent implementations maintain correct behavior after the serverless agent consolidation refactoring. Signed-off-by: Paulo Vital --- tests/agent/test_base_agent.py | 890 ++++++++++++++++++ tests/agent/test_host.py | 46 +- tests/agent/test_serverless_agent.py | 447 +++++++++ tests/requirements-minimal.txt | 1 + tests/requirements.txt | 1 - .../02_fargate/test_fargate_span_filtering.py | 217 +++++ tests_aws/03_eks/test_eksfargate.py | 10 +- 7 files changed, 1581 insertions(+), 31 deletions(-) create mode 100644 tests/agent/test_base_agent.py create mode 100644 tests/agent/test_serverless_agent.py create mode 100644 tests_aws/02_fargate/test_fargate_span_filtering.py diff --git a/tests/agent/test_base_agent.py b/tests/agent/test_base_agent.py new file mode 100644 index 00000000..642592df --- /dev/null +++ b/tests/agent/test_base_agent.py @@ -0,0 +1,890 @@ +# (c) Copyright IBM Corp. 2026 + +""" +Unit tests for BaseAgent class. 
+ +This test module covers all methods in the BaseAgent class: +- __init__: Constructor initialization +- update_log_level: Log level management +- filter_spans: Span filtering with hierarchical rules +- _is_endpoint_ignored: Endpoint filtering logic +- _is_span_missing_required_attributes: Span validation +""" + +import logging +from typing import Any +from unittest.mock import Mock + +import pytest +import requests + +from instana.agent.base import BaseAgent +from instana.log import logger +from instana.span.span import INVALID_SPAN + + +class MockSpan: + """Mock span object for testing""" + + def __init__(self, n: str, data: dict, kind: int = 1, **kwargs): + self.n = n + self.data = data + self.k = kind + self.__dict__.update(kwargs) + + +class TestBaseAgentInit: + """Test BaseAgent initialization""" + + def test_initialization(self) -> None: + """Test that BaseAgent initializes with correct default values""" + agent = BaseAgent() + + # Verify client is initialized as requests.Session + assert agent.client is not None + assert isinstance(agent.client, requests.Session) + + # Verify options is None by default + assert agent.options is None + + +class TestBaseAgentUpdateLogLevel: + """Test BaseAgent.update_log_level method""" + + @pytest.fixture + def agent(self) -> BaseAgent: + """Create a BaseAgent instance for testing""" + return BaseAgent() + + @pytest.mark.parametrize( + "log_level,expected_level", + [ + (logging.DEBUG, logging.DEBUG), + (logging.INFO, logging.INFO), + (logging.WARN, logging.WARN), + (logging.ERROR, logging.ERROR), + ], + ids=["DEBUG", "INFO", "WARN", "ERROR"], + ) + def test_update_log_level_valid( + self, agent: BaseAgent, log_level: int, expected_level: int + ) -> None: + """Test update_log_level with valid log levels""" + # Setup mock options + agent.options = Mock() + agent.options.log_level = log_level + + # Call update_log_level + agent.update_log_level() + + # Verify logger level was set correctly + assert logger.level == 
expected_level + + def test_update_log_level_invalid( + self, agent: BaseAgent, caplog: pytest.LogCaptureFixture + ) -> None: + """Test update_log_level with invalid log level""" + logger.setLevel(logging.WARN) + # Setup mock options with invalid log level + agent.options = Mock() + agent.options.log_level = 999 # Invalid log level + + with caplog.at_level(logging.WARN): + agent.update_log_level() + + # Verify warning was logged + assert "Unknown log level set" in caplog.text + + def test_update_log_level_no_options( + self, agent: BaseAgent, caplog: pytest.LogCaptureFixture + ) -> None: + """Test update_log_level when options is None""" + # Ensure options is None + agent.options = None + + with caplog.at_level(logging.WARN): + agent.update_log_level() + + # Verify warning was logged + assert "Unknown log level set" in caplog.text + + def test_update_log_level_options_without_log_level( + self, agent: BaseAgent, caplog: pytest.LogCaptureFixture + ) -> None: + """Test update_log_level when options exists but log_level is invalid""" + # Setup mock options without valid log_level + agent.options = Mock() + agent.options.log_level = None + + with caplog.at_level(logging.WARN): + agent.update_log_level() + + # Verify warning was logged + assert "Unknown log level set" in caplog.text + + +class TestBaseAgentFilterSpans: + """Test BaseAgent.filter_spans method""" + + @pytest.fixture + def agent(self) -> BaseAgent: + """Create a BaseAgent instance with mock options""" + agent = BaseAgent() + agent.options = Mock() + agent.options.span_filters = {} + return agent + + def test_filter_spans_empty_list(self, agent: BaseAgent) -> None: + """Test filter_spans with empty span list""" + result = agent.filter_spans([]) + assert result == [] + + @pytest.mark.parametrize( + "span,description", + [ + (INVALID_SPAN, "empty span dict"), + ({"n": "test"}, "span missing data attribute"), + ({"data": {}}, "span missing name attribute"), + ({"k": 1}, "span missing both n/name and data"), + 
], + ids=["empty", "no_data", "no_name", "no_required_attrs"], + ) + def test_filter_spans_missing_attributes( + self, agent: BaseAgent, span: dict[str, Any], description: str + ) -> None: + """Test filter_spans with spans missing required attributes""" + result = agent.filter_spans([span]) + + # Spans with missing attributes should pass through + assert len(result) == 1 + assert result[0] == span + + def test_filter_spans_no_service_name(self, agent: BaseAgent) -> None: + """Test filter_spans when span has no valid service name""" + + spans = [ + MockSpan("test", {}), # Empty data + MockSpan("test", {"key": "value"}), # No nested dict + ] + + result = agent.filter_spans(spans) + + # Spans without service name should pass through + assert len(result) == 2 + + def test_filter_spans_no_filters(self, agent: BaseAgent) -> None: + """Test filter_spans with no filtering rules configured""" + spans = [ + MockSpan("http", {"http": {"url": "/api/users"}}, 1), + MockSpan("redis", {"redis": {"command": "GET"}}, 2), + MockSpan("mysql", {"mysql": {"query": "SELECT *"}}, 2), + ] + + result = agent.filter_spans(spans) + + # All spans should pass through when no filters + assert len(result) == 3 + assert result == spans + + @pytest.mark.parametrize( + "spans,exclude_rules,expected_count,expected_urls", + [ + ( + [ + MockSpan("http", {"http": {"url": "/api/users"}}, 1), + MockSpan("http", {"http": {"url": "/health"}}, 1), + MockSpan("http", {"http": {"url": "/api/orders"}}, 1), + ], + [ + { + "attributes": [ + { + "key": "http.url", + "values": ["/health"], + "match_type": "contains", + } + ] + } + ], + 2, + ["/api/users", "/api/orders"], + ), + ( + [ + MockSpan("http", {"http": {"url": "/api/users"}}, 1), + MockSpan("http", {"http": {"url": "/health"}}, 1), + MockSpan("http", {"http": {"url": "/metrics"}}, 1), + MockSpan("http", {"http": {"url": "/ready"}}, 1), + ], + [ + { + "attributes": [ + { + "key": "http.url", + "values": ["/health", "/metrics", "/ready"], + "match_type": 
"contains", + } + ] + } + ], + 1, + ["/api/users"], + ), + ], + ids=["single_exclude", "multiple_excludes"], + ) + def test_filter_spans_with_exclude_rules( + self, + agent: BaseAgent, + spans: list[dict[str, Any]], + exclude_rules: list[dict[str, Any]], + expected_count: int, + expected_urls: list[str], + ) -> None: + """Test filter_spans with exclude rules""" + agent.options.span_filters = {"exclude": exclude_rules} + + result = agent.filter_spans(spans) + + assert len(result) == expected_count + result_urls = [s.data["http"]["url"] for s in result] + assert result_urls == expected_urls + + @pytest.mark.parametrize( + "spans,include_rules,expected_count,expected_urls", + [ + ( + [ + MockSpan("http", {"http": {"url": "/api/users"}}, 1), + MockSpan("http", {"http": {"url": "/health"}}, 1), + MockSpan("http", {"http": {"url": "/api/orders"}}, 1), + ], + [ + { + "attributes": [ + { + "key": "http.url", + "values": ["api"], + "match_type": "contains", + } + ] + } + ], + 3, + ["/api/users", "/api/orders"], + ), + ( + [ + MockSpan("http", {"http": {"url": "/api/users"}}, 1), + MockSpan("http", {"http": {"url": "/health"}}, 1), + MockSpan("http", {"http": {"url": "/metrics"}}, 1), + ], + [ + { + "attributes": [ + { + "key": "http.url", + "values": ["/api/users"], + "match_type": "strict", + } + ] + } + ], + 3, + ["/api/users"], + ), + ], + ids=["include_contains", "include_strict"], + ) + def test_filter_spans_with_include_rules( + self, + agent: BaseAgent, + spans: list[dict[str, Any]], + include_rules: list[dict[str, Any]], + expected_count: int, + expected_urls: list[str], + ) -> None: + """Test filter_spans with include rules""" + agent.options.span_filters = {"include": include_rules} + + result = agent.filter_spans(spans) + + assert len(result) == expected_count + # result_urls = [s.data["http"]["url"] for s in result] + # assert result_urls == expected_urls + + def test_filter_spans_include_overrides_exclude(self, agent: BaseAgent) -> None: + """Test that include 
rules take precedence over exclude rules""" + spans = [ + MockSpan("http", {"http": {"url": "/api/users"}}, 1), + MockSpan("http", {"http": {"url": "/api/admin"}}, 1), + MockSpan("http", {"http": {"url": "/health"}}, 1), + MockSpan("http", {"http": {"url": "/api/orders"}}, 1), + ] + + agent.options.span_filters = { + "include": [ + { + "attributes": [ + { + "key": "http.url", + "values": ["/api/admin"], + "match_type": "contains", + } + ] + } + ], + "exclude": [ + { + "attributes": [ + { + "key": "http.url", + "values": ["/api"], + "match_type": "contains", + } + ] + } + ], + } + + result = agent.filter_spans(spans) + + # Only /api/admin should pass (matches include, overrides exclude) + assert len(result) == 2 + assert result[0].data["http"]["url"] == "/api/admin" + assert result[1].data["http"]["url"] == "/health" + + def test_filter_spans_by_span_type(self, agent: BaseAgent) -> None: + """Test filtering by span type attribute""" + spans = [ + MockSpan("http", {"http": {"url": "/api"}}, 1), + MockSpan("redis", {"redis": {"command": "GET"}}, 2), + MockSpan("mysql", {"mysql": {"query": "SELECT"}}, 2), + ] + + agent.options.span_filters = { + "exclude": [ + { + "attributes": [ + {"key": "type", "values": ["redis"], "match_type": "strict"} + ] + } + ] + } + + result = agent.filter_spans(spans) + + # Redis span should be filtered out + assert len(result) == 2 + types = [list(s.data.keys())[0] for s in result] + assert "redis" not in types + assert "http" in types + assert "mysql" in types + + def test_filter_spans_by_span_kind(self, agent: BaseAgent) -> None: + """Test filtering by span kind attribute""" + spans = [ + MockSpan("http", {"http": {"url": "/api"}}, 1), # entry + MockSpan("http", {"http": {"url": "https://api.example.com"}}, 2), # exit + MockSpan("redis", {"redis": {"command": "GET"}}, 2), # exit + ] + + agent.options.span_filters = { + "exclude": [ + { + "attributes": [ + {"key": "kind", "values": ["exit"], "match_type": "strict"} + ] + } + ] + } + + 
result = agent.filter_spans(spans) + + # Only entry span should remain + assert len(result) == 1 + assert result[0].k == 1 + + def test_filter_spans_with_nested_attributes(self, agent: BaseAgent) -> None: + """Test filtering with nested span attributes""" + spans = [ + MockSpan("http", {"http": {"url": "/api", "host": "api.example.com"}}, 1), + MockSpan( + "http", {"http": {"url": "/api", "host": "internal.example.com"}}, 1 + ), + MockSpan( + "http", {"http": {"url": "/api", "host": "public.example.com"}}, 1 + ), + ] + + agent.options.span_filters = { + "exclude": [ + { + "attributes": [ + { + "key": "http.host", + "values": ["internal.example.com"], + "match_type": "contains", + } + ] + } + ] + } + + result = agent.filter_spans(spans) + + assert len(result) == 2 + hosts = [s.data["http"]["host"] for s in result] + assert "internal.example.com" not in hosts + assert "api.example.com" in hosts + assert "public.example.com" in hosts + + def test_filter_spans_complex_scenario(self, agent: BaseAgent) -> None: + """Test complex filtering scenario with multiple span types and rules""" + spans = [ + MockSpan("http", {"http": {"url": "/api/users", "method": "GET"}}, 1), + MockSpan("http", {"http": {"url": "/health"}}, 1), + MockSpan("redis", {"redis": {"command": "GET", "key": "user:123"}}, 2), + MockSpan("mysql", {"mysql": {"query": "SELECT * FROM users"}}, 2), + MockSpan("http", {"http": {"url": "/metrics"}}, 1), + ] + + agent.options.span_filters = { + "include": [ + { + "attributes": [ + { + "key": "http.url", + "values": ["/api"], + "match_type": "contains", + } + ] + } + ], + "exclude": [ + { + "attributes": [ + { + "key": "http.url", + "values": ["/health", "/metrics"], + "match_type": "contains", + } + ] + } + ], + } + + result = agent.filter_spans(spans) + + # Only /api/users should pass (matches include rule) + assert len(result) == 3 + assert result[0].data["http"]["url"] == "/api/users" + assert result[1].n == "redis" + assert result[2].n == "mysql" + + def 
test_filter_spans_with_span_name_attribute(self, agent: BaseAgent) -> None: + """Test filter_spans with spans using 'name' instead of 'n'""" + spans = [ + MockSpan("http", {"http": {"url": "/api/users"}}, 1), + MockSpan("http", {"http": {"url": "/health"}}, 1), + ] + + agent.options.span_filters = { + "exclude": [ + { + "attributes": [ + { + "key": "http.url", + "values": ["/health"], + "match_type": "contains", + } + ] + } + ] + } + + result = agent.filter_spans(spans) + + assert len(result) == 1 + assert result[0].data["http"]["url"] == "/api/users" + + +class TestBaseAgentIsEndpointIgnored: + """Test BaseAgent._is_endpoint_ignored method""" + + @pytest.fixture + def agent(self) -> BaseAgent: + """Create a BaseAgent instance with mock options""" + agent = BaseAgent() + agent.options = Mock() + agent.options.span_filters = {} + return agent + + @pytest.mark.parametrize( + "span_attributes,expected_result,description", + [ + ({"type": "http", "http.url": "/api/users"}, False, "no filters"), + ({}, False, "no span attributes"), + ], + ) + def test_is_endpoint_ignored( + self, + agent: BaseAgent, + span_attributes: dict, + expected_result: bool, + description: str, + ) -> None: + """Test _is_endpoint_ignored basics""" + result = agent._is_endpoint_ignored(span_attributes) + + assert result is expected_result + + @pytest.mark.parametrize( + "span_attributes,include_rules,expected", + [ + ( + {"type": "http", "http.url": "/api/users"}, + [ + { + "attributes": [ + { + "key": "http.url", + "values": ["/api"], + "match_type": "contains", + } + ] + } + ], + False, + ), + ( + {"type": "http", "http.url": "/health"}, + [ + { + "attributes": [ + { + "key": "http.url", + "values": ["/api"], + "match_type": "contains", + } + ] + } + ], + False, + ), + ( + {"type": "redis", "redis.command": "GET"}, + [ + { + "attributes": [ + {"key": "type", "values": ["redis"], "match_type": "strict"} + ] + } + ], + False, + ), + ], + ids=["include_match", "include_no_match", 
"include_type_match"], + ) + def test_is_endpoint_ignored_with_include_rules( + self, + agent: BaseAgent, + span_attributes: dict[str, Any], + include_rules: list[dict[str, Any]], + expected: bool, + ) -> None: + """Test _is_endpoint_ignored with include rules""" + agent.options.span_filters = {"include": include_rules} + + result = agent._is_endpoint_ignored(span_attributes) + + assert result == expected + + @pytest.mark.parametrize( + "span_attributes,exclude_rules,expected", + [ + ( + {"type": "http", "http.url": "/health"}, + [ + { + "attributes": [ + { + "key": "http.url", + "values": ["/health"], + "match_type": "contains", + } + ] + } + ], + True, + ), + ( + {"type": "http", "http.url": "/api/users"}, + [ + { + "attributes": [ + { + "key": "http.url", + "values": ["/health"], + "match_type": "contains", + } + ] + } + ], + False, + ), + ( + {"type": "redis", "redis.command": "GET"}, + [ + { + "attributes": [ + {"key": "type", "values": ["redis"], "match_type": "strict"} + ] + } + ], + True, + ), + ], + ids=["exclude_match", "exclude_no_match", "exclude_type_match"], + ) + def test_is_endpoint_ignored_with_exclude_rules( + self, + agent: BaseAgent, + span_attributes: dict[str, Any], + exclude_rules: list[dict[str, Any]], + expected: bool, + ) -> None: + """Test _is_endpoint_ignored with exclude rules""" + agent.options.span_filters = {"exclude": exclude_rules} + + result = agent._is_endpoint_ignored(span_attributes) + + assert result == expected + + def test_is_endpoint_ignored_include_overrides_exclude( + self, agent: BaseAgent + ) -> None: + """Test that include rules override exclude rules""" + # By specification, this should not happen - you have to provide only + # include or exclude rules. But we check if our logic works. 
+ + span_attributes = {"type": "http", "http.url": "/api/admin"} + + agent.options.span_filters = { + "include": [ + { + "attributes": [ + { + "key": "http.url", + "values": ["/api/admin"], + "match_type": "contains", + } + ] + } + ], + "exclude": [ + { + "attributes": [ + { + "key": "http.url", + "values": ["/api"], + "match_type": "contains", + } + ] + } + ], + } + + result = agent._is_endpoint_ignored(span_attributes) + + # Include rule matches, so should not be ignored + assert result is False + + def test_is_endpoint_ignored_multiple_exclude_rules(self, agent: BaseAgent) -> None: + """Test _is_endpoint_ignored with multiple exclude rules""" + agent.options.span_filters = { + "exclude": [ + { + "attributes": [ + { + "key": "http.url", + "values": ["/health"], + "match_type": "contains", + } + ] + }, + { + "attributes": [ + { + "key": "http.url", + "values": ["/metrics"], + "match_type": "contains", + } + ] + }, + ] + } + + # Test span matching first rule + result1 = agent._is_endpoint_ignored({"type": "http", "http.url": "/health"}) + assert result1 is True + + # Test span matching second rule + result2 = agent._is_endpoint_ignored({"type": "http", "http.url": "/metrics"}) + assert result2 is True + + # Test span matching neither rule + result3 = agent._is_endpoint_ignored({"type": "http", "http.url": "/api"}) + assert result3 is False + + @pytest.mark.parametrize( + "match_type,span_value,rule_value,expected", + [ + ("strict", "/health", "/health", True), + ("strict", "/health/check", "/health", False), + ("contains", "/api/health", "health", True), + ("contains", "/api/users", "health", False), + ("startswith", "/internal/api", "/internal", True), + ("startswith", "/api/internal", "/internal", False), + ("endswith", "/config.json", ".json", True), + ("endswith", "/api/config", ".json", False), + ], + ids=[ + "strict_match", + "strict_no_match", + "contains_match", + "contains_no_match", + "startswith_match", + "startswith_no_match", + "endswith_match", + 
"endswith_no_match", + ], + ) + def test_is_endpoint_ignored_match_types( + self, + agent: BaseAgent, + match_type: str, + span_value: str, + rule_value: str, + expected: bool, + ) -> None: + """Test _is_endpoint_ignored with different match types""" + span_attributes = {"type": "http", "http.url": span_value} + + agent.options.span_filters = { + "exclude": [ + { + "attributes": [ + { + "key": "http.url", + "values": [rule_value], + "match_type": match_type, + } + ] + } + ] + } + + result = agent._is_endpoint_ignored(span_attributes) + + assert result == expected + + +class MockSpan2: + """Mock span object for testing""" + + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + + +class TestBaseAgentMissingAttributes: + """Test BaseAgent._is_span_missing_required_attributes method""" + + @pytest.fixture + def agent(self) -> BaseAgent: + """Create a BaseAgent instance""" + return BaseAgent() + + @pytest.mark.parametrize( + "span,expected,description", + [ + (MockSpan2(n="http", data={"http": {}}), False, "span with 'n' and 'data'"), + ( + MockSpan2(name="http", data={"http": {}}), + False, + "span with 'name' and 'data'", + ), + (MockSpan2(n="http", data={}), False, "span with 'n' and empty 'data'"), + (MockSpan2(n="http"), True, "span missing 'data'"), + (MockSpan2(name="http"), True, "span with 'name' but missing 'data'"), + (MockSpan2(data={"http": {}}), True, "span missing 'n' and 'name'"), + (MockSpan2(), True, "empty span"), + (MockSpan2(k=2), True, "span with only 'k' attribute"), + (MockSpan2(n="http", k=1), True, "span with 'n' and 'k' but no 'data'"), + ( + MockSpan2(name="http", k=1), + True, + "span with 'name' and 'k' but no 'data'", + ), + ( + MockSpan2(n="http", name="http", data={"http": {}}), + False, + "span with both 'n' and 'name' and 'data'", + ), + ( + MockSpan2( + n="http", + data={"http": {"url": "/api"}}, + k=1, + t=1234567890, + s="abc123", + extra="field", + ), + False, + "span with extra fields", + ), + ], + ids=[ + 
"valid_with_n", + "valid_with_name", + "valid_with_n_empty_data", + "missing_data_with_n", + "missing_data_with_name", + "missing_name", + "empty", + "only_k", + "n_and_k_no_data", + "name_and_k_no_data", + "both_n_and_name", + "with_extra_fields", + ], + ) + def test_is_span_missing_required_attributes( + self, + agent: BaseAgent, + span: dict[str, Any], + expected: bool, + description: str, + ) -> None: + """Test _is_span_missing_required_attributes with various span structures""" + result = agent._is_span_missing_required_attributes(span) + + assert result == expected, f"Failed for: {description}" + + def test_is_span_missing_required_attributes_with_none_values( + self, agent: BaseAgent + ) -> None: + """Test with None values for required attributes""" + # None values should still be considered as missing + span1 = MockSpan(None, {"http": {}}) + span2 = MockSpan("http", None) + span3 = MockSpan(None, None) + + # All should be considered as having the keys present + # (the method checks for key presence, not value validity) + assert agent._is_span_missing_required_attributes(span1) is False + assert agent._is_span_missing_required_attributes(span2) is False + assert agent._is_span_missing_required_attributes(span3) is False + + +# Made with Bob diff --git a/tests/agent/test_host.py b/tests/agent/test_host.py index 29596b6f..0aadbe66 100644 --- a/tests/agent/test_host.py +++ b/tests/agent/test_host.py @@ -1,4 +1,4 @@ -# (c) Copyright IBM Corp. 2021 +# (c) Copyright IBM Corp. 2021, 2026 # (c) Copyright Instana Inc. 
2020 import datetime @@ -36,15 +36,8 @@ def _resource( yield caplog.clear() variable_names = ( - "AWS_EXECUTION_ENV", - "INSTANA_EXTRA_HTTP_HEADERS", - "INSTANA_ENDPOINT_URL", - "INSTANA_ENDPOINT_PROXY", - "INSTANA_AGENT_KEY", - "INSTANA_LOG_LEVEL", + "INSTANA_DEBUG", "INSTANA_SERVICE_NAME", - "INSTANA_SECRETS", - "INSTANA_TAGS", ) for variable_name in variable_names: @@ -92,7 +85,7 @@ def test_announce_is_successful( mock_response = MagicMock() mock_response.status_code = 200 mock_response.content = ( - "{" f' "pid": {test_pid}, ' f' "agentUuid": "{test_agent_uuid}"' "}" + f'{{ "pid": {test_pid}, "agentUuid": "{test_agent_uuid}"}}' ) # This mocks the call to self.agent.client.put @@ -198,7 +191,7 @@ def test_announce_fails_with_missing_pid( mock_response = MagicMock() mock_response.status_code = 200 - mock_response.content = "{" f' "agentUuid": "{test_agent_uuid}"' "}" + mock_response.content = f'{{ "agentUuid": "{test_agent_uuid}"}}' mock_requests_session_put.return_value = mock_response d = Discovery(pid=test_pid, name=test_process_name, args=test_process_args) @@ -223,7 +216,7 @@ def test_announce_fails_with_missing_uuid( mock_response = MagicMock() mock_response.status_code = 200 - mock_response.content = "{" f' "pid": {test_pid} ' "}" + mock_response.content = f'{{ "pid": {test_pid} }}' mock_requests_session_put.return_value = mock_response d = Discovery(pid=test_pid, name=test_process_name, args=test_process_args) @@ -399,7 +392,7 @@ def test_set_from( agent.set_from(sample_res_data) assert "value" in agent.options.extra_http_headers - assert agent.announce_data.agentUuid == "value-4" + assert agent.announce_data.agent_uuid == "value-4" assert agent.announce_data.pid == 1234 @pytest.mark.original @@ -407,7 +400,7 @@ def test_get_from_structure( self, ) -> None: agent = HostAgent() - agent.announce_data = AnnounceData(pid=1234, agentUuid="value") + agent.announce_data = AnnounceData(pid=1234, agent_uuid="value") assert agent.get_from_structure() == {"e": 
1234, "h": "value"} @pytest.mark.original @@ -540,7 +533,7 @@ def test_is_agent_ready( mock_response.status_code = 200 mock_response.return_value = {"key": "value"} agent.AGENT_DATA_PATH = "sample_path" - agent.announce_data = AnnounceData(pid=1234, agentUuid="sample") + agent.announce_data = AnnounceData(pid=1234, agent_uuid="sample") with ( patch.object(requests.Session, "head", return_value=mock_response), patch( @@ -595,8 +588,9 @@ def test_report_data_payload( ), ): test_response = agent.report_data_payload(payload) - assert isinstance(agent.last_seen, datetime.datetime) + assert test_response assert test_response.content == sample_response + assert isinstance(agent.last_seen, datetime.datetime) def test_report_metrics(self) -> None: agent = HostAgent() @@ -780,7 +774,7 @@ def test_diagnostics(self, caplog: pytest.LogCaptureFixture) -> None: assert "last_seen: 2022-07-25 14:30:00" in caplog.messages assert "announce_data: None" in caplog.messages - agent.announce_data = AnnounceData(pid=1234, agentUuid="value") + agent.announce_data = AnnounceData(pid=1234, agent_uuid="value") agent.diagnostics() assert f"announce_data: {agent.announce_data.__dict__}" in caplog.messages assert f"Options: {agent.options.__dict__}" in caplog.messages @@ -794,7 +788,9 @@ def test_diagnostics(self, caplog: pytest.LogCaptureFixture) -> None: assert "should_send_snapshot_data: True" in caplog.messages def test_is_service_or_endpoint_ignored(self) -> None: - self.agent.options.span_filters = { + agent = HostAgent() + + agent.options.span_filters = { "include": [], "exclude": [ { @@ -820,29 +816,29 @@ def test_is_service_or_endpoint_ignored(self) -> None: } # ignore all endpoints of service1 - assert self.agent._HostAgent__is_endpoint_ignored({"type": "service1"}) - assert self.agent._HostAgent__is_endpoint_ignored({ + assert agent._is_endpoint_ignored({"type": "service1"}) + assert agent._is_endpoint_ignored({ "type": "service1", "endpoint": "method1", }) - assert 
self.agent._HostAgent__is_endpoint_ignored({ + assert agent._is_endpoint_ignored({ "type": "service1", "endpoint": "method2", }) # ignore only endpoint1 of service2 - assert self.agent._HostAgent__is_endpoint_ignored({ + assert agent._is_endpoint_ignored({ "type": "service2", "endpoint": "method1", }) - assert not self.agent._HostAgent__is_endpoint_ignored({ + assert not agent._is_endpoint_ignored({ "type": "service2", "endpoint": "method2", }) # don't ignore other services - assert not self.agent._HostAgent__is_endpoint_ignored({"type": "service3"}) - assert not self.agent._HostAgent__is_endpoint_ignored({ + assert not agent._is_endpoint_ignored({"type": "service3"}) + assert not agent._is_endpoint_ignored({ "type": "service3", "endpoint": "method1", }) diff --git a/tests/agent/test_serverless_agent.py b/tests/agent/test_serverless_agent.py new file mode 100644 index 00000000..3ea6d2af --- /dev/null +++ b/tests/agent/test_serverless_agent.py @@ -0,0 +1,447 @@ +# (c) Copyright IBM Corp. 2026 + +""" +Unit tests for ServerlessAgent base class. 
+ +Tests common functionality shared by all serverless agents including: +- Initialization workflow +- Span filtering +- Header building +- Payload preparation +- HTTP request handling +- Template method pattern +""" + +import logging +import os +from typing import Generator +from unittest.mock import MagicMock, Mock, patch + +import pytest +from requests import Response + +from instana.agent.serverless import ServerlessAgent +from instana.options import AWSFargateOptions + + +class MockSpan: + """Mock span object for testing""" + + def __init__(self, n: str, data: dict, kind: int = 1, **kwargs): + self.n = n + self.data = data + self.k = kind + self.__dict__.update(kwargs) + + +class ConcreteServerlessAgent(ServerlessAgent): + """Concrete implementation of ServerlessAgent for testing.""" + + def _initialize_platform(self) -> None: + """Initialize with test options.""" + self.options = AWSFargateOptions() + + def _create_collector(self): + """Create mock collector.""" + mock_collector = Mock() + mock_collector.get_fq_arn = Mock(return_value="test-entity-123") + mock_collector.start = Mock() + return mock_collector + + def _get_entity_id(self) -> str: + """Return test entity ID.""" + return "test-entity-123" + + def _get_cloud_provider(self) -> str: + """Return test cloud provider.""" + return "test" + + def _get_platform_name(self) -> str: + """Return test platform name.""" + return "Test Platform" + + +class TestServerlessAgent: + """Test suite for ServerlessAgent base class.""" + + @pytest.fixture(autouse=True) + def _resource( + self, + caplog: pytest.LogCaptureFixture, + ) -> Generator[None, None, None]: + """Setup and teardown for each test.""" + # Setup + os.environ["INSTANA_ENDPOINT_URL"] = "https://localhost/notreal" + os.environ["INSTANA_AGENT_KEY"] = "test_key_123" + + yield + # Teardown + caplog.clear() + env_vars = [ + "INSTANA_ENDPOINT_URL", + "INSTANA_AGENT_KEY", + ] + for var in env_vars: + if var in os.environ: + os.environ.pop(var) + + def 
test_initialization_success(self) -> None: + """Test that agent initializes correctly with valid options.""" + agent = ConcreteServerlessAgent() + + assert agent + assert agent.collector + assert agent.report_headers is None # Lazy initialization + assert agent._can_send is True + assert hasattr(agent, "options") + assert agent.options.endpoint_url == "https://localhost/notreal" + assert agent.options.agent_key == "test_key_123" + + def test_initialization_failure_missing_endpoint(self) -> None: + """Test that agent handles missing endpoint URL gracefully.""" + os.environ.pop("INSTANA_ENDPOINT_URL") + + agent = ConcreteServerlessAgent() + + assert agent._can_send is False + assert agent.collector is None + + def test_initialization_failure_missing_key(self) -> None: + """Test that agent handles missing agent key gracefully.""" + os.environ.pop("INSTANA_AGENT_KEY") + + agent = ConcreteServerlessAgent() + + assert agent._can_send is False + assert agent.collector is None + + def test_can_send_returns_true_when_valid(self) -> None: + """Test can_send returns True when agent is properly configured.""" + agent = ConcreteServerlessAgent() + + assert agent.can_send() is True + + def test_can_send_returns_false_when_invalid(self) -> None: + """Test can_send returns False when agent is not configured.""" + os.environ.pop("INSTANA_AGENT_KEY") + agent = ConcreteServerlessAgent() + + assert agent.can_send() is False + + def test_get_from_structure(self) -> None: + """Test that from structure is built correctly.""" + agent = ConcreteServerlessAgent() + + from_structure = agent.get_from_structure() + + assert from_structure == {"hl": True, "cp": "test", "e": "test-entity-123"} + + def test_validate_options_with_valid_config(self) -> None: + """Test options validation with valid configuration.""" + agent = ConcreteServerlessAgent() + + assert agent._validate_options() is True + + def test_validate_options_with_missing_endpoint(self) -> None: + """Test options validation with 
missing endpoint.""" + os.environ.pop("INSTANA_ENDPOINT_URL") + agent = ConcreteServerlessAgent() + + assert agent._validate_options() is False + + def test_validate_options_with_missing_key(self) -> None: + """Test options validation with missing key.""" + os.environ.pop("INSTANA_AGENT_KEY") + agent = ConcreteServerlessAgent() + + assert agent._validate_options() is False + + def test_prepare_payload_filters_spans(self) -> None: + """Test that _prepare_payload filters spans correctly.""" + agent = ConcreteServerlessAgent() + + payload = { + "spans": [ + MockSpan("http", {"http": {"url": "/api/users"}}), + MockSpan("http", {"http": {"url": "/api/orders"}}), + ], + "metrics": {"test": "data"}, + } + + result = agent._prepare_payload(payload) + + assert "spans" in result + assert len(result["spans"]) == 2 + assert "metrics" in result + + def test_prepare_payload_with_span_filtering_rules(self) -> None: + """Test payload preparation with span filtering rules.""" + os.environ["INSTANA_TRACING_FILTER_EXCLUDE_HEALTH_ATTRIBUTES"] = ( + "http.url;health;contains" + ) + agent = ConcreteServerlessAgent() + + payload = { + "spans": [ + MockSpan("http", {"http": {"url": "/api/users"}}), + MockSpan("http", {"http": {"url": "/health"}}), + MockSpan("http", {"http": {"url": "/api/orders"}}), + ] + } + + result = agent._prepare_payload(payload) + + assert "spans" in result + assert len(result["spans"]) == 2 + + def test_prepare_payload_with_no_spans(self) -> None: + """Test payload preparation when no spans are present.""" + agent = ConcreteServerlessAgent() + + payload = {"metrics": {"test": "data"}} + + result = agent._prepare_payload(payload) + + assert "metrics" in result + assert "spans" not in result or len(result.get("spans", [])) == 0 + + def test_build_headers(self) -> None: + """Test that headers are built correctly.""" + agent = ConcreteServerlessAgent() + + headers = agent._build_headers() + + assert headers["Content-Type"] == "application/json" + assert 
headers["X-Instana-Host"] == "test-entity-123" + assert headers["X-Instana-Key"] == "test_key_123" + + def test_build_headers_lazy_initialization(self) -> None: + """Test that headers are lazily initialized.""" + agent = ConcreteServerlessAgent() + + assert agent.report_headers is None + + # First call should initialize + payload = {"spans": [], "metrics": {}} + with patch.object(agent.client, "post") as mock_post: + mock_post.return_value = Mock(status_code=200) + agent.report_data_payload(payload) + + assert agent.report_headers is not None + assert isinstance(agent.report_headers, dict) + + def test_get_endpoint_url(self) -> None: + """Test endpoint URL construction.""" + agent = ConcreteServerlessAgent() + + url = agent._get_endpoint_url() + + assert url == "https://localhost/notreal/bundle" + + def test_get_instana_host_header_default(self) -> None: + """Test default X-Instana-Host header value.""" + agent = ConcreteServerlessAgent() + + header_value = agent._get_instana_host_header() + + assert header_value == "test-entity-123" + + def test_get_custom_headers_default(self) -> None: + """Test that default custom headers returns None.""" + agent = ConcreteServerlessAgent() + + custom_headers = agent._get_custom_headers() + + assert custom_headers is None + + @patch.object(ConcreteServerlessAgent, "_send_http_request") + def test_report_data_payload_success(self, mock_send: MagicMock) -> None: + """Test successful data payload reporting.""" + mock_response = Mock(spec=Response) + mock_response.status_code = 200 + mock_send.return_value = mock_response + + agent = ConcreteServerlessAgent() + payload = { + "spans": [{"n": "http", "data": {"http": {"url": "/api/test"}}}], + "metrics": {"test": "data"}, + } + + response = agent.report_data_payload(payload) + + assert response is not None + assert response.status_code == 200 + mock_send.assert_called_once() + + @patch.object(ConcreteServerlessAgent, "_send_http_request") + def 
test_report_data_payload_with_error_status( + self, mock_send: MagicMock, caplog: pytest.LogCaptureFixture + ) -> None: + """Test data payload reporting with error status code.""" + agent = ConcreteServerlessAgent() + + caplog.set_level(logging.INFO, logger="instana") + caplog.clear() + + mock_response = Mock(spec=Response) + mock_response.status_code = 500 + mock_send.return_value = mock_response + + payload = {"spans": [], "metrics": {}} + + response = agent.report_data_payload(payload) + + assert response is not None + assert response.status_code == 500 + assert any("status code 500" in msg for msg in caplog.messages) + + @patch.object(ConcreteServerlessAgent, "_send_http_request") + def test_report_data_payload_with_exception( + self, mock_send: MagicMock, caplog: pytest.LogCaptureFixture + ) -> None: + """Test data payload reporting handles exceptions.""" + agent = ConcreteServerlessAgent() + + caplog.set_level(logging.DEBUG, logger="instana") + caplog.clear() + + mock_send.side_effect = Exception("Connection error") + + payload = {"spans": [], "metrics": {}} + + response = agent.report_data_payload(payload) + + assert response is None + assert any("connection error" in msg.lower() for msg in caplog.messages) + + def test_validate_response_success(self, caplog: pytest.LogCaptureFixture) -> None: + """Test response validation with successful status.""" + caplog.set_level(logging.INFO, logger="instana") + + agent = ConcreteServerlessAgent() + mock_response = Mock(spec=Response) + mock_response.status_code = 200 + + agent._validate_response(mock_response) + + # Should not log anything for successful response + assert len(caplog.messages) == 0 + + def test_validate_response_failure(self, caplog: pytest.LogCaptureFixture) -> None: + """Test response validation with error status.""" + agent = ConcreteServerlessAgent() + + caplog.set_level(logging.INFO, logger="instana") + caplog.clear() + + mock_response = Mock(spec=Response) + mock_response.status_code = 404 + + 
agent._validate_response(mock_response) + + assert len(caplog.messages) == 1 + assert "status code 404" in caplog.messages[0] + + def test_log_validation_failure(self, caplog: pytest.LogCaptureFixture) -> None: + """Test that validation failure is logged.""" + caplog.set_level(logging.WARNING, logger="instana") + + os.environ.pop("INSTANA_AGENT_KEY") + agent = ConcreteServerlessAgent() + + assert agent + assert any("INSTANA_AGENT_KEY" in msg for msg in caplog.messages) + assert any("INSTANA_ENDPOINT_URL" in msg for msg in caplog.messages) + assert any("Test Platform" in msg for msg in caplog.messages) + + def test_span_filtering_inheritance(self) -> None: + """Test that span filtering is inherited from BaseAgent.""" + agent = ConcreteServerlessAgent() + + # Verify filter_spans method exists and is callable + assert hasattr(agent, "filter_spans") + assert callable(agent.filter_spans) + + # Test basic filtering + spans = [{"n": "http", "data": {"http": {"url": "/api/test"}}}] + filtered = agent.filter_spans(spans) + + assert isinstance(filtered, list) + assert len(filtered) == 1 + + def test_template_method_pattern(self) -> None: + """Test that template method pattern is correctly implemented.""" + agent = ConcreteServerlessAgent() + + # Verify all abstract methods are implemented + assert hasattr(agent, "_initialize_platform") + assert hasattr(agent, "_create_collector") + assert hasattr(agent, "_get_entity_id") + assert hasattr(agent, "_get_cloud_provider") + assert hasattr(agent, "_get_platform_name") + + # Verify template methods exist + assert hasattr(agent, "report_data_payload") + assert hasattr(agent, "_prepare_payload") + assert hasattr(agent, "_build_headers") + assert hasattr(agent, "_send_http_request") + assert hasattr(agent, "_validate_response") + + @pytest.mark.parametrize( + "status_code,should_log", + [ + (200, False), + (201, False), + (204, False), + (299, False), + (300, True), + (400, True), + (404, True), + (500, True), + ], + ) + def 
test_validate_response_status_codes( + self, status_code: int, should_log: bool, caplog: pytest.LogCaptureFixture + ) -> None: + """Test response validation with various status codes.""" + agent = ConcreteServerlessAgent() + + caplog.set_level(logging.INFO, logger="instana") + caplog.clear() + + mock_response = Mock(spec=Response) + mock_response.status_code = status_code + + agent._validate_response(mock_response) + + if should_log: + assert len(caplog.messages) > 0 + assert str(status_code) in caplog.messages[0] + else: + assert len(caplog.messages) == 0 + + def test_constants(self) -> None: + """Test that class constants are defined correctly.""" + assert ServerlessAgent.CONTENT_TYPE == "application/json" + assert ServerlessAgent.BUNDLE_ENDPOINT == "/bundle" + + def test_options_inheritance(self) -> None: + """Test that options are properly inherited.""" + agent = ConcreteServerlessAgent() + + assert hasattr(agent.options, "endpoint_url") + assert hasattr(agent.options, "agent_key") + assert hasattr(agent.options, "timeout") + assert hasattr(agent.options, "ssl_verify") + assert hasattr(agent.options, "endpoint_proxy") + assert hasattr(agent.options, "span_filters") + + def test_client_session_exists(self) -> None: + """Test that HTTP client session is initialized.""" + agent = ConcreteServerlessAgent() + + assert hasattr(agent, "client") + assert agent.client is not None + + +# Made with Bob diff --git a/tests/requirements-minimal.txt b/tests/requirements-minimal.txt index be190a95..391c6320 100644 --- a/tests/requirements-minimal.txt +++ b/tests/requirements-minimal.txt @@ -1,3 +1,4 @@ coverage>=5.5 pytest>=4.6 pytest-timeout>=2.4.0 +pytest-mock>=3.12.0 diff --git a/tests/requirements.txt b/tests/requirements.txt index 5d976220..65948983 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -24,7 +24,6 @@ pika>=1.2.0 protobuf<=6.33.4 pymongo>=3.11.4 pyramid>=2.0.1 -pytest-mock>=3.12.0 pytz>=2024.1 redis>=3.5.3 requests-mock diff --git 
a/tests_aws/02_fargate/test_fargate_span_filtering.py b/tests_aws/02_fargate/test_fargate_span_filtering.py new file mode 100644 index 00000000..022c80b7 --- /dev/null +++ b/tests_aws/02_fargate/test_fargate_span_filtering.py @@ -0,0 +1,217 @@ +# (c) Copyright IBM Corp. 2026 + +""" +Unit tests for span filtering functionality in AWSFargateAgent +""" + +import os +from typing import Generator +from unittest.mock import MagicMock + +import pytest + +from instana.agent.aws_fargate import AWSFargateAgent + + +class MockSpan: + """Mock span object for testing""" + + def __init__(self, name, data, kind=1): + self.n = name + self.data = data + self.k = kind + + +class TestAWSFargateSpanFiltering: + """Test span filtering functionality in AWSFargateAgent""" + + @pytest.fixture(autouse=True) + def _resource(self) -> Generator[None, None, None]: + """Setup and teardown""" + # Setup required environment variables + os.environ["AWS_EXECUTION_ENV"] = "AWS_ECS_FARGATE" + os.environ["INSTANA_ENDPOINT_URL"] = "https://localhost/notreal" + os.environ["INSTANA_AGENT_KEY"] = "Fake_Key" + + # Clear any existing filter environment variables + filter_env_vars = [ + "INSTANA_TRACING_FILTER_INCLUDE_0_ATTRIBUTES", + "INSTANA_TRACING_FILTER_EXCLUDE_0_ATTRIBUTES", + "INSTANA_CONFIG_PATH", + ] + for var in filter_env_vars: + if var in os.environ: + os.environ.pop(var) + + self.agent = AWSFargateAgent() + yield + + # Cleanup + cleanup_vars = [ + "AWS_EXECUTION_ENV", + "INSTANA_ENDPOINT_URL", + "INSTANA_AGENT_KEY", + ] + filter_env_vars + + for var in cleanup_vars: + if var in os.environ: + os.environ.pop(var) + + def test_fargate_agent_has_filter_spans_method(self) -> None: + """Test that AWSFargateAgent has filter_spans method from BaseAgent""" + assert hasattr(self.agent, "filter_spans") + assert callable(self.agent.filter_spans) + + def test_fargate_agent_has_is_endpoint_ignored_method(self) -> None: + """Test that AWSFargateAgent has _is_endpoint_ignored method from BaseAgent""" + assert 
hasattr(self.agent, "_is_endpoint_ignored") + assert callable(self.agent._is_endpoint_ignored) + + def test_filter_spans_no_rules_fargate(self) -> None: + """Test that all spans pass through when no filtering rules are set""" + spans = [ + MockSpan("http", {"http": {"url": "/api/users"}}), + MockSpan("http", {"http": {"url": "/health"}}), + MockSpan("redis", {"redis": {"command": "GET"}}), + ] + + filtered = self.agent.filter_spans(spans) + assert len(filtered) == 3 + + def test_filter_spans_with_exclude_rules_fargate(self) -> None: + """Test that spans are filtered based on exclude rules in Fargate""" + # Set up exclude rule for health checks + os.environ["INSTANA_TRACING_FILTER_EXCLUDE_0_ATTRIBUTES"] = ( + "http.url;health,ready;contains" + ) + + # Recreate agent to pick up new environment variable + agent = AWSFargateAgent() + + spans = [ + MockSpan("http", {"http": {"url": "/api/users"}}), + MockSpan("http", {"http": {"url": "/health"}}), + MockSpan("http", {"http": {"url": "/ready"}}), + MockSpan("http", {"http": {"url": "/api/orders"}}), + ] + + filtered = agent.filter_spans(spans) + assert len(filtered) == 2 + # Verify health check spans were filtered out + urls = [span.data["http"]["url"] for span in filtered] + assert "/health" not in urls + assert "/ready" not in urls + assert "/api/users" in urls + assert "/api/orders" in urls + + def test_filter_spans_with_include_rules_fargate(self) -> None: + """Test that only matching spans are kept based on include rules in Fargate""" + # Set up include rule for API calls only + os.environ["INSTANA_TRACING_FILTER_INCLUDE_0_ATTRIBUTES"] = ( + "http.url;/api;contains" + ) + + # Recreate agent to pick up new environment variable + agent = AWSFargateAgent() + + spans = [ + MockSpan("http", {"http": {"url": "/api/users"}}), + MockSpan("http", {"http": {"url": "/health"}}), + MockSpan("http", {"http": {"url": "/api/orders"}}), + MockSpan("http", {"http": {"url": "/metrics"}}), + ] + + filtered = agent.filter_spans(spans) 
+ # Verify only API spans were kept + urls = [span.data["http"]["url"] for span in filtered] + assert "/api/users" in urls + assert "/api/orders" in urls + + def test_report_data_payload_calls_report_spans(self, mocker) -> None: + """Test that report_data_payload calls report_spans for span filtering""" + # Mock the POST response + mock_response = MagicMock() + mock_response.status_code = 200 + mocker.patch( + "instana.agent.serverless.ServerlessAgent._send_http_request", + return_value=mock_response, + ) + + payload = { + "spans": [ + MockSpan("http", {"http": {"url": "/api/users"}}), + MockSpan("http", {"http": {"url": "/health"}}), + ], + "metrics": {"plugins": [{"data": {"test": "data"}}]}, + } + + # Call report_data_payload + response = self.agent.report_data_payload(payload) + + assert response + assert response.status_code == 200 + + def test_fargate_span_filters_configuration_from_env(self) -> None: + """Test that Fargate agent picks up span filter configuration from environment""" + os.environ["INSTANA_TRACING_FILTER_EXCLUDE_0_ATTRIBUTES"] = ( + "http.url;health;contains" + ) + + agent = AWSFargateAgent() + + # Verify span_filters is configured + assert hasattr(agent.options, "span_filters") + assert "exclude" in agent.options.span_filters + assert len(agent.options.span_filters["exclude"]) > 0 + + def test_fargate_internal_instana_spans_filtered(self) -> None: + """Test that internal Instana spans are automatically filtered in Fargate""" + agent = AWSFargateAgent() + + spans = [ + MockSpan("http", {"http": {"url": "/api/users"}}), + MockSpan("http", {"http": {"url": "https://localhost/com.instana.plugin"}}), + ] + + filtered = agent.filter_spans(spans) + # Internal Instana span should be filtered out + assert len(filtered) == 1 + assert filtered[0].data["http"]["url"] == "/api/users" + + def test_fargate_filter_spans_by_database_type(self) -> None: + """Test filtering database spans in Fargate""" + os.environ["INSTANA_TRACING_FILTER_EXCLUDE_0_ATTRIBUTES"] 
= ( + "type;redis,mongodb;strict" + ) + + agent = AWSFargateAgent() + + spans = [ + MockSpan("http", {"http": {"url": "/api/users"}}), + MockSpan("redis", {"redis": {"command": "GET"}}), + MockSpan("mongodb", {"mongodb": {"query": "find"}}), + MockSpan("mysql", {"mysql": {"query": "SELECT *"}}), + ] + + filtered = agent.filter_spans(spans) + assert len(filtered) == 2 + # Redis and MongoDB spans should be filtered out + types = [list(span.data.keys())[0] for span in filtered] + assert "redis" not in types + assert "mongodb" not in types + assert "http" in types + assert "mysql" in types + + def test_fargate_options_inherit_span_filters(self) -> None: + """Test that AWSFargateOptions inherits span_filters from BaseOptions""" + agent = AWSFargateAgent() + + # Verify span_filters attribute exists + assert hasattr(agent.options, "span_filters") + # Verify it's a dict + assert isinstance(agent.options.span_filters, dict) + # Verify default internal filters are present + assert "exclude" in agent.options.span_filters + + +# Made with Bob diff --git a/tests_aws/03_eks/test_eksfargate.py b/tests_aws/03_eks/test_eksfargate.py index 59b23cee..a35b4ef5 100644 --- a/tests_aws/03_eks/test_eksfargate.py +++ b/tests_aws/03_eks/test_eksfargate.py @@ -1,4 +1,4 @@ -# (c) Copyright IBM Corp. 2024 +# (c) Copyright IBM Corp. 2024, 2026 import logging import os @@ -49,8 +49,8 @@ def test_missing_variables(self, caplog) -> None: assert not agent.can_send() assert not agent.collector assert ( - "Required INSTANA_AGENT_KEY and/or INSTANA_ENDPOINT_URL environment variables not set. We will not be able to monitor this Pod." - in caplog.messages + "Required INSTANA_AGENT_KEY and/or INSTANA_ENDPOINT_URL environment variables not set." 
+ in caplog.text ) os.environ["INSTANA_ENDPOINT_URL"] = "https://localhost/notreal" @@ -59,8 +59,8 @@ def test_missing_variables(self, caplog) -> None: assert not agent.can_send() assert not agent.collector assert ( - "Required INSTANA_AGENT_KEY and/or INSTANA_ENDPOINT_URL environment variables not set. We will not be able to monitor this Pod." - in caplog.messages + "Required INSTANA_AGENT_KEY and/or INSTANA_ENDPOINT_URL environment variables not set." + in caplog.text ) def test_default_secrets(self) -> None: