Compare commits
2 Commits
fix/nullab
...
logpanic
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2ed709b306 | ||
|
|
da2490b9cb |
@@ -39,6 +39,7 @@ import (
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/clistat"
|
||||
"github.com/coder/coder/v2/agent/agentcontainers"
|
||||
"github.com/coder/coder/v2/agent/agentutil"
|
||||
"github.com/coder/coder/v2/agent/agentexec"
|
||||
"github.com/coder/coder/v2/agent/agentfiles"
|
||||
"github.com/coder/coder/v2/agent/agentscripts"
|
||||
@@ -553,7 +554,7 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient28
|
||||
// Set up collect and report as a single ticker with two channels,
|
||||
// this is to allow collection and reporting to be triggered
|
||||
// independently of each other.
|
||||
go func() {
|
||||
agentutil.Go(ctx, a.logger, func() {
|
||||
t := time.NewTicker(a.reportMetadataInterval)
|
||||
defer func() {
|
||||
t.Stop()
|
||||
@@ -578,9 +579,9 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient28
|
||||
wake(collect)
|
||||
}
|
||||
}
|
||||
}()
|
||||
})
|
||||
|
||||
go func() {
|
||||
agentutil.Go(ctx, a.logger, func() {
|
||||
defer close(collectDone)
|
||||
|
||||
var (
|
||||
@@ -627,7 +628,7 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient28
|
||||
// We send the result to the channel in the goroutine to avoid
|
||||
// sending the same result multiple times. So, we don't care about
|
||||
// the return values.
|
||||
go flight.Do(md.Key, func() {
|
||||
agentutil.Go(ctx, a.logger, func() { flight.Do(md.Key, func() {
|
||||
ctx := slog.With(ctx, slog.F("key", md.Key))
|
||||
lastCollectedAtMu.RLock()
|
||||
collectedAt, ok := lastCollectedAts[md.Key]
|
||||
@@ -680,10 +681,10 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient28
|
||||
lastCollectedAts[md.Key] = now
|
||||
lastCollectedAtMu.Unlock()
|
||||
}
|
||||
})
|
||||
}) })
|
||||
}
|
||||
}
|
||||
}()
|
||||
})
|
||||
|
||||
// Gather metadata updates and report them once every interval. If a
|
||||
// previous report is in flight, wait for it to complete before
|
||||
@@ -734,14 +735,14 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient28
|
||||
}
|
||||
|
||||
reportInFlight = true
|
||||
go func() {
|
||||
agentutil.Go(ctx, a.logger, func() {
|
||||
a.logger.Debug(ctx, "batch updating metadata")
|
||||
ctx, cancel := context.WithTimeout(ctx, reportTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err := aAPI.BatchUpdateMetadata(ctx, &proto.BatchUpdateMetadataRequest{Metadata: metadata})
|
||||
reportError <- err
|
||||
}()
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1518,10 +1519,10 @@ func (a *agent) trackGoroutine(fn func()) error {
|
||||
return xerrors.Errorf("track conn goroutine: %w", ErrAgentClosing)
|
||||
}
|
||||
a.closeWaitGroup.Add(1)
|
||||
go func() {
|
||||
agentutil.Go(a.hardCtx, a.logger, func() {
|
||||
defer a.closeWaitGroup.Done()
|
||||
fn()
|
||||
}()
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1625,15 +1626,15 @@ func (a *agent) createTailnet(
|
||||
clog.Info(ctx, "accepted conn")
|
||||
wg.Add(1)
|
||||
closed := make(chan struct{})
|
||||
go func() {
|
||||
agentutil.Go(ctx, clog, func() {
|
||||
select {
|
||||
case <-closed:
|
||||
case <-a.hardCtx.Done():
|
||||
_ = conn.Close()
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
})
|
||||
agentutil.Go(ctx, clog, func() {
|
||||
defer close(closed)
|
||||
sErr := speedtest.ServeConn(conn)
|
||||
if sErr != nil {
|
||||
@@ -1641,7 +1642,7 @@ func (a *agent) createTailnet(
|
||||
return
|
||||
}
|
||||
clog.Info(ctx, "test ended")
|
||||
}()
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
}); err != nil {
|
||||
@@ -1668,13 +1669,13 @@ func (a *agent) createTailnet(
|
||||
WriteTimeout: 20 * time.Second,
|
||||
ErrorLog: slog.Stdlib(ctx, a.logger.Named("http_api_server"), slog.LevelInfo),
|
||||
}
|
||||
go func() {
|
||||
agentutil.Go(ctx, a.logger, func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-a.hardCtx.Done():
|
||||
}
|
||||
_ = server.Close()
|
||||
}()
|
||||
})
|
||||
|
||||
apiServErr := server.Serve(apiListener)
|
||||
if apiServErr != nil && !xerrors.Is(apiServErr, http.ErrServerClosed) && !strings.Contains(apiServErr.Error(), "use of closed network connection") {
|
||||
@@ -1716,7 +1717,7 @@ func (a *agent) runCoordinator(ctx context.Context, tClient tailnetproto.DRPCTai
|
||||
coordination := ctrl.New(coordinate)
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
agentutil.Go(ctx, a.logger, func() {
|
||||
defer close(errCh)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -1728,7 +1729,7 @@ func (a *agent) runCoordinator(ctx context.Context, tClient tailnetproto.DRPCTai
|
||||
case err := <-coordination.Wait():
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
})
|
||||
return <-errCh
|
||||
}
|
||||
|
||||
@@ -1819,7 +1820,7 @@ func (a *agent) Collect(ctx context.Context, networkStats map[netlogtype.Connect
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
agentutil.Go(pingCtx, a.logger, func() {
|
||||
defer wg.Done()
|
||||
duration, p2p, _, err := a.network.Ping(pingCtx, addresses[0].Addr())
|
||||
if err != nil {
|
||||
@@ -1833,7 +1834,7 @@ func (a *agent) Collect(ctx context.Context, networkStats map[netlogtype.Connect
|
||||
} else {
|
||||
derpConns++
|
||||
}
|
||||
}()
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
sort.Float64s(durations)
|
||||
@@ -2031,13 +2032,13 @@ func (a *agent) Close() error {
|
||||
|
||||
// Wait for the graceful shutdown to complete, but don't wait forever so
|
||||
// that we don't break user expectations.
|
||||
go func() {
|
||||
agentutil.Go(a.hardCtx, a.logger, func() {
|
||||
defer a.hardCancel()
|
||||
select {
|
||||
case <-a.hardCtx.Done():
|
||||
case <-time.After(5 * time.Second):
|
||||
}
|
||||
}()
|
||||
})
|
||||
|
||||
// Wait for lifecycle to be reported
|
||||
lifecycleWaitLoop:
|
||||
@@ -2127,13 +2128,13 @@ const EnvAgentSubsystem = "CODER_AGENT_SUBSYSTEM"
|
||||
// eitherContext returns a context that is canceled when either context ends.
|
||||
func eitherContext(a, b context.Context) context.Context {
|
||||
ctx, cancel := context.WithCancel(a)
|
||||
go func() {
|
||||
agentutil.Go(ctx, slog.Logger{}, func() {
|
||||
defer cancel()
|
||||
select {
|
||||
case <-a.Done():
|
||||
case <-b.Done():
|
||||
}
|
||||
}()
|
||||
})
|
||||
return ctx
|
||||
}
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/agent/agentcontainers/ignore"
|
||||
"github.com/coder/coder/v2/agent/agentutil"
|
||||
"github.com/coder/coder/v2/agent/agentcontainers/watcher"
|
||||
"github.com/coder/coder/v2/agent/agentexec"
|
||||
"github.com/coder/coder/v2/agent/usershell"
|
||||
@@ -563,11 +564,11 @@ func (api *API) discoverDevcontainersInProject(projectPath string) error {
|
||||
|
||||
if dc.Status == codersdk.WorkspaceAgentDevcontainerStatusStarting {
|
||||
api.asyncWg.Add(1)
|
||||
go func() {
|
||||
agentutil.Go(api.ctx, api.logger, func() {
|
||||
defer api.asyncWg.Done()
|
||||
|
||||
_ = api.CreateDevcontainer(dc.WorkspaceFolder, dc.ConfigPath)
|
||||
}()
|
||||
})
|
||||
}
|
||||
}
|
||||
api.mu.Unlock()
|
||||
@@ -1423,9 +1424,9 @@ func (api *API) handleDevcontainerRecreate(w http.ResponseWriter, r *http.Reques
|
||||
api.knownDevcontainers[dc.WorkspaceFolder] = dc
|
||||
api.broadcastUpdatesLocked()
|
||||
|
||||
go func() {
|
||||
agentutil.Go(ctx, api.logger, func() {
|
||||
_ = api.CreateDevcontainer(dc.WorkspaceFolder, dc.ConfigPath, WithRemoveExistingContainer())
|
||||
}()
|
||||
})
|
||||
|
||||
api.mu.Unlock()
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/agent/agentssh"
|
||||
"github.com/coder/coder/v2/agent/agentutil"
|
||||
"github.com/coder/coder/v2/agent/proto"
|
||||
"github.com/coder/coder/v2/coderd/database/dbtime"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
@@ -473,10 +474,10 @@ func (r *Runner) trackCommandGoroutine(fn func()) error {
|
||||
return xerrors.New("track command goroutine: closed")
|
||||
}
|
||||
r.cmdCloseWait.Add(1)
|
||||
go func() {
|
||||
agentutil.Go(r.cronCtx, r.Logger, func() {
|
||||
defer r.cmdCloseWait.Done()
|
||||
fn()
|
||||
}()
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/agent/agentsocket/proto"
|
||||
"github.com/coder/coder/v2/agent/agentutil"
|
||||
"github.com/coder/coder/v2/agent/unit"
|
||||
"github.com/coder/coder/v2/codersdk/drpcsdk"
|
||||
)
|
||||
@@ -79,10 +80,10 @@ func NewServer(logger slog.Logger, opts ...Option) (*Server, error) {
|
||||
server.logger.Info(server.ctx, "agent socket server started", slog.F("path", server.path))
|
||||
|
||||
server.wg.Add(1)
|
||||
go func() {
|
||||
agentutil.Go(server.ctx, server.logger, func() {
|
||||
defer server.wg.Done()
|
||||
server.acceptConnections()
|
||||
}()
|
||||
})
|
||||
|
||||
return server, nil
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ import (
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/agent/agentcontainers"
|
||||
"github.com/coder/coder/v2/agent/agentutil"
|
||||
"github.com/coder/coder/v2/agent/agentexec"
|
||||
"github.com/coder/coder/v2/agent/agentrsa"
|
||||
"github.com/coder/coder/v2/agent/usershell"
|
||||
@@ -634,13 +635,13 @@ func (s *Server) startNonPTYSession(logger slog.Logger, session ssh.Session, mag
|
||||
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "no", "stdin_pipe").Add(1)
|
||||
return xerrors.Errorf("create stdin pipe: %w", err)
|
||||
}
|
||||
go func() {
|
||||
agentutil.Go(session.Context(), logger, func() {
|
||||
_, err := io.Copy(stdinPipe, session)
|
||||
if err != nil {
|
||||
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "no", "stdin_io_copy").Add(1)
|
||||
}
|
||||
_ = stdinPipe.Close()
|
||||
}()
|
||||
})
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "no", "start_command").Add(1)
|
||||
@@ -662,11 +663,11 @@ func (s *Server) startNonPTYSession(logger slog.Logger, session ssh.Session, mag
|
||||
session.Signals(nil)
|
||||
close(sigs)
|
||||
}()
|
||||
go func() {
|
||||
agentutil.Go(session.Context(), logger, func() {
|
||||
for sig := range sigs {
|
||||
handleSignal(logger, sig, cmd.Process, s.metrics, magicTypeLabel)
|
||||
}
|
||||
}()
|
||||
})
|
||||
return cmd.Wait()
|
||||
}
|
||||
|
||||
@@ -737,7 +738,7 @@ func (s *Server) startPTYSession(logger slog.Logger, session ptySession, magicTy
|
||||
session.Signals(nil)
|
||||
close(sigs)
|
||||
}()
|
||||
go func() {
|
||||
agentutil.Go(ctx, logger, func() {
|
||||
for {
|
||||
if sigs == nil && windowSize == nil {
|
||||
return
|
||||
@@ -764,14 +765,14 @@ func (s *Server) startPTYSession(logger slog.Logger, session ptySession, magicTy
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
})
|
||||
|
||||
go func() {
|
||||
agentutil.Go(ctx, logger, func() {
|
||||
_, err := io.Copy(ptty.InputWriter(), session)
|
||||
if err != nil {
|
||||
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "yes", "input_io_copy").Add(1)
|
||||
}
|
||||
}()
|
||||
})
|
||||
|
||||
// We need to wait for the command output to finish copying. It's safe to
|
||||
// just do this copy on the main handler goroutine because one of two things
|
||||
@@ -1213,11 +1214,11 @@ func (s *Server) Close() error {
|
||||
// but Close() may not have completed.
|
||||
func (s *Server) Shutdown(ctx context.Context) error {
|
||||
ch := make(chan error, 1)
|
||||
go func() {
|
||||
agentutil.Go(ctx, s.logger, func() {
|
||||
// TODO(mafredri): Implement shutdown, SIGHUP running commands, etc.
|
||||
// For now we just close the server.
|
||||
ch <- s.Close()
|
||||
}()
|
||||
})
|
||||
var err error
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@@ -4,6 +4,9 @@ import (
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/agent/agentutil"
|
||||
)
|
||||
|
||||
// Bicopy copies all of the data between the two connections and will close them
|
||||
@@ -35,10 +38,10 @@ func Bicopy(ctx context.Context, c1, c2 io.ReadWriteCloser) {
|
||||
|
||||
// Convert waitgroup to a channel so we can also wait on the context.
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
agentutil.Go(ctx, slog.Logger{}, func() {
|
||||
defer close(done)
|
||||
wg.Wait()
|
||||
}()
|
||||
})
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/agent/agentutil"
|
||||
)
|
||||
|
||||
// streamLocalForwardPayload describes the extra data sent in a
|
||||
@@ -130,11 +131,11 @@ func (h *forwardedUnixHandler) HandleSSHRequest(ctx ssh.Context, _ *ssh.Server,
|
||||
log.Debug(ctx, "SSH unix forward added to cache")
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
go func() {
|
||||
agentutil.Go(ctx, log, func() {
|
||||
<-ctx.Done()
|
||||
_ = ln.Close()
|
||||
}()
|
||||
go func() {
|
||||
})
|
||||
agentutil.Go(ctx, log, func() {
|
||||
defer cancel()
|
||||
|
||||
for {
|
||||
@@ -152,7 +153,7 @@ func (h *forwardedUnixHandler) HandleSSHRequest(ctx ssh.Context, _ *ssh.Server,
|
||||
SocketPath: addr,
|
||||
})
|
||||
|
||||
go func() {
|
||||
agentutil.Go(ctx, log, func() {
|
||||
ch, reqs, err := conn.OpenChannel("forwarded-streamlocal@openssh.com", payload)
|
||||
if err != nil {
|
||||
h.log.Warn(ctx, "open SSH unix forward channel to client", slog.Error(err))
|
||||
@@ -161,7 +162,7 @@ func (h *forwardedUnixHandler) HandleSSHRequest(ctx ssh.Context, _ *ssh.Server,
|
||||
}
|
||||
go gossh.DiscardRequests(reqs)
|
||||
Bicopy(ctx, ch, c)
|
||||
}()
|
||||
})
|
||||
}
|
||||
|
||||
h.Lock()
|
||||
@@ -171,7 +172,7 @@ func (h *forwardedUnixHandler) HandleSSHRequest(ctx ssh.Context, _ *ssh.Server,
|
||||
h.Unlock()
|
||||
log.Debug(ctx, "SSH unix forward listener removed from cache")
|
||||
_ = ln.Close()
|
||||
}()
|
||||
})
|
||||
|
||||
return true, nil
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/agent/agentutil"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -122,10 +123,10 @@ func (x *x11Forwarder) x11Handler(sshCtx ssh.Context, sshSession ssh.Session) (d
|
||||
}
|
||||
|
||||
// clean up the X11 session if the SSH session completes.
|
||||
go func() {
|
||||
agentutil.Go(ctx, x.logger, func() {
|
||||
<-ctx.Done()
|
||||
x.closeAndRemoveSession(x11session)
|
||||
}()
|
||||
})
|
||||
|
||||
go x.listenForConnections(ctx, x11session, serverConn, x11)
|
||||
x.logger.Debug(ctx, "X11 forwarding started", slog.F("display", x11session.display))
|
||||
@@ -206,10 +207,10 @@ func (x *x11Forwarder) listenForConnections(
|
||||
_ = conn.Close()
|
||||
continue
|
||||
}
|
||||
go func() {
|
||||
agentutil.Go(ctx, x.logger, func() {
|
||||
defer x.trackConn(conn, false)
|
||||
Bicopy(ctx, conn, channel)
|
||||
}()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
25
agent/agentutil/agentutil.go
Normal file
25
agent/agentutil/agentutil.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package agentutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime/debug"
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
)
|
||||
|
||||
// Go runs the provided function in a goroutine, recovering from panics and
|
||||
// logging them before re-panicking.
|
||||
func Go(ctx context.Context, log slog.Logger, fn func()) {
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Critical(ctx, "panic in goroutine",
|
||||
slog.F("panic", r),
|
||||
slog.F("stack", string(debug.Stack())),
|
||||
)
|
||||
panic(r)
|
||||
}
|
||||
}()
|
||||
fn()
|
||||
}()
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/agent/agentutil"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
"github.com/coder/coder/v2/codersdk/agentsdk"
|
||||
"github.com/coder/quartz"
|
||||
@@ -69,7 +70,7 @@ func NewAppHealthReporterWithClock(
|
||||
continue
|
||||
}
|
||||
app := nextApp
|
||||
go func() {
|
||||
agentutil.Go(ctx, logger, func() {
|
||||
_ = clk.TickerFunc(ctx, time.Duration(app.Healthcheck.Interval)*time.Second, func() error {
|
||||
// We time out at the healthcheck interval to prevent getting too backed up, but
|
||||
// set it 1ms early so that it's not simultaneous with the next tick in testing,
|
||||
@@ -133,7 +134,7 @@ func NewAppHealthReporterWithClock(
|
||||
}
|
||||
return nil
|
||||
}, "healthcheck", app.Slug)
|
||||
}()
|
||||
})
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/agent/agentutil"
|
||||
"github.com/coder/coder/v2/agent/boundarylogproxy/codec"
|
||||
agentproto "github.com/coder/coder/v2/agent/proto"
|
||||
)
|
||||
@@ -133,11 +134,11 @@ func (s *Server) handleConnection(ctx context.Context, conn net.Conn) {
|
||||
defer cancel()
|
||||
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
agentutil.Go(ctx, s.logger, func() {
|
||||
defer s.wg.Done()
|
||||
<-ctx.Done()
|
||||
_ = conn.Close()
|
||||
}()
|
||||
})
|
||||
|
||||
// This is intended to be a sane starting point for the read buffer size. It may be
|
||||
// grown by codec.ReadFrame if necessary.
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/agent/agentexec"
|
||||
"github.com/coder/coder/v2/agent/agentutil"
|
||||
"github.com/coder/coder/v2/pty"
|
||||
)
|
||||
|
||||
@@ -76,7 +77,7 @@ func newBuffered(ctx context.Context, logger slog.Logger, execer agentexec.Exece
|
||||
// We do not need to separately monitor for the process exiting. When it
|
||||
// exits, our ptty.OutputReader() will return EOF after reading all process
|
||||
// output.
|
||||
go func() {
|
||||
agentutil.Go(ctx, logger, func() {
|
||||
buffer := make([]byte, 1024)
|
||||
for {
|
||||
read, err := ptty.OutputReader().Read(buffer)
|
||||
@@ -118,7 +119,7 @@ func newBuffered(ctx context.Context, logger slog.Logger, execer agentexec.Exece
|
||||
}
|
||||
rpty.state.cond.L.Unlock()
|
||||
}
|
||||
}()
|
||||
})
|
||||
|
||||
return rpty
|
||||
}
|
||||
@@ -133,7 +134,7 @@ func (rpty *bufferedReconnectingPTY) lifecycle(ctx context.Context, logger slog.
|
||||
logger.Debug(ctx, "reconnecting pty ready")
|
||||
rpty.state.setState(StateReady, nil)
|
||||
|
||||
state, reasonErr := rpty.state.waitForStateOrContext(ctx, StateClosing)
|
||||
state, reasonErr := rpty.state.waitForStateOrContext(ctx, StateClosing, logger)
|
||||
if state < StateClosing {
|
||||
// If we have not closed yet then the context is what unblocked us (which
|
||||
// means the agent is shutting down) so move into the closing phase.
|
||||
@@ -190,7 +191,7 @@ func (rpty *bufferedReconnectingPTY) Attach(ctx context.Context, connID string,
|
||||
delete(rpty.activeConns, connID)
|
||||
}()
|
||||
|
||||
state, err := rpty.state.waitForStateOrContext(ctx, StateReady)
|
||||
state, err := rpty.state.waitForStateOrContext(ctx, StateReady, logger)
|
||||
if state != StateReady {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/agent/agentexec"
|
||||
"github.com/coder/coder/v2/agent/agentutil"
|
||||
"github.com/coder/coder/v2/codersdk/workspacesdk"
|
||||
"github.com/coder/coder/v2/pty"
|
||||
)
|
||||
@@ -177,20 +178,20 @@ func (s *ptyState) waitForState(state State) (State, error) {
|
||||
|
||||
// waitForStateOrContext blocks until the state or a greater one is reached or
|
||||
// the provided context ends.
|
||||
func (s *ptyState) waitForStateOrContext(ctx context.Context, state State) (State, error) {
|
||||
func (s *ptyState) waitForStateOrContext(ctx context.Context, state State, logger slog.Logger) (State, error) {
|
||||
s.cond.L.Lock()
|
||||
defer s.cond.L.Unlock()
|
||||
|
||||
nevermind := make(chan struct{})
|
||||
defer close(nevermind)
|
||||
go func() {
|
||||
agentutil.Go(ctx, logger, func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Wake up when the context ends.
|
||||
s.cond.Broadcast()
|
||||
case <-nevermind:
|
||||
}
|
||||
}()
|
||||
})
|
||||
|
||||
for ctx.Err() == nil && state > s.state {
|
||||
s.cond.Wait()
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/agent/agentexec"
|
||||
"github.com/coder/coder/v2/agent/agentutil"
|
||||
"github.com/coder/coder/v2/pty"
|
||||
)
|
||||
|
||||
@@ -141,7 +142,7 @@ func (rpty *screenReconnectingPTY) lifecycle(ctx context.Context, logger slog.Lo
|
||||
logger.Debug(ctx, "reconnecting pty ready")
|
||||
rpty.state.setState(StateReady, nil)
|
||||
|
||||
state, reasonErr := rpty.state.waitForStateOrContext(ctx, StateClosing)
|
||||
state, reasonErr := rpty.state.waitForStateOrContext(ctx, StateClosing, logger)
|
||||
if state < StateClosing {
|
||||
// If we have not closed yet then the context is what unblocked us (which
|
||||
// means the agent is shutting down) so move into the closing phase.
|
||||
@@ -166,7 +167,7 @@ func (rpty *screenReconnectingPTY) Attach(ctx context.Context, _ string, conn ne
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
state, err := rpty.state.waitForStateOrContext(ctx, StateReady)
|
||||
state, err := rpty.state.waitForStateOrContext(ctx, StateReady, logger)
|
||||
if state != StateReady {
|
||||
return err
|
||||
}
|
||||
@@ -256,7 +257,7 @@ func (rpty *screenReconnectingPTY) doAttach(ctx context.Context, conn net.Conn,
|
||||
// We do not need to separately monitor for the process exiting. When it
|
||||
// exits, our ptty.OutputReader() will return EOF after reading all process
|
||||
// output.
|
||||
go func() {
|
||||
agentutil.Go(ctx, logger, func() {
|
||||
defer versionCancel()
|
||||
defer func() {
|
||||
err := conn.Close()
|
||||
@@ -298,7 +299,7 @@ func (rpty *screenReconnectingPTY) doAttach(ctx context.Context, conn net.Conn,
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
})
|
||||
|
||||
// Version seems to be the only command without a side effect (other than
|
||||
// making the version pop up briefly) so use it to wait for the session to
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/agent/agentcontainers"
|
||||
"github.com/coder/coder/v2/agent/agentutil"
|
||||
"github.com/coder/coder/v2/agent/agentssh"
|
||||
"github.com/coder/coder/v2/agent/usershell"
|
||||
"github.com/coder/coder/v2/codersdk/workspacesdk"
|
||||
@@ -90,7 +91,7 @@ func (s *Server) Serve(ctx, hardCtx context.Context, l net.Listener) (retErr err
|
||||
wg.Add(1)
|
||||
disconnected := s.reportConnection(uuid.New(), remoteAddrString)
|
||||
closed := make(chan struct{})
|
||||
go func() {
|
||||
agentutil.Go(ctx, clog, func() {
|
||||
defer wg.Done()
|
||||
select {
|
||||
case <-closed:
|
||||
@@ -98,9 +99,9 @@ func (s *Server) Serve(ctx, hardCtx context.Context, l net.Listener) (retErr err
|
||||
disconnected(1, "server shut down")
|
||||
_ = conn.Close()
|
||||
}
|
||||
}()
|
||||
})
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
agentutil.Go(ctx, clog, func() {
|
||||
defer close(closed)
|
||||
defer wg.Done()
|
||||
err := s.handleConn(ctx, clog, conn)
|
||||
@@ -113,7 +114,7 @@ func (s *Server) Serve(ctx, hardCtx context.Context, l net.Listener) (retErr err
|
||||
} else {
|
||||
disconnected(0, "")
|
||||
}
|
||||
}()
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
return retErr
|
||||
@@ -226,18 +227,18 @@ func (s *Server) handleConn(ctx context.Context, logger slog.Logger, conn net.Co
|
||||
)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
agentutil.Go(ctx, connLogger, func() {
|
||||
select {
|
||||
case <-done:
|
||||
case <-ctx.Done():
|
||||
rpty.Close(ctx.Err())
|
||||
}
|
||||
}()
|
||||
})
|
||||
|
||||
go func() {
|
||||
agentutil.Go(ctx, connLogger, func() {
|
||||
rpty.Wait()
|
||||
s.reconnectingPTYs.Delete(msg.ID)
|
||||
}()
|
||||
})
|
||||
|
||||
connected = true
|
||||
sendConnected <- rpty
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"tailscale.com/types/netlogtype"
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"github.com/coder/coder/v2/agent/agentutil"
|
||||
"github.com/coder/coder/v2/agent/proto"
|
||||
)
|
||||
|
||||
@@ -86,13 +87,13 @@ func (s *statsReporter) reportLoop(ctx context.Context, dest statsDest) error {
|
||||
// use a separate goroutine to monitor the context so that we notice immediately, rather than
|
||||
// waiting for the next callback (which might never come if we are closing!)
|
||||
ctxDone := false
|
||||
go func() {
|
||||
agentutil.Go(ctx, s.logger, func() {
|
||||
<-ctx.Done()
|
||||
s.L.Lock()
|
||||
defer s.L.Unlock()
|
||||
ctxDone = true
|
||||
s.Broadcast()
|
||||
}()
|
||||
})
|
||||
defer s.logger.Debug(ctx, "reportLoop exiting")
|
||||
|
||||
s.L.Lock()
|
||||
|
||||
1
go.mod
1
go.mod
@@ -470,6 +470,7 @@ require (
|
||||
)
|
||||
|
||||
require (
|
||||
cdr.dev/slog v1.6.2-0.20251120224544-40ff19937ff2
|
||||
github.com/anthropics/anthropic-sdk-go v1.19.0
|
||||
github.com/brianvoe/gofakeit/v7 v7.14.0
|
||||
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225
|
||||
|
||||
2
go.sum
2
go.sum
@@ -1,3 +1,5 @@
|
||||
cdr.dev/slog v1.6.2-0.20251120224544-40ff19937ff2 h1:M4Z9eTbnHPdZI4GpBUNCae0lSgUucY+aW5j7+zB8lCk=
|
||||
cdr.dev/slog v1.6.2-0.20251120224544-40ff19937ff2/go.mod h1:NaoTA7KwopCrnaSb0JXTC0PTp/O/Y83Lndnq0OEV3ZQ=
|
||||
cdr.dev/slog/v3 v3.0.0-rc1 h1:EN7Zim6GvTpAeHQjI0ERDEfqKbTyXRvgH4UhlzLpvWM=
|
||||
cdr.dev/slog/v3 v3.0.0-rc1/go.mod h1:iO/OALX1VxlI03mkodCGdVP7pXzd2bRMvu3ePvlJ9ak=
|
||||
cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY=
|
||||
|
||||
@@ -104,6 +104,17 @@ func testingWithOwnerUser(m dsl.Matcher) {
|
||||
Report(`This client is operating as the owner user, which has unrestricted permissions. Consider creating a different user.`)
|
||||
}
|
||||
|
||||
// doNotUseRawGoInAgent detects raw `go func()` in agent package.
|
||||
// Use agentutil.Go() instead for panic recovery.
|
||||
//
|
||||
//nolint:unused,deadcode,varnamelen
|
||||
func doNotUseRawGoInAgent(m dsl.Matcher) {
|
||||
m.Match(`go func() { $*_ }()`, `go func($*_) { $*_ }($*_)`).
|
||||
Where(m.File().PkgPath.Matches(`github\.com/coder/coder/v2/agent(/.*)?`) &&
|
||||
!m.File().Name.Matches(`_test\.go$`)).
|
||||
Report("Use agentutil.Go() instead of raw go func() for panic recovery")
|
||||
}
|
||||
|
||||
// Use xerrors everywhere! It provides additional stacktrace info!
|
||||
//
|
||||
//nolint:unused,deadcode,varnamelen
|
||||
|
||||
Reference in New Issue
Block a user