Compare commits

...

3 Commits

Author SHA1 Message Date
Jon Ayers
801b467b75 make gen 2026-03-17 18:56:29 +00:00
Jon Ayers
192b25e30a fix(dbauthz): implement GetProvisionerJobsByIDs instead of panic stub 2026-03-17 00:11:01 +00:00
Jon Ayers
8c7111fe4a perf(coderd): split queue position query and parallelize workspaceBuildsData
Add GetProvisionerJobsByIDs query that fetches jobs without the expensive
queue position cross-join. Restructure workspaceBuildsData() to:

- Phase A: Run jobs, template versions, and resources queries in parallel
- Phase B: Only call GetProvisionerJobsByIDsWithQueuePosition for pending
  jobs (typically 0-5 instead of thousands), then merge results
- Phase C: Run metadata and agents queries in parallel

This dramatically reduces query time for 'coder list' with thousands of
workspaces since the expensive queue position query now processes only
pending jobs instead of all jobs.
2026-03-17 00:07:36 +00:00
7 changed files with 218 additions and 49 deletions

View File

@@ -3280,6 +3280,12 @@ func (q *querier) GetProvisionerJobTimingsByJobID(ctx context.Context, jobID uui
return q.db.GetProvisionerJobTimingsByJobID(ctx, jobID)
}
func (q *querier) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID) ([]database.ProvisionerJob, error) {
// TODO: Remove this once we have a proper rbac check for provisioner jobs.
// Details in https://github.com/coder/coder/issues/16160
return q.db.GetProvisionerJobsByIDs(ctx, ids)
}
func (q *querier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, ids database.GetProvisionerJobsByIDsWithQueuePositionParams) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
// TODO: Remove this once we have a proper rbac check for provisioner jobs.
// Details in https://github.com/coder/coder/issues/16160

View File

@@ -1831,6 +1831,14 @@ func (m queryMetricsStore) GetProvisionerJobTimingsByJobID(ctx context.Context,
return r0, r1
}
func (m queryMetricsStore) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID) ([]database.ProvisionerJob, error) {
start := time.Now()
r0, r1 := m.s.GetProvisionerJobsByIDs(ctx, ids)
m.queryLatencies.WithLabelValues("GetProvisionerJobsByIDs").Observe(time.Since(start).Seconds())
m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "GetProvisionerJobsByIDs").Inc()
return r0, r1
}
func (m queryMetricsStore) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, arg database.GetProvisionerJobsByIDsWithQueuePositionParams) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
start := time.Now()
r0, r1 := m.s.GetProvisionerJobsByIDsWithQueuePosition(ctx, arg)

View File

@@ -3368,6 +3368,21 @@ func (mr *MockStoreMockRecorder) GetProvisionerJobTimingsByJobID(ctx, jobID any)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProvisionerJobTimingsByJobID", reflect.TypeOf((*MockStore)(nil).GetProvisionerJobTimingsByJobID), ctx, jobID)
}
// GetProvisionerJobsByIDs mocks base method.
func (m *MockStore) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID) ([]database.ProvisionerJob, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetProvisionerJobsByIDs", ctx, ids)
ret0, _ := ret[0].([]database.ProvisionerJob)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetProvisionerJobsByIDs indicates an expected call of GetProvisionerJobsByIDs.
func (mr *MockStoreMockRecorder) GetProvisionerJobsByIDs(ctx, ids any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProvisionerJobsByIDs", reflect.TypeOf((*MockStore)(nil).GetProvisionerJobsByIDs), ctx, ids)
}
// GetProvisionerJobsByIDsWithQueuePosition mocks base method.
func (m *MockStore) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, arg database.GetProvisionerJobsByIDsWithQueuePositionParams) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
m.ctrl.T.Helper()

View File

@@ -378,6 +378,7 @@ type sqlcQuerier interface {
// Blocks until the row is available for update.
GetProvisionerJobByIDWithLock(ctx context.Context, id uuid.UUID) (ProvisionerJob, error)
GetProvisionerJobTimingsByJobID(ctx context.Context, jobID uuid.UUID) ([]ProvisionerJobTiming, error)
GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID) ([]ProvisionerJob, error)
GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, arg GetProvisionerJobsByIDsWithQueuePositionParams) ([]GetProvisionerJobsByIDsWithQueuePositionRow, error)
GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error)
GetProvisionerJobsCreatedAfter(ctx context.Context, createdAt time.Time) ([]ProvisionerJob, error)

View File

@@ -13173,6 +13173,60 @@ func (q *sqlQuerier) GetProvisionerJobTimingsByJobID(ctx context.Context, jobID
return items, nil
}
const getProvisionerJobsByIDs = `-- name: GetProvisionerJobsByIDs :many
SELECT
id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata, job_status, logs_length, logs_overflowed
FROM
provisioner_jobs
WHERE
id = ANY($1 :: uuid [ ])
`
func (q *sqlQuerier) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID) ([]ProvisionerJob, error) {
rows, err := q.db.QueryContext(ctx, getProvisionerJobsByIDs, pq.Array(ids))
if err != nil {
return nil, err
}
defer rows.Close()
var items []ProvisionerJob
for rows.Next() {
var i ProvisionerJob
if err := rows.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.StartedAt,
&i.CanceledAt,
&i.CompletedAt,
&i.Error,
&i.OrganizationID,
&i.InitiatorID,
&i.Provisioner,
&i.StorageMethod,
&i.Type,
&i.Input,
&i.WorkerID,
&i.FileID,
&i.Tags,
&i.ErrorCode,
&i.TraceMetadata,
&i.JobStatus,
&i.LogsLength,
&i.LogsOverflowed,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getProvisionerJobsByIDsWithQueuePosition = `-- name: GetProvisionerJobsByIDsWithQueuePosition :many
WITH filtered_provisioner_jobs AS (
-- Step 1: Filter provisioner_jobs

View File

@@ -67,6 +67,14 @@ WHERE
id = $1
FOR UPDATE;
-- name: GetProvisionerJobsByIDs :many
SELECT
*
FROM
provisioner_jobs
WHERE
id = ANY(@ids :: uuid [ ]);
-- name: GetProvisionerJobsByIDsWithQueuePosition :many
WITH filtered_provisioner_jobs AS (
-- Step 1: Filter provisioner_jobs

View File

@@ -989,48 +989,108 @@ type workspaceBuildsData struct {
func (api *API) workspaceBuildsData(ctx context.Context, workspaceBuilds []database.WorkspaceBuild) (workspaceBuildsData, error) {
jobIDs := make([]uuid.UUID, 0, len(workspaceBuilds))
for _, build := range workspaceBuilds {
jobIDs = append(jobIDs, build.JobID)
}
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: jobIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return workspaceBuildsData{}, xerrors.Errorf("get provisioner jobs: %w", err)
}
pendingJobIDs := []uuid.UUID{}
for _, job := range jobs {
if job.ProvisionerJob.JobStatus == database.ProvisionerJobStatusPending {
pendingJobIDs = append(pendingJobIDs, job.ProvisionerJob.ID)
}
}
pendingJobProvisioners, err := api.Database.GetEligibleProvisionerDaemonsByProvisionerJobIDs(ctx, pendingJobIDs)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return workspaceBuildsData{}, xerrors.Errorf("get provisioner daemons: %w", err)
}
templateVersionIDs := make([]uuid.UUID, 0, len(workspaceBuilds))
for _, build := range workspaceBuilds {
jobIDs = append(jobIDs, build.JobID)
templateVersionIDs = append(templateVersionIDs, build.TemplateVersionID)
}
// nolint:gocritic // Getting template versions by ID is a system function.
templateVersions, err := api.Database.GetTemplateVersionsByIDs(dbauthz.AsSystemRestricted(ctx), templateVersionIDs)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return workspaceBuildsData{}, xerrors.Errorf("get template versions: %w", err)
// Phase A: Fetch jobs, template versions, and resources in parallel.
// These three queries depend only on the build list and can run
// concurrently.
var (
jobs []database.ProvisionerJob
templateVersions []database.TemplateVersion
resources []database.WorkspaceResource
)
var eg errgroup.Group
eg.Go(func() error {
var err error
jobs, err = api.Database.GetProvisionerJobsByIDs(ctx, jobIDs)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return xerrors.Errorf("get provisioner jobs: %w", err)
}
return nil
})
eg.Go(func() error {
var err error
// nolint:gocritic // Getting template versions by ID is a system function.
templateVersions, err = api.Database.GetTemplateVersionsByIDs(dbauthz.AsSystemRestricted(ctx), templateVersionIDs)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return xerrors.Errorf("get template versions: %w", err)
}
return nil
})
eg.Go(func() error {
var err error
// nolint:gocritic // Getting workspace resources by job ID is a system function.
resources, err = api.Database.GetWorkspaceResourcesByJobIDs(dbauthz.AsSystemRestricted(ctx), jobIDs)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return xerrors.Errorf("get workspace resources by job: %w", err)
}
return nil
})
if err := eg.Wait(); err != nil {
return workspaceBuildsData{}, err
}
// nolint:gocritic // Getting workspace resources by job ID is a system function.
resources, err := api.Database.GetWorkspaceResourcesByJobIDs(dbauthz.AsSystemRestricted(ctx), jobIDs)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return workspaceBuildsData{}, xerrors.Errorf("get workspace resources by job: %w", err)
// Phase B: Get queue position and eligible daemons for pending
// jobs only. The queue position query is expensive (cross-join
// with all pending jobs and active daemons), so we only run it
// for the small number of actually-pending jobs.
var pendingJobIDs []uuid.UUID
for _, job := range jobs {
if job.JobStatus == database.ProvisionerJobStatusPending {
pendingJobIDs = append(pendingJobIDs, job.ID)
}
}
var queuePositionRows []database.GetProvisionerJobsByIDsWithQueuePositionRow
if len(pendingJobIDs) > 0 {
var err error
queuePositionRows, err = api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: pendingJobIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return workspaceBuildsData{}, xerrors.Errorf("get provisioner jobs queue position: %w", err)
}
}
var pendingJobProvisioners []database.GetEligibleProvisionerDaemonsByProvisionerJobIDsRow
if len(pendingJobIDs) > 0 {
var err error
pendingJobProvisioners, err = api.Database.GetEligibleProvisionerDaemonsByProvisionerJobIDs(ctx, pendingJobIDs)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return workspaceBuildsData{}, xerrors.Errorf("get provisioner daemons: %w", err)
}
}
// Merge job rows with queue position information so downstream
// consumers see the same type they expect.
queuePositionByID := make(map[uuid.UUID]database.GetProvisionerJobsByIDsWithQueuePositionRow, len(queuePositionRows))
for _, qpRow := range queuePositionRows {
queuePositionByID[qpRow.ID] = qpRow
}
jobsWithQueuePosition := make([]database.GetProvisionerJobsByIDsWithQueuePositionRow, 0, len(jobs))
for _, job := range jobs {
row := database.GetProvisionerJobsByIDsWithQueuePositionRow{
ID: job.ID,
CreatedAt: job.CreatedAt,
ProvisionerJob: job,
QueuePosition: 0,
QueueSize: 0,
}
if qpRow, ok := queuePositionByID[job.ID]; ok {
row.QueuePosition = qpRow.QueuePosition
row.QueueSize = qpRow.QueueSize
}
jobsWithQueuePosition = append(jobsWithQueuePosition, row)
}
if len(resources) == 0 {
return workspaceBuildsData{
jobs: jobs,
jobs: jobsWithQueuePosition,
templateVersions: templateVersions,
provisionerDaemons: pendingJobProvisioners,
}, nil
@@ -1041,21 +1101,38 @@ func (api *API) workspaceBuildsData(ctx context.Context, workspaceBuilds []datab
resourceIDs = append(resourceIDs, resource.ID)
}
// nolint:gocritic // Getting workspace resource metadata by resource ID is a system function.
metadata, err := api.Database.GetWorkspaceResourceMetadataByResourceIDs(dbauthz.AsSystemRestricted(ctx), resourceIDs)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return workspaceBuildsData{}, xerrors.Errorf("fetching resource metadata: %w", err)
// Phase C: Fetch metadata and agents in parallel (both depend
// on resourceIDs which we just computed).
var (
metadata []database.WorkspaceResourceMetadatum
agents []database.WorkspaceAgent
)
var eg2 errgroup.Group
eg2.Go(func() error {
var err error
// nolint:gocritic // Getting workspace resource metadata by resource ID is a system function.
metadata, err = api.Database.GetWorkspaceResourceMetadataByResourceIDs(dbauthz.AsSystemRestricted(ctx), resourceIDs)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return xerrors.Errorf("fetching resource metadata: %w", err)
}
return nil
})
eg2.Go(func() error {
var err error
// nolint:gocritic // Getting workspace agents by resource IDs is a system function.
agents, err = api.Database.GetWorkspaceAgentsByResourceIDs(dbauthz.AsSystemRestricted(ctx), resourceIDs)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return xerrors.Errorf("get workspace agents: %w", err)
}
return nil
})
if err := eg2.Wait(); err != nil {
return workspaceBuildsData{}, err
}
// nolint:gocritic // Getting workspace agents by resource IDs is a system function.
agents, err := api.Database.GetWorkspaceAgentsByResourceIDs(dbauthz.AsSystemRestricted(ctx), resourceIDs)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return workspaceBuildsData{}, xerrors.Errorf("get workspace agents: %w", err)
}
if len(resources) == 0 {
if len(agents) == 0 {
return workspaceBuildsData{
jobs: jobs,
jobs: jobsWithQueuePosition,
templateVersions: templateVersions,
resources: resources,
metadata: metadata,
@@ -1074,23 +1151,23 @@ func (api *API) workspaceBuildsData(ctx context.Context, workspaceBuilds []datab
logSources []database.WorkspaceAgentLogSource
)
var eg errgroup.Group
eg.Go(func() (err error) {
var eg3 errgroup.Group
eg3.Go(func() (err error) {
// nolint:gocritic // Getting workspace apps by agent IDs is a system function.
apps, err = api.Database.GetWorkspaceAppsByAgentIDs(dbauthz.AsSystemRestricted(ctx), agentIDs)
return err
})
eg.Go(func() (err error) {
eg3.Go(func() (err error) {
// nolint:gocritic // Getting workspace scripts by agent IDs is a system function.
scripts, err = api.Database.GetWorkspaceAgentScriptsByAgentIDs(dbauthz.AsSystemRestricted(ctx), agentIDs)
return err
})
eg.Go(func() error {
eg3.Go(func() (err error) {
// nolint:gocritic // Getting workspace agent log sources by agent IDs is a system function.
logSources, err = api.Database.GetWorkspaceAgentLogSourcesByAgentIDs(dbauthz.AsSystemRestricted(ctx), agentIDs)
return err
})
err = eg.Wait()
err := eg3.Wait()
if err != nil {
return workspaceBuildsData{}, err
}
@@ -1107,7 +1184,7 @@ func (api *API) workspaceBuildsData(ctx context.Context, workspaceBuilds []datab
}
return workspaceBuildsData{
jobs: jobs,
jobs: jobsWithQueuePosition,
templateVersions: templateVersions,
resources: resources,
metadata: metadata,