Compare commits
1 Commit
| Author | SHA1 | Date | |
|---|---|---|---|
| 4e551efedc |
+66
-37
@@ -31,8 +31,10 @@ import (
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/xerrors"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
"tailscale.com/ipn/ipnstate"
|
||||
"tailscale.com/net/speedtest"
|
||||
"tailscale.com/tailcfg"
|
||||
"tailscale.com/types/key"
|
||||
"tailscale.com/types/netlogtype"
|
||||
"tailscale.com/util/clientmetric"
|
||||
|
||||
@@ -1830,43 +1832,7 @@ func (a *agent) Collect(ctx context.Context, networkStats map[netlogtype.Connect
|
||||
|
||||
// Compute the median connection latency!
|
||||
a.logger.Debug(ctx, "starting peer latency measurement for stats")
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex
|
||||
status := a.network.Status()
|
||||
durations := []float64{}
|
||||
p2pConns := 0
|
||||
derpConns := 0
|
||||
pingCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancelFunc()
|
||||
for nodeID, peer := range status.Peer {
|
||||
if !peer.Active {
|
||||
continue
|
||||
}
|
||||
addresses, found := a.network.NodeAddresses(nodeID)
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
if len(addresses) == 0 {
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
duration, p2p, _, err := a.network.Ping(pingCtx, addresses[0].Addr())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
durations = append(durations, float64(duration.Microseconds()))
|
||||
if p2p {
|
||||
p2pConns++
|
||||
} else {
|
||||
derpConns++
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
durations, p2pConns, derpConns := collectPeerLatencies(ctx, a.network)
|
||||
sort.Float64s(durations)
|
||||
durationsLength := len(durations)
|
||||
switch {
|
||||
@@ -1895,6 +1861,69 @@ func (a *agent) Collect(ctx context.Context, networkStats map[netlogtype.Connect
|
||||
return stats
|
||||
}
|
||||
|
||||
// peerPinger is the subset of tailnet.Conn used by
// collectPeerLatencies, extracted as an interface for testing.
type peerPinger interface {
	// Status returns the current network status, including the peer map.
	Status() *ipnstate.Status
	// NodeAddresses returns the address prefixes for the given node key,
	// reporting whether the node is known.
	NodeAddresses(key.NodePublic) ([]netip.Prefix, bool)
	// Ping measures latency to the given address. It returns the round-trip
	// duration and whether the connection was peer-to-peer (as opposed to
	// relayed via DERP).
	Ping(context.Context, netip.Addr) (time.Duration, bool, *ipnstate.PingResult, error)
}
|
||||
|
||||
// collectPeerLatencies pings every active peer and returns per-ping
|
||||
// durations (in microseconds), plus DERP and P2P connection counts.
|
||||
// At most one Ping is in-flight at a time to avoid piling goroutines
|
||||
// on magicsock.Conn.mu, which can trigger the wgengine watchdog
|
||||
// during network disruption. See #22864.
|
||||
func collectPeerLatencies(ctx context.Context, network peerPinger) (
|
||||
durations []float64, p2pConns int, derpConns int,
|
||||
) {
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex
|
||||
status := network.Status()
|
||||
pingCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancelFunc()
|
||||
|
||||
// Capacity-1 semaphore: at most one Ping in flight.
|
||||
pingSem := make(chan struct{}, 1)
|
||||
|
||||
for nodeID, peer := range status.Peer {
|
||||
if !peer.Active {
|
||||
continue
|
||||
}
|
||||
addresses, found := network.NodeAddresses(nodeID)
|
||||
if !found {
|
||||
continue
|
||||
}
|
||||
if len(addresses) == 0 {
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
select {
|
||||
case pingSem <- struct{}{}:
|
||||
case <-pingCtx.Done():
|
||||
return
|
||||
}
|
||||
defer func() { <-pingSem }()
|
||||
duration, p2p, _, err := network.Ping(pingCtx, addresses[0].Addr())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
durations = append(durations, float64(duration.Microseconds()))
|
||||
if p2p {
|
||||
p2pConns++
|
||||
} else {
|
||||
derpConns++
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
return durations, p2pConns, derpConns
|
||||
}
|
||||
|
||||
// isClosed returns whether the API is closed or not.
|
||||
func (a *agent) isClosed() bool {
|
||||
return a.hardCtx.Err() != nil
|
||||
|
||||
@@ -1,10 +1,18 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/netip"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go4.org/mem"
|
||||
"tailscale.com/ipn/ipnstate"
|
||||
"tailscale.com/types/key"
|
||||
|
||||
"cdr.dev/slog/v3"
|
||||
"cdr.dev/slog/v3/sloggers/slogtest"
|
||||
@@ -42,3 +50,100 @@ func TestReportConnectionEmpty(t *testing.T) {
|
||||
require.Equal(t, proto.Connection_DISCONNECT, req1.GetConnection().GetAction())
|
||||
require.Equal(t, "because", req1.GetConnection().GetReason())
|
||||
}
|
||||
|
||||
// TestCollectPeerLatencies_SerializedPings verifies that
|
||||
// collectPeerLatencies never has more than one Ping in flight at a
|
||||
// time, preventing goroutine pile-ups on magicsock.Conn.mu that can
|
||||
// trigger the wgengine watchdog. See #22864.
|
||||
func TestCollectPeerLatencies_SerializedPings(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
const numPeers = 5
|
||||
const pingDelay = 10 * time.Millisecond
|
||||
|
||||
fp := &fakePingerNetwork{
|
||||
pingDelay: pingDelay,
|
||||
}
|
||||
// Build N active peers, each with a unique address.
|
||||
for i := range numPeers {
|
||||
fp.addActivePeer(i)
|
||||
}
|
||||
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
durations, p2p, derp := collectPeerLatencies(ctx, fp)
|
||||
|
||||
// All peers should have been pinged successfully.
|
||||
require.Len(t, durations, numPeers)
|
||||
require.Equal(t, 0, p2p)
|
||||
require.Equal(t, numPeers, derp)
|
||||
|
||||
// The semaphore must have prevented any concurrent pings.
|
||||
maxSeen := fp.maxConcurrent.Load()
|
||||
assert.EqualValues(t, 1, maxSeen,
|
||||
"expected at most 1 concurrent Ping, got %d", maxSeen)
|
||||
}
|
||||
|
||||
// fakePingerNetwork implements peerPinger for testing. It records the
|
||||
// maximum number of concurrent Ping calls.
|
||||
type fakePingerNetwork struct {
|
||||
peers map[key.NodePublic]*ipnstate.PeerStatus
|
||||
addrs map[key.NodePublic][]netip.Prefix
|
||||
active atomic.Int32
|
||||
// maxConcurrent tracks the high-water mark of in-flight pings.
|
||||
maxConcurrent atomic.Int32
|
||||
pingDelay time.Duration
|
||||
}
|
||||
|
||||
func (f *fakePingerNetwork) addActivePeer(i int) {
|
||||
if f.peers == nil {
|
||||
f.peers = make(map[key.NodePublic]*ipnstate.PeerStatus)
|
||||
f.addrs = make(map[key.NodePublic][]netip.Prefix)
|
||||
}
|
||||
// Deterministic key derived from index.
|
||||
var raw [32]byte
|
||||
raw[0] = byte(i)
|
||||
pub := key.NodePublicFromRaw32(mem.B(raw[:]))
|
||||
f.peers[pub] = &ipnstate.PeerStatus{Active: true}
|
||||
addr := netip.AddrFrom4([4]byte{
|
||||
10, 0, 0, byte(1 + i),
|
||||
})
|
||||
f.addrs[pub] = []netip.Prefix{
|
||||
netip.PrefixFrom(addr, 32),
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fakePingerNetwork) Status() *ipnstate.Status {
|
||||
return &ipnstate.Status{Peer: f.peers}
|
||||
}
|
||||
|
||||
func (f *fakePingerNetwork) NodeAddresses(
|
||||
pub key.NodePublic,
|
||||
) ([]netip.Prefix, bool) {
|
||||
a, ok := f.addrs[pub]
|
||||
return a, ok
|
||||
}
|
||||
|
||||
func (f *fakePingerNetwork) Ping(
|
||||
ctx context.Context, _ netip.Addr,
|
||||
) (time.Duration, bool, *ipnstate.PingResult, error) {
|
||||
cur := f.active.Add(1)
|
||||
defer f.active.Add(-1)
|
||||
|
||||
// Record high-water mark via CAS loop.
|
||||
for {
|
||||
prev := f.maxConcurrent.Load()
|
||||
if cur <= prev {
|
||||
break
|
||||
}
|
||||
if f.maxConcurrent.CompareAndSwap(prev, cur) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(f.pingDelay):
|
||||
case <-ctx.Done():
|
||||
return 0, false, nil, ctx.Err()
|
||||
}
|
||||
return f.pingDelay, false, &ipnstate.PingResult{}, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user