Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c8a1d4e141 |
@@ -20,6 +20,7 @@ import (
|
||||
|
||||
agentproto "github.com/coder/coder/v2/agent/proto"
|
||||
"github.com/coder/coder/v2/coderd/database"
|
||||
"github.com/coder/coder/v2/coderd/database/dbtime"
|
||||
"github.com/coder/coder/v2/coderd/externalauth/gitprovider"
|
||||
"github.com/coder/coder/v2/coderd/rbac"
|
||||
"github.com/coder/coder/v2/coderd/rbac/policy"
|
||||
@@ -522,7 +523,7 @@ func WorkspaceAgent(derpMap *tailcfg.DERPMap, coordinator tailnet.Coordinator,
|
||||
}
|
||||
}
|
||||
|
||||
status := dbAgent.Status(agentInactiveDisconnectTimeout)
|
||||
status := dbAgent.Status(dbtime.Now(), agentInactiveDisconnectTimeout)
|
||||
workspaceAgent.Status = codersdk.WorkspaceAgentStatus(status.Status)
|
||||
workspaceAgent.FirstConnectedAt = status.FirstConnectedAt
|
||||
workspaceAgent.LastConnectedAt = status.LastConnectedAt
|
||||
|
||||
@@ -15,7 +15,6 @@ import (
|
||||
"golang.org/x/oauth2"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/coder/coder/v2/coderd/database/dbtime"
|
||||
"github.com/coder/coder/v2/coderd/rbac"
|
||||
"github.com/coder/coder/v2/coderd/rbac/policy"
|
||||
)
|
||||
@@ -620,7 +619,7 @@ type WorkspaceAgentConnectionStatus struct {
|
||||
DisconnectedAt *time.Time `json:"disconnected_at"`
|
||||
}
|
||||
|
||||
func (a WorkspaceAgent) Status(inactiveTimeout time.Duration) WorkspaceAgentConnectionStatus {
|
||||
func (a WorkspaceAgent) Status(now time.Time, inactiveTimeout time.Duration) WorkspaceAgentConnectionStatus {
|
||||
connectionTimeout := time.Duration(a.ConnectionTimeoutSeconds) * time.Second
|
||||
|
||||
status := WorkspaceAgentConnectionStatus{
|
||||
@@ -639,7 +638,7 @@ func (a WorkspaceAgent) Status(inactiveTimeout time.Duration) WorkspaceAgentConn
|
||||
switch {
|
||||
case !a.FirstConnectedAt.Valid:
|
||||
switch {
|
||||
case connectionTimeout > 0 && dbtime.Now().Sub(a.CreatedAt) > connectionTimeout:
|
||||
case connectionTimeout > 0 && now.Sub(a.CreatedAt) > connectionTimeout:
|
||||
// If the agent took too long to connect the first time,
|
||||
// mark it as timed out.
|
||||
status.Status = WorkspaceAgentStatusTimeout
|
||||
@@ -654,7 +653,7 @@ func (a WorkspaceAgent) Status(inactiveTimeout time.Duration) WorkspaceAgentConn
|
||||
// If we've disconnected after our last connection, we know the
|
||||
// agent is no longer connected.
|
||||
status.Status = WorkspaceAgentStatusDisconnected
|
||||
case dbtime.Now().Sub(a.LastConnectedAt.Time) > inactiveTimeout:
|
||||
case now.Sub(a.LastConnectedAt.Time) > inactiveTimeout:
|
||||
// The connection died without updating the last connected.
|
||||
status.Status = WorkspaceAgentStatusDisconnected
|
||||
// Client code needs an accurate disconnected at if the agent has been inactive.
|
||||
|
||||
@@ -317,21 +317,43 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis
|
||||
go func() {
|
||||
defer close(done)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
|
||||
collect := func() {
|
||||
logger.Debug(ctx, "agent metrics collection is starting")
|
||||
timer := prometheus.NewTimer(metricsCollectorAgents)
|
||||
defer func() {
|
||||
logger.Debug(ctx, "agent metrics collection is done")
|
||||
timer.ObserveDuration()
|
||||
ticker.Reset(duration)
|
||||
}()
|
||||
|
||||
derpMap := derpMapFn()
|
||||
|
||||
// Use a consistent value for now for the duration of this collection
|
||||
// to avoid drift during the loop over workspaceAgents, which can cause
|
||||
// incorrect reporting of agent connection status.
|
||||
now := dbtime.Now()
|
||||
|
||||
workspaceAgents, err := db.GetWorkspaceAgentsForMetrics(ctx)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "can't get workspace agents", slog.Error(err))
|
||||
goto done
|
||||
return
|
||||
}
|
||||
|
||||
// Prepopulate our known agents and apps before processing, this saves us from having to make a database
|
||||
// roundtrip for every iteration of the loop to get the list of apps for the current agent.
|
||||
agentIDs := make([]uuid.UUID, 0, len(workspaceAgents))
|
||||
for _, agent := range workspaceAgents {
|
||||
agentIDs = append(agentIDs, agent.WorkspaceAgent.ID)
|
||||
}
|
||||
allApps, err := db.GetWorkspaceAppsByAgentIDs(ctx, agentIDs)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "can't get workspace apps", slog.Error(err))
|
||||
return
|
||||
}
|
||||
appsByAgentID := make(map[uuid.UUID][]database.WorkspaceApp, len(workspaceAgents))
|
||||
for _, app := range allApps {
|
||||
appsByAgentID[app.AgentID] = append(appsByAgentID[app.AgentID], app)
|
||||
}
|
||||
|
||||
for _, agent := range workspaceAgents {
|
||||
@@ -342,7 +364,7 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis
|
||||
}
|
||||
agentsGauge.WithLabelValues(VectorOperationAdd, 1, agent.OwnerUsername, agent.WorkspaceName, agent.TemplateName, templateVersionName)
|
||||
|
||||
connectionStatus := agent.WorkspaceAgent.Status(agentInactiveDisconnectTimeout)
|
||||
connectionStatus := agent.WorkspaceAgent.Status(now, agentInactiveDisconnectTimeout)
|
||||
node := (*coordinator.Load()).Node(agent.WorkspaceAgent.ID)
|
||||
|
||||
tailnetNode := "unknown"
|
||||
@@ -380,13 +402,7 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis
|
||||
}
|
||||
|
||||
// Collect information about registered applications
|
||||
apps, err := db.GetWorkspaceAppsByAgentID(ctx, agent.WorkspaceAgent.ID)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
logger.Error(ctx, "can't get workspace apps", slog.F("agent_id", agent.WorkspaceAgent.ID), slog.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
for _, app := range apps {
|
||||
for _, app := range appsByAgentID[agent.WorkspaceAgent.ID] {
|
||||
agentsAppsGauge.WithLabelValues(VectorOperationAdd, 1, agent.WorkspaceAgent.Name, agent.OwnerUsername, agent.WorkspaceName, app.DisplayName, string(app.Health))
|
||||
}
|
||||
}
|
||||
@@ -395,11 +411,15 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis
|
||||
agentsConnectionsGauge.Commit()
|
||||
agentsConnectionLatenciesGauge.Commit()
|
||||
agentsAppsGauge.Commit()
|
||||
}
|
||||
|
||||
done:
|
||||
logger.Debug(ctx, "agent metrics collection is done")
|
||||
timer.ObserveDuration()
|
||||
ticker.Reset(duration)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
collect()
|
||||
}
|
||||
}()
|
||||
return func() {
|
||||
|
||||
@@ -231,7 +231,7 @@ func (p *DBTokenProvider) Issue(ctx context.Context, rw http.ResponseWriter, r *
|
||||
}
|
||||
|
||||
// Check that the agent is online.
|
||||
agentStatus := dbReq.Agent.Status(p.WorkspaceAgentInactiveTimeout)
|
||||
agentStatus := dbReq.Agent.Status(dbtime.Now(), p.WorkspaceAgentInactiveTimeout)
|
||||
if agentStatus.Status != database.WorkspaceAgentStatusConnected {
|
||||
WriteWorkspaceAppOffline(p.Logger, p.DashboardURL, rw, r, &appReq, fmt.Sprintf("Agent state is %q, not %q", agentStatus.Status, database.WorkspaceAgentStatusConnected))
|
||||
return nil, "", false
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/coderd/database"
|
||||
"github.com/coder/coder/v2/coderd/database/dbtime"
|
||||
"github.com/coder/coder/v2/coderd/util/namesgenerator"
|
||||
"github.com/coder/coder/v2/coderd/x/chatd/internal/agentselect"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
@@ -370,7 +371,7 @@ func (o CreateWorkspaceOptions) checkExistingWorkspace(
|
||||
)
|
||||
selected = agents[0]
|
||||
}
|
||||
status := selected.Status(agentInactiveDisconnectTimeout)
|
||||
status := selected.Status(dbtime.Now(), agentInactiveDisconnectTimeout)
|
||||
result := map[string]any{
|
||||
"created": false,
|
||||
"workspace_name": ws.Name,
|
||||
|
||||
@@ -75,10 +75,18 @@ func (r *Runner) Run(ctx context.Context, id string, logs io.Writer) error {
|
||||
return xerrors.Errorf("create user: %w", err)
|
||||
}
|
||||
newUser := newUserAndToken.User
|
||||
newUserClient := codersdk.New(r.client.URL,
|
||||
codersdk.WithSessionToken(newUserAndToken.SessionToken),
|
||||
codersdk.WithLogger(logger),
|
||||
codersdk.WithLogBodies())
|
||||
// Create a user client with an independent HTTP transport cloned from the
|
||||
// runner's client. Using codersdk.New directly would inherit
|
||||
// http.DefaultTransport, which is shared across all runners. That causes
|
||||
// all user WebSocket connections to reuse the same TCP connection pool and
|
||||
// land on the same coderd replica, concentrating load.
|
||||
newUserClient, err := loadtestutil.DupClientCopyingHeaders(r.client, nil)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("create user client: %w", err)
|
||||
}
|
||||
newUserClient.SetSessionToken(newUserAndToken.SessionToken)
|
||||
newUserClient.SetLogger(logger)
|
||||
newUserClient.SetLogBodies(true)
|
||||
|
||||
logger.Info(ctx, fmt.Sprintf("user %q created", newUser.Username), slog.F("id", newUser.ID.String()))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user