Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 44e90aa0ca |
+31
-2
@@ -1495,9 +1495,16 @@ func (api *API) watchChatGit(rw http.ResponseWriter, r *http.Request) {
|
||||
|
||||
go httpapi.HeartbeatClose(ctx, logger, cancel, clientConn)
|
||||
|
||||
// Proxy agent → client.
|
||||
// Proxy agent → client. When the agent reports a branch or
|
||||
// remote origin that differs from what we last saw, mark the
|
||||
// chat's diff status as stale so the gitsync worker refreshes
|
||||
// PR metadata from the git provider.
|
||||
agentCh := agentStream.Chan()
|
||||
var wg sync.WaitGroup
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
lastSeenBranch string
|
||||
lastSeenOrigin string
|
||||
)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
@@ -1512,6 +1519,28 @@ func (api *API) watchChatGit(rw http.ResponseWriter, r *http.Request) {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
if msg.Type == codersdk.WorkspaceAgentGitServerMessageTypeChanges && api.gitSyncWorker != nil {
|
||||
if len(msg.Repositories) > 0 {
|
||||
repo := msg.Repositories[0]
|
||||
branch := strings.TrimSpace(repo.Branch)
|
||||
origin := strings.TrimSpace(repo.RemoteOrigin)
|
||||
if branch != "" && origin != "" && (branch != lastSeenBranch || origin != lastSeenOrigin) {
|
||||
lastSeenBranch = branch
|
||||
lastSeenOrigin = origin
|
||||
// AsChatd is used here because the git watch proxy
|
||||
// runs in the context of the chat daemon, consistent
|
||||
// with the external auth handler's MarkStale calls.
|
||||
//nolint:gocritic // AsChatd matches the authorization used by the gitsync worker.
|
||||
api.gitSyncWorker.MarkStale(dbauthz.AsChatd(ctx), gitsync.MarkStaleParams{
|
||||
ChatID: chat.ID,
|
||||
WorkspaceID: chat.WorkspaceID.UUID,
|
||||
OwnerID: chat.OwnerID,
|
||||
Branch: branch,
|
||||
Origin: origin,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := clientStream.Send(msg); err != nil {
|
||||
logger.Debug(ctx, "failed to forward agent message to client", slog.Error(err))
|
||||
cancel()
|
||||
|
||||
@@ -26,6 +26,7 @@ import (
|
||||
"github.com/coder/coder/v2/coderd/database/dbtime"
|
||||
"github.com/coder/coder/v2/coderd/httpmw"
|
||||
"github.com/coder/coder/v2/coderd/workspaceapps/appurl"
|
||||
"github.com/coder/coder/v2/coderd/x/gitsync"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
"github.com/coder/coder/v2/codersdk/workspacesdk"
|
||||
"github.com/coder/coder/v2/codersdk/workspacesdk/agentconnmock"
|
||||
@@ -33,6 +34,7 @@ import (
|
||||
"github.com/coder/coder/v2/tailnet"
|
||||
"github.com/coder/coder/v2/tailnet/tailnettest"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
"github.com/coder/quartz"
|
||||
"github.com/coder/websocket"
|
||||
)
|
||||
|
||||
@@ -431,6 +433,207 @@ func TestWatchChatGit(t *testing.T) {
|
||||
agentSrv.Close()
|
||||
})
|
||||
|
||||
t.Run("AgentChangesMarkDiffStale", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// This test ensures that when the agent sends a changes
|
||||
// message with branch and remote origin, the proxy calls
|
||||
// MarkStale to keep the ChatDiffStatus up to date.
|
||||
|
||||
var (
|
||||
ctx = testutil.Context(t, testutil.WaitLong)
|
||||
logger = slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug).Named("coderd")
|
||||
|
||||
mCtrl = gomock.NewController(t)
|
||||
mDB = dbmock.NewMockStore(mCtrl)
|
||||
mCoordinator = tailnettest.NewMockCoordinator(mCtrl)
|
||||
mAgentConn = agentconnmock.NewMockAgentConn(mCtrl)
|
||||
|
||||
chatID = uuid.New()
|
||||
ownerID = uuid.New()
|
||||
workspaceID = uuid.New()
|
||||
agentID = uuid.New()
|
||||
resourceID = uuid.New()
|
||||
|
||||
r = chi.NewMux()
|
||||
|
||||
fAgentProvider = fakeAgentProvider{
|
||||
agentConn: func(ctx context.Context, aID uuid.UUID) (_ workspacesdk.AgentConn, release func(), _ error) {
|
||||
return mAgentConn, func() {}, nil
|
||||
},
|
||||
}
|
||||
|
||||
api = API{
|
||||
ctx: ctx,
|
||||
Options: &Options{
|
||||
AgentInactiveDisconnectTimeout: testutil.WaitShort,
|
||||
Database: mDB,
|
||||
Logger: logger,
|
||||
DeploymentValues: &codersdk.DeploymentValues{},
|
||||
TailnetCoordinator: tailnettest.NewFakeCoordinator(),
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
var tailnetCoordinator tailnet.Coordinator = mCoordinator
|
||||
api.TailnetCoordinator.Store(&tailnetCoordinator)
|
||||
api.agentProvider = fAgentProvider
|
||||
|
||||
// Setup the gitsync worker backed by the same mock DB.
|
||||
// We don't start it; MarkStale calls store methods
|
||||
// directly without requiring the background loop.
|
||||
api.gitSyncWorker = gitsync.NewWorker(
|
||||
mDB, nil,
|
||||
func(_ context.Context, _ uuid.UUID) error { return nil },
|
||||
quartz.NewReal(),
|
||||
logger.Named("gitsync"),
|
||||
)
|
||||
|
||||
// Setup: Create a fake agent-side websocket server.
|
||||
agentDone := make(chan struct{})
|
||||
closeAgentDone := sync.OnceFunc(func() { close(agentDone) })
|
||||
t.Cleanup(closeAgentDone)
|
||||
agentStreamReady := make(chan *wsjson.Stream[
|
||||
codersdk.WorkspaceAgentGitClientMessage,
|
||||
codersdk.WorkspaceAgentGitServerMessage,
|
||||
], 1)
|
||||
agentSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
ws, err := websocket.Accept(w, r, nil)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
s := wsjson.NewStream[
|
||||
codersdk.WorkspaceAgentGitClientMessage,
|
||||
codersdk.WorkspaceAgentGitServerMessage,
|
||||
](ws, websocket.MessageText, websocket.MessageText, logger)
|
||||
agentStreamReady <- s
|
||||
<-agentDone
|
||||
}))
|
||||
defer agentSrv.Close()
|
||||
|
||||
// And: Return a chat with a valid workspace ID and owner.
|
||||
mDB.EXPECT().GetChatByID(gomock.Any(), chatID).Return(database.Chat{
|
||||
ID: chatID,
|
||||
OwnerID: ownerID,
|
||||
WorkspaceID: uuid.NullUUID{UUID: workspaceID, Valid: true},
|
||||
}, nil)
|
||||
|
||||
// And: Return a connected agent.
|
||||
mDB.EXPECT().GetWorkspaceAgentsInLatestBuildByWorkspaceID(gomock.Any(), workspaceID).
|
||||
Return([]database.WorkspaceAgent{{
|
||||
ID: agentID,
|
||||
ResourceID: resourceID,
|
||||
LifecycleState: database.WorkspaceAgentLifecycleStateReady,
|
||||
FirstConnectedAt: sql.NullTime{Valid: true, Time: dbtime.Now()},
|
||||
LastConnectedAt: sql.NullTime{Valid: true, Time: dbtime.Now()},
|
||||
}}, nil)
|
||||
|
||||
// And: Allow db2sdk.WorkspaceAgent to complete.
|
||||
mCoordinator.EXPECT().Node(gomock.Any()).Return(nil)
|
||||
|
||||
// And: WatchGit dials our fake agent server.
|
||||
mAgentConn.EXPECT().WatchGit(gomock.Any(), gomock.Any(), chatID).
|
||||
DoAndReturn(func(ctx context.Context, _ slog.Logger, _ uuid.UUID) (*wsjson.Stream[codersdk.WorkspaceAgentGitServerMessage, codersdk.WorkspaceAgentGitClientMessage], error) {
|
||||
agentURL := strings.Replace(agentSrv.URL, "http://", "ws://", 1)
|
||||
conn, resp, err := websocket.Dial(ctx, agentURL, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if resp != nil && resp.Body != nil {
|
||||
defer resp.Body.Close()
|
||||
}
|
||||
s := wsjson.NewStream[
|
||||
codersdk.WorkspaceAgentGitServerMessage,
|
||||
codersdk.WorkspaceAgentGitClientMessage,
|
||||
](conn, websocket.MessageText, websocket.MessageText, logger)
|
||||
return s, nil
|
||||
})
|
||||
|
||||
// Expect MarkStale to call UpsertChatDiffStatusReference
|
||||
// with the branch/origin from the agent's changes message.
|
||||
upsertCalled := make(chan database.UpsertChatDiffStatusReferenceParams, 1)
|
||||
mDB.EXPECT().UpsertChatDiffStatusReference(gomock.Any(), gomock.Any()).
|
||||
DoAndReturn(func(_ context.Context, arg database.UpsertChatDiffStatusReferenceParams) (database.ChatDiffStatus, error) {
|
||||
upsertCalled <- arg
|
||||
return database.ChatDiffStatus{ChatID: chatID}, nil
|
||||
})
|
||||
|
||||
// And: We mount the HTTP handler.
|
||||
r.With(httpmw.ExtractChatParam(mDB)).
|
||||
Get("/chats/{chat}/stream/git", api.watchChatGit)
|
||||
|
||||
// Given: We create the HTTP server.
|
||||
coderdSrv := httptest.NewServer(r)
|
||||
defer coderdSrv.Close()
|
||||
|
||||
// And: Dial the WebSocket as a client.
|
||||
wsURL := strings.Replace(coderdSrv.URL, "http://", "ws://", 1)
|
||||
clientConn, resp, err := websocket.Dial(ctx, fmt.Sprintf("%s/chats/%s/stream/git", wsURL, chatID), nil)
|
||||
require.NoError(t, err)
|
||||
if resp.Body != nil {
|
||||
defer resp.Body.Close()
|
||||
}
|
||||
|
||||
clientStream := wsjson.NewStream[
|
||||
codersdk.WorkspaceAgentGitServerMessage,
|
||||
codersdk.WorkspaceAgentGitClientMessage,
|
||||
](clientConn, websocket.MessageText, websocket.MessageText, logger)
|
||||
clientCh := clientStream.Chan()
|
||||
|
||||
// And: Wait for the agent stream to be ready.
|
||||
agentStream := testutil.RequireReceive(ctx, t, agentStreamReady)
|
||||
|
||||
// When: The agent sends a changes message with branch
|
||||
// and remote origin.
|
||||
err = agentStream.Send(codersdk.WorkspaceAgentGitServerMessage{
|
||||
Type: codersdk.WorkspaceAgentGitServerMessageTypeChanges,
|
||||
Repositories: []codersdk.WorkspaceAgentRepoChanges{{
|
||||
RepoRoot: "/home/coder/project",
|
||||
Branch: "feature-branch",
|
||||
RemoteOrigin: "https://github.com/org/repo.git",
|
||||
}},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Then: The message is forwarded to the client.
|
||||
serverMsg := testutil.RequireReceive(ctx, t, clientCh)
|
||||
require.Equal(t, codersdk.WorkspaceAgentGitServerMessageTypeChanges, serverMsg.Type)
|
||||
require.Len(t, serverMsg.Repositories, 1)
|
||||
require.Equal(t, "feature-branch", serverMsg.Repositories[0].Branch)
|
||||
|
||||
// And: MarkStale was called via UpsertChatDiffStatusReference.
|
||||
arg := testutil.RequireReceive(ctx, t, upsertCalled)
|
||||
require.Equal(t, chatID, arg.ChatID)
|
||||
require.Equal(t, "feature-branch", arg.GitBranch)
|
||||
require.Equal(t, "https://github.com/org/repo.git", arg.GitRemoteOrigin)
|
||||
|
||||
// When: The agent sends the same branch/origin again,
|
||||
// MarkStale should NOT be called a second time.
|
||||
err = agentStream.Send(codersdk.WorkspaceAgentGitServerMessage{
|
||||
Type: codersdk.WorkspaceAgentGitServerMessageTypeChanges,
|
||||
Repositories: []codersdk.WorkspaceAgentRepoChanges{{
|
||||
RepoRoot: "/home/coder/project",
|
||||
Branch: "feature-branch",
|
||||
RemoteOrigin: "https://github.com/org/repo.git",
|
||||
}},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Then: The message is still forwarded to the client.
|
||||
serverMsg = testutil.RequireReceive(ctx, t, clientCh)
|
||||
require.Equal(t, codersdk.WorkspaceAgentGitServerMessageTypeChanges, serverMsg.Type)
|
||||
|
||||
// And: No additional UpsertChatDiffStatusReference call
|
||||
// was made (the mock would fail if called more than once).
|
||||
|
||||
// Cleanup.
|
||||
_ = clientStream.Close(websocket.StatusNormalClosure)
|
||||
closeAgentDone()
|
||||
coderdSrv.Close()
|
||||
agentSrv.Close()
|
||||
})
|
||||
|
||||
t.Run("ClientDisconnectTearsDown", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user