Compare commits
3 Commits
main
...
list-works
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
801b467b75 | ||
|
|
192b25e30a | ||
|
|
8c7111fe4a |
@@ -3280,6 +3280,12 @@ func (q *querier) GetProvisionerJobTimingsByJobID(ctx context.Context, jobID uui
|
||||
return q.db.GetProvisionerJobTimingsByJobID(ctx, jobID)
|
||||
}
|
||||
|
||||
func (q *querier) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID) ([]database.ProvisionerJob, error) {
|
||||
// TODO: Remove this once we have a proper rbac check for provisioner jobs.
|
||||
// Details in https://github.com/coder/coder/issues/16160
|
||||
return q.db.GetProvisionerJobsByIDs(ctx, ids)
|
||||
}
|
||||
|
||||
func (q *querier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, ids database.GetProvisionerJobsByIDsWithQueuePositionParams) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
|
||||
// TODO: Remove this once we have a proper rbac check for provisioner jobs.
|
||||
// Details in https://github.com/coder/coder/issues/16160
|
||||
|
||||
@@ -1831,6 +1831,14 @@ func (m queryMetricsStore) GetProvisionerJobTimingsByJobID(ctx context.Context,
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
func (m queryMetricsStore) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID) ([]database.ProvisionerJob, error) {
|
||||
start := time.Now()
|
||||
r0, r1 := m.s.GetProvisionerJobsByIDs(ctx, ids)
|
||||
m.queryLatencies.WithLabelValues("GetProvisionerJobsByIDs").Observe(time.Since(start).Seconds())
|
||||
m.queryCounts.WithLabelValues(httpmw.ExtractHTTPRoute(ctx), httpmw.ExtractHTTPMethod(ctx), "GetProvisionerJobsByIDs").Inc()
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
func (m queryMetricsStore) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, arg database.GetProvisionerJobsByIDsWithQueuePositionParams) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
|
||||
start := time.Now()
|
||||
r0, r1 := m.s.GetProvisionerJobsByIDsWithQueuePosition(ctx, arg)
|
||||
|
||||
@@ -3368,6 +3368,21 @@ func (mr *MockStoreMockRecorder) GetProvisionerJobTimingsByJobID(ctx, jobID any)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProvisionerJobTimingsByJobID", reflect.TypeOf((*MockStore)(nil).GetProvisionerJobTimingsByJobID), ctx, jobID)
|
||||
}
|
||||
|
||||
// GetProvisionerJobsByIDs mocks base method.
|
||||
func (m *MockStore) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID) ([]database.ProvisionerJob, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetProvisionerJobsByIDs", ctx, ids)
|
||||
ret0, _ := ret[0].([]database.ProvisionerJob)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// GetProvisionerJobsByIDs indicates an expected call of GetProvisionerJobsByIDs.
|
||||
func (mr *MockStoreMockRecorder) GetProvisionerJobsByIDs(ctx, ids any) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProvisionerJobsByIDs", reflect.TypeOf((*MockStore)(nil).GetProvisionerJobsByIDs), ctx, ids)
|
||||
}
|
||||
|
||||
// GetProvisionerJobsByIDsWithQueuePosition mocks base method.
|
||||
func (m *MockStore) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, arg database.GetProvisionerJobsByIDsWithQueuePositionParams) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
|
||||
m.ctrl.T.Helper()
|
||||
|
||||
@@ -378,6 +378,7 @@ type sqlcQuerier interface {
|
||||
// Blocks until the row is available for update.
|
||||
GetProvisionerJobByIDWithLock(ctx context.Context, id uuid.UUID) (ProvisionerJob, error)
|
||||
GetProvisionerJobTimingsByJobID(ctx context.Context, jobID uuid.UUID) ([]ProvisionerJobTiming, error)
|
||||
GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID) ([]ProvisionerJob, error)
|
||||
GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, arg GetProvisionerJobsByIDsWithQueuePositionParams) ([]GetProvisionerJobsByIDsWithQueuePositionRow, error)
|
||||
GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error)
|
||||
GetProvisionerJobsCreatedAfter(ctx context.Context, createdAt time.Time) ([]ProvisionerJob, error)
|
||||
|
||||
@@ -13173,6 +13173,60 @@ func (q *sqlQuerier) GetProvisionerJobTimingsByJobID(ctx context.Context, jobID
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getProvisionerJobsByIDs = `-- name: GetProvisionerJobsByIDs :many
|
||||
SELECT
|
||||
id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata, job_status, logs_length, logs_overflowed
|
||||
FROM
|
||||
provisioner_jobs
|
||||
WHERE
|
||||
id = ANY($1 :: uuid [ ])
|
||||
`
|
||||
|
||||
func (q *sqlQuerier) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID) ([]ProvisionerJob, error) {
|
||||
rows, err := q.db.QueryContext(ctx, getProvisionerJobsByIDs, pq.Array(ids))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []ProvisionerJob
|
||||
for rows.Next() {
|
||||
var i ProvisionerJob
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
&i.StartedAt,
|
||||
&i.CanceledAt,
|
||||
&i.CompletedAt,
|
||||
&i.Error,
|
||||
&i.OrganizationID,
|
||||
&i.InitiatorID,
|
||||
&i.Provisioner,
|
||||
&i.StorageMethod,
|
||||
&i.Type,
|
||||
&i.Input,
|
||||
&i.WorkerID,
|
||||
&i.FileID,
|
||||
&i.Tags,
|
||||
&i.ErrorCode,
|
||||
&i.TraceMetadata,
|
||||
&i.JobStatus,
|
||||
&i.LogsLength,
|
||||
&i.LogsOverflowed,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getProvisionerJobsByIDsWithQueuePosition = `-- name: GetProvisionerJobsByIDsWithQueuePosition :many
|
||||
WITH filtered_provisioner_jobs AS (
|
||||
-- Step 1: Filter provisioner_jobs
|
||||
|
||||
@@ -67,6 +67,14 @@ WHERE
|
||||
id = $1
|
||||
FOR UPDATE;
|
||||
|
||||
-- name: GetProvisionerJobsByIDs :many
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
provisioner_jobs
|
||||
WHERE
|
||||
id = ANY(@ids :: uuid [ ]);
|
||||
|
||||
-- name: GetProvisionerJobsByIDsWithQueuePosition :many
|
||||
WITH filtered_provisioner_jobs AS (
|
||||
-- Step 1: Filter provisioner_jobs
|
||||
|
||||
@@ -989,48 +989,108 @@ type workspaceBuildsData struct {
|
||||
|
||||
func (api *API) workspaceBuildsData(ctx context.Context, workspaceBuilds []database.WorkspaceBuild) (workspaceBuildsData, error) {
|
||||
jobIDs := make([]uuid.UUID, 0, len(workspaceBuilds))
|
||||
for _, build := range workspaceBuilds {
|
||||
jobIDs = append(jobIDs, build.JobID)
|
||||
}
|
||||
jobs, err := api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
|
||||
IDs: jobIDs,
|
||||
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
|
||||
})
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return workspaceBuildsData{}, xerrors.Errorf("get provisioner jobs: %w", err)
|
||||
}
|
||||
pendingJobIDs := []uuid.UUID{}
|
||||
for _, job := range jobs {
|
||||
if job.ProvisionerJob.JobStatus == database.ProvisionerJobStatusPending {
|
||||
pendingJobIDs = append(pendingJobIDs, job.ProvisionerJob.ID)
|
||||
}
|
||||
}
|
||||
|
||||
pendingJobProvisioners, err := api.Database.GetEligibleProvisionerDaemonsByProvisionerJobIDs(ctx, pendingJobIDs)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return workspaceBuildsData{}, xerrors.Errorf("get provisioner daemons: %w", err)
|
||||
}
|
||||
|
||||
templateVersionIDs := make([]uuid.UUID, 0, len(workspaceBuilds))
|
||||
for _, build := range workspaceBuilds {
|
||||
jobIDs = append(jobIDs, build.JobID)
|
||||
templateVersionIDs = append(templateVersionIDs, build.TemplateVersionID)
|
||||
}
|
||||
|
||||
// nolint:gocritic // Getting template versions by ID is a system function.
|
||||
templateVersions, err := api.Database.GetTemplateVersionsByIDs(dbauthz.AsSystemRestricted(ctx), templateVersionIDs)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return workspaceBuildsData{}, xerrors.Errorf("get template versions: %w", err)
|
||||
// Phase A: Fetch jobs, template versions, and resources in parallel.
|
||||
// These three queries depend only on the build list and can run
|
||||
// concurrently.
|
||||
var (
|
||||
jobs []database.ProvisionerJob
|
||||
templateVersions []database.TemplateVersion
|
||||
resources []database.WorkspaceResource
|
||||
)
|
||||
var eg errgroup.Group
|
||||
eg.Go(func() error {
|
||||
var err error
|
||||
jobs, err = api.Database.GetProvisionerJobsByIDs(ctx, jobIDs)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return xerrors.Errorf("get provisioner jobs: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
eg.Go(func() error {
|
||||
var err error
|
||||
// nolint:gocritic // Getting template versions by ID is a system function.
|
||||
templateVersions, err = api.Database.GetTemplateVersionsByIDs(dbauthz.AsSystemRestricted(ctx), templateVersionIDs)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return xerrors.Errorf("get template versions: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
eg.Go(func() error {
|
||||
var err error
|
||||
// nolint:gocritic // Getting workspace resources by job ID is a system function.
|
||||
resources, err = api.Database.GetWorkspaceResourcesByJobIDs(dbauthz.AsSystemRestricted(ctx), jobIDs)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return xerrors.Errorf("get workspace resources by job: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err := eg.Wait(); err != nil {
|
||||
return workspaceBuildsData{}, err
|
||||
}
|
||||
|
||||
// nolint:gocritic // Getting workspace resources by job ID is a system function.
|
||||
resources, err := api.Database.GetWorkspaceResourcesByJobIDs(dbauthz.AsSystemRestricted(ctx), jobIDs)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return workspaceBuildsData{}, xerrors.Errorf("get workspace resources by job: %w", err)
|
||||
// Phase B: Get queue position and eligible daemons for pending
|
||||
// jobs only. The queue position query is expensive (cross-join
|
||||
// with all pending jobs and active daemons), so we only run it
|
||||
// for the small number of actually-pending jobs.
|
||||
var pendingJobIDs []uuid.UUID
|
||||
for _, job := range jobs {
|
||||
if job.JobStatus == database.ProvisionerJobStatusPending {
|
||||
pendingJobIDs = append(pendingJobIDs, job.ID)
|
||||
}
|
||||
}
|
||||
|
||||
var queuePositionRows []database.GetProvisionerJobsByIDsWithQueuePositionRow
|
||||
if len(pendingJobIDs) > 0 {
|
||||
var err error
|
||||
queuePositionRows, err = api.Database.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
|
||||
IDs: pendingJobIDs,
|
||||
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
|
||||
})
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return workspaceBuildsData{}, xerrors.Errorf("get provisioner jobs queue position: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
var pendingJobProvisioners []database.GetEligibleProvisionerDaemonsByProvisionerJobIDsRow
|
||||
if len(pendingJobIDs) > 0 {
|
||||
var err error
|
||||
pendingJobProvisioners, err = api.Database.GetEligibleProvisionerDaemonsByProvisionerJobIDs(ctx, pendingJobIDs)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return workspaceBuildsData{}, xerrors.Errorf("get provisioner daemons: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Merge job rows with queue position information so downstream
|
||||
// consumers see the same type they expect.
|
||||
queuePositionByID := make(map[uuid.UUID]database.GetProvisionerJobsByIDsWithQueuePositionRow, len(queuePositionRows))
|
||||
for _, qpRow := range queuePositionRows {
|
||||
queuePositionByID[qpRow.ID] = qpRow
|
||||
}
|
||||
jobsWithQueuePosition := make([]database.GetProvisionerJobsByIDsWithQueuePositionRow, 0, len(jobs))
|
||||
for _, job := range jobs {
|
||||
row := database.GetProvisionerJobsByIDsWithQueuePositionRow{
|
||||
ID: job.ID,
|
||||
CreatedAt: job.CreatedAt,
|
||||
ProvisionerJob: job,
|
||||
QueuePosition: 0,
|
||||
QueueSize: 0,
|
||||
}
|
||||
if qpRow, ok := queuePositionByID[job.ID]; ok {
|
||||
row.QueuePosition = qpRow.QueuePosition
|
||||
row.QueueSize = qpRow.QueueSize
|
||||
}
|
||||
jobsWithQueuePosition = append(jobsWithQueuePosition, row)
|
||||
}
|
||||
|
||||
if len(resources) == 0 {
|
||||
return workspaceBuildsData{
|
||||
jobs: jobs,
|
||||
jobs: jobsWithQueuePosition,
|
||||
templateVersions: templateVersions,
|
||||
provisionerDaemons: pendingJobProvisioners,
|
||||
}, nil
|
||||
@@ -1041,21 +1101,38 @@ func (api *API) workspaceBuildsData(ctx context.Context, workspaceBuilds []datab
|
||||
resourceIDs = append(resourceIDs, resource.ID)
|
||||
}
|
||||
|
||||
// nolint:gocritic // Getting workspace resource metadata by resource ID is a system function.
|
||||
metadata, err := api.Database.GetWorkspaceResourceMetadataByResourceIDs(dbauthz.AsSystemRestricted(ctx), resourceIDs)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return workspaceBuildsData{}, xerrors.Errorf("fetching resource metadata: %w", err)
|
||||
// Phase C: Fetch metadata and agents in parallel (both depend
|
||||
// on resourceIDs which we just computed).
|
||||
var (
|
||||
metadata []database.WorkspaceResourceMetadatum
|
||||
agents []database.WorkspaceAgent
|
||||
)
|
||||
var eg2 errgroup.Group
|
||||
eg2.Go(func() error {
|
||||
var err error
|
||||
// nolint:gocritic // Getting workspace resource metadata by resource ID is a system function.
|
||||
metadata, err = api.Database.GetWorkspaceResourceMetadataByResourceIDs(dbauthz.AsSystemRestricted(ctx), resourceIDs)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return xerrors.Errorf("fetching resource metadata: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
eg2.Go(func() error {
|
||||
var err error
|
||||
// nolint:gocritic // Getting workspace agents by resource IDs is a system function.
|
||||
agents, err = api.Database.GetWorkspaceAgentsByResourceIDs(dbauthz.AsSystemRestricted(ctx), resourceIDs)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return xerrors.Errorf("get workspace agents: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err := eg2.Wait(); err != nil {
|
||||
return workspaceBuildsData{}, err
|
||||
}
|
||||
|
||||
// nolint:gocritic // Getting workspace agents by resource IDs is a system function.
|
||||
agents, err := api.Database.GetWorkspaceAgentsByResourceIDs(dbauthz.AsSystemRestricted(ctx), resourceIDs)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return workspaceBuildsData{}, xerrors.Errorf("get workspace agents: %w", err)
|
||||
}
|
||||
|
||||
if len(resources) == 0 {
|
||||
if len(agents) == 0 {
|
||||
return workspaceBuildsData{
|
||||
jobs: jobs,
|
||||
jobs: jobsWithQueuePosition,
|
||||
templateVersions: templateVersions,
|
||||
resources: resources,
|
||||
metadata: metadata,
|
||||
@@ -1074,23 +1151,23 @@ func (api *API) workspaceBuildsData(ctx context.Context, workspaceBuilds []datab
|
||||
logSources []database.WorkspaceAgentLogSource
|
||||
)
|
||||
|
||||
var eg errgroup.Group
|
||||
eg.Go(func() (err error) {
|
||||
var eg3 errgroup.Group
|
||||
eg3.Go(func() (err error) {
|
||||
// nolint:gocritic // Getting workspace apps by agent IDs is a system function.
|
||||
apps, err = api.Database.GetWorkspaceAppsByAgentIDs(dbauthz.AsSystemRestricted(ctx), agentIDs)
|
||||
return err
|
||||
})
|
||||
eg.Go(func() (err error) {
|
||||
eg3.Go(func() (err error) {
|
||||
// nolint:gocritic // Getting workspace scripts by agent IDs is a system function.
|
||||
scripts, err = api.Database.GetWorkspaceAgentScriptsByAgentIDs(dbauthz.AsSystemRestricted(ctx), agentIDs)
|
||||
return err
|
||||
})
|
||||
eg.Go(func() error {
|
||||
eg3.Go(func() (err error) {
|
||||
// nolint:gocritic // Getting workspace agent log sources by agent IDs is a system function.
|
||||
logSources, err = api.Database.GetWorkspaceAgentLogSourcesByAgentIDs(dbauthz.AsSystemRestricted(ctx), agentIDs)
|
||||
return err
|
||||
})
|
||||
err = eg.Wait()
|
||||
err := eg3.Wait()
|
||||
if err != nil {
|
||||
return workspaceBuildsData{}, err
|
||||
}
|
||||
@@ -1107,7 +1184,7 @@ func (api *API) workspaceBuildsData(ctx context.Context, workspaceBuilds []datab
|
||||
}
|
||||
|
||||
return workspaceBuildsData{
|
||||
jobs: jobs,
|
||||
jobs: jobsWithQueuePosition,
|
||||
templateVersions: templateVersions,
|
||||
resources: resources,
|
||||
metadata: metadata,
|
||||
|
||||
Reference in New Issue
Block a user