Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| cfd7730194 | |||
| 1937ada0cd | |||
| d64cd6415d | |||
| c1851d9453 | |||
| 8f73453681 | |||
| 165db3d31c | |||
| 1bd1516fd1 | |||
| 81ba35a987 | |||
| 53d63cf8e9 | |||
| 4213a43b53 |
@@ -134,7 +134,6 @@ func TestUserCreate(t *testing.T) {
|
||||
{
|
||||
name: "ServiceAccount",
|
||||
args: []string{"--service-account", "-u", "dean"},
|
||||
err: "Premium feature",
|
||||
},
|
||||
{
|
||||
name: "ServiceAccountLoginType",
|
||||
|
||||
@@ -123,10 +123,6 @@ func UsersPagination(
|
||||
require.Contains(t, gotUsers[0].Name, "after")
|
||||
}
|
||||
|
||||
type UsersFilterOptions struct {
|
||||
CreateServiceAccounts bool
|
||||
}
|
||||
|
||||
// UsersFilter creates a set of users to run various filters against for
|
||||
// testing. It can be used to test filtering both users and group members.
|
||||
func UsersFilter(
|
||||
@@ -134,16 +130,11 @@ func UsersFilter(
|
||||
t *testing.T,
|
||||
client *codersdk.Client,
|
||||
db database.Store,
|
||||
options *UsersFilterOptions,
|
||||
setup func(users []codersdk.User),
|
||||
fetch func(ctx context.Context, req codersdk.UsersRequest) []codersdk.ReducedUser,
|
||||
) {
|
||||
t.Helper()
|
||||
|
||||
if options == nil {
|
||||
options = &UsersFilterOptions{}
|
||||
}
|
||||
|
||||
firstUser, err := client.User(setupCtx, codersdk.Me)
|
||||
require.NoError(t, err, "fetch me")
|
||||
|
||||
@@ -220,13 +211,11 @@ func UsersFilter(
|
||||
}
|
||||
|
||||
// Add some service accounts.
|
||||
if options.CreateServiceAccounts {
|
||||
for range 3 {
|
||||
_, user := CreateAnotherUserMutators(t, client, orgID, nil, func(r *codersdk.CreateUserRequestWithOrgs) {
|
||||
r.ServiceAccount = true
|
||||
})
|
||||
users = append(users, user)
|
||||
}
|
||||
for range 3 {
|
||||
_, user := CreateAnotherUserMutators(t, client, orgID, nil, func(r *codersdk.CreateUserRequestWithOrgs) {
|
||||
r.ServiceAccount = true
|
||||
})
|
||||
users = append(users, user)
|
||||
}
|
||||
|
||||
hashedPassword, err := userpassword.Hash("SomeStrongPassword!")
|
||||
|
||||
@@ -81,8 +81,8 @@ func newMsgQueue(ctx context.Context, l Listener, le ListenerWithErr) *msgQueue
|
||||
}
|
||||
|
||||
func (q *msgQueue) run() {
|
||||
var batch [maxDrainBatch]msgOrErr
|
||||
for {
|
||||
// wait until there is something on the queue or we are closed
|
||||
q.cond.L.Lock()
|
||||
for q.size == 0 && !q.closed {
|
||||
q.cond.Wait()
|
||||
@@ -91,28 +91,32 @@ func (q *msgQueue) run() {
|
||||
q.cond.L.Unlock()
|
||||
return
|
||||
}
|
||||
item := q.q[q.front]
|
||||
q.front = (q.front + 1) % BufferSize
|
||||
q.size--
|
||||
// Drain up to maxDrainBatch items while holding the lock.
|
||||
n := min(q.size, maxDrainBatch)
|
||||
for i := range n {
|
||||
batch[i] = q.q[q.front]
|
||||
q.front = (q.front + 1) % BufferSize
|
||||
}
|
||||
q.size -= n
|
||||
q.cond.L.Unlock()
|
||||
|
||||
// process item without holding lock
|
||||
if item.err == nil {
|
||||
// real message
|
||||
if q.l != nil {
|
||||
q.l(q.ctx, item.msg)
|
||||
// Dispatch each message individually without holding the lock.
|
||||
for i := range n {
|
||||
item := batch[i]
|
||||
if item.err == nil {
|
||||
if q.l != nil {
|
||||
q.l(q.ctx, item.msg)
|
||||
continue
|
||||
}
|
||||
if q.le != nil {
|
||||
q.le(q.ctx, item.msg, nil)
|
||||
continue
|
||||
}
|
||||
continue
|
||||
}
|
||||
if q.le != nil {
|
||||
q.le(q.ctx, item.msg, nil)
|
||||
continue
|
||||
q.le(q.ctx, nil, item.err)
|
||||
}
|
||||
// unhittable
|
||||
continue
|
||||
}
|
||||
// if the listener wants errors, send it.
|
||||
if q.le != nil {
|
||||
q.le(q.ctx, nil, item.err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -233,6 +237,12 @@ type PGPubsub struct {
|
||||
// for a subscriber before dropping messages.
|
||||
const BufferSize = 2048
|
||||
|
||||
// maxDrainBatch is the maximum number of messages to drain from the ring
|
||||
// buffer per iteration. Batching amortizes the cost of mutex
|
||||
// acquire/release and cond.Wait across many messages, improving drain
|
||||
// throughput during bursts.
|
||||
const maxDrainBatch = 256
|
||||
|
||||
// Subscribe calls the listener when an event matching the name is received.
|
||||
func (p *PGPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
|
||||
return p.subscribeQueue(event, newMsgQueue(context.Background(), listener, nil))
|
||||
|
||||
@@ -148,7 +148,7 @@ func TestGetOrgMembersFilter(t *testing.T) {
|
||||
setupCtx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
|
||||
defer cancel()
|
||||
|
||||
coderdtest.UsersFilter(setupCtx, t, client, api.Database, nil, nil, func(testCtx context.Context, req codersdk.UsersRequest) []codersdk.ReducedUser {
|
||||
coderdtest.UsersFilter(setupCtx, t, client, api.Database, nil, func(testCtx context.Context, req codersdk.UsersRequest) []codersdk.ReducedUser {
|
||||
res, err := client.OrganizationMembersPaginated(testCtx, first.OrganizationID, req)
|
||||
require.NoError(t, err)
|
||||
reduced := make([]codersdk.ReducedUser, len(res.Members))
|
||||
|
||||
+37
-19
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/google/uuid"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/sync/singleflight"
|
||||
"golang.org/x/xerrors"
|
||||
"tailscale.com/derp"
|
||||
"tailscale.com/tailcfg"
|
||||
@@ -389,6 +390,7 @@ type MultiAgentController struct {
|
||||
// connections to the destination
|
||||
tickets map[uuid.UUID]map[uuid.UUID]struct{}
|
||||
coordination *tailnet.BasicCoordination
|
||||
sendGroup singleflight.Group
|
||||
|
||||
cancel context.CancelFunc
|
||||
expireOldAgentsDone chan struct{}
|
||||
@@ -418,28 +420,44 @@ func (m *MultiAgentController) New(client tailnet.CoordinatorClient) tailnet.Clo
|
||||
|
||||
func (m *MultiAgentController) ensureAgent(agentID uuid.UUID) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
_, ok := m.connectionTimes[agentID]
|
||||
// If we don't have the agent, subscribe.
|
||||
if !ok {
|
||||
m.logger.Debug(context.Background(),
|
||||
"subscribing to agent", slog.F("agent_id", agentID))
|
||||
if m.coordination != nil {
|
||||
err := m.coordination.Client.Send(&proto.CoordinateRequest{
|
||||
AddTunnel: &proto.CoordinateRequest_Tunnel{Id: agentID[:]},
|
||||
})
|
||||
if err != nil {
|
||||
err = xerrors.Errorf("subscribe agent: %w", err)
|
||||
m.coordination.SendErr(err)
|
||||
_ = m.coordination.Client.Close()
|
||||
m.coordination = nil
|
||||
return err
|
||||
}
|
||||
}
|
||||
m.tickets[agentID] = map[uuid.UUID]struct{}{}
|
||||
if ok {
|
||||
m.connectionTimes[agentID] = time.Now()
|
||||
m.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
m.mu.Unlock()
|
||||
|
||||
m.logger.Debug(context.Background(),
|
||||
"subscribing to agent", slog.F("agent_id", agentID))
|
||||
|
||||
_, err, _ := m.sendGroup.Do(agentID.String(), func() (interface{}, error) {
|
||||
m.mu.Lock()
|
||||
coord := m.coordination
|
||||
m.mu.Unlock()
|
||||
if coord == nil {
|
||||
return nil, xerrors.New("no active coordination")
|
||||
}
|
||||
err := coord.Client.Send(&proto.CoordinateRequest{
|
||||
AddTunnel: &proto.CoordinateRequest_Tunnel{Id: agentID[:]},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m.mu.Lock()
|
||||
m.tickets[agentID] = map[uuid.UUID]struct{}{}
|
||||
m.mu.Unlock()
|
||||
return nil, nil
|
||||
})
|
||||
if err != nil {
|
||||
m.logger.Error(context.Background(), "ensureAgent send failed",
|
||||
slog.F("agent_id", agentID), slog.Error(err))
|
||||
return xerrors.Errorf("send AddTunnel: %w", err)
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
m.connectionTimes[agentID] = time.Now()
|
||||
m.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -475,14 +475,6 @@ func (api *API) postUser(rw http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
req.UserLoginType = codersdk.LoginTypeNone
|
||||
|
||||
// Service accounts are a Premium feature.
|
||||
if !api.Entitlements.Enabled(codersdk.FeatureServiceAccounts) {
|
||||
httpapi.Write(ctx, rw, http.StatusForbidden, codersdk.Response{
|
||||
Message: fmt.Sprintf("%s is a Premium feature. Contact sales!", codersdk.FeatureServiceAccounts.Humanize()),
|
||||
})
|
||||
return
|
||||
}
|
||||
} else if req.UserLoginType == "" {
|
||||
// Default to password auth
|
||||
req.UserLoginType = codersdk.LoginTypePassword
|
||||
|
||||
+114
-6
@@ -979,7 +979,28 @@ func TestPostUsers(t *testing.T) {
|
||||
require.Equal(t, found.LoginType, codersdk.LoginTypeOIDC)
|
||||
})
|
||||
|
||||
t.Run("ServiceAccount/Unlicensed", func(t *testing.T) {
|
||||
t.Run("ServiceAccount/OK", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
client := coderdtest.New(t, nil)
|
||||
first := coderdtest.CreateFirstUser(t, client)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
|
||||
defer cancel()
|
||||
|
||||
user, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
|
||||
OrganizationIDs: []uuid.UUID{first.OrganizationID},
|
||||
Username: "service-acct-ok",
|
||||
UserLoginType: codersdk.LoginTypeNone,
|
||||
ServiceAccount: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, codersdk.LoginTypeNone, user.LoginType)
|
||||
require.Empty(t, user.Email)
|
||||
require.Equal(t, "service-acct-ok", user.Username)
|
||||
require.Equal(t, codersdk.UserStatusDormant, user.Status)
|
||||
})
|
||||
|
||||
t.Run("ServiceAccount/WithEmail", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
client := coderdtest.New(t, nil)
|
||||
first := coderdtest.CreateFirstUser(t, client)
|
||||
@@ -989,14 +1010,75 @@ func TestPostUsers(t *testing.T) {
|
||||
|
||||
_, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
|
||||
OrganizationIDs: []uuid.UUID{first.OrganizationID},
|
||||
Username: "service-acct-ok",
|
||||
UserLoginType: codersdk.LoginTypeNone,
|
||||
Username: "service-acct-email",
|
||||
Email: "should-not-have@email.com",
|
||||
ServiceAccount: true,
|
||||
})
|
||||
var apiErr *codersdk.Error
|
||||
require.ErrorAs(t, err, &apiErr)
|
||||
require.Equal(t, http.StatusForbidden, apiErr.StatusCode())
|
||||
require.Contains(t, apiErr.Message, "Premium feature")
|
||||
require.Equal(t, http.StatusBadRequest, apiErr.StatusCode())
|
||||
require.Contains(t, apiErr.Message, "Email cannot be set for service accounts")
|
||||
})
|
||||
|
||||
t.Run("ServiceAccount/WithPassword", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
client := coderdtest.New(t, nil)
|
||||
first := coderdtest.CreateFirstUser(t, client)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
|
||||
defer cancel()
|
||||
|
||||
_, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
|
||||
OrganizationIDs: []uuid.UUID{first.OrganizationID},
|
||||
Username: "service-acct-password",
|
||||
Password: "ShouldNotHavePassword123!",
|
||||
ServiceAccount: true,
|
||||
})
|
||||
var apiErr *codersdk.Error
|
||||
require.ErrorAs(t, err, &apiErr)
|
||||
require.Equal(t, http.StatusBadRequest, apiErr.StatusCode())
|
||||
require.Contains(t, apiErr.Message, "Password cannot be set for service accounts")
|
||||
})
|
||||
|
||||
t.Run("ServiceAccount/WithInvalidLoginType", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
client := coderdtest.New(t, nil)
|
||||
first := coderdtest.CreateFirstUser(t, client)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
|
||||
defer cancel()
|
||||
|
||||
_, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
|
||||
OrganizationIDs: []uuid.UUID{first.OrganizationID},
|
||||
Username: "service-acct-login-type",
|
||||
UserLoginType: codersdk.LoginTypePassword,
|
||||
ServiceAccount: true,
|
||||
})
|
||||
var apiErr *codersdk.Error
|
||||
require.ErrorAs(t, err, &apiErr)
|
||||
require.Equal(t, http.StatusBadRequest, apiErr.StatusCode())
|
||||
require.Contains(t, apiErr.Message, "Service accounts must use login type 'none'")
|
||||
})
|
||||
|
||||
t.Run("ServiceAccount/DefaultLoginType", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
client := coderdtest.New(t, nil)
|
||||
first := coderdtest.CreateFirstUser(t, client)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
|
||||
defer cancel()
|
||||
|
||||
user, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
|
||||
OrganizationIDs: []uuid.UUID{first.OrganizationID},
|
||||
Username: "service-acct-default-login",
|
||||
ServiceAccount: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
found, err := client.User(ctx, user.ID.String())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, codersdk.LoginTypeNone, found.LoginType)
|
||||
require.Empty(t, found.Email)
|
||||
})
|
||||
|
||||
t.Run("NonServiceAccount/WithoutEmail", func(t *testing.T) {
|
||||
@@ -1016,6 +1098,32 @@ func TestPostUsers(t *testing.T) {
|
||||
require.ErrorAs(t, err, &apiErr)
|
||||
require.Equal(t, http.StatusBadRequest, apiErr.StatusCode())
|
||||
})
|
||||
|
||||
t.Run("ServiceAccount/MultipleWithoutEmail", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
client := coderdtest.New(t, nil)
|
||||
first := coderdtest.CreateFirstUser(t, client)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
|
||||
defer cancel()
|
||||
|
||||
user1, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
|
||||
OrganizationIDs: []uuid.UUID{first.OrganizationID},
|
||||
Username: "service-acct-multi-1",
|
||||
ServiceAccount: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, user1.Email)
|
||||
|
||||
user2, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
|
||||
OrganizationIDs: []uuid.UUID{first.OrganizationID},
|
||||
Username: "service-acct-multi-2",
|
||||
ServiceAccount: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, user2.Email)
|
||||
require.NotEqual(t, user1.ID, user2.ID)
|
||||
})
|
||||
}
|
||||
|
||||
func TestNotifyCreatedUser(t *testing.T) {
|
||||
@@ -1724,7 +1832,7 @@ func TestGetUsersFilter(t *testing.T) {
|
||||
setupCtx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
|
||||
defer cancel()
|
||||
|
||||
coderdtest.UsersFilter(setupCtx, t, client, api.Database, nil, nil, func(testCtx context.Context, req codersdk.UsersRequest) []codersdk.ReducedUser {
|
||||
coderdtest.UsersFilter(setupCtx, t, client, api.Database, nil, func(testCtx context.Context, req codersdk.UsersRequest) []codersdk.ReducedUser {
|
||||
res, err := client.Users(testCtx, req)
|
||||
require.NoError(t, err)
|
||||
reduced := make([]codersdk.ReducedUser, len(res.Users))
|
||||
|
||||
@@ -730,7 +730,10 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
log := s.Logger.With(slog.F("agent_id", appToken.AgentID))
|
||||
log := s.Logger.With(
|
||||
slog.F("agent_id", appToken.AgentID),
|
||||
slog.F("workspace_id", appToken.WorkspaceID),
|
||||
)
|
||||
log.Debug(ctx, "resolved PTY request")
|
||||
|
||||
values := r.URL.Query()
|
||||
@@ -765,19 +768,21 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
|
||||
})
|
||||
return
|
||||
}
|
||||
go httpapi.HeartbeatClose(ctx, s.Logger, cancel, conn)
|
||||
|
||||
ctx, wsNetConn := WebsocketNetConn(ctx, conn, websocket.MessageBinary)
|
||||
defer wsNetConn.Close() // Also closes conn.
|
||||
|
||||
go httpapi.HeartbeatClose(ctx, log, cancel, conn)
|
||||
|
||||
dialStart := time.Now()
|
||||
|
||||
agentConn, release, err := s.AgentProvider.AgentConn(ctx, appToken.AgentID)
|
||||
if err != nil {
|
||||
log.Debug(ctx, "dial workspace agent", slog.Error(err))
|
||||
log.Debug(ctx, "dial workspace agent", slog.Error(err), slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
|
||||
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial workspace agent: %s", err))
|
||||
return
|
||||
}
|
||||
defer release()
|
||||
log.Debug(ctx, "dialed workspace agent")
|
||||
log.Debug(ctx, "dialed workspace agent", slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
|
||||
// #nosec G115 - Safe conversion for terminal height/width which are expected to be within uint16 range (0-65535)
|
||||
ptNetConn, err := agentConn.ReconnectingPTY(ctx, reconnect, uint16(height), uint16(width), r.URL.Query().Get("command"), func(arp *workspacesdk.AgentReconnectingPTYInit) {
|
||||
arp.Container = container
|
||||
@@ -785,12 +790,12 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
|
||||
arp.BackendType = backendType
|
||||
})
|
||||
if err != nil {
|
||||
log.Debug(ctx, "dial reconnecting pty server in workspace agent", slog.Error(err))
|
||||
log.Debug(ctx, "dial reconnecting pty server in workspace agent", slog.Error(err), slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
|
||||
_ = conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("dial: %s", err))
|
||||
return
|
||||
}
|
||||
defer ptNetConn.Close()
|
||||
log.Debug(ctx, "obtained PTY")
|
||||
log.Debug(ctx, "obtained PTY", slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
|
||||
|
||||
report := newStatsReportFromSignedToken(*appToken)
|
||||
s.collectStats(report)
|
||||
@@ -800,7 +805,7 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
|
||||
}()
|
||||
|
||||
agentssh.Bicopy(ctx, wsNetConn, ptNetConn)
|
||||
log.Debug(ctx, "pty Bicopy finished")
|
||||
log.Debug(ctx, "pty Bicopy finished", slog.F("elapsed_ms", time.Since(dialStart).Milliseconds()))
|
||||
}
|
||||
|
||||
func (s *Server) collectStats(stats StatsReport) {
|
||||
|
||||
@@ -196,7 +196,6 @@ const (
|
||||
FeatureWorkspaceExternalAgent FeatureName = "workspace_external_agent"
|
||||
FeatureAIBridge FeatureName = "aibridge"
|
||||
FeatureBoundary FeatureName = "boundary"
|
||||
FeatureServiceAccounts FeatureName = "service_accounts"
|
||||
FeatureAIGovernanceUserLimit FeatureName = "ai_governance_user_limit"
|
||||
)
|
||||
|
||||
@@ -228,7 +227,6 @@ var (
|
||||
FeatureWorkspaceExternalAgent,
|
||||
FeatureAIBridge,
|
||||
FeatureBoundary,
|
||||
FeatureServiceAccounts,
|
||||
FeatureAIGovernanceUserLimit,
|
||||
}
|
||||
|
||||
@@ -277,7 +275,6 @@ func (n FeatureName) AlwaysEnable() bool {
|
||||
FeatureWorkspacePrebuilds: true,
|
||||
FeatureWorkspaceExternalAgent: true,
|
||||
FeatureBoundary: true,
|
||||
FeatureServiceAccounts: true,
|
||||
}[n]
|
||||
}
|
||||
|
||||
@@ -285,7 +282,7 @@ func (n FeatureName) AlwaysEnable() bool {
|
||||
func (n FeatureName) Enterprise() bool {
|
||||
switch n {
|
||||
// Add all features that should be excluded in the Enterprise feature set.
|
||||
case FeatureMultipleOrganizations, FeatureCustomRoles, FeatureServiceAccounts:
|
||||
case FeatureMultipleOrganizations, FeatureCustomRoles:
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
|
||||
@@ -1,38 +1,31 @@
|
||||
# Headless Authentication
|
||||
|
||||
> [!NOTE]
|
||||
> Creating service accounts requires a [Premium license](https://coder.com/pricing).
|
||||
Headless user accounts that cannot use the web UI to log in to Coder. This is
|
||||
useful for creating accounts for automated systems, such as CI/CD pipelines or
|
||||
for users who only consume Coder via another client/API.
|
||||
|
||||
Service accounts are headless user accounts that cannot use the web UI to log in
|
||||
to Coder. This is useful for creating accounts for automated systems, such as
|
||||
CI/CD pipelines or for users who only consume Coder via another client/API. Service accounts do not have passwords or associated email addresses.
|
||||
You must have the User Admin role or above to create headless users.
|
||||
|
||||
You must have the User Admin role or above to create service accounts.
|
||||
|
||||
## Create a service account
|
||||
## Create a headless user
|
||||
|
||||
<div class="tabs">
|
||||
|
||||
## CLI
|
||||
|
||||
Use the `--service-account` flag to create a dedicated service account:
|
||||
|
||||
```sh
|
||||
coder users create \
|
||||
--email="coder-bot@coder.com" \
|
||||
--username="coder-bot" \
|
||||
--service-account
|
||||
--login-type="none" \
|
||||
```
|
||||
|
||||
## UI
|
||||
|
||||
Navigate to **Deployment** > **Users** > **Create user**, then select
|
||||
**Service account** as the login type.
|
||||
Navigate to the `Users` > `Create user` in the topbar
|
||||
|
||||

|
||||
|
||||
</div>
|
||||
|
||||
## Authenticate as a service account
|
||||
|
||||
To make API or CLI requests on behalf of the headless user, learn how to
|
||||
[generate API tokens on behalf of a user](./sessions-tokens.md#generate-a-long-lived-api-token-on-behalf-of-another-user).
|
||||
|
||||
+1
-2
@@ -495,8 +495,7 @@
|
||||
{
|
||||
"title": "Headless Authentication",
|
||||
"description": "Create and manage headless service accounts for automated systems and API integrations",
|
||||
"path": "./admin/users/headless-auth.md",
|
||||
"state": ["premium"]
|
||||
"path": "./admin/users/headless-auth.md"
|
||||
},
|
||||
{
|
||||
"title": "Groups \u0026 Roles",
|
||||
|
||||
@@ -468,10 +468,9 @@ func (b *DBBatcher) retryLoop() {
|
||||
func (b *DBBatcher) retryBatch(params database.BatchUpsertConnectionLogsParams) {
|
||||
count := len(params.ID)
|
||||
for attempt := range maxRetries {
|
||||
t := b.clock.NewTimer(retryInterval, "connectionLogBatcher", "retryBackoff")
|
||||
t := time.NewTimer(retryInterval)
|
||||
select {
|
||||
case <-b.ctx.Done():
|
||||
t.Stop()
|
||||
b.shutdownBatch(params)
|
||||
return
|
||||
case <-t.C:
|
||||
|
||||
@@ -355,20 +355,15 @@ func Test_batcherFlush(t *testing.T) {
|
||||
store := dbmock.NewMockStore(ctrl)
|
||||
clock := quartz.NewMock(t)
|
||||
|
||||
// Trap the capacity flush (fires when batch reaches maxBatchSize).
|
||||
capacityTrap := clock.Trap().TimerReset("connectionLogBatcher", "capacityFlush")
|
||||
defer capacityTrap.Close()
|
||||
scheduledTrap := clock.Trap().TimerReset("connectionLogBatcher", "scheduledFlush")
|
||||
defer scheduledTrap.Close()
|
||||
|
||||
// Trap the retry backoff timer created by retryBatch.
|
||||
retryTrap := clock.Trap().NewTimer("connectionLogBatcher", "retryBackoff")
|
||||
defer retryTrap.Close()
|
||||
|
||||
// Batch size of 1: consuming the item triggers an immediate
|
||||
// capacity flush, avoiding the timer/itemCh select race.
|
||||
b := NewDBBatcher(ctx, store, log, WithClock(clock), WithBatchSize(1))
|
||||
b := NewDBBatcher(ctx, store, log, WithClock(clock), WithBatchSize(100))
|
||||
|
||||
evt := fakeConnectEvent(uuid.New(), "agent1", uuid.New())
|
||||
|
||||
// First call (synchronous in flush) fails, then the
|
||||
// retry worker retries after the backoff and succeeds.
|
||||
gomock.InOrder(
|
||||
store.EXPECT().
|
||||
BatchUpsertConnectionLogs(gomock.Any(), gomock.Any()).
|
||||
@@ -385,15 +380,14 @@ func Test_batcherFlush(t *testing.T) {
|
||||
|
||||
require.NoError(t, b.Upsert(ctx, evt))
|
||||
|
||||
// Item consumed → capacity flush fires → transient error →
|
||||
// batch queued to retryCh → timer reset trapped.
|
||||
capacityTrap.MustWait(ctx).MustRelease(ctx)
|
||||
|
||||
// Retry worker creates a timer — trap it, release, advance.
|
||||
retryCall := retryTrap.MustWait(ctx)
|
||||
retryCall.MustRelease(ctx)
|
||||
clock.Advance(retryInterval).MustWait(ctx)
|
||||
// Trigger a scheduled flush while the batcher is still
|
||||
// running. The synchronous write fails and queues to
|
||||
// retryCh. The retry worker picks it up after a real-
|
||||
// time 1s delay and succeeds.
|
||||
clock.Advance(defaultFlushInterval).MustWait(ctx)
|
||||
scheduledTrap.MustWait(ctx).MustRelease(ctx)
|
||||
|
||||
// Wait for the retry to complete (real-time 1s delay).
|
||||
require.NoError(t, b.Close())
|
||||
})
|
||||
|
||||
@@ -406,10 +400,10 @@ func Test_batcherFlush(t *testing.T) {
|
||||
store := dbmock.NewMockStore(ctrl)
|
||||
clock := quartz.NewMock(t)
|
||||
|
||||
capacityTrap := clock.Trap().TimerReset("connectionLogBatcher", "capacityFlush")
|
||||
defer capacityTrap.Close()
|
||||
scheduledTrap := clock.Trap().TimerReset("connectionLogBatcher", "scheduledFlush")
|
||||
defer scheduledTrap.Close()
|
||||
|
||||
b := NewDBBatcher(ctx, store, log, WithClock(clock), WithBatchSize(1))
|
||||
b := NewDBBatcher(ctx, store, log, WithClock(clock), WithBatchSize(100))
|
||||
|
||||
evt := fakeConnectEvent(uuid.New(), "agent1", uuid.New())
|
||||
|
||||
@@ -434,9 +428,10 @@ func Test_batcherFlush(t *testing.T) {
|
||||
}).
|
||||
AnyTimes()
|
||||
|
||||
// Send event — capacity flush triggers immediately.
|
||||
// Send event and trigger flush — fails, queues.
|
||||
require.NoError(t, b.Upsert(ctx, evt))
|
||||
capacityTrap.MustWait(ctx).MustRelease(ctx)
|
||||
clock.Advance(defaultFlushInterval).MustWait(ctx)
|
||||
scheduledTrap.MustWait(ctx).MustRelease(ctx)
|
||||
|
||||
// Close triggers shutdown. The retry worker drains
|
||||
// retryCh and writes the batch via writeBatch.
|
||||
|
||||
@@ -1161,8 +1161,7 @@ func TestGetGroupMembersFilter(t *testing.T) {
|
||||
},
|
||||
LicenseOptions: &coderdenttest.LicenseOptions{
|
||||
Features: license.Features{
|
||||
codersdk.FeatureTemplateRBAC: 1,
|
||||
codersdk.FeatureServiceAccounts: 1,
|
||||
codersdk.FeatureTemplateRBAC: 1,
|
||||
},
|
||||
},
|
||||
})
|
||||
@@ -1192,8 +1191,7 @@ func TestGetGroupMembersFilter(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
return res.Users
|
||||
}
|
||||
options := &coderdtest.UsersFilterOptions{CreateServiceAccounts: true}
|
||||
coderdtest.UsersFilter(setupCtx, t, client, db, options, setup, fetch)
|
||||
coderdtest.UsersFilter(setupCtx, t, client, db, setup, fetch)
|
||||
}
|
||||
|
||||
func TestGetGroupMembersPagination(t *testing.T) {
|
||||
|
||||
@@ -614,168 +614,4 @@ func TestEnterprisePostUser(t *testing.T) {
|
||||
require.Len(t, memberedOrgs, 2)
|
||||
require.ElementsMatch(t, []uuid.UUID{second.ID, third.ID}, []uuid.UUID{memberedOrgs[0].ID, memberedOrgs[1].ID})
|
||||
})
|
||||
|
||||
t.Run("ServiceAccount/OK", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
client, first := coderdenttest.New(t, &coderdenttest.Options{
|
||||
LicenseOptions: &coderdenttest.LicenseOptions{
|
||||
Features: license.Features{
|
||||
codersdk.FeatureServiceAccounts: 1,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
|
||||
defer cancel()
|
||||
|
||||
//nolint:gocritic
|
||||
user, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
|
||||
OrganizationIDs: []uuid.UUID{first.OrganizationID},
|
||||
Username: "service-acct-ok",
|
||||
UserLoginType: codersdk.LoginTypeNone,
|
||||
ServiceAccount: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, codersdk.LoginTypeNone, user.LoginType)
|
||||
require.Empty(t, user.Email)
|
||||
require.Equal(t, "service-acct-ok", user.Username)
|
||||
require.Equal(t, codersdk.UserStatusDormant, user.Status)
|
||||
})
|
||||
|
||||
t.Run("ServiceAccount/WithEmail", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
client, first := coderdenttest.New(t, &coderdenttest.Options{
|
||||
LicenseOptions: &coderdenttest.LicenseOptions{
|
||||
Features: license.Features{
|
||||
codersdk.FeatureServiceAccounts: 1,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
|
||||
defer cancel()
|
||||
|
||||
//nolint:gocritic
|
||||
_, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
|
||||
OrganizationIDs: []uuid.UUID{first.OrganizationID},
|
||||
Username: "service-acct-email",
|
||||
Email: "should-not-have@email.com",
|
||||
ServiceAccount: true,
|
||||
})
|
||||
var apiErr *codersdk.Error
|
||||
require.ErrorAs(t, err, &apiErr)
|
||||
require.Equal(t, http.StatusBadRequest, apiErr.StatusCode())
|
||||
require.Contains(t, apiErr.Message, "Email cannot be set for service accounts")
|
||||
})
|
||||
|
||||
t.Run("ServiceAccount/WithPassword", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
client, first := coderdenttest.New(t, &coderdenttest.Options{
|
||||
LicenseOptions: &coderdenttest.LicenseOptions{
|
||||
Features: license.Features{
|
||||
codersdk.FeatureServiceAccounts: 1,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
|
||||
defer cancel()
|
||||
|
||||
//nolint:gocritic
|
||||
_, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
|
||||
OrganizationIDs: []uuid.UUID{first.OrganizationID},
|
||||
Username: "service-acct-password",
|
||||
Password: "ShouldNotHavePassword123!",
|
||||
ServiceAccount: true,
|
||||
})
|
||||
var apiErr *codersdk.Error
|
||||
require.ErrorAs(t, err, &apiErr)
|
||||
require.Equal(t, http.StatusBadRequest, apiErr.StatusCode())
|
||||
require.Contains(t, apiErr.Message, "Password cannot be set for service accounts")
|
||||
})
|
||||
|
||||
t.Run("ServiceAccount/WithInvalidLoginType", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
client, first := coderdenttest.New(t, &coderdenttest.Options{
|
||||
LicenseOptions: &coderdenttest.LicenseOptions{
|
||||
Features: license.Features{
|
||||
codersdk.FeatureServiceAccounts: 1,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
|
||||
defer cancel()
|
||||
|
||||
//nolint:gocritic
|
||||
_, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
|
||||
OrganizationIDs: []uuid.UUID{first.OrganizationID},
|
||||
Username: "service-acct-login-type",
|
||||
UserLoginType: codersdk.LoginTypePassword,
|
||||
ServiceAccount: true,
|
||||
})
|
||||
var apiErr *codersdk.Error
|
||||
require.ErrorAs(t, err, &apiErr)
|
||||
require.Equal(t, http.StatusBadRequest, apiErr.StatusCode())
|
||||
require.Contains(t, apiErr.Message, "Service accounts must use login type 'none'")
|
||||
})
|
||||
|
||||
t.Run("ServiceAccount/DefaultLoginType", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
client, first := coderdenttest.New(t, &coderdenttest.Options{
|
||||
LicenseOptions: &coderdenttest.LicenseOptions{
|
||||
Features: license.Features{
|
||||
codersdk.FeatureServiceAccounts: 1,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
|
||||
defer cancel()
|
||||
|
||||
//nolint:gocritic
|
||||
user, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
|
||||
OrganizationIDs: []uuid.UUID{first.OrganizationID},
|
||||
Username: "service-acct-default-login",
|
||||
ServiceAccount: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
found, err := client.User(ctx, user.ID.String())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, codersdk.LoginTypeNone, found.LoginType)
|
||||
require.Empty(t, found.Email)
|
||||
})
|
||||
|
||||
t.Run("ServiceAccount/MultipleWithoutEmail", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
client, first := coderdenttest.New(t, &coderdenttest.Options{
|
||||
LicenseOptions: &coderdenttest.LicenseOptions{
|
||||
Features: license.Features{
|
||||
codersdk.FeatureServiceAccounts: 1,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
|
||||
defer cancel()
|
||||
|
||||
//nolint:gocritic
|
||||
user1, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
|
||||
OrganizationIDs: []uuid.UUID{first.OrganizationID},
|
||||
Username: "service-acct-multi-1",
|
||||
ServiceAccount: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, user1.Email)
|
||||
|
||||
user2, err := client.CreateUserWithOrgs(ctx, codersdk.CreateUserRequestWithOrgs{
|
||||
OrganizationIDs: []uuid.UUID{first.OrganizationID},
|
||||
Username: "service-acct-multi-2",
|
||||
ServiceAccount: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, user2.Email)
|
||||
require.NotEqual(t, user1.ID, user2.ID)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -231,13 +231,7 @@ func TestWorkspaceSharingDisabled(t *testing.T) {
|
||||
t.Run("ACLEndpointsForbiddenServiceAccountsMode", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
client, db, owner := coderdenttest.NewWithDatabase(t, &coderdenttest.Options{
|
||||
LicenseOptions: &coderdenttest.LicenseOptions{
|
||||
Features: license.Features{
|
||||
codersdk.FeatureServiceAccounts: 1,
|
||||
},
|
||||
},
|
||||
})
|
||||
client, db, owner := coderdenttest.NewWithDatabase(t, nil)
|
||||
|
||||
regularClient, regularUser := coderdtest.CreateAnotherUser(t, client, owner.OrganizationID)
|
||||
regularWS := dbfake.WorkspaceBuild(t, db, database.WorkspaceTable{
|
||||
@@ -450,8 +444,7 @@ func TestWorkspaceSharingDisabled(t *testing.T) {
|
||||
},
|
||||
LicenseOptions: &coderdenttest.LicenseOptions{
|
||||
Features: license.Features{
|
||||
codersdk.FeatureTemplateRBAC: 1,
|
||||
codersdk.FeatureServiceAccounts: 1,
|
||||
codersdk.FeatureTemplateRBAC: 1,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
@@ -1451,17 +1451,15 @@ func TestSubscribeRelayEstablishedMidStream(t *testing.T) {
|
||||
)
|
||||
})
|
||||
|
||||
// Worker with a short fallback poll interval. The primary
|
||||
// trigger is signalWake() from SendMessage, but under heavy
|
||||
// CI load the wake goroutine may be delayed. A short poll
|
||||
// ensures the worker always picks up the pending chat.
|
||||
// Worker with a 1-hour acquire interval; only processes when
|
||||
// explicitly woken.
|
||||
workerLogger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true})
|
||||
worker := osschatd.New(osschatd.Config{
|
||||
Logger: workerLogger,
|
||||
Database: db,
|
||||
ReplicaID: workerID,
|
||||
Pubsub: ps,
|
||||
PendingChatAcquireInterval: time.Second,
|
||||
PendingChatAcquireInterval: time.Hour,
|
||||
InFlightChatStaleAfter: testutil.WaitSuperLong,
|
||||
})
|
||||
t.Cleanup(func() {
|
||||
@@ -1491,11 +1489,7 @@ func TestSubscribeRelayEstablishedMidStream(t *testing.T) {
|
||||
return snapshot, relayEvents, cancel, nil
|
||||
}, nil)
|
||||
|
||||
// Use WaitSuperLong so the test survives heavy CI contention.
|
||||
// The worker pipeline (model resolution, message loading, LLM
|
||||
// call) involves multiple DB round-trips that can be slow under
|
||||
// load.
|
||||
ctx := testutil.Context(t, testutil.WaitSuperLong)
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
user, model := seedChatDependencies(ctx, t, db)
|
||||
setOpenAIProviderBaseURL(ctx, t, db, openAIURL)
|
||||
|
||||
@@ -1515,32 +1509,11 @@ func TestSubscribeRelayEstablishedMidStream(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for the worker to reach the LLM (first streaming
|
||||
// request). Also poll the chat status so we fail fast with a
|
||||
// clear message if the worker errors out instead of timing
|
||||
// out silently.
|
||||
ticker := time.NewTicker(250 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
waitForStream:
|
||||
for {
|
||||
select {
|
||||
case <-firstChunkEmitted:
|
||||
break waitForStream
|
||||
case <-ticker.C:
|
||||
currentChat, dbErr := db.GetChatByID(ctx, chat.ID)
|
||||
if dbErr == nil && currentChat.Status == database.ChatStatusError {
|
||||
t.Fatalf("worker failed to process chat: status=%s last_error=%s",
|
||||
currentChat.Status, currentChat.LastError.String)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
// Dump the final chat status for debugging.
|
||||
currentChat, dbErr := db.GetChatByID(context.Background(), chat.ID)
|
||||
if dbErr == nil {
|
||||
t.Fatalf("timed out waiting for worker to start streaming (chat status=%s, last_error=%q)",
|
||||
currentChat.Status, currentChat.LastError.String)
|
||||
}
|
||||
t.Fatal("timed out waiting for worker to start streaming")
|
||||
}
|
||||
// Wait for the worker to reach the LLM (first streaming request).
|
||||
select {
|
||||
case <-firstChunkEmitted:
|
||||
case <-ctx.Done():
|
||||
t.Fatal("timed out waiting for worker to start streaming")
|
||||
}
|
||||
|
||||
// Wait for the subscriber to receive the running status, which
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/google/uuid"
|
||||
"golang.org/x/sync/singleflight"
|
||||
"golang.org/x/xerrors"
|
||||
gProto "google.golang.org/protobuf/proto"
|
||||
|
||||
@@ -33,9 +34,9 @@ const (
|
||||
eventReadyForHandshake = "tailnet_ready_for_handshake"
|
||||
HeartbeatPeriod = time.Second * 2
|
||||
MissedHeartbeats = 3
|
||||
numQuerierWorkers = 10
|
||||
numQuerierWorkers = 40
|
||||
numBinderWorkers = 10
|
||||
numTunnelerWorkers = 10
|
||||
numTunnelerWorkers = 20
|
||||
numHandshakerWorkers = 5
|
||||
dbMaxBackoff = 10 * time.Second
|
||||
cleanupPeriod = time.Hour
|
||||
@@ -770,6 +771,9 @@ func (m *mapper) bestToUpdate(best map[uuid.UUID]mapping) *proto.CoordinateRespo
|
||||
|
||||
for k := range m.sent {
|
||||
if _, ok := best[k]; !ok {
|
||||
m.logger.Debug(m.ctx, "peer no longer in best mappings, sending DISCONNECTED",
|
||||
slog.F("peer_id", k),
|
||||
)
|
||||
resp.PeerUpdates = append(resp.PeerUpdates, &proto.CoordinateResponse_PeerUpdate{
|
||||
Id: agpl.UUIDToByteSlice(k),
|
||||
Kind: proto.CoordinateResponse_PeerUpdate_DISCONNECTED,
|
||||
@@ -820,6 +824,8 @@ type querier struct {
|
||||
mu sync.Mutex
|
||||
mappers map[mKey]*mapper
|
||||
healthy bool
|
||||
|
||||
resyncGroup singleflight.Group
|
||||
}
|
||||
|
||||
func newQuerier(ctx context.Context,
|
||||
@@ -958,7 +964,7 @@ func (q *querier) cleanupConn(c *connIO) {
|
||||
|
||||
// maxBatchSize is the maximum number of keys to process in a single batch
|
||||
// query.
|
||||
const maxBatchSize = 50
|
||||
const maxBatchSize = 200
|
||||
|
||||
func (q *querier) peerUpdateWorker() {
|
||||
defer q.wg.Done()
|
||||
@@ -1207,8 +1213,13 @@ func (q *querier) subscribe() {
|
||||
func (q *querier) listenPeer(_ context.Context, msg []byte, err error) {
|
||||
if xerrors.Is(err, pubsub.ErrDroppedMessages) {
|
||||
q.logger.Warn(q.ctx, "pubsub may have dropped peer updates")
|
||||
// we need to schedule a full resync of peer mappings
|
||||
q.resyncPeerMappings()
|
||||
// Schedule a full resync asynchronously so we don't block the
|
||||
// pubsub drain goroutine. Singleflight coalesces concurrent
|
||||
// resync requests.
|
||||
go q.resyncGroup.Do("resync", func() (any, error) {
|
||||
q.resyncPeerMappings()
|
||||
return nil, nil
|
||||
})
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
@@ -1234,8 +1245,13 @@ func (q *querier) listenPeer(_ context.Context, msg []byte, err error) {
|
||||
func (q *querier) listenTunnel(_ context.Context, msg []byte, err error) {
|
||||
if xerrors.Is(err, pubsub.ErrDroppedMessages) {
|
||||
q.logger.Warn(q.ctx, "pubsub may have dropped tunnel updates")
|
||||
// we need to schedule a full resync of peer mappings
|
||||
q.resyncPeerMappings()
|
||||
// Schedule a full resync asynchronously so we don't block the
|
||||
// pubsub drain goroutine. Singleflight coalesces concurrent
|
||||
// resync requests.
|
||||
go q.resyncGroup.Do("resync", func() (any, error) {
|
||||
q.resyncPeerMappings()
|
||||
return nil, nil
|
||||
})
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
@@ -1601,6 +1617,10 @@ func (h *heartbeats) filter(mappings []mapping) []mapping {
|
||||
// the only mapping available for it. Newer mappings will take
|
||||
// precedence.
|
||||
m.kind = proto.CoordinateResponse_PeerUpdate_LOST
|
||||
h.logger.Debug(h.ctx, "mapping rewritten to LOST due to missed heartbeats",
|
||||
slog.F("peer_id", m.peer),
|
||||
slog.F("coordinator_id", m.coordinator),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Generated
-2
@@ -3511,7 +3511,6 @@ export type FeatureName =
|
||||
| "multiple_external_auth"
|
||||
| "multiple_organizations"
|
||||
| "scim"
|
||||
| "service_accounts"
|
||||
| "task_batch_actions"
|
||||
| "template_rbac"
|
||||
| "user_limit"
|
||||
@@ -3540,7 +3539,6 @@ export const FeatureNames: FeatureName[] = [
|
||||
"multiple_external_auth",
|
||||
"multiple_organizations",
|
||||
"scim",
|
||||
"service_accounts",
|
||||
"task_batch_actions",
|
||||
"template_rbac",
|
||||
"user_limit",
|
||||
|
||||
@@ -16,12 +16,7 @@ type LogLineProps = {
|
||||
level: LogLevel;
|
||||
} & HTMLAttributes<HTMLPreElement>;
|
||||
|
||||
export const LogLine: FC<LogLineProps> = ({
|
||||
level,
|
||||
className,
|
||||
style,
|
||||
...props
|
||||
}) => {
|
||||
export const LogLine: FC<LogLineProps> = ({ level, className, ...props }) => {
|
||||
return (
|
||||
<pre
|
||||
{...props}
|
||||
@@ -38,10 +33,7 @@ export const LogLine: FC<LogLineProps> = ({
|
||||
className,
|
||||
)}
|
||||
style={{
|
||||
...style,
|
||||
padding:
|
||||
style?.padding ??
|
||||
`0 var(--log-line-side-padding, ${DEFAULT_LOG_LINE_SIDE_PADDING}px)`,
|
||||
padding: `0 var(--log-line-side-padding, ${DEFAULT_LOG_LINE_SIDE_PADDING}px)`,
|
||||
}}
|
||||
/>
|
||||
);
|
||||
|
||||
@@ -17,7 +17,6 @@ const meta: Meta<typeof CreateUserForm> = {
|
||||
onCancel: action("cancel"),
|
||||
onSubmit: action("submit"),
|
||||
isLoading: false,
|
||||
serviceAccountsEnabled: true,
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
@@ -87,7 +87,6 @@ interface CreateUserFormProps {
|
||||
onCancel: () => void;
|
||||
authMethods?: TypesGen.AuthMethods;
|
||||
showOrganizations: boolean;
|
||||
serviceAccountsEnabled: boolean;
|
||||
}
|
||||
|
||||
export const CreateUserForm: FC<CreateUserFormProps> = ({
|
||||
@@ -97,13 +96,12 @@ export const CreateUserForm: FC<CreateUserFormProps> = ({
|
||||
onCancel,
|
||||
showOrganizations,
|
||||
authMethods,
|
||||
serviceAccountsEnabled,
|
||||
}) => {
|
||||
const availableLoginTypes = [
|
||||
authMethods?.password.enabled && "password",
|
||||
authMethods?.oidc.enabled && "oidc",
|
||||
authMethods?.github.enabled && "github",
|
||||
serviceAccountsEnabled && "none",
|
||||
"none",
|
||||
].filter(Boolean) as Array<keyof typeof loginTypeOptions>;
|
||||
|
||||
const defaultLoginType = availableLoginTypes[0];
|
||||
|
||||
@@ -6,7 +6,6 @@ import { getErrorDetail, getErrorMessage } from "#/api/errors";
|
||||
import { authMethods, createUser } from "#/api/queries/users";
|
||||
import { Margins } from "#/components/Margins/Margins";
|
||||
import { useDashboard } from "#/modules/dashboard/useDashboard";
|
||||
import { useFeatureVisibility } from "#/modules/dashboard/useFeatureVisibility";
|
||||
import { pageTitle } from "#/utils/page";
|
||||
import { CreateUserForm } from "./CreateUserForm";
|
||||
|
||||
@@ -16,7 +15,6 @@ const CreateUserPage: FC = () => {
|
||||
const createUserMutation = useMutation(createUser(queryClient));
|
||||
const authMethodsQuery = useQuery(authMethods());
|
||||
const { showOrganizations } = useDashboard();
|
||||
const { service_accounts: serviceAccountsEnabled } = useFeatureVisibility();
|
||||
|
||||
return (
|
||||
<Margins>
|
||||
@@ -60,7 +58,6 @@ const CreateUserPage: FC = () => {
|
||||
}}
|
||||
authMethods={authMethodsQuery.data}
|
||||
showOrganizations={showOrganizations}
|
||||
serviceAccountsEnabled={serviceAccountsEnabled}
|
||||
/>
|
||||
</Margins>
|
||||
);
|
||||
|
||||
@@ -148,13 +148,6 @@ const CreateWorkspacePage: FC = () => {
|
||||
return;
|
||||
}
|
||||
|
||||
// Skip stale responses. If we've already sent a newer request,
|
||||
// this response contains outdated parameter values that would
|
||||
// overwrite the user's more recent input.
|
||||
if (response.id < wsResponseId.current) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!initialParamsSentRef.current && response.parameters?.length > 0) {
|
||||
sendInitialParameters([...response.parameters]);
|
||||
}
|
||||
@@ -294,22 +287,12 @@ const CreateWorkspacePage: FC = () => {
|
||||
return [...latestResponse.parameters].sort((a, b) => a.order - b.order);
|
||||
}, [latestResponse?.parameters]);
|
||||
|
||||
// The initial WS response (id: -1) contains only template defaults.
|
||||
// Keep the loader visible until a response reflecting the user's
|
||||
// actual parameter values arrives (id >= 0).
|
||||
const awaitingUserValues =
|
||||
latestResponse !== null &&
|
||||
latestResponse.id < 0 &&
|
||||
initialParamsSentRef.current &&
|
||||
!wsError;
|
||||
|
||||
const shouldShowLoader =
|
||||
!templateQuery.data ||
|
||||
isLoadingFormData ||
|
||||
isLoadingExternalAuth ||
|
||||
autoCreateReady ||
|
||||
(!latestResponse && !wsError) ||
|
||||
awaitingUserValues;
|
||||
(!latestResponse && !wsError);
|
||||
|
||||
return (
|
||||
<>
|
||||
|
||||
+20
-53
@@ -8,7 +8,6 @@ import { DetailedError } from "#/api/errors";
|
||||
import type {
|
||||
DynamicParametersRequest,
|
||||
DynamicParametersResponse,
|
||||
PreviewParameter,
|
||||
WorkspaceBuildParameter,
|
||||
} from "#/api/typesGenerated";
|
||||
import { ErrorAlert } from "#/components/Alert/ErrorAlert";
|
||||
@@ -23,7 +22,6 @@ import {
|
||||
TooltipTrigger,
|
||||
} from "#/components/Tooltip/Tooltip";
|
||||
import { useEffectEvent } from "#/hooks/hookPolyfills";
|
||||
import { getInitialParameterValues } from "#/modules/workspaces/DynamicParameter/DynamicParameter";
|
||||
import { docs } from "#/utils/docs";
|
||||
import { pageTitle } from "#/utils/page";
|
||||
import type { AutofillBuildParameter } from "#/utils/richParameters";
|
||||
@@ -75,31 +73,24 @@ const WorkspaceParametersPageExperimental: FC = () => {
|
||||
}
|
||||
});
|
||||
|
||||
// Sends the user's build parameter values to the WebSocket so the
|
||||
// backend evaluates dynamic expressions against real values instead
|
||||
// of template defaults. Bails when the REST build parameters
|
||||
// haven't loaded yet; the retrigger effect below covers that case.
|
||||
const sendInitialParameters = useEffectEvent(
|
||||
(parameters: PreviewParameter[]) => {
|
||||
if (initialParamsSentRef.current) return;
|
||||
if (parameters.length === 0) return;
|
||||
if (!latestBuildParameters) return;
|
||||
// On page load, sends initial workspace build parameters to the websocket.
|
||||
// This ensures the backend has the form's complete initial state,
|
||||
// vital for rendering dynamic UI elements dependent on initial parameter values.
|
||||
const sendInitialParameters = useEffectEvent(() => {
|
||||
if (initialParamsSentRef.current) return;
|
||||
if (autofillParameters.length === 0) return;
|
||||
|
||||
const inputs: Record<string, string> = {};
|
||||
for (const p of getInitialParameterValues(
|
||||
parameters,
|
||||
autofillParameters,
|
||||
)) {
|
||||
if (p.name && p.value) {
|
||||
inputs[p.name] = p.value;
|
||||
}
|
||||
const initialParamsToSend: Record<string, string> = {};
|
||||
for (const param of autofillParameters) {
|
||||
if (param.name && param.value) {
|
||||
initialParamsToSend[param.name] = param.value;
|
||||
}
|
||||
if (Object.keys(inputs).length === 0) return;
|
||||
}
|
||||
if (Object.keys(initialParamsToSend).length === 0) return;
|
||||
|
||||
sendMessage(inputs);
|
||||
initialParamsSentRef.current = true;
|
||||
},
|
||||
);
|
||||
sendMessage(initialParamsToSend);
|
||||
initialParamsSentRef.current = true;
|
||||
});
|
||||
|
||||
const onMessage = useEffectEvent((response: DynamicParametersResponse) => {
|
||||
if (latestResponse && latestResponse?.id >= response.id) {
|
||||
@@ -113,27 +104,13 @@ const WorkspaceParametersPageExperimental: FC = () => {
|
||||
return;
|
||||
}
|
||||
|
||||
// Send initial params before storing the response so the
|
||||
// stale-response guard above filters the defaults on the
|
||||
// next WS message.
|
||||
if (!initialParamsSentRef.current && response.parameters?.length > 0) {
|
||||
sendInitialParameters([...response.parameters]);
|
||||
}
|
||||
|
||||
setLatestResponse(response);
|
||||
|
||||
if (!initialParamsSentRef.current && response.parameters?.length > 0) {
|
||||
sendInitialParameters();
|
||||
}
|
||||
});
|
||||
|
||||
// When the WS first message arrives before the REST build
|
||||
// parameters have loaded, sendInitialParameters bails. This
|
||||
// effect retriggers the send once both sources are available.
|
||||
useEffect(() => {
|
||||
if (initialParamsSentRef.current) return;
|
||||
if (!latestResponse?.parameters?.length) return;
|
||||
if (!latestBuildParameters) return;
|
||||
|
||||
sendInitialParameters([...latestResponse.parameters]);
|
||||
}, [latestResponse, latestBuildParameters, sendInitialParameters]);
|
||||
|
||||
useEffect(() => {
|
||||
if (!templateVersionId && !workspace.latest_build.template_version_id)
|
||||
return;
|
||||
@@ -251,20 +228,10 @@ const WorkspaceParametersPageExperimental: FC = () => {
|
||||
const error =
|
||||
wsError || startWithParameters.error || restartWithParameters.error;
|
||||
|
||||
// The initial WS response (id: -1) contains only template defaults.
|
||||
// Keep the loader visible until a response reflecting the user's
|
||||
// actual build parameter values arrives (id >= 0).
|
||||
const awaitingUserValues =
|
||||
latestResponse !== null &&
|
||||
latestResponse.id < 0 &&
|
||||
initialParamsSentRef.current &&
|
||||
!wsError;
|
||||
|
||||
if (
|
||||
latestBuildParametersLoading ||
|
||||
(!latestResponse && !wsError) ||
|
||||
(ws.current && ws.current.readyState === WebSocket.CONNECTING) ||
|
||||
awaitingUserValues
|
||||
(ws.current && ws.current.readyState === WebSocket.CONNECTING)
|
||||
) {
|
||||
return <Loader />;
|
||||
}
|
||||
|
||||
+50
-42
@@ -12,6 +12,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sync/singleflight"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/google/uuid"
|
||||
"github.com/tailscale/wireguard-go/tun"
|
||||
@@ -463,6 +465,8 @@ type Conn struct {
|
||||
|
||||
trafficStats *connstats.Statistics
|
||||
lastNetInfo *tailcfg.NetInfo
|
||||
|
||||
awaitReachableGroup singleflight.Group
|
||||
}
|
||||
|
||||
func (c *Conn) GetNetInfo() *tailcfg.NetInfo {
|
||||
@@ -599,56 +603,60 @@ func (c *Conn) DERPMap() *tailcfg.DERPMap {
|
||||
// address is reachable. It's the callers responsibility to provide
|
||||
// a timeout, otherwise this function will block forever.
|
||||
func (c *Conn) AwaitReachable(ctx context.Context, ip netip.Addr) bool {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel() // Cancel all pending pings on exit.
|
||||
result, _, _ := c.awaitReachableGroup.Do(ip.String(), func() (interface{}, error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel() // Cancel all pending pings on exit.
|
||||
|
||||
completedCtx, completed := context.WithCancel(context.Background())
|
||||
defer completed()
|
||||
completedCtx, completed := context.WithCancel(context.Background())
|
||||
defer completed()
|
||||
|
||||
run := func() {
|
||||
// Safety timeout, initially we'll have around 10-20 goroutines
|
||||
// running in parallel. The exponential backoff will converge
|
||||
// around ~1 ping / 30s, this means we'll have around 10-20
|
||||
// goroutines pending towards the end as well.
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
||||
defer cancel()
|
||||
run := func() {
|
||||
// Safety timeout, initially we'll have around 10-20 goroutines
|
||||
// running in parallel. The exponential backoff will converge
|
||||
// around ~1 ping / 30s, this means we'll have around 10-20
|
||||
// goroutines pending towards the end as well.
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
// For reachability, we use TSMP ping, which pings at the IP layer, and
|
||||
// therefore requires that wireguard and the netstack are up. If we
|
||||
// don't wait for wireguard to be up, we could miss a handshake, and it
|
||||
// might take 5 seconds for the handshake to be retried. A 5s initial
|
||||
// round trip can set us up for poor TCP performance, since the initial
|
||||
// round-trip-time sets the initial retransmit timeout.
|
||||
_, _, _, err := c.pingWithType(ctx, ip, tailcfg.PingTSMP)
|
||||
if err == nil {
|
||||
completed()
|
||||
// For reachability, we use TSMP ping, which pings at the IP layer,
|
||||
// and therefore requires that wireguard and the netstack are up.
|
||||
// If we don't wait for wireguard to be up, we could miss a
|
||||
// handshake, and it might take 5 seconds for the handshake to be
|
||||
// retried. A 5s initial round trip can set us up for poor TCP
|
||||
// performance, since the initial round-trip-time sets the initial
|
||||
// retransmit timeout.
|
||||
_, _, _, err := c.pingWithType(ctx, ip, tailcfg.PingTSMP)
|
||||
if err == nil {
|
||||
completed()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
eb := backoff.NewExponentialBackOff()
|
||||
eb.MaxElapsedTime = 0
|
||||
eb.InitialInterval = 50 * time.Millisecond
|
||||
eb.MaxInterval = 30 * time.Second
|
||||
// Consume the first interval since
|
||||
// we'll fire off a ping immediately.
|
||||
_ = eb.NextBackOff()
|
||||
eb := backoff.NewExponentialBackOff()
|
||||
eb.MaxElapsedTime = 0
|
||||
eb.InitialInterval = 50 * time.Millisecond
|
||||
eb.MaxInterval = 5 * time.Second
|
||||
// Consume the first interval since
|
||||
// we'll fire off a ping immediately.
|
||||
_ = eb.NextBackOff()
|
||||
|
||||
t := backoff.NewTicker(eb)
|
||||
defer t.Stop()
|
||||
t := backoff.NewTicker(eb)
|
||||
defer t.Stop()
|
||||
|
||||
go run()
|
||||
for {
|
||||
select {
|
||||
case <-completedCtx.Done():
|
||||
return true
|
||||
case <-t.C:
|
||||
// Pings can take a while, so we can run multiple
|
||||
// in parallel to return ASAP.
|
||||
go run()
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
go run()
|
||||
for {
|
||||
select {
|
||||
case <-completedCtx.Done():
|
||||
return true, nil
|
||||
case <-t.C:
|
||||
// Pings can take a while, so we can run multiple
|
||||
// in parallel to return ASAP.
|
||||
go run()
|
||||
case <-ctx.Done():
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
return result.(bool)
|
||||
}
|
||||
|
||||
// Closed is a channel that ends when the connection has
|
||||
|
||||
Reference in New Issue
Block a user