feat: improve coder connect tunnel handling on reconnect (#17598)
Closes https://github.com/coder/internal/issues/563 The [Coder Connect tunnel](https://github.com/coder/coder/blob/main/vpn/tunnel.go) receives workspace state from the Coder server over a [dRPC stream.](https://github.com/coder/coder/blob/114ba4593b2a82dfd41cdcb7fd6eb70d866e7b86/tailnet/controllers.go#L1029) When first connecting to this stream, the current state of the user's workspaces is received, with subsequent messages being diffs on top of that state. However, if the client disconnects from this stream, such as when the user's device is suspended, and then reconnects later, no mechanism exists for the tunnel to differentiate that message containing the entire initial state from another diff, and so that state is incorrectly applied as a diff. In practice: - Tunnel connects, receives a workspace update containing all the existing workspaces & agents. - Tunnel loses connection, but isn't completely stopped. - All the user's workspaces are restarted, producing a new set of agents. - Tunnel regains connection, and receives a workspace update containing all the existing workspaces & agents. - This initial update is incorrectly applied as a diff, with the Tunnel's state containing both the old & new agents. This PR introduces a solution in which tunnelUpdater, when created, sends a FreshState flag with the WorkspaceUpdate type. This flag is handled in the vpn tunnel in the following fashion: - Preserve existing Agents - Remove current Agents in the tunnel that are not present in the WorkspaceUpdate - Remove unreferenced Workspaces
This commit is contained in:
+30
-2
@@ -897,6 +897,21 @@ type Workspace struct {
|
||||
agents map[uuid.UUID]*Agent
|
||||
}
|
||||
|
||||
func (w *Workspace) Clone() Workspace {
|
||||
agents := make(map[uuid.UUID]*Agent, len(w.agents))
|
||||
for k, v := range w.agents {
|
||||
clone := v.Clone()
|
||||
agents[k] = &clone
|
||||
}
|
||||
return Workspace{
|
||||
ID: w.ID,
|
||||
Name: w.Name,
|
||||
Status: w.Status,
|
||||
ownerUsername: w.ownerUsername,
|
||||
agents: agents,
|
||||
}
|
||||
}
|
||||
|
||||
type DNSNameOptions struct {
|
||||
Suffix string
|
||||
}
|
||||
@@ -1049,6 +1064,7 @@ func (t *tunnelUpdater) recvLoop() {
|
||||
t.logger.Debug(context.Background(), "tunnel updater recvLoop started")
|
||||
defer t.logger.Debug(context.Background(), "tunnel updater recvLoop done")
|
||||
defer close(t.recvLoopDone)
|
||||
updateKind := Snapshot
|
||||
for {
|
||||
update, err := t.client.Recv()
|
||||
if err != nil {
|
||||
@@ -1061,8 +1077,10 @@ func (t *tunnelUpdater) recvLoop() {
|
||||
}
|
||||
t.logger.Debug(context.Background(), "got workspace update",
|
||||
slog.F("workspace_update", update),
|
||||
slog.F("update_kind", updateKind),
|
||||
)
|
||||
err = t.handleUpdate(update)
|
||||
err = t.handleUpdate(update, updateKind)
|
||||
updateKind = Diff
|
||||
if err != nil {
|
||||
t.logger.Critical(context.Background(), "failed to handle workspace Update", slog.Error(err))
|
||||
cErr := t.client.Close()
|
||||
@@ -1083,14 +1101,23 @@ type WorkspaceUpdate struct {
|
||||
UpsertedAgents []*Agent
|
||||
DeletedWorkspaces []*Workspace
|
||||
DeletedAgents []*Agent
|
||||
Kind UpdateKind
|
||||
}
|
||||
|
||||
type UpdateKind int
|
||||
|
||||
const (
|
||||
Diff UpdateKind = iota
|
||||
Snapshot
|
||||
)
|
||||
|
||||
func (w *WorkspaceUpdate) Clone() WorkspaceUpdate {
|
||||
clone := WorkspaceUpdate{
|
||||
UpsertedWorkspaces: make([]*Workspace, len(w.UpsertedWorkspaces)),
|
||||
UpsertedAgents: make([]*Agent, len(w.UpsertedAgents)),
|
||||
DeletedWorkspaces: make([]*Workspace, len(w.DeletedWorkspaces)),
|
||||
DeletedAgents: make([]*Agent, len(w.DeletedAgents)),
|
||||
Kind: w.Kind,
|
||||
}
|
||||
for i, ws := range w.UpsertedWorkspaces {
|
||||
clone.UpsertedWorkspaces[i] = &Workspace{
|
||||
@@ -1115,7 +1142,7 @@ func (w *WorkspaceUpdate) Clone() WorkspaceUpdate {
|
||||
return clone
|
||||
}
|
||||
|
||||
func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error {
|
||||
func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate, updateKind UpdateKind) error {
|
||||
t.Lock()
|
||||
defer t.Unlock()
|
||||
|
||||
@@ -1124,6 +1151,7 @@ func (t *tunnelUpdater) handleUpdate(update *proto.WorkspaceUpdate) error {
|
||||
UpsertedAgents: []*Agent{},
|
||||
DeletedWorkspaces: []*Workspace{},
|
||||
DeletedAgents: []*Agent{},
|
||||
Kind: updateKind,
|
||||
}
|
||||
|
||||
for _, uw := range update.UpsertedWorkspaces {
|
||||
|
||||
@@ -1611,6 +1611,7 @@ func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) {
|
||||
},
|
||||
DeletedWorkspaces: []*tailnet.Workspace{},
|
||||
DeletedAgents: []*tailnet.Agent{},
|
||||
Kind: tailnet.Snapshot,
|
||||
}
|
||||
|
||||
// And the callback
|
||||
@@ -1626,6 +1627,9 @@ func TestTunnelAllWorkspaceUpdatesController_Initial(t *testing.T) {
|
||||
slices.SortFunc(recvState.UpsertedAgents, func(a, b *tailnet.Agent) int {
|
||||
return strings.Compare(a.Name, b.Name)
|
||||
})
|
||||
// tunnel is still open, so it's a diff
|
||||
currentState.Kind = tailnet.Diff
|
||||
|
||||
require.Equal(t, currentState, recvState)
|
||||
}
|
||||
|
||||
@@ -1692,14 +1696,17 @@ func TestTunnelAllWorkspaceUpdatesController_DeleteAgent(t *testing.T) {
|
||||
},
|
||||
DeletedWorkspaces: []*tailnet.Workspace{},
|
||||
DeletedAgents: []*tailnet.Agent{},
|
||||
Kind: tailnet.Snapshot,
|
||||
}
|
||||
|
||||
cbUpdate := testutil.TryReceive(ctx, t, fUH.ch)
|
||||
require.Equal(t, initRecvUp, cbUpdate)
|
||||
|
||||
// Current state should match initial
|
||||
state, err := updateCtrl.CurrentState()
|
||||
require.NoError(t, err)
|
||||
// tunnel is still open, so it's a diff
|
||||
initRecvUp.Kind = tailnet.Diff
|
||||
|
||||
require.Equal(t, initRecvUp, state)
|
||||
|
||||
// Send update that removes w1a1 and adds w1a2
|
||||
|
||||
Reference in New Issue
Block a user