|
|
|
|
@@ -16,6 +16,7 @@ import (
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/google/uuid"
|
|
|
|
|
"github.com/sqlc-dev/pqtype"
|
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
|
"go.uber.org/mock/gomock"
|
|
|
|
|
"golang.org/x/xerrors"
|
|
|
|
|
@@ -32,6 +33,7 @@ import (
|
|
|
|
|
"github.com/coder/coder/v2/coderd/database/dbgen"
|
|
|
|
|
"github.com/coder/coder/v2/coderd/database/dbtestutil"
|
|
|
|
|
dbpubsub "github.com/coder/coder/v2/coderd/database/pubsub"
|
|
|
|
|
coderdpubsub "github.com/coder/coder/v2/coderd/pubsub"
|
|
|
|
|
"github.com/coder/coder/v2/coderd/util/slice"
|
|
|
|
|
"github.com/coder/coder/v2/codersdk"
|
|
|
|
|
"github.com/coder/coder/v2/codersdk/workspacesdk"
|
|
|
|
|
@@ -2121,3 +2123,641 @@ func TestComputerUseSubagentToolsAndModel(t *testing.T) {
|
|
|
|
|
require.Equal(t, database.ChatModeComputerUse,
|
|
|
|
|
children[0].Mode.ChatMode)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// faultPubsub wraps a real Pubsub and captures SubscribeWithErr
|
|
|
|
|
// listener callbacks so tests can inject errors that simulate
|
|
|
|
|
// pubsub transport failures (e.g. dropped connections).
|
|
|
|
|
type faultPubsub struct {
|
|
|
|
|
dbpubsub.Pubsub
|
|
|
|
|
|
|
|
|
|
mu sync.Mutex
|
|
|
|
|
listeners []dbpubsub.ListenerWithErr
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (f *faultPubsub) SubscribeWithErr(event string, listener dbpubsub.ListenerWithErr) (func(), error) {
|
|
|
|
|
f.mu.Lock()
|
|
|
|
|
f.listeners = append(f.listeners, listener)
|
|
|
|
|
f.mu.Unlock()
|
|
|
|
|
return f.Pubsub.SubscribeWithErr(event, listener)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// injectError calls all captured SubscribeWithErr listeners with
|
|
|
|
|
// the given error, simulating a pubsub transport failure.
|
|
|
|
|
func (f *faultPubsub) injectError(err error) {
|
|
|
|
|
f.mu.Lock()
|
|
|
|
|
defer f.mu.Unlock()
|
|
|
|
|
for _, l := range f.listeners {
|
|
|
|
|
l(context.Background(), nil, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestSubscribeRecoversAfterPubsubError(t *testing.T) {
|
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
db, ps := dbtestutil.NewDB(t)
|
|
|
|
|
// Wrap the real pubsub so we can inject errors into the
|
|
|
|
|
// SubscribeWithErr listener callback.
|
|
|
|
|
faultPS := &faultPubsub{Pubsub: ps}
|
|
|
|
|
server := newTestServer(t, db, faultPS, uuid.New())
|
|
|
|
|
|
|
|
|
|
ctx := testutil.Context(t, testutil.WaitLong)
|
|
|
|
|
user, model := seedChatDependencies(ctx, t, db)
|
|
|
|
|
|
|
|
|
|
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
|
|
|
|
|
OwnerID: user.ID,
|
|
|
|
|
Title: "pubsub-error-recovery",
|
|
|
|
|
ModelConfigID: model.ID,
|
|
|
|
|
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
// Set the chat to "running" so InterruptChat can transition
|
|
|
|
|
// it to "waiting".
|
|
|
|
|
runningWorker := uuid.New()
|
|
|
|
|
chat, err = db.UpdateChatStatus(ctx, database.UpdateChatStatusParams{
|
|
|
|
|
ID: chat.ID,
|
|
|
|
|
Status: database.ChatStatusRunning,
|
|
|
|
|
WorkerID: uuid.NullUUID{UUID: runningWorker, Valid: true},
|
|
|
|
|
StartedAt: sql.NullTime{Time: time.Now(), Valid: true},
|
|
|
|
|
HeartbeatAt: sql.NullTime{Time: time.Now(), Valid: true},
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
|
|
|
|
|
require.True(t, ok)
|
|
|
|
|
t.Cleanup(cancel)
|
|
|
|
|
|
|
|
|
|
// Insert a message that the catch-up query should find.
|
|
|
|
|
newContent, marshalErr := json.Marshal([]codersdk.ChatMessagePart{
|
|
|
|
|
codersdk.ChatMessageText("message inserted before dropped-messages"),
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, marshalErr)
|
|
|
|
|
newMsg, err := db.InsertChatMessage(ctx, database.InsertChatMessageParams{
|
|
|
|
|
ChatID: chat.ID,
|
|
|
|
|
Role: database.ChatMessageRoleAssistant,
|
|
|
|
|
Content: pqtype.NullRawMessage{RawMessage: newContent, Valid: true},
|
|
|
|
|
Visibility: database.ChatMessageVisibilityBoth,
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
// Inject ErrDroppedMessages. The merge goroutine should do a
|
|
|
|
|
// DB catch-up and deliver the message we just inserted.
|
|
|
|
|
faultPS.injectError(dbpubsub.ErrDroppedMessages)
|
|
|
|
|
|
|
|
|
|
// The catch-up should deliver the new message.
|
|
|
|
|
require.Eventually(t, func() bool {
|
|
|
|
|
select {
|
|
|
|
|
case event := <-events:
|
|
|
|
|
if event.Type == codersdk.ChatStreamEventTypeMessage && event.Message != nil {
|
|
|
|
|
return event.Message.ID == newMsg.ID
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
default:
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}, testutil.WaitMedium, testutil.IntervalFast)
|
|
|
|
|
|
|
|
|
|
// Verify pubsub is still active: trigger a real status
|
|
|
|
|
// change via InterruptChat (publishes via pubsub).
|
|
|
|
|
updated := server.InterruptChat(ctx, chat)
|
|
|
|
|
require.Equal(t, database.ChatStatusWaiting, updated.Status)
|
|
|
|
|
|
|
|
|
|
require.Eventually(t, func() bool {
|
|
|
|
|
select {
|
|
|
|
|
case event, ok := <-events:
|
|
|
|
|
if !ok {
|
|
|
|
|
t.Log("events channel closed unexpectedly")
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
if event.Type == codersdk.ChatStreamEventTypeStatus && event.Status != nil {
|
|
|
|
|
return event.Status.Status == codersdk.ChatStatusWaiting
|
|
|
|
|
}
|
|
|
|
|
t.Logf("skipping event: type=%s", event.Type)
|
|
|
|
|
return false
|
|
|
|
|
default:
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}, testutil.WaitMedium, testutil.IntervalFast)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestSubscribePubsubUnmarshalErrorContinues(t *testing.T) {
|
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
db, ps := dbtestutil.NewDB(t)
|
|
|
|
|
faultPS := &faultPubsub{Pubsub: ps}
|
|
|
|
|
server := newTestServer(t, db, faultPS, uuid.New())
|
|
|
|
|
|
|
|
|
|
ctx := testutil.Context(t, testutil.WaitLong)
|
|
|
|
|
user, model := seedChatDependencies(ctx, t, db)
|
|
|
|
|
|
|
|
|
|
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
|
|
|
|
|
OwnerID: user.ID,
|
|
|
|
|
Title: "unmarshal-error-continues",
|
|
|
|
|
ModelConfigID: model.ID,
|
|
|
|
|
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
// Set the chat to "running" so InterruptChat can transition
|
|
|
|
|
// it to "waiting".
|
|
|
|
|
workerID := uuid.New()
|
|
|
|
|
chat, err = db.UpdateChatStatus(ctx, database.UpdateChatStatusParams{
|
|
|
|
|
ID: chat.ID,
|
|
|
|
|
Status: database.ChatStatusRunning,
|
|
|
|
|
WorkerID: uuid.NullUUID{UUID: workerID, Valid: true},
|
|
|
|
|
StartedAt: sql.NullTime{Time: time.Now(), Valid: true},
|
|
|
|
|
HeartbeatAt: sql.NullTime{Time: time.Now(), Valid: true},
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
|
|
|
|
|
require.True(t, ok)
|
|
|
|
|
t.Cleanup(cancel)
|
|
|
|
|
|
|
|
|
|
// Inject an unmarshal-style error. The merge goroutine
|
|
|
|
|
// should log it and continue, not degrade.
|
|
|
|
|
faultPS.injectError(xerrors.New("unmarshal chat stream notify: invalid character"))
|
|
|
|
|
|
|
|
|
|
// Verify pubsub still works: trigger a real status change
|
|
|
|
|
// via InterruptChat which publishes via pubsub.
|
|
|
|
|
updated := server.InterruptChat(ctx, chat)
|
|
|
|
|
require.Equal(t, database.ChatStatusWaiting, updated.Status)
|
|
|
|
|
|
|
|
|
|
require.Eventually(t, func() bool {
|
|
|
|
|
select {
|
|
|
|
|
case event, ok := <-events:
|
|
|
|
|
if !ok {
|
|
|
|
|
t.Log("events channel closed unexpectedly")
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
if event.Type == codersdk.ChatStreamEventTypeStatus && event.Status != nil {
|
|
|
|
|
return event.Status.Status == codersdk.ChatStatusWaiting
|
|
|
|
|
}
|
|
|
|
|
t.Logf("skipping event: type=%s", event.Type)
|
|
|
|
|
return false
|
|
|
|
|
default:
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}, testutil.WaitMedium, testutil.IntervalFast)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestSubscribePubsubNotificationStatus(t *testing.T) {
|
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
db, ps := dbtestutil.NewDB(t)
|
|
|
|
|
server := newTestServer(t, db, ps, uuid.New())
|
|
|
|
|
|
|
|
|
|
ctx := testutil.Context(t, testutil.WaitLong)
|
|
|
|
|
user, model := seedChatDependencies(ctx, t, db)
|
|
|
|
|
|
|
|
|
|
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
|
|
|
|
|
OwnerID: user.ID,
|
|
|
|
|
Title: "pubsub-notify-status",
|
|
|
|
|
ModelConfigID: model.ID,
|
|
|
|
|
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
|
|
|
|
|
require.True(t, ok)
|
|
|
|
|
t.Cleanup(cancel)
|
|
|
|
|
|
|
|
|
|
// Trigger a status change via InterruptChat, which publishes
|
|
|
|
|
// both locally and via pubsub. The subscriber should receive
|
|
|
|
|
// the status event via the notifications handler.
|
|
|
|
|
workerID := uuid.New()
|
|
|
|
|
chat, err = db.UpdateChatStatus(ctx, database.UpdateChatStatusParams{
|
|
|
|
|
ID: chat.ID,
|
|
|
|
|
Status: database.ChatStatusRunning,
|
|
|
|
|
WorkerID: uuid.NullUUID{UUID: workerID, Valid: true},
|
|
|
|
|
StartedAt: sql.NullTime{Time: time.Now(), Valid: true},
|
|
|
|
|
HeartbeatAt: sql.NullTime{Time: time.Now(), Valid: true},
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
updated := server.InterruptChat(ctx, chat)
|
|
|
|
|
require.Equal(t, database.ChatStatusWaiting, updated.Status)
|
|
|
|
|
|
|
|
|
|
require.Eventually(t, func() bool {
|
|
|
|
|
select {
|
|
|
|
|
case event := <-events:
|
|
|
|
|
if event.Type == codersdk.ChatStreamEventTypeStatus && event.Status != nil {
|
|
|
|
|
return event.Status.Status == codersdk.ChatStatusWaiting
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
default:
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}, testutil.WaitMedium, testutil.IntervalFast)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestSubscribePubsubNotificationError(t *testing.T) {
|
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
db, ps := dbtestutil.NewDB(t)
|
|
|
|
|
server := newTestServer(t, db, ps, uuid.New())
|
|
|
|
|
|
|
|
|
|
ctx := testutil.Context(t, testutil.WaitLong)
|
|
|
|
|
user, model := seedChatDependencies(ctx, t, db)
|
|
|
|
|
|
|
|
|
|
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
|
|
|
|
|
OwnerID: user.ID,
|
|
|
|
|
Title: "pubsub-notify-error",
|
|
|
|
|
ModelConfigID: model.ID,
|
|
|
|
|
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
|
|
|
|
|
require.True(t, ok)
|
|
|
|
|
t.Cleanup(cancel)
|
|
|
|
|
|
|
|
|
|
// Publish an error notification via pubsub. The notification
|
|
|
|
|
// handler should forward it as a ChatStreamEventTypeError.
|
|
|
|
|
errMsg := coderdpubsub.ChatStreamNotifyMessage{
|
|
|
|
|
Error: "something went wrong on remote replica",
|
|
|
|
|
}
|
|
|
|
|
payload, marshalErr := json.Marshal(errMsg)
|
|
|
|
|
require.NoError(t, marshalErr)
|
|
|
|
|
err = ps.Publish(coderdpubsub.ChatStreamNotifyChannel(chat.ID), payload)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
require.Eventually(t, func() bool {
|
|
|
|
|
select {
|
|
|
|
|
case event := <-events:
|
|
|
|
|
if event.Type == codersdk.ChatStreamEventTypeError && event.Error != nil {
|
|
|
|
|
return event.Error.Message == "something went wrong on remote replica"
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
default:
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}, testutil.WaitMedium, testutil.IntervalFast)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestSubscribePubsubNotificationQueueUpdate(t *testing.T) {
|
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
db, ps := dbtestutil.NewDB(t)
|
|
|
|
|
server := newTestServer(t, db, ps, uuid.New())
|
|
|
|
|
|
|
|
|
|
ctx := testutil.Context(t, testutil.WaitLong)
|
|
|
|
|
user, model := seedChatDependencies(ctx, t, db)
|
|
|
|
|
|
|
|
|
|
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
|
|
|
|
|
OwnerID: user.ID,
|
|
|
|
|
Title: "pubsub-notify-queue",
|
|
|
|
|
ModelConfigID: model.ID,
|
|
|
|
|
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
|
|
|
|
|
require.True(t, ok)
|
|
|
|
|
t.Cleanup(cancel)
|
|
|
|
|
|
|
|
|
|
// Publish a queue_update notification via pubsub.
|
|
|
|
|
notify := coderdpubsub.ChatStreamNotifyMessage{
|
|
|
|
|
QueueUpdate: true,
|
|
|
|
|
}
|
|
|
|
|
payload, marshalErr := json.Marshal(notify)
|
|
|
|
|
require.NoError(t, marshalErr)
|
|
|
|
|
err = ps.Publish(coderdpubsub.ChatStreamNotifyChannel(chat.ID), payload)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
require.Eventually(t, func() bool {
|
|
|
|
|
select {
|
|
|
|
|
case event := <-events:
|
|
|
|
|
return event.Type == codersdk.ChatStreamEventTypeQueueUpdate
|
|
|
|
|
default:
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}, testutil.WaitMedium, testutil.IntervalFast)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestSubscribePubsubNotificationMessage(t *testing.T) {
|
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
db, ps := dbtestutil.NewDB(t)
|
|
|
|
|
server := newTestServer(t, db, ps, uuid.New())
|
|
|
|
|
|
|
|
|
|
ctx := testutil.Context(t, testutil.WaitLong)
|
|
|
|
|
user, model := seedChatDependencies(ctx, t, db)
|
|
|
|
|
|
|
|
|
|
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
|
|
|
|
|
OwnerID: user.ID,
|
|
|
|
|
Title: "pubsub-notify-message",
|
|
|
|
|
ModelConfigID: model.ID,
|
|
|
|
|
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
// Get the current last message ID from the initial message.
|
|
|
|
|
messages, err := db.GetChatMessagesByChatID(ctx, database.GetChatMessagesByChatIDParams{
|
|
|
|
|
ChatID: chat.ID,
|
|
|
|
|
AfterID: 0,
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
require.NotEmpty(t, messages)
|
|
|
|
|
lastMsgID := messages[len(messages)-1].ID
|
|
|
|
|
|
|
|
|
|
// Subscribe after the initial message so the snapshot
|
|
|
|
|
// does not include it.
|
|
|
|
|
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, lastMsgID)
|
|
|
|
|
require.True(t, ok)
|
|
|
|
|
t.Cleanup(cancel)
|
|
|
|
|
|
|
|
|
|
// Insert a new message into the database.
|
|
|
|
|
newContent, marshalErr := json.Marshal([]codersdk.ChatMessagePart{
|
|
|
|
|
codersdk.ChatMessageText("new message from another replica"),
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, marshalErr)
|
|
|
|
|
newMsg, err := db.InsertChatMessage(ctx, database.InsertChatMessageParams{
|
|
|
|
|
ChatID: chat.ID,
|
|
|
|
|
Role: database.ChatMessageRoleAssistant,
|
|
|
|
|
Content: pqtype.NullRawMessage{RawMessage: newContent, Valid: true},
|
|
|
|
|
Visibility: database.ChatMessageVisibilityBoth,
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
// Publish an AfterMessageID notification via pubsub so the
|
|
|
|
|
// subscriber fetches the new message from DB.
|
|
|
|
|
notify := coderdpubsub.ChatStreamNotifyMessage{
|
|
|
|
|
AfterMessageID: newMsg.ID,
|
|
|
|
|
}
|
|
|
|
|
payload, notifyMarshalErr := json.Marshal(notify)
|
|
|
|
|
require.NoError(t, notifyMarshalErr)
|
|
|
|
|
err = ps.Publish(coderdpubsub.ChatStreamNotifyChannel(chat.ID), payload)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
require.Eventually(t, func() bool {
|
|
|
|
|
select {
|
|
|
|
|
case event := <-events:
|
|
|
|
|
if event.Type == codersdk.ChatStreamEventTypeMessage && event.Message != nil {
|
|
|
|
|
return event.Message.ID == newMsg.ID
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
default:
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}, testutil.WaitMedium, testutil.IntervalFast)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestSubscribeHasPubsubFiltersLocalEvents(t *testing.T) {
|
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
db, ps := dbtestutil.NewDB(t)
|
|
|
|
|
server := newTestServer(t, db, ps, uuid.New())
|
|
|
|
|
|
|
|
|
|
ctx := testutil.Context(t, testutil.WaitLong)
|
|
|
|
|
user, model := seedChatDependencies(ctx, t, db)
|
|
|
|
|
|
|
|
|
|
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
|
|
|
|
|
OwnerID: user.ID,
|
|
|
|
|
Title: "pubsub-filter-local",
|
|
|
|
|
ModelConfigID: model.ID,
|
|
|
|
|
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
|
|
|
|
|
require.True(t, ok)
|
|
|
|
|
t.Cleanup(cancel)
|
|
|
|
|
|
|
|
|
|
// Publish a local status event. With pubsub active
|
|
|
|
|
// (hasPubsub=true), the local events case only forwards
|
|
|
|
|
// message_part events. Status events from the local channel
|
|
|
|
|
// should be filtered out (they arrive via pubsub instead).
|
|
|
|
|
err = server.RefreshStatus(ctx, chat.ID)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
// The status event should arrive via the pubsub notification
|
|
|
|
|
// handler, not the local events path. We verify by checking
|
|
|
|
|
// that a status event arrives (proving the pubsub path works).
|
|
|
|
|
require.Eventually(t, func() bool {
|
|
|
|
|
select {
|
|
|
|
|
case event := <-events:
|
|
|
|
|
if event.Type == codersdk.ChatStreamEventTypeStatus && event.Status != nil {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
default:
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}, testutil.WaitMedium, testutil.IntervalFast)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestSubscribeNoPubsubForwardsAllLocalEvents(t *testing.T) {
|
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
db, _ := dbtestutil.NewDB(t)
|
|
|
|
|
// No pubsub: pass nil so hasPubsub=false from the start.
|
|
|
|
|
server := newTestServer(t, db, nil, uuid.New())
|
|
|
|
|
|
|
|
|
|
ctx := testutil.Context(t, testutil.WaitLong)
|
|
|
|
|
user, model := seedChatDependencies(ctx, t, db)
|
|
|
|
|
|
|
|
|
|
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
|
|
|
|
|
OwnerID: user.ID,
|
|
|
|
|
Title: "no-pubsub-all-events",
|
|
|
|
|
ModelConfigID: model.ID,
|
|
|
|
|
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
|
|
|
|
|
require.True(t, ok)
|
|
|
|
|
t.Cleanup(cancel)
|
|
|
|
|
|
|
|
|
|
// Publish a local status event. Without pubsub, all event
|
|
|
|
|
// types should be forwarded from the local channel.
|
|
|
|
|
err = server.RefreshStatus(ctx, chat.ID)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
require.Eventually(t, func() bool {
|
|
|
|
|
select {
|
|
|
|
|
case event := <-events:
|
|
|
|
|
if event.Type == codersdk.ChatStreamEventTypeStatus && event.Status != nil {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
default:
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}, testutil.WaitMedium, testutil.IntervalFast)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestSubscribeNilServer(t *testing.T) {
|
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
var srv *chatd.Server
|
|
|
|
|
snapshot, events, cancel, ok := srv.Subscribe(
|
|
|
|
|
context.Background(), uuid.New(), nil, 0,
|
|
|
|
|
)
|
|
|
|
|
require.False(t, ok)
|
|
|
|
|
require.Nil(t, snapshot)
|
|
|
|
|
require.Nil(t, events)
|
|
|
|
|
require.Nil(t, cancel)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// failSubscribePubsub wraps a real Pubsub but always fails on
|
|
|
|
|
// SubscribeWithErr, simulating a pubsub that cannot be subscribed to
|
|
|
|
|
// (e.g. due to permission or connection issues at subscribe time).
|
|
|
|
|
type failSubscribePubsub struct {
|
|
|
|
|
dbpubsub.Pubsub
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (*failSubscribePubsub) SubscribeWithErr(_ string, _ dbpubsub.ListenerWithErr) (func(), error) {
|
|
|
|
|
return nil, xerrors.New("subscribe failed")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestSubscribePubsubSubscribeFailure(t *testing.T) {
|
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
db, ps := dbtestutil.NewDB(t)
|
|
|
|
|
failPS := &failSubscribePubsub{Pubsub: ps}
|
|
|
|
|
server := newTestServer(t, db, failPS, uuid.New())
|
|
|
|
|
|
|
|
|
|
ctx := testutil.Context(t, testutil.WaitLong)
|
|
|
|
|
user, model := seedChatDependencies(ctx, t, db)
|
|
|
|
|
|
|
|
|
|
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
|
|
|
|
|
OwnerID: user.ID,
|
|
|
|
|
Title: "subscribe-failure",
|
|
|
|
|
ModelConfigID: model.ID,
|
|
|
|
|
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
// Subscribe still succeeds but in local-only mode since
|
|
|
|
|
// SubscribeWithErr failed.
|
|
|
|
|
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
|
|
|
|
|
require.True(t, ok)
|
|
|
|
|
t.Cleanup(cancel)
|
|
|
|
|
|
|
|
|
|
// Local events should still work.
|
|
|
|
|
err = server.RefreshStatus(ctx, chat.ID)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
require.Eventually(t, func() bool {
|
|
|
|
|
select {
|
|
|
|
|
case event := <-events:
|
|
|
|
|
return event.Type == codersdk.ChatStreamEventTypeStatus
|
|
|
|
|
default:
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}, testutil.WaitMedium, testutil.IntervalFast)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestSubscribePubsubMalformedNotification(t *testing.T) {
|
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
db, ps := dbtestutil.NewDB(t)
|
|
|
|
|
server := newTestServer(t, db, ps, uuid.New())
|
|
|
|
|
|
|
|
|
|
ctx := testutil.Context(t, testutil.WaitLong)
|
|
|
|
|
user, model := seedChatDependencies(ctx, t, db)
|
|
|
|
|
|
|
|
|
|
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
|
|
|
|
|
OwnerID: user.ID,
|
|
|
|
|
Title: "malformed-notification",
|
|
|
|
|
ModelConfigID: model.ID,
|
|
|
|
|
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
|
|
|
|
|
require.True(t, ok)
|
|
|
|
|
t.Cleanup(cancel)
|
|
|
|
|
|
|
|
|
|
// Publish malformed JSON to the pubsub channel. The listener's
|
|
|
|
|
// unmarshal will fail, sending an error to errCh. The merge
|
|
|
|
|
// goroutine logs it and continues (no degradation).
|
|
|
|
|
err = ps.Publish(
|
|
|
|
|
coderdpubsub.ChatStreamNotifyChannel(chat.ID),
|
|
|
|
|
[]byte("not valid json{{{"),
|
|
|
|
|
)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
// Insert a new message and publish a valid notification.
|
|
|
|
|
// This proves pubsub is still active after the malformed
|
|
|
|
|
// message.
|
|
|
|
|
newContent, marshalErr := json.Marshal([]codersdk.ChatMessagePart{
|
|
|
|
|
codersdk.ChatMessageText("valid message after malformed"),
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, marshalErr)
|
|
|
|
|
newMsg, err := db.InsertChatMessage(ctx, database.InsertChatMessageParams{
|
|
|
|
|
ChatID: chat.ID,
|
|
|
|
|
Role: database.ChatMessageRoleAssistant,
|
|
|
|
|
Content: pqtype.NullRawMessage{RawMessage: newContent, Valid: true},
|
|
|
|
|
Visibility: database.ChatMessageVisibilityBoth,
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
notify := coderdpubsub.ChatStreamNotifyMessage{
|
|
|
|
|
AfterMessageID: newMsg.ID,
|
|
|
|
|
}
|
|
|
|
|
payload, notifyMarshalErr := json.Marshal(notify)
|
|
|
|
|
require.NoError(t, notifyMarshalErr)
|
|
|
|
|
err = ps.Publish(coderdpubsub.ChatStreamNotifyChannel(chat.ID), payload)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
// The valid notification should deliver the new message,
|
|
|
|
|
// proving pubsub was not degraded by the malformed one.
|
|
|
|
|
require.Eventually(t, func() bool {
|
|
|
|
|
select {
|
|
|
|
|
case event := <-events:
|
|
|
|
|
if event.Type == codersdk.ChatStreamEventTypeMessage && event.Message != nil {
|
|
|
|
|
return event.Message.ID == newMsg.ID
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
default:
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}, testutil.WaitMedium, testutil.IntervalFast)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestSubscribePubsubFullRefreshNotification(t *testing.T) {
|
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
db, ps := dbtestutil.NewDB(t)
|
|
|
|
|
server := newTestServer(t, db, ps, uuid.New())
|
|
|
|
|
|
|
|
|
|
ctx := testutil.Context(t, testutil.WaitLong)
|
|
|
|
|
user, model := seedChatDependencies(ctx, t, db)
|
|
|
|
|
|
|
|
|
|
chat, err := server.CreateChat(ctx, chatd.CreateOptions{
|
|
|
|
|
OwnerID: user.ID,
|
|
|
|
|
Title: "full-refresh",
|
|
|
|
|
ModelConfigID: model.ID,
|
|
|
|
|
InitialUserContent: []codersdk.ChatMessagePart{codersdk.ChatMessageText("hello")},
|
|
|
|
|
})
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
// Subscribe with afterMessageID=0 to get the initial message
|
|
|
|
|
// in the snapshot, then verify FullRefresh re-fetches it.
|
|
|
|
|
_, events, cancel, ok := server.Subscribe(ctx, chat.ID, nil, 0)
|
|
|
|
|
require.True(t, ok)
|
|
|
|
|
t.Cleanup(cancel)
|
|
|
|
|
|
|
|
|
|
// Publish a FullRefresh notification. This sets afterID=0
|
|
|
|
|
// in the notification handler, causing it to re-fetch all
|
|
|
|
|
// messages from the beginning.
|
|
|
|
|
notify := coderdpubsub.ChatStreamNotifyMessage{
|
|
|
|
|
FullRefresh: true,
|
|
|
|
|
}
|
|
|
|
|
payload, marshalErr := json.Marshal(notify)
|
|
|
|
|
require.NoError(t, marshalErr)
|
|
|
|
|
err = ps.Publish(coderdpubsub.ChatStreamNotifyChannel(chat.ID), payload)
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
// The initial user message should be re-delivered.
|
|
|
|
|
require.Eventually(t, func() bool {
|
|
|
|
|
select {
|
|
|
|
|
case event := <-events:
|
|
|
|
|
return event.Type == codersdk.ChatStreamEventTypeMessage
|
|
|
|
|
default:
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}, testutil.WaitMedium, testutil.IntervalFast)
|
|
|
|
|
}
|
|
|
|
|
|