Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a3b912f464 | |||
| a039830554 | |||
| 8869b534cf | |||
| 12559706bc | |||
| 4d29eb80fe |
@@ -564,7 +564,7 @@ func (r *RootCmd) scaletestCleanup() *serpent.Command {
|
||||
cliui.Errorf(inv.Stderr, "Found %d scaletest workspaces\n", len(workspaces))
|
||||
if len(workspaces) != 0 {
|
||||
cliui.Infof(inv.Stdout, "Deleting scaletest workspaces...")
|
||||
harness := harness.NewTestHarness(cleanupStrategy.toStrategy(), harness.ConcurrentExecutionStrategy{})
|
||||
harness := harness.NewTestHarness(cleanupStrategy.toStrategy(), harness.ConcurrentExecutionStrategy{}, harness.WithLogWriter(inv.Stderr))
|
||||
|
||||
for i, w := range workspaces {
|
||||
const testName = "cleanup-workspace"
|
||||
@@ -597,7 +597,7 @@ func (r *RootCmd) scaletestCleanup() *serpent.Command {
|
||||
cliui.Errorf(inv.Stderr, "Found %d scaletest users\n", len(users))
|
||||
if len(users) != 0 {
|
||||
cliui.Infof(inv.Stdout, "Deleting scaletest users...")
|
||||
harness := harness.NewTestHarness(cleanupStrategy.toStrategy(), harness.ConcurrentExecutionStrategy{})
|
||||
harness := harness.NewTestHarness(cleanupStrategy.toStrategy(), harness.ConcurrentExecutionStrategy{}, harness.WithLogWriter(inv.Stderr))
|
||||
|
||||
for i, u := range users {
|
||||
const testName = "cleanup-users"
|
||||
@@ -742,7 +742,7 @@ func (r *RootCmd) scaletestCreateWorkspaces() *serpent.Command {
|
||||
}()
|
||||
tracer := tracerProvider.Tracer(scaletestTracerName)
|
||||
|
||||
th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy())
|
||||
th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy(), harness.WithLogWriter(inv.Stderr))
|
||||
for i := 0; i < int(count); i++ {
|
||||
const name = "workspacebuild"
|
||||
id := strconv.Itoa(i)
|
||||
@@ -1155,7 +1155,7 @@ func (r *RootCmd) scaletestWorkspaceUpdates() *serpent.Command {
|
||||
configs = append(configs, config)
|
||||
}
|
||||
|
||||
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy())
|
||||
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy(), harness.WithLogWriter(inv.Stderr))
|
||||
for i, config := range configs {
|
||||
name := fmt.Sprintf("workspaceupdates-%dw", config.WorkspaceCount)
|
||||
id := strconv.Itoa(i)
|
||||
@@ -1355,7 +1355,7 @@ func (r *RootCmd) scaletestWorkspaceTraffic() *serpent.Command {
|
||||
return xerrors.Errorf("could not parse --output flags")
|
||||
}
|
||||
|
||||
th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy())
|
||||
th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy(), harness.WithLogWriter(inv.Stderr))
|
||||
for idx, ws := range workspaces {
|
||||
var (
|
||||
agent codersdk.WorkspaceAgent
|
||||
@@ -1586,7 +1586,7 @@ func (r *RootCmd) scaletestDashboard() *serpent.Command {
|
||||
|
||||
metrics := dashboard.NewMetrics(reg)
|
||||
|
||||
th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy())
|
||||
th := harness.NewTestHarness(strategy.toStrategy(), cleanupStrategy.toStrategy(), harness.WithLogWriter(inv.Stderr))
|
||||
|
||||
users, err := getScaletestUsers(ctx, client)
|
||||
if err != nil {
|
||||
@@ -1809,7 +1809,7 @@ func (r *RootCmd) scaletestAutostart() *serpent.Command {
|
||||
setupBarrier := new(sync.WaitGroup)
|
||||
setupBarrier.Add(int(workspaceCount))
|
||||
|
||||
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy())
|
||||
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy(), harness.WithLogWriter(inv.Stderr))
|
||||
for i := range workspaceCount {
|
||||
id := strconv.Itoa(int(i))
|
||||
config := autostart.Config{
|
||||
|
||||
@@ -129,7 +129,7 @@ Examples:
|
||||
return xerrors.Errorf("prepare request body: %w", err)
|
||||
}
|
||||
|
||||
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy())
|
||||
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy(), harness.WithLogWriter(inv.Stderr))
|
||||
|
||||
for i := range concurrentUsers {
|
||||
id := strconv.Itoa(int(i))
|
||||
|
||||
@@ -102,7 +102,8 @@ func (r *RootCmd) scaletestDynamicParameters() *serpent.Command {
|
||||
th := harness.NewTestHarness(
|
||||
timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}),
|
||||
// there is no cleanup since it's just a connection that we sever.
|
||||
nil)
|
||||
nil,
|
||||
harness.WithLogWriter(inv.Stderr))
|
||||
|
||||
for i, part := range partitions {
|
||||
for j := range part.ConcurrentEvaluations {
|
||||
|
||||
@@ -192,7 +192,7 @@ func (r *RootCmd) scaletestNotifications() *serpent.Command {
|
||||
triggerTimes,
|
||||
)
|
||||
|
||||
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy())
|
||||
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy(), harness.WithLogWriter(inv.Stderr))
|
||||
|
||||
for i, config := range configs {
|
||||
id := strconv.Itoa(i)
|
||||
|
||||
@@ -110,7 +110,7 @@ func (r *RootCmd) scaletestPrebuilds() *serpent.Command {
|
||||
deletionBarrier := new(sync.WaitGroup)
|
||||
deletionBarrier.Add(int(numTemplates))
|
||||
|
||||
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy())
|
||||
th := harness.NewTestHarness(timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}), cleanupStrategy.toStrategy(), harness.WithLogWriter(inv.Stderr))
|
||||
|
||||
tags, err := ParseProvisionerTags(provisionerTags)
|
||||
if err != nil {
|
||||
|
||||
@@ -122,6 +122,7 @@ After all runners connect, it waits for the baseline duration before triggering
|
||||
th := harness.NewTestHarness(
|
||||
timeoutStrategy.wrapStrategy(harness.ConcurrentExecutionStrategy{}),
|
||||
cleanupStrategy.toStrategy(),
|
||||
harness.WithLogWriter(inv.Stderr),
|
||||
)
|
||||
|
||||
// Create runners
|
||||
|
||||
@@ -3867,6 +3867,37 @@ func TestGetProvisionerJobsByIDsWithQueuePosition(t *testing.T) {
|
||||
queueSizes: nil, // TODO(yevhenii): should it be empty array instead?
|
||||
queuePositions: nil,
|
||||
},
|
||||
// Many daemons with identical tags should produce same results as one.
|
||||
{
|
||||
name: "duplicate-daemons-same-tags",
|
||||
jobTags: []database.StringMap{
|
||||
{"a": "1"},
|
||||
{"a": "1", "b": "2"},
|
||||
},
|
||||
daemonTags: []database.StringMap{
|
||||
{"a": "1", "b": "2"},
|
||||
{"a": "1", "b": "2"},
|
||||
{"a": "1", "b": "2"},
|
||||
},
|
||||
queueSizes: []int64{2, 2},
|
||||
queuePositions: []int64{1, 2},
|
||||
},
|
||||
// Jobs that don't match any queried job's daemon should still
|
||||
// have correct queue positions.
|
||||
{
|
||||
name: "irrelevant-daemons-filtered",
|
||||
jobTags: []database.StringMap{
|
||||
{"a": "1"},
|
||||
{"x": "9"},
|
||||
},
|
||||
daemonTags: []database.StringMap{
|
||||
{"a": "1"},
|
||||
{"x": "9"},
|
||||
},
|
||||
queueSizes: []int64{1},
|
||||
queuePositions: []int64{1},
|
||||
skipJobIDs: map[int]struct{}{1: {}},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
@@ -4192,6 +4223,51 @@ func TestGetProvisionerJobsByIDsWithQueuePosition_OrderValidation(t *testing.T)
|
||||
assert.EqualValues(t, []int64{1, 2, 3, 4, 5, 6}, queuePositions, "expected queue positions to be set correctly")
|
||||
}
|
||||
|
||||
func TestGetProvisionerJobsByIDsWithQueuePosition_DuplicateDaemons(t *testing.T) {
|
||||
t.Parallel()
|
||||
db, _ := dbtestutil.NewDB(t)
|
||||
now := dbtime.Now()
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
|
||||
// Create 3 pending jobs with the same tags.
|
||||
jobs := make([]database.ProvisionerJob, 3)
|
||||
for i := range jobs {
|
||||
jobs[i] = dbgen.ProvisionerJob(t, db, nil, database.ProvisionerJob{
|
||||
CreatedAt: now.Add(-time.Duration(3-i) * time.Minute),
|
||||
Tags: database.StringMap{"scope": "organization", "owner": ""},
|
||||
})
|
||||
}
|
||||
|
||||
// Create 50 daemons with identical tags (simulates scale).
|
||||
for i := range 50 {
|
||||
dbgen.ProvisionerDaemon(t, db, database.ProvisionerDaemon{
|
||||
Name: fmt.Sprintf("daemon_%d", i),
|
||||
Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho},
|
||||
Tags: database.StringMap{"scope": "organization", "owner": ""},
|
||||
})
|
||||
}
|
||||
|
||||
jobIDs := make([]uuid.UUID, len(jobs))
|
||||
for i, j := range jobs {
|
||||
jobIDs[i] = j.ID
|
||||
}
|
||||
|
||||
results, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx,
|
||||
database.GetProvisionerJobsByIDsWithQueuePositionParams{
|
||||
IDs: jobIDs,
|
||||
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, results, 3)
|
||||
|
||||
// All daemons have identical tags, so queue should be same as
|
||||
// if there were just one daemon.
|
||||
for i, r := range results {
|
||||
assert.Equal(t, int64(3), r.QueueSize, "job %d queue size", i)
|
||||
assert.Equal(t, int64(i+1), r.QueuePosition, "job %d queue position", i)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGroupRemovalTrigger(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
||||
@@ -12551,7 +12551,7 @@ const getProvisionerJobsByIDsWithQueuePosition = `-- name: GetProvisionerJobsByI
|
||||
WITH filtered_provisioner_jobs AS (
|
||||
-- Step 1: Filter provisioner_jobs
|
||||
SELECT
|
||||
id, created_at
|
||||
id, created_at, tags
|
||||
FROM
|
||||
provisioner_jobs
|
||||
WHERE
|
||||
@@ -12566,21 +12566,32 @@ pending_jobs AS (
|
||||
WHERE
|
||||
job_status = 'pending'
|
||||
),
|
||||
online_provisioner_daemons AS (
|
||||
SELECT id, tags FROM provisioner_daemons pd
|
||||
WHERE pd.last_seen_at IS NOT NULL AND pd.last_seen_at >= (NOW() - ($2::bigint || ' ms')::interval)
|
||||
unique_daemon_tags AS (
|
||||
SELECT DISTINCT tags FROM provisioner_daemons pd
|
||||
WHERE pd.last_seen_at IS NOT NULL
|
||||
AND pd.last_seen_at >= (NOW() - ($2::bigint || ' ms')::interval)
|
||||
),
|
||||
relevant_daemon_tags AS (
|
||||
SELECT udt.tags
|
||||
FROM unique_daemon_tags udt
|
||||
WHERE EXISTS (
|
||||
SELECT 1 FROM filtered_provisioner_jobs fpj
|
||||
WHERE provisioner_tagset_contains(udt.tags, fpj.tags)
|
||||
)
|
||||
),
|
||||
ranked_jobs AS (
|
||||
-- Step 3: Rank only pending jobs based on provisioner availability
|
||||
SELECT
|
||||
pj.id,
|
||||
pj.created_at,
|
||||
ROW_NUMBER() OVER (PARTITION BY opd.id ORDER BY pj.initiator_id = 'c42fdf75-3097-471c-8c33-fb52454d81c0'::uuid ASC, pj.created_at ASC) AS queue_position,
|
||||
COUNT(*) OVER (PARTITION BY opd.id) AS queue_size
|
||||
ROW_NUMBER() OVER (PARTITION BY rdt.tags ORDER BY pj.initiator_id = 'c42fdf75-3097-471c-8c33-fb52454d81c0'::uuid ASC, pj.created_at ASC) AS queue_position,
|
||||
COUNT(*) OVER (PARTITION BY rdt.tags) AS queue_size
|
||||
FROM
|
||||
pending_jobs pj
|
||||
INNER JOIN online_provisioner_daemons opd
|
||||
ON provisioner_tagset_contains(opd.tags, pj.tags) -- Join only on the small pending set
|
||||
INNER JOIN
|
||||
relevant_daemon_tags rdt
|
||||
ON
|
||||
provisioner_tagset_contains(rdt.tags, pj.tags)
|
||||
),
|
||||
final_jobs AS (
|
||||
-- Step 4: Compute best queue position and max queue size per job
|
||||
|
||||
@@ -79,7 +79,7 @@ WHERE
|
||||
WITH filtered_provisioner_jobs AS (
|
||||
-- Step 1: Filter provisioner_jobs
|
||||
SELECT
|
||||
id, created_at
|
||||
id, created_at, tags
|
||||
FROM
|
||||
provisioner_jobs
|
||||
WHERE
|
||||
@@ -94,21 +94,32 @@ pending_jobs AS (
|
||||
WHERE
|
||||
job_status = 'pending'
|
||||
),
|
||||
online_provisioner_daemons AS (
|
||||
SELECT id, tags FROM provisioner_daemons pd
|
||||
WHERE pd.last_seen_at IS NOT NULL AND pd.last_seen_at >= (NOW() - (@stale_interval_ms::bigint || ' ms')::interval)
|
||||
unique_daemon_tags AS (
|
||||
SELECT DISTINCT tags FROM provisioner_daemons pd
|
||||
WHERE pd.last_seen_at IS NOT NULL
|
||||
AND pd.last_seen_at >= (NOW() - (@stale_interval_ms::bigint || ' ms')::interval)
|
||||
),
|
||||
relevant_daemon_tags AS (
|
||||
SELECT udt.tags
|
||||
FROM unique_daemon_tags udt
|
||||
WHERE EXISTS (
|
||||
SELECT 1 FROM filtered_provisioner_jobs fpj
|
||||
WHERE provisioner_tagset_contains(udt.tags, fpj.tags)
|
||||
)
|
||||
),
|
||||
ranked_jobs AS (
|
||||
-- Step 3: Rank only pending jobs based on provisioner availability
|
||||
SELECT
|
||||
pj.id,
|
||||
pj.created_at,
|
||||
ROW_NUMBER() OVER (PARTITION BY opd.id ORDER BY pj.initiator_id = 'c42fdf75-3097-471c-8c33-fb52454d81c0'::uuid ASC, pj.created_at ASC) AS queue_position,
|
||||
COUNT(*) OVER (PARTITION BY opd.id) AS queue_size
|
||||
ROW_NUMBER() OVER (PARTITION BY rdt.tags ORDER BY pj.initiator_id = 'c42fdf75-3097-471c-8c33-fb52454d81c0'::uuid ASC, pj.created_at ASC) AS queue_position,
|
||||
COUNT(*) OVER (PARTITION BY rdt.tags) AS queue_size
|
||||
FROM
|
||||
pending_jobs pj
|
||||
INNER JOIN online_provisioner_daemons opd
|
||||
ON provisioner_tagset_contains(opd.tags, pj.tags) -- Join only on the small pending set
|
||||
INNER JOIN
|
||||
relevant_daemon_tags rdt
|
||||
ON
|
||||
provisioner_tagset_contains(rdt.tags, pj.tags)
|
||||
),
|
||||
final_jobs AS (
|
||||
-- Step 4: Compute best queue position and max queue size per job
|
||||
|
||||
@@ -2,6 +2,7 @@ package harness
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -17,17 +18,30 @@ type TestHarness struct {
|
||||
runStrategy ExecutionStrategy
|
||||
cleanupStrategy ExecutionStrategy
|
||||
|
||||
mut *sync.Mutex
|
||||
runIDs map[string]struct{}
|
||||
runs []*TestRun
|
||||
started bool
|
||||
done chan struct{}
|
||||
elapsed time.Duration
|
||||
mut *sync.Mutex
|
||||
runIDs map[string]struct{}
|
||||
runs []*TestRun
|
||||
started bool
|
||||
done chan struct{}
|
||||
elapsed time.Duration
|
||||
logWriter io.Writer
|
||||
}
|
||||
|
||||
// TestHarnessOption is a functional option for NewTestHarness.
|
||||
type TestHarnessOption func(*TestHarness)
|
||||
|
||||
// WithLogWriter sets an additional writer that all test run logs
|
||||
// are tee'd to (e.g. os.Stderr). The per-run in-memory buffer is
|
||||
// always written to; this writer receives a copy.
|
||||
func WithLogWriter(w io.Writer) TestHarnessOption {
|
||||
return func(h *TestHarness) {
|
||||
h.logWriter = w
|
||||
}
|
||||
}
|
||||
|
||||
// NewTestHarness creates a new TestHarness with the given execution strategies.
|
||||
func NewTestHarness(runStrategy, cleanupStrategy ExecutionStrategy) *TestHarness {
|
||||
return &TestHarness{
|
||||
func NewTestHarness(runStrategy, cleanupStrategy ExecutionStrategy, opts ...TestHarnessOption) *TestHarness {
|
||||
h := &TestHarness{
|
||||
runStrategy: runStrategy,
|
||||
cleanupStrategy: cleanupStrategy,
|
||||
mut: new(sync.Mutex),
|
||||
@@ -35,6 +49,10 @@ func NewTestHarness(runStrategy, cleanupStrategy ExecutionStrategy) *TestHarness
|
||||
runs: []*TestRun{},
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(h)
|
||||
}
|
||||
return h
|
||||
}
|
||||
|
||||
// Run runs the registered tests using the given ExecutionStrategy. The provided
|
||||
|
||||
@@ -45,6 +45,7 @@ type Collectable interface {
|
||||
// This is a convenience method that calls NewTestRun() and h.RegisterRun().
|
||||
func (h *TestHarness) AddRun(testName string, id string, runner Runnable) *TestRun {
|
||||
run := NewTestRun(testName, id, runner)
|
||||
run.logWriter = h.logWriter
|
||||
h.RegisterRun(run)
|
||||
|
||||
return run
|
||||
@@ -69,9 +70,10 @@ func (h *TestHarness) RegisterRun(run *TestRun) {
|
||||
|
||||
// TestRun is a single test run and it's accompanying state.
|
||||
type TestRun struct {
|
||||
testName string
|
||||
id string
|
||||
runner Runnable
|
||||
testName string
|
||||
id string
|
||||
runner Runnable
|
||||
logWriter io.Writer
|
||||
|
||||
logs *syncBuffer
|
||||
done chan struct{}
|
||||
@@ -119,7 +121,11 @@ func (r *TestRun) Run(ctx context.Context) (err error) {
|
||||
}
|
||||
}()
|
||||
|
||||
err = r.runner.Run(ctx, r.id, r.logs)
|
||||
var w io.Writer = r.logs
|
||||
if r.logWriter != nil {
|
||||
w = io.MultiWriter(r.logs, r.logWriter)
|
||||
}
|
||||
err = r.runner.Run(ctx, r.id, w)
|
||||
|
||||
//nolint:revive // we use named returns because we mutate it in a defer
|
||||
return
|
||||
@@ -145,7 +151,11 @@ func (r *TestRun) Cleanup(ctx context.Context) (err error) {
|
||||
}
|
||||
}()
|
||||
|
||||
err = c.Cleanup(ctx, r.id, r.logs)
|
||||
var w io.Writer = r.logs
|
||||
if r.logWriter != nil {
|
||||
w = io.MultiWriter(r.logs, r.logWriter)
|
||||
}
|
||||
err = c.Cleanup(ctx, r.id, w)
|
||||
//nolint:revive // we use named returns because we mutate it in a defer
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user