Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
d51e4c2
refactor(screentracker): internalize initial prompt handling in PTYCo…
johnstcn Feb 4, 2026
241671a
refactor(server): use PTYConversation's internal snapshot loop and In…
johnstcn Feb 4, 2026
c80b288
Update pty_conversation tests for new NewPTY API and status behavior
johnstcn Feb 4, 2026
35fed24
fix: race condition in initial prompt sending
johnstcn Feb 4, 2026
2c907a5
httpapi: replace StartSnapshotLoop with Conversation accessor
johnstcn Feb 4, 2026
ccec233
refactor(screentracker): remove redundant exported fields from PTYCon…
johnstcn Feb 4, 2026
d16c552
refactor: use srv.Conversation().Start(ctx) instead of removed StartS…
johnstcn Feb 4, 2026
1fa447e
refactor(tests): remove direct field access to InitialPromptSent
johnstcn Feb 4, 2026
24ff106
refactor: move conversation.Start() into NewServer() and remove Conve…
johnstcn Feb 4, 2026
9a197fc
refactor(screentracker): move status check from sendLocked to Send()
johnstcn Feb 4, 2026
3dd8c56
docs: add clarifying comments addressing review feedback
johnstcn Feb 4, 2026
f5bd08d
refactor: use Conversation interface in Server struct
johnstcn Feb 4, 2026
86c9d91
refactor: remove unnecessary agentType local variable
johnstcn Feb 4, 2026
b5437f7
refactor: simplify nil channel handling in Start()
johnstcn Feb 4, 2026
593b65f
refactor: remove unused InitialPrompt field from PTYConversation
johnstcn Feb 4, 2026
6079777
refactor: replace initialPromptReady/Sent with outbound message queue
johnstcn Feb 5, 2026
002fd67
screentracker: use no-op defaults for optional callbacks
johnstcn Feb 5, 2026
a6b54a4
screentracker: remove duplicate agentReady nil check
johnstcn Feb 5, 2026
3259c1f
screentracker: remove unnecessary comment and replace goto with label…
johnstcn Feb 5, 2026
54b8ef4
refactor: use separate snapshot/send goroutines with stableSignal cha…
johnstcn Feb 5, 2026
b157ff7
e2e: always rebuild binary when AGENTAPI_BINARY_PATH is not set
johnstcn Feb 5, 2026
7d488f3
Fix stableSignal deadlock in pty_conversation.go
johnstcn Feb 5, 2026
f725683
Optimize ReadyForInitialPrompt check and fix stability comparison
johnstcn Feb 5, 2026
0b9e687
refactor: use channel for initialPromptReady one-time signaling
johnstcn Feb 5, 2026
18e2773
fix: handle context cancellation in inner select
johnstcn Feb 9, 2026
1ed2370
unify send path: route all sends through outbound queue
johnstcn Feb 9, 2026
777a103
refactor: remove test escape hatches, convert to TickerFunc
johnstcn Feb 10, 2026
75d89c3
fix: split lock in sendMessage and fix test clock races
johnstcn Feb 10, 2026
c17e6d7
Remove SkipSendMessageStatusCheck from PTYConversationConfig
johnstcn Feb 10, 2026
c073f04
Reorder readiness check for short-circuit and expand re-apply comment
johnstcn Feb 10, 2026
429f35d
refactor: simplify test helpers in pty_conversation_test.go
johnstcn Feb 10, 2026
85d3965
Replace wall-clock deadline with context timeout in sendWithClockDriv…
johnstcn Feb 10, 2026
a5edc13
Remove msgNoTime/stripTimes, assert full ConversationMessage values
johnstcn Feb 10, 2026
38f119d
Simplify assertMessages to use require.Equal instead of per-field loop
johnstcn Feb 10, 2026
9a1b238
screentracker test: add ctx.Done select and onWrite comment
johnstcn Feb 10, 2026
8ca7856
screentracker test: extract driveClockUntil helper
johnstcn Feb 10, 2026
a134116
fix: rewrite util.WaitFor to use a single timer
johnstcn Feb 10, 2026
131e4a5
e2e: improve debug output
johnstcn Feb 10, 2026
8debc4e
refactor: inline fillToStable
johnstcn Feb 10, 2026
87d36ab
refactor: simplify advance test helpers
johnstcn Feb 10, 2026
0e37003
chore: fix e2e race condition
johnstcn Feb 10, 2026
5df1ec3
Merge branch 'main' into conversation-ew5m
johnstcn Feb 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func runServer(ctx context.Context, logger *slog.Logger, argsToPass []string) er
fmt.Println(srv.GetOpenAPI())
return nil
}
srv.StartSnapshotLoop(ctx)
logger.Info("Starting server on port", "port", port)
processExitCh := make(chan error, 1)
go func() {
Expand Down
47 changes: 26 additions & 21 deletions e2e/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,10 @@ func runEchoAgent(scriptPath string) {
if entry.ThinkDurationMS > 0 {
redrawTerminal(messages, true)
spinnerCtx, spinnerCancel := context.WithCancel(ctx)
go runSpinner(spinnerCtx)
spinnerDone := runSpinner(spinnerCtx)
time.Sleep(time.Duration(entry.ThinkDurationMS) * time.Millisecond)
if spinnerCancel != nil {
spinnerCancel()
}
spinnerCancel()
<-spinnerDone
}

messages = append(messages, st.ConversationMessage{
Expand Down Expand Up @@ -133,9 +132,10 @@ func runEchoAgent(scriptPath string) {
if entry.ThinkDurationMS > 0 {
redrawTerminal(messages, true)
spinnerCtx, spinnerCancel := context.WithCancel(ctx)
go runSpinner(spinnerCtx)
spinnerDone := runSpinner(spinnerCtx)
time.Sleep(time.Duration(entry.ThinkDurationMS) * time.Millisecond)
spinnerCancel()
<-spinnerDone
}

messages = append(messages, st.ConversationMessage{
Expand Down Expand Up @@ -190,21 +190,26 @@ func cleanTerminalInput(input string) string {
return strings.TrimSpace(input)
}

func runSpinner(ctx context.Context) {
spinnerChars := []string{"|", "/", "-", "\\"}
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
i := 0

for {
select {
case <-ticker.C:
fmt.Printf("\rThinking %s", spinnerChars[i%len(spinnerChars)])
i++
case <-ctx.Done():
// Clear spinner on cancellation
fmt.Print("\r" + strings.Repeat(" ", 20) + "\r")
return
func runSpinner(ctx context.Context) <-chan struct{} {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review: turns out there was a race condition in our e2e echo agent 😂

done := make(chan struct{})
go func() {
defer close(done)
spinnerChars := []string{"|", "/", "-", "\\"}
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
i := 0

for {
select {
case <-ticker.C:
fmt.Printf("\rThinking %s", spinnerChars[i%len(spinnerChars)])
i++
case <-ctx.Done():
// Clear spinner on cancellation
fmt.Print("\r" + strings.Repeat(" ", 20) + "\r")
return
}
}
}
}()
return done
}
19 changes: 10 additions & 9 deletions e2e/echo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,11 @@ func setup(ctx context.Context, t testing.TB, p *params) ([]ScriptEntry, *agenta
cwd, err := os.Getwd()
require.NoError(t, err, "Failed to get current working directory")
binaryPath = filepath.Join(cwd, "..", "out", "agentapi")
_, err = os.Stat(binaryPath)
if err != nil {
Comment on lines -136 to -137
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review: rebuilding binary here on every run as a stale binary caused me to miss a potential issue

t.Logf("Building binary at %s", binaryPath)
buildCmd := exec.CommandContext(ctx, "go", "build", "-o", binaryPath, ".")
buildCmd.Dir = filepath.Join(cwd, "..")
t.Logf("run: %s", buildCmd.String())
require.NoError(t, buildCmd.Run(), "Failed to build binary")
}
t.Logf("Building binary at %s", binaryPath)
buildCmd := exec.CommandContext(ctx, "go", "build", "-o", binaryPath, ".")
buildCmd.Dir = filepath.Join(cwd, "..")
t.Logf("run: %s", buildCmd.String())
require.NoError(t, buildCmd.Run(), "Failed to build binary")
}

serverPort, err := getFreePort()
Expand Down Expand Up @@ -254,7 +251,11 @@ func waitAgentAPIStable(ctx context.Context, t testing.TB, apiClient *agentapisd
return nil
}
} else {
t.Logf("Got %T event", evt)
var sb strings.Builder
if err := json.NewEncoder(&sb).Encode(evt); err != nil {
t.Logf("Failed to encode event: %v", err)
}
t.Logf("Got event: %s", sb.String())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review: improved logging

}
case err := <-errs:
return fmt.Errorf("read events: %w", err)
Expand Down
66 changes: 30 additions & 36 deletions lib/httpapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Server struct {
srv *http.Server
mu sync.RWMutex
logger *slog.Logger
conversation *st.PTYConversation
conversation st.Conversation
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self-review: this is the whole point of this PR

agentio *termexec.Process
agentType mf.AgentType
emitter *EventEmitter
Expand Down Expand Up @@ -244,6 +244,14 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
return mf.FormatToolCall(config.AgentType, message)
}

emitter := NewEventEmitter(1024)

// Format initial prompt into message parts if provided
var initialPrompt []st.MessagePart
if config.InitialPrompt != "" {
initialPrompt = FormatMessage(config.AgentType, config.InitialPrompt)
}

conversation := st.NewPTY(ctx, st.PTYConversationConfig{
AgentType: config.AgentType,
AgentIO: config.Process,
Expand All @@ -253,9 +261,17 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
FormatMessage: formatMessage,
ReadyForInitialPrompt: isAgentReadyForInitialPrompt,
FormatToolCall: formatToolCall,
Logger: logger,
}, config.InitialPrompt)
emitter := NewEventEmitter(1024)
InitialPrompt: initialPrompt,
// OnSnapshot uses a callback rather than passing the emitter directly
// to keep the screentracker package decoupled from httpapi concerns.
// This preserves clean package boundaries and avoids import cycles.
OnSnapshot: func(status st.ConversationStatus, messages []st.ConversationMessage, screen string) {
emitter.UpdateStatusAndEmitChanges(status, config.AgentType)
emitter.UpdateMessagesAndEmitChanges(messages)
emitter.UpdateScreenAndEmitChanges(screen)
},
Comment on lines 265 to 272
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Self-review: Could alternatively extract an Emitter interface and pass this in.

Logger: logger,
})

// Create temporary directory for uploads
tempDir, err := os.MkdirTemp("", "agentapi-uploads-")
Expand All @@ -281,6 +297,16 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
// Register API routes
s.registerRoutes()

// Start the conversation polling loop if we have a process.
// Process is nil only when --print-openapi is used (no agent runs).
// The process is already running at this point - termexec.StartProcess()
// blocks until the PTY is created and the process is active. Agent
// readiness (waiting for the prompt) is handled asynchronously inside
// conversation.Start() via ReadyForInitialPrompt.
if config.Process != nil {
s.conversation.Start(ctx)
}

return s, nil
}

Expand Down Expand Up @@ -336,38 +362,6 @@ func sseMiddleware(ctx huma.Context, next func(huma.Context)) {
next(ctx)
}

func (s *Server) StartSnapshotLoop(ctx context.Context) {
s.conversation.Start(ctx)
go func() {
ticker := s.clock.NewTicker(snapshotInterval)
defer ticker.Stop()
for {
currentStatus := s.conversation.Status()

// Send initial prompt when agent becomes stable for the first time
if !s.conversation.InitialPromptSent && convertStatus(currentStatus) == AgentStatusStable {
if err := s.conversation.Send(FormatMessage(s.agentType, s.conversation.InitialPrompt)...); err != nil {
s.logger.Error("Failed to send initial prompt", "error", err)
} else {
s.conversation.InitialPromptSent = true
s.conversation.ReadyForInitialPrompt = false
currentStatus = st.ConversationStatusChanging
s.logger.Info("Initial prompt sent successfully")
}
}
s.emitter.UpdateStatusAndEmitChanges(currentStatus, s.agentType)
s.emitter.UpdateMessagesAndEmitChanges(s.conversation.Messages())
s.emitter.UpdateScreenAndEmitChanges(s.conversation.Text())

select {
case <-ctx.Done():
return
case <-ticker.C:
}
}
}()
}

// registerRoutes sets up all API endpoints
func (s *Server) registerRoutes() {
// GET /status endpoint
Expand Down
Loading