Compare commits

..

10 Commits

Author SHA1 Message Date
Jon Ayers cfd7730194 chore(enterprise/tailnet): add debug logging for LOST and DISCONNECTED peer updates 2026-04-10 00:40:57 +00:00
Jon Ayers 1937ada0cd fix: use enriched logger for HeartbeatClose, reduce AwaitReachable backoff to 5s 2026-04-09 23:45:59 +00:00
Jon Ayers d64cd6415d revert: move HeartbeatClose back before agent dial 2026-04-09 22:37:31 +00:00
Jon Ayers c1851d9453 chore(coderd/workspaceapps): add workspace_id and elapsed time to PTY dial logs 2026-04-09 22:35:44 +00:00
Jon Ayers 8f73453681 fix(coderd/workspaceapps): move HeartbeatClose after agent dial, add 1m setup timeout 2026-04-09 21:39:28 +00:00
Jon Ayers 165db3d31c perf(enterprise/tailnet): increase coordinator worker counts and batch size for 10k scale 2026-04-09 21:26:52 +00:00
Jon Ayers 1bd1516fd1 perf(tailnet): singleflight AwaitReachable to deduplicate concurrent ping storms 2026-04-09 19:34:22 +00:00
Jon Ayers 81ba35a987 fix(coderd/tailnet): move ensureAgent Send outside mutex using singleflight 2026-04-09 19:12:27 +00:00
Jon Ayers 53d63cf8e9 perf(coderd/database/pubsub): batch-drain msgQueue to amortize lock overhead
Replace the one-at-a-time dequeue loop in msgQueue.run() with a batch
drain that copies up to 256 messages per lock acquisition. This
amortizes mutex acquire/release and cond.Wait costs across many
messages, improving drain throughput during bursts and reducing the
likelihood of ring buffer overflow.
2026-04-08 00:02:29 +00:00
Jon Ayers 4213a43b53 fix(enterprise/tailnet): async singleflight-coalesced resyncPeerMappings in pubsub callbacks
Replace synchronous resyncPeerMappings() calls in listenPeer and
listenTunnel with async goroutines using singleflight.Do. This
prevents blocking the pubsub drain goroutine when ErrDroppedMessages
arrives, avoiding cascading buffer overflows.
2026-04-08 00:02:21 +00:00
26 changed files with 340 additions and 474 deletions
-1
View File
@@ -134,7 +134,6 @@ func TestUserCreate(t *testing.T) {
{
name: "ServiceAccount",
args: []string{"--service-account", "-u", "dean"},
err: "Premium feature",
},
{
name: "ServiceAccountLoginType",
+5 -16
View File
@@ -123,10 +123,6 @@ func UsersPagination(
require.Contains(t, gotUsers[0].Name, "after")
}
type UsersFilterOptions struct {
CreateServiceAccounts bool
}
// UsersFilter creates a set of users to run various filters against for
// testing. It can be used to test filtering both users and group members.
func UsersFilter(
@@ -134,16 +130,11 @@ func UsersFilter(
t *testing.T,
client *codersdk.Client,
db database.Store,
options *UsersFilterOptions,
setup func(users []codersdk.User),
fetch func(ctx context.Context, req codersdk.UsersRequest) []codersdk.ReducedUser,
) {
t.Helper()
if options == nil {
options = &UsersFilterOptions{}
}
firstUser, err := client.User(setupCtx, codersdk.Me)
require.NoError(t, err, "fetch me")
@@ -220,13 +211,11 @@ func UsersFilter(
}
// Add some service accounts.
if options.CreateServiceAccounts {
for range 3 {
_, user := CreateAnotherUserMutators(t, client, orgID, nil, func(r *codersdk.CreateUserRequestWithOrgs) {
r.ServiceAccount = true
})
users = append(users, user)
}
for range 3 {
_, user := CreateAnotherUserMutators(t, client, orgID, nil, func(r *codersdk.CreateUserRequestWithOrgs) {
r.ServiceAccount = true
})
users = append(users, user)
}
hashedPassword, err := userpassword.Hash("SomeStrongPassword!")
+27 -17
View File
@@ -81,8 +81,8 @@ func newMsgQueue(ctx context.Context, l Listener, le ListenerWithErr) *msgQueue
}
func (q *msgQueue) run() {
var batch [maxDrainBatch]msgOrErr
for {
// wait until there is something on the queue or we are closed
q.cond.L.Lock()
for q.size == 0 && !q.closed {
q.cond.Wait()
@@ -91,28 +91,32 @@ func (q *msgQueue) run() {
q.cond.L.Unlock()
return
}
item := q.q[q.front]
q.front = (q.front + 1) % BufferSize
q.size--
// Drain up to maxDrainBatch items while holding the lock.
n := min(q.size, maxDrainBatch)
for i := range n {
batch[i] = q.q[q.front]
q.front = (q.front + 1) % BufferSize
}
q.size -= n
q.cond.L.Unlock()
// process item without holding lock
if item.err == nil {
// real message
if q.l != nil {
q.l(q.ctx, item.msg)
// Dispatch each message individually without holding the lock.
for i := range n {
item := batch[i]
if item.err == nil {
if q.l != nil {
q.l(q.ctx, item.msg)
continue
}
if q.le != nil {
q.le(q.ctx, item.msg, nil)
continue
}
continue
}
if q.le != nil {
q.le(q.ctx, item.msg, nil)
continue
q.le(q.ctx, nil, item.err)
}
// unhittable
continue
}
// if the listener wants errors, send it.
if q.le != nil {
q.le(q.ctx, nil, item.err)
}
}
}
@@ -233,6 +237,12 @@ type PGPubsub struct {
// for a subscriber before dropping messages.
const BufferSize = 2048
// maxDrainBatch is the maximum number of messages to drain from the ring
// buffer per iteration. Batching amortizes the cost of mutex
// acquire/release and cond.Wait across many messages, improving drain
// throughput during bursts.
const maxDrainBatch = 256
// Subscribe calls the listener when an event matching the name is received.
func (p *PGPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
return p.subscribeQueue(event, newMsgQueue(context.Background(), listener, nil))
+1 -1
View File
@@ -148,7 +148,7 @@ func TestGetOrgMembersFilter(t *testing.T) {
setupCtx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
coderdtest.UsersFilter(setupCtx, t, client, api.Database, nil, nil, func(testCtx context.Context, req codersdk.UsersRequest) []codersdk.ReducedUser {
coderdtest.UsersFilter(setupCtx, t, client, api.Database, nil, func(testCtx context.Context, req codersdk.UsersRequest) []codersdk.ReducedUser {
res, err := client.OrganizationMembersPaginated(testCtx, first.OrganizationID, req)
require.NoError(t, err)
reduced := make([]codersdk.ReducedUser, len(res.Members))
+37 -19
View File
@@ -19,6 +19,7 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/singleflight"
"golang.org/x/xerrors"
"tailscale.com/derp"
"tailscale.com/tailcfg"
@@ -389,6 +390,7 @@ type MultiAgentController struct {
// connections to the destination
tickets map[uuid.UUID]map[uuid.UUID]struct{}
coordination *tailnet.BasicCoordination
sendGroup singleflight.Group
cancel context.CancelFunc
expireOldAgentsDone chan struct{}
@@ -418,28 +420,44 @@ func (m *MultiAgentController) New(client tailnet.CoordinatorClient) tailnet.Clo
func (m *MultiAgentController) ensureAgent(agentID uuid.UUID) error {
m.mu.Lock()
defer m.mu.Unlock()
_, ok := m.connectionTimes[agentID]
// If we don't have the agent, subscribe.
if !ok {
m.logger.Debug(context.Background(),
"subscribing to agent", slog.F("agent_id", agentID))
if m.coordination != nil {
err := m.coordination.Client.Send(&proto.CoordinateRequest{
AddTunnel: &proto.CoordinateRequest_Tunnel{Id: agentID[:]},
})
if err != nil {
err = xerrors.Errorf("subscribe agent: %w", err)
m.coordination.SendErr(err)
_ = m.coordination.Client.Close()
m.coordination = nil
return err
}
}
m.tickets[agentID] = map[uuid.UUID]struct{}{}
if ok {
m.connectionTimes[agentID] = time.Now()
m.mu.Unlock()
return nil
}
m.mu.Unlock()
m.logger.Debug(context.Background(),
"subscribing to agent", slog.F("agent_id", agentID))
_, err, _ := m.sendGroup.Do(agentID.String(), func() (interface{}, error) {
m.mu.Lock()
coord := m.coordination
m.mu.Unlock()
if coord == nil {
return nil, xerrors.New("no active coordination")
}
err := coord.Client.Send(&proto.CoordinateRequest{
AddTunnel: &proto.CoordinateRequest_Tunnel{Id: agentID[:]},
})
if err != nil {
return nil, err
}
m.mu.Lock()
m.tickets[agentID] = map[uuid.UUID]struct{}{}
m.mu.Unlock()
return nil, nil
})
if err != nil {
m.logger.Error(context.Background(), "ensureAgent send failed",
slog.F("agent_id", agentID), slog.Error(err))
return xerrors.Errorf("send AddTunnel: %w", err)
}
m.mu.Lock()
m.connectionTimes[agentID] = time.Now()
m.mu.Unlock()
return nil
}
-8
View File
@@ -475,14 +475,6 @@ func (api *API) postUser(rw http.ResponseWriter, r *http.Request) {
}
req.UserLoginType = codersdk.LoginTypeNone
// Service accounts are a Premium feature.
if !api.Entitlements.Enabled(codersdk.FeatureServiceAccounts) {
httpapi.Write(ctx, rw, http.StatusForbidden, codersdk.Response{
Message: fmt.Sprintf("%s is a Premium feature. Contact sales!", codersdk.FeatureServiceAccounts.Humanize()),
})
return
}
} else if req.UserLoginType == "" {
// Default to password auth
req.UserLoginType = codersdk.LoginTypePassword
+114 -6
View File
@@ -979,7 +979,28 @@ func TestPostUsers(t *testing.T) {
require.Equal(t, found.LoginType, codersdk.LoginTypeOIDC)
})
t.Run("ServiceAccount/Unlicensed", func(t *testing.T) {
t.Run("ServiceAccount/OK", func(t *testing.T) {
t.Parallel()
client := coderdtest.New(t, nil)
first := coderdtest.CreateFirstUser(t, client)
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
user, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
OrganizationIDs: []uuid.UUID{first.OrganizationID},
Username: "service-acct-ok",
UserLoginType: codersdk.LoginTypeNone,
ServiceAccount: true,
})
require.NoError(t, err)
require.Equal(t, codersdk.LoginTypeNone, user.LoginType)
require.Empty(t, user.Email)
require.Equal(t, "service-acct-ok", user.Username)
require.Equal(t, codersdk.UserStatusDormant, user.Status)
})
t.Run("ServiceAccount/WithEmail", func(t *testing.T) {
t.Parallel()
client := coderdtest.New(t, nil)
first := coderdtest.CreateFirstUser(t, client)
@@ -989,14 +1010,75 @@ func TestPostUsers(t *testing.T) {
_, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
OrganizationIDs: []uuid.UUID{first.OrganizationID},
Username: "service-acct-ok",
UserLoginType: codersdk.LoginTypeNone,
Username: "service-acct-email",
Email: "should-not-have@email.com",
ServiceAccount: true,
})
var apiErr *codersdk.Error
require.ErrorAs(t, err, &apiErr)
require.Equal(t, http.StatusForbidden, apiErr.StatusCode())
require.Contains(t, apiErr.Message, "Premium feature")
require.Equal(t, http.StatusBadRequest, apiErr.StatusCode())
require.Contains(t, apiErr.Message, "Email cannot be set for service accounts")
})
t.Run("ServiceAccount/WithPassword", func(t *testing.T) {
t.Parallel()
client := coderdtest.New(t, nil)
first := coderdtest.CreateFirstUser(t, client)
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
_, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
OrganizationIDs: []uuid.UUID{first.OrganizationID},
Username: "service-acct-password",
Password: "ShouldNotHavePassword123!",
ServiceAccount: true,
})
var apiErr *codersdk.Error
require.ErrorAs(t, err, &apiErr)
require.Equal(t, http.StatusBadRequest, apiErr.StatusCode())
require.Contains(t, apiErr.Message, "Password cannot be set for service accounts")
})
t.Run("ServiceAccount/WithInvalidLoginType", func(t *testing.T) {
t.Parallel()
client := coderdtest.New(t, nil)
first := coderdtest.CreateFirstUser(t, client)
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
_, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
OrganizationIDs: []uuid.UUID{first.OrganizationID},
Username: "service-acct-login-type",
UserLoginType: codersdk.LoginTypePassword,
ServiceAccount: true,
})
var apiErr *codersdk.Error
require.ErrorAs(t, err, &apiErr)
require.Equal(t, http.StatusBadRequest, apiErr.StatusCode())
require.Contains(t, apiErr.Message, "Service accounts must use login type 'none'")
})
t.Run("ServiceAccount/DefaultLoginType", func(t *testing.T) {
t.Parallel()
client := coderdtest.New(t, nil)
first := coderdtest.CreateFirstUser(t, client)
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
user, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
OrganizationIDs: []uuid.UUID{first.OrganizationID},
Username: "service-acct-default-login",
ServiceAccount: true,
})
require.NoError(t, err)
found, err := client.User(ctx, user.ID.String())
require.NoError(t, err)
require.Equal(t, codersdk.LoginTypeNone, found.LoginType)
require.Empty(t, found.Email)
})
t.Run("NonServiceAccount/WithoutEmail", func(t *testing.T) {
@@ -1016,6 +1098,32 @@ func TestPostUsers(t *testing.T) {
require.ErrorAs(t, err, &apiErr)
require.Equal(t, http.StatusBadRequest, apiErr.StatusCode())
})
t.Run("ServiceAccount/MultipleWithoutEmail", func(t *testing.T) {
t.Parallel()
client := coderdtest.New(t, nil)
first := coderdtest.CreateFirstUser(t, client)
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
user1, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
OrganizationIDs: []uuid.UUID{first.OrganizationID},
Username: "service-acct-multi-1",
ServiceAccount: true,
})
require.NoError(t, err)
require.Empty(t, user1.Email)
user2, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
OrganizationIDs: []uuid.UUID{first.OrganizationID},
Username: "service-acct-multi-2",
ServiceAccount: true,
})
require.NoError(t, err)
require.Empty(t, user2.Email)
require.NotEqual(t, user1.ID, user2.ID)
})
}
func TestNotifyCreatedUser(t *testing.T) {
@@ -1724,7 +1832,7 @@ func TestGetUsersFilter(t *testing.T) {
setupCtx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
coderdtest.UsersFilter(setupCtx, t, client, api.Database, nil, nil, func(testCtx context.Context, req codersdk.UsersRequest) []codersdk.ReducedUser {
coderdtest.UsersFilter(setupCtx, t, client, api.Database, nil, func(testCtx context.Context, req codersdk.UsersRequest) []codersdk.ReducedUser {
res, err := client.Users(testCtx, req)
require.NoError(t, err)
reduced := make([]codersdk.ReducedUser, len(res.Users))
+13 -8
View File
@@ -730,7 +730,10 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
if !ok {
return
}
log := s.Logger.With(slog.F("agent_id", appToken.AgentID))
log := s.Logger.With(
slog.F("agent_id", appToken.AgentID),
slog.F("workspace_id", appToken.WorkspaceID),
)
log.Debug(ctx, "resolved PTY request")
values := r.URL.Query()
@@ -765,19 +768,21 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
})
return
}
go httpapi.HeartbeatClose(ctx, s.Logger, cancel, conn)
ctx, wsNetConn := WebsocketNetConn(ctx, conn, websocket.MessageBinary)
defer wsNetConn.Close() // Also closes conn.
go httpapi.HeartbeatClose(ctx, log, cancel, conn)
dialStart := time.Now()
agentConn, release, err := s.AgentProvider.AgentConn(ctx, appToken.AgentID)
if err != nil {
log.Debug(ctx, "dial workspace agent", slog.Error(err))
log.Debug(ctx, "dial workspace agent", slog.Error(err), slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial workspace agent: %s", err))
return
}
defer release()
log.Debug(ctx, "dialed workspace agent")
log.Debug(ctx, "dialed workspace agent", slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
// #nosec G115 - Safe conversion for terminal height/width which are expected to be within uint16 range (0-65535)
ptNetConn, err := agentConn.ReconnectingPTY(ctx, reconnect, uint16(height), uint16(width), r.URL.Query().Get("command"), func(arp *workspacesdk.AgentReconnectingPTYInit) {
arp.Container = container
@@ -785,12 +790,12 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
arp.BackendType = backendType
})
if err != nil {
log.Debug(ctx, "dial reconnecting pty server in workspace agent", slog.Error(err))
log.Debug(ctx, "dial reconnecting pty server in workspace agent", slog.Error(err), slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial: %s", err))
return
}
defer ptNetConn.Close()
log.Debug(ctx, "obtained PTY")
log.Debug(ctx, "obtained PTY", slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
report := newStatsReportFromSignedToken(*appToken)
s.collectStats(report)
@@ -800,7 +805,7 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
}()
agentssh.Bicopy(ctx, wsNetConn, ptNetConn)
log.Debug(ctx, "pty Bicopy finished")
log.Debug(ctx, "pty Bicopy finished", slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
}
func (s *Server) collectStats(stats StatsReport) {
+1 -4
View File
@@ -196,7 +196,6 @@ const (
FeatureWorkspaceExternalAgent FeatureName = "workspace_external_agent"
FeatureAIBridge FeatureName = "aibridge"
FeatureBoundary FeatureName = "boundary"
FeatureServiceAccounts FeatureName = "service_accounts"
FeatureAIGovernanceUserLimit FeatureName = "ai_governance_user_limit"
)
@@ -228,7 +227,6 @@ var (
FeatureWorkspaceExternalAgent,
FeatureAIBridge,
FeatureBoundary,
FeatureServiceAccounts,
FeatureAIGovernanceUserLimit,
}
@@ -277,7 +275,6 @@ func (n FeatureName) AlwaysEnable() bool {
FeatureWorkspacePrebuilds: true,
FeatureWorkspaceExternalAgent: true,
FeatureBoundary: true,
FeatureServiceAccounts: true,
}[n]
}
@@ -285,7 +282,7 @@ func (n FeatureName) AlwaysEnable() bool {
func (n FeatureName) Enterprise() bool {
switch n {
// Add all features that should be excluded in the Enterprise feature set.
case FeatureMultipleOrganizations, FeatureCustomRoles, FeatureServiceAccounts:
case FeatureMultipleOrganizations, FeatureCustomRoles:
return false
default:
return true
+8 -15
View File
@@ -1,38 +1,31 @@
# Headless Authentication
> [!NOTE]
> Creating service accounts requires a [Premium license](https://coder.com/pricing).
Headless user accounts that cannot use the web UI to log in to Coder. This is
useful for creating accounts for automated systems, such as CI/CD pipelines or
for users who only consume Coder via another client/API.
Service accounts are headless user accounts that cannot use the web UI to log in
to Coder. This is useful for creating accounts for automated systems, such as
CI/CD pipelines or for users who only consume Coder via another client/API. Service accounts do not have passwords or associated email addresses.
You must have the User Admin role or above to create headless users.
You must have the User Admin role or above to create service accounts.
## Create a service account
## Create a headless user
<div class="tabs">
## CLI
Use the `--service-account` flag to create a dedicated service account:
```sh
coder users create \
--email="coder-bot@coder.com" \
--username="coder-bot" \
--service-account
--login-type="none" \
```
## UI
Navigate to **Deployment** > **Users** > **Create user**, then select
**Service account** as the login type.
Navigate to `Users` > `Create user` in the topbar.
![Create a user via the UI](../../images/admin/users/headless-user.png)
</div>
## Authenticate as a service account
To make API or CLI requests on behalf of the headless user, learn how to
[generate API tokens on behalf of a user](./sessions-tokens.md#generate-a-long-lived-api-token-on-behalf-of-another-user).
+1 -2
View File
@@ -495,8 +495,7 @@
{
"title": "Headless Authentication",
"description": "Create and manage headless service accounts for automated systems and API integrations",
"path": "./admin/users/headless-auth.md",
"state": ["premium"]
"path": "./admin/users/headless-auth.md"
},
{
"title": "Groups \u0026 Roles",
@@ -468,10 +468,9 @@ func (b *DBBatcher) retryLoop() {
func (b *DBBatcher) retryBatch(params database.BatchUpsertConnectionLogsParams) {
count := len(params.ID)
for attempt := range maxRetries {
t := b.clock.NewTimer(retryInterval, "connectionLogBatcher", "retryBackoff")
t := time.NewTimer(retryInterval)
select {
case <-b.ctx.Done():
t.Stop()
b.shutdownBatch(params)
return
case <-t.C:
@@ -355,20 +355,15 @@ func Test_batcherFlush(t *testing.T) {
store := dbmock.NewMockStore(ctrl)
clock := quartz.NewMock(t)
// Trap the capacity flush (fires when batch reaches maxBatchSize).
capacityTrap := clock.Trap().TimerReset("connectionLogBatcher", "capacityFlush")
defer capacityTrap.Close()
scheduledTrap := clock.Trap().TimerReset("connectionLogBatcher", "scheduledFlush")
defer scheduledTrap.Close()
// Trap the retry backoff timer created by retryBatch.
retryTrap := clock.Trap().NewTimer("connectionLogBatcher", "retryBackoff")
defer retryTrap.Close()
// Batch size of 1: consuming the item triggers an immediate
// capacity flush, avoiding the timer/itemCh select race.
b := NewDBBatcher(ctx, store, log, WithClock(clock), WithBatchSize(1))
b := NewDBBatcher(ctx, store, log, WithClock(clock), WithBatchSize(100))
evt := fakeConnectEvent(uuid.New(), "agent1", uuid.New())
// First call (synchronous in flush) fails, then the
// retry worker retries after the backoff and succeeds.
gomock.InOrder(
store.EXPECT().
BatchUpsertConnectionLogs(gomock.Any(), gomock.Any()).
@@ -385,15 +380,14 @@ func Test_batcherFlush(t *testing.T) {
require.NoError(t, b.Upsert(ctx, evt))
// Item consumed → capacity flush fires → transient error →
// batch queued to retryCh → timer reset trapped.
capacityTrap.MustWait(ctx).MustRelease(ctx)
// Retry worker creates a timer — trap it, release, advance.
retryCall := retryTrap.MustWait(ctx)
retryCall.MustRelease(ctx)
clock.Advance(retryInterval).MustWait(ctx)
// Trigger a scheduled flush while the batcher is still
// running. The synchronous write fails and queues to
// retryCh. The retry worker picks it up after a real-
// time 1s delay and succeeds.
clock.Advance(defaultFlushInterval).MustWait(ctx)
scheduledTrap.MustWait(ctx).MustRelease(ctx)
// Wait for the retry to complete (real-time 1s delay).
require.NoError(t, b.Close())
})
@@ -406,10 +400,10 @@ func Test_batcherFlush(t *testing.T) {
store := dbmock.NewMockStore(ctrl)
clock := quartz.NewMock(t)
capacityTrap := clock.Trap().TimerReset("connectionLogBatcher", "capacityFlush")
defer capacityTrap.Close()
scheduledTrap := clock.Trap().TimerReset("connectionLogBatcher", "scheduledFlush")
defer scheduledTrap.Close()
b := NewDBBatcher(ctx, store, log, WithClock(clock), WithBatchSize(1))
b := NewDBBatcher(ctx, store, log, WithClock(clock), WithBatchSize(100))
evt := fakeConnectEvent(uuid.New(), "agent1", uuid.New())
@@ -434,9 +428,10 @@ func Test_batcherFlush(t *testing.T) {
}).
AnyTimes()
// Send event — capacity flush triggers immediately.
// Send event and trigger flush — fails, queues.
require.NoError(t, b.Upsert(ctx, evt))
capacityTrap.MustWait(ctx).MustRelease(ctx)
clock.Advance(defaultFlushInterval).MustWait(ctx)
scheduledTrap.MustWait(ctx).MustRelease(ctx)
// Close triggers shutdown. The retry worker drains
// retryCh and writes the batch via writeBatch.
+2 -4
View File
@@ -1161,8 +1161,7 @@ func TestGetGroupMembersFilter(t *testing.T) {
},
LicenseOptions: &coderdenttest.LicenseOptions{
Features: license.Features{
codersdk.FeatureTemplateRBAC: 1,
codersdk.FeatureServiceAccounts: 1,
codersdk.FeatureTemplateRBAC: 1,
},
},
})
@@ -1192,8 +1191,7 @@ func TestGetGroupMembersFilter(t *testing.T) {
require.NoError(t, err)
return res.Users
}
options := &coderdtest.UsersFilterOptions{CreateServiceAccounts: true}
coderdtest.UsersFilter(setupCtx, t, client, db, options, setup, fetch)
coderdtest.UsersFilter(setupCtx, t, client, db, setup, fetch)
}
func TestGetGroupMembersPagination(t *testing.T) {
-164
View File
@@ -614,168 +614,4 @@ func TestEnterprisePostUser(t *testing.T) {
require.Len(t, memberedOrgs, 2)
require.ElementsMatch(t, []uuid.UUID{second.ID, third.ID}, []uuid.UUID{memberedOrgs[0].ID, memberedOrgs[1].ID})
})
t.Run("ServiceAccount/OK", func(t *testing.T) {
t.Parallel()
client, first := coderdenttest.New(t, &coderdenttest.Options{
LicenseOptions: &coderdenttest.LicenseOptions{
Features: license.Features{
codersdk.FeatureServiceAccounts: 1,
},
},
})
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
//nolint:gocritic
user, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
OrganizationIDs: []uuid.UUID{first.OrganizationID},
Username: "service-acct-ok",
UserLoginType: codersdk.LoginTypeNone,
ServiceAccount: true,
})
require.NoError(t, err)
require.Equal(t, codersdk.LoginTypeNone, user.LoginType)
require.Empty(t, user.Email)
require.Equal(t, "service-acct-ok", user.Username)
require.Equal(t, codersdk.UserStatusDormant, user.Status)
})
t.Run("ServiceAccount/WithEmail", func(t *testing.T) {
t.Parallel()
client, first := coderdenttest.New(t, &coderdenttest.Options{
LicenseOptions: &coderdenttest.LicenseOptions{
Features: license.Features{
codersdk.FeatureServiceAccounts: 1,
},
},
})
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
//nolint:gocritic
_, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
OrganizationIDs: []uuid.UUID{first.OrganizationID},
Username: "service-acct-email",
Email: "should-not-have@email.com",
ServiceAccount: true,
})
var apiErr *codersdk.Error
require.ErrorAs(t, err, &apiErr)
require.Equal(t, http.StatusBadRequest, apiErr.StatusCode())
require.Contains(t, apiErr.Message, "Email cannot be set for service accounts")
})
t.Run("ServiceAccount/WithPassword", func(t *testing.T) {
t.Parallel()
client, first := coderdenttest.New(t, &coderdenttest.Options{
LicenseOptions: &coderdenttest.LicenseOptions{
Features: license.Features{
codersdk.FeatureServiceAccounts: 1,
},
},
})
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
//nolint:gocritic
_, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
OrganizationIDs: []uuid.UUID{first.OrganizationID},
Username: "service-acct-password",
Password: "ShouldNotHavePassword123!",
ServiceAccount: true,
})
var apiErr *codersdk.Error
require.ErrorAs(t, err, &apiErr)
require.Equal(t, http.StatusBadRequest, apiErr.StatusCode())
require.Contains(t, apiErr.Message, "Password cannot be set for service accounts")
})
t.Run("ServiceAccount/WithInvalidLoginType", func(t *testing.T) {
t.Parallel()
client, first := coderdenttest.New(t, &coderdenttest.Options{
LicenseOptions: &coderdenttest.LicenseOptions{
Features: license.Features{
codersdk.FeatureServiceAccounts: 1,
},
},
})
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
//nolint:gocritic
_, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
OrganizationIDs: []uuid.UUID{first.OrganizationID},
Username: "service-acct-login-type",
UserLoginType: codersdk.LoginTypePassword,
ServiceAccount: true,
})
var apiErr *codersdk.Error
require.ErrorAs(t, err, &apiErr)
require.Equal(t, http.StatusBadRequest, apiErr.StatusCode())
require.Contains(t, apiErr.Message, "Service accounts must use login type 'none'")
})
t.Run("ServiceAccount/DefaultLoginType", func(t *testing.T) {
t.Parallel()
client, first := coderdenttest.New(t, &coderdenttest.Options{
LicenseOptions: &coderdenttest.LicenseOptions{
Features: license.Features{
codersdk.FeatureServiceAccounts: 1,
},
},
})
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
//nolint:gocritic
user, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
OrganizationIDs: []uuid.UUID{first.OrganizationID},
Username: "service-acct-default-login",
ServiceAccount: true,
})
require.NoError(t, err)
found, err := client.User(ctx, user.ID.String())
require.NoError(t, err)
require.Equal(t, codersdk.LoginTypeNone, found.LoginType)
require.Empty(t, found.Email)
})
t.Run("ServiceAccount/MultipleWithoutEmail", func(t *testing.T) {
t.Parallel()
client, first := coderdenttest.New(t, &coderdenttest.Options{
LicenseOptions: &coderdenttest.LicenseOptions{
Features: license.Features{
codersdk.FeatureServiceAccounts: 1,
},
},
})
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
//nolint:gocritic
user1, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
OrganizationIDs: []uuid.UUID{first.OrganizationID},
Username: "service-acct-multi-1",
ServiceAccount: true,
})
require.NoError(t, err)
require.Empty(t, user1.Email)
user2, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
OrganizationIDs: []uuid.UUID{first.OrganizationID},
Username: "service-acct-multi-2",
ServiceAccount: true,
})
require.NoError(t, err)
require.Empty(t, user2.Email)
require.NotEqual(t, user1.ID, user2.ID)
})
}
+2 -9
View File
@@ -231,13 +231,7 @@ func TestWorkspaceSharingDisabled(t *testing.T) {
t.Run("ACLEndpointsForbiddenServiceAccountsMode", func(t *testing.T) {
t.Parallel()
client, db, owner := coderdenttest.NewWithDatabase(t, &coderdenttest.Options{
LicenseOptions: &coderdenttest.LicenseOptions{
Features: license.Features{
codersdk.FeatureServiceAccounts: 1,
},
},
})
client, db, owner := coderdenttest.NewWithDatabase(t, nil)
regularClient, regularUser := coderdtest.CreateAnotherUser(t, client, owner.OrganizationID)
regularWS := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
@@ -450,8 +444,7 @@ func TestWorkspaceSharingDisabled(t *testing.T) {
},
LicenseOptions: &coderdenttest.LicenseOptions{
Features: license.Features{
codersdk.FeatureTemplateRBAC: 1,
codersdk.FeatureServiceAccounts: 1,
codersdk.FeatureTemplateRBAC: 1,
},
},
})
+9 -36
View File
@@ -1451,17 +1451,15 @@ func TestSubscribeRelayEstablishedMidStream(t *testing.T) {
)
})
// Worker with a short fallback poll interval. The primary
// trigger is signalWake() from SendMessage, but under heavy
// CI load the wake goroutine may be delayed. A short poll
// ensures the worker always picks up the pending chat.
// Worker with a 1-hour acquire interval; only processes when
// explicitly woken.
workerLogger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
worker := osschatd.New(osschatd.Config{
Logger: workerLogger,
Database: db,
ReplicaID: workerID,
Pubsub: ps,
PendingChatAcquireInterval: time.Second,
PendingChatAcquireInterval: time.Hour,
InFlightChatStaleAfter: testutil.WaitSuperLong,
})
t.Cleanup(func() {
@@ -1491,11 +1489,7 @@ func TestSubscribeRelayEstablishedMidStream(t *testing.T) {
return snapshot, relayEvents, cancel, nil
}, nil)
// Use WaitSuperLong so the test survives heavy CI contention.
// The worker pipeline (model resolution, message loading, LLM
// call) involves multiple DB round-trips that can be slow under
// load.
ctx := testutil.Context(t, testutil.WaitSuperLong)
ctx := testutil.Context(t, testutil.WaitLong)
user, model := seedChatDependencies(ctx, t, db)
setOpenAIProviderBaseURL(ctx, t, db, openAIURL)
@@ -1515,32 +1509,11 @@ func TestSubscribeRelayEstablishedMidStream(t *testing.T) {
})
require.NoError(t, err)
// Wait for the worker to reach the LLM (first streaming
// request). Also poll the chat status so we fail fast with a
// clear message if the worker errors out instead of timing
// out silently.
ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop()
waitForStream:
for {
select {
case <-firstChunkEmitted:
break waitForStream
case <-ticker.C:
currentChat, dbErr := db.GetChatByID(ctx, chat.ID)
if dbErr == nil && currentChat.Status == database.ChatStatusError {
t.Fatalf("worker failed to process chat: status=%s last_error=%s",
currentChat.Status, currentChat.LastError.String)
}
case <-ctx.Done():
// Dump the final chat status for debugging.
currentChat, dbErr := db.GetChatByID(context.Background(), chat.ID)
if dbErr == nil {
t.Fatalf("timed out waiting for worker to start streaming (chat status=%s, last_error=%q)",
currentChat.Status, currentChat.LastError.String)
}
t.Fatal("timed out waiting for worker to start streaming")
}
// Wait for the worker to reach the LLM (first streaming request).
select {
case <-firstChunkEmitted:
case <-ctx.Done():
t.Fatal("timed out waiting for worker to start streaming")
}
// Wait for the subscriber to receive the running status, which
+27 -7
View File
@@ -12,6 +12,7 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"golang.org/x/sync/singleflight"
"golang.org/x/xerrors"
gProto "google.golang.org/protobuf/proto"
@@ -33,9 +34,9 @@ const (
eventReadyForHandshake = "tailnet_ready_for_handshake"
HeartbeatPeriod = time.Second * 2
MissedHeartbeats = 3
numQuerierWorkers = 10
numQuerierWorkers = 40
numBinderWorkers = 10
numTunnelerWorkers = 10
numTunnelerWorkers = 20
numHandshakerWorkers = 5
dbMaxBackoff = 10 * time.Second
cleanupPeriod = time.Hour
@@ -770,6 +771,9 @@ func (m *mapper) bestToUpdate(best map[uuid.UUID]mapping) *proto.CoordinateRespo
for k := range m.sent {
if _, ok := best[k]; !ok {
m.logger.Debug(m.ctx, "peer no longer in best mappings, sending DISCONNECTED",
slog.F("peer_id", k),
)
resp.PeerUpdates = append(resp.PeerUpdates, &proto.CoordinateResponse_PeerUpdate{
Id: agpl.UUIDToByteSlice(k),
Kind: proto.CoordinateResponse_PeerUpdate_DISCONNECTED,
@@ -820,6 +824,8 @@ type querier struct {
mu sync.Mutex
mappers map[mKey]*mapper
healthy bool
resyncGroup singleflight.Group
}
func newQuerier(ctx context.Context,
@@ -958,7 +964,7 @@ func (q *querier) cleanupConn(c *connIO) {
// maxBatchSize is the maximum number of keys to process in a single batch
// query.
const maxBatchSize = 50
const maxBatchSize = 200
func (q *querier) peerUpdateWorker() {
defer q.wg.Done()
@@ -1207,8 +1213,13 @@ func (q *querier) subscribe() {
func (q *querier) listenPeer(_ context.Context, msg []byte, err error) {
if xerrors.Is(err, pubsub.ErrDroppedMessages) {
q.logger.Warn(q.ctx, "pubsub may have dropped peer updates")
// we need to schedule a full resync of peer mappings
q.resyncPeerMappings()
// Schedule a full resync asynchronously so we don't block the
// pubsub drain goroutine. Singleflight coalesces concurrent
// resync requests.
go q.resyncGroup.Do("resync", func() (any, error) {
q.resyncPeerMappings()
return nil, nil
})
return
}
if err != nil {
@@ -1234,8 +1245,13 @@ func (q *querier) listenPeer(_ context.Context, msg []byte, err error) {
func (q *querier) listenTunnel(_ context.Context, msg []byte, err error) {
if xerrors.Is(err, pubsub.ErrDroppedMessages) {
q.logger.Warn(q.ctx, "pubsub may have dropped tunnel updates")
// we need to schedule a full resync of peer mappings
q.resyncPeerMappings()
// Schedule a full resync asynchronously so we don't block the
// pubsub drain goroutine. Singleflight coalesces concurrent
// resync requests.
go q.resyncGroup.Do("resync", func() (any, error) {
q.resyncPeerMappings()
return nil, nil
})
return
}
if err != nil {
@@ -1601,6 +1617,10 @@ func (h *heartbeats) filter(mappings []mapping) []mapping {
// the only mapping available for it. Newer mappings will take
// precedence.
m.kind = proto.CoordinateResponse_PeerUpdate_LOST
h.logger.Debug(h.ctx, "mapping rewritten to LOST due to missed heartbeats",
slog.F("peer_id", m.peer),
slog.F("coordinator_id", m.coordinator),
)
}
}
-2
View File
@@ -3511,7 +3511,6 @@ export type FeatureName =
| "multiple_external_auth"
| "multiple_organizations"
| "scim"
| "service_accounts"
| "task_batch_actions"
| "template_rbac"
| "user_limit"
@@ -3540,7 +3539,6 @@ export const FeatureNames: FeatureName[] = [
"multiple_external_auth",
"multiple_organizations",
"scim",
"service_accounts",
"task_batch_actions",
"template_rbac",
"user_limit",
+2 -10
View File
@@ -16,12 +16,7 @@ type LogLineProps = {
level: LogLevel;
} & HTMLAttributes<HTMLPreElement>;
export const LogLine: FC<LogLineProps> = ({
level,
className,
style,
...props
}) => {
export const LogLine: FC<LogLineProps> = ({ level, className, ...props }) => {
return (
<pre
{...props}
@@ -38,10 +33,7 @@ export const LogLine: FC<LogLineProps> = ({
className,
)}
style={{
...style,
padding:
style?.padding ??
`0 var(--log-line-side-padding, ${DEFAULT_LOG_LINE_SIDE_PADDING}px)`,
padding: `0 var(--log-line-side-padding, ${DEFAULT_LOG_LINE_SIDE_PADDING}px)`,
}}
/>
);
@@ -17,7 +17,6 @@ const meta: Meta<typeof CreateUserForm> = {
onCancel: action("cancel"),
onSubmit: action("submit"),
isLoading: false,
serviceAccountsEnabled: true,
},
};
@@ -87,7 +87,6 @@ interface CreateUserFormProps {
onCancel: () => void;
authMethods?: TypesGen.AuthMethods;
showOrganizations: boolean;
serviceAccountsEnabled: boolean;
}
export const CreateUserForm: FC<CreateUserFormProps> = ({
@@ -97,13 +96,12 @@ export const CreateUserForm: FC<CreateUserFormProps> = ({
onCancel,
showOrganizations,
authMethods,
serviceAccountsEnabled,
}) => {
const availableLoginTypes = [
authMethods?.password.enabled && "password",
authMethods?.oidc.enabled && "oidc",
authMethods?.github.enabled && "github",
serviceAccountsEnabled && "none",
"none",
].filter(Boolean) as Array<keyof typeof loginTypeOptions>;
const defaultLoginType = availableLoginTypes[0];
@@ -6,7 +6,6 @@ import { getErrorDetail, getErrorMessage } from "#/api/errors";
import { authMethods, createUser } from "#/api/queries/users";
import { Margins } from "#/components/Margins/Margins";
import { useDashboard } from "#/modules/dashboard/useDashboard";
import { useFeatureVisibility } from "#/modules/dashboard/useFeatureVisibility";
import { pageTitle } from "#/utils/page";
import { CreateUserForm } from "./CreateUserForm";
@@ -16,7 +15,6 @@ const CreateUserPage: FC = () => {
const createUserMutation = useMutation(createUser(queryClient));
const authMethodsQuery = useQuery(authMethods());
const { showOrganizations } = useDashboard();
const { service_accounts: serviceAccountsEnabled } = useFeatureVisibility();
return (
<Margins>
@@ -60,7 +58,6 @@ const CreateUserPage: FC = () => {
}}
authMethods={authMethodsQuery.data}
showOrganizations={showOrganizations}
serviceAccountsEnabled={serviceAccountsEnabled}
/>
</Margins>
);
@@ -148,13 +148,6 @@ const CreateWorkspacePage: FC = () => {
return;
}
// Skip stale responses. If we've already sent a newer request,
// this response contains outdated parameter values that would
// overwrite the user's more recent input.
if (response.id < wsResponseId.current) {
return;
}
if (!initialParamsSentRef.current && response.parameters?.length > 0) {
sendInitialParameters([...response.parameters]);
}
@@ -294,22 +287,12 @@ const CreateWorkspacePage: FC = () => {
return [...latestResponse.parameters].sort((a, b) => a.order - b.order);
}, [latestResponse?.parameters]);
// The initial WS response (id: -1) contains only template defaults.
// Keep the loader visible until a response reflecting the user's
// actual parameter values arrives (id >= 0).
const awaitingUserValues =
latestResponse !== null &&
latestResponse.id < 0 &&
initialParamsSentRef.current &&
!wsError;
const shouldShowLoader =
!templateQuery.data ||
isLoadingFormData ||
isLoadingExternalAuth ||
autoCreateReady ||
(!latestResponse && !wsError) ||
awaitingUserValues;
(!latestResponse && !wsError);
return (
<>
@@ -8,7 +8,6 @@ import { DetailedError } from "#/api/errors";
import type {
DynamicParametersRequest,
DynamicParametersResponse,
PreviewParameter,
WorkspaceBuildParameter,
} from "#/api/typesGenerated";
import { ErrorAlert } from "#/components/Alert/ErrorAlert";
@@ -23,7 +22,6 @@ import {
TooltipTrigger,
} from "#/components/Tooltip/Tooltip";
import { useEffectEvent } from "#/hooks/hookPolyfills";
import { getInitialParameterValues } from "#/modules/workspaces/DynamicParameter/DynamicParameter";
import { docs } from "#/utils/docs";
import { pageTitle } from "#/utils/page";
import type { AutofillBuildParameter } from "#/utils/richParameters";
@@ -75,31 +73,24 @@ const WorkspaceParametersPageExperimental: FC = () => {
}
});
// Sends the user's build parameter values to the WebSocket so the
// backend evaluates dynamic expressions against real values instead
// of template defaults. Bails when the REST build parameters
// haven't loaded yet; the retrigger effect below covers that case.
const sendInitialParameters = useEffectEvent(
(parameters: PreviewParameter[]) => {
if (initialParamsSentRef.current) return;
if (parameters.length === 0) return;
if (!latestBuildParameters) return;
// On page load, sends initial workspace build parameters to the websocket.
// This ensures the backend has the form's complete initial state,
// vital for rendering dynamic UI elements dependent on initial parameter values.
const sendInitialParameters = useEffectEvent(() => {
if (initialParamsSentRef.current) return;
if (autofillParameters.length === 0) return;
const inputs: Record<string, string> = {};
for (const p of getInitialParameterValues(
parameters,
autofillParameters,
)) {
if (p.name && p.value) {
inputs[p.name] = p.value;
}
const initialParamsToSend: Record<string, string> = {};
for (const param of autofillParameters) {
if (param.name && param.value) {
initialParamsToSend[param.name] = param.value;
}
if (Object.keys(inputs).length === 0) return;
}
if (Object.keys(initialParamsToSend).length === 0) return;
sendMessage(inputs);
initialParamsSentRef.current = true;
},
);
sendMessage(initialParamsToSend);
initialParamsSentRef.current = true;
});
const onMessage = useEffectEvent((response: DynamicParametersResponse) => {
if (latestResponse && latestResponse?.id >= response.id) {
@@ -113,27 +104,13 @@ const WorkspaceParametersPageExperimental: FC = () => {
return;
}
// Send initial params before storing the response so the
// stale-response guard above filters the defaults on the
// next WS message.
if (!initialParamsSentRef.current && response.parameters?.length > 0) {
sendInitialParameters([...response.parameters]);
}
setLatestResponse(response);
if (!initialParamsSentRef.current && response.parameters?.length > 0) {
sendInitialParameters();
}
});
// When the WS first message arrives before the REST build
// parameters have loaded, sendInitialParameters bails. This
// effect retriggers the send once both sources are available.
useEffect(() => {
if (initialParamsSentRef.current) return;
if (!latestResponse?.parameters?.length) return;
if (!latestBuildParameters) return;
sendInitialParameters([...latestResponse.parameters]);
}, [latestResponse, latestBuildParameters, sendInitialParameters]);
useEffect(() => {
if (!templateVersionId && !workspace.latest_build.template_version_id)
return;
@@ -251,20 +228,10 @@ const WorkspaceParametersPageExperimental: FC = () => {
const error =
wsError || startWithParameters.error || restartWithParameters.error;
// The initial WS response (id: -1) contains only template defaults.
// Keep the loader visible until a response reflecting the user's
// actual build parameter values arrives (id >= 0).
const awaitingUserValues =
latestResponse !== null &&
latestResponse.id < 0 &&
initialParamsSentRef.current &&
!wsError;
if (
latestBuildParametersLoading ||
(!latestResponse && !wsError) ||
(ws.current && ws.current.readyState === WebSocket.CONNECTING) ||
awaitingUserValues
(ws.current && ws.current.readyState === WebSocket.CONNECTING)
) {
return <Loader />;
}
+50 -42
View File
@@ -12,6 +12,8 @@ import (
"sync"
"time"
"golang.org/x/sync/singleflight"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"github.com/tailscale/wireguard-go/tun"
@@ -463,6 +465,8 @@ type Conn struct {
trafficStats *connstats.Statistics
lastNetInfo *tailcfg.NetInfo
awaitReachableGroup singleflight.Group
}
func (c *Conn) GetNetInfo() *tailcfg.NetInfo {
@@ -599,56 +603,60 @@ func (c *Conn) DERPMap() *tailcfg.DERPMap {
// address is reachable. It's the callers responsibility to provide
// a timeout, otherwise this function will block forever.
func (c *Conn) AwaitReachable(ctx context.Context, ip netip.Addr) bool {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // Cancel all pending pings on exit.
result, _, _ := c.awaitReachableGroup.Do(ip.String(), func() (interface{}, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // Cancel all pending pings on exit.
completedCtx, completed := context.WithCancel(context.Background())
defer completed()
completedCtx, completed := context.WithCancel(context.Background())
defer completed()
run := func() {
// Safety timeout, initially we'll have around 10-20 goroutines
// running in parallel. The exponential backoff will converge
// around ~1 ping / 30s, this means we'll have around 10-20
// goroutines pending towards the end as well.
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
run := func() {
// Safety timeout, initially we'll have around 10-20 goroutines
// running in parallel. The exponential backoff will converge
// around ~1 ping / 30s, this means we'll have around 10-20
// goroutines pending towards the end as well.
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
// For reachability, we use TSMP ping, which pings at the IP layer, and
// therefore requires that wireguard and the netstack are up. If we
// don't wait for wireguard to be up, we could miss a handshake, and it
// might take 5 seconds for the handshake to be retried. A 5s initial
// round trip can set us up for poor TCP performance, since the initial
// round-trip-time sets the initial retransmit timeout.
_, _, _, err := c.pingWithType(ctx, ip, tailcfg.PingTSMP)
if err == nil {
completed()
// For reachability, we use TSMP ping, which pings at the IP layer,
// and therefore requires that wireguard and the netstack are up.
// If we don't wait for wireguard to be up, we could miss a
// handshake, and it might take 5 seconds for the handshake to be
// retried. A 5s initial round trip can set us up for poor TCP
// performance, since the initial round-trip-time sets the initial
// retransmit timeout.
_, _, _, err := c.pingWithType(ctx, ip, tailcfg.PingTSMP)
if err == nil {
completed()
}
}
}
eb := backoff.NewExponentialBackOff()
eb.MaxElapsedTime = 0
eb.InitialInterval = 50 * time.Millisecond
eb.MaxInterval = 30 * time.Second
// Consume the first interval since
// we'll fire off a ping immediately.
_ = eb.NextBackOff()
eb := backoff.NewExponentialBackOff()
eb.MaxElapsedTime = 0
eb.InitialInterval = 50 * time.Millisecond
eb.MaxInterval = 5 * time.Second
// Consume the first interval since
// we'll fire off a ping immediately.
_ = eb.NextBackOff()
t := backoff.NewTicker(eb)
defer t.Stop()
t := backoff.NewTicker(eb)
defer t.Stop()
go run()
for {
select {
case <-completedCtx.Done():
return true
case <-t.C:
// Pings can take a while, so we can run multiple
// in parallel to return ASAP.
go run()
case <-ctx.Done():
return false
go run()
for {
select {
case <-completedCtx.Done():
return true, nil
case <-t.C:
// Pings can take a while, so we can run multiple
// in parallel to return ASAP.
go run()
case <-ctx.Done():
return false, nil
}
}
}
})
return result.(bool)
}
// Closed is a channel that ends when the connection has