Compare commits

...

5 Commits

Author SHA1 Message Date
Jon Ayers a3b912f464 perf: optimize queue position query by deduplicating daemons 2026-03-06 04:05:26 +00:00
Jon Ayers a039830554 test: add baseline tests for queue position deduplication 2026-03-06 04:05:26 +00:00
Jon Ayers 8869b534cf feat(scaletest): add harness-level log tee to stream runner logs to stderr
Add WithLogWriter option to TestHarness that tees all test run logs
to an external writer (e.g. stderr) in addition to the in-memory buffer.
This makes scaletest runner logs visible during execution.

- Add TestHarnessOption func type and WithLogWriter constructor
- Propagate logWriter from harness to TestRun via AddRun
- Use io.MultiWriter in TestRun.Run() and Cleanup() when logWriter is set
- Pass harness.WithLogWriter(inv.Stderr) at all CLI call sites
2026-03-06 01:26:37 +00:00
Jon Ayers 12559706bc Revert "feat(scaletest): add LogOutput to prebuilds Config for tee-ing logs to stderr"
This reverts commit 4d29eb80fe.
2026-03-06 01:24:13 +00:00
Jon Ayers 4d29eb80fe feat(scaletest): add LogOutput to prebuilds Config for tee-ing logs to stderr 2026-03-06 00:54:28 +00:00
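The log tee added in 8869b534cf is wired in as a functional option on the harness constructor. Below is a minimal sketch of the resulting call-site pattern; the import path, the toy echoRunner, and the exact Runnable/Run signatures are assumptions for illustration, while NewTestHarness, WithLogWriter, ConcurrentExecutionStrategy, and AddRun come from the diffs that follow.

```go
package main

import (
	"context"
	"io"
	"os"

	// Assumed import path for the scaletest harness package.
	"github.com/coder/coder/v2/scaletest/harness"
)

// echoRunner is a hypothetical Runnable used only for this sketch.
type echoRunner struct{}

func (echoRunner) Run(_ context.Context, id string, logs io.Writer) error {
	_, err := io.WriteString(logs, "running "+id+"\n")
	return err
}

func main() {
	// WithLogWriter tees every run's log output to stderr while the
	// per-run in-memory buffer keeps receiving it as before.
	th := harness.NewTestHarness(
		harness.ConcurrentExecutionStrategy{},
		harness.ConcurrentExecutionStrategy{},
		harness.WithLogWriter(os.Stderr),
	)
	th.AddRun("example", "0", echoRunner{})
	_ = th.Run(context.Background())
}
```

In the CLI this is the `harness.WithLogWriter(inv.Stderr)` argument threaded through every `NewTestHarness` call site in the diffs below.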
11 changed files with 168 additions and 40 deletions
+7 -7
@@ -564,7 +564,7 @@ func (r *RootCmd) scaletestCleanup() *serpent.Command {
cliui.Errorf(inv.Stderr, "Found %d scaletest workspaces\n", len(workspaces))
if len(workspaces) != 0 {
cliui.Infof(inv.Stdout, "Deleting scaletest workspaces...")
harness := harness.NewTestHarness(cleanupStrategy.toStrategy(), harness.ConcurrentExecutionStrategy{})
harness := harness.NewTestHarness(cleanupStrategy.toStrategy(), harness.ConcurrentExecutionStrategy{}, harness.WithLogWriter(inv.Stderr))
for i, w := range workspaces {
const testName = "cleanup-workspace"
@@ -597,7 +597,7 @@ func (r *RootCmd) scaletestCleanup() *serpent.Command {
cliui.Errorf(inv.Stderr, "Found %d scaletest users\n", len(users))
if len(users) != 0 {
cliui.Infof(inv.Stdout, "Deleting scaletest users...")
harness := harness.NewTestHarness(cleanupStrategy.toStrategy(), harness.ConcurrentExecutionStrategy{})
harness := harness.NewTestHarness(cleanupStrategy.toStrategy(), harness.ConcurrentExecutionStrategy{}, harness.WithLogWriter(inv.Stderr))
for i, u := range users {
const testName = "cleanup-users"
@@ -742,7 +742,7 @@ func (r *RootCmd) scaletestCreateWorkspaces() *serpent.Command {
}()
tracer := tracerProvider.Tracer(scaletestTracerName)
th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy())
th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy(), harness.WithLogWriter(inv.Stderr))
for i := 0; i < int(count); i++ {
const name = "workspacebuild"
id := strconv.Itoa(i)
@@ -1155,7 +1155,7 @@ func (r *RootCmd) scaletestWorkspaceUpdates() *serpent.Command {
configs = append(configs, config)
}
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy())
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy(), harness.WithLogWriter(inv.Stderr))
for i, config := range configs {
name := fmt.Sprintf("workspaceupdates-%dw", config.WorkspaceCount)
id := strconv.Itoa(i)
@@ -1355,7 +1355,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *serpent.Command {
return xerrors.Errorf("could not parse --output flags")
}
th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy())
th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy(), harness.WithLogWriter(inv.Stderr))
for idx, ws := range workspaces {
var (
agent codersdk.WorkspaceAgent
@@ -1586,7 +1586,7 @@ func (r *RootCmd) scaletestDashboard() *serpent.Command {
metrics := dashboard.NewMetrics(reg)
th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy())
th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy(), harness.WithLogWriter(inv.Stderr))
users, err := getScaletestUsers(ctx, client)
if err != nil {
@@ -1809,7 +1809,7 @@ func (r *RootCmd) scaletestAutostart() *serpent.Command {
setupBarrier := new(sync.WaitGroup)
setupBarrier.Add(int(workspaceCount))
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy())
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy(), harness.WithLogWriter(inv.Stderr))
for i := range workspaceCount {
id := strconv.Itoa(int(i))
config := autostart.Config{
+1 -1
@@ -129,7 +129,7 @@ Examples:
return xerrors.Errorf("prepare request body: %w", err)
}
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy())
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy(), harness.WithLogWriter(inv.Stderr))
for i := range concurrentUsers {
id := strconv.Itoa(int(i))
+2 -1
@@ -102,7 +102,8 @@ func (r *RootCmd) scaletestDynamicParameters() *serpent.Command {
th := harness.NewTestHarness(
timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}),
// there is no cleanup since it's just a connection that we sever.
nil)
nil,
harness.WithLogWriter(inv.Stderr))
for i, part := range partitions {
for j := range part.ConcurrentEvaluations {
+1 -1
@@ -192,7 +192,7 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command {
triggerTimes,
)
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy())
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy(), harness.WithLogWriter(inv.Stderr))
for i, config := range configs {
id := strconv.Itoa(i)
+1 -1
@@ -110,7 +110,7 @@ func (r *RootCmd) scaletestPrebuilds() *serpent.Command {
deletionBarrier := new(sync.WaitGroup)
deletionBarrier.Add(int(numTemplates))
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy())
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy(), harness.WithLogWriter(inv.Stderr))
tags, err := ParseProvisionerTags(provisionerTags)
if err != nil {
+1
@@ -122,6 +122,7 @@ After all runners connect, it waits for the baseline duration before triggering
th := harness.NewTestHarness(
timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}),
cleanupStrategy.toStrategy(),
harness.WithLogWriter(inv.Stderr),
)
// Create runners
+76
@@ -3867,6 +3867,37 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) {
queueSizes: nil, // TODO(yevhenii): should it be empty array instead?
queuePositions: nil,
},
// Many daemons with identical tags should produce the same results as a single daemon.
{
name: "duplicate-daemons-same-tags",
jobTags: []database.StringMap{
{"a": "1"},
{"a": "1", "b": "2"},
},
daemonTags: []database.StringMap{
{"a": "1", "b": "2"},
{"a": "1", "b": "2"},
{"a": "1", "b": "2"},
},
queueSizes: []int64{2, 2},
queuePositions: []int64{1, 2},
},
// Jobs that don't match any queried job's daemon should still
// have correct queue positions.
{
name: "irrelevant-daemons-filtered",
jobTags: []database.StringMap{
{"a": "1"},
{"x": "9"},
},
daemonTags: []database.StringMap{
{"a": "1"},
{"x": "9"},
},
queueSizes: []int64{1},
queuePositions: []int64{1},
skipJobIDs: map[int]struct{}{1: {}},
},
}
for _, tc := range testCases {
@@ -4192,6 +4223,51 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_OrderValidation(t *testing.T)
assert.EqualValues(t, []int64{1, 2, 3, 4, 5, 6}, queuePositions, "expected queue positions to be set correctly")
}
func TestGetProvisionerJobsByIDsWithQueuePosition_DuplicateDaemons(t *testing.T) {
t.Parallel()
db, _ := dbtestutil.NewDB(t)
now := dbtime.Now()
ctx := testutil.Context(t, testutil.WaitShort)
// Create 3 pending jobs with the same tags.
jobs := make([]database.ProvisionerJob, 3)
for i := range jobs {
jobs[i] = dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{
CreatedAt: now.Add(-time.Duration(3-i) * time.Minute),
Tags: database.StringMap{"scope": "organization", "owner": ""},
})
}
// Create 50 daemons with identical tags (simulates scale).
for i := range 50 {
dbgen.ProvisionerDaemon(t, db, database.ProvisionerDaemon{
Name: fmt.Sprintf("daemon_%d", i),
Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho},
Tags: database.StringMap{"scope": "organization", "owner": ""},
})
}
jobIDs := make([]uuid.UUID, len(jobs))
for i, j := range jobs {
jobIDs[i] = j.ID
}
results, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx,
database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: jobIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
require.NoError(t, err)
require.Len(t, results, 3)
// All daemons have identical tags, so the queue should be the same as
// if there were just one daemon.
for i, r := range results {
assert.Equal(t, int64(3), r.QueueSize, "job %d queue size", i)
assert.Equal(t, int64(i+1), r.QueuePosition, "job %d queue position", i)
}
}
func TestGroupRemovalTrigger(t *testing.T) {
t.Parallel()
+19 -8
@@ -12551,7 +12551,7 @@ const getProvisionerJobsByIDsWithQueuePosition = `-- name: GetProvisionerJobsByI
WITH filtered_provisioner_jobs AS (
-- Step 1: Filter provisioner_jobs
SELECT
id, created_at
id, created_at, tags
FROM
provisioner_jobs
WHERE
@@ -12566,21 +12566,32 @@ pending_jobs AS (
WHERE
job_status = 'pending'
),
online_provisioner_daemons AS (
SELECT id, tags FROM provisioner_daemons pd
WHERE pd.last_seen_at IS NOT NULL AND pd.last_seen_at >= (NOW() - ($2::bigint || ' ms')::interval)
unique_daemon_tags AS (
SELECT DISTINCT tags FROM provisioner_daemons pd
WHERE pd.last_seen_at IS NOT NULL
AND pd.last_seen_at >= (NOW() - ($2::bigint || ' ms')::interval)
),
relevant_daemon_tags AS (
SELECT udt.tags
FROM unique_daemon_tags udt
WHERE EXISTS (
SELECT 1 FROM filtered_provisioner_jobs fpj
WHERE provisioner_tagset_contains(udt.tags, fpj.tags)
)
),
ranked_jobs AS (
-- Step 3: Rank only pending jobs based on provisioner availability
SELECT
pj.id,
pj.created_at,
ROW_NUMBER() OVER (PARTITION BY opd.id ORDER BY pj.initiator_id = 'c42fdf75-3097-471c-8c33-fb52454d81c0'::uuid ASC, pj.created_at ASC) AS queue_position,
COUNT(*) OVER (PARTITION BY opd.id) AS queue_size
ROW_NUMBER() OVER (PARTITION BY rdt.tags ORDER BY pj.initiator_id = 'c42fdf75-3097-471c-8c33-fb52454d81c0'::uuid ASC, pj.created_at ASC) AS queue_position,
COUNT(*) OVER (PARTITION BY rdt.tags) AS queue_size
FROM
pending_jobs pj
INNER JOIN online_provisioner_daemons opd
ON provisioner_tagset_contains(opd.tags, pj.tags) -- Join only on the small pending set
INNER JOIN
relevant_daemon_tags rdt
ON
provisioner_tagset_contains(rdt.tags, pj.tags)
),
final_jobs AS (
-- Step 4: Compute best queue position and max queue size per job
+19 -8
@@ -79,7 +79,7 @@ WHERE
WITH filtered_provisioner_jobs AS (
-- Step 1: Filter provisioner_jobs
SELECT
id, created_at
id, created_at, tags
FROM
provisioner_jobs
WHERE
@@ -94,21 +94,32 @@ pending_jobs AS (
WHERE
job_status = 'pending'
),
online_provisioner_daemons AS (
SELECT id, tags FROM provisioner_daemons pd
WHERE pd.last_seen_at IS NOT NULL AND pd.last_seen_at >= (NOW() - (@stale_interval_ms::bigint || ' ms')::interval)
unique_daemon_tags AS (
SELECT DISTINCT tags FROM provisioner_daemons pd
WHERE pd.last_seen_at IS NOT NULL
AND pd.last_seen_at >= (NOW() - (@stale_interval_ms::bigint || ' ms')::interval)
),
relevant_daemon_tags AS (
SELECT udt.tags
FROM unique_daemon_tags udt
WHERE EXISTS (
SELECT 1 FROM filtered_provisioner_jobs fpj
WHERE provisioner_tagset_contains(udt.tags, fpj.tags)
)
),
ranked_jobs AS (
-- Step 3: Rank only pending jobs based on provisioner availability
SELECT
pj.id,
pj.created_at,
ROW_NUMBER() OVER (PARTITION BY opd.id ORDER BY pj.initiator_id = 'c42fdf75-3097-471c-8c33-fb52454d81c0'::uuid ASC, pj.created_at ASC) AS queue_position,
COUNT(*) OVER (PARTITION BY opd.id) AS queue_size
ROW_NUMBER() OVER (PARTITION BY rdt.tags ORDER BY pj.initiator_id = 'c42fdf75-3097-471c-8c33-fb52454d81c0'::uuid ASC, pj.created_at ASC) AS queue_position,
COUNT(*) OVER (PARTITION BY rdt.tags) AS queue_size
FROM
pending_jobs pj
INNER JOIN online_provisioner_daemons opd
ON provisioner_tagset_contains(opd.tags, pj.tags) -- Join only on the small pending set
INNER JOIN
relevant_daemon_tags rdt
ON
provisioner_tagset_contains(rdt.tags, pj.tags)
),
final_jobs AS (
-- Step 4: Compute best queue position and max queue size per job
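To put the query change in concrete terms: in the scenario exercised by the new TestGetProvisionerJobsByIDsWithQueuePosition_DuplicateDaemons test above (3 pending jobs, 50 online daemons that all carry the same tag set), the previous query partitioned ranked_jobs by daemon id and so materialized 3 × 50 = 150 ranked rows; partitioning by the deduplicated tag sets instead yields just 3 rows, while the reported queue size (3) and positions (1, 2, 3) are unchanged.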
+26 -8
@@ -2,6 +2,7 @@ package harness
import (
"context"
"io"
"sync"
"time"
@@ -17,17 +18,30 @@ type TestHarness struct {
runStrategy ExecutionStrategy
cleanupStrategy ExecutionStrategy
mut *sync.Mutex
runIDs map[string]struct{}
runs []*TestRun
started bool
done chan struct{}
elapsed time.Duration
mut *sync.Mutex
runIDs map[string]struct{}
runs []*TestRun
started bool
done chan struct{}
elapsed time.Duration
logWriter io.Writer
}
// TestHarnessOption is a functional option for NewTestHarness.
type TestHarnessOption func(*TestHarness)
// WithLogWriter sets an additional writer that all test run logs
// are tee'd to (e.g. os.Stderr). The per-run in-memory buffer is
// always written to; this writer receives a copy.
func WithLogWriter(w io.Writer) TestHarnessOption {
return func(h *TestHarness) {
h.logWriter = w
}
}
// NewTestHarness creates a new TestHarness with the given execution strategies.
func NewTestHarness(runStrategy, cleanupStrategy ExecutionStrategy) *TestHarness {
return &TestHarness{
func NewTestHarness(runStrategy, cleanupStrategy ExecutionStrategy, opts ...TestHarnessOption) *TestHarness {
h := &TestHarness{
runStrategy: runStrategy,
cleanupStrategy: cleanupStrategy,
mut: new(sync.Mutex),
@@ -35,6 +49,10 @@ func NewTestHarness(runStrategy, cleanupStrategy ExecutionStrategy) *TestHarness
runs: []*TestRun{},
done: make(chan struct{}),
}
for _, opt := range opts {
opt(h)
}
return h
}
// Run runs the registered tests using the given ExecutionStrategy. The provided
+15 -5
@@ -45,6 +45,7 @@ type Collectable interface {
// This is a convenience method that calls NewTestRun() and h.RegisterRun().
func (h *TestHarness) AddRun(testName string, id string, runner Runnable) *TestRun {
run := NewTestRun(testName, id, runner)
run.logWriter = h.logWriter
h.RegisterRun(run)
return run
@@ -69,9 +70,10 @@ func (h *TestHarness) RegisterRun(run *TestRun) {
// TestRun is a single test run and its accompanying state.
type TestRun struct {
testName string
id string
runner Runnable
testName string
id string
runner Runnable
logWriter io.Writer
logs *syncBuffer
done chan struct{}
@@ -119,7 +121,11 @@ func (r *TestRun) Run(ctx context.Context) (err error) {
}
}()
err = r.runner.Run(ctx, r.id, r.logs)
var w io.Writer = r.logs
if r.logWriter != nil {
w = io.MultiWriter(r.logs, r.logWriter)
}
err = r.runner.Run(ctx, r.id, w)
//nolint:revive // we use named returns because we mutate it in a defer
return
@@ -145,7 +151,11 @@ func (r *TestRun) Cleanup(ctx context.Context) (err error) {
}
}()
err = c.Cleanup(ctx, r.id, r.logs)
var w io.Writer = r.logs
if r.logWriter != nil {
w = io.MultiWriter(r.logs, r.logWriter)
}
err = c.Cleanup(ctx, r.id, w)
//nolint:revive // we use named returns because we mutate it in a defer
return
}
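The tee itself is plain io.MultiWriter from the standard library. A self-contained sketch of the behavior Run() and Cleanup() rely on, with bytes.Buffer standing in for the harness's per-run syncBuffer:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
)

func main() {
	var logs bytes.Buffer               // per-run in-memory buffer (syncBuffer in the harness)
	var logWriter io.Writer = os.Stderr // external writer configured via WithLogWriter

	// Mirrors the construction in TestRun.Run() and TestRun.Cleanup():
	// fall back to the buffer alone when no external writer is set.
	var w io.Writer = &logs
	if logWriter != nil {
		w = io.MultiWriter(&logs, logWriter)
	}

	fmt.Fprintln(w, "runner log line") // reaches stderr immediately...
	fmt.Print(logs.String())           // ...and is still captured for later collection
}
```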