From 2bb79534ee2a117f15b7ed3e266493eb4d0847f2 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Thu, 9 Apr 2026 17:10:40 +0200 Subject: [PATCH 01/11] Add mid-turn message steering for running agent sessions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses #2223. Allow API clients to inject user messages into an active agent session without waiting for the current turn to finish. This is a common pattern in agentic coding tools where the user can steer or provide follow-up context while the agent is executing tool calls. New API endpoint: POST /sessions/:id/steer Runtime changes: - SteeredMessage type + buffered channel on LocalRuntime - Steer() enqueues, DrainSteeredMessages() batch-drains - Agent loop injects steered messages after tool execution and before the stop-condition check; emits user_message events so clients know when the LLM actually picks them up - Messages wrapped in tags for clear LLM attribution Server changes: - POST /sessions/:id/steer endpoint (202 Accepted) - SteerSession() on SessionManager with GetLocalRuntime() helper for PersistentRuntime unwrapping - Concurrent stream guard on RunSession (rejects if already streaming) - Proper defer ordering: streaming flag cleared before channel close No behavioral change to the TUI — the existing client-side message queue continues to work as before. The TUI can adopt mid-turn steering in a future change by calling LocalRuntime.Steer() directly. --- pkg/api/types.go | 7 ++++ pkg/runtime/loop.go | 33 +++++++++++++++-- pkg/runtime/persistent_runtime.go | 15 ++++++++ pkg/runtime/runtime.go | 47 ++++++++++++++++++++++++ pkg/server/server.go | 20 +++++++++++ pkg/server/session_manager.go | 59 +++++++++++++++++++++++++++---- 6 files changed, 173 insertions(+), 8 deletions(-) diff --git a/pkg/api/types.go b/pkg/api/types.go index 90f943421..bde812a8c 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -160,6 +160,13 @@ type ResumeElicitationRequest struct { Content map[string]any `json:"content"` // The submitted form data (only present when action is "accept") } +// SteerSessionRequest represents a request to inject user messages into a +// running agent session. The messages are picked up by the agent loop between +// tool execution and the next LLM call. +type SteerSessionRequest struct { + Messages []Message `json:"messages"` +} + // UpdateSessionTitleRequest represents a request to update a session's title type UpdateSessionTitleRequest struct { Title string `json:"title"` diff --git a/pkg/runtime/loop.go b/pkg/runtime/loop.go index 44fbe00b9..289d0ba36 100644 --- a/pkg/runtime/loop.go +++ b/pkg/runtime/loop.go @@ -386,13 +386,42 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c // Record per-toolset model override for the next LLM turn. toolModelOverride = resolveToolCallModelOverride(res.Calls, agentTools) + // Only compact proactively when the model will continue (has + // tool calls to process on the next turn). If the model stopped + // and no steered messages override that, compaction is wasteful + // because no further LLM call follows. + if !res.Stopped { + r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events) + } + + // Drain any steered (mid-turn) user messages that arrived while + // the current iteration was in progress. Injecting them here — + // after tool execution, before the stop check — ensures the LLM + // sees the new messages on the next iteration via GetMessages(). + if steered := r.DrainSteeredMessages(); len(steered) > 0 { + for _, sm := range steered { + wrapped := fmt.Sprintf( + "\nThe user sent the following message while you were working:\n%s\n\nPlease address this in your next response while continuing with your current tasks.\n", + sm.Content, + ) + userMsg := session.UserMessage(wrapped, sm.MultiContent...) + sess.AddMessage(userMsg) + events <- UserMessage(sm.Content, sess.ID, sm.MultiContent, len(sess.Messages)-1) + } + + // Force the loop to continue — the model must respond to + // the injected messages even if it was about to stop. + res.Stopped = false + + // Now that the loop will continue, compact if needed. + r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events) + } + if res.Stopped { slog.Debug("Conversation stopped", "agent", a.Name()) r.executeStopHooks(ctx, sess, a, res.Content, events) break } - - r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events) } }() diff --git a/pkg/runtime/persistent_runtime.go b/pkg/runtime/persistent_runtime.go index c6b704da7..dc19fe4b8 100644 --- a/pkg/runtime/persistent_runtime.go +++ b/pkg/runtime/persistent_runtime.go @@ -25,6 +25,21 @@ type streamingState struct { messageID int64 // ID of the current streaming message (0 if none) } +// GetLocalRuntime extracts the underlying *LocalRuntime from a Runtime +// implementation. It handles both *LocalRuntime and *PersistentRuntime +// (which embeds *LocalRuntime). Returns nil if the runtime type is not +// supported (e.g. RemoteRuntime). +func GetLocalRuntime(rt Runtime) *LocalRuntime { + switch r := rt.(type) { + case *LocalRuntime: + return r + case *PersistentRuntime: + return r.LocalRuntime + default: + return nil + } +} + // New creates a new runtime for an agent and its team. // The runtime automatically persists session changes to the configured store. // Returns a Runtime interface which wraps LocalRuntime with persistence handling. diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index f6a4de8bc..993193771 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -80,6 +80,18 @@ func ResumeReject(reason string) ResumeRequest { return ResumeRequest{Type: ResumeTypeReject, Reason: reason} } +// SteeredMessage is a user message injected mid-turn while the agent loop is +// running. It is enqueued via Steer() and drained inside the loop between +// tool execution and the stop-condition check. +type SteeredMessage struct { + Content string + MultiContent []chat.MessagePart +} + +// maxSteeredMessages is the maximum number of steered messages that can be +// buffered before Steer() starts rejecting new messages. +const maxSteeredMessages = 5 + // ToolHandlerFunc is a function type for handling tool calls type ToolHandlerFunc func(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, events chan Event) (*tools.ToolCallResult, error) @@ -201,6 +213,12 @@ type LocalRuntime struct { currentAgentMu sync.RWMutex + // steerCh receives user messages injected mid-turn via Steer(). + // The agent loop drains this channel after tool execution, before + // checking the stop condition, so the LLM sees the new message on + // its next iteration. + steerCh chan SteeredMessage + // onToolsChanged is called when an MCP toolset reports a tool list change. onToolsChanged func(Event) @@ -291,6 +309,7 @@ func NewLocalRuntime(agents *team.Team, opts ...Opt) (*LocalRuntime, error) { currentAgent: defaultAgent.Name(), resumeChan: make(chan ResumeRequest), elicitationRequestCh: make(chan ElicitationResult), + steerCh: make(chan SteeredMessage, maxSteeredMessages), sessionCompaction: true, managedOAuth: true, sessionStore: session.NewInMemorySessionStore(), @@ -1015,6 +1034,34 @@ func (r *LocalRuntime) ResumeElicitation(ctx context.Context, action tools.Elici } } +// Steer enqueues a user message for mid-turn injection into the running +// agent loop. The message will be picked up after the current batch of tool +// calls finishes but before the loop checks whether to stop. Returns false +// if the steer buffer is full and the message was not enqueued. +func (r *LocalRuntime) Steer(msg SteeredMessage) bool { + select { + case r.steerCh <- msg: + return true + default: + return false + } +} + +// DrainSteeredMessages returns all pending steered messages without blocking. +// It is called inside the agent loop to batch-inject any messages that arrived +// while the current iteration was in progress. +func (r *LocalRuntime) DrainSteeredMessages() []SteeredMessage { + var msgs []SteeredMessage + for { + select { + case m := <-r.steerCh: + msgs = append(msgs, m) + default: + return msgs + } + } +} + // Run starts the agent's interaction loop func (r *LocalRuntime) startSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { diff --git a/pkg/server/server.go b/pkg/server/server.go index b9cf3b626..e8fc5a6a1 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -62,6 +62,8 @@ func New(ctx context.Context, sessionStore session.Store, runConfig *config.Runt group.POST("/sessions/:id/agent/:agent", s.runAgent) group.POST("/sessions/:id/agent/:agent/:agent_name", s.runAgent) group.POST("/sessions/:id/elicitation", s.elicitation) + // Steer: inject user messages into a running agent session mid-turn + group.POST("/sessions/:id/steer", s.steerSession) // Agent tool count group.GET("/agents/:id/:agent_name/tools/count", s.getAgentToolCount) @@ -317,3 +319,21 @@ func (s *Server) elicitation(c echo.Context) error { return c.JSON(http.StatusOK, nil) } + +func (s *Server) steerSession(c echo.Context) error { + sessionID := c.Param("id") + var req api.SteerSessionRequest + if err := c.Bind(&req); err != nil { + return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid request body: %v", err)) + } + + if len(req.Messages) == 0 { + return echo.NewHTTPError(http.StatusBadRequest, "at least one message is required") + } + + if err := s.sm.SteerSession(c.Request().Context(), sessionID, req.Messages); err != nil { + return echo.NewHTTPError(http.StatusConflict, fmt.Sprintf("failed to steer session: %v", err)) + } + + return c.JSON(http.StatusAccepted, map[string]string{"status": "queued"}) +} diff --git a/pkg/server/session_manager.go b/pkg/server/session_manager.go index 6c26d3b58..9b4172048 100644 --- a/pkg/server/session_manager.go +++ b/pkg/server/session_manager.go @@ -23,10 +23,11 @@ import ( ) type activeRuntimes struct { - runtime runtime.Runtime - cancel context.CancelFunc - session *session.Session // The actual session object used by the runtime - titleGen *sessiontitle.Generator // Title generator (includes fallback models) + runtime runtime.Runtime + cancel context.CancelFunc + session *session.Session // The actual session object used by the runtime + titleGen *sessiontitle.Generator // Title generator (includes fallback models) + streaming bool // True while RunStream is active; prevents concurrent runs } // SessionManager manages sessions for HTTP and Connect-RPC servers. @@ -160,6 +161,14 @@ func (sm *SessionManager) RunSession(ctx context.Context, sessionID, agentFilena } runtimeSession, exists := sm.runtimeSessions.Load(sessionID) + + // Reject if a stream is already active for this session. The caller + // should use POST /sessions/:id/steer to inject follow-up messages + // into a running session instead of starting a second concurrent stream. + if exists && runtimeSession.streaming { + return nil, errors.New("session is already streaming; use /steer to send follow-up messages") + } + streamCtx, cancel := context.WithCancel(ctx) var titleGen *sessiontitle.Generator if !exists { @@ -182,6 +191,8 @@ func (sm *SessionManager) RunSession(ctx context.Context, sessionID, agentFilena titleGen = runtimeSession.titleGen } + runtimeSession.streaming = true + streamChan := make(chan runtime.Event) // Check if we need to generate a title @@ -194,8 +205,17 @@ func (sm *SessionManager) RunSession(ctx context.Context, sessionID, agentFilena } stream := runtimeSession.runtime.RunStream(streamCtx, sess) - defer cancel() - defer close(streamChan) + // Single defer to control ordering: clear the streaming flag + // BEFORE closing streamChan. When the client sees the channel + // close it may immediately call RunSession for the next queued + // message; streaming must already be false by then. + defer func() { + sm.mux.Lock() + runtimeSession.streaming = false + sm.mux.Unlock() + close(streamChan) + cancel() + }() for event := range stream { if streamCtx.Err() != nil { return @@ -230,6 +250,33 @@ func (sm *SessionManager) ResumeSession(ctx context.Context, sessionID, confirma return nil } +// SteerSession enqueues user messages for mid-turn injection into a running +// session. The messages are picked up by the agent loop after the current tool +// calls finish but before the next LLM call. Returns an error if the session +// is not actively running or if the steer buffer is full. +func (sm *SessionManager) SteerSession(_ context.Context, sessionID string, messages []api.Message) error { + rt, exists := sm.runtimeSessions.Load(sessionID) + if !exists { + return errors.New("session not found or not running") + } + + localRT := runtime.GetLocalRuntime(rt.runtime) + if localRT == nil { + return errors.New("steering not supported for this runtime type") + } + + for _, msg := range messages { + if !localRT.Steer(runtime.SteeredMessage{ + Content: msg.Content, + MultiContent: msg.MultiContent, + }) { + return errors.New("steer queue full") + } + } + + return nil +} + // ResumeElicitation resumes an elicitation request. func (sm *SessionManager) ResumeElicitation(ctx context.Context, sessionID, action string, content map[string]any) error { sm.mux.Lock() From 717c63afa8eb10538c4c01dafe3b7d98c633aa51 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Thu, 9 Apr 2026 17:36:24 +0200 Subject: [PATCH 02/11] Extract SteerQueue interface for pluggable storage Introduce a SteerQueue interface (Enqueue/Drain) so that callers can provide their own storage implementation for steered messages. The default InMemorySteerQueue uses a buffered channel and is created automatically. Custom implementations can be injected via the WithSteerQueue option on LocalRuntime. --- pkg/runtime/runtime.go | 94 ++++++++++++++++++++++++++++++------------ 1 file changed, 68 insertions(+), 26 deletions(-) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 993193771..90a520cb7 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -81,16 +81,64 @@ func ResumeReject(reason string) ResumeRequest { } // SteeredMessage is a user message injected mid-turn while the agent loop is -// running. It is enqueued via Steer() and drained inside the loop between +// running. It is enqueued via a SteerQueue and drained inside the loop between // tool execution and the stop-condition check. type SteeredMessage struct { Content string MultiContent []chat.MessagePart } -// maxSteeredMessages is the maximum number of steered messages that can be -// buffered before Steer() starts rejecting new messages. -const maxSteeredMessages = 5 +// SteerQueue is the interface for storing steered messages that are injected +// into a running agent loop mid-turn. Implementations must be safe for +// concurrent use: Enqueue is called from API handlers while Drain is called +// from the agent loop goroutine. +// +// The default implementation is InMemorySteerQueue. Callers that need +// durable or distributed storage can provide their own implementation +// via the WithSteerQueue option. +type SteerQueue interface { + // Enqueue adds a message to the queue. Returns false if the queue is + // full and the message was not accepted. + Enqueue(msg SteeredMessage) bool + // Drain returns all pending messages and removes them from the queue. + // It must not block — if the queue is empty it returns nil. + Drain() []SteeredMessage +} + +// inMemorySteerQueue is the default SteerQueue backed by a buffered channel. +type inMemorySteerQueue struct { + ch chan SteeredMessage +} + +// defaultSteerQueueCapacity is the buffer size for the default in-memory queue. +const defaultSteerQueueCapacity = 5 + +// NewInMemorySteerQueue creates a SteerQueue backed by a buffered channel +// with the given capacity. +func NewInMemorySteerQueue(capacity int) SteerQueue { + return &inMemorySteerQueue{ch: make(chan SteeredMessage, capacity)} +} + +func (q *inMemorySteerQueue) Enqueue(msg SteeredMessage) bool { + select { + case q.ch <- msg: + return true + default: + return false + } +} + +func (q *inMemorySteerQueue) Drain() []SteeredMessage { + var msgs []SteeredMessage + for { + select { + case m := <-q.ch: + msgs = append(msgs, m) + default: + return msgs + } + } +} // ToolHandlerFunc is a function type for handling tool calls type ToolHandlerFunc func(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, events chan Event) (*tools.ToolCallResult, error) @@ -213,11 +261,10 @@ type LocalRuntime struct { currentAgentMu sync.RWMutex - // steerCh receives user messages injected mid-turn via Steer(). - // The agent loop drains this channel after tool execution, before - // checking the stop condition, so the LLM sees the new message on - // its next iteration. - steerCh chan SteeredMessage + // steerQueue stores user messages injected mid-turn. The agent loop + // drains this queue after tool execution, before checking the stop + // condition, so the LLM sees the new messages on its next iteration. + steerQueue SteerQueue // onToolsChanged is called when an MCP toolset reports a tool list change. onToolsChanged func(Event) @@ -246,6 +293,14 @@ func WithTracer(t trace.Tracer) Opt { } } +// WithSteerQueue sets a custom SteerQueue implementation for mid-turn message +// injection. If not provided, an in-memory buffered queue is used. +func WithSteerQueue(q SteerQueue) Opt { + return func(r *LocalRuntime) { + r.steerQueue = q + } +} + func WithSessionCompaction(sessionCompaction bool) Opt { return func(r *LocalRuntime) { r.sessionCompaction = sessionCompaction @@ -309,7 +364,7 @@ func NewLocalRuntime(agents *team.Team, opts ...Opt) (*LocalRuntime, error) { currentAgent: defaultAgent.Name(), resumeChan: make(chan ResumeRequest), elicitationRequestCh: make(chan ElicitationResult), - steerCh: make(chan SteeredMessage, maxSteeredMessages), + steerQueue: NewInMemorySteerQueue(defaultSteerQueueCapacity), sessionCompaction: true, managedOAuth: true, sessionStore: session.NewInMemorySessionStore(), @@ -1037,29 +1092,16 @@ func (r *LocalRuntime) ResumeElicitation(ctx context.Context, action tools.Elici // Steer enqueues a user message for mid-turn injection into the running // agent loop. The message will be picked up after the current batch of tool // calls finishes but before the loop checks whether to stop. Returns false -// if the steer buffer is full and the message was not enqueued. +// if the queue is full and the message was not enqueued. func (r *LocalRuntime) Steer(msg SteeredMessage) bool { - select { - case r.steerCh <- msg: - return true - default: - return false - } + return r.steerQueue.Enqueue(msg) } // DrainSteeredMessages returns all pending steered messages without blocking. // It is called inside the agent loop to batch-inject any messages that arrived // while the current iteration was in progress. func (r *LocalRuntime) DrainSteeredMessages() []SteeredMessage { - var msgs []SteeredMessage - for { - select { - case m := <-r.steerCh: - msgs = append(msgs, m) - default: - return msgs - } - } + return r.steerQueue.Drain() } // Run starts the agent's interaction loop From 324352314bba0320103c8889c9cd282ebc69707e Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Fri, 10 Apr 2026 07:52:46 +0200 Subject: [PATCH 03/11] Support steering for remote runtimes Add SteerSession to the RemoteClient interface and implement it on the HTTP Client (POST /sessions/:id/steer). RemoteRuntime.Steer() delegates to the remote server so the TUI works identically regardless of whether the runtime is local or remote. App.Steer() now tries GetLocalRuntime first, then falls back to the Steerer interface so both PersistentRuntime and RemoteRuntime are handled. --- pkg/app/app.go | 21 +++++++++++++++++++++ pkg/runtime/client.go | 6 ++++++ pkg/runtime/remote_client.go | 3 +++ pkg/runtime/remote_runtime.go | 13 +++++++++++++ 4 files changed, 43 insertions(+) diff --git a/pkg/app/app.go b/pkg/app/app.go index bd0637fec..03a7c38bf 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -532,6 +532,27 @@ func (a *App) SubscribeWith(ctx context.Context, send func(tea.Msg)) { } } +// Steerer is implemented by runtimes that support mid-turn message injection. +type Steerer interface { + Steer(msg runtime.SteeredMessage) bool +} + +// Steer enqueues a user message for mid-turn injection into the running +// agent loop. Works with both local runtimes (via the SteerQueue) and +// remote runtimes (via POST /sessions/:id/steer). Returns false if +// steering is not supported by the runtime or the queue is full. +func (a *App) Steer(msg runtime.SteeredMessage) bool { + // Try unwrapping PersistentRuntime → LocalRuntime first + if lr := runtime.GetLocalRuntime(a.runtime); lr != nil { + return lr.Steer(msg) + } + // Try the Steerer interface (e.g. RemoteRuntime) + if s, ok := a.runtime.(Steerer); ok { + return s.Steer(msg) + } + return false +} + // Resume resumes the runtime with the given confirmation request func (a *App) Resume(req runtime.ResumeRequest) { a.runtime.Resume(context.Background(), req) diff --git a/pkg/runtime/client.go b/pkg/runtime/client.go index 8218e4eec..544534637 100644 --- a/pkg/runtime/client.go +++ b/pkg/runtime/client.go @@ -266,6 +266,12 @@ func (c *Client) ResumeSession(ctx context.Context, id, confirmation, reason, to return c.doRequest(ctx, http.MethodPost, "/api/sessions/"+id+"/resume", req, nil) } +// SteerSession injects user messages into a running session mid-turn. +func (c *Client) SteerSession(ctx context.Context, sessionID string, messages []api.Message) error { + req := api.SteerSessionRequest{Messages: messages} + return c.doRequest(ctx, http.MethodPost, "/api/sessions/"+sessionID+"/steer", req, nil) +} + // DeleteSession deletes a session by ID func (c *Client) DeleteSession(ctx context.Context, id string) error { return c.doRequest(ctx, "DELETE", "/api/sessions/"+id, nil, nil) diff --git a/pkg/runtime/remote_client.go b/pkg/runtime/remote_client.go index c1398afaf..ece04f534 100644 --- a/pkg/runtime/remote_client.go +++ b/pkg/runtime/remote_client.go @@ -30,6 +30,9 @@ type RemoteClient interface { // RunAgentWithAgentName executes an agent with a specific agent name RunAgentWithAgentName(ctx context.Context, sessionID, agent, agentName string, messages []api.Message) (<-chan Event, error) + // SteerSession injects user messages into a running session mid-turn + SteerSession(ctx context.Context, sessionID string, messages []api.Message) error + // UpdateSessionTitle updates the title of a session UpdateSessionTitle(ctx context.Context, sessionID, title string) error diff --git a/pkg/runtime/remote_runtime.go b/pkg/runtime/remote_runtime.go index 5f03297cd..568503fc5 100644 --- a/pkg/runtime/remote_runtime.go +++ b/pkg/runtime/remote_runtime.go @@ -211,6 +211,19 @@ func (r *RemoteRuntime) Run(ctx context.Context, sess *session.Session) ([]sessi return sess.GetAllMessages(), nil } +// Steer enqueues a user message for mid-turn injection into the running +// agent loop on the remote server. Returns false if the session is not +// active or the steer queue is full. +func (r *RemoteRuntime) Steer(msg SteeredMessage) bool { + if r.sessionID == "" { + return false + } + err := r.client.SteerSession(context.Background(), r.sessionID, []api.Message{ + {Content: msg.Content, MultiContent: msg.MultiContent}, + }) + return err == nil +} + // Resume allows resuming execution after user confirmation func (r *RemoteRuntime) Resume(ctx context.Context, req ResumeRequest) { slog.Debug("Resuming remote runtime", "agent", r.currentAgent, "type", req.Type, "reason", req.Reason, "tool_name", req.ToolName, "session_id", r.sessionID) From 9424a4745abbfdd5b53cbccef879740c260b8354 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Fri, 10 Apr 2026 10:15:46 +0200 Subject: [PATCH 04/11] Separate steer and follow-up into two distinct queues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement the two-queue design proposed by rumpl: steering (urgent mid- turn injection) and follow-up (end-of-turn, one-at-a-time processing) are fundamentally different intents that need separate queues. - Rename SteeredMessage → QueuedMessage (shared by both queues) - Replace SteerQueue with MessageQueue interface: adds context.Context to all methods, adds Dequeue (pop one) and Len - Add followUpQueue to LocalRuntime with WithFollowUpQueue option - Split agent loop: steer drains ALL mid-turn, follow-up pops ONE after stop-hooks, then continues the loop for a new turn - Follow-up messages are plain user messages (no system-reminder wrap) - Add POST /sessions/:id/followup endpoint - Add FollowUpSession to RemoteClient, Client, RemoteRuntime, and App --- pkg/app/app.go | 39 +++++++--- pkg/runtime/client.go | 6 ++ pkg/runtime/loop.go | 25 +++++-- pkg/runtime/remote_client.go | 3 + pkg/runtime/remote_runtime.go | 14 +++- pkg/runtime/runtime.go | 133 +++++++++++++++++++++++----------- pkg/server/server.go | 20 +++++ pkg/server/session_manager.go | 28 ++++++- 8 files changed, 209 insertions(+), 59 deletions(-) diff --git a/pkg/app/app.go b/pkg/app/app.go index 03a7c38bf..396e500c1 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -532,25 +532,44 @@ func (a *App) SubscribeWith(ctx context.Context, send func(tea.Msg)) { } } -// Steerer is implemented by runtimes that support mid-turn message injection. -type Steerer interface { - Steer(msg runtime.SteeredMessage) bool +// MessageInjector is implemented by runtimes that support mid-turn steering +// and end-of-turn follow-up message injection. +type MessageInjector interface { + Steer(msg runtime.QueuedMessage) bool + FollowUp(msg runtime.QueuedMessage) bool } // Steer enqueues a user message for mid-turn injection into the running -// agent loop. Works with both local runtimes (via the SteerQueue) and +// agent loop. Works with both local runtimes (via the steer queue) and // remote runtimes (via POST /sessions/:id/steer). Returns false if // steering is not supported by the runtime or the queue is full. -func (a *App) Steer(msg runtime.SteeredMessage) bool { +func (a *App) Steer(msg runtime.QueuedMessage) bool { + if inj := a.messageInjector(); inj != nil { + return inj.Steer(msg) + } + return false +} + +// FollowUp enqueues a message for end-of-turn processing. Each follow-up +// gets a full undivided agent turn. Returns false if the runtime does not +// support follow-ups or the queue is full. +func (a *App) FollowUp(msg runtime.QueuedMessage) bool { + if inj := a.messageInjector(); inj != nil { + return inj.FollowUp(msg) + } + return false +} + +func (a *App) messageInjector() MessageInjector { // Try unwrapping PersistentRuntime → LocalRuntime first if lr := runtime.GetLocalRuntime(a.runtime); lr != nil { - return lr.Steer(msg) + return lr } - // Try the Steerer interface (e.g. RemoteRuntime) - if s, ok := a.runtime.(Steerer); ok { - return s.Steer(msg) + // Try the MessageInjector interface (e.g. RemoteRuntime) + if inj, ok := a.runtime.(MessageInjector); ok { + return inj } - return false + return nil } // Resume resumes the runtime with the given confirmation request diff --git a/pkg/runtime/client.go b/pkg/runtime/client.go index 544534637..8ade2cc45 100644 --- a/pkg/runtime/client.go +++ b/pkg/runtime/client.go @@ -272,6 +272,12 @@ func (c *Client) SteerSession(ctx context.Context, sessionID string, messages [] return c.doRequest(ctx, http.MethodPost, "/api/sessions/"+sessionID+"/steer", req, nil) } +// FollowUpSession queues messages for end-of-turn processing. +func (c *Client) FollowUpSession(ctx context.Context, sessionID string, messages []api.Message) error { + req := api.SteerSessionRequest{Messages: messages} + return c.doRequest(ctx, http.MethodPost, "/api/sessions/"+sessionID+"/followup", req, nil) +} + // DeleteSession deletes a session by ID func (c *Client) DeleteSession(ctx context.Context, id string) error { return c.doRequest(ctx, "DELETE", "/api/sessions/"+id, nil, nil) diff --git a/pkg/runtime/loop.go b/pkg/runtime/loop.go index 289d0ba36..92f4173cb 100644 --- a/pkg/runtime/loop.go +++ b/pkg/runtime/loop.go @@ -394,11 +394,11 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events) } - // Drain any steered (mid-turn) user messages that arrived while - // the current iteration was in progress. Injecting them here — - // after tool execution, before the stop check — ensures the LLM - // sees the new messages on the next iteration via GetMessages(). - if steered := r.DrainSteeredMessages(); len(steered) > 0 { + // --- STEERING: mid-turn injection --- + // Drain ALL pending steer messages. These are urgent course- + // corrections that the model should see on the very next + // iteration, wrapped in tags. + if steered := r.DrainSteeredMessages(ctx); len(steered) > 0 { for _, sm := range steered { wrapped := fmt.Sprintf( "\nThe user sent the following message while you were working:\n%s\n\nPlease address this in your next response while continuing with your current tasks.\n", @@ -420,6 +420,21 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c if res.Stopped { slog.Debug("Conversation stopped", "agent", a.Name()) r.executeStopHooks(ctx, sess, a, res.Content, events) + + // --- FOLLOW-UP: end-of-turn injection --- + // Pop exactly one follow-up message. Unlike steered + // messages, follow-ups are plain user messages that start + // a new turn — the model sees them as fresh input, not a + // mid-stream interruption. Each follow-up gets a full + // undivided agent turn. + if followUp, ok := r.DequeueFollowUp(ctx); ok { + userMsg := session.UserMessage(followUp.Content, followUp.MultiContent...) + sess.AddMessage(userMsg) + events <- UserMessage(followUp.Content, sess.ID, followUp.MultiContent, len(sess.Messages)-1) + r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events) + continue // re-enter the loop for a new turn + } + break } } diff --git a/pkg/runtime/remote_client.go b/pkg/runtime/remote_client.go index ece04f534..993be1468 100644 --- a/pkg/runtime/remote_client.go +++ b/pkg/runtime/remote_client.go @@ -33,6 +33,9 @@ type RemoteClient interface { // SteerSession injects user messages into a running session mid-turn SteerSession(ctx context.Context, sessionID string, messages []api.Message) error + // FollowUpSession queues messages for end-of-turn processing + FollowUpSession(ctx context.Context, sessionID string, messages []api.Message) error + // UpdateSessionTitle updates the title of a session UpdateSessionTitle(ctx context.Context, sessionID, title string) error diff --git a/pkg/runtime/remote_runtime.go b/pkg/runtime/remote_runtime.go index 568503fc5..1ade3f6f1 100644 --- a/pkg/runtime/remote_runtime.go +++ b/pkg/runtime/remote_runtime.go @@ -214,7 +214,7 @@ func (r *RemoteRuntime) Run(ctx context.Context, sess *session.Session) ([]sessi // Steer enqueues a user message for mid-turn injection into the running // agent loop on the remote server. Returns false if the session is not // active or the steer queue is full. -func (r *RemoteRuntime) Steer(msg SteeredMessage) bool { +func (r *RemoteRuntime) Steer(msg QueuedMessage) bool { if r.sessionID == "" { return false } @@ -224,6 +224,18 @@ func (r *RemoteRuntime) Steer(msg SteeredMessage) bool { return err == nil } +// FollowUp enqueues a message for end-of-turn processing on the remote +// server. Returns false if the session is not active or the queue is full. +func (r *RemoteRuntime) FollowUp(msg QueuedMessage) bool { + if r.sessionID == "" { + return false + } + err := r.client.FollowUpSession(context.Background(), r.sessionID, []api.Message{ + {Content: msg.Content, MultiContent: msg.MultiContent}, + }) + return err == nil +} + // Resume allows resuming execution after user confirmation func (r *RemoteRuntime) Resume(ctx context.Context, req ResumeRequest) { slog.Debug("Resuming remote runtime", "agent", r.currentAgent, "type", req.Type, "reason", req.Reason, "tool_name", req.ToolName, "session_id", r.sessionID) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 90a520cb7..3096d453a 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -80,46 +80,57 @@ func ResumeReject(reason string) ResumeRequest { return ResumeRequest{Type: ResumeTypeReject, Reason: reason} } -// SteeredMessage is a user message injected mid-turn while the agent loop is -// running. It is enqueued via a SteerQueue and drained inside the loop between -// tool execution and the stop-condition check. -type SteeredMessage struct { +// QueuedMessage is a user message waiting to be injected into the agent loop, +// either mid-turn (via the steer queue) or at end-of-turn (via the follow-up +// queue). +type QueuedMessage struct { Content string MultiContent []chat.MessagePart } -// SteerQueue is the interface for storing steered messages that are injected -// into a running agent loop mid-turn. Implementations must be safe for -// concurrent use: Enqueue is called from API handlers while Drain is called -// from the agent loop goroutine. +// MessageQueue is the interface for storing messages that are injected into +// the agent loop. Implementations must be safe for concurrent use: Enqueue +// is called from API handlers while Dequeue/Drain are called from the agent +// loop goroutine. // -// The default implementation is InMemorySteerQueue. Callers that need +// The default implementation is NewInMemoryMessageQueue. Callers that need // durable or distributed storage can provide their own implementation -// via the WithSteerQueue option. -type SteerQueue interface { +// via the WithSteerQueue or WithFollowUpQueue options. +type MessageQueue interface { // Enqueue adds a message to the queue. Returns false if the queue is - // full and the message was not accepted. - Enqueue(msg SteeredMessage) bool + // full or the context is cancelled. + Enqueue(ctx context.Context, msg QueuedMessage) bool + // Dequeue removes and returns the next message from the queue. + // Returns the message and true, or a zero value and false if the + // queue is empty. Must not block. + Dequeue(ctx context.Context) (QueuedMessage, bool) // Drain returns all pending messages and removes them from the queue. - // It must not block — if the queue is empty it returns nil. - Drain() []SteeredMessage + // Must not block — if the queue is empty it returns nil. + Drain(ctx context.Context) []QueuedMessage + // Len returns the current number of messages in the queue. + Len(ctx context.Context) int } -// inMemorySteerQueue is the default SteerQueue backed by a buffered channel. -type inMemorySteerQueue struct { - ch chan SteeredMessage +// inMemoryMessageQueue is the default MessageQueue backed by a buffered channel. +type inMemoryMessageQueue struct { + ch chan QueuedMessage } -// defaultSteerQueueCapacity is the buffer size for the default in-memory queue. -const defaultSteerQueueCapacity = 5 +const ( + // defaultSteerQueueCapacity is the buffer size for the default in-memory steer queue. + defaultSteerQueueCapacity = 5 + // defaultFollowUpQueueCapacity is the buffer size for the default in-memory follow-up queue. + // Higher than steer because follow-ups accumulate while waiting for the turn to end. + defaultFollowUpQueueCapacity = 20 +) -// NewInMemorySteerQueue creates a SteerQueue backed by a buffered channel +// NewInMemoryMessageQueue creates a MessageQueue backed by a buffered channel // with the given capacity. -func NewInMemorySteerQueue(capacity int) SteerQueue { - return &inMemorySteerQueue{ch: make(chan SteeredMessage, capacity)} +func NewInMemoryMessageQueue(capacity int) MessageQueue { + return &inMemoryMessageQueue{ch: make(chan QueuedMessage, capacity)} } -func (q *inMemorySteerQueue) Enqueue(msg SteeredMessage) bool { +func (q *inMemoryMessageQueue) Enqueue(_ context.Context, msg QueuedMessage) bool { select { case q.ch <- msg: return true @@ -128,8 +139,17 @@ func (q *inMemorySteerQueue) Enqueue(msg SteeredMessage) bool { } } -func (q *inMemorySteerQueue) Drain() []SteeredMessage { - var msgs []SteeredMessage +func (q *inMemoryMessageQueue) Dequeue(_ context.Context) (QueuedMessage, bool) { + select { + case m := <-q.ch: + return m, true + default: + return QueuedMessage{}, false + } +} + +func (q *inMemoryMessageQueue) Drain(_ context.Context) []QueuedMessage { + var msgs []QueuedMessage for { select { case m := <-q.ch: @@ -140,6 +160,10 @@ func (q *inMemorySteerQueue) Drain() []SteeredMessage { } } +func (q *inMemoryMessageQueue) Len(_ context.Context) int { + return len(q.ch) +} + // ToolHandlerFunc is a function type for handling tool calls type ToolHandlerFunc func(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, events chan Event) (*tools.ToolCallResult, error) @@ -261,10 +285,13 @@ type LocalRuntime struct { currentAgentMu sync.RWMutex - // steerQueue stores user messages injected mid-turn. The agent loop - // drains this queue after tool execution, before checking the stop - // condition, so the LLM sees the new messages on its next iteration. - steerQueue SteerQueue + // steerQueue stores urgent mid-turn messages. The agent loop drains + // ALL pending messages after tool execution, before the stop check. + steerQueue MessageQueue + + // followUpQueue stores end-of-turn messages. The agent loop pops + // exactly ONE message after the model stops and stop-hooks have run. + followUpQueue MessageQueue // onToolsChanged is called when an MCP toolset reports a tool list change. onToolsChanged func(Event) @@ -293,14 +320,22 @@ func WithTracer(t trace.Tracer) Opt { } } -// WithSteerQueue sets a custom SteerQueue implementation for mid-turn message -// injection. If not provided, an in-memory buffered queue is used. -func WithSteerQueue(q SteerQueue) Opt { +// WithSteerQueue sets a custom MessageQueue for mid-turn message injection. +// If not provided, an in-memory buffered queue is used. +func WithSteerQueue(q MessageQueue) Opt { return func(r *LocalRuntime) { r.steerQueue = q } } +// WithFollowUpQueue sets a custom MessageQueue for end-of-turn follow-up +// messages. If not provided, an in-memory buffered queue is used. +func WithFollowUpQueue(q MessageQueue) Opt { + return func(r *LocalRuntime) { + r.followUpQueue = q + } +} + func WithSessionCompaction(sessionCompaction bool) Opt { return func(r *LocalRuntime) { r.sessionCompaction = sessionCompaction @@ -364,7 +399,8 @@ func NewLocalRuntime(agents *team.Team, opts ...Opt) (*LocalRuntime, error) { currentAgent: defaultAgent.Name(), resumeChan: make(chan ResumeRequest), elicitationRequestCh: make(chan ElicitationResult), - steerQueue: NewInMemorySteerQueue(defaultSteerQueueCapacity), + steerQueue: NewInMemoryMessageQueue(defaultSteerQueueCapacity), + followUpQueue: NewInMemoryMessageQueue(defaultFollowUpQueueCapacity), sessionCompaction: true, managedOAuth: true, sessionStore: session.NewInMemorySessionStore(), @@ -1089,19 +1125,32 @@ func (r *LocalRuntime) ResumeElicitation(ctx context.Context, action tools.Elici } } -// Steer enqueues a user message for mid-turn injection into the running -// agent loop. The message will be picked up after the current batch of tool -// calls finishes but before the loop checks whether to stop. Returns false -// if the queue is full and the message was not enqueued. -func (r *LocalRuntime) Steer(msg SteeredMessage) bool { - return r.steerQueue.Enqueue(msg) +// Steer enqueues a user message for urgent mid-turn injection into the +// running agent loop. The message will be picked up after the current batch +// of tool calls finishes but before the loop checks whether to stop. +// Returns false if the queue is full. +func (r *LocalRuntime) Steer(msg QueuedMessage) bool { + return r.steerQueue.Enqueue(context.Background(), msg) } // DrainSteeredMessages returns all pending steered messages without blocking. // It is called inside the agent loop to batch-inject any messages that arrived // while the current iteration was in progress. -func (r *LocalRuntime) DrainSteeredMessages() []SteeredMessage { - return r.steerQueue.Drain() +func (r *LocalRuntime) DrainSteeredMessages(ctx context.Context) []QueuedMessage { + return r.steerQueue.Drain(ctx) +} + +// FollowUp enqueues a message to be processed after the current agent turn +// finishes. Unlike Steer, follow-ups are popped one at a time and each gets +// a full undivided agent turn. Returns false if the queue is full. +func (r *LocalRuntime) FollowUp(msg QueuedMessage) bool { + return r.followUpQueue.Enqueue(context.Background(), msg) +} + +// DequeueFollowUp pops the next follow-up message. Called by the agent loop +// after the model stops and stop-hooks have run. +func (r *LocalRuntime) DequeueFollowUp(ctx context.Context) (QueuedMessage, bool) { + return r.followUpQueue.Dequeue(ctx) } // Run starts the agent's interaction loop diff --git a/pkg/server/server.go b/pkg/server/server.go index e8fc5a6a1..ede83122e 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -64,6 +64,8 @@ func New(ctx context.Context, sessionStore session.Store, runConfig *config.Runt group.POST("/sessions/:id/elicitation", s.elicitation) // Steer: inject user messages into a running agent session mid-turn group.POST("/sessions/:id/steer", s.steerSession) + // Follow-up: queue messages for end-of-turn processing + group.POST("/sessions/:id/followup", s.followUpSession) // Agent tool count group.GET("/agents/:id/:agent_name/tools/count", s.getAgentToolCount) @@ -337,3 +339,21 @@ func (s *Server) steerSession(c echo.Context) error { return c.JSON(http.StatusAccepted, map[string]string{"status": "queued"}) } + +func (s *Server) followUpSession(c echo.Context) error { + sessionID := c.Param("id") + var req api.SteerSessionRequest + if err := c.Bind(&req); err != nil { + return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid request body: %v", err)) + } + + if len(req.Messages) == 0 { + return echo.NewHTTPError(http.StatusBadRequest, "at least one message is required") + } + + if err := s.sm.FollowUpSession(c.Request().Context(), sessionID, req.Messages); err != nil { + return echo.NewHTTPError(http.StatusConflict, fmt.Sprintf("failed to enqueue follow-up: %v", err)) + } + + return c.JSON(http.StatusAccepted, map[string]string{"status": "queued"}) +} diff --git a/pkg/server/session_manager.go b/pkg/server/session_manager.go index 9b4172048..2d1fe6abd 100644 --- a/pkg/server/session_manager.go +++ b/pkg/server/session_manager.go @@ -266,7 +266,7 @@ func (sm *SessionManager) SteerSession(_ context.Context, sessionID string, mess } for _, msg := range messages { - if !localRT.Steer(runtime.SteeredMessage{ + if !localRT.Steer(runtime.QueuedMessage{ Content: msg.Content, MultiContent: msg.MultiContent, }) { @@ -277,6 +277,32 @@ func (sm *SessionManager) SteerSession(_ context.Context, sessionID string, mess return nil } +// FollowUpSession enqueues user messages for end-of-turn processing in a +// running session. Each message is popped one at a time after the current +// turn finishes, giving each follow-up a full undivided agent turn. +func (sm *SessionManager) FollowUpSession(_ context.Context, sessionID string, messages []api.Message) error { + rt, exists := sm.runtimeSessions.Load(sessionID) + if !exists { + return errors.New("session not found or not running") + } + + localRT := runtime.GetLocalRuntime(rt.runtime) + if localRT == nil { + return errors.New("follow-up not supported for this runtime type") + } + + for _, msg := range messages { + if !localRT.FollowUp(runtime.QueuedMessage{ + Content: msg.Content, + MultiContent: msg.MultiContent, + }) { + return errors.New("follow-up queue full") + } + } + + return nil +} + // ResumeElicitation resumes an elicitation request. func (sm *SessionManager) ResumeElicitation(ctx context.Context, sessionID, action string, content map[string]any) error { sm.mux.Lock() From eea7d5bd91039bc36d1d0f8d671368562ff37d14 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Fri, 10 Apr 2026 10:23:00 +0200 Subject: [PATCH 05/11] Remove GetLocalRuntime in favor of MessageInjector interface GetLocalRuntime is no longer needed: PersistentRuntime inherits Steer() and FollowUp() from embedded *LocalRuntime, and RemoteRuntime implements them directly. All call sites now use the MessageInjector interface for dispatch, which is cleaner and doesn't require knowledge of concrete runtime wrapper types. --- pkg/app/app.go | 23 ++--------------------- pkg/runtime/persistent_runtime.go | 19 ++++++------------- pkg/server/session_manager.go | 12 ++++++------ 3 files changed, 14 insertions(+), 40 deletions(-) diff --git a/pkg/app/app.go b/pkg/app/app.go index 396e500c1..6203803c5 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -532,19 +532,12 @@ func (a *App) SubscribeWith(ctx context.Context, send func(tea.Msg)) { } } -// MessageInjector is implemented by runtimes that support mid-turn steering -// and end-of-turn follow-up message injection. -type MessageInjector interface { - Steer(msg runtime.QueuedMessage) bool - FollowUp(msg runtime.QueuedMessage) bool -} - // Steer enqueues a user message for mid-turn injection into the running // agent loop. Works with both local runtimes (via the steer queue) and // remote runtimes (via POST /sessions/:id/steer). Returns false if // steering is not supported by the runtime or the queue is full. func (a *App) Steer(msg runtime.QueuedMessage) bool { - if inj := a.messageInjector(); inj != nil { + if inj, ok := a.runtime.(runtime.MessageInjector); ok { return inj.Steer(msg) } return false @@ -554,24 +547,12 @@ func (a *App) Steer(msg runtime.QueuedMessage) bool { // gets a full undivided agent turn. Returns false if the runtime does not // support follow-ups or the queue is full. func (a *App) FollowUp(msg runtime.QueuedMessage) bool { - if inj := a.messageInjector(); inj != nil { + if inj, ok := a.runtime.(runtime.MessageInjector); ok { return inj.FollowUp(msg) } return false } -func (a *App) messageInjector() MessageInjector { - // Try unwrapping PersistentRuntime → LocalRuntime first - if lr := runtime.GetLocalRuntime(a.runtime); lr != nil { - return lr - } - // Try the MessageInjector interface (e.g. RemoteRuntime) - if inj, ok := a.runtime.(MessageInjector); ok { - return inj - } - return nil -} - // Resume resumes the runtime with the given confirmation request func (a *App) Resume(req runtime.ResumeRequest) { a.runtime.Resume(context.Background(), req) diff --git a/pkg/runtime/persistent_runtime.go b/pkg/runtime/persistent_runtime.go index dc19fe4b8..5b3cedd10 100644 --- a/pkg/runtime/persistent_runtime.go +++ b/pkg/runtime/persistent_runtime.go @@ -25,19 +25,12 @@ type streamingState struct { messageID int64 // ID of the current streaming message (0 if none) } -// GetLocalRuntime extracts the underlying *LocalRuntime from a Runtime -// implementation. It handles both *LocalRuntime and *PersistentRuntime -// (which embeds *LocalRuntime). Returns nil if the runtime type is not -// supported (e.g. RemoteRuntime). -func GetLocalRuntime(rt Runtime) *LocalRuntime { - switch r := rt.(type) { - case *LocalRuntime: - return r - case *PersistentRuntime: - return r.LocalRuntime - default: - return nil - } +// MessageInjector is implemented by runtimes that support mid-turn steering +// and end-of-turn follow-up message injection. Both LocalRuntime (and +// PersistentRuntime via embedding) and RemoteRuntime satisfy this interface. +type MessageInjector interface { + Steer(msg QueuedMessage) bool + FollowUp(msg QueuedMessage) bool } // New creates a new runtime for an agent and its team. diff --git a/pkg/server/session_manager.go b/pkg/server/session_manager.go index 2d1fe6abd..c0f4e1f1e 100644 --- a/pkg/server/session_manager.go +++ b/pkg/server/session_manager.go @@ -260,13 +260,13 @@ func (sm *SessionManager) SteerSession(_ context.Context, sessionID string, mess return errors.New("session not found or not running") } - localRT := runtime.GetLocalRuntime(rt.runtime) - if localRT == nil { + inj, ok := rt.runtime.(runtime.MessageInjector) + if !ok { return errors.New("steering not supported for this runtime type") } for _, msg := range messages { - if !localRT.Steer(runtime.QueuedMessage{ + if !inj.Steer(runtime.QueuedMessage{ Content: msg.Content, MultiContent: msg.MultiContent, }) { @@ -286,13 +286,13 @@ func (sm *SessionManager) FollowUpSession(_ context.Context, sessionID string, m return errors.New("session not found or not running") } - localRT := runtime.GetLocalRuntime(rt.runtime) - if localRT == nil { + inj, ok := rt.runtime.(runtime.MessageInjector) + if !ok { return errors.New("follow-up not supported for this runtime type") } for _, msg := range messages { - if !localRT.FollowUp(runtime.QueuedMessage{ + if !inj.FollowUp(runtime.QueuedMessage{ Content: msg.Content, MultiContent: msg.MultiContent, }) { From 45f220b2d31fe2a1cf36909cfb26ecf3082979b6 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Fri, 10 Apr 2026 13:03:58 +0200 Subject: [PATCH 06/11] Add Lock + Confirm/Cancel semantics to MessageQueue Add Confirm and Cancel methods to the MessageQueue interface so that persistent queue implementations can use a transactional dequeue pattern: Dequeue locks a message (in-flight), Confirm permanently removes it after sess.AddMessage succeeds, Cancel releases it back to the queue on failure. This prevents message loss when the process crashes or the context is cancelled between dequeue and session persistence. The in-memory implementation treats Confirm/Cancel as no-ops since the message is already consumed from the channel on Dequeue. The agent loop now calls Confirm after successfully adding a follow-up message to the session. Drain (used for steer messages) auto-confirms all messages in a batch. --- pkg/runtime/loop.go | 5 +++++ pkg/runtime/runtime.go | 33 +++++++++++++++++++++++++++++---- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/pkg/runtime/loop.go b/pkg/runtime/loop.go index 92f4173cb..c411bcd23 100644 --- a/pkg/runtime/loop.go +++ b/pkg/runtime/loop.go @@ -427,9 +427,14 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c // a new turn — the model sees them as fresh input, not a // mid-stream interruption. Each follow-up gets a full // undivided agent turn. + // + // Dequeue locks the message; Confirm is called after + // AddMessage succeeds so persistent queue implementations + // can safely re-queue on failure. if followUp, ok := r.DequeueFollowUp(ctx); ok { userMsg := session.UserMessage(followUp.Content, followUp.MultiContent...) sess.AddMessage(userMsg) + _ = r.followUpQueue.Confirm(ctx) events <- UserMessage(followUp.Content, sess.ID, followUp.MultiContent, len(sess.Messages)-1) r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events) continue // re-enter the loop for a new turn diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 3096d453a..13ad4cf63 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -93,6 +93,14 @@ type QueuedMessage struct { // is called from API handlers while Dequeue/Drain are called from the agent // loop goroutine. // +// Dequeue uses a Lock + Confirm/Cancel pattern: Dequeue locks the next +// message (making it invisible to subsequent Dequeue calls), Confirm +// permanently removes it after the message has been successfully added to +// the session, and Cancel releases it back to the queue if processing +// fails. This prevents message loss in persistent queue implementations +// when the process crashes or the context is cancelled between dequeue +// and session persistence. +// // The default implementation is NewInMemoryMessageQueue. Callers that need // durable or distributed storage can provide their own implementation // via the WithSteerQueue or WithFollowUpQueue options. @@ -100,11 +108,20 @@ type MessageQueue interface { // Enqueue adds a message to the queue. Returns false if the queue is // full or the context is cancelled. Enqueue(ctx context.Context, msg QueuedMessage) bool - // Dequeue removes and returns the next message from the queue. - // Returns the message and true, or a zero value and false if the - // queue is empty. Must not block. + // Dequeue locks and returns the next message from the queue. The + // message is invisible to subsequent Dequeue calls until Confirm or + // Cancel is called. Returns the message and true, or a zero value + // and false if the queue is empty. Must not block. Dequeue(ctx context.Context) (QueuedMessage, bool) - // Drain returns all pending messages and removes them from the queue. + // Confirm permanently removes the most recently dequeued message. + // Must be called after the message has been successfully persisted + // to the session. For in-memory queues this is a no-op. + Confirm(ctx context.Context) error + // Cancel releases the most recently dequeued message back to the + // queue. For in-memory queues this is a no-op (the message was + // already consumed from the channel). + Cancel(ctx context.Context) error + // Drain locks, returns, and auto-confirms all pending messages. // Must not block — if the queue is empty it returns nil. Drain(ctx context.Context) []QueuedMessage // Len returns the current number of messages in the queue. @@ -148,6 +165,14 @@ func (q *inMemoryMessageQueue) Dequeue(_ context.Context) (QueuedMessage, bool) } } +// Confirm is a no-op for in-memory queues — the message was already +// removed from the channel on Dequeue. +func (q *inMemoryMessageQueue) Confirm(_ context.Context) error { return nil } + +// Cancel is a no-op for in-memory queues — the message cannot be put +// back into a buffered channel without risking deadlock. +func (q *inMemoryMessageQueue) Cancel(_ context.Context) error { return nil } + func (q *inMemoryMessageQueue) Drain(_ context.Context) []QueuedMessage { var msgs []QueuedMessage for { From bb03a146365fed3644fbe0a00385780e284bfc44 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Fri, 10 Apr 2026 14:51:14 +0200 Subject: [PATCH 07/11] Address review: extract queue file, add Steer/FollowUp to Runtime, return error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move QueuedMessage, MessageQueue interface, and inMemoryMessageQueue to dedicated message_queue.go file - Add Steer() and FollowUp() to the Runtime interface — all runtimes implement them, no need for a separate MessageInjector interface - Return error instead of bool from Steer/FollowUp for richer failure information (queue full, no active session, network errors) - Simplify App and SessionManager: call runtime.Steer/FollowUp directly without type assertions --- pkg/app/app.go | 21 ++--- pkg/app/app_test.go | 2 + pkg/runtime/commands_test.go | 2 + pkg/runtime/message_queue.go | 121 +++++++++++++++++++++++++++ pkg/runtime/persistent_runtime.go | 8 -- pkg/runtime/remote_runtime.go | 20 ++--- pkg/runtime/runtime.go | 134 +++++------------------------- pkg/server/session_manager.go | 22 ++--- 8 files changed, 164 insertions(+), 166 deletions(-) create mode 100644 pkg/runtime/message_queue.go diff --git a/pkg/app/app.go b/pkg/app/app.go index 6203803c5..1a6ba0d3e 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -533,24 +533,15 @@ func (a *App) SubscribeWith(ctx context.Context, send func(tea.Msg)) { } // Steer enqueues a user message for mid-turn injection into the running -// agent loop. Works with both local runtimes (via the steer queue) and -// remote runtimes (via POST /sessions/:id/steer). Returns false if -// steering is not supported by the runtime or the queue is full. -func (a *App) Steer(msg runtime.QueuedMessage) bool { - if inj, ok := a.runtime.(runtime.MessageInjector); ok { - return inj.Steer(msg) - } - return false +// agent loop. Works with both local and remote runtimes. +func (a *App) Steer(msg runtime.QueuedMessage) error { + return a.runtime.Steer(msg) } // FollowUp enqueues a message for end-of-turn processing. Each follow-up -// gets a full undivided agent turn. Returns false if the runtime does not -// support follow-ups or the queue is full. -func (a *App) FollowUp(msg runtime.QueuedMessage) bool { - if inj, ok := a.runtime.(runtime.MessageInjector); ok { - return inj.FollowUp(msg) - } - return false +// gets a full undivided agent turn. +func (a *App) FollowUp(msg runtime.QueuedMessage) error { + return a.runtime.FollowUp(msg) } // Resume resumes the runtime with the given confirmation request diff --git a/pkg/app/app_test.go b/pkg/app/app_test.go index eee69fca1..2f32cff20 100644 --- a/pkg/app/app_test.go +++ b/pkg/app/app_test.go @@ -67,6 +67,8 @@ func (m *mockRuntime) UpdateSessionTitle(_ context.Context, sess *session.Sessio func (m *mockRuntime) TitleGenerator() *sessiontitle.Generator { return nil } func (m *mockRuntime) Close() error { return nil } func (m *mockRuntime) Stop() {} +func (m *mockRuntime) Steer(_ runtime.QueuedMessage) error { return nil } +func (m *mockRuntime) FollowUp(_ runtime.QueuedMessage) error { return nil } // Verify mockRuntime implements runtime.Runtime var _ runtime.Runtime = (*mockRuntime)(nil) diff --git a/pkg/runtime/commands_test.go b/pkg/runtime/commands_test.go index fa6999233..00e4a2195 100644 --- a/pkg/runtime/commands_test.go +++ b/pkg/runtime/commands_test.go @@ -69,6 +69,8 @@ func (m *mockRuntime) UpdateSessionTitle(context.Context, *session.Session, stri } func (m *mockRuntime) TitleGenerator() *sessiontitle.Generator { return nil } func (m *mockRuntime) Close() error { return nil } +func (m *mockRuntime) Steer(QueuedMessage) error { return nil } +func (m *mockRuntime) FollowUp(QueuedMessage) error { return nil } func (m *mockRuntime) RegenerateTitle(context.Context, *session.Session, chan Event) { } diff --git a/pkg/runtime/message_queue.go b/pkg/runtime/message_queue.go new file mode 100644 index 000000000..9696b5af6 --- /dev/null +++ b/pkg/runtime/message_queue.go @@ -0,0 +1,121 @@ +package runtime + +import ( + "context" + + "github.com/docker/docker-agent/pkg/chat" +) + +// QueuedMessage is a user message waiting to be injected into the agent loop, +// either mid-turn (via the steer queue) or at end-of-turn (via the follow-up +// queue). +type QueuedMessage struct { + Content string + MultiContent []chat.MessagePart +} + +// MessageQueue is the interface for storing messages that are injected into +// the agent loop. Implementations must be safe for concurrent use: Enqueue +// is called from API handlers while Dequeue/Drain are called from the agent +// loop goroutine. +// +// Dequeue uses a Lock + Confirm/Cancel pattern: Dequeue locks the next +// message (making it invisible to subsequent Dequeue calls), Confirm +// permanently removes it after the message has been successfully processed, +// and Cancel releases it back to the queue if processing fails. This +// prevents message loss in persistent queue implementations where the +// session store is also durable. +// +// Note: for the default in-memory queue, Confirm and Cancel are no-ops +// because the message is consumed from the channel on Dequeue and the +// session is also in-memory. The pattern exists so that persistent +// implementations (with a durable session store) can guarantee +// exactly-once delivery. +// +// The default implementation is NewInMemoryMessageQueue. Callers that need +// durable or distributed storage can provide their own implementation +// via the WithSteerQueue or WithFollowUpQueue options. +type MessageQueue interface { + // Enqueue adds a message to the queue. Returns false if the queue is + // full or the context is cancelled. + Enqueue(ctx context.Context, msg QueuedMessage) bool + // Dequeue locks and returns the next message from the queue. The + // message is invisible to subsequent Dequeue calls until Confirm or + // Cancel is called. Returns the message and true, or a zero value + // and false if the queue is empty. Must not block. + Dequeue(ctx context.Context) (QueuedMessage, bool) + // Confirm permanently removes the most recently dequeued message. + // Must be called after the message has been successfully persisted + // to the session. For in-memory queues this is a no-op. + Confirm(ctx context.Context) error + // Cancel releases the most recently dequeued message back to the + // queue. For in-memory queues this is a no-op (the message was + // already consumed from the channel). + Cancel(ctx context.Context) error + // Drain locks, returns, and auto-confirms all pending messages. + // Must not block — if the queue is empty it returns nil. + Drain(ctx context.Context) []QueuedMessage + // Len returns the current number of messages in the queue. + Len(ctx context.Context) int +} + +// inMemoryMessageQueue is the default MessageQueue backed by a buffered channel. +type inMemoryMessageQueue struct { + ch chan QueuedMessage +} + +const ( + // defaultSteerQueueCapacity is the buffer size for the default in-memory steer queue. + defaultSteerQueueCapacity = 5 + // defaultFollowUpQueueCapacity is the buffer size for the default in-memory follow-up queue. + // Higher than steer because follow-ups accumulate while waiting for the turn to end. + defaultFollowUpQueueCapacity = 20 +) + +// NewInMemoryMessageQueue creates a MessageQueue backed by a buffered channel +// with the given capacity. +func NewInMemoryMessageQueue(capacity int) MessageQueue { + return &inMemoryMessageQueue{ch: make(chan QueuedMessage, capacity)} +} + +func (q *inMemoryMessageQueue) Enqueue(_ context.Context, msg QueuedMessage) bool { + select { + case q.ch <- msg: + return true + default: + return false + } +} + +func (q *inMemoryMessageQueue) Dequeue(_ context.Context) (QueuedMessage, bool) { + select { + case m := <-q.ch: + return m, true + default: + return QueuedMessage{}, false + } +} + +// Confirm is a no-op for in-memory queues — the message was already +// removed from the channel on Dequeue. +func (q *inMemoryMessageQueue) Confirm(_ context.Context) error { return nil } + +// Cancel is a no-op for in-memory queues — the message cannot be put +// back into a buffered channel without risking deadlock. +func (q *inMemoryMessageQueue) Cancel(_ context.Context) error { return nil } + +func (q *inMemoryMessageQueue) Drain(_ context.Context) []QueuedMessage { + var msgs []QueuedMessage + for { + select { + case m := <-q.ch: + msgs = append(msgs, m) + default: + return msgs + } + } +} + +func (q *inMemoryMessageQueue) Len(_ context.Context) int { + return len(q.ch) +} diff --git a/pkg/runtime/persistent_runtime.go b/pkg/runtime/persistent_runtime.go index 5b3cedd10..c6b704da7 100644 --- a/pkg/runtime/persistent_runtime.go +++ b/pkg/runtime/persistent_runtime.go @@ -25,14 +25,6 @@ type streamingState struct { messageID int64 // ID of the current streaming message (0 if none) } -// MessageInjector is implemented by runtimes that support mid-turn steering -// and end-of-turn follow-up message injection. Both LocalRuntime (and -// PersistentRuntime via embedding) and RemoteRuntime satisfy this interface. -type MessageInjector interface { - Steer(msg QueuedMessage) bool - FollowUp(msg QueuedMessage) bool -} - // New creates a new runtime for an agent and its team. // The runtime automatically persists session changes to the configured store. // Returns a Runtime interface which wraps LocalRuntime with persistence handling. diff --git a/pkg/runtime/remote_runtime.go b/pkg/runtime/remote_runtime.go index 1ade3f6f1..fd220c9a3 100644 --- a/pkg/runtime/remote_runtime.go +++ b/pkg/runtime/remote_runtime.go @@ -212,28 +212,24 @@ func (r *RemoteRuntime) Run(ctx context.Context, sess *session.Session) ([]sessi } // Steer enqueues a user message for mid-turn injection into the running -// agent loop on the remote server. Returns false if the session is not -// active or the steer queue is full. -func (r *RemoteRuntime) Steer(msg QueuedMessage) bool { +// agent loop on the remote server. +func (r *RemoteRuntime) Steer(msg QueuedMessage) error { if r.sessionID == "" { - return false + return errors.New("no active session") } - err := r.client.SteerSession(context.Background(), r.sessionID, []api.Message{ + return r.client.SteerSession(context.Background(), r.sessionID, []api.Message{ {Content: msg.Content, MultiContent: msg.MultiContent}, }) - return err == nil } -// FollowUp enqueues a message for end-of-turn processing on the remote -// server. Returns false if the session is not active or the queue is full. -func (r *RemoteRuntime) FollowUp(msg QueuedMessage) bool { +// FollowUp enqueues a message for end-of-turn processing on the remote server. +func (r *RemoteRuntime) FollowUp(msg QueuedMessage) error { if r.sessionID == "" { - return false + return errors.New("no active session") } - err := r.client.FollowUpSession(context.Background(), r.sessionID, []api.Message{ + return r.client.FollowUpSession(context.Background(), r.sessionID, []api.Message{ {Content: msg.Content, MultiContent: msg.MultiContent}, }) - return err == nil } // Resume allows resuming execution after user confirmation diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 13ad4cf63..d7e589bc6 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -80,115 +80,6 @@ func ResumeReject(reason string) ResumeRequest { return ResumeRequest{Type: ResumeTypeReject, Reason: reason} } -// QueuedMessage is a user message waiting to be injected into the agent loop, -// either mid-turn (via the steer queue) or at end-of-turn (via the follow-up -// queue). -type QueuedMessage struct { - Content string - MultiContent []chat.MessagePart -} - -// MessageQueue is the interface for storing messages that are injected into -// the agent loop. Implementations must be safe for concurrent use: Enqueue -// is called from API handlers while Dequeue/Drain are called from the agent -// loop goroutine. -// -// Dequeue uses a Lock + Confirm/Cancel pattern: Dequeue locks the next -// message (making it invisible to subsequent Dequeue calls), Confirm -// permanently removes it after the message has been successfully added to -// the session, and Cancel releases it back to the queue if processing -// fails. This prevents message loss in persistent queue implementations -// when the process crashes or the context is cancelled between dequeue -// and session persistence. -// -// The default implementation is NewInMemoryMessageQueue. Callers that need -// durable or distributed storage can provide their own implementation -// via the WithSteerQueue or WithFollowUpQueue options. -type MessageQueue interface { - // Enqueue adds a message to the queue. Returns false if the queue is - // full or the context is cancelled. - Enqueue(ctx context.Context, msg QueuedMessage) bool - // Dequeue locks and returns the next message from the queue. The - // message is invisible to subsequent Dequeue calls until Confirm or - // Cancel is called. Returns the message and true, or a zero value - // and false if the queue is empty. Must not block. - Dequeue(ctx context.Context) (QueuedMessage, bool) - // Confirm permanently removes the most recently dequeued message. - // Must be called after the message has been successfully persisted - // to the session. For in-memory queues this is a no-op. - Confirm(ctx context.Context) error - // Cancel releases the most recently dequeued message back to the - // queue. For in-memory queues this is a no-op (the message was - // already consumed from the channel). - Cancel(ctx context.Context) error - // Drain locks, returns, and auto-confirms all pending messages. - // Must not block — if the queue is empty it returns nil. - Drain(ctx context.Context) []QueuedMessage - // Len returns the current number of messages in the queue. - Len(ctx context.Context) int -} - -// inMemoryMessageQueue is the default MessageQueue backed by a buffered channel. -type inMemoryMessageQueue struct { - ch chan QueuedMessage -} - -const ( - // defaultSteerQueueCapacity is the buffer size for the default in-memory steer queue. - defaultSteerQueueCapacity = 5 - // defaultFollowUpQueueCapacity is the buffer size for the default in-memory follow-up queue. - // Higher than steer because follow-ups accumulate while waiting for the turn to end. - defaultFollowUpQueueCapacity = 20 -) - -// NewInMemoryMessageQueue creates a MessageQueue backed by a buffered channel -// with the given capacity. -func NewInMemoryMessageQueue(capacity int) MessageQueue { - return &inMemoryMessageQueue{ch: make(chan QueuedMessage, capacity)} -} - -func (q *inMemoryMessageQueue) Enqueue(_ context.Context, msg QueuedMessage) bool { - select { - case q.ch <- msg: - return true - default: - return false - } -} - -func (q *inMemoryMessageQueue) Dequeue(_ context.Context) (QueuedMessage, bool) { - select { - case m := <-q.ch: - return m, true - default: - return QueuedMessage{}, false - } -} - -// Confirm is a no-op for in-memory queues — the message was already -// removed from the channel on Dequeue. -func (q *inMemoryMessageQueue) Confirm(_ context.Context) error { return nil } - -// Cancel is a no-op for in-memory queues — the message cannot be put -// back into a buffered channel without risking deadlock. -func (q *inMemoryMessageQueue) Cancel(_ context.Context) error { return nil } - -func (q *inMemoryMessageQueue) Drain(_ context.Context) []QueuedMessage { - var msgs []QueuedMessage - for { - select { - case m := <-q.ch: - msgs = append(msgs, m) - default: - return msgs - } - } -} - -func (q *inMemoryMessageQueue) Len(_ context.Context) int { - return len(q.ch) -} - // ToolHandlerFunc is a function type for handling tool calls type ToolHandlerFunc func(ctx context.Context, sess *session.Session, toolCall tools.ToolCall, events chan Event) (*tools.ToolCallResult, error) @@ -248,6 +139,14 @@ type Runtime interface { // if the runtime does not support local title generation (e.g. remote runtimes). TitleGenerator() *sessiontitle.Generator + // Steer enqueues a user message for urgent mid-turn injection into the + // running agent loop. Returns an error if the queue is full or steering + // is not available. + Steer(msg QueuedMessage) error + // FollowUp enqueues a message for end-of-turn processing. Each follow-up + // gets a full undivided agent turn. Returns an error if the queue is full. + FollowUp(msg QueuedMessage) error + // Close releases resources held by the runtime (e.g., session store connections). Close() error } @@ -1153,9 +1052,11 @@ func (r *LocalRuntime) ResumeElicitation(ctx context.Context, action tools.Elici // Steer enqueues a user message for urgent mid-turn injection into the // running agent loop. The message will be picked up after the current batch // of tool calls finishes but before the loop checks whether to stop. -// Returns false if the queue is full. -func (r *LocalRuntime) Steer(msg QueuedMessage) bool { - return r.steerQueue.Enqueue(context.Background(), msg) +func (r *LocalRuntime) Steer(msg QueuedMessage) error { + if !r.steerQueue.Enqueue(context.Background(), msg) { + return errors.New("steer queue full") + } + return nil } // DrainSteeredMessages returns all pending steered messages without blocking. @@ -1167,9 +1068,12 @@ func (r *LocalRuntime) DrainSteeredMessages(ctx context.Context) []QueuedMessage // FollowUp enqueues a message to be processed after the current agent turn // finishes. Unlike Steer, follow-ups are popped one at a time and each gets -// a full undivided agent turn. Returns false if the queue is full. -func (r *LocalRuntime) FollowUp(msg QueuedMessage) bool { - return r.followUpQueue.Enqueue(context.Background(), msg) +// a full undivided agent turn. +func (r *LocalRuntime) FollowUp(msg QueuedMessage) error { + if !r.followUpQueue.Enqueue(context.Background(), msg) { + return errors.New("follow-up queue full") + } + return nil } // DequeueFollowUp pops the next follow-up message. Called by the agent loop diff --git a/pkg/server/session_manager.go b/pkg/server/session_manager.go index c0f4e1f1e..f8685fa17 100644 --- a/pkg/server/session_manager.go +++ b/pkg/server/session_manager.go @@ -260,17 +260,12 @@ func (sm *SessionManager) SteerSession(_ context.Context, sessionID string, mess return errors.New("session not found or not running") } - inj, ok := rt.runtime.(runtime.MessageInjector) - if !ok { - return errors.New("steering not supported for this runtime type") - } - for _, msg := range messages { - if !inj.Steer(runtime.QueuedMessage{ + if err := rt.runtime.Steer(runtime.QueuedMessage{ Content: msg.Content, MultiContent: msg.MultiContent, - }) { - return errors.New("steer queue full") + }); err != nil { + return err } } @@ -286,17 +281,12 @@ func (sm *SessionManager) FollowUpSession(_ context.Context, sessionID string, m return errors.New("session not found or not running") } - inj, ok := rt.runtime.(runtime.MessageInjector) - if !ok { - return errors.New("follow-up not supported for this runtime type") - } - for _, msg := range messages { - if !inj.FollowUp(runtime.QueuedMessage{ + if err := rt.runtime.FollowUp(runtime.QueuedMessage{ Content: msg.Content, MultiContent: msg.MultiContent, - }) { - return errors.New("follow-up queue full") + }); err != nil { + return err } } From e4cc4c6e013ab0cf5594cd993d409bfd3eacfd8e Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Fri, 10 Apr 2026 14:55:11 +0200 Subject: [PATCH 08/11] =?UTF-8?q?Remove=20Confirm=20call=20from=20loop=20?= =?UTF-8?q?=E2=80=94=20sess.AddMessage=20is=20in-memory?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit As rumpl pointed out, calling Confirm after sess.AddMessage does not protect against anything: AddMessage is an in-memory operation, not a store write. If the process dies between Dequeue and AddMessage, the in-memory session is lost regardless. The Confirm/Cancel methods remain on the MessageQueue interface for implementations that integrate with their own persistence layer, but the agent loop does not call them since it has no durable persistence point between dequeue and the next LLM call. --- pkg/runtime/loop.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pkg/runtime/loop.go b/pkg/runtime/loop.go index c411bcd23..92f4173cb 100644 --- a/pkg/runtime/loop.go +++ b/pkg/runtime/loop.go @@ -427,14 +427,9 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c // a new turn — the model sees them as fresh input, not a // mid-stream interruption. Each follow-up gets a full // undivided agent turn. - // - // Dequeue locks the message; Confirm is called after - // AddMessage succeeds so persistent queue implementations - // can safely re-queue on failure. if followUp, ok := r.DequeueFollowUp(ctx); ok { userMsg := session.UserMessage(followUp.Content, followUp.MultiContent...) sess.AddMessage(userMsg) - _ = r.followUpQueue.Confirm(ctx) events <- UserMessage(followUp.Content, sess.ID, followUp.MultiContent, len(sess.Messages)-1) r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events) continue // re-enter the loop for a new turn From 015540508ac5f4e4c86303af5dbd499dfcf9814c Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Fri, 10 Apr 2026 15:29:26 +0200 Subject: [PATCH 09/11] Fix build: add Steer/FollowUp to mockRuntime in cli tests --- pkg/cli/runner_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/cli/runner_test.go b/pkg/cli/runner_test.go index 6cd27a8bb..4f39c1d04 100644 --- a/pkg/cli/runner_test.go +++ b/pkg/cli/runner_test.go @@ -60,6 +60,8 @@ func (m *mockRuntime) ExecuteMCPPrompt(context.Context, string, map[string]strin func (m *mockRuntime) UpdateSessionTitle(context.Context, *session.Session, string) error { return nil } func (m *mockRuntime) TitleGenerator() *sessiontitle.Generator { return nil } func (m *mockRuntime) Close() error { return nil } +func (m *mockRuntime) Steer(runtime.QueuedMessage) error { return nil } +func (m *mockRuntime) FollowUp(runtime.QueuedMessage) error { return nil } func (m *mockRuntime) RegenerateTitle(context.Context, *session.Session, chan runtime.Event) {} func (m *mockRuntime) Resume(_ context.Context, req runtime.ResumeRequest) { From 3c28081950367b90f90e1dae81b494dd1952e5f4 Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Fri, 10 Apr 2026 16:00:07 +0200 Subject: [PATCH 10/11] Use continue instead of res.Stopped = false for steer injection Cleaner and consistent with the follow-up case: explicitly re-enter the loop rather than relying on the reader understanding that setting res.Stopped = false will make the subsequent check pass through. --- pkg/runtime/loop.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pkg/runtime/loop.go b/pkg/runtime/loop.go index 92f4173cb..67e0e4d6d 100644 --- a/pkg/runtime/loop.go +++ b/pkg/runtime/loop.go @@ -409,12 +409,10 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c events <- UserMessage(sm.Content, sess.ID, sm.MultiContent, len(sess.Messages)-1) } - // Force the loop to continue — the model must respond to - // the injected messages even if it was about to stop. - res.Stopped = false - - // Now that the loop will continue, compact if needed. + // The model must respond to the injected messages — compact + // if needed and re-enter the loop for the next LLM call. r.compactIfNeeded(ctx, sess, a, m, contextLimit, messageCountBeforeTools, events) + continue } if res.Stopped { From 91a9a0c5b83e4648224aae0045e919799bd6b5cf Mon Sep 17 00:00:00 2001 From: Trung Nguyen Date: Fri, 10 Apr 2026 16:03:18 +0200 Subject: [PATCH 11/11] Remove unused methods: Confirm, Cancel, Len, DrainSteeredMessages, DequeueFollowUp MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Slim down the MessageQueue interface to Enqueue, Dequeue, and Drain — the three methods actually called. Remove wrapper methods on LocalRuntime (DrainSteeredMessages, DequeueFollowUp) and call the queues directly from loop.go. --- pkg/runtime/loop.go | 4 ++-- pkg/runtime/message_queue.go | 44 ++++-------------------------------- pkg/runtime/runtime.go | 13 ----------- 3 files changed, 6 insertions(+), 55 deletions(-) diff --git a/pkg/runtime/loop.go b/pkg/runtime/loop.go index 67e0e4d6d..0b653f6a5 100644 --- a/pkg/runtime/loop.go +++ b/pkg/runtime/loop.go @@ -398,7 +398,7 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c // Drain ALL pending steer messages. These are urgent course- // corrections that the model should see on the very next // iteration, wrapped in tags. - if steered := r.DrainSteeredMessages(ctx); len(steered) > 0 { + if steered := r.steerQueue.Drain(ctx); len(steered) > 0 { for _, sm := range steered { wrapped := fmt.Sprintf( "\nThe user sent the following message while you were working:\n%s\n\nPlease address this in your next response while continuing with your current tasks.\n", @@ -425,7 +425,7 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c // a new turn — the model sees them as fresh input, not a // mid-stream interruption. Each follow-up gets a full // undivided agent turn. - if followUp, ok := r.DequeueFollowUp(ctx); ok { + if followUp, ok := r.followUpQueue.Dequeue(ctx); ok { userMsg := session.UserMessage(followUp.Content, followUp.MultiContent...) sess.AddMessage(userMsg) events <- UserMessage(followUp.Content, sess.ID, followUp.MultiContent, len(sess.Messages)-1) diff --git a/pkg/runtime/message_queue.go b/pkg/runtime/message_queue.go index 9696b5af6..99feecb5f 100644 --- a/pkg/runtime/message_queue.go +++ b/pkg/runtime/message_queue.go @@ -19,19 +19,6 @@ type QueuedMessage struct { // is called from API handlers while Dequeue/Drain are called from the agent // loop goroutine. // -// Dequeue uses a Lock + Confirm/Cancel pattern: Dequeue locks the next -// message (making it invisible to subsequent Dequeue calls), Confirm -// permanently removes it after the message has been successfully processed, -// and Cancel releases it back to the queue if processing fails. This -// prevents message loss in persistent queue implementations where the -// session store is also durable. -// -// Note: for the default in-memory queue, Confirm and Cancel are no-ops -// because the message is consumed from the channel on Dequeue and the -// session is also in-memory. The pattern exists so that persistent -// implementations (with a durable session store) can guarantee -// exactly-once delivery. -// // The default implementation is NewInMemoryMessageQueue. Callers that need // durable or distributed storage can provide their own implementation // via the WithSteerQueue or WithFollowUpQueue options. @@ -39,24 +26,13 @@ type MessageQueue interface { // Enqueue adds a message to the queue. Returns false if the queue is // full or the context is cancelled. Enqueue(ctx context.Context, msg QueuedMessage) bool - // Dequeue locks and returns the next message from the queue. The - // message is invisible to subsequent Dequeue calls until Confirm or - // Cancel is called. Returns the message and true, or a zero value - // and false if the queue is empty. Must not block. + // Dequeue removes and returns the next message from the queue. + // Returns the message and true, or a zero value and false if the + // queue is empty. Must not block. Dequeue(ctx context.Context) (QueuedMessage, bool) - // Confirm permanently removes the most recently dequeued message. - // Must be called after the message has been successfully persisted - // to the session. For in-memory queues this is a no-op. - Confirm(ctx context.Context) error - // Cancel releases the most recently dequeued message back to the - // queue. For in-memory queues this is a no-op (the message was - // already consumed from the channel). - Cancel(ctx context.Context) error - // Drain locks, returns, and auto-confirms all pending messages. + // Drain returns all pending messages and removes them from the queue. // Must not block — if the queue is empty it returns nil. Drain(ctx context.Context) []QueuedMessage - // Len returns the current number of messages in the queue. - Len(ctx context.Context) int } // inMemoryMessageQueue is the default MessageQueue backed by a buffered channel. @@ -96,14 +72,6 @@ func (q *inMemoryMessageQueue) Dequeue(_ context.Context) (QueuedMessage, bool) } } -// Confirm is a no-op for in-memory queues — the message was already -// removed from the channel on Dequeue. -func (q *inMemoryMessageQueue) Confirm(_ context.Context) error { return nil } - -// Cancel is a no-op for in-memory queues — the message cannot be put -// back into a buffered channel without risking deadlock. -func (q *inMemoryMessageQueue) Cancel(_ context.Context) error { return nil } - func (q *inMemoryMessageQueue) Drain(_ context.Context) []QueuedMessage { var msgs []QueuedMessage for { @@ -115,7 +83,3 @@ func (q *inMemoryMessageQueue) Drain(_ context.Context) []QueuedMessage { } } } - -func (q *inMemoryMessageQueue) Len(_ context.Context) int { - return len(q.ch) -} diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index d7e589bc6..711607615 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -1059,13 +1059,6 @@ func (r *LocalRuntime) Steer(msg QueuedMessage) error { return nil } -// DrainSteeredMessages returns all pending steered messages without blocking. -// It is called inside the agent loop to batch-inject any messages that arrived -// while the current iteration was in progress. -func (r *LocalRuntime) DrainSteeredMessages(ctx context.Context) []QueuedMessage { - return r.steerQueue.Drain(ctx) -} - // FollowUp enqueues a message to be processed after the current agent turn // finishes. Unlike Steer, follow-ups are popped one at a time and each gets // a full undivided agent turn. @@ -1076,12 +1069,6 @@ func (r *LocalRuntime) FollowUp(msg QueuedMessage) error { return nil } -// DequeueFollowUp pops the next follow-up message. Called by the agent loop -// after the model stops and stop-hooks have run. -func (r *LocalRuntime) DequeueFollowUp(ctx context.Context) (QueuedMessage, bool) { - return r.followUpQueue.Dequeue(ctx) -} - // Run starts the agent's interaction loop func (r *LocalRuntime) startSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {