feat: add task status reporting load generator runner (#20538)

Adds the Runner, Config, and Metrics for the scaletest load generator for task status.

Part of https://github.com/coder/internal/issues/913
This commit is contained in:
Spike Curtis
2025-11-13 16:53:02 +04:00
committed by GitHub
parent 5bfbb0301f
commit a3c851c0e6
6 changed files with 1001 additions and 0 deletions
+148
View File
@@ -0,0 +1,148 @@
package taskstatus
import (
"context"
"net/http"
"net/url"
"github.com/google/uuid"
"golang.org/x/xerrors"
"cdr.dev/slog"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
)
// createExternalWorkspaceResult contains the results from creating an external workspace.
type createExternalWorkspaceResult struct {
WorkspaceID uuid.UUID
AgentToken string
}
// client abstracts the details of using codersdk.Client for workspace operations.
// This interface allows for easier testing by enabling mock implementations and
// provides a cleaner separation of concerns.
//
// The interface is designed to be initialized in two phases:
// 1. Create the client with newClient(coderClient)
// 2. Configure logging when the io.Writer is available in Run()
type client interface {
// createExternalWorkspace creates an external workspace and returns the workspace ID
// and agent token for the first external agent found in the workspace resources.
createExternalWorkspace(ctx context.Context, req codersdk.CreateWorkspaceRequest) (createExternalWorkspaceResult, error)
// watchWorkspace watches for updates to a workspace.
watchWorkspace(ctx context.Context, workspaceID uuid.UUID) (<-chan codersdk.Workspace, error)
// initialize sets up the client with the provided logger, which is only available after Run() is called.
initialize(logger slog.Logger)
}
// appStatusPatcher abstracts the details of using agentsdk.Client for updating app status.
// This interface is separate from client because it requires an agent token which is only
// available after creating an external workspace.
type appStatusPatcher interface {
// patchAppStatus updates the status of a workspace app.
patchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error
// initialize sets up the patcher with the provided logger and agent token.
initialize(logger slog.Logger, agentToken string)
}
// sdkClient is the concrete implementation of the client interface using
// codersdk.Client.
type sdkClient struct {
coderClient *codersdk.Client
}
// newClient creates a new client implementation using the provided codersdk.Client.
func newClient(coderClient *codersdk.Client) client {
return &sdkClient{
coderClient: coderClient,
}
}
func (c *sdkClient) createExternalWorkspace(ctx context.Context, req codersdk.CreateWorkspaceRequest) (createExternalWorkspaceResult, error) {
// Create the workspace
workspace, err := c.coderClient.CreateUserWorkspace(ctx, codersdk.Me, req)
if err != nil {
return createExternalWorkspaceResult{}, err
}
// Get the workspace with latest build details
workspace, err = c.coderClient.WorkspaceByOwnerAndName(ctx, codersdk.Me, workspace.Name, codersdk.WorkspaceOptions{})
if err != nil {
return createExternalWorkspaceResult{}, err
}
// Find external agents in resources
for _, resource := range workspace.LatestBuild.Resources {
if resource.Type != "coder_external_agent" || len(resource.Agents) == 0 {
continue
}
// Get credentials for the first agent
agent := resource.Agents[0]
credentials, err := c.coderClient.WorkspaceExternalAgentCredentials(ctx, workspace.ID, agent.Name)
if err != nil {
return createExternalWorkspaceResult{}, err
}
return createExternalWorkspaceResult{
WorkspaceID: workspace.ID,
AgentToken: credentials.AgentToken,
}, nil
}
return createExternalWorkspaceResult{}, xerrors.Errorf("no external agent found in workspace")
}
func (c *sdkClient) watchWorkspace(ctx context.Context, workspaceID uuid.UUID) (<-chan codersdk.Workspace, error) {
return c.coderClient.WatchWorkspace(ctx, workspaceID)
}
func (c *sdkClient) initialize(logger slog.Logger) {
// Configure the coder client logging
c.coderClient.SetLogger(logger)
c.coderClient.SetLogBodies(true)
}
// sdkAppStatusPatcher is the concrete implementation of the appStatusPatcher interface
// using agentsdk.Client.
type sdkAppStatusPatcher struct {
agentClient *agentsdk.Client
url *url.URL
httpClient *http.Client
}
// newAppStatusPatcher creates a new appStatusPatcher implementation.
func newAppStatusPatcher(client *codersdk.Client) appStatusPatcher {
return &sdkAppStatusPatcher{
url: client.URL,
httpClient: client.HTTPClient,
}
}
func (p *sdkAppStatusPatcher) patchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error {
if p.agentClient == nil {
panic("agentClient not initialized - call initialize first")
}
return p.agentClient.PatchAppStatus(ctx, req)
}
func (p *sdkAppStatusPatcher) initialize(logger slog.Logger, agentToken string) {
// Create and configure the agent client with the provided token
p.agentClient = agentsdk.New(
p.url,
agentsdk.WithFixedToken(agentToken),
codersdk.WithHTTPClient(p.httpClient),
codersdk.WithLogger(logger),
codersdk.WithLogBodies(),
)
}
// Ensure sdkClient implements the client interface.
var _ client = (*sdkClient)(nil)
// Ensure sdkAppStatusPatcher implements the appStatusPatcher interface.
var _ appStatusPatcher = (*sdkAppStatusPatcher)(nil)
+73
View File
@@ -0,0 +1,73 @@
package taskstatus
import (
"sync"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
)
type Config struct {
// TemplateID is the template ID to use for creating the external workspace.
TemplateID uuid.UUID `json:"template_id"`
// WorkspaceName is the name for the external workspace to create.
WorkspaceName string `json:"workspace_name"`
// AppSlug is the slug of the app designated as the AI Agent.
AppSlug string `json:"app_slug"`
// When the runner has connected to the watch-ws endpoint, it will call Done once on this wait group. Used to
// coordinate multiple runners from the higher layer.
ConnectedWaitGroup *sync.WaitGroup `json:"-"`
// We read on this channel before starting to report task statuses. Used to coordinate multiple runners from the
// higher layer.
StartReporting chan struct{} `json:"-"`
// Time between reporting task statuses.
ReportStatusPeriod time.Duration `json:"report_status_period"`
// Total time to report task statuses, starting from when we successfully read from the StartReporting channel.
ReportStatusDuration time.Duration `json:"report_status_duration"`
Metrics *Metrics `json:"-"`
MetricLabelValues []string `json:"metric_label_values"`
}
func (c *Config) Validate() error {
if c.TemplateID == uuid.Nil {
return xerrors.Errorf("validate template_id: must not be nil")
}
if c.WorkspaceName == "" {
return xerrors.Errorf("validate workspace_name: must not be empty")
}
if c.AppSlug == "" {
return xerrors.Errorf("validate app_slug: must not be empty")
}
if c.ConnectedWaitGroup == nil {
return xerrors.Errorf("validate connected_wait_group: must not be nil")
}
if c.StartReporting == nil {
return xerrors.Errorf("validate start_reporting: must not be nil")
}
if c.ReportStatusPeriod <= 0 {
return xerrors.Errorf("validate report_status_period: must be greater than zero")
}
if c.ReportStatusDuration <= 0 {
return xerrors.Errorf("validate report_status_duration: must be greater than zero")
}
if c.Metrics == nil {
return xerrors.Errorf("validate metrics: must not be nil")
}
return nil
}
+36
View File
@@ -0,0 +1,36 @@
package taskstatus
import "github.com/prometheus/client_golang/prometheus"
type Metrics struct {
TaskStatusToWorkspaceUpdateLatencySeconds prometheus.HistogramVec
MissingStatusUpdatesTotal prometheus.CounterVec
ReportTaskStatusErrorsTotal prometheus.CounterVec
}
func NewMetrics(reg prometheus.Registerer, labelNames ...string) *Metrics {
m := &Metrics{
TaskStatusToWorkspaceUpdateLatencySeconds: *prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "coderd",
Subsystem: "scaletest",
Name: "task_status_to_workspace_update_latency_seconds",
Help: "Time in seconds between reporting a task status and receiving the workspace update.",
}, labelNames),
MissingStatusUpdatesTotal: *prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "coderd",
Subsystem: "scaletest",
Name: "missing_status_updates_total",
Help: "Total number of missing status updates.",
}, labelNames),
ReportTaskStatusErrorsTotal: *prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "coderd",
Subsystem: "scaletest",
Name: "report_task_status_errors_total",
Help: "Total number of errors when reporting task status.",
}, labelNames),
}
reg.MustRegister(m.TaskStatusToWorkspaceUpdateLatencySeconds)
reg.MustRegister(m.MissingStatusUpdatesTotal)
reg.MustRegister(m.ReportTaskStatusErrorsTotal)
return m
}
+232
View File
@@ -0,0 +1,232 @@
package taskstatus
import (
"context"
"io"
"strconv"
"strings"
"sync"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/sloghuman"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/scaletest/harness"
"github.com/coder/coder/v2/scaletest/loadtestutil"
"github.com/coder/quartz"
)
const statusUpdatePrefix = "scaletest status update:"
type Runner struct {
client client
patcher appStatusPatcher
cfg Config
logger slog.Logger
// workspaceID is set after creating the external workspace
workspaceID uuid.UUID
mu sync.Mutex
reportTimes map[int]time.Time
doneReporting bool
// testing only
clock quartz.Clock
}
var _ harness.Runnable = &Runner{}
// NewRunner creates a new Runner with the provided codersdk.Client and configuration.
func NewRunner(coderClient *codersdk.Client, cfg Config) *Runner {
return &Runner{
client: newClient(coderClient),
patcher: newAppStatusPatcher(coderClient),
cfg: cfg,
clock: quartz.NewReal(),
reportTimes: make(map[int]time.Time),
}
}
func (r *Runner) Run(ctx context.Context, name string, logs io.Writer) error {
shouldMarkConnectedDone := true
defer func() {
if shouldMarkConnectedDone {
r.cfg.ConnectedWaitGroup.Done()
}
}()
logs = loadtestutil.NewSyncWriter(logs)
r.logger = slog.Make(sloghuman.Sink(logs)).Leveled(slog.LevelDebug).Named(name)
r.client.initialize(r.logger)
// Create the external workspace
r.logger.Info(ctx, "creating external workspace",
slog.F("template_id", r.cfg.TemplateID),
slog.F("workspace_name", r.cfg.WorkspaceName))
result, err := r.client.createExternalWorkspace(ctx, codersdk.CreateWorkspaceRequest{
TemplateID: r.cfg.TemplateID,
Name: r.cfg.WorkspaceName,
})
if err != nil {
return xerrors.Errorf("create external workspace: %w", err)
}
// Set the workspace ID
r.workspaceID = result.WorkspaceID
r.logger.Info(ctx, "created external workspace", slog.F("workspace_id", r.workspaceID))
// Initialize the patcher with the agent token
r.patcher.initialize(r.logger, result.AgentToken)
r.logger.Info(ctx, "initialized app status patcher with agent token")
// ensure these labels are initialized, so we see the time series right away in prometheus.
r.cfg.Metrics.MissingStatusUpdatesTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0)
r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Add(0)
workspaceUpdatesCtx, cancelWorkspaceUpdates := context.WithCancel(ctx)
defer cancelWorkspaceUpdates()
workspaceUpdatesResult := make(chan error, 1)
shouldMarkConnectedDone = false // we are passing this responsibility to the watchWorkspaceUpdates goroutine
go func() {
workspaceUpdatesResult <- r.watchWorkspaceUpdates(workspaceUpdatesCtx)
}()
err = r.reportTaskStatus(ctx)
if err != nil {
return xerrors.Errorf("report task status: %w", err)
}
err = <-workspaceUpdatesResult
if err != nil {
return xerrors.Errorf("watch workspace: %w", err)
}
return nil
}
func (r *Runner) watchWorkspaceUpdates(ctx context.Context) error {
shouldMarkConnectedDone := true
defer func() {
if shouldMarkConnectedDone {
r.cfg.ConnectedWaitGroup.Done()
}
}()
updates, err := r.client.watchWorkspace(ctx, r.workspaceID)
if err != nil {
return xerrors.Errorf("watch workspace: %w", err)
}
shouldMarkConnectedDone = false
r.cfg.ConnectedWaitGroup.Done()
defer func() {
r.mu.Lock()
defer r.mu.Unlock()
r.cfg.Metrics.MissingStatusUpdatesTotal.
WithLabelValues(r.cfg.MetricLabelValues...).
Add(float64(len(r.reportTimes)))
}()
for {
select {
case <-ctx.Done():
return ctx.Err()
case workspace := <-updates:
if workspace.LatestAppStatus == nil {
continue
}
msgNo, ok := parseStatusMessage(workspace.LatestAppStatus.Message)
if !ok {
continue
}
r.mu.Lock()
reportTime, ok := r.reportTimes[msgNo]
delete(r.reportTimes, msgNo)
allDone := r.doneReporting && len(r.reportTimes) == 0
r.mu.Unlock()
if !ok {
return xerrors.Errorf("report time not found for message %d", msgNo)
}
latency := r.clock.Since(reportTime, "watchWorkspaceUpdates")
r.cfg.Metrics.TaskStatusToWorkspaceUpdateLatencySeconds.
WithLabelValues(r.cfg.MetricLabelValues...).
Observe(latency.Seconds())
if allDone {
return nil
}
}
}
}
func (r *Runner) reportTaskStatus(ctx context.Context) error {
defer func() {
r.mu.Lock()
defer r.mu.Unlock()
r.doneReporting = true
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-r.cfg.StartReporting:
r.logger.Info(ctx, "starting to report task status")
}
startedReporting := r.clock.Now("reportTaskStatus", "startedReporting")
msgNo := 0
done := xerrors.New("done reporting task status") // sentinel error
waiter := r.clock.TickerFunc(ctx, r.cfg.ReportStatusPeriod, func() error {
r.mu.Lock()
now := r.clock.Now("reportTaskStatus", "tick")
r.reportTimes[msgNo] = now
// It's important that we set doneReporting along with a final report, since the watchWorkspaceUpdates goroutine
// needs a update to wake up and check if we're done. We could introduce a secondary signaling channel, but
// it adds a lot of complexity and will be hard to test. We expect the tick period to be much smaller than the
// report status duration, so one extra tick is not a big deal.
if now.After(startedReporting.Add(r.cfg.ReportStatusDuration)) {
r.doneReporting = true
}
r.mu.Unlock()
err := r.patcher.patchAppStatus(ctx, agentsdk.PatchAppStatus{
AppSlug: r.cfg.AppSlug,
Message: statusUpdatePrefix + strconv.Itoa(msgNo),
State: codersdk.WorkspaceAppStatusStateWorking,
URI: "https://example.com/example-status/",
})
if err != nil {
r.logger.Error(ctx, "failed to report task status", slog.Error(err))
r.cfg.Metrics.ReportTaskStatusErrorsTotal.WithLabelValues(r.cfg.MetricLabelValues...).Inc()
}
msgNo++
// note that it's safe to read r.doneReporting here without a lock because we're the only goroutine that sets
// it.
if r.doneReporting {
return done // causes the ticker to exit due to the sentinel error
}
return nil
}, "reportTaskStatus")
err := waiter.Wait()
if xerrors.Is(err, done) {
return nil
}
return err
}
func parseStatusMessage(message string) (int, bool) {
if !strings.HasPrefix(message, statusUpdatePrefix) {
return 0, false
}
message = strings.TrimPrefix(message, statusUpdatePrefix)
msgNo, err := strconv.Atoi(message)
if err != nil {
return 0, false
}
return msgNo, true
}
+482
View File
@@ -0,0 +1,482 @@
package taskstatus
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"cdr.dev/slog"
"github.com/coder/quartz"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/testutil"
)
// fakeClient implements the client interface for testing
type fakeClient struct {
t *testing.T
logger slog.Logger
// Channels for controlling the behavior
workspaceUpdatesCh chan codersdk.Workspace
}
func newFakeClient(t *testing.T) *fakeClient {
return &fakeClient{
t: t,
workspaceUpdatesCh: make(chan codersdk.Workspace),
}
}
func (m *fakeClient) initialize(logger slog.Logger) {
m.logger = logger
}
func (m *fakeClient) watchWorkspace(ctx context.Context, workspaceID uuid.UUID) (<-chan codersdk.Workspace, error) {
m.logger.Debug(ctx, "called fake WatchWorkspace", slog.F("workspace_id", workspaceID.String()))
return m.workspaceUpdatesCh, nil
}
const testAgentToken = "test-agent-token"
func (m *fakeClient) createExternalWorkspace(ctx context.Context, req codersdk.CreateWorkspaceRequest) (createExternalWorkspaceResult, error) {
m.logger.Debug(ctx, "called fake CreateExternalWorkspace", slog.F("req", req))
// Return a fake workspace ID and token for testing
return createExternalWorkspaceResult{
WorkspaceID: uuid.UUID{1, 2, 3, 4}, // Fake workspace ID
AgentToken: testAgentToken,
}, nil
}
// fakeAppStatusPatcher implements the appStatusPatcher interface for testing
type fakeAppStatusPatcher struct {
t *testing.T
logger slog.Logger
agentToken string
// Channels for controlling the behavior
patchStatusCalls chan agentsdk.PatchAppStatus
patchStatusErrors chan error
}
func newFakeAppStatusPatcher(t *testing.T) *fakeAppStatusPatcher {
return &fakeAppStatusPatcher{
t: t,
patchStatusCalls: make(chan agentsdk.PatchAppStatus),
patchStatusErrors: make(chan error, 1),
}
}
func (p *fakeAppStatusPatcher) initialize(logger slog.Logger, agentToken string) {
p.logger = logger
p.agentToken = agentToken
}
func (p *fakeAppStatusPatcher) patchAppStatus(ctx context.Context, req agentsdk.PatchAppStatus) error {
assert.NotEmpty(p.t, p.agentToken)
p.logger.Debug(ctx, "called fake PatchAppStatus", slog.F("req", req))
// Send the request to the channel so tests can verify it
select {
case p.patchStatusCalls <- req:
case <-ctx.Done():
return ctx.Err()
}
// Check if there's an error to return
select {
case err := <-p.patchStatusErrors:
return err
default:
return nil
}
}
func TestRunner_Run(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
mClock := quartz.NewMock(t)
fClient := newFakeClient(t)
fPatcher := newFakeAppStatusPatcher(t)
templateID := uuid.UUID{5, 6, 7, 8}
workspaceName := "test-workspace"
appSlug := "test-app"
reg := prometheus.NewRegistry()
metrics := NewMetrics(reg, "test")
connectedWaitGroup := &sync.WaitGroup{}
connectedWaitGroup.Add(1)
startReporting := make(chan struct{})
cfg := Config{
TemplateID: templateID,
WorkspaceName: workspaceName,
AppSlug: appSlug,
ConnectedWaitGroup: connectedWaitGroup,
StartReporting: startReporting,
ReportStatusPeriod: 10 * time.Second,
ReportStatusDuration: 35 * time.Second,
Metrics: metrics,
MetricLabelValues: []string{"test"},
}
runner := &Runner{
client: fClient,
patcher: fPatcher,
cfg: cfg,
clock: mClock,
reportTimes: make(map[int]time.Time),
}
tickerTrap := mClock.Trap().TickerFunc("reportTaskStatus")
defer tickerTrap.Close()
sinceTrap := mClock.Trap().Since("watchWorkspaceUpdates")
defer sinceTrap.Close()
// Run the runner in a goroutine
runErr := make(chan error, 1)
go func() {
runErr <- runner.Run(ctx, "test-runner", testutil.NewTestLogWriter(t))
}()
// Wait for the runner to connect and watch workspace
connectedWaitGroup.Wait()
// Signal to start reporting
close(startReporting)
// Wait for the initial TickerFunc call before advancing time, otherwise our ticks will be off.
tickerTrap.MustWait(ctx).MustRelease(ctx)
// at this point, the patcher must be initialized
require.Equal(t, testAgentToken, fPatcher.agentToken)
updateDelay := time.Duration(0)
for i := 0; i < 4; i++ {
tickWaiter := mClock.Advance((10 * time.Second) - updateDelay)
patchCall := testutil.RequireReceive(ctx, t, fPatcher.patchStatusCalls)
require.Equal(t, appSlug, patchCall.AppSlug)
require.Equal(t, fmt.Sprintf("scaletest status update:%d", i), patchCall.Message)
require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, patchCall.State)
tickWaiter.MustWait(ctx)
// Send workspace update 1, 2, 3, or 4 seconds after the report
updateDelay = time.Duration(i+1) * time.Second
mClock.Advance(updateDelay)
workspace := codersdk.Workspace{
LatestAppStatus: &codersdk.WorkspaceAppStatus{
Message: fmt.Sprintf("scaletest status update:%d", i),
},
}
testutil.RequireSend(ctx, t, fClient.workspaceUpdatesCh, workspace)
sinceTrap.MustWait(ctx).MustRelease(ctx)
}
// Wait for the runner to complete
err := testutil.RequireReceive(ctx, t, runErr)
require.NoError(t, err)
// Verify metrics were updated correctly
metricFamilies, err := reg.Gather()
require.NoError(t, err)
var latencyMetricFound bool
var missingUpdatesFound bool
for _, mf := range metricFamilies {
switch mf.GetName() {
case "coderd_scaletest_task_status_to_workspace_update_latency_seconds":
latencyMetricFound = true
require.Len(t, mf.GetMetric(), 1)
hist := mf.GetMetric()[0].GetHistogram()
assert.Equal(t, uint64(4), hist.GetSampleCount())
case "coderd_scaletest_missing_status_updates_total":
missingUpdatesFound = true
require.Len(t, mf.GetMetric(), 1)
counter := mf.GetMetric()[0].GetCounter()
assert.Equal(t, float64(0), counter.GetValue())
}
}
assert.True(t, latencyMetricFound, "latency metric not found")
assert.True(t, missingUpdatesFound, "missing updates metric not found")
}
func TestRunner_RunMissedUpdate(t *testing.T) {
t.Parallel()
testCtx := testutil.Context(t, testutil.WaitShort)
runCtx, cancel := context.WithCancel(testCtx)
defer cancel()
mClock := quartz.NewMock(t)
fClient := newFakeClient(t)
fPatcher := newFakeAppStatusPatcher(t)
templateID := uuid.UUID{5, 6, 7, 8}
workspaceName := "test-workspace"
appSlug := "test-app"
reg := prometheus.NewRegistry()
metrics := NewMetrics(reg, "test")
connectedWaitGroup := &sync.WaitGroup{}
connectedWaitGroup.Add(1)
startReporting := make(chan struct{})
cfg := Config{
TemplateID: templateID,
WorkspaceName: workspaceName,
AppSlug: appSlug,
ConnectedWaitGroup: connectedWaitGroup,
StartReporting: startReporting,
ReportStatusPeriod: 10 * time.Second,
ReportStatusDuration: 35 * time.Second,
Metrics: metrics,
MetricLabelValues: []string{"test"},
}
runner := &Runner{
client: fClient,
patcher: fPatcher,
cfg: cfg,
clock: mClock,
reportTimes: make(map[int]time.Time),
}
tickerTrap := mClock.Trap().TickerFunc("reportTaskStatus")
defer tickerTrap.Close()
sinceTrap := mClock.Trap().Since("watchWorkspaceUpdates")
defer sinceTrap.Close()
// Run the runner in a goroutine
runErr := make(chan error, 1)
go func() {
runErr <- runner.Run(runCtx, "test-runner", testutil.NewTestLogWriter(t))
}()
// Wait for the runner to connect and watch workspace
connectedWaitGroup.Wait()
// Signal to start reporting
close(startReporting)
// Wait for the initial TickerFunc call before advancing time, otherwise our ticks will be off.
tickerTrap.MustWait(testCtx).MustRelease(testCtx)
updateDelay := time.Duration(0)
for i := 0; i < 4; i++ {
tickWaiter := mClock.Advance((10 * time.Second) - updateDelay)
patchCall := testutil.RequireReceive(testCtx, t, fPatcher.patchStatusCalls)
require.Equal(t, appSlug, patchCall.AppSlug)
require.Equal(t, fmt.Sprintf("scaletest status update:%d", i), patchCall.Message)
require.Equal(t, codersdk.WorkspaceAppStatusStateWorking, patchCall.State)
tickWaiter.MustWait(testCtx)
// Send workspace update 1, 2, 3, or 4 seconds after the report
updateDelay = time.Duration(i+1) * time.Second
mClock.Advance(updateDelay)
workspace := codersdk.Workspace{
LatestAppStatus: &codersdk.WorkspaceAppStatus{
Message: fmt.Sprintf("scaletest status update:%d", i),
},
}
if i != 2 {
// skip the third update, to test that we report missed updates and still complete.
testutil.RequireSend(testCtx, t, fClient.workspaceUpdatesCh, workspace)
sinceTrap.MustWait(testCtx).MustRelease(testCtx)
}
}
// Cancel the run context to simulate the runner being killed.
cancel()
// Wait for the runner to complete
err := testutil.RequireReceive(testCtx, t, runErr)
require.ErrorIs(t, err, context.Canceled)
// Verify metrics were updated correctly
metricFamilies, err := reg.Gather()
require.NoError(t, err)
// Check that metrics were recorded
var latencyMetricFound bool
var missingUpdatesFound bool
for _, mf := range metricFamilies {
switch mf.GetName() {
case "coderd_scaletest_task_status_to_workspace_update_latency_seconds":
latencyMetricFound = true
require.Len(t, mf.GetMetric(), 1)
hist := mf.GetMetric()[0].GetHistogram()
assert.Equal(t, uint64(3), hist.GetSampleCount())
case "coderd_scaletest_missing_status_updates_total":
missingUpdatesFound = true
require.Len(t, mf.GetMetric(), 1)
counter := mf.GetMetric()[0].GetCounter()
assert.Equal(t, float64(1), counter.GetValue())
}
}
assert.True(t, latencyMetricFound, "latency metric not found")
assert.True(t, missingUpdatesFound, "missing updates metric not found")
}
func TestRunner_Run_WithErrors(t *testing.T) {
t.Parallel()
testCtx := testutil.Context(t, testutil.WaitShort)
runCtx, cancel := context.WithCancel(testCtx)
defer cancel()
mClock := quartz.NewMock(t)
fClient := newFakeClient(t)
fPatcher := newFakeAppStatusPatcher(t)
templateID := uuid.UUID{5, 6, 7, 8}
workspaceName := "test-workspace"
appSlug := "test-app"
reg := prometheus.NewRegistry()
metrics := NewMetrics(reg, "test")
connectedWaitGroup := &sync.WaitGroup{}
connectedWaitGroup.Add(1)
startReporting := make(chan struct{})
cfg := Config{
TemplateID: templateID,
WorkspaceName: workspaceName,
AppSlug: appSlug,
ConnectedWaitGroup: connectedWaitGroup,
StartReporting: startReporting,
ReportStatusPeriod: 10 * time.Second,
ReportStatusDuration: 35 * time.Second,
Metrics: metrics,
MetricLabelValues: []string{"test"},
}
runner := &Runner{
client: fClient,
patcher: fPatcher,
cfg: cfg,
clock: mClock,
reportTimes: make(map[int]time.Time),
}
tickerTrap := mClock.Trap().TickerFunc("reportTaskStatus")
defer tickerTrap.Close()
// Run the runner in a goroutine
runErr := make(chan error, 1)
go func() {
runErr <- runner.Run(runCtx, "test-runner", testutil.NewTestLogWriter(t))
}()
connectedWaitGroup.Wait()
close(startReporting)
// Wait for the initial TickerFunc call before advancing time, otherwise our ticks will be off.
tickerTrap.MustWait(testCtx).MustRelease(testCtx)
for i := 0; i < 4; i++ {
tickWaiter := mClock.Advance(10 * time.Second)
testutil.RequireSend(testCtx, t, fPatcher.patchStatusErrors, xerrors.New("a bad thing happened"))
_ = testutil.RequireReceive(testCtx, t, fPatcher.patchStatusCalls)
tickWaiter.MustWait(testCtx)
}
// Cancel the run context to simulate the runner being killed.
cancel()
// Wait for the runner to complete
err := testutil.RequireReceive(testCtx, t, runErr)
require.ErrorIs(t, err, context.Canceled)
// Verify metrics were updated correctly
metricFamilies, err := reg.Gather()
require.NoError(t, err)
var missingUpdatesFound bool
var reportTaskStatusErrorsFound bool
for _, mf := range metricFamilies {
switch mf.GetName() {
case "coderd_scaletest_missing_status_updates_total":
missingUpdatesFound = true
require.Len(t, mf.GetMetric(), 1)
counter := mf.GetMetric()[0].GetCounter()
assert.Equal(t, float64(4), counter.GetValue())
case "coderd_scaletest_report_task_status_errors_total":
reportTaskStatusErrorsFound = true
require.Len(t, mf.GetMetric(), 1)
counter := mf.GetMetric()[0].GetCounter()
assert.Equal(t, float64(4), counter.GetValue())
}
}
assert.True(t, missingUpdatesFound, "missing updates metric not found")
assert.True(t, reportTaskStatusErrorsFound, "report task status errors metric not found")
}
func TestParseStatusMessage(t *testing.T) {
t.Parallel()
tests := []struct {
name string
message string
wantNum int
wantOk bool
}{
{
name: "valid message",
message: "scaletest status update:42",
wantNum: 42,
wantOk: true,
},
{
name: "valid message zero",
message: "scaletest status update:0",
wantNum: 0,
wantOk: true,
},
{
name: "invalid prefix",
message: "wrong prefix:42",
wantNum: 0,
wantOk: false,
},
{
name: "invalid number",
message: "scaletest status update:abc",
wantNum: 0,
wantOk: false,
},
{
name: "empty message",
message: "",
wantNum: 0,
wantOk: false,
},
{
name: "missing number",
message: "scaletest status update:",
wantNum: 0,
wantOk: false,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
gotNum, gotOk := parseStatusMessage(tt.message)
assert.Equal(t, tt.wantNum, gotNum)
assert.Equal(t, tt.wantOk, gotOk)
})
}
}
+30
View File
@@ -2,7 +2,9 @@ package testutil
import (
"context"
"io"
"strings"
"sync"
"testing"
"github.com/hashicorp/yamux"
@@ -51,3 +53,31 @@ func isQueryCanceledError(err error) bool {
}
return false
}
type testLogWriter struct {
t testing.TB
mu sync.Mutex
testOver bool
}
func NewTestLogWriter(t testing.TB) io.Writer {
w := &testLogWriter{t: t}
t.Cleanup(func() {
w.mu.Lock()
defer w.mu.Unlock()
w.testOver = true
})
return w
}
func (w *testLogWriter) Write(p []byte) (n int, err error) {
n = len(p)
w.mu.Lock()
defer w.mu.Unlock()
if w.testOver {
return n, nil
}
w.t.Logf("%q", string(p))
return n, nil
}