Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions langfuse/_client/observe.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
ObservationTypeLiteralNoEvent,
get_observation_types_list,
)
from langfuse._utils.control_flow import get_error_level
from langfuse._client.environment_variables import (
LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED,
)
Expand Down Expand Up @@ -302,7 +303,8 @@ async def async_wrapper(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> Any:
return result
except (Exception, asyncio.CancelledError) as e:
langfuse_span_or_generation.update(
level="ERROR", status_message=str(e) or type(e).__name__
level=get_error_level(e),
status_message=str(e) or type(e).__name__,
)

raise e
Expand Down Expand Up @@ -393,7 +395,8 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
return result
except (Exception, asyncio.CancelledError) as e:
langfuse_span_or_generation.update(
level="ERROR", status_message=str(e) or type(e).__name__
level=get_error_level(e),
status_message=str(e) or type(e).__name__,
)

raise e
Expand Down Expand Up @@ -591,7 +594,8 @@ def _finalize_with_error(self, error: BaseException) -> None:
return

self.span.update(
level="ERROR", status_message=str(error) or type(error).__name__
level=get_error_level(error),
status_message=str(error) or type(error).__name__,
).end()
self._span_ended = True

Expand Down Expand Up @@ -686,7 +690,8 @@ def _finalize_with_error(self, error: BaseException) -> None:
return

self.span.update(
level="ERROR", status_message=str(error) or type(error).__name__
level=get_error_level(error),
status_message=str(error) or type(error).__name__,
).end()
self._span_ended = True

Expand Down
30 changes: 30 additions & 0 deletions langfuse/_utils/control_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Utilities for detecting control-flow exceptions that should not be marked as errors.

LangGraph uses GraphBubbleUp subclasses (e.g. GraphInterrupt, NodeInterrupt) for
expected control flow such as human-in-the-loop interrupts and handoffs. These are
not real errors and should be recorded at "DEFAULT" level in Langfuse, not "ERROR".
"""

from typing import Literal, Set, Type

CONTROL_FLOW_EXCEPTION_TYPES: Set[Type[BaseException]] = set()

try:
from langgraph.errors import GraphBubbleUp

CONTROL_FLOW_EXCEPTION_TYPES.add(GraphBubbleUp)
except ImportError:
pass


def get_error_level(
error: BaseException,
) -> Literal["DEFAULT", "ERROR"]:
"""Return the appropriate Langfuse observation level for *error*.

Returns ``"DEFAULT"`` for known control-flow exceptions (e.g.
``GraphInterrupt``) and ``"ERROR"`` for everything else.
"""
if any(isinstance(error, t) for t in CONTROL_FLOW_EXCEPTION_TYPES):
return "DEFAULT"
return "ERROR"
16 changes: 6 additions & 10 deletions langfuse/langchain/CallbackHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,11 @@
)

LANGSMITH_TAG_HIDDEN: str = "langsmith:hidden"
CONTROL_FLOW_EXCEPTION_TYPES: Set[Type[BaseException]] = set()
LANGGRAPH_COMMAND_TYPE: Optional[Type[Any]] = None
MAX_PENDING_RESUME_TRACE_CONTEXTS = 1024

try:
from langgraph.errors import GraphBubbleUp
from langfuse._utils.control_flow import CONTROL_FLOW_EXCEPTION_TYPES, get_error_level

CONTROL_FLOW_EXCEPTION_TYPES.add(GraphBubbleUp)
except ImportError:
pass
LANGGRAPH_COMMAND_TYPE: Optional[Type[Any]] = None

try:
from langgraph.types import Command as LangGraphCommand
Expand Down Expand Up @@ -358,10 +353,11 @@ def _get_error_level_and_status_message(
) -> tuple[Literal["DEFAULT", "ERROR"], str]:
# LangGraph uses GraphBubbleUp subclasses for expected control flow such as
# interrupts and handoffs, so they should stay visible without being errors.
if any(isinstance(error, t) for t in CONTROL_FLOW_EXCEPTION_TYPES):
return "DEFAULT", str(error) or type(error).__name__
level = get_error_level(error)
if level == "DEFAULT":
return level, str(error) or type(error).__name__

return "ERROR", str(error)
return level, str(error)

def _get_observation_type_from_serialized(
self, serialized: Optional[Dict[str, Any]], callback_type: str, **kwargs: Any
Expand Down