Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,4 +281,4 @@ Users can be now overwrite the input and ouput attributes of spans created by in

- Added utility to set input and output data for any active span in a trace

[0.1.86]: https://github.com/KeyValueSoftwareSystems/netra-sdk-py/tree/main
[0.1.89]: https://github.com/KeyValueSoftwareSystems/netra-sdk-py/tree/main
77 changes: 77 additions & 0 deletions netra/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,20 @@ def shutdown(cls) -> None:
except Exception:
pass

# Close simulation HTTP client
if hasattr(cls, "simulation") and cls.simulation is not None:
try:
cls.simulation.close()
except Exception:
pass

# Close evaluation HTTP client
if hasattr(cls, "evaluation") and cls.evaluation is not None:
try:
cls.evaluation.close()
except Exception:
pass

@classmethod
def get_meter(cls, name: str = "netra", version: Optional[str] = None) -> otel_metrics.Meter:
"""
Expand Down Expand Up @@ -409,6 +423,47 @@ def set_custom_event(cls, event_name: str, attributes: Any) -> None:
else:
logger.warning("Both event_name and attributes must be provided for custom events.")

@classmethod
def record_exception(
cls,
exception: BaseException,
attributes: Optional[Dict[str, Any]] = None,
) -> None:
"""Record a caught exception on the currently active span.

Use this inside ``except`` blocks to attach exception details to the
current span when the exception is being handled and will not propagate
to the SDK's automatic capture logic.

The method adds a standard OpenTelemetry exception event (with type,
message, and stacktrace), sets the span status to ERROR, and records
the ``netra.error_message`` attribute for consistency with the rest of
the SDK.

Args:
exception: The exception instance to record.
attributes: Optional extra attributes to attach to the exception
event.

Example::

@workflow
def process_order(order_id: str) -> str:
try:
result = call_payment_api(order_id)
except PaymentError as exc:
Netra.record_exception(exc)
return "fallback_result"
return result
"""
if not isinstance(exception, BaseException):
logger.error(
"record_exception: exception must be a BaseException instance, got %s",
type(exception),
)
return
SessionManager.record_exception(exception, attributes=attributes)

@classmethod
def add_conversation(cls, conversation_type: ConversationType, role: str, content: Any) -> None:
"""
Expand Down Expand Up @@ -461,6 +516,28 @@ def set_root_output(cls, value: Any) -> None:
"""
SessionManager.set_root_output(value)

@classmethod
def set_root_output_stream(cls, value: Any) -> Any:
"""
Wrap a stream so the accumulated output is set on the root span when iteration ends.

The returned object is a transparent proxy — iterate over it instead of the original::

stream = Netra.set_root_output_stream(stream)
for chunk in stream:
...

Supports both sync and async iterables. Returns *value* unchanged if no active trace
context exists or if *value* is not iterable.

Args:
value: The stream to wrap (Netra-instrumented or any generic iterable).

Returns:
A wrapped stream proxy, or *value* unchanged if wrapping is not possible.
"""
return SessionManager.set_root_output_stream(value)

@classmethod
def start_span(
cls,
Expand Down
4 changes: 4 additions & 0 deletions netra/client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from netra.client.base_client import BaseHttpClient
from netra.client.pagination import parse_paginated_response

__all__ = ["BaseHttpClient", "parse_paginated_response"]
179 changes: 179 additions & 0 deletions netra/client/base_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import logging
import os
from typing import Any, Dict, Optional

import httpx

from netra.config import Config

logger = logging.getLogger(__name__)

TELEMETRY_SUFFIX = "/telemetry"


class BaseHttpClient:
"""Base HTTP client providing common setup for all Netra API clients.

Subclasses must define:
_LOG_PREFIX: str — module-specific log prefix (e.g. "netra.dashboard").
_ENV_TIMEOUT: str — env var name for timeout override.
_DEFAULT_TIMEOUT: float — default timeout in seconds.

Attributes:
_client: The underlying httpx client instance.
"""

__slots__ = ("_client",)

_LOG_PREFIX: str = "netra"
_ENV_TIMEOUT: str = ""
_DEFAULT_TIMEOUT: float = 10.0

def __init__(self, config: Config) -> None:
"""Initialize the HTTP client.

Args:
config: The Netra configuration object.
"""
self._client: Optional[httpx.Client] = self._create_client(config)

def close(self) -> None:
"""Close the underlying HTTP client and release connection resources."""
if self._client:
try:
self._client.close()
except Exception:
logger.exception("%s: Error closing HTTP client", self._LOG_PREFIX)
finally:
self._client = None

def _ensure_client(self) -> Optional[httpx.Client]:
"""Return the underlying client, logging an error if it is not initialized.

Returns:
The httpx client, or None if not available.
"""
if not self._client:
logger.error("%s: Client not initialized", self._LOG_PREFIX)
return self._client

def _create_client(self, config: Config) -> Optional[httpx.Client]:
"""Create and configure the HTTP client.

Args:
config: The Netra configuration object.

Returns:
Configured httpx client or None if creation fails.
"""
endpoint = (config.otlp_endpoint or "").strip()
if not endpoint:
logger.error("%s: NETRA_OTLP_ENDPOINT is required", self._LOG_PREFIX)
return None

base_url = self._resolve_base_url(endpoint)
headers = self._build_headers(config)
timeout = self._parse_env_float(self._ENV_TIMEOUT, self._DEFAULT_TIMEOUT)

try:
return httpx.Client(base_url=base_url, headers=headers, timeout=timeout)
except Exception:
logger.exception("%s: Failed to create HTTP client", self._LOG_PREFIX)
return None

def _resolve_base_url(self, endpoint: str) -> str:
"""Extract base URL, removing telemetry suffix if present.

Args:
endpoint: The raw endpoint URL.

Returns:
The cleaned base URL.
"""
base_url = endpoint.rstrip("/")
if base_url.endswith(TELEMETRY_SUFFIX):
base_url = base_url[: -len(TELEMETRY_SUFFIX)]
return base_url

def _build_headers(self, config: Config) -> Dict[str, str]:
"""Build request headers from configuration.

Args:
config: The Netra configuration object.

Returns:
Dictionary of HTTP headers.
"""
headers: Dict[str, str] = dict(config.headers or {})
if config.api_key:
headers["x-api-key"] = config.api_key
return headers

def _extract_error_message(
self,
response: Optional[httpx.Response],
exc: Exception,
) -> Any:
"""Extract error message from response or exception.

Args:
response: The HTTP response object, if available.
exc: The exception that was raised.

Returns:
A descriptive error message string.
"""
if response is not None:
try:
response_json = response.json()
if isinstance(response_json, dict):
error_data = response_json.get("error", {})
if isinstance(error_data, dict):
return error_data.get("message", str(exc))
except Exception:
logger.exception(
"%s: Could not parse error from response body",
self._LOG_PREFIX,
)
return str(exc)

@staticmethod
def _parse_env_float(env_var: str, default: float) -> float:
"""Read an environment variable and parse it as a positive float.

Values that are zero or negative are treated as invalid and the
default is returned instead.

Args:
env_var: Name of the environment variable.
default: Value to return when the variable is unset or invalid.

Returns:
The parsed float (> 0), or default on failure.
"""
if not env_var:
return default
raw = os.getenv(env_var)
if not raw:
return default
try:
value = float(raw)
except ValueError:
logger.exception(
"Invalid value '%s' for %s, using default %.1f",
raw,
env_var,
default,
)
return default

if value <= 0:
logger.exception(
"Timeout value must be positive, got %.1f for %s; using default %.1f",
value,
env_var,
default,
)
return default

return value
54 changes: 54 additions & 0 deletions netra/client/pagination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import logging
from typing import Any, Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)


def _empty_page() -> Tuple[List[Dict[str, Any]], bool, Optional[str]]:
"""Return a fresh empty-page tuple to avoid shared mutable state.

Returns:
A tuple of ([], False, None).
"""
return ([], False, None)


def parse_paginated_response(
result: Dict[str, Any],
items_key: str = "data",
) -> Tuple[List[Dict[str, Any]], bool, Optional[str]]:
"""Parse a standard paginated API response into items, has_next_page, and next_cursor.

Args:
result: The raw JSON response dict from the API.
items_key: Key within "data" that holds the list of items.

Returns:
A tuple of (items, has_next_page, next_cursor).
"""
data_block = result.get("data")
if data_block is None:
return _empty_page()

if not isinstance(data_block, dict):
logger.error("netra: Unexpected paginated response shape; 'data' is not a dict")
return _empty_page()

items = data_block.get(items_key, [])
if items is None:
items = []

if not isinstance(items, list):
logger.error("netra: Unexpected paginated response shape; '%s' is not a list", items_key)
return _empty_page()

page_info = data_block.get("pageInfo", {}) or {}
has_next_page = bool(page_info.get("hasNextPage", False)) if isinstance(page_info, dict) else False

next_cursor: Optional[str] = None
if items:
last_item = items[-1]
if isinstance(last_item, dict):
next_cursor = last_item.get("cursor")

return items, has_next_page, next_cursor
Loading