Compare commits

...

1 Commit

Author SHA1 Message Date
Mathias Fredriksson
ef602038df fix(cli/exp_mcp): improve status detection reliability for AI tasks 2026-02-25 14:43:06 +00:00
2 changed files with 392 additions and 315 deletions

View File

@@ -382,9 +382,6 @@ type taskReport struct {
// message only happens when interacting via the AI AgentAPI (as opposed to
// interacting with the terminal directly).
messageID *int64
// selfReported must be set if the update is directly from the AI agent
// (as opposed to the screen watcher).
selfReported bool
// state must always be set.
state codersdk.WorkspaceAppStatusState
// summary is optional.
@@ -428,30 +425,7 @@ func (r *RootCmd) mcpServer() *serpent.Command {
*lastReport.messageID >= *report.messageID {
return report, false
}
// If this is not a user message, and the status is "working" and not
// self-reported (meaning it came from the screen watcher), then it
// means one of two things:
//
// 1. The AI agent is not working; the user is interacting with the
// terminal directly.
// 2. The AI agent is working.
//
// At the moment, we have no way to tell the difference between these
// two states. In the future, if we can reliably distinguish between
// user and AI agent activity, we can change this.
//
// If this is our first update, we assume it is the AI agent working and
// accept the update.
//
// Otherwise we discard the update. This risks missing cases where the
// user manually submits a new prompt and the AI agent becomes active
// (and does not update itself), but it avoids spamming useless status
// updates as the user is typing, so the tradeoff is worth it.
if report.messageID == nil &&
report.state == codersdk.WorkspaceAppStatusStateWorking &&
!report.selfReported && lastReport.state != "" {
return report, false
}
// Keep track of the last message ID so we can tell when a message is
// new or if it has been re-emitted.
if report.messageID == nil {
@@ -651,7 +625,7 @@ func (s *mcpServer) startWatcher(ctx context.Context, inv *serpent.Invocation) {
}
}
case err := <-errCh:
if !errors.Is(err, context.Canceled) {
if err != nil && !errors.Is(err, context.Canceled) {
cliui.Warnf(inv.Stderr, "Received error from screen event watcher: %s", err)
}
break loop
@@ -706,10 +680,9 @@ func (s *mcpServer) startServer(ctx context.Context, inv *serpent.Invocation, in
state = codersdk.WorkspaceAppStatusStateWorking
}
return s.queue.Push(taskReport{
link: args.Link,
selfReported: true,
state: state,
summary: args.Summary,
link: args.Link,
state: state,
summary: args.Summary,
})
}),
}

View File

@@ -4,12 +4,14 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"runtime"
"slices"
"sync"
"testing"
"github.com/google/go-cmp/cmp"
@@ -723,6 +725,248 @@ func TestExpMcpServerOptionalUserToken(t *testing.T) {
<-cmdDone
}
// makeStatusEvent wraps an AgentAPI status change in a server-sent
// event, mirroring the payload shape the real AgentAPI emits.
func makeStatusEvent(status agentapi.AgentStatus) *codersdk.ServerSentEvent {
	payload := agentapi.EventStatusChange{Status: status}
	return &codersdk.ServerSentEvent{
		Type: ServerSentEventTypeStatusChange,
		Data: payload,
	}
}
// makeMessageEvent wraps a conversation message (by ID and role) in a
// server-sent event, mirroring the real AgentAPI's message updates.
func makeMessageEvent(id int64, role agentapi.ConversationRole) *codersdk.ServerSentEvent {
	payload := agentapi.EventMessageUpdate{
		Id:   id,
		Role: role,
	}
	return &codersdk.ServerSentEvent{
		Type: ServerSentEventTypeMessageUpdate,
		Data: payload,
	}
}
// mcpFakeAgentAPI is a reusable fake AgentAPI server. It handles SSE
// on /events, pushing events to the connected subscriber automatically
// when SetStatus or SendMessage is called (matching the real AgentAPI's
// behavior). SetStatus deduplicates consecutive identical statuses.
type mcpFakeAgentAPI struct {
server *httptest.Server
// mu guards sender, status, connected, and disconnect.
mu sync.Mutex
sender func(codersdk.ServerSentEvent) error // active SSE conn, nil if disconnected
status agentapi.AgentStatus // current status, "" until first SetStatus
connected chan struct{} // closed when SSE connection arrives
// disconnect is closed by Disconnect to tear down the active SSE
// connection; Disconnect replaces it with a fresh channel so a later
// connection can be torn down independently.
disconnect chan struct{}
// t is the test that owns this fake.
t testing.TB
}
// newMCPFakeAgentAPI starts a fake AgentAPI HTTP server serving SSE on
// /events and registers its shutdown via t.Cleanup. On each new
// subscriber it echoes an initial message event plus the current status
// (if any), mirroring the real AgentAPI which sends full state to every
// new subscriber before streaming, then blocks until the client goes
// away or Disconnect is called.
func newMCPFakeAgentAPI(t testing.TB) *mcpFakeAgentAPI {
	t.Helper()
	f := &mcpFakeAgentAPI{
		connected:  make(chan struct{}),
		disconnect: make(chan struct{}),
		t:          t,
	}
	f.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/events" {
			http.NotFound(w, r)
			return
		}
		// Cancelable context so Disconnect can stop the SSE sender
		// without waiting for the client to drop the connection.
		sseCtx, sseCancel := context.WithCancel(r.Context())
		defer sseCancel()
		r = r.WithContext(sseCtx)
		send, closed, err := httpapi.ServerSentEventSender(w, r)
		if err != nil {
			httpapi.Write(sseCtx, w, http.StatusInternalServerError, codersdk.Response{
				Message: "Internal error setting up server-sent events.",
				Detail:  err.Error(),
			})
			return
		}
		// Echo initial state on connect, matching the real AgentAPI
		// which sends full state (messages + status) to every new
		// subscriber before streaming.
		_ = send(*makeMessageEvent(0, agentapi.RoleAgent))
		f.mu.Lock()
		f.sender = send
		if f.status != "" {
			_ = send(*makeStatusEvent(f.status))
		}
		disc := f.disconnect
		// Signal the connection while still holding the lock and guard
		// against a double close: if a reconnect races ahead of
		// WaitForConnection re-arming f.connected, closing the same
		// channel twice would panic the handler goroutine.
		select {
		case <-f.connected:
			// Already signaled by a previous connection and not yet
			// re-armed; nothing to do.
		default:
			close(f.connected)
		}
		f.mu.Unlock()
		select {
		case <-closed:
		case <-disc:
			sseCancel()
			<-closed
		}
		f.mu.Lock()
		f.sender = nil
		f.mu.Unlock()
	}))
	t.Cleanup(f.Close)
	return f
}
// Close shuts the fake server down, first force-closing any live
// client connections. Skipping CloseClientConnections would leave
// server.Close() blocked forever on SSE handlers parked in their
// event select loop, since they only return once r.Context() is
// canceled.
func (f *mcpFakeAgentAPI) Close() {
	f.server.CloseClientConnections()
	f.server.Close()
}
// SetStatus records the new status and forwards it to the live SSE
// subscriber, if any. Consecutive duplicate statuses are dropped.
func (f *mcpFakeAgentAPI) SetStatus(status agentapi.AgentStatus) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if status == f.status {
		// Deduplicate: nothing changed, nothing to push.
		return
	}
	f.status = status
	if send := f.sender; send != nil {
		_ = send(*makeStatusEvent(status))
	}
}
// SendMessage forwards a message event to the live SSE subscriber,
// if one is currently connected; otherwise it is a no-op.
func (f *mcpFakeAgentAPI) SendMessage(id int64, role agentapi.ConversationRole) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if send := f.sender; send != nil {
		_ = send(*makeMessageEvent(id, role))
	}
}
// URL returns the base URL of the fake AgentAPI server.
func (f *mcpFakeAgentAPI) URL() string {
return f.server.URL
}
// WaitForConnection blocks until an SSE subscriber connects (or ctx
// expires, failing the test), then re-arms the signal channel so the
// next connection can be awaited too.
func (f *mcpFakeAgentAPI) WaitForConnection(ctx context.Context, t testing.TB) {
	t.Helper()
	select {
	case <-ctx.Done():
		t.Fatal("timed out waiting for SSE connection")
	case <-f.connected:
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	f.connected = make(chan struct{})
}
// Disconnect tears down the active SSE connection. The stored status
// is preserved so it can be echoed on the next connect, matching the
// real AgentAPI where the agent's state persists across subscriber
// reconnections.
func (f *mcpFakeAgentAPI) Disconnect() {
	f.mu.Lock()
	prev := f.disconnect
	// Swap in a fresh channel before signaling so the next connection
	// gets its own teardown trigger.
	f.disconnect = make(chan struct{})
	f.mu.Unlock()
	close(prev)
}
// mcpTestFixture encapsulates the repeated deployment, workspace,
// watcher, and MCP server setup used by reporter tests.
type mcpTestFixture struct {
t *testing.T
ctx context.Context
// pty carries the JSON-RPC stdio conversation with the MCP server.
pty *ptytest.PTY
// lastAppStatus is the most recent status returned by NextUpdate,
// used to detect when the workspace reports a status with a new ID.
lastAppStatus codersdk.WorkspaceAppStatus
// watcher streams workspace updates from client.WatchWorkspace.
watcher <-chan codersdk.Workspace
}
// newMCPTestFixture creates a deployment with a single workspace whose
// agent has an app with slug "vscode", starts the `exp mcp server`
// command wired to a PTY, performs the JSON-RPC initialize handshake,
// and returns the test context plus a fixture for driving tool calls
// and observing status updates. When agentAPIURL is non-empty it is
// passed via --ai-agentapi-url so the server watches that (fake)
// AgentAPI.
func newMCPTestFixture(t *testing.T, agentAPIURL string) (context.Context, *mcpTestFixture) {
t.Helper()
// Create a test deployment and a workspace owned by a second user.
client, db := coderdtest.NewWithDatabase(t, nil)
user := coderdtest.CreateFirstUser(t, client)
client, user2 := coderdtest.CreateAnotherUser(t, client, user.OrganizationID)
r := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
OrganizationID: user.OrganizationID,
OwnerID: user2.ID,
}).WithAgent(func(a []*proto.Agent) []*proto.Agent {
a[0].Apps = []*proto.App{{Slug: "vscode"}}
return a
}).Do()
ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitLong))
t.Cleanup(cancel)
// Watch the workspace for app status changes.
watcher, err := client.WatchWorkspace(ctx, r.Workspace.ID)
require.NoError(t, err)
args := []string{
"exp", "mcp", "server",
"--agent-url", client.URL.String(),
"--agent-token", r.AgentToken,
"--app-status-slug", "vscode",
"--allowed-tools=coder_report_task",
}
if agentAPIURL != "" {
args = append(args, "--ai-agentapi-url", agentAPIURL)
}
inv, _ := clitest.New(t, args...)
inv = inv.WithContext(ctx)
// The MCP server speaks JSON-RPC over stdio; drive it via a PTY.
pty := ptytest.New(t)
inv.Stdin = pty.Input()
inv.Stdout = pty.Output()
inv.Stderr = io.Discard
clitest.Start(t, inv)
// JSON-RPC initialize handshake.
pty.WriteLine(`{"jsonrpc":"2.0","id":1,"method":"initialize"}`)
_ = pty.ReadLine(ctx) // echo
_ = pty.ReadLine(ctx) // init response
return ctx, &mcpTestFixture{
t: t,
ctx: ctx,
pty: pty,
watcher: watcher,
}
}
// NextUpdate blocks until the workspace watcher delivers an app status
// with a new ID and returns it, failing the test if the fixture
// context expires first.
func (f *mcpTestFixture) NextUpdate() codersdk.WorkspaceAppStatus {
	f.t.Helper()
	for {
		select {
		case <-f.ctx.Done():
			require.FailNow(f.t, "timed out waiting for status update")
		case w, ok := <-f.watcher:
			require.True(f.t, ok, "watch channel closed")
			status := w.LatestAppStatus
			// Workspace updates arrive for many reasons; only a
			// status with a fresh ID counts as a new report.
			if status == nil || status.ID == f.lastAppStatus.ID {
				continue
			}
			f.t.Logf("Got status update: %s > %s", f.lastAppStatus.State, status.State)
			f.lastAppStatus = *status
			return f.lastAppStatus
		}
	}
}
// SendToolCall issues a coder_report_task tool call over the MCP
// stdio transport and asserts that a non-empty, valid JSON response
// comes back.
func (f *mcpTestFixture) SendToolCall(state, summary, link string) {
	f.t.Helper()
	payload := fmt.Sprintf(`{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"coder_report_task","arguments":{"state":%q,"summary":%q,"link":%q}}}`, state, summary, link)
	f.pty.WriteLine(payload)
	// First line read back is the PTY echo of what we just wrote.
	_ = f.pty.ReadLine(f.ctx)
	resp := f.pty.ReadLine(f.ctx)
	require.NotEmpty(f.t, resp, "did not receive a response from coder_report_task")
	require.True(f.t, json.Valid([]byte(resp)), "did not receive valid JSON from coder_report_task")
}
func TestExpMcpReporter(t *testing.T) {
t.Parallel()
@@ -763,34 +1007,21 @@ func TestExpMcpReporter(t *testing.T) {
<-cmdDone
})
makeStatusEvent := func(status agentapi.AgentStatus) *codersdk.ServerSentEvent {
return &codersdk.ServerSentEvent{
Type: ServerSentEventTypeStatusChange,
Data: agentapi.EventStatusChange{
Status: status,
},
}
type testMessage struct {
id int64
role agentapi.ConversationRole
}
makeMessageEvent := func(id int64, role agentapi.ConversationRole) *codersdk.ServerSentEvent {
return &codersdk.ServerSentEvent{
Type: ServerSentEventTypeMessageUpdate,
Data: agentapi.EventMessageUpdate{
Id: id,
Role: role,
},
}
}
type test struct {
// event simulates an event from the screen watcher.
event *codersdk.ServerSentEvent
// state, summary, and uri simulate a tool call from the AI agent.
// Screen watcher simulation (via fake AgentAPI):
setStatus *agentapi.AgentStatus
message *testMessage
// Agent self-report (via tool call):
state codersdk.WorkspaceAppStatusState
summary string
uri string
expected *codersdk.WorkspaceAppStatus
}
ptr := func(s agentapi.AgentStatus) *agentapi.AgentStatus { return &s }
runs := []struct {
name string
@@ -816,7 +1047,7 @@ func TestExpMcpReporter(t *testing.T) {
// Terminal goes quiet but the AI agent forgot the update, and it is
// caught by the screen watcher. Message and URI are preserved.
{
event: makeStatusEvent(agentapi.StatusStable),
setStatus: ptr(agentapi.StatusStable),
expected: &codersdk.WorkspaceAppStatus{
State: codersdk.WorkspaceAppStatusStateIdle,
Message: "doing work",
@@ -826,32 +1057,39 @@ func TestExpMcpReporter(t *testing.T) {
// A stable update now from the watcher should be discarded, as it is a
// duplicate.
{
event: makeStatusEvent(agentapi.StatusStable),
setStatus: ptr(agentapi.StatusStable),
},
// Terminal becomes active again according to the screen watcher, but no
// new user message. This could be the AI agent being active again, but
// it could also be the user messing around. We will prefer not updating
// the status so the "working" update here should be skipped.
//
// TODO: How do we test the no-op updates? This update is skipped
// because of the logic mentioned above, but how do we prove this update
// was skipped because of that and not that the next update was skipped
// because it is a duplicate state? We could mock the queue?
// Terminal becomes active again. With Branch 3 removed, this
// working transition is now accepted.
{
event: makeStatusEvent(agentapi.StatusRunning),
setStatus: ptr(agentapi.StatusRunning),
expected: &codersdk.WorkspaceAppStatus{
State: codersdk.WorkspaceAppStatusStateWorking,
Message: "doing work",
URI: "https://dev.coder.com",
},
},
// Agent messages are ignored.
{
event: makeMessageEvent(0, agentapi.RoleAgent),
message: &testMessage{id: 0, role: agentapi.RoleAgent},
},
// The watcher reports the screen is active again...
// Duplicate working from watcher should be discarded.
{
event: makeStatusEvent(agentapi.StatusRunning),
setStatus: ptr(agentapi.StatusRunning),
},
// ... but this time we have a new user message so we know there is AI
// agent activity. This time the "working" update will not be skipped.
// Watcher reports stable.
{
event: makeMessageEvent(1, agentapi.RoleUser),
setStatus: ptr(agentapi.StatusStable),
expected: &codersdk.WorkspaceAppStatus{
State: codersdk.WorkspaceAppStatusStateIdle,
Message: "doing work",
URI: "https://dev.coder.com",
},
},
// A new user message triggers a working update because of
// the new message ID.
{
message: &testMessage{id: 1, role: agentapi.RoleUser},
expected: &codersdk.WorkspaceAppStatus{
State: codersdk.WorkspaceAppStatusStateWorking,
Message: "doing work",
@@ -860,7 +1098,7 @@ func TestExpMcpReporter(t *testing.T) {
},
// Watcher reports stable again.
{
event: makeStatusEvent(agentapi.StatusStable),
setStatus: ptr(agentapi.StatusStable),
expected: &codersdk.WorkspaceAppStatus{
State: codersdk.WorkspaceAppStatusStateIdle,
Message: "doing work",
@@ -876,7 +1114,7 @@ func TestExpMcpReporter(t *testing.T) {
// The "working" status from the watcher should be accepted, even though
// there is no new user message, because it is the first update.
{
event: makeStatusEvent(agentapi.StatusRunning),
setStatus: ptr(agentapi.StatusRunning),
expected: &codersdk.WorkspaceAppStatus{
State: codersdk.WorkspaceAppStatusStateWorking,
Message: "",
@@ -885,7 +1123,7 @@ func TestExpMcpReporter(t *testing.T) {
},
// Stable update should be accepted.
{
event: makeStatusEvent(agentapi.StatusStable),
setStatus: ptr(agentapi.StatusStable),
expected: &codersdk.WorkspaceAppStatus{
State: codersdk.WorkspaceAppStatusStateIdle,
Message: "",
@@ -894,7 +1132,7 @@ func TestExpMcpReporter(t *testing.T) {
},
// Zero ID should be accepted.
{
event: makeMessageEvent(0, agentapi.RoleUser),
message: &testMessage{id: 0, role: agentapi.RoleUser},
expected: &codersdk.WorkspaceAppStatus{
State: codersdk.WorkspaceAppStatusStateWorking,
Message: "",
@@ -903,7 +1141,7 @@ func TestExpMcpReporter(t *testing.T) {
},
// Stable again.
{
event: makeStatusEvent(agentapi.StatusStable),
setStatus: ptr(agentapi.StatusStable),
expected: &codersdk.WorkspaceAppStatus{
State: codersdk.WorkspaceAppStatusStateIdle,
Message: "",
@@ -912,7 +1150,7 @@ func TestExpMcpReporter(t *testing.T) {
},
// Next ID.
{
event: makeMessageEvent(1, agentapi.RoleUser),
message: &testMessage{id: 1, role: agentapi.RoleUser},
expected: &codersdk.WorkspaceAppStatus{
State: codersdk.WorkspaceAppStatusStateWorking,
Message: "",
@@ -947,7 +1185,7 @@ func TestExpMcpReporter(t *testing.T) {
},
// Once the watcher reports stable, then we record idle.
{
event: makeStatusEvent(agentapi.StatusStable),
setStatus: ptr(agentapi.StatusStable),
expected: &codersdk.WorkspaceAppStatus{
State: codersdk.WorkspaceAppStatusStateIdle,
Message: "finished",
@@ -964,7 +1202,7 @@ func TestExpMcpReporter(t *testing.T) {
},
// After failure, watcher reports stable -> idle.
{
event: makeStatusEvent(agentapi.StatusStable),
setStatus: ptr(agentapi.StatusStable),
expected: &codersdk.WorkspaceAppStatus{
State: codersdk.WorkspaceAppStatusStateIdle,
Message: "something broke",
@@ -1025,273 +1263,139 @@ func TestExpMcpReporter(t *testing.T) {
t.Run(run.name, func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitShort))
// Create a test deployment and workspace.
client, db := coderdtest.NewWithDatabase(t, nil)
user := coderdtest.CreateFirstUser(t, client)
client, user2 := coderdtest.CreateAnotherUser(t, client, user.OrganizationID)
r := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
OrganizationID: user.OrganizationID,
OwnerID: user2.ID,
}).WithAgent(func(a []*proto.Agent) []*proto.Agent {
a[0].Apps = []*proto.App{
{
Slug: "vscode",
},
}
return a
}).Do()
// Watch the workspace for changes.
watcher, err := client.WatchWorkspace(ctx, r.Workspace.ID)
require.NoError(t, err)
var lastAppStatus codersdk.WorkspaceAppStatus
nextUpdate := func() codersdk.WorkspaceAppStatus {
for {
select {
case <-ctx.Done():
require.FailNow(t, "timed out waiting for status update")
case w, ok := <-watcher:
require.True(t, ok, "watch channel closed")
if w.LatestAppStatus != nil && w.LatestAppStatus.ID != lastAppStatus.ID {
t.Logf("Got status update: %s > %s", lastAppStatus.State, w.LatestAppStatus.State)
lastAppStatus = *w.LatestAppStatus
return lastAppStatus
}
}
}
}
args := []string{
"exp", "mcp", "server",
// We need the agent credentials, AI AgentAPI url (if not
// disabled), and a slug for reporting.
"--agent-url", client.URL.String(),
"--agent-token", r.AgentToken,
"--app-status-slug", "vscode",
"--allowed-tools=coder_report_task",
}
// Mock the AI AgentAPI server.
listening := make(chan func(sse codersdk.ServerSentEvent) error)
var fake *mcpFakeAgentAPI
agentAPIURL := ""
if !run.disableAgentAPI {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
send, closed, err := httpapi.ServerSentEventSender(w, r)
if err != nil {
httpapi.Write(ctx, w, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error setting up server-sent events.",
Detail: err.Error(),
})
return
}
// Send initial message.
send(*makeMessageEvent(0, agentapi.RoleAgent))
listening <- send
<-closed
}))
t.Cleanup(srv.Close)
aiAgentAPIURL := srv.URL
args = append(args, "--ai-agentapi-url", aiAgentAPIURL)
fake = newMCPFakeAgentAPI(t)
agentAPIURL = fake.URL()
}
ctx, fixture := newMCPTestFixture(t, agentAPIURL)
if fake != nil {
fake.WaitForConnection(ctx, t)
}
inv, _ := clitest.New(t, args...)
inv = inv.WithContext(ctx)
pty := ptytest.New(t)
inv.Stdin = pty.Input()
inv.Stdout = pty.Output()
stderr := ptytest.New(t)
inv.Stderr = stderr.Output()
// Run the MCP server.
cmdDone := make(chan struct{})
go func() {
defer close(cmdDone)
err := inv.Run()
assert.NoError(t, err)
}()
// Initialize.
payload := `{"jsonrpc":"2.0","id":1,"method":"initialize"}`
pty.WriteLine(payload)
_ = pty.ReadLine(ctx) // ignore echo
_ = pty.ReadLine(ctx) // ignore init response
var sender func(sse codersdk.ServerSentEvent) error
if !run.disableAgentAPI {
sender = <-listening
}
for _, test := range run.tests {
if test.event != nil {
err := sender(*test.event)
require.NoError(t, err)
for _, tc := range run.tests {
if tc.setStatus != nil {
fake.SetStatus(*tc.setStatus)
} else if tc.message != nil {
fake.SendMessage(tc.message.id, tc.message.role)
} else {
// Call the tool and ensure it works.
payload := fmt.Sprintf(`{"jsonrpc":"2.0","id":3,"method":"tools/call", "params": {"name": "coder_report_task", "arguments": {"state": %q, "summary": %q, "link": %q}}}`, test.state, test.summary, test.uri)
pty.WriteLine(payload)
_ = pty.ReadLine(ctx) // ignore echo
output := pty.ReadLine(ctx)
require.NotEmpty(t, output, "did not receive a response from coder_report_task")
// Ensure it is valid JSON.
_, err = json.Marshal(output)
require.NoError(t, err, "did not receive valid JSON from coder_report_task")
fixture.SendToolCall(string(tc.state), tc.summary, tc.uri)
}
if test.expected != nil {
got := nextUpdate()
require.Equal(t, got.State, test.expected.State)
require.Equal(t, got.Message, test.expected.Message)
require.Equal(t, got.URI, test.expected.URI)
if tc.expected != nil {
got := fixture.NextUpdate()
require.Equal(t, got.State, tc.expected.State)
require.Equal(t, got.Message, tc.expected.Message)
require.Equal(t, got.URI, tc.expected.URI)
}
}
cancel()
<-cmdDone
})
}
t.Run("Reconnect", func(t *testing.T) {
t.Parallel()
// Create a test deployment and workspace.
client, db := coderdtest.NewWithDatabase(t, nil)
user := coderdtest.CreateFirstUser(t, client)
client, user2 := coderdtest.CreateAnotherUser(t, client, user.OrganizationID)
fake := newMCPFakeAgentAPI(t)
ctx, fixture := newMCPTestFixture(t, fake.URL())
r := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
OrganizationID: user.OrganizationID,
OwnerID: user2.ID,
}).WithAgent(func(a []*proto.Agent) []*proto.Agent {
a[0].Apps = []*proto.App{
{
Slug: "vscode",
},
}
return a
}).Do()
ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitLong))
// Watch the workspace for changes.
watcher, err := client.WatchWorkspace(ctx, r.Workspace.ID)
require.NoError(t, err)
var lastAppStatus codersdk.WorkspaceAppStatus
nextUpdate := func() codersdk.WorkspaceAppStatus {
for {
select {
case <-ctx.Done():
require.FailNow(t, "timed out waiting for status update")
case w, ok := <-watcher:
require.True(t, ok, "watch channel closed")
if w.LatestAppStatus != nil && w.LatestAppStatus.ID != lastAppStatus.ID {
t.Logf("Got status update: %s > %s", lastAppStatus.State, w.LatestAppStatus.State)
lastAppStatus = *w.LatestAppStatus
return lastAppStatus
}
}
}
}
// Mock AI AgentAPI server that supports disconnect/reconnect.
disconnect := make(chan struct{})
listening := make(chan func(sse codersdk.ServerSentEvent) error)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Create a cancelable context so we can stop the SSE sender
// goroutine on disconnect without waiting for the HTTP
// serve loop to cancel r.Context().
sseCtx, sseCancel := context.WithCancel(r.Context())
defer sseCancel()
r = r.WithContext(sseCtx)
send, closed, err := httpapi.ServerSentEventSender(w, r)
if err != nil {
httpapi.Write(sseCtx, w, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error setting up server-sent events.",
Detail: err.Error(),
})
return
}
// Send initial message so the watcher knows the agent is active.
send(*makeMessageEvent(0, agentapi.RoleAgent))
select {
case listening <- send:
case <-r.Context().Done():
return
}
select {
case <-closed:
case <-disconnect:
sseCancel()
<-closed
}
}))
t.Cleanup(srv.Close)
inv, _ := clitest.New(t,
"exp", "mcp", "server",
"--agent-url", client.URL.String(),
"--agent-token", r.AgentToken,
"--app-status-slug", "vscode",
"--allowed-tools=coder_report_task",
"--ai-agentapi-url", srv.URL,
)
inv = inv.WithContext(ctx)
pty := ptytest.New(t)
inv.Stdin = pty.Input()
inv.Stdout = pty.Output()
stderr := ptytest.New(t)
inv.Stderr = stderr.Output()
// Run the MCP server.
clitest.Start(t, inv)
// Initialize.
payload := `{"jsonrpc":"2.0","id":1,"method":"initialize"}`
pty.WriteLine(payload)
_ = pty.ReadLine(ctx) // ignore echo
_ = pty.ReadLine(ctx) // ignore init response
// Get first sender from the initial SSE connection.
sender := testutil.RequireReceive(ctx, t, listening)
fake.WaitForConnection(ctx, t)
// Self-report a working status via tool call.
toolPayload := `{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"coder_report_task","arguments":{"state":"working","summary":"doing work","link":""}}}`
pty.WriteLine(toolPayload)
_ = pty.ReadLine(ctx) // ignore echo
_ = pty.ReadLine(ctx) // ignore response
got := nextUpdate()
fixture.SendToolCall("working", "doing work", "")
got := fixture.NextUpdate()
require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, got.State)
require.Equal(t, "doing work", got.Message)
// Watcher sends stable, verify idle is reported.
err = sender(*makeStatusEvent(agentapi.StatusStable))
require.NoError(t, err)
got = nextUpdate()
fake.SetStatus(agentapi.StatusStable)
got = fixture.NextUpdate()
require.Equal(t, codersdk.WorkspaceAppStatusStateIdle, got.State)
// Disconnect the SSE connection by signaling the handler to return.
testutil.RequireSend(ctx, t, disconnect, struct{}{})
fake.Disconnect()
// Wait for the watcher to reconnect and get the new sender.
sender = testutil.RequireReceive(ctx, t, listening)
// Wait for the watcher to reconnect.
fake.WaitForConnection(ctx, t)
// After reconnect, self-report a working status again.
toolPayload = `{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"coder_report_task","arguments":{"state":"working","summary":"reconnected","link":""}}}`
pty.WriteLine(toolPayload)
_ = pty.ReadLine(ctx) // ignore echo
_ = pty.ReadLine(ctx) // ignore response
got = nextUpdate()
fixture.SendToolCall("working", "reconnected", "")
got = fixture.NextUpdate()
require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, got.State)
require.Equal(t, "reconnected", got.Message)
// Verify the watcher still processes events after reconnect.
err = sender(*makeStatusEvent(agentapi.StatusStable))
require.NoError(t, err)
got = nextUpdate()
fake.SetStatus(agentapi.StatusStable)
got = fixture.NextUpdate()
require.Equal(t, codersdk.WorkspaceAppStatusStateIdle, got.State)
})
// ReconnectStatusRecovery verifies that after an SSE reconnection,
// the initial status echo catches up on state changes that occurred
// while disconnected (matching real AgentAPI behavior).
t.Run("ReconnectStatusRecovery", func(t *testing.T) {
t.Parallel()
fake := newMCPFakeAgentAPI(t)
ctx, fixture := newMCPTestFixture(t, fake.URL())
fake.WaitForConnection(ctx, t)
// Self-report working.
fixture.SendToolCall("working", "doing work", "")
got := fixture.NextUpdate()
require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, got.State)
// Watcher sends stable -> idle.
fake.SetStatus(agentapi.StatusStable)
got = fixture.NextUpdate()
require.Equal(t, codersdk.WorkspaceAppStatusStateIdle, got.State)
cancel()
// Disconnect SSE.
fake.Disconnect()
// While disconnected, the agent started working again. Set the
// fake's status so the next SSE connection echoes it.
fake.SetStatus(agentapi.StatusRunning)
// On reconnect, the fake echoes the current status (running)
// as an initial SSE event, just like the real AgentAPI.
fake.WaitForConnection(ctx, t)
got = fixture.NextUpdate()
require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, got.State)
})
// WorkingTransition verifies that watcher running transitions are
// accepted after a previous idle state (Branch 3 removal).
t.Run("WorkingTransition", func(t *testing.T) {
t.Parallel()
fake := newMCPFakeAgentAPI(t)
ctx, fixture := newMCPTestFixture(t, fake.URL())
fake.WaitForConnection(ctx, t)
// Self-report working.
fixture.SendToolCall("working", "doing work", "")
got := fixture.NextUpdate()
require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, got.State)
require.Equal(t, "doing work", got.Message)
// Watcher sends stable -> idle, summary inherited.
fake.SetStatus(agentapi.StatusStable)
got = fixture.NextUpdate()
require.Equal(t, codersdk.WorkspaceAppStatusStateIdle, got.State)
require.Equal(t, "doing work", got.Message)
// Watcher sends running -> working. Previously blocked by Branch 3.
fake.SetStatus(agentapi.StatusRunning)
got = fixture.NextUpdate()
require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, got.State)
require.Equal(t, "doing work", got.Message)
// Watcher sends stable -> idle again.
fake.SetStatus(agentapi.StatusStable)
got = fixture.NextUpdate()
require.Equal(t, codersdk.WorkspaceAppStatusStateIdle, got.State)
})
}