Compare commits

...

3 Commits

Author SHA1 Message Date
Mathias Fredriksson
4f1c8f558b fix(coderd/chatd): handle pubsub errors without permanent degradation
Previously the errCh handler permanently disabled pubsub on any error.
Both error sources are recoverable since PGPubsub handles reconnection.

ErrDroppedMessages now triggers a DB catch-up using lastMessageID.
Unmarshal errors are logged at Warn and skipped. The subscription
stays active in both cases.

Tests verify: DB catch-up on ErrDroppedMessages, pubsub active after
unmarshal errors, malformed JSON does not break the stream.
2026-03-16 15:19:51 +00:00
Mathias Fredriksson
c850b764c6 fix(chatd): degrade to local-only mode on pubsub error
The Subscribe merge goroutine permanently terminated on any pubsub
error, killing the client's event stream with no recovery. Now it
degrades to local-only mode by nil-ing the pubsub channels and
continuing the event loop. Local events (including message_parts
from the same replica) continue flowing.

Add tests for pubsub error recovery, notification handling, and
hasPubsub event filtering.
2026-03-16 15:13:35 +00:00
Mathias Fredriksson
96698bc88c test(chatd): assert stream recovers after pubsub error 2026-03-16 15:13:35 +00:00
2 changed files with 680 additions and 14 deletions

View File

@@ -1408,21 +1408,47 @@ func (p *Server) Subscribe(
case <-mergedCtx.Done():
return
case psErr := <-errCh:
p.logger.Error(mergedCtx, "chat stream pubsub error",
slog.F("chat_id", chatID),
slog.Error(psErr),
)
select {
case mergedEvents <- codersdk.ChatStreamEvent{
Type: codersdk.ChatStreamEventTypeError,
ChatID: chatID,
Error: &codersdk.ChatStreamError{
Message: psErr.Error(),
},
}:
case <-mergedCtx.Done():
if xerrors.Is(psErr, pubsub.ErrDroppedMessages) {
// PGPubsub reconnected and some notifications
// were lost. Do a DB catch-up to recover any
// missed messages, the subscription is still
// active.
p.logger.Warn(mergedCtx, "chat stream pubsub dropped messages, catching up",
slog.F("chat_id", chatID),
)
newMessages, msgErr := p.db.GetChatMessagesByChatID(mergedCtx, database.GetChatMessagesByChatIDParams{
ChatID: chatID,
AfterID: lastMessageID,
})
if msgErr != nil {
p.logger.Warn(mergedCtx, "failed to catch up chat messages after dropped pubsub messages",
slog.F("chat_id", chatID),
slog.Error(msgErr),
)
} else {
for _, msg := range newMessages {
sdkMsg := db2sdk.ChatMessage(msg)
select {
case <-mergedCtx.Done():
return
case mergedEvents <- codersdk.ChatStreamEvent{
Type: codersdk.ChatStreamEventTypeMessage,
ChatID: chatID,
Message: &sdkMsg,
}:
}
lastMessageID = msg.ID
}
}
} else {
// Unmarshal errors or other unexpected errors:
// log and continue, the subscription is still
// active.
p.logger.Warn(mergedCtx, "chat stream pubsub error",
slog.F("chat_id", chatID),
slog.Error(psErr),
)
}
return
case notify := <-notifications:
if notify.AfterMessageID > 0 || notify.FullRefresh {
afterID := lastMessageID

View File

@@ -16,6 +16,7 @@ import (
"time"
"github.com/google/uuid"
"github.com/sqlc-dev/pqtype"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"golang.org/x/xerrors"
@@ -32,6 +33,7 @@ import (
"github.com/coder/coder/v2/coderd/database/dbgen"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
dbpubsub "github.com/coder/coder/v2/coderd/database/pubsub"
coderdpubsub "github.com/coder/coder/v2/coderd/pubsub"
"github.com/coder/coder/v2/coderd/util/slice"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
@@ -2121,3 +2123,641 @@ func TestComputerUseSubagentToolsAndModel(t *testing.T) {
require.Equal(t, database.ChatModeComputerUse,
children[0].Mode.ChatMode)
}
// faultPubsub wraps a real Pubsub and captures SubscribeWithErr
// listener callbacks so tests can inject errors that simulate
// pubsub transport failures (e.g. dropped connections).
type faultPubsub struct {
dbpubsub.Pubsub
mu sync.Mutex
listeners []dbpubsub.ListenerWithErr
}
func (f *faultPubsub) SubscribeWithErr(event string, listener dbpubsub.ListenerWithErr) (func(), error) {
f.mu.Lock()
f.listeners = append(f.listeners, listener)
f.mu.Unlock()
return f.Pubsub.SubscribeWithErr(event, listener)
}
// injectError calls all captured SubscribeWithErr listeners with
// the given error, simulating a pubsub transport failure.
func (f *faultPubsub) injectError(err error) {
f.mu.Lock()
defer f.mu.Unlock()
for _, l := range f.listeners {
l(context.Background(), nil, err)
}
}
func TestSubscribeRecoversAfterPubsubError(t *testing.T) {
t.Parallel()
db, ps := dbtestutil.NewDB(t)
// Wrap the real pubsub so we can inject errors into the
// SubscribeWithErr listener callback.
faultPS := &faultPubsub{Pubsub: ps}
server := newTestServer(t, db, faultPS, uuid.New())
ctx := testutil.Context(t, testutil.WaitLong)
user, model := seedChatDependencies(ctx, t, db)
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
OwnerID: user.ID,
Title: "pubsub-error-recovery",
ModelConfigID: model.ID,
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
})
require.NoError(t, err)
// Set the chat to "running" so InterruptChat can transition
// it to "waiting".
runningWorker := uuid.New()
chat, err = db.UpdateChatStatus(ctx, database.UpdateChatStatusParams{
ID: chat.ID,
Status: database.ChatStatusRunning,
WorkerID: uuid.NullUUID{UUID: runningWorker, Valid: true},
StartedAt: sql.NullTime{Time: time.Now(), Valid: true},
HeartbeatAt: sql.NullTime{Time: time.Now(), Valid: true},
})
require.NoError(t, err)
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
require.True(t, ok)
t.Cleanup(cancel)
// Insert a message that the catch-up query should find.
newContent, marshalErr := json.Marshal([]codersdk.ChatMessagePart{
codersdk.ChatMessageText("message inserted before dropped-messages"),
})
require.NoError(t, marshalErr)
newMsg, err := db.InsertChatMessage(ctx, database.InsertChatMessageParams{
ChatID: chat.ID,
Role: database.ChatMessageRoleAssistant,
Content: pqtype.NullRawMessage{RawMessage: newContent, Valid: true},
Visibility: database.ChatMessageVisibilityBoth,
})
require.NoError(t, err)
// Inject ErrDroppedMessages. The merge goroutine should do a
// DB catch-up and deliver the message we just inserted.
faultPS.injectError(dbpubsub.ErrDroppedMessages)
// The catch-up should deliver the new message.
require.Eventually(t, func() bool {
select {
case event := <-events:
if event.Type == codersdk.ChatStreamEventTypeMessage && event.Message != nil {
return event.Message.ID == newMsg.ID
}
return false
default:
return false
}
}, testutil.WaitMedium, testutil.IntervalFast)
// Verify pubsub is still active: trigger a real status
// change via InterruptChat (publishes via pubsub).
updated := server.InterruptChat(ctx, chat)
require.Equal(t, database.ChatStatusWaiting, updated.Status)
require.Eventually(t, func() bool {
select {
case event, ok := <-events:
if !ok {
t.Log("events channel closed unexpectedly")
return false
}
if event.Type == codersdk.ChatStreamEventTypeStatus && event.Status != nil {
return event.Status.Status == codersdk.ChatStatusWaiting
}
t.Logf("skipping event: type=%s", event.Type)
return false
default:
return false
}
}, testutil.WaitMedium, testutil.IntervalFast)
}
func TestSubscribePubsubUnmarshalErrorContinues(t *testing.T) {
t.Parallel()
db, ps := dbtestutil.NewDB(t)
faultPS := &faultPubsub{Pubsub: ps}
server := newTestServer(t, db, faultPS, uuid.New())
ctx := testutil.Context(t, testutil.WaitLong)
user, model := seedChatDependencies(ctx, t, db)
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
OwnerID: user.ID,
Title: "unmarshal-error-continues",
ModelConfigID: model.ID,
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
})
require.NoError(t, err)
// Set the chat to "running" so InterruptChat can transition
// it to "waiting".
workerID := uuid.New()
chat, err = db.UpdateChatStatus(ctx, database.UpdateChatStatusParams{
ID: chat.ID,
Status: database.ChatStatusRunning,
WorkerID: uuid.NullUUID{UUID: workerID, Valid: true},
StartedAt: sql.NullTime{Time: time.Now(), Valid: true},
HeartbeatAt: sql.NullTime{Time: time.Now(), Valid: true},
})
require.NoError(t, err)
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
require.True(t, ok)
t.Cleanup(cancel)
// Inject an unmarshal-style error. The merge goroutine
// should log it and continue, not degrade.
faultPS.injectError(xerrors.New("unmarshal chat stream notify: invalid character"))
// Verify pubsub still works: trigger a real status change
// via InterruptChat which publishes via pubsub.
updated := server.InterruptChat(ctx, chat)
require.Equal(t, database.ChatStatusWaiting, updated.Status)
require.Eventually(t, func() bool {
select {
case event, ok := <-events:
if !ok {
t.Log("events channel closed unexpectedly")
return false
}
if event.Type == codersdk.ChatStreamEventTypeStatus && event.Status != nil {
return event.Status.Status == codersdk.ChatStatusWaiting
}
t.Logf("skipping event: type=%s", event.Type)
return false
default:
return false
}
}, testutil.WaitMedium, testutil.IntervalFast)
}
func TestSubscribePubsubNotificationStatus(t *testing.T) {
t.Parallel()
db, ps := dbtestutil.NewDB(t)
server := newTestServer(t, db, ps, uuid.New())
ctx := testutil.Context(t, testutil.WaitLong)
user, model := seedChatDependencies(ctx, t, db)
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
OwnerID: user.ID,
Title: "pubsub-notify-status",
ModelConfigID: model.ID,
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
})
require.NoError(t, err)
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
require.True(t, ok)
t.Cleanup(cancel)
// Trigger a status change via InterruptChat, which publishes
// both locally and via pubsub. The subscriber should receive
// the status event via the notifications handler.
workerID := uuid.New()
chat, err = db.UpdateChatStatus(ctx, database.UpdateChatStatusParams{
ID: chat.ID,
Status: database.ChatStatusRunning,
WorkerID: uuid.NullUUID{UUID: workerID, Valid: true},
StartedAt: sql.NullTime{Time: time.Now(), Valid: true},
HeartbeatAt: sql.NullTime{Time: time.Now(), Valid: true},
})
require.NoError(t, err)
updated := server.InterruptChat(ctx, chat)
require.Equal(t, database.ChatStatusWaiting, updated.Status)
require.Eventually(t, func() bool {
select {
case event := <-events:
if event.Type == codersdk.ChatStreamEventTypeStatus && event.Status != nil {
return event.Status.Status == codersdk.ChatStatusWaiting
}
return false
default:
return false
}
}, testutil.WaitMedium, testutil.IntervalFast)
}
func TestSubscribePubsubNotificationError(t *testing.T) {
t.Parallel()
db, ps := dbtestutil.NewDB(t)
server := newTestServer(t, db, ps, uuid.New())
ctx := testutil.Context(t, testutil.WaitLong)
user, model := seedChatDependencies(ctx, t, db)
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
OwnerID: user.ID,
Title: "pubsub-notify-error",
ModelConfigID: model.ID,
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
})
require.NoError(t, err)
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
require.True(t, ok)
t.Cleanup(cancel)
// Publish an error notification via pubsub. The notification
// handler should forward it as a ChatStreamEventTypeError.
errMsg := coderdpubsub.ChatStreamNotifyMessage{
Error: "something went wrong on remote replica",
}
payload, marshalErr := json.Marshal(errMsg)
require.NoError(t, marshalErr)
err = ps.Publish(coderdpubsub.ChatStreamNotifyChannel(chat.ID), payload)
require.NoError(t, err)
require.Eventually(t, func() bool {
select {
case event := <-events:
if event.Type == codersdk.ChatStreamEventTypeError && event.Error != nil {
return event.Error.Message == "something went wrong on remote replica"
}
return false
default:
return false
}
}, testutil.WaitMedium, testutil.IntervalFast)
}
func TestSubscribePubsubNotificationQueueUpdate(t *testing.T) {
t.Parallel()
db, ps := dbtestutil.NewDB(t)
server := newTestServer(t, db, ps, uuid.New())
ctx := testutil.Context(t, testutil.WaitLong)
user, model := seedChatDependencies(ctx, t, db)
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
OwnerID: user.ID,
Title: "pubsub-notify-queue",
ModelConfigID: model.ID,
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
})
require.NoError(t, err)
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
require.True(t, ok)
t.Cleanup(cancel)
// Publish a queue_update notification via pubsub.
notify := coderdpubsub.ChatStreamNotifyMessage{
QueueUpdate: true,
}
payload, marshalErr := json.Marshal(notify)
require.NoError(t, marshalErr)
err = ps.Publish(coderdpubsub.ChatStreamNotifyChannel(chat.ID), payload)
require.NoError(t, err)
require.Eventually(t, func() bool {
select {
case event := <-events:
return event.Type == codersdk.ChatStreamEventTypeQueueUpdate
default:
return false
}
}, testutil.WaitMedium, testutil.IntervalFast)
}
func TestSubscribePubsubNotificationMessage(t *testing.T) {
t.Parallel()
db, ps := dbtestutil.NewDB(t)
server := newTestServer(t, db, ps, uuid.New())
ctx := testutil.Context(t, testutil.WaitLong)
user, model := seedChatDependencies(ctx, t, db)
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
OwnerID: user.ID,
Title: "pubsub-notify-message",
ModelConfigID: model.ID,
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
})
require.NoError(t, err)
// Get the current last message ID from the initial message.
messages, err := db.GetChatMessagesByChatID(ctx, database.GetChatMessagesByChatIDParams{
ChatID: chat.ID,
AfterID: 0,
})
require.NoError(t, err)
require.NotEmpty(t, messages)
lastMsgID := messages[len(messages)-1].ID
// Subscribe after the initial message so the snapshot
// does not include it.
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, lastMsgID)
require.True(t, ok)
t.Cleanup(cancel)
// Insert a new message into the database.
newContent, marshalErr := json.Marshal([]codersdk.ChatMessagePart{
codersdk.ChatMessageText("new message from another replica"),
})
require.NoError(t, marshalErr)
newMsg, err := db.InsertChatMessage(ctx, database.InsertChatMessageParams{
ChatID: chat.ID,
Role: database.ChatMessageRoleAssistant,
Content: pqtype.NullRawMessage{RawMessage: newContent, Valid: true},
Visibility: database.ChatMessageVisibilityBoth,
})
require.NoError(t, err)
// Publish an AfterMessageID notification via pubsub so the
// subscriber fetches the new message from DB.
notify := coderdpubsub.ChatStreamNotifyMessage{
AfterMessageID: newMsg.ID,
}
payload, notifyMarshalErr := json.Marshal(notify)
require.NoError(t, notifyMarshalErr)
err = ps.Publish(coderdpubsub.ChatStreamNotifyChannel(chat.ID), payload)
require.NoError(t, err)
require.Eventually(t, func() bool {
select {
case event := <-events:
if event.Type == codersdk.ChatStreamEventTypeMessage && event.Message != nil {
return event.Message.ID == newMsg.ID
}
return false
default:
return false
}
}, testutil.WaitMedium, testutil.IntervalFast)
}
func TestSubscribeHasPubsubFiltersLocalEvents(t *testing.T) {
t.Parallel()
db, ps := dbtestutil.NewDB(t)
server := newTestServer(t, db, ps, uuid.New())
ctx := testutil.Context(t, testutil.WaitLong)
user, model := seedChatDependencies(ctx, t, db)
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
OwnerID: user.ID,
Title: "pubsub-filter-local",
ModelConfigID: model.ID,
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
})
require.NoError(t, err)
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
require.True(t, ok)
t.Cleanup(cancel)
// Publish a local status event. With pubsub active
// (hasPubsub=true), the local events case only forwards
// message_part events. Status events from the local channel
// should be filtered out (they arrive via pubsub instead).
err = server.RefreshStatus(ctx, chat.ID)
require.NoError(t, err)
// The status event should arrive via the pubsub notification
// handler, not the local events path. We verify by checking
// that a status event arrives (proving the pubsub path works).
require.Eventually(t, func() bool {
select {
case event := <-events:
if event.Type == codersdk.ChatStreamEventTypeStatus && event.Status != nil {
return true
}
return false
default:
return false
}
}, testutil.WaitMedium, testutil.IntervalFast)
}
func TestSubscribeNoPubsubForwardsAllLocalEvents(t *testing.T) {
t.Parallel()
db, _ := dbtestutil.NewDB(t)
// No pubsub: pass nil so hasPubsub=false from the start.
server := newTestServer(t, db, nil, uuid.New())
ctx := testutil.Context(t, testutil.WaitLong)
user, model := seedChatDependencies(ctx, t, db)
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
OwnerID: user.ID,
Title: "no-pubsub-all-events",
ModelConfigID: model.ID,
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
})
require.NoError(t, err)
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
require.True(t, ok)
t.Cleanup(cancel)
// Publish a local status event. Without pubsub, all event
// types should be forwarded from the local channel.
err = server.RefreshStatus(ctx, chat.ID)
require.NoError(t, err)
require.Eventually(t, func() bool {
select {
case event := <-events:
if event.Type == codersdk.ChatStreamEventTypeStatus && event.Status != nil {
return true
}
return false
default:
return false
}
}, testutil.WaitMedium, testutil.IntervalFast)
}
func TestSubscribeNilServer(t *testing.T) {
t.Parallel()
var srv *chatd.Server
snapshot, events, cancel, ok := srv.Subscribe(
context.Background(), uuid.New(), nil, 0,
)
require.False(t, ok)
require.Nil(t, snapshot)
require.Nil(t, events)
require.Nil(t, cancel)
}
// failSubscribePubsub wraps a real Pubsub but always fails on
// SubscribeWithErr, simulating a pubsub that cannot be subscribed to
// (e.g. due to permission or connection issues at subscribe time).
type failSubscribePubsub struct {
dbpubsub.Pubsub
}
func (*failSubscribePubsub) SubscribeWithErr(_ string, _ dbpubsub.ListenerWithErr) (func(), error) {
return nil, xerrors.New("subscribe failed")
}
func TestSubscribePubsubSubscribeFailure(t *testing.T) {
t.Parallel()
db, ps := dbtestutil.NewDB(t)
failPS := &failSubscribePubsub{Pubsub: ps}
server := newTestServer(t, db, failPS, uuid.New())
ctx := testutil.Context(t, testutil.WaitLong)
user, model := seedChatDependencies(ctx, t, db)
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
OwnerID: user.ID,
Title: "subscribe-failure",
ModelConfigID: model.ID,
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
})
require.NoError(t, err)
// Subscribe still succeeds but in local-only mode since
// SubscribeWithErr failed.
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
require.True(t, ok)
t.Cleanup(cancel)
// Local events should still work.
err = server.RefreshStatus(ctx, chat.ID)
require.NoError(t, err)
require.Eventually(t, func() bool {
select {
case event := <-events:
return event.Type == codersdk.ChatStreamEventTypeStatus
default:
return false
}
}, testutil.WaitMedium, testutil.IntervalFast)
}
func TestSubscribePubsubMalformedNotification(t *testing.T) {
t.Parallel()
db, ps := dbtestutil.NewDB(t)
server := newTestServer(t, db, ps, uuid.New())
ctx := testutil.Context(t, testutil.WaitLong)
user, model := seedChatDependencies(ctx, t, db)
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
OwnerID: user.ID,
Title: "malformed-notification",
ModelConfigID: model.ID,
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
})
require.NoError(t, err)
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
require.True(t, ok)
t.Cleanup(cancel)
// Publish malformed JSON to the pubsub channel. The listener's
// unmarshal will fail, sending an error to errCh. The merge
// goroutine logs it and continues (no degradation).
err = ps.Publish(
coderdpubsub.ChatStreamNotifyChannel(chat.ID),
[]byte("not valid json{{{"),
)
require.NoError(t, err)
// Insert a new message and publish a valid notification.
// This proves pubsub is still active after the malformed
// message.
newContent, marshalErr := json.Marshal([]codersdk.ChatMessagePart{
codersdk.ChatMessageText("valid message after malformed"),
})
require.NoError(t, marshalErr)
newMsg, err := db.InsertChatMessage(ctx, database.InsertChatMessageParams{
ChatID: chat.ID,
Role: database.ChatMessageRoleAssistant,
Content: pqtype.NullRawMessage{RawMessage: newContent, Valid: true},
Visibility: database.ChatMessageVisibilityBoth,
})
require.NoError(t, err)
notify := coderdpubsub.ChatStreamNotifyMessage{
AfterMessageID: newMsg.ID,
}
payload, notifyMarshalErr := json.Marshal(notify)
require.NoError(t, notifyMarshalErr)
err = ps.Publish(coderdpubsub.ChatStreamNotifyChannel(chat.ID), payload)
require.NoError(t, err)
// The valid notification should deliver the new message,
// proving pubsub was not degraded by the malformed one.
require.Eventually(t, func() bool {
select {
case event := <-events:
if event.Type == codersdk.ChatStreamEventTypeMessage && event.Message != nil {
return event.Message.ID == newMsg.ID
}
return false
default:
return false
}
}, testutil.WaitMedium, testutil.IntervalFast)
}
func TestSubscribePubsubFullRefreshNotification(t *testing.T) {
t.Parallel()
db, ps := dbtestutil.NewDB(t)
server := newTestServer(t, db, ps, uuid.New())
ctx := testutil.Context(t, testutil.WaitLong)
user, model := seedChatDependencies(ctx, t, db)
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
OwnerID: user.ID,
Title: "full-refresh",
ModelConfigID: model.ID,
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
})
require.NoError(t, err)
// Subscribe with afterMessageID=0 to get the initial message
// in the snapshot, then verify FullRefresh re-fetches it.
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
require.True(t, ok)
t.Cleanup(cancel)
// Publish a FullRefresh notification. This sets afterID=0
// in the notification handler, causing it to re-fetch all
// messages from the beginning.
notify := coderdpubsub.ChatStreamNotifyMessage{
FullRefresh: true,
}
payload, marshalErr := json.Marshal(notify)
require.NoError(t, marshalErr)
err = ps.Publish(coderdpubsub.ChatStreamNotifyChannel(chat.ID), payload)
require.NoError(t, err)
// The initial user message should be re-delivered.
require.Eventually(t, func() bool {
select {
case event := <-events:
return event.Type == codersdk.ChatStreamEventTypeMessage
default:
return false
}
}, testutil.WaitMedium, testutil.IntervalFast)
}