Compare commits

...

2 Commits

Author SHA1 Message Date
Jon Ayers
2ed709b306 refactor(agent): migrate go func() calls to agentutil.Go()
This adds panic recovery to all goroutines in the agent package
by using the new agentutil.Go() helper which wraps goroutines
with defer/recover and logs panics before re-panicking.

Files modified:
- agent/agentutil/agentutil.go (new)
- agent/stats.go
- agent/agent.go
- agent/agentscripts/agentscripts.go
- agent/reconnectingpty/reconnectingpty.go
- agent/reconnectingpty/screen.go
- agent/reconnectingpty/server.go
- agent/reconnectingpty/buffered.go
- agent/agentcontainers/api.go
- agent/apphealth.go
- agent/boundarylogproxy/proxy.go
- agent/agentssh/forward.go
- agent/agentssh/x11.go
- agent/agentssh/bicopy.go
- agent/agentssh/agentssh.go
- agent/agentsocket/server.go
2026-02-05 03:17:50 +00:00
Jon Ayers
da2490b9cb feat: add agentutil.Go() and lint rule for panic recovery 2026-02-04 04:51:15 +00:00
19 changed files with 135 additions and 79 deletions

View File

@@ -39,6 +39,7 @@ import (
"cdr.dev/slog/v3"
"github.com/coder/clistat"
"github.com/coder/coder/v2/agent/agentcontainers"
"github.com/coder/coder/v2/agent/agentutil"
"github.com/coder/coder/v2/agent/agentexec"
"github.com/coder/coder/v2/agent/agentfiles"
"github.com/coder/coder/v2/agent/agentscripts"
@@ -553,7 +554,7 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient28
// Set up collect and report as a single ticker with two channels,
// this is to allow collection and reporting to be triggered
// independently of each other.
go func() {
agentutil.Go(ctx, a.logger, func() {
t := time.NewTicker(a.reportMetadataInterval)
defer func() {
t.Stop()
@@ -578,9 +579,9 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient28
wake(collect)
}
}
}()
})
go func() {
agentutil.Go(ctx, a.logger, func() {
defer close(collectDone)
var (
@@ -627,7 +628,7 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient28
// We send the result to the channel in the goroutine to avoid
// sending the same result multiple times. So, we don't care about
// the return values.
go flight.Do(md.Key, func() {
agentutil.Go(ctx, a.logger, func() { flight.Do(md.Key, func() {
ctx := slog.With(ctx, slog.F("key", md.Key))
lastCollectedAtMu.RLock()
collectedAt, ok := lastCollectedAts[md.Key]
@@ -680,10 +681,10 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient28
lastCollectedAts[md.Key] = now
lastCollectedAtMu.Unlock()
}
})
}) })
}
}
}()
})
// Gather metadata updates and report them once every interval. If a
// previous report is in flight, wait for it to complete before
@@ -734,14 +735,14 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient28
}
reportInFlight = true
go func() {
agentutil.Go(ctx, a.logger, func() {
a.logger.Debug(ctx, "batch updating metadata")
ctx, cancel := context.WithTimeout(ctx, reportTimeout)
defer cancel()
_, err := aAPI.BatchUpdateMetadata(ctx, &proto.BatchUpdateMetadataRequest{Metadata: metadata})
reportError <- err
}()
})
}
}
}
@@ -1518,10 +1519,10 @@ func (a *agent) trackGoroutine(fn func()) error {
return xerrors.Errorf("track conn goroutine: %w", ErrAgentClosing)
}
a.closeWaitGroup.Add(1)
go func() {
agentutil.Go(a.hardCtx, a.logger, func() {
defer a.closeWaitGroup.Done()
fn()
}()
})
return nil
}
@@ -1625,15 +1626,15 @@ func (a *agent) createTailnet(
clog.Info(ctx, "accepted conn")
wg.Add(1)
closed := make(chan struct{})
go func() {
agentutil.Go(ctx, clog, func() {
select {
case <-closed:
case <-a.hardCtx.Done():
_ = conn.Close()
}
wg.Done()
}()
go func() {
})
agentutil.Go(ctx, clog, func() {
defer close(closed)
sErr := speedtest.ServeConn(conn)
if sErr != nil {
@@ -1641,7 +1642,7 @@ func (a *agent) createTailnet(
return
}
clog.Info(ctx, "test ended")
}()
})
}
wg.Wait()
}); err != nil {
@@ -1668,13 +1669,13 @@ func (a *agent) createTailnet(
WriteTimeout: 20 * time.Second,
ErrorLog: slog.Stdlib(ctx, a.logger.Named("http_api_server"), slog.LevelInfo),
}
go func() {
agentutil.Go(ctx, a.logger, func() {
select {
case <-ctx.Done():
case <-a.hardCtx.Done():
}
_ = server.Close()
}()
})
apiServErr := server.Serve(apiListener)
if apiServErr != nil && !xerrors.Is(apiServErr, http.ErrServerClosed) && !strings.Contains(apiServErr.Error(), "use of closed network connection") {
@@ -1716,7 +1717,7 @@ func (a *agent) runCoordinator(ctx context.Context, tClient tailnetproto.DRPCTai
coordination := ctrl.New(coordinate)
errCh := make(chan error, 1)
go func() {
agentutil.Go(ctx, a.logger, func() {
defer close(errCh)
select {
case <-ctx.Done():
@@ -1728,7 +1729,7 @@ func (a *agent) runCoordinator(ctx context.Context, tClient tailnetproto.DRPCTai
case err := <-coordination.Wait():
errCh <- err
}
}()
})
return <-errCh
}
@@ -1819,7 +1820,7 @@ func (a *agent) Collect(ctx context.Context, networkStats map[netlogtype.Connect
continue
}
wg.Add(1)
go func() {
agentutil.Go(pingCtx, a.logger, func() {
defer wg.Done()
duration, p2p, _, err := a.network.Ping(pingCtx, addresses[0].Addr())
if err != nil {
@@ -1833,7 +1834,7 @@ func (a *agent) Collect(ctx context.Context, networkStats map[netlogtype.Connect
} else {
derpConns++
}
}()
})
}
wg.Wait()
sort.Float64s(durations)
@@ -2031,13 +2032,13 @@ func (a *agent) Close() error {
// Wait for the graceful shutdown to complete, but don't wait forever so
// that we don't break user expectations.
go func() {
agentutil.Go(a.hardCtx, a.logger, func() {
defer a.hardCancel()
select {
case <-a.hardCtx.Done():
case <-time.After(5 * time.Second):
}
}()
})
// Wait for lifecycle to be reported
lifecycleWaitLoop:
@@ -2127,13 +2128,13 @@ const EnvAgentSubsystem = "CODER_AGENT_SUBSYSTEM"
// eitherContext returns a context that is canceled when either context ends.
func eitherContext(a, b context.Context) context.Context {
ctx, cancel := context.WithCancel(a)
go func() {
agentutil.Go(ctx, slog.Logger{}, func() {
defer cancel()
select {
case <-a.Done():
case <-b.Done():
}
}()
})
return ctx
}

View File

@@ -28,6 +28,7 @@ import (
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/agent/agentcontainers/ignore"
"github.com/coder/coder/v2/agent/agentutil"
"github.com/coder/coder/v2/agent/agentcontainers/watcher"
"github.com/coder/coder/v2/agent/agentexec"
"github.com/coder/coder/v2/agent/usershell"
@@ -563,11 +564,11 @@ func (api *API) discoverDevcontainersInProject(projectPath string) error {
if dc.Status == codersdk.WorkspaceAgentDevcontainerStatusStarting {
api.asyncWg.Add(1)
go func() {
agentutil.Go(api.ctx, api.logger, func() {
defer api.asyncWg.Done()
_ = api.CreateDevcontainer(dc.WorkspaceFolder, dc.ConfigPath)
}()
})
}
}
api.mu.Unlock()
@@ -1423,9 +1424,9 @@ func (api *API) handleDevcontainerRecreate(w http.ResponseWriter, r *http.Reques
api.knownDevcontainers[dc.WorkspaceFolder] = dc
api.broadcastUpdatesLocked()
go func() {
agentutil.Go(ctx, api.logger, func() {
_ = api.CreateDevcontainer(dc.WorkspaceFolder, dc.ConfigPath, WithRemoveExistingContainer())
}()
})
api.mu.Unlock()

View File

@@ -22,6 +22,7 @@ import (
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/agent/agentssh"
"github.com/coder/coder/v2/agent/agentutil"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/codersdk"
@@ -473,10 +474,10 @@ func (r *Runner) trackCommandGoroutine(fn func()) error {
return xerrors.New("track command goroutine: closed")
}
r.cmdCloseWait.Add(1)
go func() {
agentutil.Go(r.cronCtx, r.Logger, func() {
defer r.cmdCloseWait.Done()
fn()
}()
})
return nil
}

View File

@@ -12,6 +12,7 @@ import (
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/agent/agentsocket/proto"
"github.com/coder/coder/v2/agent/agentutil"
"github.com/coder/coder/v2/agent/unit"
"github.com/coder/coder/v2/codersdk/drpcsdk"
)
@@ -79,10 +80,10 @@ func NewServer(logger slog.Logger, opts ...Option) (*Server, error) {
server.logger.Info(server.ctx, "agent socket server started", slog.F("path", server.path))
server.wg.Add(1)
go func() {
agentutil.Go(server.ctx, server.logger, func() {
defer server.wg.Done()
server.acceptConnections()
}()
})
return server, nil
}

View File

@@ -29,6 +29,7 @@ import (
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/agent/agentcontainers"
"github.com/coder/coder/v2/agent/agentutil"
"github.com/coder/coder/v2/agent/agentexec"
"github.com/coder/coder/v2/agent/agentrsa"
"github.com/coder/coder/v2/agent/usershell"
@@ -634,13 +635,13 @@ func (s *Server) startNonPTYSession(logger slog.Logger, session ssh.Session, mag
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "no", "stdin_pipe").Add(1)
return xerrors.Errorf("create stdin pipe: %w", err)
}
go func() {
agentutil.Go(session.Context(), logger, func() {
_, err := io.Copy(stdinPipe, session)
if err != nil {
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "no", "stdin_io_copy").Add(1)
}
_ = stdinPipe.Close()
}()
})
err = cmd.Start()
if err != nil {
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "no", "start_command").Add(1)
@@ -662,11 +663,11 @@ func (s *Server) startNonPTYSession(logger slog.Logger, session ssh.Session, mag
session.Signals(nil)
close(sigs)
}()
go func() {
agentutil.Go(session.Context(), logger, func() {
for sig := range sigs {
handleSignal(logger, sig, cmd.Process, s.metrics, magicTypeLabel)
}
}()
})
return cmd.Wait()
}
@@ -737,7 +738,7 @@ func (s *Server) startPTYSession(logger slog.Logger, session ptySession, magicTy
session.Signals(nil)
close(sigs)
}()
go func() {
agentutil.Go(ctx, logger, func() {
for {
if sigs == nil && windowSize == nil {
return
@@ -764,14 +765,14 @@ func (s *Server) startPTYSession(logger slog.Logger, session ptySession, magicTy
}
}
}
}()
})
go func() {
agentutil.Go(ctx, logger, func() {
_, err := io.Copy(ptty.InputWriter(), session)
if err != nil {
s.metrics.sessionErrors.WithLabelValues(magicTypeLabel, "yes", "input_io_copy").Add(1)
}
}()
})
// We need to wait for the command output to finish copying. It's safe to
// just do this copy on the main handler goroutine because one of two things
@@ -1213,11 +1214,11 @@ func (s *Server) Close() error {
// but Close() may not have completed.
func (s *Server) Shutdown(ctx context.Context) error {
ch := make(chan error, 1)
go func() {
agentutil.Go(ctx, s.logger, func() {
// TODO(mafredri): Implement shutdown, SIGHUP running commands, etc.
// For now we just close the server.
ch <- s.Close()
}()
})
var err error
select {
case <-ctx.Done():

View File

@@ -4,6 +4,9 @@ import (
"context"
"io"
"sync"
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/agent/agentutil"
)
// Bicopy copies all of the data between the two connections and will close them
@@ -35,10 +38,10 @@ func Bicopy(ctx context.Context, c1, c2 io.ReadWriteCloser) {
// Convert waitgroup to a channel so we can also wait on the context.
done := make(chan struct{})
go func() {
agentutil.Go(ctx, slog.Logger{}, func() {
defer close(done)
wg.Wait()
}()
})
select {
case <-ctx.Done():

View File

@@ -16,6 +16,7 @@ import (
"golang.org/x/xerrors"
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/agent/agentutil"
)
// streamLocalForwardPayload describes the extra data sent in a
@@ -130,11 +131,11 @@ func (h *forwardedUnixHandler) HandleSSHRequest(ctx ssh.Context, _ *ssh.Server,
log.Debug(ctx, "SSH unix forward added to cache")
ctx, cancel := context.WithCancel(ctx)
go func() {
agentutil.Go(ctx, log, func() {
<-ctx.Done()
_ = ln.Close()
}()
go func() {
})
agentutil.Go(ctx, log, func() {
defer cancel()
for {
@@ -152,7 +153,7 @@ func (h *forwardedUnixHandler) HandleSSHRequest(ctx ssh.Context, _ *ssh.Server,
SocketPath: addr,
})
go func() {
agentutil.Go(ctx, log, func() {
ch, reqs, err := conn.OpenChannel("forwarded-streamlocal@openssh.com", payload)
if err != nil {
h.log.Warn(ctx, "open SSH unix forward channel to client", slog.Error(err))
@@ -161,7 +162,7 @@ func (h *forwardedUnixHandler) HandleSSHRequest(ctx ssh.Context, _ *ssh.Server,
}
go gossh.DiscardRequests(reqs)
Bicopy(ctx, ch, c)
}()
})
}
h.Lock()
@@ -171,7 +172,7 @@ func (h *forwardedUnixHandler) HandleSSHRequest(ctx ssh.Context, _ *ssh.Server,
h.Unlock()
log.Debug(ctx, "SSH unix forward listener removed from cache")
_ = ln.Close()
}()
})
return true, nil

View File

@@ -22,6 +22,7 @@ import (
"golang.org/x/xerrors"
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/agent/agentutil"
)
const (
@@ -122,10 +123,10 @@ func (x *x11Forwarder) x11Handler(sshCtx ssh.Context, sshSession ssh.Session) (d
}
// clean up the X11 session if the SSH session completes.
go func() {
agentutil.Go(ctx, x.logger, func() {
<-ctx.Done()
x.closeAndRemoveSession(x11session)
}()
})
go x.listenForConnections(ctx, x11session, serverConn, x11)
x.logger.Debug(ctx, "X11 forwarding started", slog.F("display", x11session.display))
@@ -206,10 +207,10 @@ func (x *x11Forwarder) listenForConnections(
_ = conn.Close()
continue
}
go func() {
agentutil.Go(ctx, x.logger, func() {
defer x.trackConn(conn, false)
Bicopy(ctx, conn, channel)
}()
})
}
}

View File

@@ -0,0 +1,25 @@
package agentutil
import (
"context"
"runtime/debug"
"cdr.dev/slog/v3"
)
// Go runs the provided function in a goroutine, recovering from panics and
// logging them before re-panicking.
func Go(ctx context.Context, log slog.Logger, fn func()) {
go func() {
defer func() {
if r := recover(); r != nil {
log.Critical(ctx, "panic in goroutine",
slog.F("panic", r),
slog.F("stack", string(debug.Stack())),
)
panic(r)
}
}()
fn()
}()
}

View File

@@ -10,6 +10,7 @@ import (
"golang.org/x/xerrors"
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/agent/agentutil"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/quartz"
@@ -69,7 +70,7 @@ func NewAppHealthReporterWithClock(
continue
}
app := nextApp
go func() {
agentutil.Go(ctx, logger, func() {
_ = clk.TickerFunc(ctx, time.Duration(app.Healthcheck.Interval)*time.Second, func() error {
// We time out at the healthcheck interval to prevent getting too backed up, but
// set it 1ms early so that it's not simultaneous with the next tick in testing,
@@ -133,7 +134,7 @@ func NewAppHealthReporterWithClock(
}
return nil
}, "healthcheck", app.Slug)
}()
})
}
mu.Lock()

View File

@@ -15,6 +15,7 @@ import (
"google.golang.org/protobuf/proto"
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/agent/agentutil"
"github.com/coder/coder/v2/agent/boundarylogproxy/codec"
agentproto "github.com/coder/coder/v2/agent/proto"
)
@@ -133,11 +134,11 @@ func (s *Server) handleConnection(ctx context.Context, conn net.Conn) {
defer cancel()
s.wg.Add(1)
go func() {
agentutil.Go(ctx, s.logger, func() {
defer s.wg.Done()
<-ctx.Done()
_ = conn.Close()
}()
})
// This is intended to be a sane starting point for the read buffer size. It may be
// grown by codec.ReadFrame if necessary.

View File

@@ -14,6 +14,7 @@ import (
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/agent/agentexec"
"github.com/coder/coder/v2/agent/agentutil"
"github.com/coder/coder/v2/pty"
)
@@ -76,7 +77,7 @@ func newBuffered(ctx context.Context, logger slog.Logger, execer agentexec.Exece
// We do not need to separately monitor for the process exiting. When it
// exits, our ptty.OutputReader() will return EOF after reading all process
// output.
go func() {
agentutil.Go(ctx, logger, func() {
buffer := make([]byte, 1024)
for {
read, err := ptty.OutputReader().Read(buffer)
@@ -118,7 +119,7 @@ func newBuffered(ctx context.Context, logger slog.Logger, execer agentexec.Exece
}
rpty.state.cond.L.Unlock()
}
}()
})
return rpty
}
@@ -133,7 +134,7 @@ func (rpty *bufferedReconnectingPTY) lifecycle(ctx context.Context, logger slog.
logger.Debug(ctx, "reconnecting pty ready")
rpty.state.setState(StateReady, nil)
state, reasonErr := rpty.state.waitForStateOrContext(ctx, StateClosing)
state, reasonErr := rpty.state.waitForStateOrContext(ctx, StateClosing, logger)
if state < StateClosing {
// If we have not closed yet then the context is what unblocked us (which
// means the agent is shutting down) so move into the closing phase.
@@ -190,7 +191,7 @@ func (rpty *bufferedReconnectingPTY) Attach(ctx context.Context, connID string,
delete(rpty.activeConns, connID)
}()
state, err := rpty.state.waitForStateOrContext(ctx, StateReady)
state, err := rpty.state.waitForStateOrContext(ctx, StateReady, logger)
if state != StateReady {
return err
}

View File

@@ -15,6 +15,7 @@ import (
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/agent/agentexec"
"github.com/coder/coder/v2/agent/agentutil"
"github.com/coder/coder/v2/codersdk/workspacesdk"
"github.com/coder/coder/v2/pty"
)
@@ -177,20 +178,20 @@ func (s *ptyState) waitForState(state State) (State, error) {
// waitForStateOrContext blocks until the state or a greater one is reached or
// the provided context ends.
func (s *ptyState) waitForStateOrContext(ctx context.Context, state State) (State, error) {
func (s *ptyState) waitForStateOrContext(ctx context.Context, state State, logger slog.Logger) (State, error) {
s.cond.L.Lock()
defer s.cond.L.Unlock()
nevermind := make(chan struct{})
defer close(nevermind)
go func() {
agentutil.Go(ctx, logger, func() {
select {
case <-ctx.Done():
// Wake up when the context ends.
s.cond.Broadcast()
case <-nevermind:
}
}()
})
for ctx.Err() == nil && state > s.state {
s.cond.Wait()

View File

@@ -20,6 +20,7 @@ import (
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/agent/agentexec"
"github.com/coder/coder/v2/agent/agentutil"
"github.com/coder/coder/v2/pty"
)
@@ -141,7 +142,7 @@ func (rpty *screenReconnectingPTY) lifecycle(ctx context.Context, logger slog.Lo
logger.Debug(ctx, "reconnecting pty ready")
rpty.state.setState(StateReady, nil)
state, reasonErr := rpty.state.waitForStateOrContext(ctx, StateClosing)
state, reasonErr := rpty.state.waitForStateOrContext(ctx, StateClosing, logger)
if state < StateClosing {
// If we have not closed yet then the context is what unblocked us (which
// means the agent is shutting down) so move into the closing phase.
@@ -166,7 +167,7 @@ func (rpty *screenReconnectingPTY) Attach(ctx context.Context, _ string, conn ne
ctx, cancel := context.WithCancel(ctx)
defer cancel()
state, err := rpty.state.waitForStateOrContext(ctx, StateReady)
state, err := rpty.state.waitForStateOrContext(ctx, StateReady, logger)
if state != StateReady {
return err
}
@@ -256,7 +257,7 @@ func (rpty *screenReconnectingPTY) doAttach(ctx context.Context, conn net.Conn,
// We do not need to separately monitor for the process exiting. When it
// exits, our ptty.OutputReader() will return EOF after reading all process
// output.
go func() {
agentutil.Go(ctx, logger, func() {
defer versionCancel()
defer func() {
err := conn.Close()
@@ -298,7 +299,7 @@ func (rpty *screenReconnectingPTY) doAttach(ctx context.Context, conn net.Conn,
break
}
}
}()
})
// Version seems to be the only command without a side effect (other than
// making the version pop up briefly) so use it to wait for the session to

View File

@@ -15,6 +15,7 @@ import (
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/agent/agentcontainers"
"github.com/coder/coder/v2/agent/agentutil"
"github.com/coder/coder/v2/agent/agentssh"
"github.com/coder/coder/v2/agent/usershell"
"github.com/coder/coder/v2/codersdk/workspacesdk"
@@ -90,7 +91,7 @@ func (s *Server) Serve(ctx, hardCtx context.Context, l net.Listener) (retErr err
wg.Add(1)
disconnected := s.reportConnection(uuid.New(), remoteAddrString)
closed := make(chan struct{})
go func() {
agentutil.Go(ctx, clog, func() {
defer wg.Done()
select {
case <-closed:
@@ -98,9 +99,9 @@ func (s *Server) Serve(ctx, hardCtx context.Context, l net.Listener) (retErr err
disconnected(1, "server shut down")
_ = conn.Close()
}
}()
})
wg.Add(1)
go func() {
agentutil.Go(ctx, clog, func() {
defer close(closed)
defer wg.Done()
err := s.handleConn(ctx, clog, conn)
@@ -113,7 +114,7 @@ func (s *Server) Serve(ctx, hardCtx context.Context, l net.Listener) (retErr err
} else {
disconnected(0, "")
}
}()
})
}
wg.Wait()
return retErr
@@ -226,18 +227,18 @@ func (s *Server) handleConn(ctx context.Context, logger slog.Logger, conn net.Co
)
done := make(chan struct{})
go func() {
agentutil.Go(ctx, connLogger, func() {
select {
case <-done:
case <-ctx.Done():
rpty.Close(ctx.Err())
}
}()
})
go func() {
agentutil.Go(ctx, connLogger, func() {
rpty.Wait()
s.reconnectingPTYs.Delete(msg.ID)
}()
})
connected = true
sendConnected <- rpty

View File

@@ -10,6 +10,7 @@ import (
"tailscale.com/types/netlogtype"
"cdr.dev/slog/v3"
"github.com/coder/coder/v2/agent/agentutil"
"github.com/coder/coder/v2/agent/proto"
)
@@ -86,13 +87,13 @@ func (s *statsReporter) reportLoop(ctx context.Context, dest statsDest) error {
// use a separate goroutine to monitor the context so that we notice immediately, rather than
// waiting for the next callback (which might never come if we are closing!)
ctxDone := false
go func() {
agentutil.Go(ctx, s.logger, func() {
<-ctx.Done()
s.L.Lock()
defer s.L.Unlock()
ctxDone = true
s.Broadcast()
}()
})
defer s.logger.Debug(ctx, "reportLoop exiting")
s.L.Lock()

1
go.mod
View File

@@ -470,6 +470,7 @@ require (
)
require (
cdr.dev/slog v1.6.2-0.20251120224544-40ff19937ff2
github.com/anthropics/anthropic-sdk-go v1.19.0
github.com/brianvoe/gofakeit/v7 v7.14.0
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225

2
go.sum
View File

@@ -1,3 +1,5 @@
cdr.dev/slog v1.6.2-0.20251120224544-40ff19937ff2 h1:M4Z9eTbnHPdZI4GpBUNCae0lSgUucY+aW5j7+zB8lCk=
cdr.dev/slog v1.6.2-0.20251120224544-40ff19937ff2/go.mod h1:NaoTA7KwopCrnaSb0JXTC0PTp/O/Y83Lndnq0OEV3ZQ=
cdr.dev/slog/v3 v3.0.0-rc1 h1:EN7Zim6GvTpAeHQjI0ERDEfqKbTyXRvgH4UhlzLpvWM=
cdr.dev/slog/v3 v3.0.0-rc1/go.mod h1:iO/OALX1VxlI03mkodCGdVP7pXzd2bRMvu3ePvlJ9ak=
cel.dev/expr v0.24.0 h1:56OvJKSH3hDGL0ml5uSxZmz3/3Pq4tJ+fb1unVLAFcY=

View File

@@ -104,6 +104,17 @@ func testingWithOwnerUser(m dsl.Matcher) {
Report(`This client is operating as the owner user, which has unrestricted permissions. Consider creating a different user.`)
}
// doNotUseRawGoInAgent detects raw `go func()` in agent package.
// Use agentutil.Go() instead for panic recovery.
//
//nolint:unused,deadcode,varnamelen
func doNotUseRawGoInAgent(m dsl.Matcher) {
m.Match(`go func() { $*_ }()`, `go func($*_) { $*_ }($*_)`).
Where(m.File().PkgPath.Matches(`github\.com/coder/coder/v2/agent(/.*)?`) &&
!m.File().Name.Matches(`_test\.go$`)).
Report("Use agentutil.Go() instead of raw go func() for panic recovery")
}
// Use xerrors everywhere! It provides additional stacktrace info!
//
//nolint:unused,deadcode,varnamelen