Compare commits

...

11 Commits

Author SHA1 Message Date
Jon Ayers e85be9f42e fix(tailnet): give peers with no handshake full lostTimeout before removal 2026-04-10 05:43:18 +00:00
Jon Ayers cfd7730194 chore(enterprise/tailnet): add debug logging for LOST and DISCONNECTED peer updates 2026-04-10 00:40:57 +00:00
Jon Ayers 1937ada0cd fix: use enriched logger for HeartbeatClose, reduce AwaitReachable backoff to 5s 2026-04-09 23:45:59 +00:00
Jon Ayers d64cd6415d revert: move HeartbeatClose back before agent dial 2026-04-09 22:37:31 +00:00
Jon Ayers c1851d9453 chore(coderd/workspaceapps): add workspace_id and elapsed time to PTY dial logs 2026-04-09 22:35:44 +00:00
Jon Ayers 8f73453681 fix(coderd/workspaceapps): move HeartbeatClose after agent dial, add 1m setup timeout 2026-04-09 21:39:28 +00:00
Jon Ayers 165db3d31c perf(enterprise/tailnet): increase coordinator worker counts and batch size for 10k scale 2026-04-09 21:26:52 +00:00
Jon Ayers 1bd1516fd1 perf(tailnet): singleflight AwaitReachable to deduplicate concurrent ping storms 2026-04-09 19:34:22 +00:00
Jon Ayers 81ba35a987 fix(coderd/tailnet): move ensureAgent Send outside mutex using singleflight 2026-04-09 19:12:27 +00:00
Jon Ayers 53d63cf8e9 perf(coderd/database/pubsub): batch-drain msgQueue to amortize lock overhead
Replace the one-at-a-time dequeue loop in msgQueue.run() with a batch
drain that copies up to 256 messages per lock acquisition. This
amortizes mutex acquire/release and cond.Wait costs across many
messages, improving drain throughput during bursts and reducing the
likelihood of ring buffer overflow.
2026-04-08 00:02:29 +00:00
Jon Ayers 4213a43b53 fix(enterprise/tailnet): async singleflight-coalesced resyncPeerMappings in pubsub callbacks
Replace synchronous resyncPeerMappings() calls in listenPeer and
listenTunnel with async goroutines using singleflight.Do. This
prevents blocking the pubsub drain goroutine when ErrDroppedMessages
arrives, avoiding cascading buffer overflows.
2026-04-08 00:02:21 +00:00
7 changed files with 255 additions and 94 deletions
+27 -17
View File
@@ -81,8 +81,8 @@ func newMsgQueue(ctx context.Context, l Listener, le ListenerWithErr) *msgQueue
}
func (q *msgQueue) run() {
var batch [maxDrainBatch]msgOrErr
for {
// wait until there is something on the queue or we are closed
q.cond.L.Lock()
for q.size == 0 && !q.closed {
q.cond.Wait()
@@ -91,28 +91,32 @@ func (q *msgQueue) run() {
q.cond.L.Unlock()
return
}
item := q.q[q.front]
q.front = (q.front + 1) % BufferSize
q.size--
// Drain up to maxDrainBatch items while holding the lock.
n := min(q.size, maxDrainBatch)
for i := range n {
batch[i] = q.q[q.front]
q.front = (q.front + 1) % BufferSize
}
q.size -= n
q.cond.L.Unlock()
// process item without holding lock
if item.err == nil {
// real message
if q.l != nil {
q.l(q.ctx, item.msg)
// Dispatch each message individually without holding the lock.
for i := range n {
item := batch[i]
if item.err == nil {
if q.l != nil {
q.l(q.ctx, item.msg)
continue
}
if q.le != nil {
q.le(q.ctx, item.msg, nil)
continue
}
continue
}
if q.le != nil {
q.le(q.ctx, item.msg, nil)
continue
q.le(q.ctx, nil, item.err)
}
// unhittable
continue
}
// if the listener wants errors, send it.
if q.le != nil {
q.le(q.ctx, nil, item.err)
}
}
}
@@ -233,6 +237,12 @@ type PGPubsub struct {
// for a subscriber before dropping messages.
const BufferSize = 2048
// maxDrainBatch is the maximum number of messages to drain from the ring
// buffer per iteration. Batching amortizes the cost of mutex
// acquire/release and cond.Wait across many messages, improving drain
// throughput during bursts.
//
// It is used both as the length of the fixed-size batch array in
// msgQueue.run and as the upper bound on how many queued items are copied
// out of the ring buffer per lock acquisition (min(q.size, maxDrainBatch)).
const maxDrainBatch = 256
// Subscribe calls the listener when an event matching the name is received.
func (p *PGPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
return p.subscribeQueue(event, newMsgQueue(context.Background(), listener, nil))
+37 -19
View File
@@ -19,6 +19,7 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/singleflight"
"golang.org/x/xerrors"
"tailscale.com/derp"
"tailscale.com/tailcfg"
@@ -389,6 +390,7 @@ type MultiAgentController struct {
// connections to the destination
tickets map[uuid.UUID]map[uuid.UUID]struct{}
coordination *tailnet.BasicCoordination
sendGroup singleflight.Group
cancel context.CancelFunc
expireOldAgentsDone chan struct{}
@@ -418,28 +420,44 @@ func (m *MultiAgentController) New(client tailnet.CoordinatorClient) tailnet.Clo
func (m *MultiAgentController) ensureAgent(agentID uuid.UUID) error {
m.mu.Lock()
defer m.mu.Unlock()
_, ok := m.connectionTimes[agentID]
// If we don't have the agent, subscribe.
if !ok {
m.logger.Debug(context.Background(),
"subscribing to agent", slog.F("agent_id", agentID))
if m.coordination != nil {
err := m.coordination.Client.Send(&proto.CoordinateRequest{
AddTunnel: &proto.CoordinateRequest_Tunnel{Id: agentID[:]},
})
if err != nil {
err = xerrors.Errorf("subscribe agent: %w", err)
m.coordination.SendErr(err)
_ = m.coordination.Client.Close()
m.coordination = nil
return err
}
}
m.tickets[agentID] = map[uuid.UUID]struct{}{}
if ok {
m.connectionTimes[agentID] = time.Now()
m.mu.Unlock()
return nil
}
m.mu.Unlock()
m.logger.Debug(context.Background(),
"subscribing to agent", slog.F("agent_id", agentID))
_, err, _ := m.sendGroup.Do(agentID.String(), func() (interface{}, error) {
m.mu.Lock()
coord := m.coordination
m.mu.Unlock()
if coord == nil {
return nil, xerrors.New("no active coordination")
}
err := coord.Client.Send(&proto.CoordinateRequest{
AddTunnel: &proto.CoordinateRequest_Tunnel{Id: agentID[:]},
})
if err != nil {
return nil, err
}
m.mu.Lock()
m.tickets[agentID] = map[uuid.UUID]struct{}{}
m.mu.Unlock()
return nil, nil
})
if err != nil {
m.logger.Error(context.Background(), "ensureAgent send failed",
slog.F("agent_id", agentID), slog.Error(err))
return xerrors.Errorf("send AddTunnel: %w", err)
}
m.mu.Lock()
m.connectionTimes[agentID] = time.Now()
m.mu.Unlock()
return nil
}
+13 -8
View File
@@ -730,7 +730,10 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
if !ok {
return
}
log := s.Logger.With(slog.F("agent_id", appToken.AgentID))
log := s.Logger.With(
slog.F("agent_id", appToken.AgentID),
slog.F("workspace_id", appToken.WorkspaceID),
)
log.Debug(ctx, "resolved PTY request")
values := r.URL.Query()
@@ -765,19 +768,21 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
})
return
}
go httpapi.HeartbeatClose(ctx, s.Logger, cancel, conn)
ctx, wsNetConn := WebsocketNetConn(ctx, conn, websocket.MessageBinary)
defer wsNetConn.Close() // Also closes conn.
go httpapi.HeartbeatClose(ctx, log, cancel, conn)
dialStart := time.Now()
agentConn, release, err := s.AgentProvider.AgentConn(ctx, appToken.AgentID)
if err != nil {
log.Debug(ctx, "dial workspace agent", slog.Error(err))
log.Debug(ctx, "dial workspace agent", slog.Error(err), slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial workspace agent: %s", err))
return
}
defer release()
log.Debug(ctx, "dialed workspace agent")
log.Debug(ctx, "dialed workspace agent", slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
// #nosec G115 - Safe conversion for terminal height/width which are expected to be within uint16 range (0-65535)
ptNetConn, err := agentConn.ReconnectingPTY(ctx, reconnect, uint16(height), uint16(width), r.URL.Query().Get("command"), func(arp *workspacesdk.AgentReconnectingPTYInit) {
arp.Container = container
@@ -785,12 +790,12 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
arp.BackendType = backendType
})
if err != nil {
log.Debug(ctx, "dial reconnecting pty server in workspace agent", slog.Error(err))
log.Debug(ctx, "dial reconnecting pty server in workspace agent", slog.Error(err), slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial: %s", err))
return
}
defer ptNetConn.Close()
log.Debug(ctx, "obtained PTY")
log.Debug(ctx, "obtained PTY", slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
report := newStatsReportFromSignedToken(*appToken)
s.collectStats(report)
@@ -800,7 +805,7 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
}()
agentssh.Bicopy(ctx, wsNetConn, ptNetConn)
log.Debug(ctx, "pty Bicopy finished")
log.Debug(ctx, "pty Bicopy finished", slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
}
func (s *Server) collectStats(stats StatsReport) {
+27 -7
View File
@@ -12,6 +12,7 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"golang.org/x/sync/singleflight"
"golang.org/x/xerrors"
gProto "google.golang.org/protobuf/proto"
@@ -33,9 +34,9 @@ const (
eventReadyForHandshake = "tailnet_ready_for_handshake"
HeartbeatPeriod = time.Second * 2
MissedHeartbeats = 3
numQuerierWorkers = 10
numQuerierWorkers = 40
numBinderWorkers = 10
numTunnelerWorkers = 10
numTunnelerWorkers = 20
numHandshakerWorkers = 5
dbMaxBackoff = 10 * time.Second
cleanupPeriod = time.Hour
@@ -770,6 +771,9 @@ func (m *mapper) bestToUpdate(best map[uuid.UUID]mapping) *proto.CoordinateRespo
for k := range m.sent {
if _, ok := best[k]; !ok {
m.logger.Debug(m.ctx, "peer no longer in best mappings, sending DISCONNECTED",
slog.F("peer_id", k),
)
resp.PeerUpdates = append(resp.PeerUpdates, &proto.CoordinateResponse_PeerUpdate{
Id: agpl.UUIDToByteSlice(k),
Kind: proto.CoordinateResponse_PeerUpdate_DISCONNECTED,
@@ -820,6 +824,8 @@ type querier struct {
mu sync.Mutex
mappers map[mKey]*mapper
healthy bool
resyncGroup singleflight.Group
}
func newQuerier(ctx context.Context,
@@ -958,7 +964,7 @@ func (q *querier) cleanupConn(c *connIO) {
// maxBatchSize is the maximum number of keys to process in a single batch
// query.
const maxBatchSize = 50
const maxBatchSize = 200
func (q *querier) peerUpdateWorker() {
defer q.wg.Done()
@@ -1207,8 +1213,13 @@ func (q *querier) subscribe() {
func (q *querier) listenPeer(_ context.Context, msg []byte, err error) {
if xerrors.Is(err, pubsub.ErrDroppedMessages) {
q.logger.Warn(q.ctx, "pubsub may have dropped peer updates")
// we need to schedule a full resync of peer mappings
q.resyncPeerMappings()
// Schedule a full resync asynchronously so we don't block the
// pubsub drain goroutine. Singleflight coalesces concurrent
// resync requests.
go q.resyncGroup.Do("resync", func() (any, error) {
q.resyncPeerMappings()
return nil, nil
})
return
}
if err != nil {
@@ -1234,8 +1245,13 @@ func (q *querier) listenPeer(_ context.Context, msg []byte, err error) {
func (q *querier) listenTunnel(_ context.Context, msg []byte, err error) {
if xerrors.Is(err, pubsub.ErrDroppedMessages) {
q.logger.Warn(q.ctx, "pubsub may have dropped tunnel updates")
// we need to schedule a full resync of peer mappings
q.resyncPeerMappings()
// Schedule a full resync asynchronously so we don't block the
// pubsub drain goroutine. Singleflight coalesces concurrent
// resync requests.
go q.resyncGroup.Do("resync", func() (any, error) {
q.resyncPeerMappings()
return nil, nil
})
return
}
if err != nil {
@@ -1601,6 +1617,10 @@ func (h *heartbeats) filter(mappings []mapping) []mapping {
// the only mapping available for it. Newer mappings will take
// precedence.
m.kind = proto.CoordinateResponse_PeerUpdate_LOST
h.logger.Debug(h.ctx, "mapping rewritten to LOST due to missed heartbeats",
slog.F("peer_id", m.peer),
slog.F("coordinator_id", m.coordinator),
)
}
}
+10 -1
View File
@@ -732,7 +732,16 @@ func (l *peerLifecycle) setLostTimer(c *configMaps) {
if l.lostTimer != nil {
l.lostTimer.Stop()
}
ttl := lostTimeout - c.clock.Since(l.lastHandshake)
var ttl time.Duration
if l.lastHandshake.IsZero() {
// Peer has never completed a handshake. Give it the full
// lostTimeout to establish one rather than deleting it
// immediately. A zero lastHandshake just means WireGuard
// hasn't connected yet, not that the peer is gone.
ttl = lostTimeout
} else {
ttl = lostTimeout - c.clock.Since(l.lastHandshake)
}
if ttl <= 0 {
ttl = time.Nanosecond
}
+91
View File
@@ -641,6 +641,97 @@ func TestConfigMaps_updatePeers_lost(t *testing.T) {
_ = testutil.TryReceive(ctx, t, done)
}
// TestConfigMaps_updatePeers_lost_zero_handshake adds a peer, marks it LOST
// while every status request is answered without any peer information, and
// asserts that the peer is not reprogrammed away immediately but is removed
// once the mock clock has been advanced by lostTimeout.
func TestConfigMaps_updatePeers_lost_zero_handshake(t *testing.T) {
	t.Parallel()
	ctx := testutil.Context(t, testutil.WaitShort)
	testLogger := testutil.Logger(t)
	engine := newFakeEngineConfigurable()
	privKey := key.NewNode()
	selfID := tailcfg.NodeID(5)
	disco := key.NewDisco()
	uut := newConfigMaps(testLogger, engine, selfID, privKey, disco.Public(), CoderDNSSuffixFQDN)
	defer uut.close()
	clock := quartz.NewMock(t)
	uut.clock = clock

	peerID := uuid.UUID{1}
	peerNode := newTestNode(1)
	peerProto, err := NodeToProto(peerNode)
	require.NoError(t, err)

	// answerEmptyStatus services exactly one status request from the fake
	// engine in the background and acknowledges it without adding any peer
	// to the received value, so no handshake is ever reported. The returned
	// channel is closed once the acknowledgement has been delivered.
	answerEmptyStatus := func() <-chan struct{} {
		answered := make(chan struct{})
		go func() {
			select {
			case <-ctx.Done():
				t.Error("timeout waiting for status")
				return
			case <-engine.status:
				// Deliberately discard the request: no peer is added.
			}
			select {
			case <-ctx.Done():
				t.Error("timeout sending done")
			case engine.statusDone <- struct{}{}:
				close(answered)
			}
		}()
		return answered
	}

	// Step 1: introduce the peer with a NODE update; the status response
	// carries no handshake.
	addAnswered := answerEmptyStatus()
	peerUpdate := &proto.CoordinateResponse_PeerUpdate{
		Id:   peerID[:],
		Kind: proto.CoordinateResponse_PeerUpdate_NODE,
		Node: peerProto,
	}
	updates := []*proto.CoordinateResponse_PeerUpdate{peerUpdate}
	uut.updatePeers(updates)
	netMap := testutil.TryReceive(ctx, t, engine.setNetworkMap)
	reconf := testutil.TryReceive(ctx, t, engine.reconfig)
	require.Len(t, netMap.Peers, 1)
	require.Len(t, reconf.wg.Peers, 1)
	_ = testutil.TryReceive(ctx, t, addAnswered)

	// Step 2: flip the same update to LOST, again with no handshake.
	lostAnswered := answerEmptyStatus()
	peerUpdate.Kind = proto.CoordinateResponse_PeerUpdate_LOST
	peerUpdate.Node = nil
	uut.updatePeers(updates)
	_ = testutil.TryReceive(ctx, t, lostAnswered)

	// The peer must survive the LOST update itself: nothing may have been
	// queued on setNetworkMap yet.
	select {
	case <-engine.setNetworkMap:
		t.Fatal("should not reprogram")
	default:
		// Nothing pending, as expected.
	}

	// Step 3: arm one more empty status response for the lost timer, then
	// move the mock clock forward by the full lostTimeout.
	timerAnswered := answerEmptyStatus()
	clock.Advance(lostTimeout).MustWait(ctx)
	_ = testutil.TryReceive(ctx, t, timerAnswered)

	// Only now is the peer expected to be dropped from both configurations.
	netMap = testutil.TryReceive(ctx, t, engine.setNetworkMap)
	reconf = testutil.TryReceive(ctx, t, engine.reconfig)
	require.Len(t, netMap.Peers, 0)
	require.Len(t, reconf.wg.Peers, 0)

	// Close in the background so a hang in close() is caught by the test
	// context instead of blocking forever.
	closed := make(chan struct{})
	go func() {
		defer close(closed)
		uut.close()
	}()
	_ = testutil.TryReceive(ctx, t, closed)
}
func TestConfigMaps_updatePeers_lost_and_found(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
+50 -42
View File
@@ -12,6 +12,8 @@ import (
"sync"
"time"
"golang.org/x/sync/singleflight"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"github.com/tailscale/wireguard-go/tun"
@@ -463,6 +465,8 @@ type Conn struct {
trafficStats *connstats.Statistics
lastNetInfo *tailcfg.NetInfo
awaitReachableGroup singleflight.Group
}
func (c *Conn) GetNetInfo() *tailcfg.NetInfo {
@@ -599,56 +603,60 @@ func (c *Conn) DERPMap() *tailcfg.DERPMap {
// address is reachable. It's the caller's responsibility to provide
// a timeout, otherwise this function will block forever.
func (c *Conn) AwaitReachable(ctx context.Context, ip netip.Addr) bool {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // Cancel all pending pings on exit.
result, _, _ := c.awaitReachableGroup.Do(ip.String(), func() (interface{}, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // Cancel all pending pings on exit.
completedCtx, completed := context.WithCancel(context.Background())
defer completed()
completedCtx, completed := context.WithCancel(context.Background())
defer completed()
run := func() {
// Safety timeout, initially we'll have around 10-20 goroutines
// running in parallel. The exponential backoff will converge
// around ~1 ping / 30s, this means we'll have around 10-20
// goroutines pending towards the end as well.
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
run := func() {
// Safety timeout, initially we'll have around 10-20 goroutines
// running in parallel. The exponential backoff will converge
// around ~1 ping / 30s, this means we'll have around 10-20
// goroutines pending towards the end as well.
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
// For reachability, we use TSMP ping, which pings at the IP layer, and
// therefore requires that wireguard and the netstack are up. If we
// don't wait for wireguard to be up, we could miss a handshake, and it
// might take 5 seconds for the handshake to be retried. A 5s initial
// round trip can set us up for poor TCP performance, since the initial
// round-trip-time sets the initial retransmit timeout.
_, _, _, err := c.pingWithType(ctx, ip, tailcfg.PingTSMP)
if err == nil {
completed()
// For reachability, we use TSMP ping, which pings at the IP layer,
// and therefore requires that wireguard and the netstack are up.
// If we don't wait for wireguard to be up, we could miss a
// handshake, and it might take 5 seconds for the handshake to be
// retried. A 5s initial round trip can set us up for poor TCP
// performance, since the initial round-trip-time sets the initial
// retransmit timeout.
_, _, _, err := c.pingWithType(ctx, ip, tailcfg.PingTSMP)
if err == nil {
completed()
}
}
}
eb := backoff.NewExponentialBackOff()
eb.MaxElapsedTime = 0
eb.InitialInterval = 50 * time.Millisecond
eb.MaxInterval = 30 * time.Second
// Consume the first interval since
// we'll fire off a ping immediately.
_ = eb.NextBackOff()
eb := backoff.NewExponentialBackOff()
eb.MaxElapsedTime = 0
eb.InitialInterval = 50 * time.Millisecond
eb.MaxInterval = 5 * time.Second
// Consume the first interval since
// we'll fire off a ping immediately.
_ = eb.NextBackOff()
t := backoff.NewTicker(eb)
defer t.Stop()
t := backoff.NewTicker(eb)
defer t.Stop()
go run()
for {
select {
case <-completedCtx.Done():
return true
case <-t.C:
// Pings can take a while, so we can run multiple
// in parallel to return ASAP.
go run()
case <-ctx.Done():
return false
go run()
for {
select {
case <-completedCtx.Done():
return true, nil
case <-t.C:
// Pings can take a while, so we can run multiple
// in parallel to return ASAP.
go run()
case <-ctx.Done():
return false, nil
}
}
}
})
return result.(bool)
}
// Closed is a channel that ends when the connection has