Compare commits

...

1 Commits

Author SHA1 Message Date
Mathias Fredriksson 4e551efedc fix(agent): serialize pings in stats collection
Cap concurrent Ping calls in Collect() to one at a time using a
channel semaphore. During network disruption, N parallel Pings
pile up on magicsock.Conn.mu, and orphaned goroutines from expired
pingCtx still trigger the wgengine watchdog. With 4 peers this
meant ~80s of blocked goroutines vs the 45s watchdog threshold.

Extracts collectPeerLatencies() behind a peerPinger interface for
direct testing of the concurrency constraint.

Refs #22864
2026-03-09 22:44:01 +00:00
2 changed files with 171 additions and 37 deletions
+66 -37
View File
@@ -31,8 +31,10 @@ import (
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"google.golang.org/protobuf/types/known/timestamppb"
"tailscale.com/ipn/ipnstate"
"tailscale.com/net/speedtest"
"tailscale.com/tailcfg"
"tailscale.com/types/key"
"tailscale.com/types/netlogtype"
"tailscale.com/util/clientmetric"
@@ -1830,43 +1832,7 @@ func (a *agent) Collect(ctx context.Context, networkStats map[netlogtype.Connect
// Compute the median connection latency!
a.logger.Debug(ctx, "starting peer latency measurement for stats")
var wg sync.WaitGroup
var mu sync.Mutex
status := a.network.Status()
durations := []float64{}
p2pConns := 0
derpConns := 0
pingCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
defer cancelFunc()
for nodeID, peer := range status.Peer {
if !peer.Active {
continue
}
addresses, found := a.network.NodeAddresses(nodeID)
if !found {
continue
}
if len(addresses) == 0 {
continue
}
wg.Add(1)
go func() {
defer wg.Done()
duration, p2p, _, err := a.network.Ping(pingCtx, addresses[0].Addr())
if err != nil {
return
}
mu.Lock()
defer mu.Unlock()
durations = append(durations, float64(duration.Microseconds()))
if p2p {
p2pConns++
} else {
derpConns++
}
}()
}
wg.Wait()
durations, p2pConns, derpConns := collectPeerLatencies(ctx, a.network)
sort.Float64s(durations)
durationsLength := len(durations)
switch {
@@ -1895,6 +1861,69 @@ func (a *agent) Collect(ctx context.Context, networkStats map[netlogtype.Connect
return stats
}
// peerPinger is the subset of tailnet.Conn used by
// collectPeerLatencies, extracted as an interface for testing.
type peerPinger interface {
	// Status returns the current network status; collectPeerLatencies
	// only consults the Peer map.
	Status() *ipnstate.Status
	// NodeAddresses reports the known addresses for the given node
	// key, and whether the node was found at all.
	NodeAddresses(key.NodePublic) ([]netip.Prefix, bool)
	// Ping measures latency to a single address. The bool result
	// reports whether the connection was peer-to-peer (true) or
	// relayed via DERP (false).
	Ping(context.Context, netip.Addr) (time.Duration, bool, *ipnstate.PingResult, error)
}
// collectPeerLatencies pings every active peer and returns per-ping
// durations (in microseconds), plus P2P and DERP connection counts.
// Pings are issued strictly one at a time to avoid piling goroutines
// on magicsock.Conn.mu, which can trigger the wgengine watchdog
// during network disruption. See #22864.
//
// The whole sweep shares a single 5-second deadline; peers not yet
// pinged when it expires are skipped.
func collectPeerLatencies(ctx context.Context, network peerPinger) (
	durations []float64, p2pConns int, derpConns int,
) {
	pingCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
	defer cancelFunc()
	// A sequential loop gives the same "at most one Ping in flight"
	// guarantee as a capacity-1 semaphore, but without spawning a
	// goroutine per peer: nothing can be left parked on a semaphore
	// (or inside Ping) after the deadline passes, so no orphaned
	// goroutines can accumulate against the watchdog threshold.
	for nodeID, peer := range network.Status().Peer {
		if !peer.Active {
			continue
		}
		addresses, found := network.NodeAddresses(nodeID)
		if !found || len(addresses) == 0 {
			continue
		}
		// Stop early once the deadline has expired rather than
		// issuing pings that are guaranteed to fail.
		if pingCtx.Err() != nil {
			break
		}
		duration, p2p, _, err := network.Ping(pingCtx, addresses[0].Addr())
		if err != nil {
			// Unreachable peers are simply excluded from the stats.
			continue
		}
		durations = append(durations, float64(duration.Microseconds()))
		if p2p {
			p2pConns++
		} else {
			derpConns++
		}
	}
	return durations, p2pConns, derpConns
}
// isClosed returns whether the API is closed or not.
func (a *agent) isClosed() bool {
return a.hardCtx.Err() != nil
+105
View File
@@ -1,10 +1,18 @@
package agent
import (
"context"
"net/netip"
"sync/atomic"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go4.org/mem"
"tailscale.com/ipn/ipnstate"
"tailscale.com/types/key"
"cdr.dev/slog/v3"
"cdr.dev/slog/v3/sloggers/slogtest"
@@ -42,3 +50,100 @@ func TestReportConnectionEmpty(t *testing.T) {
require.Equal(t, proto.Connection_DISCONNECT, req1.GetConnection().GetAction())
require.Equal(t, "because", req1.GetConnection().GetReason())
}
// TestCollectPeerLatencies_SerializedPings verifies that
// collectPeerLatencies never has more than one Ping in flight at a
// time, preventing goroutine pile-ups on magicsock.Conn.mu that can
// trigger the wgengine watchdog. See #22864.
func TestCollectPeerLatencies_SerializedPings(t *testing.T) {
	t.Parallel()
	const (
		peerCount = 5
		delay     = 10 * time.Millisecond
	)
	network := &fakePingerNetwork{pingDelay: delay}
	// Register peerCount active peers, each with a unique address.
	for i := 0; i < peerCount; i++ {
		network.addActivePeer(i)
	}
	ctx := testutil.Context(t, testutil.WaitShort)
	durations, p2p, derp := collectPeerLatencies(ctx, network)
	// Every peer answered, always over DERP (the fake never reports
	// a P2P connection).
	require.Len(t, durations, peerCount)
	require.Equal(t, 0, p2p)
	require.Equal(t, peerCount, derp)
	// Serialization must have held: never two pings at once.
	highWater := network.maxConcurrent.Load()
	assert.EqualValues(t, 1, highWater,
		"expected at most 1 concurrent Ping, got %d", highWater)
}
// fakePingerNetwork implements peerPinger for testing. It records the
// maximum number of concurrent Ping calls.
type fakePingerNetwork struct {
	// peers maps node keys to status; populated via addActivePeer.
	peers map[key.NodePublic]*ipnstate.PeerStatus
	// addrs maps node keys to their addresses; populated via addActivePeer.
	addrs map[key.NodePublic][]netip.Prefix
	// active counts Ping calls currently in flight.
	active atomic.Int32
	// maxConcurrent tracks the high-water mark of in-flight pings.
	maxConcurrent atomic.Int32
	// pingDelay is how long each fake Ping blocks before returning.
	pingDelay time.Duration
}
// addActivePeer registers peer i as active, giving it a node key
// derived deterministically from i and the unique address 10.0.0.(i+1).
func (f *fakePingerNetwork) addActivePeer(i int) {
	// Lazily allocate the maps so the zero value is usable.
	if f.peers == nil {
		f.peers = map[key.NodePublic]*ipnstate.PeerStatus{}
		f.addrs = map[key.NodePublic][]netip.Prefix{}
	}
	// Deterministic key: index in the first byte, rest zero.
	var keyBytes [32]byte
	keyBytes[0] = byte(i)
	nodeKey := key.NodePublicFromRaw32(mem.B(keyBytes[:]))
	f.peers[nodeKey] = &ipnstate.PeerStatus{Active: true}
	peerAddr := netip.AddrFrom4([4]byte{10, 0, 0, byte(1 + i)})
	f.addrs[nodeKey] = []netip.Prefix{netip.PrefixFrom(peerAddr, 32)}
}
// Status exposes the fake peer table as an ipnstate.Status.
func (f *fakePingerNetwork) Status() *ipnstate.Status {
	status := ipnstate.Status{Peer: f.peers}
	return &status
}
// NodeAddresses returns the prefixes registered for pub and whether
// pub is known to the fake network.
func (f *fakePingerNetwork) NodeAddresses(pub key.NodePublic) ([]netip.Prefix, bool) {
	prefixes, ok := f.addrs[pub]
	return prefixes, ok
}
// Ping simulates a DERP ping that blocks for pingDelay, recording the
// high-water mark of concurrent calls so tests can assert on it. It
// honors ctx cancellation while waiting.
func (f *fakePingerNetwork) Ping(
	ctx context.Context, _ netip.Addr,
) (time.Duration, bool, *ipnstate.PingResult, error) {
	inFlight := f.active.Add(1)
	defer f.active.Add(-1)
	// Publish inFlight as the new maximum if it exceeds the current
	// one; loop because concurrent callers race on the CAS.
	for {
		seen := f.maxConcurrent.Load()
		if inFlight <= seen || f.maxConcurrent.CompareAndSwap(seen, inFlight) {
			break
		}
	}
	select {
	case <-ctx.Done():
		return 0, false, nil, ctx.Err()
	case <-time.After(f.pingDelay):
	}
	return f.pingDelay, false, &ipnstate.PingResult{}, nil
}