84 changes: 82 additions & 2 deletions src/agents/_agent.py
@@ -42,7 +42,11 @@
# when such Node is propagated with or for interoperation with KNIME.
# ------------------------------------------------------------------------

from langchain_core.messages import AIMessage
from dataclasses import dataclass
from typing import Protocol, Sequence
from langchain_core.messages import BaseMessage, AIMessage, ToolMessage
from langchain_core.language_models.chat_models import BaseChatModel


LANGGRAPH_RECURSION_MESSAGE = "Sorry, need more steps to process this request."
RECURSION_CONTINUE_PROMPT = (
@@ -85,5 +89,81 @@ def validate_ai_message(msg: AIMessage):
)


class RecursionError(RuntimeError):
class Conversation(Protocol):
    def append_messages(self, messages): ...

    def append_error(self, error): ...

    def get_messages(self): ...


class Toolset(Protocol):
    # Tool definitions need to be OpenAI-compatible.
    @property
    def tools(self): ...

    def execute(self, tool_calls) -> list[ToolMessage]: ...


class Context(Protocol):
    def is_cancelled(self) -> bool: ...


@dataclass
class AgentConfig:
    iteration_limit: int = 10


class IterationLimitError(RuntimeError):
    pass


class CancelError(RuntimeError):
    pass


class Agent:
    def __init__(
        self,
        conversation: Conversation,
        llm: BaseChatModel,
        toolset: Toolset,
        config: AgentConfig,
    ):
        tools = toolset.tools
        if tools:
            self._agent = llm.bind_tools(tools)
        else:
            self._agent = llm
        self._conversation = conversation
        self._config = config
        self._toolset = toolset

    def run(self):
        """Run the agent's turn in the conversation."""
        for _ in range(self._config.iteration_limit):
            try:
                response = self._agent.invoke(self._conversation.get_messages())
            except Exception as error:
                self._conversation.append_error(error)
                return
            self._append_messages(response)

            if response.tool_calls:
                try:
                    results = self._toolset.execute(response.tool_calls)
                except Exception as error:
                    self._conversation.append_error(error)
                    return
                self._append_messages(results)
            else:
                return

        raise IterationLimitError("Reached iteration limit")

    def _append_messages(self, messages: Sequence[BaseMessage] | BaseMessage):
        if not isinstance(
            messages, Sequence
        ):  # TODO double check that's done correctly
            messages = [messages]
        self._conversation.append_messages(messages)
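
For context only, and not part of the diff: a minimal sketch of how the new Agent could be wired up end to end. The ListConversation and SimpleToolset helpers, the add tool, the ChatOpenAI model name, and the agents._agent import path are illustrative assumptions, not code from this PR.

# Hypothetical usage sketch (not part of this PR). Assumes langchain-openai is
# installed and that Agent/AgentConfig are importable from the layout shown
# above (src/agents/_agent.py).
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, ToolMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

from agents._agent import Agent, AgentConfig


class ListConversation:
    """Minimal in-memory Conversation: keeps messages in a plain list."""

    def __init__(self, messages: list[BaseMessage]):
        self._messages = list(messages)

    def append_messages(self, messages):
        self._messages.extend(messages)

    def append_error(self, error):
        # Surface the failure to the user as an assistant message.
        self._messages.append(AIMessage(content=f"Something went wrong: {error}"))

    def get_messages(self):
        return self._messages


@tool
def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b


class SimpleToolset:
    """Minimal Toolset: executes tool calls against a name -> tool mapping."""

    def __init__(self, tools):
        self._tools = {t.name: t for t in tools}

    @property
    def tools(self):
        return list(self._tools.values())

    def execute(self, tool_calls) -> list[ToolMessage]:
        results = []
        for call in tool_calls:
            output = self._tools[call["name"]].invoke(call["args"])
            results.append(ToolMessage(content=str(output), tool_call_id=call["id"]))
        return results


conversation = ListConversation([HumanMessage(content="What is 2 + 3?")])
agent = Agent(
    conversation=conversation,
    llm=ChatOpenAI(model="gpt-4o-mini"),
    toolset=SimpleToolset([add]),
    config=AgentConfig(iteration_limit=5),
)
agent.run()
print(conversation.get_messages()[-1].content)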