-
Notifications
You must be signed in to change notification settings - Fork 105
refactor: move snapshot loop and initial prompt logic into PTYConversation #179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d51e4c2
241671a
c80b288
35fed24
2c907a5
ccec233
d16c552
1fa447e
24ff106
9a197fc
3dd8c56
f5bd08d
86c9d91
b5437f7
593b65f
6079777
002fd67
a6b54a4
3259c1f
54b8ef4
b157ff7
7d488f3
f725683
0b9e687
18e2773
1ed2370
777a103
75d89c3
c17e6d7
c073f04
429f35d
85d3965
a5edc13
38f119d
9a1b238
8ca7856
a134116
131e4a5
8debc4e
87d36ab
0e37003
5df1ec3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -133,14 +133,11 @@ func setup(ctx context.Context, t testing.TB, p *params) ([]ScriptEntry, *agenta | |
| cwd, err := os.Getwd() | ||
| require.NoError(t, err, "Failed to get current working directory") | ||
| binaryPath = filepath.Join(cwd, "..", "out", "agentapi") | ||
| _, err = os.Stat(binaryPath) | ||
| if err != nil { | ||
|
Comment on lines
-136
to
-137
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. review: rebuilding binary here on every run as a stale binary caused me to miss a potential issue |
||
| t.Logf("Building binary at %s", binaryPath) | ||
| buildCmd := exec.CommandContext(ctx, "go", "build", "-o", binaryPath, ".") | ||
| buildCmd.Dir = filepath.Join(cwd, "..") | ||
| t.Logf("run: %s", buildCmd.String()) | ||
| require.NoError(t, buildCmd.Run(), "Failed to build binary") | ||
| } | ||
| t.Logf("Building binary at %s", binaryPath) | ||
| buildCmd := exec.CommandContext(ctx, "go", "build", "-o", binaryPath, ".") | ||
| buildCmd.Dir = filepath.Join(cwd, "..") | ||
| t.Logf("run: %s", buildCmd.String()) | ||
| require.NoError(t, buildCmd.Run(), "Failed to build binary") | ||
| } | ||
|
|
||
| serverPort, err := getFreePort() | ||
|
|
@@ -254,7 +251,11 @@ func waitAgentAPIStable(ctx context.Context, t testing.TB, apiClient *agentapisd | |
| return nil | ||
| } | ||
| } else { | ||
| t.Logf("Got %T event", evt) | ||
| var sb strings.Builder | ||
| if err := json.NewEncoder(&sb).Encode(evt); err != nil { | ||
| t.Logf("Failed to encode event: %v", err) | ||
| } | ||
| t.Logf("Got event: %s", sb.String()) | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. review: improved logging |
||
| } | ||
| case err := <-errs: | ||
| return fmt.Errorf("read events: %w", err) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,7 +41,7 @@ type Server struct { | |
| srv *http.Server | ||
| mu sync.RWMutex | ||
| logger *slog.Logger | ||
| conversation *st.PTYConversation | ||
| conversation st.Conversation | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. self-review: this is the whole point of this PR |
||
| agentio *termexec.Process | ||
| agentType mf.AgentType | ||
| emitter *EventEmitter | ||
|
|
@@ -244,6 +244,14 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) { | |
| return mf.FormatToolCall(config.AgentType, message) | ||
| } | ||
|
|
||
| emitter := NewEventEmitter(1024) | ||
|
|
||
| // Format initial prompt into message parts if provided | ||
| var initialPrompt []st.MessagePart | ||
| if config.InitialPrompt != "" { | ||
| initialPrompt = FormatMessage(config.AgentType, config.InitialPrompt) | ||
| } | ||
|
|
||
| conversation := st.NewPTY(ctx, st.PTYConversationConfig{ | ||
| AgentType: config.AgentType, | ||
| AgentIO: config.Process, | ||
|
|
@@ -253,9 +261,17 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) { | |
| FormatMessage: formatMessage, | ||
| ReadyForInitialPrompt: isAgentReadyForInitialPrompt, | ||
| FormatToolCall: formatToolCall, | ||
| Logger: logger, | ||
| }, config.InitialPrompt) | ||
| emitter := NewEventEmitter(1024) | ||
| InitialPrompt: initialPrompt, | ||
| // OnSnapshot uses a callback rather than passing the emitter directly | ||
| // to keep the screentracker package decoupled from httpapi concerns. | ||
| // This preserves clean package boundaries and avoids import cycles. | ||
| OnSnapshot: func(status st.ConversationStatus, messages []st.ConversationMessage, screen string) { | ||
| emitter.UpdateStatusAndEmitChanges(status, config.AgentType) | ||
| emitter.UpdateMessagesAndEmitChanges(messages) | ||
| emitter.UpdateScreenAndEmitChanges(screen) | ||
| }, | ||
|
Comment on lines
265
to
272
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Self-review: Could alternatively extract an Emitter interface and pass this in. |
||
| Logger: logger, | ||
| }) | ||
|
|
||
| // Create temporary directory for uploads | ||
| tempDir, err := os.MkdirTemp("", "agentapi-uploads-") | ||
|
|
@@ -281,6 +297,16 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) { | |
| // Register API routes | ||
| s.registerRoutes() | ||
|
|
||
| // Start the conversation polling loop if we have a process. | ||
| // Process is nil only when --print-openapi is used (no agent runs). | ||
| // The process is already running at this point - termexec.StartProcess() | ||
| // blocks until the PTY is created and the process is active. Agent | ||
| // readiness (waiting for the prompt) is handled asynchronously inside | ||
| // conversation.Start() via ReadyForInitialPrompt. | ||
| if config.Process != nil { | ||
| s.conversation.Start(ctx) | ||
| } | ||
|
|
||
| return s, nil | ||
| } | ||
|
|
||
|
|
@@ -336,38 +362,6 @@ func sseMiddleware(ctx huma.Context, next func(huma.Context)) { | |
| next(ctx) | ||
| } | ||
|
|
||
| func (s *Server) StartSnapshotLoop(ctx context.Context) { | ||
| s.conversation.Start(ctx) | ||
| go func() { | ||
| ticker := s.clock.NewTicker(snapshotInterval) | ||
| defer ticker.Stop() | ||
| for { | ||
| currentStatus := s.conversation.Status() | ||
|
|
||
| // Send initial prompt when agent becomes stable for the first time | ||
| if !s.conversation.InitialPromptSent && convertStatus(currentStatus) == AgentStatusStable { | ||
| if err := s.conversation.Send(FormatMessage(s.agentType, s.conversation.InitialPrompt)...); err != nil { | ||
| s.logger.Error("Failed to send initial prompt", "error", err) | ||
| } else { | ||
| s.conversation.InitialPromptSent = true | ||
| s.conversation.ReadyForInitialPrompt = false | ||
| currentStatus = st.ConversationStatusChanging | ||
| s.logger.Info("Initial prompt sent successfully") | ||
| } | ||
| } | ||
| s.emitter.UpdateStatusAndEmitChanges(currentStatus, s.agentType) | ||
| s.emitter.UpdateMessagesAndEmitChanges(s.conversation.Messages()) | ||
| s.emitter.UpdateScreenAndEmitChanges(s.conversation.Text()) | ||
|
|
||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case <-ticker.C: | ||
| } | ||
| } | ||
| }() | ||
| } | ||
|
|
||
| // registerRoutes sets up all API endpoints | ||
| func (s *Server) registerRoutes() { | ||
| // GET /status endpoint | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
review: turns out there was a race condition in our e2e echo agent 😂