Compare commits
5 Commits
devtools/0
...
wsproxy
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aefc75133a | ||
|
|
b9181c3934 | ||
|
|
a90471db53 | ||
|
|
cb71f5e789 | ||
|
|
f50707bc3e |
@@ -146,8 +146,12 @@ func NewWorkspaceProxyReplica(t *testing.T, coderdAPI *coderd.API, owner *coders
|
||||
|
||||
logger := testutil.Logger(t).With(slog.F("server_url", serverURL.String()))
|
||||
|
||||
// nolint: forcetypeassert // This is a stdlib transport it's unnecessary to type assert especially in tests.
|
||||
wssrv, err := wsproxy.New(ctx, &wsproxy.Options{
|
||||
Logger: logger,
|
||||
Logger: logger,
|
||||
// It's important to ensure each test has its own isolated transport to avoid interfering with other tests
|
||||
// especially in shutdown.
|
||||
HTTPClient: &http.Client{Transport: http.DefaultTransport.(*http.Transport).Clone()},
|
||||
Experiments: options.Experiments,
|
||||
DashboardURL: coderdAPI.AccessURL,
|
||||
AccessURL: accessURL,
|
||||
|
||||
162
enterprise/wsproxy/derpmetrics/collector.go
Normal file
162
enterprise/wsproxy/derpmetrics/collector.go
Normal file
@@ -0,0 +1,162 @@
|
||||
package derpmetrics
|
||||
|
||||
import (
|
||||
"expvar"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"tailscale.com/derp"
|
||||
)
|
||||
|
||||
// NewCollector returns a prometheus.Collector that bridges the
|
||||
// derp.Server's expvar-based stats into Prometheus metrics.
|
||||
func NewCollector(server *derp.Server) prometheus.Collector {
|
||||
return &collector{server: server}
|
||||
}
|
||||
|
||||
// Components passed to prometheus.BuildFQName for every metric
// descriptor declared in this package.
const (
	// namespace is the leading component of each metric name.
	namespace = "coder"
	// subsystem is the middle component of each metric name.
	subsystem = "wsproxy_derp"
)
|
||||
|
||||
// Simple counter metrics keyed by their expvar name.
|
||||
var counterMetrics = map[string]*prometheus.Desc{
|
||||
"accepts": desc("accepts_total", "Total number of accepted connections."),
|
||||
"bytes_received": desc("bytes_received_total", "Total bytes received."),
|
||||
"bytes_sent": desc("bytes_sent_total", "Total bytes sent."),
|
||||
"packets_sent": desc("packets_sent_total", "Total packets sent."),
|
||||
"packets_received": desc("packets_received_total", "Total packets received."),
|
||||
"packets_dropped": desc("packets_dropped_total_unlabeled", "Total packets dropped (unlabeled aggregate)."),
|
||||
"packets_forwarded_out": desc("packets_forwarded_out_total", "Total packets forwarded out."),
|
||||
"packets_forwarded_in": desc("packets_forwarded_in_total", "Total packets forwarded in."),
|
||||
"home_moves_in": desc("home_moves_in_total", "Total home moves in."),
|
||||
"home_moves_out": desc("home_moves_out_total", "Total home moves out."),
|
||||
"got_ping": desc("got_ping_total", "Total pings received."),
|
||||
"sent_pong": desc("sent_pong_total", "Total pongs sent."),
|
||||
"unknown_frames": desc("unknown_frames_total", "Total unknown frames received."),
|
||||
"peer_gone_disconnected_frames": desc("peer_gone_disconnected_frames_total", "Total peer-gone-disconnected frames sent."),
|
||||
"peer_gone_not_here_frames": desc("peer_gone_not_here_frames_total", "Total peer-gone-not-here frames sent."),
|
||||
"multiforwarder_created": desc("multiforwarder_created_total", "Total multiforwarders created."),
|
||||
"multiforwarder_deleted": desc("multiforwarder_deleted_total", "Total multiforwarders deleted."),
|
||||
"packet_forwarder_delete_other_value": desc("packet_forwarder_delete_other_value_total", "Total packet forwarder delete-other-value events."),
|
||||
"counter_total_dup_client_conns": desc("duplicate_client_conns_total", "Total duplicate client connections."),
|
||||
}
|
||||
|
||||
// Simple gauge metrics keyed by their expvar name.
|
||||
var gaugeMetrics = map[string]*prometheus.Desc{
|
||||
"gauge_current_connections": desc("current_connections", "Current number of connections."),
|
||||
"gauge_current_home_connections": desc("current_home_connections", "Current number of home connections."),
|
||||
"gauge_watchers": desc("watchers", "Current number of watchers."),
|
||||
"gauge_current_file_descriptors": desc("current_file_descriptors", "Current number of file descriptors."),
|
||||
"gauge_clients_total": desc("clients_total", "Current total number of clients."),
|
||||
"gauge_clients_local": desc("clients_local", "Current number of local clients."),
|
||||
"gauge_clients_remote": desc("clients_remote", "Current number of remote clients."),
|
||||
"gauge_current_dup_client_keys": desc("current_duplicate_client_keys", "Current number of duplicate client keys."),
|
||||
"gauge_current_dup_client_conns": desc("current_duplicate_client_conns", "Current number of duplicate client connections."),
|
||||
}
|
||||
|
||||
// Labeled counter metrics (nested metrics.Set) with their label name.
|
||||
var labeledCounterMetrics = map[string]struct {
|
||||
desc *prometheus.Desc
|
||||
labelName string
|
||||
}{
|
||||
"counter_packets_dropped_reason": {
|
||||
desc: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "packets_dropped_total"), "Total packets dropped by reason.", []string{"reason"}, nil),
|
||||
labelName: "reason",
|
||||
},
|
||||
"counter_packets_dropped_type": {
|
||||
desc: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "packets_dropped_by_type_total"), "Total packets dropped by type.", []string{"type"}, nil),
|
||||
labelName: "type",
|
||||
},
|
||||
"counter_packets_received_kind": {
|
||||
desc: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "packets_received_by_kind_total"), "Total packets received by kind.", []string{"kind"}, nil),
|
||||
labelName: "kind",
|
||||
},
|
||||
"counter_tcp_rtt": {
|
||||
desc: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "tcp_rtt"), "TCP RTT measurements.", []string{"bucket"}, nil),
|
||||
labelName: "bucket",
|
||||
},
|
||||
}
|
||||
|
||||
// avgQueueDurationDesc describes the gauge that Collect derives from the
// "average_queue_duration_ms" expvar after dividing it by 1000.
var avgQueueDurationDesc = desc(
	"average_queue_duration_seconds",
	"Average queue duration in seconds.",
)
|
||||
|
||||
func desc(name, help string) *prometheus.Desc {
|
||||
return prometheus.NewDesc(
|
||||
prometheus.BuildFQName(namespace, subsystem, name),
|
||||
help, nil, nil,
|
||||
)
|
||||
}
|
||||
|
||||
type collector struct {
|
||||
server *derp.Server
|
||||
}
|
||||
|
||||
var _ prometheus.Collector = (*collector)(nil)
|
||||
|
||||
func (c *collector) Describe(ch chan<- *prometheus.Desc) {
|
||||
for _, d := range counterMetrics {
|
||||
ch <- d
|
||||
}
|
||||
for _, d := range gaugeMetrics {
|
||||
ch <- d
|
||||
}
|
||||
for _, m := range labeledCounterMetrics {
|
||||
ch <- m.desc
|
||||
}
|
||||
ch <- avgQueueDurationDesc
|
||||
}
|
||||
|
||||
func (c *collector) Collect(ch chan<- prometheus.Metric) {
|
||||
statsVar := c.server.ExpVar()
|
||||
|
||||
// The returned expvar.Var is a *metrics.Set which supports Do().
|
||||
type doer interface {
|
||||
Do(func(expvar.KeyValue))
|
||||
}
|
||||
d, ok := statsVar.(doer)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
d.Do(func(kv expvar.KeyValue) {
|
||||
// Counter metrics.
|
||||
if desc, ok := counterMetrics[kv.Key]; ok {
|
||||
if v, err := strconv.ParseFloat(kv.Value.String(), 64); err == nil {
|
||||
ch <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, v)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Gauge metrics.
|
||||
if desc, ok := gaugeMetrics[kv.Key]; ok {
|
||||
if v, err := strconv.ParseFloat(kv.Value.String(), 64); err == nil {
|
||||
ch <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, v)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Labeled counter metrics (nested metrics.Set).
|
||||
if lm, ok := labeledCounterMetrics[kv.Key]; ok {
|
||||
if nested, ok := kv.Value.(doer); ok {
|
||||
nested.Do(func(sub expvar.KeyValue) {
|
||||
if v, err := strconv.ParseFloat(sub.Value.String(), 64); err == nil {
|
||||
ch <- prometheus.MustNewConstMetric(lm.desc, prometheus.CounterValue, v, sub.Key)
|
||||
}
|
||||
})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Average queue duration: convert ms → seconds.
|
||||
if kv.Key == "average_queue_duration_ms" {
|
||||
s := kv.Value.String()
|
||||
// expvar.Func may return a quoted string or a number.
|
||||
s = strings.Trim(s, "\"")
|
||||
if v, err := strconv.ParseFloat(s, 64); err == nil {
|
||||
ch <- prometheus.MustNewConstMetric(avgQueueDurationDesc, prometheus.GaugeValue, v/1000.0)
|
||||
}
|
||||
return
|
||||
}
|
||||
})
|
||||
}
|
||||
62
enterprise/wsproxy/derpmetrics/collector_test.go
Normal file
62
enterprise/wsproxy/derpmetrics/collector_test.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package derpmetrics_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/goleak"
|
||||
"tailscale.com/derp"
|
||||
"tailscale.com/types/key"
|
||||
|
||||
"github.com/coder/coder/v2/enterprise/wsproxy/derpmetrics"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
goleak.VerifyTestMain(m)
|
||||
}
|
||||
|
||||
func TestCollector(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
logger := testutil.Logger(t)
|
||||
_ = logger
|
||||
|
||||
srv := derp.NewServer(key.NewNode(), func(format string, args ...any) {
|
||||
t.Logf(format, args...)
|
||||
})
|
||||
defer srv.Close()
|
||||
|
||||
c := derpmetrics.NewCollector(srv)
|
||||
|
||||
t.Run("ImplementsCollector", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
var _ prometheus.Collector = c
|
||||
})
|
||||
|
||||
t.Run("RegisterAndCollect", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
reg := prometheus.NewRegistry()
|
||||
err := reg.Register(c)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Gather metrics and ensure no errors.
|
||||
families, err := reg.Gather()
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, families)
|
||||
|
||||
// Check that at least some expected metric names are present.
|
||||
names := make(map[string]bool)
|
||||
for _, f := range families {
|
||||
names[f.GetName()] = true
|
||||
}
|
||||
|
||||
// These gauges should always be present (even if zero).
|
||||
require.True(t, names["coder_wsproxy_derp_current_connections"],
|
||||
"expected current_connections metric, got: %v", names)
|
||||
require.True(t, names["coder_wsproxy_derp_current_home_connections"],
|
||||
"expected current_home_connections metric, got: %v", names)
|
||||
})
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@@ -36,6 +37,7 @@ import (
|
||||
"github.com/coder/coder/v2/coderd/tracing"
|
||||
"github.com/coder/coder/v2/coderd/workspaceapps"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
"github.com/coder/coder/v2/enterprise/wsproxy/derpmetrics"
|
||||
"github.com/coder/coder/v2/enterprise/derpmesh"
|
||||
"github.com/coder/coder/v2/enterprise/replicasync"
|
||||
"github.com/coder/coder/v2/enterprise/wsproxy/wsproxysdk"
|
||||
@@ -44,6 +46,12 @@ import (
|
||||
"github.com/coder/coder/v2/tailnet"
|
||||
)
|
||||
|
||||
// expWsproxyDERPOnce guards the global expvar.Publish call for the wsproxy
|
||||
// DERP server, similar to expDERPOnce in coderd. We use a different variable
|
||||
// name ("wsproxy_derp") to avoid conflicts when both run in the same process
|
||||
// during tests.
|
||||
var expWsproxyDERPOnce sync.Once
|
||||
|
||||
type Options struct {
|
||||
Logger slog.Logger
|
||||
Experiments codersdk.Experiments
|
||||
@@ -196,6 +204,13 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
|
||||
return nil, xerrors.Errorf("create DERP mesh tls config: %w", err)
|
||||
}
|
||||
derpServer := derp.NewServer(key.NewNode(), tailnet.Logger(opts.Logger.Named("net.derp")))
|
||||
if opts.PrometheusRegistry != nil {
|
||||
opts.PrometheusRegistry.MustRegister(derpmetrics.NewCollector(derpServer))
|
||||
}
|
||||
// Publish DERP server metrics via expvar, served at /debug/expvar.
|
||||
expWsproxyDERPOnce.Do(func() {
|
||||
expvar.Publish("wsproxy_derp", derpServer.ExpVar())
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
@@ -317,7 +332,31 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
|
||||
})
|
||||
|
||||
derpHandler := derphttp.Handler(derpServer)
|
||||
derpHandler, s.derpCloseFunc = tailnet.WithWebsocketSupport(derpServer, derpHandler)
|
||||
|
||||
// Prometheus metrics for DERP websocket connections.
|
||||
derpWSActiveConns := prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: "coder_wsproxy",
|
||||
Subsystem: "derp_websocket",
|
||||
Name: "active_connections",
|
||||
Help: "Number of active DERP websocket connections.",
|
||||
})
|
||||
derpWSBytesTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "coder_wsproxy",
|
||||
Subsystem: "derp_websocket",
|
||||
Name: "bytes_total",
|
||||
Help: "Total bytes flowing through DERP websocket connections.",
|
||||
}, []string{"direction"})
|
||||
if opts.PrometheusRegistry != nil {
|
||||
opts.PrometheusRegistry.MustRegister(derpWSActiveConns, derpWSBytesTotal)
|
||||
}
|
||||
|
||||
derpHandler, s.derpCloseFunc = tailnet.WithWebsocketSupportAndMetrics(
|
||||
derpServer, derpHandler, &tailnet.DERPWebsocketMetrics{
|
||||
OnConnOpen: func() { derpWSActiveConns.Inc() },
|
||||
OnConnClose: func() { derpWSActiveConns.Dec() },
|
||||
OnRead: func(n int) { derpWSBytesTotal.WithLabelValues("read").Add(float64(n)) },
|
||||
OnWrite: func(n int) { derpWSBytesTotal.WithLabelValues("write").Add(float64(n)) },
|
||||
})
|
||||
|
||||
// The primary coderd dashboard needs to make some GET requests to
|
||||
// the workspace proxies to check latency.
|
||||
@@ -420,6 +459,7 @@ func New(ctx context.Context, opts *Options) (*Server, error) {
|
||||
r.Get("/healthz", func(w http.ResponseWriter, _ *http.Request) { _, _ = w.Write([]byte("OK")) })
|
||||
// TODO: @emyrk should this be authenticated or debounced?
|
||||
r.Get("/healthz-report", s.healthReport)
|
||||
r.Method("GET", "/debug/expvar", expvar.Handler())
|
||||
r.NotFound(func(rw http.ResponseWriter, r *http.Request) {
|
||||
site.RenderStaticErrorPage(rw, r, site.ErrorPageData{
|
||||
Title: "Head to the Dashboard",
|
||||
|
||||
@@ -453,6 +453,7 @@ func (l *RegisterWorkspaceProxyLoop) failureFn(err error) {
|
||||
if deregisterErr != nil {
|
||||
l.opts.Logger.Error(context.Background(),
|
||||
"failed to deregister workspace proxy with Coder primary (it will be automatically deregistered shortly)",
|
||||
slog.F("root_error", err.Error()),
|
||||
slog.Error(deregisterErr),
|
||||
)
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -4,6 +4,7 @@ import (
|
||||
"bufio"
|
||||
"context"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
@@ -21,7 +22,14 @@ import (
|
||||
// connections to the "derp" subprotocol to WebSockets and
|
||||
// passes them to the DERP server.
|
||||
// Taken from: https://github.com/tailscale/tailscale/blob/e3211ff88ba85435f70984cf67d9b353f3d650d8/cmd/derper/websocket.go#L21
|
||||
func WithWebsocketSupport(s *derp.Server, base http.Handler) (http.Handler, func()) {
|
||||
func WithWebsocketSupport(s *derp.Server, base http.Handler) (http.Handler, func()) { return WithWebsocketSupportAndMetrics(s, base, nil)
|
||||
}
|
||||
|
||||
// WithWebsocketSupportAndMetrics is like WithWebsocketSupport but
|
||||
// also instruments connections using the provided metrics hooks.
|
||||
// If metrics is nil, no instrumentation is applied.
|
||||
func WithWebsocketSupportAndMetrics(s *derp.Server, base http.Handler, metrics *DERPWebsocketMetrics) (http.Handler, func()) {
|
||||
|
||||
var mu sync.Mutex
|
||||
var waitGroup sync.WaitGroup
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
@@ -64,9 +72,15 @@ func WithWebsocketSupport(s *derp.Server, base http.Handler) (http.Handler, func
|
||||
c.Close(websocket.StatusPolicyViolation, "client must speak the derp subprotocol")
|
||||
return
|
||||
}
|
||||
wc := websocket.NetConn(ctx, c, websocket.MessageBinary)
|
||||
brw := bufio.NewReadWriter(bufio.NewReader(wc), bufio.NewWriter(wc))
|
||||
s.Accept(ctx, wc, brw, r.RemoteAddr)
|
||||
var conn net.Conn = websocket.NetConn(ctx, c, websocket.MessageBinary)
|
||||
if metrics != nil {
|
||||
if metrics.OnConnOpen != nil {
|
||||
metrics.OnConnOpen()
|
||||
}
|
||||
conn = newCountingConn(conn, metrics)
|
||||
}
|
||||
brw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
|
||||
s.Accept(ctx, conn, brw, r.RemoteAddr)
|
||||
}), func() {
|
||||
cancelFunc()
|
||||
mu.Lock()
|
||||
|
||||
61
tailnet/derp_metrics.go
Normal file
61
tailnet/derp_metrics.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package tailnet
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// DERPWebsocketMetrics is a set of optional callbacks used to
// instrument DERP websocket connections. Any hook may be left nil.
// Hooks that are set must be safe for concurrent use.
type DERPWebsocketMetrics struct {
	// OnConnOpen fires when a DERP websocket connection is accepted.
	OnConnOpen func()

	// OnConnClose fires when a DERP websocket connection is closed.
	OnConnClose func()

	// OnRead fires after a Read that returned data; n is the number of
	// bytes read.
	OnRead func(n int)

	// OnWrite fires after a Write that sent data; n is the number of
	// bytes written.
	OnWrite func(n int)
}
|
||||
|
||||
// countingConn wraps a net.Conn and reports bytes read/written
|
||||
// through the provided callbacks.
|
||||
type countingConn struct {
|
||||
net.Conn
|
||||
metrics *DERPWebsocketMetrics
|
||||
closed atomic.Bool
|
||||
}
|
||||
|
||||
func newCountingConn(conn net.Conn, m *DERPWebsocketMetrics) *countingConn {
|
||||
return &countingConn{Conn: conn, metrics: m}
|
||||
}
|
||||
|
||||
func (c *countingConn) Read(b []byte) (int, error) {
|
||||
n, err := c.Conn.Read(b)
|
||||
if n > 0 && c.metrics.OnRead != nil {
|
||||
c.metrics.OnRead(n)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (c *countingConn) Write(b []byte) (int, error) {
|
||||
n, err := c.Conn.Write(b)
|
||||
if n > 0 && c.metrics.OnWrite != nil {
|
||||
c.metrics.OnWrite(n)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (c *countingConn) Close() error {
|
||||
if c.closed.CompareAndSwap(false, true) {
|
||||
if c.metrics.OnConnClose != nil {
|
||||
c.metrics.OnConnClose()
|
||||
}
|
||||
}
|
||||
return c.Conn.Close()
|
||||
}
|
||||
Reference in New Issue
Block a user