|
|
|
|
@@ -4,12 +4,14 @@ import (
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"fmt"
|
|
|
|
|
"io"
|
|
|
|
|
"net/http"
|
|
|
|
|
"net/http/httptest"
|
|
|
|
|
"os"
|
|
|
|
|
"path/filepath"
|
|
|
|
|
"runtime"
|
|
|
|
|
"slices"
|
|
|
|
|
"sync"
|
|
|
|
|
"testing"
|
|
|
|
|
|
|
|
|
|
"github.com/google/go-cmp/cmp"
|
|
|
|
|
@@ -723,6 +725,248 @@ func TestExpMcpServerOptionalUserToken(t *testing.T) {
|
|
|
|
|
<-cmdDone
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func makeStatusEvent(status agentapi.AgentStatus) *codersdk.ServerSentEvent {
|
|
|
|
|
return &codersdk.ServerSentEvent{
|
|
|
|
|
Type: ServerSentEventTypeStatusChange,
|
|
|
|
|
Data: agentapi.EventStatusChange{
|
|
|
|
|
Status: status,
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func makeMessageEvent(id int64, role agentapi.ConversationRole) *codersdk.ServerSentEvent {
|
|
|
|
|
return &codersdk.ServerSentEvent{
|
|
|
|
|
Type: ServerSentEventTypeMessageUpdate,
|
|
|
|
|
Data: agentapi.EventMessageUpdate{
|
|
|
|
|
Id: id,
|
|
|
|
|
Role: role,
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// mcpFakeAgentAPI is a reusable fake AgentAPI server. It handles SSE
// on /events, pushing events to the connected subscriber automatically
// when SetStatus or SendMessage is called (matching the real AgentAPI's
// behavior). SetStatus deduplicates consecutive identical statuses.
//
// All mutable fields below are guarded by mu. The connected and
// disconnect channels are replaced with fresh ones after they fire,
// so each connect/disconnect cycle gets its own signal.
type mcpFakeAgentAPI struct {
	server *httptest.Server

	mu         sync.Mutex
	sender     func(codersdk.ServerSentEvent) error // active SSE conn, nil if disconnected
	status     agentapi.AgentStatus                 // current status, "" until first SetStatus
	connected  chan struct{}                        // closed when SSE connection arrives
	disconnect chan struct{}                        // closed by Disconnect to drop the active conn
	t          testing.TB                           // owning test, used for Cleanup/logging
}
|
|
|
|
|
|
|
|
|
|
// newMCPFakeAgentAPI starts a fake AgentAPI HTTP server serving SSE on
// /events and registers its shutdown with t.Cleanup. The handler wires
// the live send function into f.sender for SetStatus/SendMessage to
// push through, and blocks until the client goes away or Disconnect is
// called.
func newMCPFakeAgentAPI(t testing.TB) *mcpFakeAgentAPI {
	t.Helper()
	f := &mcpFakeAgentAPI{
		connected:  make(chan struct{}),
		disconnect: make(chan struct{}),
		t:          t,
	}
	f.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != "/events" {
			http.NotFound(w, r)
			return
		}
		// A cancelable child context lets Disconnect tear down the
		// SSE sender without waiting for the HTTP serve loop to
		// cancel r.Context() on its own.
		sseCtx, sseCancel := context.WithCancel(r.Context())
		defer sseCancel()
		r = r.WithContext(sseCtx)

		send, closed, err := httpapi.ServerSentEventSender(w, r)
		if err != nil {
			httpapi.Write(sseCtx, w, http.StatusInternalServerError, codersdk.Response{
				Message: "Internal error setting up server-sent events.",
				Detail:  err.Error(),
			})
			return
		}
		// Echo initial state on connect, matching the real AgentAPI
		// which sends full state (messages + status) to every new
		// subscriber before streaming.
		_ = send(*makeMessageEvent(0, agentapi.RoleAgent))

		f.mu.Lock()
		f.sender = send
		if f.status != "" {
			_ = send(*makeStatusEvent(f.status))
		}
		// Snapshot both channels under the lock: Disconnect and
		// WaitForConnection replace them with fresh instances, so we
		// must operate on the generation that belongs to this conn.
		disc := f.disconnect
		conn := f.connected
		f.mu.Unlock()

		// Signal WaitForConnection that this subscriber is live.
		// NOTE(review): assumes at most one concurrent SSE connection
		// per connected-channel generation; a second simultaneous
		// connection would double-close conn — confirm test usage.
		close(conn)

		// Park until the client disconnects or the test forces a
		// disconnect; in the latter case cancel the SSE context and
		// wait for the sender goroutine to finish.
		select {
		case <-closed:
		case <-disc:
			sseCancel()
			<-closed
		}

		f.mu.Lock()
		f.sender = nil
		f.mu.Unlock()
	}))
	t.Cleanup(f.Close)
	return f
}
|
|
|
|
|
|
|
|
|
|
// Close tears down the fake server. Safe to call multiple times via
// t.Cleanup plus explicit calls (httptest.Server.Close is idempotent
// for our usage here).
func (f *mcpFakeAgentAPI) Close() {
	// Force-close active connections so in-flight SSE handlers
	// observe r.Context() cancellation and can return. Without
	// this, server.Close() blocks forever waiting for handlers
	// that are stuck in their event select loop.
	f.server.CloseClientConnections()
	f.server.Close()
}
|
|
|
|
|
|
|
|
|
|
// SetStatus stores the status, deduplicates, and pushes to the live
|
|
|
|
|
// SSE connection if one exists.
|
|
|
|
|
func (f *mcpFakeAgentAPI) SetStatus(status agentapi.AgentStatus) {
|
|
|
|
|
f.mu.Lock()
|
|
|
|
|
defer f.mu.Unlock()
|
|
|
|
|
if f.status == status {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
f.status = status
|
|
|
|
|
if f.sender != nil {
|
|
|
|
|
_ = f.sender(*makeStatusEvent(status))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SendMessage pushes a message event to the live SSE connection.
|
|
|
|
|
func (f *mcpFakeAgentAPI) SendMessage(id int64, role agentapi.ConversationRole) {
|
|
|
|
|
f.mu.Lock()
|
|
|
|
|
defer f.mu.Unlock()
|
|
|
|
|
if f.sender != nil {
|
|
|
|
|
_ = f.sender(*makeMessageEvent(id, role))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// URL returns the base URL of the fake AgentAPI server, suitable for
// passing as the --ai-agentapi-url flag value.
func (f *mcpFakeAgentAPI) URL() string {
	return f.server.URL
}
|
|
|
|
|
|
|
|
|
|
// WaitForConnection blocks until an SSE connection is established,
|
|
|
|
|
// then resets the signal for the next connection.
|
|
|
|
|
func (f *mcpFakeAgentAPI) WaitForConnection(ctx context.Context, t testing.TB) {
|
|
|
|
|
t.Helper()
|
|
|
|
|
select {
|
|
|
|
|
case <-f.connected:
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
t.Fatal("timed out waiting for SSE connection")
|
|
|
|
|
}
|
|
|
|
|
f.mu.Lock()
|
|
|
|
|
f.connected = make(chan struct{})
|
|
|
|
|
f.mu.Unlock()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Disconnect tears down the active SSE connection. The stored
|
|
|
|
|
// status is preserved so it can be echoed on the next connect,
|
|
|
|
|
// matching the real AgentAPI where the agent's state persists
|
|
|
|
|
// across subscriber reconnections.
|
|
|
|
|
func (f *mcpFakeAgentAPI) Disconnect() {
|
|
|
|
|
f.mu.Lock()
|
|
|
|
|
old := f.disconnect
|
|
|
|
|
f.disconnect = make(chan struct{})
|
|
|
|
|
f.mu.Unlock()
|
|
|
|
|
close(old)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// mcpTestFixture encapsulates the repeated deployment, workspace,
// watcher, and MCP server setup used by reporter tests.
type mcpTestFixture struct {
	t             *testing.T
	ctx           context.Context
	pty           *ptytest.PTY                // stdin/stdout of the running MCP server process
	lastAppStatus codersdk.WorkspaceAppStatus // most recent status returned by NextUpdate
	watcher       <-chan codersdk.Workspace   // workspace change stream from WatchWorkspace
}
|
|
|
|
|
|
|
|
|
|
// newMCPTestFixture creates a test deployment with a workspace (agent
// app slug "vscode"), starts watching the workspace for changes, and
// launches the `exp mcp server` CLI over a PTY, completing the JSON-RPC
// initialize handshake. If agentAPIURL is non-empty it is passed via
// --ai-agentapi-url. The returned context is canceled on test cleanup.
func newMCPTestFixture(t *testing.T, agentAPIURL string) (context.Context, *mcpTestFixture) {
	t.Helper()

	// Deployment with a second (non-owner) user who owns the workspace.
	client, db := coderdtest.NewWithDatabase(t, nil)
	user := coderdtest.CreateFirstUser(t, client)
	client, user2 := coderdtest.CreateAnotherUser(t, client, user.OrganizationID)

	r := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
		OrganizationID: user.OrganizationID,
		OwnerID:        user2.ID,
	}).WithAgent(func(a []*proto.Agent) []*proto.Agent {
		// The app slug must match --app-status-slug below so status
		// reports land on this app.
		a[0].Apps = []*proto.App{{Slug: "vscode"}}
		return a
	}).Do()

	ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitLong))
	t.Cleanup(cancel)

	// Watch the workspace for app status changes; consumed by NextUpdate.
	watcher, err := client.WatchWorkspace(ctx, r.Workspace.ID)
	require.NoError(t, err)

	args := []string{
		"exp", "mcp", "server",
		"--agent-url", client.URL.String(),
		"--agent-token", r.AgentToken,
		"--app-status-slug", "vscode",
		"--allowed-tools=coder_report_task",
	}
	if agentAPIURL != "" {
		args = append(args, "--ai-agentapi-url", agentAPIURL)
	}

	inv, _ := clitest.New(t, args...)
	inv = inv.WithContext(ctx)

	// Drive the MCP server's stdio transport through a PTY.
	pty := ptytest.New(t)
	inv.Stdin = pty.Input()
	inv.Stdout = pty.Output()
	inv.Stderr = io.Discard

	clitest.Start(t, inv)

	// JSON-RPC initialize handshake.
	pty.WriteLine(`{"jsonrpc":"2.0","id":1,"method":"initialize"}`)
	_ = pty.ReadLine(ctx) // echo
	_ = pty.ReadLine(ctx) // init response

	return ctx, &mcpTestFixture{
		t:       t,
		ctx:     ctx,
		pty:     pty,
		watcher: watcher,
	}
}
|
|
|
|
|
|
|
|
|
|
func (f *mcpTestFixture) NextUpdate() codersdk.WorkspaceAppStatus {
|
|
|
|
|
f.t.Helper()
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-f.ctx.Done():
|
|
|
|
|
require.FailNow(f.t, "timed out waiting for status update")
|
|
|
|
|
case w, ok := <-f.watcher:
|
|
|
|
|
require.True(f.t, ok, "watch channel closed")
|
|
|
|
|
if w.LatestAppStatus != nil && w.LatestAppStatus.ID != f.lastAppStatus.ID {
|
|
|
|
|
f.t.Logf("Got status update: %s > %s", f.lastAppStatus.State, w.LatestAppStatus.State)
|
|
|
|
|
f.lastAppStatus = *w.LatestAppStatus
|
|
|
|
|
return f.lastAppStatus
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (f *mcpTestFixture) SendToolCall(state, summary, link string) {
|
|
|
|
|
f.t.Helper()
|
|
|
|
|
payload := fmt.Sprintf(`{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"coder_report_task","arguments":{"state":%q,"summary":%q,"link":%q}}}`, state, summary, link)
|
|
|
|
|
f.pty.WriteLine(payload)
|
|
|
|
|
_ = f.pty.ReadLine(f.ctx) // echo
|
|
|
|
|
output := f.pty.ReadLine(f.ctx)
|
|
|
|
|
require.NotEmpty(f.t, output, "did not receive a response from coder_report_task")
|
|
|
|
|
require.True(f.t, json.Valid([]byte(output)), "did not receive valid JSON from coder_report_task")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestExpMcpReporter(t *testing.T) {
|
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
@@ -763,34 +1007,21 @@ func TestExpMcpReporter(t *testing.T) {
|
|
|
|
|
<-cmdDone
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
makeStatusEvent := func(status agentapi.AgentStatus) *codersdk.ServerSentEvent {
|
|
|
|
|
return &codersdk.ServerSentEvent{
|
|
|
|
|
Type: ServerSentEventTypeStatusChange,
|
|
|
|
|
Data: agentapi.EventStatusChange{
|
|
|
|
|
Status: status,
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
type testMessage struct {
|
|
|
|
|
id int64
|
|
|
|
|
role agentapi.ConversationRole
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
makeMessageEvent := func(id int64, role agentapi.ConversationRole) *codersdk.ServerSentEvent {
|
|
|
|
|
return &codersdk.ServerSentEvent{
|
|
|
|
|
Type: ServerSentEventTypeMessageUpdate,
|
|
|
|
|
Data: agentapi.EventMessageUpdate{
|
|
|
|
|
Id: id,
|
|
|
|
|
Role: role,
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type test struct {
|
|
|
|
|
// event simulates an event from the screen watcher.
|
|
|
|
|
event *codersdk.ServerSentEvent
|
|
|
|
|
// state, summary, and uri simulate a tool call from the AI agent.
|
|
|
|
|
// Screen watcher simulation (via fake AgentAPI):
|
|
|
|
|
setStatus *agentapi.AgentStatus
|
|
|
|
|
message *testMessage
|
|
|
|
|
// Agent self-report (via tool call):
|
|
|
|
|
state codersdk.WorkspaceAppStatusState
|
|
|
|
|
summary string
|
|
|
|
|
uri string
|
|
|
|
|
expected *codersdk.WorkspaceAppStatus
|
|
|
|
|
}
|
|
|
|
|
ptr := func(s agentapi.AgentStatus) *agentapi.AgentStatus { return &s }
|
|
|
|
|
|
|
|
|
|
runs := []struct {
|
|
|
|
|
name string
|
|
|
|
|
@@ -816,7 +1047,7 @@ func TestExpMcpReporter(t *testing.T) {
|
|
|
|
|
// Terminal goes quiet but the AI agent forgot the update, and it is
|
|
|
|
|
// caught by the screen watcher. Message and URI are preserved.
|
|
|
|
|
{
|
|
|
|
|
event: makeStatusEvent(agentapi.StatusStable),
|
|
|
|
|
setStatus: ptr(agentapi.StatusStable),
|
|
|
|
|
expected: &codersdk.WorkspaceAppStatus{
|
|
|
|
|
State: codersdk.WorkspaceAppStatusStateIdle,
|
|
|
|
|
Message: "doing work",
|
|
|
|
|
@@ -826,32 +1057,39 @@ func TestExpMcpReporter(t *testing.T) {
|
|
|
|
|
// A stable update now from the watcher should be discarded, as it is a
|
|
|
|
|
// duplicate.
|
|
|
|
|
{
|
|
|
|
|
event: makeStatusEvent(agentapi.StatusStable),
|
|
|
|
|
setStatus: ptr(agentapi.StatusStable),
|
|
|
|
|
},
|
|
|
|
|
// Terminal becomes active again according to the screen watcher, but no
|
|
|
|
|
// new user message. This could be the AI agent being active again, but
|
|
|
|
|
// it could also be the user messing around. We will prefer not updating
|
|
|
|
|
// the status so the "working" update here should be skipped.
|
|
|
|
|
//
|
|
|
|
|
// TODO: How do we test the no-op updates? This update is skipped
|
|
|
|
|
// because of the logic mentioned above, but how do we prove this update
|
|
|
|
|
// was skipped because of that and not that the next update was skipped
|
|
|
|
|
// because it is a duplicate state? We could mock the queue?
|
|
|
|
|
// Terminal becomes active again. With Branch 3 removed, this
|
|
|
|
|
// working transition is now accepted.
|
|
|
|
|
{
|
|
|
|
|
event: makeStatusEvent(agentapi.StatusRunning),
|
|
|
|
|
setStatus: ptr(agentapi.StatusRunning),
|
|
|
|
|
expected: &codersdk.WorkspaceAppStatus{
|
|
|
|
|
State: codersdk.WorkspaceAppStatusStateWorking,
|
|
|
|
|
Message: "doing work",
|
|
|
|
|
URI: "https://dev.coder.com",
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
// Agent messages are ignored.
|
|
|
|
|
{
|
|
|
|
|
event: makeMessageEvent(0, agentapi.RoleAgent),
|
|
|
|
|
message: &testMessage{id: 0, role: agentapi.RoleAgent},
|
|
|
|
|
},
|
|
|
|
|
// The watcher reports the screen is active again...
|
|
|
|
|
// Duplicate working from watcher should be discarded.
|
|
|
|
|
{
|
|
|
|
|
event: makeStatusEvent(agentapi.StatusRunning),
|
|
|
|
|
setStatus: ptr(agentapi.StatusRunning),
|
|
|
|
|
},
|
|
|
|
|
// ... but this time we have a new user message so we know there is AI
|
|
|
|
|
// agent activity. This time the "working" update will not be skipped.
|
|
|
|
|
// Watcher reports stable.
|
|
|
|
|
{
|
|
|
|
|
event: makeMessageEvent(1, agentapi.RoleUser),
|
|
|
|
|
setStatus: ptr(agentapi.StatusStable),
|
|
|
|
|
expected: &codersdk.WorkspaceAppStatus{
|
|
|
|
|
State: codersdk.WorkspaceAppStatusStateIdle,
|
|
|
|
|
Message: "doing work",
|
|
|
|
|
URI: "https://dev.coder.com",
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
// A new user message triggers a working update because of
|
|
|
|
|
// the new message ID.
|
|
|
|
|
{
|
|
|
|
|
message: &testMessage{id: 1, role: agentapi.RoleUser},
|
|
|
|
|
expected: &codersdk.WorkspaceAppStatus{
|
|
|
|
|
State: codersdk.WorkspaceAppStatusStateWorking,
|
|
|
|
|
Message: "doing work",
|
|
|
|
|
@@ -860,7 +1098,7 @@ func TestExpMcpReporter(t *testing.T) {
|
|
|
|
|
},
|
|
|
|
|
// Watcher reports stable again.
|
|
|
|
|
{
|
|
|
|
|
event: makeStatusEvent(agentapi.StatusStable),
|
|
|
|
|
setStatus: ptr(agentapi.StatusStable),
|
|
|
|
|
expected: &codersdk.WorkspaceAppStatus{
|
|
|
|
|
State: codersdk.WorkspaceAppStatusStateIdle,
|
|
|
|
|
Message: "doing work",
|
|
|
|
|
@@ -876,7 +1114,7 @@ func TestExpMcpReporter(t *testing.T) {
|
|
|
|
|
// The "working" status from the watcher should be accepted, even though
|
|
|
|
|
// there is no new user message, because it is the first update.
|
|
|
|
|
{
|
|
|
|
|
event: makeStatusEvent(agentapi.StatusRunning),
|
|
|
|
|
setStatus: ptr(agentapi.StatusRunning),
|
|
|
|
|
expected: &codersdk.WorkspaceAppStatus{
|
|
|
|
|
State: codersdk.WorkspaceAppStatusStateWorking,
|
|
|
|
|
Message: "",
|
|
|
|
|
@@ -885,7 +1123,7 @@ func TestExpMcpReporter(t *testing.T) {
|
|
|
|
|
},
|
|
|
|
|
// Stable update should be accepted.
|
|
|
|
|
{
|
|
|
|
|
event: makeStatusEvent(agentapi.StatusStable),
|
|
|
|
|
setStatus: ptr(agentapi.StatusStable),
|
|
|
|
|
expected: &codersdk.WorkspaceAppStatus{
|
|
|
|
|
State: codersdk.WorkspaceAppStatusStateIdle,
|
|
|
|
|
Message: "",
|
|
|
|
|
@@ -894,7 +1132,7 @@ func TestExpMcpReporter(t *testing.T) {
|
|
|
|
|
},
|
|
|
|
|
// Zero ID should be accepted.
|
|
|
|
|
{
|
|
|
|
|
event: makeMessageEvent(0, agentapi.RoleUser),
|
|
|
|
|
message: &testMessage{id: 0, role: agentapi.RoleUser},
|
|
|
|
|
expected: &codersdk.WorkspaceAppStatus{
|
|
|
|
|
State: codersdk.WorkspaceAppStatusStateWorking,
|
|
|
|
|
Message: "",
|
|
|
|
|
@@ -903,7 +1141,7 @@ func TestExpMcpReporter(t *testing.T) {
|
|
|
|
|
},
|
|
|
|
|
// Stable again.
|
|
|
|
|
{
|
|
|
|
|
event: makeStatusEvent(agentapi.StatusStable),
|
|
|
|
|
setStatus: ptr(agentapi.StatusStable),
|
|
|
|
|
expected: &codersdk.WorkspaceAppStatus{
|
|
|
|
|
State: codersdk.WorkspaceAppStatusStateIdle,
|
|
|
|
|
Message: "",
|
|
|
|
|
@@ -912,7 +1150,7 @@ func TestExpMcpReporter(t *testing.T) {
|
|
|
|
|
},
|
|
|
|
|
// Next ID.
|
|
|
|
|
{
|
|
|
|
|
event: makeMessageEvent(1, agentapi.RoleUser),
|
|
|
|
|
message: &testMessage{id: 1, role: agentapi.RoleUser},
|
|
|
|
|
expected: &codersdk.WorkspaceAppStatus{
|
|
|
|
|
State: codersdk.WorkspaceAppStatusStateWorking,
|
|
|
|
|
Message: "",
|
|
|
|
|
@@ -947,7 +1185,7 @@ func TestExpMcpReporter(t *testing.T) {
|
|
|
|
|
},
|
|
|
|
|
// Once the watcher reports stable, then we record idle.
|
|
|
|
|
{
|
|
|
|
|
event: makeStatusEvent(agentapi.StatusStable),
|
|
|
|
|
setStatus: ptr(agentapi.StatusStable),
|
|
|
|
|
expected: &codersdk.WorkspaceAppStatus{
|
|
|
|
|
State: codersdk.WorkspaceAppStatusStateIdle,
|
|
|
|
|
Message: "finished",
|
|
|
|
|
@@ -964,7 +1202,7 @@ func TestExpMcpReporter(t *testing.T) {
|
|
|
|
|
},
|
|
|
|
|
// After failure, watcher reports stable -> idle.
|
|
|
|
|
{
|
|
|
|
|
event: makeStatusEvent(agentapi.StatusStable),
|
|
|
|
|
setStatus: ptr(agentapi.StatusStable),
|
|
|
|
|
expected: &codersdk.WorkspaceAppStatus{
|
|
|
|
|
State: codersdk.WorkspaceAppStatusStateIdle,
|
|
|
|
|
Message: "something broke",
|
|
|
|
|
@@ -1025,273 +1263,139 @@ func TestExpMcpReporter(t *testing.T) {
|
|
|
|
|
t.Run(run.name, func(t *testing.T) {
|
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitShort))
|
|
|
|
|
|
|
|
|
|
// Create a test deployment and workspace.
|
|
|
|
|
client, db := coderdtest.NewWithDatabase(t, nil)
|
|
|
|
|
user := coderdtest.CreateFirstUser(t, client)
|
|
|
|
|
client, user2 := coderdtest.CreateAnotherUser(t, client, user.OrganizationID)
|
|
|
|
|
|
|
|
|
|
r := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
|
|
|
|
|
OrganizationID: user.OrganizationID,
|
|
|
|
|
OwnerID: user2.ID,
|
|
|
|
|
}).WithAgent(func(a []*proto.Agent) []*proto.Agent {
|
|
|
|
|
a[0].Apps = []*proto.App{
|
|
|
|
|
{
|
|
|
|
|
Slug: "vscode",
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
return a
|
|
|
|
|
}).Do()
|
|
|
|
|
|
|
|
|
|
// Watch the workspace for changes.
|
|
|
|
|
watcher, err := client.WatchWorkspace(ctx, r.Workspace.ID)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
var lastAppStatus codersdk.WorkspaceAppStatus
|
|
|
|
|
nextUpdate := func() codersdk.WorkspaceAppStatus {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
require.FailNow(t, "timed out waiting for status update")
|
|
|
|
|
case w, ok := <-watcher:
|
|
|
|
|
require.True(t, ok, "watch channel closed")
|
|
|
|
|
if w.LatestAppStatus != nil && w.LatestAppStatus.ID != lastAppStatus.ID {
|
|
|
|
|
t.Logf("Got status update: %s > %s", lastAppStatus.State, w.LatestAppStatus.State)
|
|
|
|
|
lastAppStatus = *w.LatestAppStatus
|
|
|
|
|
return lastAppStatus
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
args := []string{
|
|
|
|
|
"exp", "mcp", "server",
|
|
|
|
|
// We need the agent credentials, AI AgentAPI url (if not
|
|
|
|
|
// disabled), and a slug for reporting.
|
|
|
|
|
"--agent-url", client.URL.String(),
|
|
|
|
|
"--agent-token", r.AgentToken,
|
|
|
|
|
"--app-status-slug", "vscode",
|
|
|
|
|
"--allowed-tools=coder_report_task",
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Mock the AI AgentAPI server.
|
|
|
|
|
listening := make(chan func(sse codersdk.ServerSentEvent) error)
|
|
|
|
|
var fake *mcpFakeAgentAPI
|
|
|
|
|
agentAPIURL := ""
|
|
|
|
|
if !run.disableAgentAPI {
|
|
|
|
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
send, closed, err := httpapi.ServerSentEventSender(w, r)
|
|
|
|
|
if err != nil {
|
|
|
|
|
httpapi.Write(ctx, w, http.StatusInternalServerError, codersdk.Response{
|
|
|
|
|
Message: "Internal error setting up server-sent events.",
|
|
|
|
|
Detail: err.Error(),
|
|
|
|
|
})
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// Send initial message.
|
|
|
|
|
send(*makeMessageEvent(0, agentapi.RoleAgent))
|
|
|
|
|
listening <- send
|
|
|
|
|
<-closed
|
|
|
|
|
}))
|
|
|
|
|
t.Cleanup(srv.Close)
|
|
|
|
|
aiAgentAPIURL := srv.URL
|
|
|
|
|
args = append(args, "--ai-agentapi-url", aiAgentAPIURL)
|
|
|
|
|
fake = newMCPFakeAgentAPI(t)
|
|
|
|
|
agentAPIURL = fake.URL()
|
|
|
|
|
}
|
|
|
|
|
ctx, fixture := newMCPTestFixture(t, agentAPIURL)
|
|
|
|
|
|
|
|
|
|
if fake != nil {
|
|
|
|
|
fake.WaitForConnection(ctx, t)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
inv, _ := clitest.New(t, args...)
|
|
|
|
|
inv = inv.WithContext(ctx)
|
|
|
|
|
|
|
|
|
|
pty := ptytest.New(t)
|
|
|
|
|
inv.Stdin = pty.Input()
|
|
|
|
|
inv.Stdout = pty.Output()
|
|
|
|
|
stderr := ptytest.New(t)
|
|
|
|
|
inv.Stderr = stderr.Output()
|
|
|
|
|
|
|
|
|
|
// Run the MCP server.
|
|
|
|
|
cmdDone := make(chan struct{})
|
|
|
|
|
go func() {
|
|
|
|
|
defer close(cmdDone)
|
|
|
|
|
err := inv.Run()
|
|
|
|
|
assert.NoError(t, err)
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Initialize.
|
|
|
|
|
payload := `{"jsonrpc":"2.0","id":1,"method":"initialize"}`
|
|
|
|
|
pty.WriteLine(payload)
|
|
|
|
|
_ = pty.ReadLine(ctx) // ignore echo
|
|
|
|
|
_ = pty.ReadLine(ctx) // ignore init response
|
|
|
|
|
|
|
|
|
|
var sender func(sse codersdk.ServerSentEvent) error
|
|
|
|
|
if !run.disableAgentAPI {
|
|
|
|
|
sender = <-listening
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, test := range run.tests {
|
|
|
|
|
if test.event != nil {
|
|
|
|
|
err := sender(*test.event)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
for _, tc := range run.tests {
|
|
|
|
|
if tc.setStatus != nil {
|
|
|
|
|
fake.SetStatus(*tc.setStatus)
|
|
|
|
|
} else if tc.message != nil {
|
|
|
|
|
fake.SendMessage(tc.message.id, tc.message.role)
|
|
|
|
|
} else {
|
|
|
|
|
// Call the tool and ensure it works.
|
|
|
|
|
payload := fmt.Sprintf(`{"jsonrpc":"2.0","id":3,"method":"tools/call", "params": {"name": "coder_report_task", "arguments": {"state": %q, "summary": %q, "link": %q}}}`, test.state, test.summary, test.uri)
|
|
|
|
|
pty.WriteLine(payload)
|
|
|
|
|
_ = pty.ReadLine(ctx) // ignore echo
|
|
|
|
|
output := pty.ReadLine(ctx)
|
|
|
|
|
require.NotEmpty(t, output, "did not receive a response from coder_report_task")
|
|
|
|
|
// Ensure it is valid JSON.
|
|
|
|
|
_, err = json.Marshal(output)
|
|
|
|
|
require.NoError(t, err, "did not receive valid JSON from coder_report_task")
|
|
|
|
|
fixture.SendToolCall(string(tc.state), tc.summary, tc.uri)
|
|
|
|
|
}
|
|
|
|
|
if test.expected != nil {
|
|
|
|
|
got := nextUpdate()
|
|
|
|
|
require.Equal(t, got.State, test.expected.State)
|
|
|
|
|
require.Equal(t, got.Message, test.expected.Message)
|
|
|
|
|
require.Equal(t, got.URI, test.expected.URI)
|
|
|
|
|
if tc.expected != nil {
|
|
|
|
|
got := fixture.NextUpdate()
|
|
|
|
|
require.Equal(t, got.State, tc.expected.State)
|
|
|
|
|
require.Equal(t, got.Message, tc.expected.Message)
|
|
|
|
|
require.Equal(t, got.URI, tc.expected.URI)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
cancel()
|
|
|
|
|
<-cmdDone
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
t.Run("Reconnect", func(t *testing.T) {
|
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
// Create a test deployment and workspace.
|
|
|
|
|
client, db := coderdtest.NewWithDatabase(t, nil)
|
|
|
|
|
user := coderdtest.CreateFirstUser(t, client)
|
|
|
|
|
client, user2 := coderdtest.CreateAnotherUser(t, client, user.OrganizationID)
|
|
|
|
|
fake := newMCPFakeAgentAPI(t)
|
|
|
|
|
ctx, fixture := newMCPTestFixture(t, fake.URL())
|
|
|
|
|
|
|
|
|
|
r := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
|
|
|
|
|
OrganizationID: user.OrganizationID,
|
|
|
|
|
OwnerID: user2.ID,
|
|
|
|
|
}).WithAgent(func(a []*proto.Agent) []*proto.Agent {
|
|
|
|
|
a[0].Apps = []*proto.App{
|
|
|
|
|
{
|
|
|
|
|
Slug: "vscode",
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
return a
|
|
|
|
|
}).Do()
|
|
|
|
|
|
|
|
|
|
ctx, cancel := context.WithCancel(testutil.Context(t, testutil.WaitLong))
|
|
|
|
|
|
|
|
|
|
// Watch the workspace for changes.
|
|
|
|
|
watcher, err := client.WatchWorkspace(ctx, r.Workspace.ID)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
var lastAppStatus codersdk.WorkspaceAppStatus
|
|
|
|
|
nextUpdate := func() codersdk.WorkspaceAppStatus {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
require.FailNow(t, "timed out waiting for status update")
|
|
|
|
|
case w, ok := <-watcher:
|
|
|
|
|
require.True(t, ok, "watch channel closed")
|
|
|
|
|
if w.LatestAppStatus != nil && w.LatestAppStatus.ID != lastAppStatus.ID {
|
|
|
|
|
t.Logf("Got status update: %s > %s", lastAppStatus.State, w.LatestAppStatus.State)
|
|
|
|
|
lastAppStatus = *w.LatestAppStatus
|
|
|
|
|
return lastAppStatus
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Mock AI AgentAPI server that supports disconnect/reconnect.
|
|
|
|
|
disconnect := make(chan struct{})
|
|
|
|
|
listening := make(chan func(sse codersdk.ServerSentEvent) error)
|
|
|
|
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
// Create a cancelable context so we can stop the SSE sender
|
|
|
|
|
// goroutine on disconnect without waiting for the HTTP
|
|
|
|
|
// serve loop to cancel r.Context().
|
|
|
|
|
sseCtx, sseCancel := context.WithCancel(r.Context())
|
|
|
|
|
defer sseCancel()
|
|
|
|
|
r = r.WithContext(sseCtx)
|
|
|
|
|
|
|
|
|
|
send, closed, err := httpapi.ServerSentEventSender(w, r)
|
|
|
|
|
if err != nil {
|
|
|
|
|
httpapi.Write(sseCtx, w, http.StatusInternalServerError, codersdk.Response{
|
|
|
|
|
Message: "Internal error setting up server-sent events.",
|
|
|
|
|
Detail: err.Error(),
|
|
|
|
|
})
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// Send initial message so the watcher knows the agent is active.
|
|
|
|
|
send(*makeMessageEvent(0, agentapi.RoleAgent))
|
|
|
|
|
select {
|
|
|
|
|
case listening <- send:
|
|
|
|
|
case <-r.Context().Done():
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case <-closed:
|
|
|
|
|
case <-disconnect:
|
|
|
|
|
sseCancel()
|
|
|
|
|
<-closed
|
|
|
|
|
}
|
|
|
|
|
}))
|
|
|
|
|
t.Cleanup(srv.Close)
|
|
|
|
|
|
|
|
|
|
inv, _ := clitest.New(t,
|
|
|
|
|
"exp", "mcp", "server",
|
|
|
|
|
"--agent-url", client.URL.String(),
|
|
|
|
|
"--agent-token", r.AgentToken,
|
|
|
|
|
"--app-status-slug", "vscode",
|
|
|
|
|
"--allowed-tools=coder_report_task",
|
|
|
|
|
"--ai-agentapi-url", srv.URL,
|
|
|
|
|
)
|
|
|
|
|
inv = inv.WithContext(ctx)
|
|
|
|
|
|
|
|
|
|
pty := ptytest.New(t)
|
|
|
|
|
inv.Stdin = pty.Input()
|
|
|
|
|
inv.Stdout = pty.Output()
|
|
|
|
|
stderr := ptytest.New(t)
|
|
|
|
|
inv.Stderr = stderr.Output()
|
|
|
|
|
|
|
|
|
|
// Run the MCP server.
|
|
|
|
|
clitest.Start(t, inv)
|
|
|
|
|
|
|
|
|
|
// Initialize.
|
|
|
|
|
payload := `{"jsonrpc":"2.0","id":1,"method":"initialize"}`
|
|
|
|
|
pty.WriteLine(payload)
|
|
|
|
|
_ = pty.ReadLine(ctx) // ignore echo
|
|
|
|
|
_ = pty.ReadLine(ctx) // ignore init response
|
|
|
|
|
|
|
|
|
|
// Get first sender from the initial SSE connection.
|
|
|
|
|
sender := testutil.RequireReceive(ctx, t, listening)
|
|
|
|
|
fake.WaitForConnection(ctx, t)
|
|
|
|
|
|
|
|
|
|
// Self-report a working status via tool call.
|
|
|
|
|
toolPayload := `{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"coder_report_task","arguments":{"state":"working","summary":"doing work","link":""}}}`
|
|
|
|
|
pty.WriteLine(toolPayload)
|
|
|
|
|
_ = pty.ReadLine(ctx) // ignore echo
|
|
|
|
|
_ = pty.ReadLine(ctx) // ignore response
|
|
|
|
|
got := nextUpdate()
|
|
|
|
|
fixture.SendToolCall("working", "doing work", "")
|
|
|
|
|
got := fixture.NextUpdate()
|
|
|
|
|
require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, got.State)
|
|
|
|
|
require.Equal(t, "doing work", got.Message)
|
|
|
|
|
|
|
|
|
|
// Watcher sends stable, verify idle is reported.
|
|
|
|
|
err = sender(*makeStatusEvent(agentapi.StatusStable))
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
got = nextUpdate()
|
|
|
|
|
fake.SetStatus(agentapi.StatusStable)
|
|
|
|
|
got = fixture.NextUpdate()
|
|
|
|
|
require.Equal(t, codersdk.WorkspaceAppStatusStateIdle, got.State)
|
|
|
|
|
|
|
|
|
|
// Disconnect the SSE connection by signaling the handler to return.
|
|
|
|
|
testutil.RequireSend(ctx, t, disconnect, struct{}{})
|
|
|
|
|
fake.Disconnect()
|
|
|
|
|
|
|
|
|
|
// Wait for the watcher to reconnect and get the new sender.
|
|
|
|
|
sender = testutil.RequireReceive(ctx, t, listening)
|
|
|
|
|
// Wait for the watcher to reconnect.
|
|
|
|
|
fake.WaitForConnection(ctx, t)
|
|
|
|
|
|
|
|
|
|
// After reconnect, self-report a working status again.
|
|
|
|
|
toolPayload = `{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"coder_report_task","arguments":{"state":"working","summary":"reconnected","link":""}}}`
|
|
|
|
|
pty.WriteLine(toolPayload)
|
|
|
|
|
_ = pty.ReadLine(ctx) // ignore echo
|
|
|
|
|
_ = pty.ReadLine(ctx) // ignore response
|
|
|
|
|
got = nextUpdate()
|
|
|
|
|
fixture.SendToolCall("working", "reconnected", "")
|
|
|
|
|
got = fixture.NextUpdate()
|
|
|
|
|
require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, got.State)
|
|
|
|
|
require.Equal(t, "reconnected", got.Message)
|
|
|
|
|
|
|
|
|
|
// Verify the watcher still processes events after reconnect.
|
|
|
|
|
err = sender(*makeStatusEvent(agentapi.StatusStable))
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
got = nextUpdate()
|
|
|
|
|
fake.SetStatus(agentapi.StatusStable)
|
|
|
|
|
got = fixture.NextUpdate()
|
|
|
|
|
require.Equal(t, codersdk.WorkspaceAppStatusStateIdle, got.State)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
// ReconnectStatusRecovery verifies that after an SSE reconnection,
|
|
|
|
|
// the initial status echo catches up on state changes that occurred
|
|
|
|
|
// while disconnected (matching real AgentAPI behavior).
|
|
|
|
|
t.Run("ReconnectStatusRecovery", func(t *testing.T) {
|
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
fake := newMCPFakeAgentAPI(t)
|
|
|
|
|
ctx, fixture := newMCPTestFixture(t, fake.URL())
|
|
|
|
|
|
|
|
|
|
fake.WaitForConnection(ctx, t)
|
|
|
|
|
|
|
|
|
|
// Self-report working.
|
|
|
|
|
fixture.SendToolCall("working", "doing work", "")
|
|
|
|
|
got := fixture.NextUpdate()
|
|
|
|
|
require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, got.State)
|
|
|
|
|
|
|
|
|
|
// Watcher sends stable -> idle.
|
|
|
|
|
fake.SetStatus(agentapi.StatusStable)
|
|
|
|
|
got = fixture.NextUpdate()
|
|
|
|
|
require.Equal(t, codersdk.WorkspaceAppStatusStateIdle, got.State)
|
|
|
|
|
|
|
|
|
|
cancel()
|
|
|
|
|
// Disconnect SSE.
|
|
|
|
|
fake.Disconnect()
|
|
|
|
|
|
|
|
|
|
// While disconnected, the agent started working again. Set the
|
|
|
|
|
// fake's status so the next SSE connection echoes it.
|
|
|
|
|
fake.SetStatus(agentapi.StatusRunning)
|
|
|
|
|
|
|
|
|
|
// On reconnect, the fake echoes the current status (running)
|
|
|
|
|
// as an initial SSE event, just like the real AgentAPI.
|
|
|
|
|
fake.WaitForConnection(ctx, t)
|
|
|
|
|
got = fixture.NextUpdate()
|
|
|
|
|
require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, got.State)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
// WorkingTransition verifies that watcher running transitions are
|
|
|
|
|
// accepted after a previous idle state (Branch 3 removal).
|
|
|
|
|
t.Run("WorkingTransition", func(t *testing.T) {
|
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
fake := newMCPFakeAgentAPI(t)
|
|
|
|
|
ctx, fixture := newMCPTestFixture(t, fake.URL())
|
|
|
|
|
|
|
|
|
|
fake.WaitForConnection(ctx, t)
|
|
|
|
|
|
|
|
|
|
// Self-report working.
|
|
|
|
|
fixture.SendToolCall("working", "doing work", "")
|
|
|
|
|
got := fixture.NextUpdate()
|
|
|
|
|
require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, got.State)
|
|
|
|
|
require.Equal(t, "doing work", got.Message)
|
|
|
|
|
|
|
|
|
|
// Watcher sends stable -> idle, summary inherited.
|
|
|
|
|
fake.SetStatus(agentapi.StatusStable)
|
|
|
|
|
got = fixture.NextUpdate()
|
|
|
|
|
require.Equal(t, codersdk.WorkspaceAppStatusStateIdle, got.State)
|
|
|
|
|
require.Equal(t, "doing work", got.Message)
|
|
|
|
|
|
|
|
|
|
// Watcher sends running -> working. Previously blocked by Branch 3.
|
|
|
|
|
fake.SetStatus(agentapi.StatusRunning)
|
|
|
|
|
got = fixture.NextUpdate()
|
|
|
|
|
require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, got.State)
|
|
|
|
|
require.Equal(t, "doing work", got.Message)
|
|
|
|
|
|
|
|
|
|
// Watcher sends stable -> idle again.
|
|
|
|
|
fake.SetStatus(agentapi.StatusStable)
|
|
|
|
|
got = fixture.NextUpdate()
|
|
|
|
|
require.Equal(t, codersdk.WorkspaceAppStatusStateIdle, got.State)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|