feat: add UpdateAppStatus to the workspace agent API (#22219)
<!-- If you have used AI to produce some or all of this PR, please ensure you have read our [AI Contribution guidelines](https://coder.com/docs/about/contributing/AI_CONTRIBUTING) before submitting. --> part of https://github.com/coder/coder/issues/21335 This moves updating app status (used by Tasks) into the workspace agent API over dRPC. This will allow us to update the status without having to re-authenticate each time, like we would with an HTTP PATCH request. Further PRs in this stack will pipe these requests thru from the CLI MCP server to the agentsock and finally to this dRPC call to coderd.
This commit is contained in:
@@ -235,6 +235,10 @@ type FakeAgentAPI struct {
|
||||
pushResourcesMonitoringUsageFunc func(*agentproto.PushResourcesMonitoringUsageRequest) (*agentproto.PushResourcesMonitoringUsageResponse, error)
|
||||
}
|
||||
|
||||
func (*FakeAgentAPI) UpdateAppStatus(context.Context, *agentproto.UpdateAppStatusRequest) (*agentproto.UpdateAppStatusResponse, error) {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
func (f *FakeAgentAPI) GetManifest(context.Context, *agentproto.GetManifestRequest) (*agentproto.Manifest, error) {
|
||||
return f.manifest, nil
|
||||
}
|
||||
|
||||
+543
-329
File diff suppressed because it is too large
Load Diff
+20
-1
@@ -436,7 +436,7 @@ message CreateSubAgentRequest {
|
||||
}
|
||||
|
||||
repeated DisplayApp display_apps = 6;
|
||||
|
||||
|
||||
optional bytes id = 7;
|
||||
}
|
||||
|
||||
@@ -494,6 +494,24 @@ message ReportBoundaryLogsRequest {
|
||||
|
||||
message ReportBoundaryLogsResponse {}
|
||||
|
||||
// UpdateAppStatusRequest updates the given Workspace App's status. c.f. agentsdk.PatchAppStatus
|
||||
message UpdateAppStatusRequest {
|
||||
string slug = 1;
|
||||
|
||||
enum AppStatusState {
|
||||
WORKING = 0;
|
||||
IDLE = 1;
|
||||
COMPLETE = 2;
|
||||
FAILURE = 3;
|
||||
}
|
||||
AppStatusState state = 2;
|
||||
|
||||
string message = 3;
|
||||
string uri = 4;
|
||||
}
|
||||
|
||||
message UpdateAppStatusResponse {}
|
||||
|
||||
service Agent {
|
||||
rpc GetManifest(GetManifestRequest) returns (Manifest);
|
||||
rpc GetServiceBanner(GetServiceBannerRequest) returns (ServiceBanner);
|
||||
@@ -512,4 +530,5 @@ service Agent {
|
||||
rpc DeleteSubAgent(DeleteSubAgentRequest) returns (DeleteSubAgentResponse);
|
||||
rpc ListSubAgents(ListSubAgentsRequest) returns (ListSubAgentsResponse);
|
||||
rpc ReportBoundaryLogs(ReportBoundaryLogsRequest) returns (ReportBoundaryLogsResponse);
|
||||
rpc UpdateAppStatus(UpdateAppStatusRequest) returns (UpdateAppStatusResponse);
|
||||
}
|
||||
|
||||
@@ -56,6 +56,7 @@ type DRPCAgentClient interface {
|
||||
DeleteSubAgent(ctx context.Context, in *DeleteSubAgentRequest) (*DeleteSubAgentResponse, error)
|
||||
ListSubAgents(ctx context.Context, in *ListSubAgentsRequest) (*ListSubAgentsResponse, error)
|
||||
ReportBoundaryLogs(ctx context.Context, in *ReportBoundaryLogsRequest) (*ReportBoundaryLogsResponse, error)
|
||||
UpdateAppStatus(ctx context.Context, in *UpdateAppStatusRequest) (*UpdateAppStatusResponse, error)
|
||||
}
|
||||
|
||||
type drpcAgentClient struct {
|
||||
@@ -221,6 +222,15 @@ func (c *drpcAgentClient) ReportBoundaryLogs(ctx context.Context, in *ReportBoun
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *drpcAgentClient) UpdateAppStatus(ctx context.Context, in *UpdateAppStatusRequest) (*UpdateAppStatusResponse, error) {
|
||||
out := new(UpdateAppStatusResponse)
|
||||
err := c.cc.Invoke(ctx, "/coder.agent.v2.Agent/UpdateAppStatus", drpcEncoding_File_agent_proto_agent_proto{}, in, out)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
type DRPCAgentServer interface {
|
||||
GetManifest(context.Context, *GetManifestRequest) (*Manifest, error)
|
||||
GetServiceBanner(context.Context, *GetServiceBannerRequest) (*ServiceBanner, error)
|
||||
@@ -239,6 +249,7 @@ type DRPCAgentServer interface {
|
||||
DeleteSubAgent(context.Context, *DeleteSubAgentRequest) (*DeleteSubAgentResponse, error)
|
||||
ListSubAgents(context.Context, *ListSubAgentsRequest) (*ListSubAgentsResponse, error)
|
||||
ReportBoundaryLogs(context.Context, *ReportBoundaryLogsRequest) (*ReportBoundaryLogsResponse, error)
|
||||
UpdateAppStatus(context.Context, *UpdateAppStatusRequest) (*UpdateAppStatusResponse, error)
|
||||
}
|
||||
|
||||
type DRPCAgentUnimplementedServer struct{}
|
||||
@@ -311,9 +322,13 @@ func (s *DRPCAgentUnimplementedServer) ReportBoundaryLogs(context.Context, *Repo
|
||||
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
|
||||
}
|
||||
|
||||
func (s *DRPCAgentUnimplementedServer) UpdateAppStatus(context.Context, *UpdateAppStatusRequest) (*UpdateAppStatusResponse, error) {
|
||||
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
|
||||
}
|
||||
|
||||
type DRPCAgentDescription struct{}
|
||||
|
||||
func (DRPCAgentDescription) NumMethods() int { return 17 }
|
||||
func (DRPCAgentDescription) NumMethods() int { return 18 }
|
||||
|
||||
func (DRPCAgentDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) {
|
||||
switch n {
|
||||
@@ -470,6 +485,15 @@ func (DRPCAgentDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver,
|
||||
in1.(*ReportBoundaryLogsRequest),
|
||||
)
|
||||
}, DRPCAgentServer.ReportBoundaryLogs, true
|
||||
case 17:
|
||||
return "/coder.agent.v2.Agent/UpdateAppStatus", drpcEncoding_File_agent_proto_agent_proto{},
|
||||
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
|
||||
return srv.(DRPCAgentServer).
|
||||
UpdateAppStatus(
|
||||
ctx,
|
||||
in1.(*UpdateAppStatusRequest),
|
||||
)
|
||||
}, DRPCAgentServer.UpdateAppStatus, true
|
||||
default:
|
||||
return "", nil, nil, nil, false
|
||||
}
|
||||
@@ -750,3 +774,19 @@ func (x *drpcAgent_ReportBoundaryLogsStream) SendAndClose(m *ReportBoundaryLogsR
|
||||
}
|
||||
return x.CloseSend()
|
||||
}
|
||||
|
||||
type DRPCAgent_UpdateAppStatusStream interface {
|
||||
drpc.Stream
|
||||
SendAndClose(*UpdateAppStatusResponse) error
|
||||
}
|
||||
|
||||
type drpcAgent_UpdateAppStatusStream struct {
|
||||
drpc.Stream
|
||||
}
|
||||
|
||||
func (x *drpcAgent_UpdateAppStatusStream) SendAndClose(m *UpdateAppStatusResponse) error {
|
||||
if err := x.MsgSend(m, drpcEncoding_File_agent_proto_agent_proto{}); err != nil {
|
||||
return err
|
||||
}
|
||||
return x.CloseSend()
|
||||
}
|
||||
|
||||
@@ -73,9 +73,13 @@ type DRPCAgentClient27 interface {
|
||||
ReportBoundaryLogs(ctx context.Context, in *ReportBoundaryLogsRequest) (*ReportBoundaryLogsResponse, error)
|
||||
}
|
||||
|
||||
// DRPCAgentClient28 is the Agent API at v2.8. It adds a SubagentId field to the
|
||||
// WorkspaceAgentDevcontainer message, and a Id field to the CreateSubAgentRequest
|
||||
// message. Compatible with Coder v2.31+
|
||||
// DRPCAgentClient28 is the Agent API at v2.8. It adds
|
||||
// - a SubagentId field to the WorkspaceAgentDevcontainer message
|
||||
// - an Id field to the CreateSubAgentRequest message.
|
||||
// - UpdateAppStatus RPC.
|
||||
//
|
||||
// Compatible with Coder v2.31+
|
||||
type DRPCAgentClient28 interface {
|
||||
DRPCAgentClient27
|
||||
UpdateAppStatus(ctx context.Context, in *UpdateAppStatusRequest) (*UpdateAppStatusResponse, error)
|
||||
}
|
||||
|
||||
@@ -179,6 +179,8 @@ func New(opts Options, workspace database.Workspace) *API {
|
||||
Database: opts.Database,
|
||||
Log: opts.Log,
|
||||
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
|
||||
Clock: opts.Clock,
|
||||
NotificationsEnqueuer: opts.NotificationsEnqueuer,
|
||||
}
|
||||
|
||||
api.MetadataAPI = &MetadataAPI{
|
||||
|
||||
@@ -2,6 +2,10 @@ package agentapi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"golang.org/x/xerrors"
|
||||
@@ -9,7 +13,14 @@ import (
|
||||
"cdr.dev/slog/v3"
|
||||
agentproto "github.com/coder/coder/v2/agent/proto"
|
||||
"github.com/coder/coder/v2/coderd/database"
|
||||
"github.com/coder/coder/v2/coderd/database/dbauthz"
|
||||
"github.com/coder/coder/v2/coderd/database/dbtime"
|
||||
"github.com/coder/coder/v2/coderd/notifications"
|
||||
strutil "github.com/coder/coder/v2/coderd/util/strings"
|
||||
"github.com/coder/coder/v2/coderd/workspacestats"
|
||||
"github.com/coder/coder/v2/coderd/wspubsub"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
"github.com/coder/quartz"
|
||||
)
|
||||
|
||||
type AppsAPI struct {
|
||||
@@ -17,6 +28,8 @@ type AppsAPI struct {
|
||||
Database database.Store
|
||||
Log slog.Logger
|
||||
PublishWorkspaceUpdateFn func(context.Context, *database.WorkspaceAgent, wspubsub.WorkspaceEventKind) error
|
||||
NotificationsEnqueuer notifications.Enqueuer
|
||||
Clock quartz.Clock
|
||||
}
|
||||
|
||||
func (a *AppsAPI) BatchUpdateAppHealths(ctx context.Context, req *agentproto.BatchUpdateAppHealthRequest) (*agentproto.BatchUpdateAppHealthResponse, error) {
|
||||
@@ -104,3 +117,230 @@ func (a *AppsAPI) BatchUpdateAppHealths(ctx context.Context, req *agentproto.Bat
|
||||
}
|
||||
return &agentproto.BatchUpdateAppHealthResponse{}, nil
|
||||
}
|
||||
|
||||
func (a *AppsAPI) UpdateAppStatus(ctx context.Context, req *agentproto.UpdateAppStatusRequest) (*agentproto.UpdateAppStatusResponse, error) {
|
||||
if len(req.Message) > 160 {
|
||||
return nil, codersdk.NewError(http.StatusBadRequest, codersdk.Response{
|
||||
Message: "Message is too long.",
|
||||
Detail: "Message must be less than 160 characters.",
|
||||
Validations: []codersdk.ValidationError{
|
||||
{Field: "message", Detail: "Message must be less than 160 characters."},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
var dbState database.WorkspaceAppStatusState
|
||||
switch req.State {
|
||||
case agentproto.UpdateAppStatusRequest_COMPLETE:
|
||||
dbState = database.WorkspaceAppStatusStateComplete
|
||||
case agentproto.UpdateAppStatusRequest_FAILURE:
|
||||
dbState = database.WorkspaceAppStatusStateFailure
|
||||
case agentproto.UpdateAppStatusRequest_WORKING:
|
||||
dbState = database.WorkspaceAppStatusStateWorking
|
||||
case agentproto.UpdateAppStatusRequest_IDLE:
|
||||
dbState = database.WorkspaceAppStatusStateIdle
|
||||
default:
|
||||
return nil, codersdk.NewError(http.StatusBadRequest, codersdk.Response{
|
||||
Message: "Invalid state provided.",
|
||||
Detail: fmt.Sprintf("invalid state: %q", req.State),
|
||||
Validations: []codersdk.ValidationError{
|
||||
{Field: "state", Detail: "State must be one of: complete, failure, working, idle."},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
workspaceAgent, err := a.AgentFn(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
app, err := a.Database.GetWorkspaceAppByAgentIDAndSlug(ctx, database.GetWorkspaceAppByAgentIDAndSlugParams{
|
||||
AgentID: workspaceAgent.ID,
|
||||
Slug: req.Slug,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, codersdk.NewError(http.StatusBadRequest, codersdk.Response{
|
||||
Message: "Failed to get workspace app.",
|
||||
Detail: fmt.Sprintf("No app found with slug %q", req.Slug),
|
||||
})
|
||||
}
|
||||
|
||||
workspace, err := a.Database.GetWorkspaceByAgentID(ctx, workspaceAgent.ID)
|
||||
if err != nil {
|
||||
return nil, codersdk.NewError(http.StatusBadRequest, codersdk.Response{
|
||||
Message: "Failed to get workspace.",
|
||||
Detail: err.Error(),
|
||||
})
|
||||
}
|
||||
|
||||
// Treat the message as untrusted input.
|
||||
cleaned := strutil.UISanitize(req.Message)
|
||||
|
||||
// Get the latest status for the workspace app to detect no-op updates
|
||||
// nolint:gocritic // This is a system restricted operation.
|
||||
latestAppStatus, err := a.Database.GetLatestWorkspaceAppStatusByAppID(dbauthz.AsSystemRestricted(ctx), app.ID)
|
||||
if err != nil && !xerrors.Is(err, sql.ErrNoRows) {
|
||||
return nil, codersdk.NewError(http.StatusInternalServerError, codersdk.Response{
|
||||
Message: "Failed to get latest workspace app status.",
|
||||
Detail: err.Error(),
|
||||
})
|
||||
}
|
||||
// If no rows found, latestAppStatus will be a zero-value struct (ID == uuid.Nil)
|
||||
|
||||
// nolint:gocritic // This is a system restricted operation.
|
||||
_, err = a.Database.InsertWorkspaceAppStatus(dbauthz.AsSystemRestricted(ctx), database.InsertWorkspaceAppStatusParams{
|
||||
ID: uuid.New(),
|
||||
CreatedAt: dbtime.Now(),
|
||||
WorkspaceID: workspace.ID,
|
||||
AgentID: workspaceAgent.ID,
|
||||
AppID: app.ID,
|
||||
State: dbState,
|
||||
Message: cleaned,
|
||||
Uri: sql.NullString{
|
||||
String: req.Uri,
|
||||
Valid: req.Uri != "",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, codersdk.NewError(http.StatusInternalServerError, codersdk.Response{
|
||||
Message: "Failed to insert workspace app status.",
|
||||
Detail: err.Error(),
|
||||
})
|
||||
}
|
||||
|
||||
if a.PublishWorkspaceUpdateFn != nil {
|
||||
err = a.PublishWorkspaceUpdateFn(ctx, &workspaceAgent, wspubsub.WorkspaceEventKindAgentAppStatusUpdate)
|
||||
if err != nil {
|
||||
return nil, codersdk.NewError(http.StatusInternalServerError, codersdk.Response{
|
||||
Message: "Failed to publish workspace update.",
|
||||
Detail: err.Error(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Notify on state change to Working/Idle for AI tasks
|
||||
a.enqueueAITaskStateNotification(ctx, app.ID, latestAppStatus, dbState, workspace, workspaceAgent)
|
||||
|
||||
if shouldBump(dbState, latestAppStatus) {
|
||||
// We pass time.Time{} for nextAutostart since we don't have access to
|
||||
// TemplateScheduleStore here. The activity bump logic handles this by
|
||||
// defaulting to the template's activity_bump duration (typically 1 hour).
|
||||
workspacestats.ActivityBumpWorkspace(ctx, a.Log, a.Database, workspace.ID, time.Time{})
|
||||
}
|
||||
// just return a blank response because it doesn't contain any settable fields at present.
|
||||
return new(agentproto.UpdateAppStatusResponse), nil
|
||||
}
|
||||
|
||||
func shouldBump(dbState database.WorkspaceAppStatusState, latestAppStatus database.WorkspaceAppStatus) bool {
|
||||
// Bump deadline when agent reports working or transitions away from working.
|
||||
// This prevents auto-pause during active work and gives users time to interact
|
||||
// after work completes.
|
||||
|
||||
// Bump if reporting working state.
|
||||
if dbState == database.WorkspaceAppStatusStateWorking {
|
||||
return true
|
||||
}
|
||||
|
||||
// Bump if transitioning away from working state.
|
||||
if latestAppStatus.ID != uuid.Nil {
|
||||
prevState := latestAppStatus.State
|
||||
if prevState == database.WorkspaceAppStatusStateWorking {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// enqueueAITaskStateNotification enqueues a notification when an AI task's app
|
||||
// transitions to Working or Idle.
|
||||
// No-op if:
|
||||
// - the workspace agent app isn't configured as an AI task,
|
||||
// - the new state equals the latest persisted state,
|
||||
// - the workspace agent is not ready (still starting up).
|
||||
func (a *AppsAPI) enqueueAITaskStateNotification(
|
||||
ctx context.Context,
|
||||
appID uuid.UUID,
|
||||
latestAppStatus database.WorkspaceAppStatus,
|
||||
newAppStatus database.WorkspaceAppStatusState,
|
||||
workspace database.Workspace,
|
||||
agent database.WorkspaceAgent,
|
||||
) {
|
||||
var notificationTemplate uuid.UUID
|
||||
switch newAppStatus {
|
||||
case database.WorkspaceAppStatusStateWorking:
|
||||
notificationTemplate = notifications.TemplateTaskWorking
|
||||
case database.WorkspaceAppStatusStateIdle:
|
||||
notificationTemplate = notifications.TemplateTaskIdle
|
||||
case database.WorkspaceAppStatusStateComplete:
|
||||
notificationTemplate = notifications.TemplateTaskCompleted
|
||||
case database.WorkspaceAppStatusStateFailure:
|
||||
notificationTemplate = notifications.TemplateTaskFailed
|
||||
default:
|
||||
// Not a notifiable state, do nothing
|
||||
return
|
||||
}
|
||||
|
||||
if !workspace.TaskID.Valid {
|
||||
// Workspace has no task ID, do nothing.
|
||||
return
|
||||
}
|
||||
|
||||
// Only send notifications when the agent is ready. We want to skip
|
||||
// any state transitions that occur whilst the workspace is starting
|
||||
// up as it doesn't make sense to receive them.
|
||||
if agent.LifecycleState != database.WorkspaceAgentLifecycleStateReady {
|
||||
a.Log.Debug(ctx, "skipping AI task notification because agent is not ready",
|
||||
slog.F("agent_id", agent.ID),
|
||||
slog.F("lifecycle_state", agent.LifecycleState),
|
||||
slog.F("new_app_status", newAppStatus),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
task, err := a.Database.GetTaskByID(ctx, workspace.TaskID.UUID)
|
||||
if err != nil {
|
||||
a.Log.Warn(ctx, "failed to get task", slog.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
if !task.WorkspaceAppID.Valid || task.WorkspaceAppID.UUID != appID {
|
||||
// Non-task app, do nothing.
|
||||
return
|
||||
}
|
||||
|
||||
// Skip if the latest persisted state equals the new state (no new transition)
|
||||
// Note: uuid.Nil check is valid here. If no previous status exists,
|
||||
// GetLatestWorkspaceAppStatusByAppID returns sql.ErrNoRows and we get a zero-value struct.
|
||||
if latestAppStatus.ID != uuid.Nil && latestAppStatus.State == newAppStatus {
|
||||
return
|
||||
}
|
||||
|
||||
// Skip the initial "Working" notification when the task first starts.
|
||||
// This is obvious to the user since they just created the task.
|
||||
// We still notify on the first "Idle" status and all subsequent transitions.
|
||||
if latestAppStatus.ID == uuid.Nil && newAppStatus == database.WorkspaceAppStatusStateWorking {
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := a.NotificationsEnqueuer.EnqueueWithData(
|
||||
// nolint:gocritic // Need notifier actor to enqueue notifications
|
||||
dbauthz.AsNotifier(ctx),
|
||||
workspace.OwnerID,
|
||||
notificationTemplate,
|
||||
map[string]string{
|
||||
"task": task.Name,
|
||||
"workspace": workspace.Name,
|
||||
},
|
||||
map[string]any{
|
||||
// Use a 1-minute bucketed timestamp to bypass per-day dedupe,
|
||||
// allowing identical content to resend within the same day
|
||||
// (but not more than once every 10s).
|
||||
"dedupe_bypass_ts": a.Clock.Now().UTC().Truncate(time.Minute),
|
||||
},
|
||||
"api-workspace-agent-app-status",
|
||||
// Associate this notification with related entities
|
||||
workspace.ID, workspace.OwnerID, workspace.OrganizationID, appID,
|
||||
); err != nil {
|
||||
a.Log.Warn(ctx, "failed to notify of task state", slog.Error(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,115 @@
|
||||
package agentapi
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/coder/coder/v2/coderd/database"
|
||||
"github.com/coder/coder/v2/coderd/util/ptr"
|
||||
)
|
||||
|
||||
func TestShouldBump(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
prevState *database.WorkspaceAppStatusState // nil means no previous state
|
||||
newState database.WorkspaceAppStatusState
|
||||
shouldBump bool
|
||||
}{
|
||||
{
|
||||
name: "FirstStatusBumps",
|
||||
prevState: nil,
|
||||
newState: database.WorkspaceAppStatusStateWorking,
|
||||
shouldBump: true,
|
||||
},
|
||||
{
|
||||
name: "WorkingToIdleBumps",
|
||||
prevState: ptr.Ref(database.WorkspaceAppStatusStateWorking),
|
||||
newState: database.WorkspaceAppStatusStateIdle,
|
||||
shouldBump: true,
|
||||
},
|
||||
{
|
||||
name: "WorkingToCompleteBumps",
|
||||
prevState: ptr.Ref(database.WorkspaceAppStatusStateWorking),
|
||||
newState: database.WorkspaceAppStatusStateComplete,
|
||||
shouldBump: true,
|
||||
},
|
||||
{
|
||||
name: "CompleteToIdleNoBump",
|
||||
prevState: ptr.Ref(database.WorkspaceAppStatusStateComplete),
|
||||
newState: database.WorkspaceAppStatusStateIdle,
|
||||
shouldBump: false,
|
||||
},
|
||||
{
|
||||
name: "CompleteToCompleteNoBump",
|
||||
prevState: ptr.Ref(database.WorkspaceAppStatusStateComplete),
|
||||
newState: database.WorkspaceAppStatusStateComplete,
|
||||
shouldBump: false,
|
||||
},
|
||||
{
|
||||
name: "FailureToIdleNoBump",
|
||||
prevState: ptr.Ref(database.WorkspaceAppStatusStateFailure),
|
||||
newState: database.WorkspaceAppStatusStateIdle,
|
||||
shouldBump: false,
|
||||
},
|
||||
{
|
||||
name: "FailureToFailureNoBump",
|
||||
prevState: ptr.Ref(database.WorkspaceAppStatusStateFailure),
|
||||
newState: database.WorkspaceAppStatusStateFailure,
|
||||
shouldBump: false,
|
||||
},
|
||||
{
|
||||
name: "CompleteToWorkingBumps",
|
||||
prevState: ptr.Ref(database.WorkspaceAppStatusStateComplete),
|
||||
newState: database.WorkspaceAppStatusStateWorking,
|
||||
shouldBump: true,
|
||||
},
|
||||
{
|
||||
name: "FailureToCompleteNoBump",
|
||||
prevState: ptr.Ref(database.WorkspaceAppStatusStateFailure),
|
||||
newState: database.WorkspaceAppStatusStateComplete,
|
||||
shouldBump: false,
|
||||
},
|
||||
{
|
||||
name: "WorkingToFailureBumps",
|
||||
prevState: ptr.Ref(database.WorkspaceAppStatusStateWorking),
|
||||
newState: database.WorkspaceAppStatusStateFailure,
|
||||
shouldBump: true,
|
||||
},
|
||||
{
|
||||
name: "IdleToIdleNoBump",
|
||||
prevState: ptr.Ref(database.WorkspaceAppStatusStateIdle),
|
||||
newState: database.WorkspaceAppStatusStateIdle,
|
||||
shouldBump: false,
|
||||
},
|
||||
{
|
||||
name: "IdleToWorkingBumps",
|
||||
prevState: ptr.Ref(database.WorkspaceAppStatusStateIdle),
|
||||
newState: database.WorkspaceAppStatusStateWorking,
|
||||
shouldBump: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var prevAppStatus database.WorkspaceAppStatus
|
||||
// If there's a previous state, report it first.
|
||||
if tt.prevState != nil {
|
||||
prevAppStatus.ID = uuid.UUID{1}
|
||||
prevAppStatus.State = *tt.prevState
|
||||
}
|
||||
|
||||
didBump := shouldBump(tt.newState, prevAppStatus)
|
||||
if tt.shouldBump {
|
||||
require.True(t, didBump, "wanted deadline to bump but it didn't")
|
||||
} else {
|
||||
require.False(t, didBump, "wanted deadline not to bump but it did")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -2,9 +2,13 @@ package agentapi_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/mock/gomock"
|
||||
|
||||
@@ -12,8 +16,12 @@ import (
|
||||
"github.com/coder/coder/v2/coderd/agentapi"
|
||||
"github.com/coder/coder/v2/coderd/database"
|
||||
"github.com/coder/coder/v2/coderd/database/dbmock"
|
||||
"github.com/coder/coder/v2/coderd/notifications"
|
||||
"github.com/coder/coder/v2/coderd/notifications/notificationstest"
|
||||
"github.com/coder/coder/v2/coderd/wspubsub"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
"github.com/coder/quartz"
|
||||
)
|
||||
|
||||
func TestBatchUpdateAppHealths(t *testing.T) {
|
||||
@@ -253,3 +261,183 @@ func TestBatchUpdateAppHealths(t *testing.T) {
|
||||
require.Nil(t, resp)
|
||||
})
|
||||
}
|
||||
|
||||
func TestWorkspaceAgentAppStatus(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("Success", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
ctrl := gomock.NewController(t)
|
||||
mDB := dbmock.NewMockStore(ctrl)
|
||||
fEnq := ¬ificationstest.FakeEnqueuer{}
|
||||
mClock := quartz.NewMock(t)
|
||||
agent := database.WorkspaceAgent{
|
||||
ID: uuid.UUID{2},
|
||||
LifecycleState: database.WorkspaceAgentLifecycleStateReady,
|
||||
}
|
||||
workspaceUpdates := make(chan wspubsub.WorkspaceEventKind, 100)
|
||||
|
||||
api := &agentapi.AppsAPI{
|
||||
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
|
||||
return agent, nil
|
||||
},
|
||||
Database: mDB,
|
||||
Log: testutil.Logger(t),
|
||||
PublishWorkspaceUpdateFn: func(_ context.Context, agnt *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
|
||||
assert.Equal(t, *agnt, agent)
|
||||
testutil.AssertSend(ctx, t, workspaceUpdates, kind)
|
||||
return nil
|
||||
},
|
||||
NotificationsEnqueuer: fEnq,
|
||||
Clock: mClock,
|
||||
}
|
||||
|
||||
app := database.WorkspaceApp{
|
||||
ID: uuid.UUID{8},
|
||||
}
|
||||
mDB.EXPECT().GetWorkspaceAppByAgentIDAndSlug(gomock.Any(), database.GetWorkspaceAppByAgentIDAndSlugParams{
|
||||
AgentID: agent.ID,
|
||||
Slug: "vscode",
|
||||
}).Times(1).Return(app, nil)
|
||||
task := database.Task{
|
||||
ID: uuid.UUID{7},
|
||||
WorkspaceAppID: uuid.NullUUID{
|
||||
Valid: true,
|
||||
UUID: app.ID,
|
||||
},
|
||||
}
|
||||
mDB.EXPECT().GetTaskByID(gomock.Any(), task.ID).Times(1).Return(task, nil)
|
||||
workspace := database.Workspace{
|
||||
ID: uuid.UUID{9},
|
||||
TaskID: uuid.NullUUID{
|
||||
Valid: true,
|
||||
UUID: task.ID,
|
||||
},
|
||||
}
|
||||
mDB.EXPECT().GetWorkspaceByAgentID(gomock.Any(), agent.ID).Times(1).Return(workspace, nil)
|
||||
appStatus := database.WorkspaceAppStatus{
|
||||
ID: uuid.UUID{6},
|
||||
}
|
||||
mDB.EXPECT().GetLatestWorkspaceAppStatusByAppID(gomock.Any(), app.ID).Times(1).Return(appStatus, nil)
|
||||
mDB.EXPECT().InsertWorkspaceAppStatus(
|
||||
gomock.Any(),
|
||||
gomock.Cond(func(params database.InsertWorkspaceAppStatusParams) bool {
|
||||
if params.AgentID == agent.ID && params.AppID == app.ID {
|
||||
assert.Equal(t, "testing", params.Message)
|
||||
assert.Equal(t, database.WorkspaceAppStatusStateComplete, params.State)
|
||||
assert.True(t, params.Uri.Valid)
|
||||
assert.Equal(t, "https://example.com", params.Uri.String)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})).Times(1).Return(database.WorkspaceAppStatus{}, nil)
|
||||
|
||||
_, err := api.UpdateAppStatus(ctx, &agentproto.UpdateAppStatusRequest{
|
||||
Slug: "vscode",
|
||||
Message: "testing",
|
||||
Uri: "https://example.com",
|
||||
State: agentproto.UpdateAppStatusRequest_COMPLETE,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
kind := testutil.RequireReceive(ctx, t, workspaceUpdates)
|
||||
require.Equal(t, wspubsub.WorkspaceEventKindAgentAppStatusUpdate, kind)
|
||||
sent := fEnq.Sent(notificationstest.WithTemplateID(notifications.TemplateTaskCompleted))
|
||||
require.Len(t, sent, 1)
|
||||
})
|
||||
|
||||
t.Run("FailUnknownApp", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
ctrl := gomock.NewController(t)
|
||||
mDB := dbmock.NewMockStore(ctrl)
|
||||
agent := database.WorkspaceAgent{
|
||||
ID: uuid.UUID{2},
|
||||
LifecycleState: database.WorkspaceAgentLifecycleStateReady,
|
||||
}
|
||||
|
||||
mDB.EXPECT().GetWorkspaceAppByAgentIDAndSlug(gomock.Any(), gomock.Any()).
|
||||
Times(1).
|
||||
Return(database.WorkspaceApp{}, sql.ErrNoRows)
|
||||
|
||||
api := &agentapi.AppsAPI{
|
||||
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
|
||||
return agent, nil
|
||||
},
|
||||
Database: mDB,
|
||||
Log: testutil.Logger(t),
|
||||
}
|
||||
_, err := api.UpdateAppStatus(ctx, &agentproto.UpdateAppStatusRequest{
|
||||
Slug: "unknown",
|
||||
Message: "testing",
|
||||
Uri: "https://example.com",
|
||||
State: agentproto.UpdateAppStatusRequest_COMPLETE,
|
||||
})
|
||||
require.ErrorContains(t, err, "No app found with slug")
|
||||
var sdkErr *codersdk.Error
|
||||
require.ErrorAs(t, err, &sdkErr)
|
||||
require.Equal(t, http.StatusBadRequest, sdkErr.StatusCode())
|
||||
})
|
||||
|
||||
t.Run("FailUnknownState", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
ctrl := gomock.NewController(t)
|
||||
mDB := dbmock.NewMockStore(ctrl)
|
||||
agent := database.WorkspaceAgent{
|
||||
ID: uuid.UUID{2},
|
||||
LifecycleState: database.WorkspaceAgentLifecycleStateReady,
|
||||
}
|
||||
|
||||
api := &agentapi.AppsAPI{
|
||||
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
|
||||
return agent, nil
|
||||
},
|
||||
Database: mDB,
|
||||
Log: testutil.Logger(t),
|
||||
}
|
||||
|
||||
_, err := api.UpdateAppStatus(ctx, &agentproto.UpdateAppStatusRequest{
|
||||
Slug: "vscode",
|
||||
Message: "testing",
|
||||
Uri: "https://example.com",
|
||||
State: 77,
|
||||
})
|
||||
require.ErrorContains(t, err, "Invalid state")
|
||||
var sdkErr *codersdk.Error
|
||||
require.ErrorAs(t, err, &sdkErr)
|
||||
require.Equal(t, http.StatusBadRequest, sdkErr.StatusCode())
|
||||
})
|
||||
|
||||
t.Run("FailTooLong", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
ctrl := gomock.NewController(t)
|
||||
mDB := dbmock.NewMockStore(ctrl)
|
||||
agent := database.WorkspaceAgent{
|
||||
ID: uuid.UUID{2},
|
||||
LifecycleState: database.WorkspaceAgentLifecycleStateReady,
|
||||
}
|
||||
|
||||
api := &agentapi.AppsAPI{
|
||||
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
|
||||
return agent, nil
|
||||
},
|
||||
Database: mDB,
|
||||
Log: testutil.Logger(t),
|
||||
}
|
||||
|
||||
_, err := api.UpdateAppStatus(ctx, &agentproto.UpdateAppStatusRequest{
|
||||
Slug: "vscode",
|
||||
Message: strings.Repeat("a", 161),
|
||||
Uri: "https://example.com",
|
||||
State: agentproto.UpdateAppStatusRequest_COMPLETE,
|
||||
})
|
||||
require.ErrorContains(t, err, "Message is too long")
|
||||
var sdkErr *codersdk.Error
|
||||
require.ErrorAs(t, err, &sdkErr)
|
||||
require.Equal(t, http.StatusBadRequest, sdkErr.StatusCode())
|
||||
})
|
||||
}
|
||||
|
||||
Generated
+1
@@ -9545,6 +9545,7 @@ const docTemplate = `{
|
||||
],
|
||||
"summary": "Patch workspace agent app status",
|
||||
"operationId": "patch-workspace-agent-app-status",
|
||||
"deprecated": true,
|
||||
"parameters": [
|
||||
{
|
||||
"description": "app status",
|
||||
|
||||
Generated
+1
@@ -8444,6 +8444,7 @@
|
||||
"tags": ["Agents"],
|
||||
"summary": "Patch workspace agent app status",
|
||||
"operationId": "patch-workspace-agent-app-status",
|
||||
"deprecated": true,
|
||||
"parameters": [
|
||||
{
|
||||
"description": "app status",
|
||||
|
||||
+35
-201
@@ -25,6 +25,7 @@ import (
|
||||
"tailscale.com/tailcfg"
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/coderd/agentapi"
|
||||
"github.com/coder/coder/v2/coderd/agentapi/metadatabatcher"
|
||||
"github.com/coder/coder/v2/coderd/database"
|
||||
"github.com/coder/coder/v2/coderd/database/db2sdk"
|
||||
@@ -35,14 +36,11 @@ import (
|
||||
"github.com/coder/coder/v2/coderd/httpmw"
|
||||
"github.com/coder/coder/v2/coderd/httpmw/loggermw"
|
||||
"github.com/coder/coder/v2/coderd/jwtutils"
|
||||
"github.com/coder/coder/v2/coderd/notifications"
|
||||
"github.com/coder/coder/v2/coderd/prebuilds"
|
||||
"github.com/coder/coder/v2/coderd/rbac"
|
||||
"github.com/coder/coder/v2/coderd/rbac/policy"
|
||||
"github.com/coder/coder/v2/coderd/telemetry"
|
||||
maputil "github.com/coder/coder/v2/coderd/util/maps"
|
||||
strutil "github.com/coder/coder/v2/coderd/util/strings"
|
||||
"github.com/coder/coder/v2/coderd/workspacestats"
|
||||
"github.com/coder/coder/v2/coderd/wspubsub"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
"github.com/coder/coder/v2/codersdk/agentsdk"
|
||||
@@ -295,6 +293,7 @@ func (api *API) patchWorkspaceAgentLogs(rw http.ResponseWriter, r *http.Request)
|
||||
// @Param request body agentsdk.PatchAppStatus true "app status"
|
||||
// @Success 200 {object} codersdk.Response
|
||||
// @Router /workspaceagents/me/app-status [patch]
|
||||
// @Deprecated Use UpdateAppStatus on the Agent API instead.
|
||||
func (api *API) patchWorkspaceAgentAppStatus(rw http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
workspaceAgent := httpmw.WorkspaceAgent(r)
|
||||
@@ -304,45 +303,6 @@ func (api *API) patchWorkspaceAgentAppStatus(rw http.ResponseWriter, r *http.Req
|
||||
return
|
||||
}
|
||||
|
||||
app, err := api.Database.GetWorkspaceAppByAgentIDAndSlug(ctx, database.GetWorkspaceAppByAgentIDAndSlugParams{
|
||||
AgentID: workspaceAgent.ID,
|
||||
Slug: req.AppSlug,
|
||||
})
|
||||
if err != nil {
|
||||
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
||||
Message: "Failed to get workspace app.",
|
||||
Detail: fmt.Sprintf("No app found with slug %q", req.AppSlug),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if len(req.Message) > 160 {
|
||||
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
||||
Message: "Message is too long.",
|
||||
Detail: "Message must be less than 160 characters.",
|
||||
Validations: []codersdk.ValidationError{
|
||||
{Field: "message", Detail: "Message must be less than 160 characters."},
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
switch req.State {
|
||||
case codersdk.WorkspaceAppStatusStateComplete,
|
||||
codersdk.WorkspaceAppStatusStateFailure,
|
||||
codersdk.WorkspaceAppStatusStateWorking,
|
||||
codersdk.WorkspaceAppStatusStateIdle: // valid states
|
||||
default:
|
||||
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
||||
Message: "Invalid state provided.",
|
||||
Detail: fmt.Sprintf("invalid state: %q", req.State),
|
||||
Validations: []codersdk.ValidationError{
|
||||
{Field: "state", Detail: "State must be one of: complete, failure, working."},
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
workspace, err := api.Database.GetWorkspaceByAgentID(ctx, workspaceAgent.ID)
|
||||
if err != nil {
|
||||
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
||||
@@ -352,176 +312,50 @@ func (api *API) patchWorkspaceAgentAppStatus(rw http.ResponseWriter, r *http.Req
|
||||
return
|
||||
}
|
||||
|
||||
// Treat the message as untrusted input.
|
||||
cleaned := strutil.UISanitize(req.Message)
|
||||
|
||||
// Get the latest status for the workspace app to detect no-op updates
|
||||
// nolint:gocritic // This is a system restricted operation.
|
||||
latestAppStatus, err := api.Database.GetLatestWorkspaceAppStatusByAppID(dbauthz.AsSystemRestricted(ctx), app.ID)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
||||
Message: "Failed to get latest workspace app status.",
|
||||
Detail: err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
// If no rows found, latestAppStatus will be a zero-value struct (ID == uuid.Nil)
|
||||
|
||||
// nolint:gocritic // This is a system restricted operation.
|
||||
_, err = api.Database.InsertWorkspaceAppStatus(dbauthz.AsSystemRestricted(ctx), database.InsertWorkspaceAppStatusParams{
|
||||
ID: uuid.New(),
|
||||
CreatedAt: dbtime.Now(),
|
||||
WorkspaceID: workspace.ID,
|
||||
AgentID: workspaceAgent.ID,
|
||||
AppID: app.ID,
|
||||
State: database.WorkspaceAppStatusState(req.State),
|
||||
Message: cleaned,
|
||||
Uri: sql.NullString{
|
||||
String: req.URI,
|
||||
Valid: req.URI != "",
|
||||
// This functionality has been moved to the AppsAPI in the agentapi. We keep this HTTP handler around for back
|
||||
// compatibility with older agents. We'll translate the request into the protobuf so there is only one primary
|
||||
// implementation.
|
||||
appAPI := &agentapi.AppsAPI{
|
||||
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
|
||||
return workspaceAgent, nil
|
||||
},
|
||||
})
|
||||
Database: api.Database,
|
||||
Log: api.Logger,
|
||||
PublishWorkspaceUpdateFn: func(ctx context.Context, agent *database.WorkspaceAgent, kind wspubsub.WorkspaceEventKind) error {
|
||||
api.publishWorkspaceUpdate(ctx, workspace.OwnerID, wspubsub.WorkspaceEvent{
|
||||
Kind: kind,
|
||||
WorkspaceID: workspace.ID,
|
||||
AgentID: &agent.ID,
|
||||
})
|
||||
return nil
|
||||
},
|
||||
NotificationsEnqueuer: api.NotificationsEnqueuer,
|
||||
Clock: api.Clock,
|
||||
}
|
||||
protoReq, err := agentsdk.ProtoFromPatchAppStatus(req)
|
||||
if err != nil {
|
||||
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
||||
Message: "Failed to insert workspace app status.",
|
||||
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
||||
Message: "Failed to parse request.",
|
||||
Detail: err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
api.publishWorkspaceUpdate(ctx, workspace.OwnerID, wspubsub.WorkspaceEvent{
|
||||
Kind: wspubsub.WorkspaceEventKindAgentAppStatusUpdate,
|
||||
WorkspaceID: workspace.ID,
|
||||
AgentID: &workspaceAgent.ID,
|
||||
})
|
||||
|
||||
// Notify on state change to Working/Idle for AI tasks
|
||||
api.enqueueAITaskStateNotification(ctx, app.ID, latestAppStatus, req.State, workspace, workspaceAgent)
|
||||
|
||||
// Bump deadline when agent reports working or transitions away from working.
|
||||
// This prevents auto-pause during active work and gives users time to interact
|
||||
// after work completes.
|
||||
shouldBump := false
|
||||
newState := database.WorkspaceAppStatusState(req.State)
|
||||
|
||||
// Bump if reporting working state.
|
||||
if newState == database.WorkspaceAppStatusStateWorking {
|
||||
shouldBump = true
|
||||
}
|
||||
|
||||
// Bump if transitioning away from working state.
|
||||
if latestAppStatus.ID != uuid.Nil {
|
||||
prevState := latestAppStatus.State
|
||||
if prevState == database.WorkspaceAppStatusStateWorking {
|
||||
shouldBump = true
|
||||
_, err = appAPI.UpdateAppStatus(r.Context(), protoReq)
|
||||
if err != nil {
|
||||
sdkErr := new(codersdk.Error)
|
||||
if xerrors.As(err, &sdkErr) {
|
||||
httpapi.Write(ctx, rw, sdkErr.StatusCode(), sdkErr.Response)
|
||||
return
|
||||
}
|
||||
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
||||
Message: "Failed to update app status.",
|
||||
Detail: err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
if shouldBump {
|
||||
// We pass time.Time{} for nextAutostart since we don't have access to
|
||||
// TemplateScheduleStore here. The activity bump logic handles this by
|
||||
// defaulting to the template's activity_bump duration (typically 1 hour).
|
||||
workspacestats.ActivityBumpWorkspace(ctx, api.Logger, api.Database, workspace.ID, time.Time{})
|
||||
}
|
||||
|
||||
httpapi.Write(ctx, rw, http.StatusOK, nil)
|
||||
}
|
||||
|
||||
// enqueueAITaskStateNotification enqueues a notification when an AI task's app
|
||||
// transitions to Working or Idle.
|
||||
// No-op if:
|
||||
// - the workspace agent app isn't configured as an AI task,
|
||||
// - the new state equals the latest persisted state,
|
||||
// - the workspace agent is not ready (still starting up).
|
||||
func (api *API) enqueueAITaskStateNotification(
|
||||
ctx context.Context,
|
||||
appID uuid.UUID,
|
||||
latestAppStatus database.WorkspaceAppStatus,
|
||||
newAppStatus codersdk.WorkspaceAppStatusState,
|
||||
workspace database.Workspace,
|
||||
agent database.WorkspaceAgent,
|
||||
) {
|
||||
// Select notification template based on the new state
|
||||
var notificationTemplate uuid.UUID
|
||||
switch newAppStatus {
|
||||
case codersdk.WorkspaceAppStatusStateWorking:
|
||||
notificationTemplate = notifications.TemplateTaskWorking
|
||||
case codersdk.WorkspaceAppStatusStateIdle:
|
||||
notificationTemplate = notifications.TemplateTaskIdle
|
||||
case codersdk.WorkspaceAppStatusStateComplete:
|
||||
notificationTemplate = notifications.TemplateTaskCompleted
|
||||
case codersdk.WorkspaceAppStatusStateFailure:
|
||||
notificationTemplate = notifications.TemplateTaskFailed
|
||||
default:
|
||||
// Not a notifiable state, do nothing
|
||||
return
|
||||
}
|
||||
|
||||
if !workspace.TaskID.Valid {
|
||||
// Workspace has no task ID, do nothing.
|
||||
return
|
||||
}
|
||||
|
||||
// Only send notifications when the agent is ready. We want to skip
|
||||
// any state transitions that occur whilst the workspace is starting
|
||||
// up as it doesn't make sense to receive them.
|
||||
if agent.LifecycleState != database.WorkspaceAgentLifecycleStateReady {
|
||||
api.Logger.Debug(ctx, "skipping AI task notification because agent is not ready",
|
||||
slog.F("agent_id", agent.ID),
|
||||
slog.F("lifecycle_state", agent.LifecycleState),
|
||||
slog.F("new_app_status", newAppStatus),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
task, err := api.Database.GetTaskByID(ctx, workspace.TaskID.UUID)
|
||||
if err != nil {
|
||||
api.Logger.Warn(ctx, "failed to get task", slog.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
if !task.WorkspaceAppID.Valid || task.WorkspaceAppID.UUID != appID {
|
||||
// Non-task app, do nothing.
|
||||
return
|
||||
}
|
||||
|
||||
// Skip if the latest persisted state equals the new state (no new transition)
|
||||
// Note: uuid.Nil check is valid here. If no previous status exists,
|
||||
// GetLatestWorkspaceAppStatusByAppID returns sql.ErrNoRows and we get a zero-value struct.
|
||||
if latestAppStatus.ID != uuid.Nil && latestAppStatus.State == database.WorkspaceAppStatusState(newAppStatus) {
|
||||
return
|
||||
}
|
||||
|
||||
// Skip the initial "Working" notification when task first starts.
|
||||
// This is obvious to the user since they just created the task.
|
||||
// We still notify on first "Idle" status and all subsequent transitions.
|
||||
if latestAppStatus.ID == uuid.Nil && newAppStatus == codersdk.WorkspaceAppStatusStateWorking {
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := api.NotificationsEnqueuer.EnqueueWithData(
|
||||
// nolint:gocritic // Need notifier actor to enqueue notifications
|
||||
dbauthz.AsNotifier(ctx),
|
||||
workspace.OwnerID,
|
||||
notificationTemplate,
|
||||
map[string]string{
|
||||
"task": task.Name,
|
||||
"workspace": workspace.Name,
|
||||
},
|
||||
map[string]any{
|
||||
// Use a 1-minute bucketed timestamp to bypass per-day dedupe,
|
||||
// allowing identical content to resend within the same day
|
||||
// (but not more than once every 10s).
|
||||
"dedupe_bypass_ts": api.Clock.Now().UTC().Truncate(time.Minute),
|
||||
},
|
||||
"api-workspace-agent-app-status",
|
||||
// Associate this notification with related entities
|
||||
workspace.ID, workspace.OwnerID, workspace.OrganizationID, appID,
|
||||
); err != nil {
|
||||
api.Logger.Warn(ctx, "failed to notify of task state", slog.Error(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// workspaceAgentLogs returns the logs associated with a workspace agent
|
||||
//
|
||||
// @Summary Get logs by workspace agent
|
||||
|
||||
@@ -464,3 +464,16 @@ func ProtoFromDevcontainer(dc codersdk.WorkspaceAgentDevcontainer) *proto.Worksp
|
||||
SubagentId: subagentID,
|
||||
}
|
||||
}
|
||||
|
||||
func ProtoFromPatchAppStatus(pas PatchAppStatus) (*proto.UpdateAppStatusRequest, error) {
|
||||
state, ok := proto.UpdateAppStatusRequest_AppStatusState_value[strings.ToUpper(string(pas.State))]
|
||||
if !ok {
|
||||
return nil, xerrors.Errorf("Invalid state: %s", pas.State)
|
||||
}
|
||||
return &proto.UpdateAppStatusRequest{
|
||||
Slug: pas.AppSlug,
|
||||
State: proto.UpdateAppStatusRequest_AppStatusState(state),
|
||||
Message: pas.Message,
|
||||
Uri: pas.URI,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -535,6 +535,14 @@ func NewTestError(statusCode int, method string, u string) *Error {
|
||||
}
|
||||
}
|
||||
|
||||
// NewError creates a new Error with the response and status code.
|
||||
func NewError(statusCode int, response Response) *Error {
|
||||
return &Error{
|
||||
statusCode: statusCode,
|
||||
Response: response,
|
||||
}
|
||||
}
|
||||
|
||||
type closeFunc func() error
|
||||
|
||||
func (c closeFunc) Close() error {
|
||||
|
||||
@@ -63,6 +63,7 @@ import (
|
||||
//
|
||||
// API v2.8:
|
||||
// - Added support for pre-created sub agents on the Agent API.
|
||||
// - Added support for UpdateAppStatus on the Agent API.
|
||||
const (
|
||||
CurrentMajor = 2
|
||||
CurrentMinor = 8
|
||||
|
||||
Reference in New Issue
Block a user