Compare commits

...

2 Commits

Author SHA1 Message Date
Sas Swart 26ab5c038d Merge remote-tracking branch 'origin/main' into jjs/script-executor-ordering 2026-01-05 13:32:21 +00:00
Sas Swart 5102932712 wip: integrate script executor and unit manager 2025-12-08 14:34:31 +00:00
12 changed files with 508 additions and 61 deletions
+15 -1
View File
@@ -47,6 +47,7 @@ import (
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/agent/proto/resourcesmonitor"
"github.com/coder/coder/v2/agent/reconnectingpty"
"github.com/coder/coder/v2/agent/unit"
"github.com/coder/coder/v2/buildinfo"
"github.com/coder/coder/v2/cli/gitauth"
"github.com/coder/coder/v2/coderd/database/dbtime"
@@ -298,6 +299,7 @@ type agent struct {
socketServerEnabled bool
socketPath string
socketServer *agentsocket.Server
unitManager *unit.Manager
}
func (a *agent) TailnetConn() *tailnet.Conn {
@@ -340,12 +342,17 @@ func (a *agent) init() {
panic(err)
}
a.sshServer = sshSrv
// Create a shared unit manager for script ordering.
a.unitManager = unit.NewManager()
a.scriptRunner = agentscripts.New(agentscripts.Options{
LogDir: a.logDir,
DataDirBase: a.scriptDataDir,
Logger: a.logger,
SSHServer: sshSrv,
Filesystem: a.filesystem,
UnitManager: a.unitManager,
GetScriptLogger: func(logSourceID uuid.UUID) agentscripts.ScriptLogger {
return a.logSender.GetScriptLogger(logSourceID)
},
@@ -391,9 +398,16 @@ func (a *agent) initSocketServer() {
return
}
opts := []agentsocket.Option{
agentsocket.WithPath(a.socketPath),
}
if a.unitManager != nil {
opts = append(opts, agentsocket.WithUnitManager(a.unitManager))
}
server, err := agentsocket.NewServer(
a.logger.Named("socket"),
agentsocket.WithPath(a.socketPath),
opts...,
)
if err != nil {
a.logger.Warn(a.hardCtx, "failed to create socket server", slog.Error(err), slog.F("path", a.socketPath))
+46
View File
@@ -24,6 +24,7 @@ import (
"github.com/coder/coder/v2/agent/agentssh"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/agent/unit"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
@@ -57,6 +58,7 @@ type Options struct {
SSHServer *agentssh.Server
Filesystem afero.Fs
GetScriptLogger func(logSourceID uuid.UUID) ScriptLogger
UnitManager *unit.Manager
}
// New creates a runner for the provided scripts.
@@ -112,6 +114,22 @@ func (r *Runner) ScriptBinDir() string {
return filepath.Join(r.dataDir, "bin")
}
// Scripts returns the list of scripts managed by this runner.
func (r *Runner) Scripts() []codersdk.WorkspaceAgentScript {
r.initMutex.Lock()
defer r.initMutex.Unlock()
return r.scripts
}
// getScriptUnitID returns the unit ID for a script, preferring DisplayName
// and falling back to LogSourceID if DisplayName is empty.
func (r *Runner) getScriptUnitID(script codersdk.WorkspaceAgentScript) string {
if script.DisplayName != "" {
return script.DisplayName
}
return script.LogSourceID.String()
}
func (r *Runner) RegisterMetrics(reg prometheus.Registerer) {
if reg == nil {
// If no registry, do nothing.
@@ -145,6 +163,18 @@ func (r *Runner) Init(scripts []codersdk.WorkspaceAgentScript, scriptCompleted S
return xerrors.Errorf("create script bin dir: %w", err)
}
// Register all scripts with the unit manager when we become aware of them.
if r.UnitManager != nil {
for _, script := range r.scripts {
unitID := unit.ID(r.getScriptUnitID(script))
if err := r.UnitManager.Register(unitID); err != nil {
if !errors.Is(err, unit.ErrUnitAlreadyRegistered) {
r.Logger.Warn(r.cronCtx, "failed to register script with unit manager", slog.Error(err), slog.F("script", script.LogSourceID))
}
}
}
}
for _, script := range r.scripts {
if script.Cron == "" {
continue
@@ -284,6 +314,14 @@ func (r *Runner) run(ctx context.Context, script codersdk.WorkspaceAgentScript,
)
logger.Info(ctx, "running agent script", slog.F("script", script.Script))
// Update script status to started when execution begins.
if r.UnitManager != nil {
unitID := unit.ID(r.getScriptUnitID(script))
if err := r.UnitManager.UpdateStatus(unitID, unit.StatusStarted); err != nil {
logger.Warn(ctx, "failed to update script status to started", slog.Error(err))
}
}
fileWriter, err := r.Filesystem.OpenFile(logPath, os.O_CREATE|os.O_RDWR, 0o600)
if err != nil {
return xerrors.Errorf("open %s script log file: %w", logPath, err)
@@ -357,6 +395,14 @@ func (r *Runner) run(ctx context.Context, script codersdk.WorkspaceAgentScript,
return
}
// Update unit manager status to completed.
if r.UnitManager != nil {
unitID := r.getScriptUnitID(script)
if updateErr := r.UnitManager.UpdateStatus(unit.ID(unitID), unit.StatusComplete); updateErr != nil {
logger.Warn(ctx, "failed to update script status to completed", slog.Error(updateErr))
}
}
// We want to check this outside of the goroutine to avoid a race condition
timedOut := errors.Is(err, ErrTimeout)
pipesLeftOpen := errors.Is(err, ErrOutputPipesOpen)
+34 -1
View File
@@ -15,7 +15,8 @@ import (
type Option func(*options)
type options struct {
path string
path string
unitManager *unit.Manager
}
// WithPath sets the socket path. If not provided or empty, the client will
@@ -29,6 +30,14 @@ func WithPath(path string) Option {
}
}
// WithUnitManager sets the unit manager to use. If not provided, a new one
// will be created.
func WithUnitManager(unitManager *unit.Manager) Option {
return func(opts *options) {
opts.unitManager = unitManager
}
}
// Client provides a client for communicating with the workspace agentsocket API.
type Client struct {
client proto.DRPCAgentSocketClient
@@ -129,6 +138,30 @@ func (c *Client) SyncStatus(ctx context.Context, unitName unit.ID) (SyncStatusRe
}, nil
}
// SyncList returns a list of all units in the dependency graph.
func (c *Client) SyncList(ctx context.Context) ([]ScriptInfo, error) {
resp, err := c.client.SyncList(ctx, &proto.SyncListRequest{})
if err != nil {
return nil, err
}
var scriptInfos []ScriptInfo
for _, script := range resp.Scripts {
scriptInfos = append(scriptInfos, ScriptInfo{
ID: script.Id,
Status: script.Status,
})
}
return scriptInfos, nil
}
// ScriptInfo contains information about a unit in the dependency graph.
type ScriptInfo struct {
ID string `table:"id,default_sort" json:"id"`
Status string `table:"status" json:"status"`
}
// SyncStatusResponse contains the status information for a unit.
type SyncStatusResponse struct {
UnitName unit.ID `table:"unit,default_sort" json:"unit_name"`
+255 -57
View File
@@ -642,6 +642,146 @@ func (x *SyncStatusResponse) GetDependencies() []*DependencyInfo {
return nil
}
type SyncListRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
func (x *SyncListRequest) Reset() {
*x = SyncListRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_agent_agentsocket_proto_agentsocket_proto_msgTypes[13]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SyncListRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SyncListRequest) ProtoMessage() {}
func (x *SyncListRequest) ProtoReflect() protoreflect.Message {
mi := &file_agent_agentsocket_proto_agentsocket_proto_msgTypes[13]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SyncListRequest.ProtoReflect.Descriptor instead.
func (*SyncListRequest) Descriptor() ([]byte, []int) {
return file_agent_agentsocket_proto_agentsocket_proto_rawDescGZIP(), []int{13}
}
type ScriptInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Status string `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"`
}
func (x *ScriptInfo) Reset() {
*x = ScriptInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_agent_agentsocket_proto_agentsocket_proto_msgTypes[14]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ScriptInfo) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ScriptInfo) ProtoMessage() {}
func (x *ScriptInfo) ProtoReflect() protoreflect.Message {
mi := &file_agent_agentsocket_proto_agentsocket_proto_msgTypes[14]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ScriptInfo.ProtoReflect.Descriptor instead.
func (*ScriptInfo) Descriptor() ([]byte, []int) {
return file_agent_agentsocket_proto_agentsocket_proto_rawDescGZIP(), []int{14}
}
func (x *ScriptInfo) GetId() string {
if x != nil {
return x.Id
}
return ""
}
func (x *ScriptInfo) GetStatus() string {
if x != nil {
return x.Status
}
return ""
}
type SyncListResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Scripts []*ScriptInfo `protobuf:"bytes,1,rep,name=scripts,proto3" json:"scripts,omitempty"`
}
func (x *SyncListResponse) Reset() {
*x = SyncListResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_agent_agentsocket_proto_agentsocket_proto_msgTypes[15]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SyncListResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SyncListResponse) ProtoMessage() {}
func (x *SyncListResponse) ProtoReflect() protoreflect.Message {
mi := &file_agent_agentsocket_proto_agentsocket_proto_msgTypes[15]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SyncListResponse.ProtoReflect.Descriptor instead.
func (*SyncListResponse) Descriptor() ([]byte, []int) {
return file_agent_agentsocket_proto_agentsocket_proto_rawDescGZIP(), []int{15}
}
func (x *SyncListResponse) GetScripts() []*ScriptInfo {
if x != nil {
return x.Scripts
}
return nil
}
var File_agent_agentsocket_proto_agentsocket_proto protoreflect.FileDescriptor
var file_agent_agentsocket_proto_agentsocket_proto_rawDesc = []byte{
@@ -693,46 +833,62 @@ var file_agent_agentsocket_proto_agentsocket_proto_rawDesc = []byte{
0x24, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63,
0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x65, 0x70, 0x65, 0x6e, 0x64, 0x65, 0x6e, 0x63,
0x79, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x0c, 0x64, 0x65, 0x70, 0x65, 0x6e, 0x64, 0x65, 0x6e, 0x63,
0x69, 0x65, 0x73, 0x32, 0xbb, 0x04, 0x0a, 0x0b, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x53, 0x6f, 0x63,
0x6b, 0x65, 0x74, 0x12, 0x4d, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x21, 0x2e, 0x63, 0x6f,
0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x2e,
0x76, 0x31, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22,
0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b,
0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x12, 0x5c, 0x0a, 0x09, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x74, 0x61, 0x72, 0x74, 0x12,
0x26, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63,
0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x74, 0x61, 0x72, 0x74,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x27, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e,
0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x53,
0x79, 0x6e, 0x63, 0x53, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x12, 0x59, 0x0a, 0x08, 0x53, 0x79, 0x6e, 0x63, 0x57, 0x61, 0x6e, 0x74, 0x12, 0x25, 0x2e, 0x63,
0x69, 0x65, 0x73, 0x22, 0x11, 0x0a, 0x0f, 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x69, 0x73, 0x74, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x34, 0x0a, 0x0a, 0x53, 0x63, 0x72, 0x69, 0x70, 0x74,
0x49, 0x6e, 0x66, 0x6f, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x02, 0x69, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02,
0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x4e, 0x0a, 0x10,
0x53, 0x79, 0x6e, 0x63, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x12, 0x3a, 0x0a, 0x07, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28,
0x0b, 0x32, 0x20, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73,
0x6f, 0x63, 0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x63, 0x72, 0x69, 0x70, 0x74, 0x49,
0x6e, 0x66, 0x6f, 0x52, 0x07, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x73, 0x32, 0x96, 0x05, 0x0a,
0x0b, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x53, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x12, 0x4d, 0x0a, 0x04,
0x50, 0x69, 0x6e, 0x67, 0x12, 0x21, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65,
0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x69, 0x6e, 0x67,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e,
0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x50,
0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x5c, 0x0a, 0x09, 0x53,
0x79, 0x6e, 0x63, 0x53, 0x74, 0x61, 0x72, 0x74, 0x12, 0x26, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72,
0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e,
0x53, 0x79, 0x6e, 0x63, 0x53, 0x74, 0x61, 0x72, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x27, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f,
0x63, 0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x74, 0x61, 0x72,
0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x59, 0x0a, 0x08, 0x53, 0x79, 0x6e,
0x63, 0x57, 0x61, 0x6e, 0x74, 0x12, 0x25, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67,
0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x79, 0x6e,
0x63, 0x57, 0x61, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x63,
0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74,
0x2e, 0x76, 0x31, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x57, 0x61, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e,
0x74, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x57,
0x61, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x65, 0x0a, 0x0c, 0x53,
0x79, 0x6e, 0x63, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x29, 0x2e, 0x63, 0x6f,
0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x2e,
0x76, 0x31, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2a, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61,
0x2e, 0x76, 0x31, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x57, 0x61, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x12, 0x65, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x43, 0x6f, 0x6d, 0x70,
0x6c, 0x65, 0x74, 0x65, 0x12, 0x29, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65,
0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x79, 0x6e, 0x63,
0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x2a, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63,
0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x43, 0x6f, 0x6d, 0x70, 0x6c,
0x65, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x5c, 0x0a, 0x09, 0x53,
0x79, 0x6e, 0x63, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x26, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72,
0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e,
0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x27, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f,
0x63, 0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x61, 0x64,
0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x5f, 0x0a, 0x0a, 0x53, 0x79, 0x6e,
0x63, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x27, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e,
0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x53,
0x79, 0x6e, 0x63, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x28, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f,
0x63, 0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x74, 0x61, 0x74,
0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x59, 0x0a, 0x08, 0x53, 0x79,
0x6e, 0x63, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x25, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61,
0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x79,
0x6e, 0x63, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x12, 0x5c, 0x0a, 0x09, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12,
0x26, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63,
0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x61, 0x64, 0x79,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x27, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e,
0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x53,
0x79, 0x6e, 0x63, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x12, 0x5f, 0x0a, 0x0a, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x27,
0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b,
0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e,
0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x53,
0x79, 0x6e, 0x63, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f, 0x61,
0x67, 0x65, 0x6e, 0x74, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74,
0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x6e, 0x63, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e,
0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x63, 0x6b, 0x65,
0x74, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e,
0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f,
0x76, 0x32, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x73, 0x6f,
0x63, 0x6b, 0x65, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
}
var (
@@ -747,7 +903,7 @@ func file_agent_agentsocket_proto_agentsocket_proto_rawDescGZIP() []byte {
return file_agent_agentsocket_proto_agentsocket_proto_rawDescData
}
var file_agent_agentsocket_proto_agentsocket_proto_msgTypes = make([]protoimpl.MessageInfo, 13)
var file_agent_agentsocket_proto_agentsocket_proto_msgTypes = make([]protoimpl.MessageInfo, 16)
var file_agent_agentsocket_proto_agentsocket_proto_goTypes = []interface{}{
(*PingRequest)(nil), // 0: coder.agentsocket.v1.PingRequest
(*PingResponse)(nil), // 1: coder.agentsocket.v1.PingResponse
@@ -762,26 +918,32 @@ var file_agent_agentsocket_proto_agentsocket_proto_goTypes = []interface{}{
(*SyncStatusRequest)(nil), // 10: coder.agentsocket.v1.SyncStatusRequest
(*DependencyInfo)(nil), // 11: coder.agentsocket.v1.DependencyInfo
(*SyncStatusResponse)(nil), // 12: coder.agentsocket.v1.SyncStatusResponse
(*SyncListRequest)(nil), // 13: coder.agentsocket.v1.SyncListRequest
(*ScriptInfo)(nil), // 14: coder.agentsocket.v1.ScriptInfo
(*SyncListResponse)(nil), // 15: coder.agentsocket.v1.SyncListResponse
}
var file_agent_agentsocket_proto_agentsocket_proto_depIdxs = []int32{
11, // 0: coder.agentsocket.v1.SyncStatusResponse.dependencies:type_name -> coder.agentsocket.v1.DependencyInfo
0, // 1: coder.agentsocket.v1.AgentSocket.Ping:input_type -> coder.agentsocket.v1.PingRequest
2, // 2: coder.agentsocket.v1.AgentSocket.SyncStart:input_type -> coder.agentsocket.v1.SyncStartRequest
4, // 3: coder.agentsocket.v1.AgentSocket.SyncWant:input_type -> coder.agentsocket.v1.SyncWantRequest
6, // 4: coder.agentsocket.v1.AgentSocket.SyncComplete:input_type -> coder.agentsocket.v1.SyncCompleteRequest
8, // 5: coder.agentsocket.v1.AgentSocket.SyncReady:input_type -> coder.agentsocket.v1.SyncReadyRequest
10, // 6: coder.agentsocket.v1.AgentSocket.SyncStatus:input_type -> coder.agentsocket.v1.SyncStatusRequest
1, // 7: coder.agentsocket.v1.AgentSocket.Ping:output_type -> coder.agentsocket.v1.PingResponse
3, // 8: coder.agentsocket.v1.AgentSocket.SyncStart:output_type -> coder.agentsocket.v1.SyncStartResponse
5, // 9: coder.agentsocket.v1.AgentSocket.SyncWant:output_type -> coder.agentsocket.v1.SyncWantResponse
7, // 10: coder.agentsocket.v1.AgentSocket.SyncComplete:output_type -> coder.agentsocket.v1.SyncCompleteResponse
9, // 11: coder.agentsocket.v1.AgentSocket.SyncReady:output_type -> coder.agentsocket.v1.SyncReadyResponse
12, // 12: coder.agentsocket.v1.AgentSocket.SyncStatus:output_type -> coder.agentsocket.v1.SyncStatusResponse
7, // [7:13] is the sub-list for method output_type
1, // [1:7] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
14, // 1: coder.agentsocket.v1.SyncListResponse.scripts:type_name -> coder.agentsocket.v1.ScriptInfo
0, // 2: coder.agentsocket.v1.AgentSocket.Ping:input_type -> coder.agentsocket.v1.PingRequest
2, // 3: coder.agentsocket.v1.AgentSocket.SyncStart:input_type -> coder.agentsocket.v1.SyncStartRequest
4, // 4: coder.agentsocket.v1.AgentSocket.SyncWant:input_type -> coder.agentsocket.v1.SyncWantRequest
6, // 5: coder.agentsocket.v1.AgentSocket.SyncComplete:input_type -> coder.agentsocket.v1.SyncCompleteRequest
8, // 6: coder.agentsocket.v1.AgentSocket.SyncReady:input_type -> coder.agentsocket.v1.SyncReadyRequest
10, // 7: coder.agentsocket.v1.AgentSocket.SyncStatus:input_type -> coder.agentsocket.v1.SyncStatusRequest
13, // 8: coder.agentsocket.v1.AgentSocket.SyncList:input_type -> coder.agentsocket.v1.SyncListRequest
1, // 9: coder.agentsocket.v1.AgentSocket.Ping:output_type -> coder.agentsocket.v1.PingResponse
3, // 10: coder.agentsocket.v1.AgentSocket.SyncStart:output_type -> coder.agentsocket.v1.SyncStartResponse
5, // 11: coder.agentsocket.v1.AgentSocket.SyncWant:output_type -> coder.agentsocket.v1.SyncWantResponse
7, // 12: coder.agentsocket.v1.AgentSocket.SyncComplete:output_type -> coder.agentsocket.v1.SyncCompleteResponse
9, // 13: coder.agentsocket.v1.AgentSocket.SyncReady:output_type -> coder.agentsocket.v1.SyncReadyResponse
12, // 14: coder.agentsocket.v1.AgentSocket.SyncStatus:output_type -> coder.agentsocket.v1.SyncStatusResponse
15, // 15: coder.agentsocket.v1.AgentSocket.SyncList:output_type -> coder.agentsocket.v1.SyncListResponse
9, // [9:16] is the sub-list for method output_type
2, // [2:9] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
2, // [2:2] is the sub-list for extension extendee
0, // [0:2] is the sub-list for field type_name
}
func init() { file_agent_agentsocket_proto_agentsocket_proto_init() }
@@ -946,6 +1108,42 @@ func file_agent_agentsocket_proto_agentsocket_proto_init() {
return nil
}
}
file_agent_agentsocket_proto_agentsocket_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SyncListRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_agent_agentsocket_proto_agentsocket_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ScriptInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_agent_agentsocket_proto_agentsocket_proto_msgTypes[15].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SyncListResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
@@ -953,7 +1151,7 @@ func file_agent_agentsocket_proto_agentsocket_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_agent_agentsocket_proto_agentsocket_proto_rawDesc,
NumEnums: 0,
NumMessages: 13,
NumMessages: 16,
NumExtensions: 0,
NumServices: 1,
},
+13
View File
@@ -52,6 +52,17 @@ message SyncStatusResponse {
repeated DependencyInfo dependencies = 3;
}
message SyncListRequest {}
message ScriptInfo {
string id = 1;
string status = 2;
}
message SyncListResponse {
repeated ScriptInfo scripts = 1;
}
// AgentSocket provides direct access to the agent over local IPC.
service AgentSocket {
// Ping the agent to check if it is alive.
@@ -66,4 +77,6 @@ service AgentSocket {
rpc SyncReady(SyncReadyRequest) returns (SyncReadyResponse);
// Get the status of a unit and list its dependencies.
rpc SyncStatus(SyncStatusRequest) returns (SyncStatusResponse);
// List all available scripts that can be used as dependencies.
rpc SyncList(SyncListRequest) returns (SyncListResponse);
}
+41 -1
View File
@@ -44,6 +44,7 @@ type DRPCAgentSocketClient interface {
SyncComplete(ctx context.Context, in *SyncCompleteRequest) (*SyncCompleteResponse, error)
SyncReady(ctx context.Context, in *SyncReadyRequest) (*SyncReadyResponse, error)
SyncStatus(ctx context.Context, in *SyncStatusRequest) (*SyncStatusResponse, error)
SyncList(ctx context.Context, in *SyncListRequest) (*SyncListResponse, error)
}
type drpcAgentSocketClient struct {
@@ -110,6 +111,15 @@ func (c *drpcAgentSocketClient) SyncStatus(ctx context.Context, in *SyncStatusRe
return out, nil
}
func (c *drpcAgentSocketClient) SyncList(ctx context.Context, in *SyncListRequest) (*SyncListResponse, error) {
out := new(SyncListResponse)
err := c.cc.Invoke(ctx, "/coder.agentsocket.v1.AgentSocket/SyncList", drpcEncoding_File_agent_agentsocket_proto_agentsocket_proto{}, in, out)
if err != nil {
return nil, err
}
return out, nil
}
type DRPCAgentSocketServer interface {
Ping(context.Context, *PingRequest) (*PingResponse, error)
SyncStart(context.Context, *SyncStartRequest) (*SyncStartResponse, error)
@@ -117,6 +127,7 @@ type DRPCAgentSocketServer interface {
SyncComplete(context.Context, *SyncCompleteRequest) (*SyncCompleteResponse, error)
SyncReady(context.Context, *SyncReadyRequest) (*SyncReadyResponse, error)
SyncStatus(context.Context, *SyncStatusRequest) (*SyncStatusResponse, error)
SyncList(context.Context, *SyncListRequest) (*SyncListResponse, error)
}
type DRPCAgentSocketUnimplementedServer struct{}
@@ -145,9 +156,13 @@ func (s *DRPCAgentSocketUnimplementedServer) SyncStatus(context.Context, *SyncSt
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
func (s *DRPCAgentSocketUnimplementedServer) SyncList(context.Context, *SyncListRequest) (*SyncListResponse, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
type DRPCAgentSocketDescription struct{}
func (DRPCAgentSocketDescription) NumMethods() int { return 6 }
func (DRPCAgentSocketDescription) NumMethods() int { return 7 }
func (DRPCAgentSocketDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) {
switch n {
@@ -205,6 +220,15 @@ func (DRPCAgentSocketDescription) Method(n int) (string, drpc.Encoding, drpc.Rec
in1.(*SyncStatusRequest),
)
}, DRPCAgentSocketServer.SyncStatus, true
case 6:
return "/coder.agentsocket.v1.AgentSocket/SyncList", drpcEncoding_File_agent_agentsocket_proto_agentsocket_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return srv.(DRPCAgentSocketServer).
SyncList(
ctx,
in1.(*SyncListRequest),
)
}, DRPCAgentSocketServer.SyncList, true
default:
return "", nil, nil, nil, false
}
@@ -309,3 +333,19 @@ func (x *drpcAgentSocket_SyncStatusStream) SendAndClose(m *SyncStatusResponse) e
}
return x.CloseSend()
}
type DRPCAgentSocket_SyncListStream interface {
drpc.Stream
SendAndClose(*SyncListResponse) error
}
type drpcAgentSocket_SyncListStream struct {
drpc.Stream
}
func (x *drpcAgentSocket_SyncListStream) SendAndClose(m *SyncListResponse) error {
if err := x.MsgSend(m, drpcEncoding_File_agent_agentsocket_proto_agentsocket_proto{}); err != nil {
return err
}
return x.CloseSend()
}
+5 -1
View File
@@ -39,12 +39,16 @@ func NewServer(logger slog.Logger, opts ...Option) (*Server, error) {
}
logger = logger.Named("agentsocket-server")
unitMgr := options.unitManager
if unitMgr == nil {
unitMgr = unit.NewManager()
}
server := &Server{
logger: logger,
path: options.path,
service: &DRPCAgentSocketService{
logger: logger,
unitManager: unit.NewManager(),
unitManager: unitMgr,
},
}
+22
View File
@@ -150,3 +150,25 @@ func (s *DRPCAgentSocketService) SyncStatus(_ context.Context, req *proto.SyncSt
Dependencies: depInfos,
}, nil
}
// SyncList returns a list of all units in the dependency graph.
func (s *DRPCAgentSocketService) SyncList(_ context.Context, _ *proto.SyncListRequest) (*proto.SyncListResponse, error) {
if s.unitManager == nil {
return &proto.SyncListResponse{
Scripts: []*proto.ScriptInfo{},
}, nil
}
units := s.unitManager.GetAllUnits()
var scriptInfos []*proto.ScriptInfo
for _, u := range units {
scriptInfos = append(scriptInfos, &proto.ScriptInfo{
Id: string(u.ID()),
Status: string(u.Status()),
})
}
return &proto.SyncListResponse{
Scripts: scriptInfos,
}, nil
}
+12
View File
@@ -288,3 +288,15 @@ func (m *Manager) GetUnmetDependencies(unit ID) ([]Dependency, error) {
func (m *Manager) ExportDOT(name string) (string, error) {
return m.graph.ToDOT(name)
}
// GetAllUnits returns all registered units in the manager.
func (m *Manager) GetAllUnits() []Unit {
m.mu.RLock()
defer m.mu.RUnlock()
units := make([]Unit, 0, len(m.units))
for _, u := range m.units {
units = append(units, u)
}
return units
}
+1
View File
@@ -20,6 +20,7 @@ func (r *RootCmd) syncCommand() *serpent.Command {
r.syncWant(&socketPath),
r.syncComplete(&socketPath),
r.syncStatus(&socketPath),
r.syncList(&socketPath),
},
Options: serpent.OptionSet{
{
+63
View File
@@ -0,0 +1,63 @@
package cli
import (
"fmt"
"golang.org/x/xerrors"
"github.com/coder/serpent"
"github.com/coder/coder/v2/agent/agentsocket"
"github.com/coder/coder/v2/cli/cliui"
)
func (*RootCmd) syncList(socketPath *string) *serpent.Command {
formatter := cliui.NewOutputFormatter(
cliui.TableFormat(
[]agentsocket.ScriptInfo{},
[]string{
"id",
"status",
},
),
cliui.JSONFormat(),
)
cmd := &serpent.Command{
Use: "list",
Short: "List all units in the dependency graph",
Long: "List all units registered in the dependency graph, including their current status. Units can be coder scripts or other units registered via sync commands.",
Handler: func(i *serpent.Invocation) error {
ctx := i.Context()
opts := []agentsocket.Option{}
if *socketPath != "" {
opts = append(opts, agentsocket.WithPath(*socketPath))
}
client, err := agentsocket.NewClient(ctx, opts...)
if err != nil {
return xerrors.Errorf("connect to agent socket: %w", err)
}
defer client.Close()
scripts, err := client.SyncList(ctx)
if err != nil {
return xerrors.Errorf("list scripts failed: %w", err)
}
out, err := formatter.Format(ctx, scripts)
if err != nil {
return xerrors.Errorf("format scripts: %w", err)
}
_, _ = fmt.Fprintln(i.Stdout, out)
return nil
},
}
formatter.AttachOptions(&cmd.Options)
return cmd
}
+1
View File
@@ -193,6 +193,7 @@ func dbAgentScriptToProto(script database.WorkspaceAgentScript) *agentproto.Work
RunOnStop: script.RunOnStop,
StartBlocksLogin: script.StartBlocksLogin,
Timeout: durationpb.New(time.Duration(script.TimeoutSeconds) * time.Second),
DisplayName: script.DisplayName,
}
}