Compare commits

...

5 Commits

Author SHA1 Message Date
Jon Ayers
aefc75133a fix: use separate http.Transports for wsproxy tests 2026-02-24 23:02:37 +00:00
Jon Ayers
b9181c3934 feat(wsproxy): add /debug/expvar endpoint for DERP server stats 2026-02-21 00:19:41 +00:00
Jon Ayers
a90471db53 feat(monitoring): add wsproxy DERP section to Grafana dashboard
Adds a new 'Workspace Proxy - DERP' row with 6 panels:
- DERP Connections (current connections and home connections)
- DERP Client Breakdown (local, remote, total)
- DERP Throughput (bytes received/sent rate)
- DERP Packets (received/sent/forwarded rate)
- DERP Packet Drops (by reason label)
- DERP Queue Duration (average queue duration)
2026-02-20 23:44:24 +00:00
Jon Ayers
cb71f5e789 feat(wsproxy): add DERP websocket throughput metrics
Add Prometheus metrics tracking active DERP websocket connections and
bytes relayed through the wsproxy:

- coder_wsproxy_derp_websocket_active_connections (gauge)
- coder_wsproxy_derp_websocket_bytes_total (counter, direction=read|write)

Implementation adds a DERPWebsocketMetrics hook struct and countingConn
wrapper in tailnet/, and a new WithWebsocketSupportAndMetrics function
that instruments the websocket connection lifecycle. The existing
WithWebsocketSupport function delegates to the new one with nil metrics.
2026-02-20 23:44:21 +00:00
Jon Ayers
f50707bc3e feat(wsproxy): add Prometheus collector for DERP server expvar metrics
Create a prometheus.Collector that bridges the tailscale derp.Server's
expvar-based stats to Prometheus metrics with namespace coder, subsystem
wsproxy_derp. Handles counters, gauges, labeled metrics (nested
metrics.Set for drop reasons, packet types, etc.), and the average
queue duration (converted from ms to seconds).

Register the collector in the wsproxy server after derpServer creation.
2026-02-20 23:40:03 +00:00
8 changed files with 2005 additions and 1009 deletions

View File

@@ -146,8 +146,12 @@ func NewWorkspaceProxyReplica(t *testing.T, coderdAPI *coderd.API, owner *coders
logger := testutil.Logger(t).With(slog.F("server_url", serverURL.String()))
// nolint: forcetypeassert // This is a stdlib transport it's unnecessary to type assert especially in tests.
wssrv, err := wsproxy.New(ctx, &wsproxy.Options{
Logger: logger,
Logger: logger,
// It's important to ensure each test has its own isolated transport to avoid interfering with other tests
// especially in shutdown.
HTTPClient: &http.Client{Transport: http.DefaultTransport.(*http.Transport).Clone()},
Experiments: options.Experiments,
DashboardURL: coderdAPI.AccessURL,
AccessURL: accessURL,

View File

@@ -0,0 +1,162 @@
package derpmetrics
import (
"expvar"
"strconv"
"strings"
"github.com/prometheus/client_golang/prometheus"
"tailscale.com/derp"
)
// NewCollector wraps the given derp.Server in a prometheus.Collector.
// The returned collector keeps a reference to server and bridges its
// expvar-based stats into Prometheus metrics.
func NewCollector(server *derp.Server) prometheus.Collector {
	c := new(collector)
	c.server = server
	return c
}
// namespace is the first component of every fully-qualified metric name
// built by this package.
const namespace = "coder"

// subsystem is the second component of every fully-qualified metric name
// built by this package.
const subsystem = "wsproxy_derp"
// counterMetrics maps the expvar key of each plain (unlabeled) counter to
// the Prometheus descriptor it is exported under.
var counterMetrics = map[string]*prometheus.Desc{
	// Connections and raw byte totals.
	"accepts":        desc("accepts_total", "Total number of accepted connections."),
	"bytes_received": desc("bytes_received_total", "Total bytes received."),
	"bytes_sent":     desc("bytes_sent_total", "Total bytes sent."),

	// Packet totals.
	"packets_sent":          desc("packets_sent_total", "Total packets sent."),
	"packets_received":      desc("packets_received_total", "Total packets received."),
	"packets_dropped":       desc("packets_dropped_total_unlabeled", "Total packets dropped (unlabeled aggregate)."),
	"packets_forwarded_out": desc("packets_forwarded_out_total", "Total packets forwarded out."),
	"packets_forwarded_in":  desc("packets_forwarded_in_total", "Total packets forwarded in."),

	// Home moves.
	"home_moves_in":  desc("home_moves_in_total", "Total home moves in."),
	"home_moves_out": desc("home_moves_out_total", "Total home moves out."),

	// Ping/pong.
	"got_ping":  desc("got_ping_total", "Total pings received."),
	"sent_pong": desc("sent_pong_total", "Total pongs sent."),

	// Frame totals.
	"unknown_frames":                desc("unknown_frames_total", "Total unknown frames received."),
	"peer_gone_disconnected_frames": desc("peer_gone_disconnected_frames_total", "Total peer-gone-disconnected frames sent."),
	"peer_gone_not_here_frames":     desc("peer_gone_not_here_frames_total", "Total peer-gone-not-here frames sent."),

	// Forwarder bookkeeping.
	"multiforwarder_created":              desc("multiforwarder_created_total", "Total multiforwarders created."),
	"multiforwarder_deleted":              desc("multiforwarder_deleted_total", "Total multiforwarders deleted."),
	"packet_forwarder_delete_other_value": desc("packet_forwarder_delete_other_value_total", "Total packet forwarder delete-other-value events."),

	// Duplicate client connections.
	"counter_total_dup_client_conns": desc("duplicate_client_conns_total", "Total duplicate client connections."),
}
// gaugeMetrics maps the expvar key of each plain (unlabeled) gauge to the
// Prometheus descriptor it is exported under.
var gaugeMetrics = map[string]*prometheus.Desc{
	// Connections.
	"gauge_current_connections":      desc("current_connections", "Current number of connections."),
	"gauge_current_home_connections": desc("current_home_connections", "Current number of home connections."),

	// Clients.
	"gauge_clients_total":  desc("clients_total", "Current total number of clients."),
	"gauge_clients_local":  desc("clients_local", "Current number of local clients."),
	"gauge_clients_remote": desc("clients_remote", "Current number of remote clients."),

	// Duplicate clients.
	"gauge_current_dup_client_keys":  desc("current_duplicate_client_keys", "Current number of duplicate client keys."),
	"gauge_current_dup_client_conns": desc("current_duplicate_client_conns", "Current number of duplicate client connections."),

	// Watchers and file descriptors.
	"gauge_watchers":                 desc("watchers", "Current number of watchers."),
	"gauge_current_file_descriptors": desc("current_file_descriptors", "Current number of file descriptors."),
}
// labeledCounterMetrics maps the expvar key of each nested stats set to the
// single-label counter descriptor its children are exported under. labelName
// repeats the label declared on the descriptor.
var labeledCounterMetrics = map[string]struct {
	desc      *prometheus.Desc
	labelName string
}{
	"counter_packets_dropped_reason": {
		labelName: "reason",
		desc: prometheus.NewDesc(
			prometheus.BuildFQName(namespace, subsystem, "packets_dropped_total"),
			"Total packets dropped by reason.",
			[]string{"reason"},
			nil,
		),
	},
	"counter_packets_dropped_type": {
		labelName: "type",
		desc: prometheus.NewDesc(
			prometheus.BuildFQName(namespace, subsystem, "packets_dropped_by_type_total"),
			"Total packets dropped by type.",
			[]string{"type"},
			nil,
		),
	},
	"counter_packets_received_kind": {
		labelName: "kind",
		desc: prometheus.NewDesc(
			prometheus.BuildFQName(namespace, subsystem, "packets_received_by_kind_total"),
			"Total packets received by kind.",
			[]string{"kind"},
			nil,
		),
	},
	"counter_tcp_rtt": {
		labelName: "bucket",
		desc: prometheus.NewDesc(
			prometheus.BuildFQName(namespace, subsystem, "tcp_rtt"),
			"TCP RTT measurements.",
			[]string{"bucket"},
			nil,
		),
	},
}
// avgQueueDurationDesc describes the unlabeled metric that reports the
// average queue duration in seconds.
var avgQueueDurationDesc = desc(
	"average_queue_duration_seconds",
	"Average queue duration in seconds.",
)
// desc builds the descriptor for an unlabeled metric. name is joined with
// the package namespace and subsystem to form the fully-qualified metric
// name; help is used verbatim as the help text. No variable or constant
// labels are attached.
func desc(name, help string) *prometheus.Desc {
	fqName := prometheus.BuildFQName(namespace, subsystem, name)
	return prometheus.NewDesc(fqName, help, nil, nil)
}
// collector is the prometheus.Collector implementation returned by
// NewCollector.
type collector struct {
	// server is the DERP server whose ExpVar() output is read on each
	// Collect call.
	server *derp.Server
}

// Compile-time assertion that *collector satisfies prometheus.Collector.
var _ prometheus.Collector = (*collector)(nil)
// Describe implements prometheus.Collector. It sends every descriptor this
// collector can emit: all plain counters, all plain gauges, all labeled
// counters, and finally the average queue duration descriptor.
func (c *collector) Describe(ch chan<- *prometheus.Desc) {
	for key := range counterMetrics {
		ch <- counterMetrics[key]
	}
	for key := range gaugeMetrics {
		ch <- gaugeMetrics[key]
	}
	for key := range labeledCounterMetrics {
		ch <- labeledCounterMetrics[key].desc
	}
	ch <- avgQueueDurationDesc
}
// Collect implements prometheus.Collector. It walks the key/value pairs of
// the DERP server's expvar stats and emits a constant metric for every key
// that has a known descriptor. Keys without a descriptor, and values whose
// string form does not parse as a float, are skipped silently.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
	// visitor is the one method needed to iterate a stats set. The value
	// returned by ExpVar() is expected to be a *metrics.Set, which has it.
	type visitor interface {
		Do(func(expvar.KeyValue))
	}

	root, ok := c.server.ExpVar().(visitor)
	if !ok {
		return
	}

	// emit parses the string form of value as a float64 and, when that
	// succeeds, sends a constant metric for it.
	emit := func(d *prometheus.Desc, vt prometheus.ValueType, value expvar.Var, labels ...string) {
		f, err := strconv.ParseFloat(value.String(), 64)
		if err != nil {
			return
		}
		ch <- prometheus.MustNewConstMetric(d, vt, f, labels...)
	}

	root.Do(func(entry expvar.KeyValue) {
		// Plain counters.
		if d, found := counterMetrics[entry.Key]; found {
			emit(d, prometheus.CounterValue, entry.Value)
			return
		}

		// Plain gauges.
		if d, found := gaugeMetrics[entry.Key]; found {
			emit(d, prometheus.GaugeValue, entry.Value)
			return
		}

		// Labeled counters: the value is itself a nested set whose keys
		// become the label values.
		if labeled, found := labeledCounterMetrics[entry.Key]; found {
			children, isSet := entry.Value.(visitor)
			if !isSet {
				return
			}
			children.Do(func(child expvar.KeyValue) {
				emit(labeled.desc, prometheus.CounterValue, child.Value, child.Key)
			})
			return
		}

		// Everything else is ignored except the average queue duration.
		if entry.Key != "average_queue_duration_ms" {
			return
		}

		// expvar.Func may return a quoted string or a number, so strip any
		// surrounding quotes before parsing.
		raw := strings.Trim(entry.Value.String(), "\"")
		ms, err := strconv.ParseFloat(raw, 64)
		if err != nil {
			return
		}
		// Convert milliseconds to seconds.
		ch <- prometheus.MustNewConstMetric(avgQueueDurationDesc, prometheus.GaugeValue, ms/1000.0)
	})
}

View File

@@ -0,0 +1,62 @@
package derpmetrics_test
import (
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"tailscale.com/derp"
"tailscale.com/types/key"
"github.com/coder/coder/v2/enterprise/wsproxy/derpmetrics"
"github.com/coder/coder/v2/testutil"
)
// TestMain hands control of the package's test run to goleak, which verifies
// that no goroutines are leaked by the tests.
func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}
// TestCollector checks that the collector returned by NewCollector satisfies
// prometheus.Collector, can be registered, and produces the expected gauge
// families when gathered against a freshly created DERP server.
func TestCollector(t *testing.T) {
	t.Parallel()

	// The logger itself is not used below; the DERP server logs through
	// t.Logf instead.
	logger := testutil.Logger(t)
	_ = logger

	srv := derp.NewServer(key.NewNode(), func(format string, args ...any) {
		t.Logf(format, args...)
	})
	// The subtests below call t.Parallel, so they only resume after this
	// function has returned. A deferred Close would therefore run before
	// they execute; t.Cleanup runs once all subtests have completed.
	t.Cleanup(func() {
		_ = srv.Close()
	})

	c := derpmetrics.NewCollector(srv)

	t.Run("ImplementsCollector", func(t *testing.T) {
		t.Parallel()
		var _ prometheus.Collector = c
	})

	t.Run("RegisterAndCollect", func(t *testing.T) {
		t.Parallel()

		reg := prometheus.NewRegistry()
		err := reg.Register(c)
		require.NoError(t, err)

		// Gather metrics and ensure no errors.
		families, err := reg.Gather()
		require.NoError(t, err)
		require.NotEmpty(t, families)

		// Check that at least some expected metric names are present.
		names := make(map[string]bool, len(families))
		for _, f := range families {
			names[f.GetName()] = true
		}

		// These gauges should always be present (even if zero).
		require.True(t, names["coder_wsproxy_derp_current_connections"],
			"expected current_connections metric, got: %v", names)
		require.True(t, names["coder_wsproxy_derp_current_home_connections"],
			"expected current_home_connections metric, got: %v", names)
	})
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"errors"
"expvar"
"fmt"
"net/http"
"net/url"
@@ -36,6 +37,7 @@ import (
"github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/coderd/workspaceapps"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/enterprise/wsproxy/derpmetrics"
"github.com/coder/coder/v2/enterprise/derpmesh"
"github.com/coder/coder/v2/enterprise/replicasync"
"github.com/coder/coder/v2/enterprise/wsproxy/wsproxysdk"
@@ -44,6 +46,12 @@ import (
"github.com/coder/coder/v2/tailnet"
)
// expWsproxyDERPOnce guards the process-global expvar.Publish call for the
// wsproxy DERP server so it happens at most once, similar to expDERPOnce in
// coderd. The stats are published under a distinct expvar name
// ("wsproxy_derp") to avoid conflicts when both run in the same process
// during tests.
var expWsproxyDERPOnce sync.Once
type Options struct {
Logger slog.Logger
Experiments codersdk.Experiments
@@ -196,6 +204,13 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
return nil, xerrors.Errorf("create DERP mesh tls config: %w", err)
}
derpServer := derp.NewServer(key.NewNode(), tailnet.Logger(opts.Logger.Named("net.derp")))
if opts.PrometheusRegistry != nil {
opts.PrometheusRegistry.MustRegister(derpmetrics.NewCollector(derpServer))
}
// Publish DERP server metrics via expvar, served at /debug/expvar.
expWsproxyDERPOnce.Do(func() {
expvar.Publish("wsproxy_derp", derpServer.ExpVar())
})
ctx, cancel := context.WithCancel(context.Background())
@@ -317,7 +332,31 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
})
derpHandler := derphttp.Handler(derpServer)
derpHandler, s.derpCloseFunc = tailnet.WithWebsocketSupport(derpServer, derpHandler)
// Prometheus metrics for DERP websocket connections.
derpWSActiveConns := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "coder_wsproxy",
Subsystem: "derp_websocket",
Name: "active_connections",
Help: "Number of active DERP websocket connections.",
})
derpWSBytesTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "coder_wsproxy",
Subsystem: "derp_websocket",
Name: "bytes_total",
Help: "Total bytes flowing through DERP websocket connections.",
}, []string{"direction"})
if opts.PrometheusRegistry != nil {
opts.PrometheusRegistry.MustRegister(derpWSActiveConns, derpWSBytesTotal)
}
derpHandler, s.derpCloseFunc = tailnet.WithWebsocketSupportAndMetrics(
derpServer, derpHandler, &tailnet.DERPWebsocketMetrics{
OnConnOpen: func() { derpWSActiveConns.Inc() },
OnConnClose: func() { derpWSActiveConns.Dec() },
OnRead: func(n int) { derpWSBytesTotal.WithLabelValues("read").Add(float64(n)) },
OnWrite: func(n int) { derpWSBytesTotal.WithLabelValues("write").Add(float64(n)) },
})
// The primary coderd dashboard needs to make some GET requests to
// the workspace proxies to check latency.
@@ -420,6 +459,7 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
r.Get("/healthz", func(w http.ResponseWriter, _ *http.Request) { _, _ = w.Write([]byte("OK")) })
// TODO: @emyrk should this be authenticated or debounced?
r.Get("/healthz-report", s.healthReport)
r.Method("GET", "/debug/expvar", expvar.Handler())
r.NotFound(func(rw http.ResponseWriter, r *http.Request) {
site.RenderStaticErrorPage(rw, r, site.ErrorPageData{
Title: "Head to the Dashboard",

View File

@@ -453,6 +453,7 @@ func (l *RegisterWorkspaceProxyLoop) failureFn(err error) {
if deregisterErr != nil {
l.opts.Logger.Error(context.Background(),
"failed to deregister workspace proxy with Coder primary (it will be automatically deregistered shortly)",
slog.F("root_error", err.Error()),
slog.Error(deregisterErr),
)
}

File diff suppressed because it is too large Load Diff

View File

@@ -4,6 +4,7 @@ import (
"bufio"
"context"
"log"
"net"
"net/http"
"net/url"
"strconv"
@@ -21,7 +22,14 @@ import (
// connections to the "derp" subprotocol to WebSockets and
// passes them to the DERP server.
// Taken from: https://github.com/tailscale/tailscale/blob/e3211ff88ba85435f70984cf67d9b353f3d650d8/cmd/derper/websocket.go#L21
func WithWebsocketSupport(s *derp.Server, base http.Handler) (http.Handler, func()) {
func WithWebsocketSupport(s *derp.Server, base http.Handler) (http.Handler, func()) {
	// Delegate to the instrumented variant with nil metrics.
	return WithWebsocketSupportAndMetrics(s, base, nil)
}
// WithWebsocketSupportAndMetrics is like WithWebsocketSupport but
// also instruments connections using the provided metrics hooks.
// If metrics is nil, no instrumentation is applied.
func WithWebsocketSupportAndMetrics(s *derp.Server, base http.Handler, metrics *DERPWebsocketMetrics) (http.Handler, func()) {
var mu sync.Mutex
var waitGroup sync.WaitGroup
ctx, cancelFunc := context.WithCancel(context.Background())
@@ -64,9 +72,15 @@ func WithWebsocketSupport(s *derp.Server, base http.Handler) (http.Handler, func
c.Close(websocket.StatusPolicyViolation, "client must speak the derp subprotocol")
return
}
wc := websocket.NetConn(ctx, c, websocket.MessageBinary)
brw := bufio.NewReadWriter(bufio.NewReader(wc), bufio.NewWriter(wc))
s.Accept(ctx, wc, brw, r.RemoteAddr)
var conn net.Conn = websocket.NetConn(ctx, c, websocket.MessageBinary)
if metrics != nil {
if metrics.OnConnOpen != nil {
metrics.OnConnOpen()
}
conn = newCountingConn(conn, metrics)
}
brw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
s.Accept(ctx, conn, brw, r.RemoteAddr)
}), func() {
cancelFunc()
mu.Lock()

61
tailnet/derp_metrics.go Normal file
View File

@@ -0,0 +1,61 @@
package tailnet
import (
"net"
"sync/atomic"
)
// DERPWebsocketMetrics provides hooks for instrumenting DERP
// websocket connections. Every field is an optional callback; all
// callbacks must be safe for concurrent use.
type DERPWebsocketMetrics struct {
	// OnConnOpen is called when a DERP websocket connection is
	// accepted.
	OnConnOpen func()
	// OnConnClose is called when a DERP websocket connection is
	// closed.
	OnConnClose func()
	// OnRead is called after a successful Read with the number
	// of bytes read.
	OnRead func(n int)
	// OnWrite is called after a successful Write with the number
	// of bytes written.
	OnWrite func(n int)
}
// countingConn decorates a net.Conn so that the bytes moved by Read and
// Write, and the first Close, are reported to the hooks in metrics.
type countingConn struct {
	net.Conn
	metrics *DERPWebsocketMetrics
	// closed flips to true on the first Close so OnConnClose fires once.
	closed atomic.Bool
}

// newCountingConn wraps conn so its traffic is reported to m. The Read,
// Write and Close methods dereference m, so it is expected to be non-nil.
func newCountingConn(conn net.Conn, m *DERPWebsocketMetrics) *countingConn {
	cc := new(countingConn)
	cc.Conn = conn
	cc.metrics = m
	return cc
}

// Read reads from the wrapped connection and passes the byte count to
// OnRead whenever at least one byte was read and the hook is set. The
// wrapped connection's result is returned unchanged.
func (c *countingConn) Read(b []byte) (int, error) {
	n, err := c.Conn.Read(b)
	if n > 0 {
		if report := c.metrics.OnRead; report != nil {
			report(n)
		}
	}
	return n, err
}

// Write writes to the wrapped connection and passes the byte count to
// OnWrite whenever at least one byte was written and the hook is set. The
// wrapped connection's result is returned unchanged.
func (c *countingConn) Write(b []byte) (int, error) {
	n, err := c.Conn.Write(b)
	if n > 0 {
		if report := c.metrics.OnWrite; report != nil {
			report(n)
		}
	}
	return n, err
}

// Close invokes OnConnClose (if set) on the first call only and then closes
// the wrapped connection. Later calls still forward to the wrapped Close.
func (c *countingConn) Close() error {
	// Swap returns the previous value, so only the first caller sees false.
	alreadyClosed := c.closed.Swap(true)
	if !alreadyClosed {
		if notify := c.metrics.OnConnClose; notify != nil {
			notify()
		}
	}
	return c.Conn.Close()
}