diff --git a/langfuse/_client/observe.py b/langfuse/_client/observe.py index 64882a20f..6d0ce60e0 100644 --- a/langfuse/_client/observe.py +++ b/langfuse/_client/observe.py @@ -26,6 +26,7 @@ ObservationTypeLiteralNoEvent, get_observation_types_list, ) +from langfuse._utils.control_flow import get_error_level from langfuse._client.environment_variables import ( LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED, ) @@ -302,7 +303,8 @@ async def async_wrapper(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> Any: return result except (Exception, asyncio.CancelledError) as e: langfuse_span_or_generation.update( - level="ERROR", status_message=str(e) or type(e).__name__ + level=get_error_level(e), + status_message=str(e) or type(e).__name__, ) raise e @@ -393,7 +395,8 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any: return result except (Exception, asyncio.CancelledError) as e: langfuse_span_or_generation.update( - level="ERROR", status_message=str(e) or type(e).__name__ + level=get_error_level(e), + status_message=str(e) or type(e).__name__, ) raise e @@ -591,7 +594,8 @@ def _finalize_with_error(self, error: BaseException) -> None: return self.span.update( - level="ERROR", status_message=str(error) or type(error).__name__ + level=get_error_level(error), + status_message=str(error) or type(error).__name__, ).end() self._span_ended = True @@ -686,7 +690,8 @@ def _finalize_with_error(self, error: BaseException) -> None: return self.span.update( - level="ERROR", status_message=str(error) or type(error).__name__ + level=get_error_level(error), + status_message=str(error) or type(error).__name__, ).end() self._span_ended = True diff --git a/langfuse/_utils/control_flow.py b/langfuse/_utils/control_flow.py new file mode 100644 index 000000000..eb1e68c4b --- /dev/null +++ b/langfuse/_utils/control_flow.py @@ -0,0 +1,30 @@ +"""Utilities for detecting control-flow exceptions that should not be marked as errors. + +LangGraph uses GraphBubbleUp subclasses (e.g. GraphInterrupt, NodeInterrupt) for +expected control flow such as human-in-the-loop interrupts and handoffs. These are +not real errors and should be recorded at "DEFAULT" level in Langfuse, not "ERROR". +""" + +from typing import Literal, Set, Type + +CONTROL_FLOW_EXCEPTION_TYPES: Set[Type[BaseException]] = set() + +try: + from langgraph.errors import GraphBubbleUp + + CONTROL_FLOW_EXCEPTION_TYPES.add(GraphBubbleUp) +except ImportError: + pass + + +def get_error_level( + error: BaseException, +) -> Literal["DEFAULT", "ERROR"]: + """Return the appropriate Langfuse observation level for *error*. + + Returns ``"DEFAULT"`` for known control-flow exceptions (e.g. + ``GraphInterrupt``) and ``"ERROR"`` for everything else. + """ + if any(isinstance(error, t) for t in CONTROL_FLOW_EXCEPTION_TYPES): + return "DEFAULT" + return "ERROR" diff --git a/langfuse/langchain/CallbackHandler.py b/langfuse/langchain/CallbackHandler.py index 1349f6ae0..5660b34a2 100644 --- a/langfuse/langchain/CallbackHandler.py +++ b/langfuse/langchain/CallbackHandler.py @@ -85,16 +85,11 @@ ) LANGSMITH_TAG_HIDDEN: str = "langsmith:hidden" -CONTROL_FLOW_EXCEPTION_TYPES: Set[Type[BaseException]] = set() -LANGGRAPH_COMMAND_TYPE: Optional[Type[Any]] = None MAX_PENDING_RESUME_TRACE_CONTEXTS = 1024 -try: - from langgraph.errors import GraphBubbleUp +from langfuse._utils.control_flow import CONTROL_FLOW_EXCEPTION_TYPES, get_error_level - CONTROL_FLOW_EXCEPTION_TYPES.add(GraphBubbleUp) -except ImportError: - pass +LANGGRAPH_COMMAND_TYPE: Optional[Type[Any]] = None try: from langgraph.types import Command as LangGraphCommand @@ -358,10 +353,11 @@ def _get_error_level_and_status_message( ) -> tuple[Literal["DEFAULT", "ERROR"], str]: # LangGraph uses GraphBubbleUp subclasses for expected control flow such as # interrupts and handoffs, so they should stay visible without being errors. - if any(isinstance(error, t) for t in CONTROL_FLOW_EXCEPTION_TYPES): - return "DEFAULT", str(error) or type(error).__name__ + level = get_error_level(error) + if level == "DEFAULT": + return level, str(error) or type(error).__name__ - return "ERROR", str(error) + return level, str(error) def _get_observation_type_from_serialized( self, serialized: Optional[Dict[str, Any]], callback_type: str, **kwargs: Any