Compare commits

...

1 Commits

Author SHA1 Message Date
Danielle Maywood 3e714f0510 fix(site/AgentsPage): gate active chat rendering on WebSocket snapshot delivery
When navigating to an agent chat with a running tool execution, the REST
API loads durable messages quickly but the streaming tool output only
arrives after the WebSocket connects and delivers its buffered snapshot
(~1 second delay). This caused a brief flash of incomplete chat content
before the tool output appeared.

Add a transportReady flag to the chat store that tracks whether the
WebSocket has connected and delivered its initial snapshot. For active
chats (status running or pending), the loading skeleton now stays visible
until the WebSocket delivers its first batch of events and buffered
message parts are flushed synchronously to the store. Non-active chats
are unaffected and render immediately after REST data loads.
2026-04-02 09:45:46 +00:00
5 changed files with 74 additions and 33 deletions
+14 -1
View File
@@ -51,7 +51,9 @@ import {
getWorkspaceAgent,
} from "./components/ChatConversation/chatHelpers";
import {
isActiveChatStatus,
selectChatStatus,
selectTransportReady,
useChatSelector,
useChatStore,
} from "./components/ChatConversation/chatStore";
@@ -662,6 +664,7 @@ const AgentChatPage: FC = () => {
});
const liveChatStatus =
useChatSelector(store, selectChatStatus) ?? chatRecord?.status ?? null;
const transportReady = useChatSelector(store, selectTransportReady);
const persistedError = getPersistedDetailError({
chatStatus: liveChatStatus,
chatRecord,
@@ -1072,7 +1075,17 @@ const AgentChatPage: FC = () => {
onRegenerateTitle(agentId);
};
if (chatQuery.isLoading || chatMessagesQuery.isLoading) {
// Gate on REST queries AND on the WebSocket snapshot for
// active chats. Without this, active chats briefly render
// without streaming tool output while the WebSocket
// connects and delivers the server's buffered state.
const isWaitingForTransport =
isActiveChatStatus(liveChatStatus) && !transportReady;
if (
chatQuery.isLoading ||
chatMessagesQuery.isLoading ||
isWaitingForTransport
) {
return (
<AgentChatPageLoadingView
titleElement={titleElement}
@@ -486,7 +486,7 @@ describe("applyMessagePart / applyMessageParts", () => {
// ---------------------------------------------------------------------------
describe("resetTransientState", () => {
it("clears streamState, streamError, retryState, reconnectState, and subagentOverrides", () => {
it("clears streamState, streamError, retryState, reconnectState, subagentOverrides, and transportReady", () => {
const store = createChatStore();
store.applyMessagePart({ type: "text", text: "stream" });
store.setStreamError({
@@ -507,6 +507,7 @@ describe("resetTransientState", () => {
retryingAt: "2025-01-01T00:00:01.000Z",
});
store.setSubagentStatusOverride("sub-1", "error");
store.setTransportReady(true);
store.resetTransientState();
@@ -516,6 +517,7 @@ describe("resetTransientState", () => {
expect(state.retryState).toBeNull();
expect(state.reconnectState).toBeNull();
expect(state.subagentStatusOverrides.size).toBe(0);
expect(state.transportReady).toBe(false);
});
it("preserves messages and queued messages", () => {
@@ -2956,7 +2956,7 @@ describe("useChatStore", () => {
});
describe("thinking indicator event ordering", () => {
it("shows starting phase when message_part arrives before status:running in same batch", async () => {
it("flushes parts synchronously on initial snapshot when message_part arrives before status:running", async () => {
vi.useFakeTimers({ shouldAdvanceTime: true });
immediateAnimationFrame();
@@ -3001,8 +3001,9 @@ describe("thinking indicator event ordering", () => {
});
// Server sends message_part BEFORE status:running in the same
// WebSocket frame. This is the event ordering that previously
// caused the "Thinking..." indicator to be skipped.
// WebSocket frame. On the initial snapshot delivery, parts are
// flushed synchronously (not deferred) so the loading gate can
// release with complete stream state.
act(() => {
mockSocket.emitDataBatch([
{
@@ -3020,28 +3021,16 @@ describe("thinking indicator event ordering", () => {
]);
});
// After the batch, the status should be "running" but stream
// parts should NOT have been applied yet (deferred to
// setTimeout). This is the window where "Thinking..." shows.
// Parts should be applied immediately (no deferred window)
// because this is the first handleMessage call.
await waitFor(() => {
expect(result.current.chatStatus).toBe("running");
expect(result.current.streamState).toBeNull();
expect(result.current.isAwaiting).toBe(true);
});
// Let the deferred parts flush fire (setTimeout 0).
await act(async () => {
vi.advanceTimersByTime(1);
});
// Now stream state should be populated.
await waitFor(() => {
expect(result.current.streamState).not.toBeNull();
expect(result.current.isAwaiting).toBe(false);
});
});
it("shows starting phase when status:running arrives before message_part in same batch", async () => {
it("flushes parts synchronously on initial snapshot when status:running arrives before message_part", async () => {
vi.useFakeTimers({ shouldAdvanceTime: true });
immediateAnimationFrame();
@@ -3085,7 +3074,9 @@ describe("thinking indicator event ordering", () => {
expect(watchChat).toHaveBeenCalledWith(chatID, 1);
});
// Server sends status:running BEFORE message_part (the "good" order).
// Server sends status:running BEFORE message_part. On the
// initial snapshot delivery, parts are flushed synchronously
// so the loading gate can release with complete stream state.
act(() => {
mockSocket.emitDataBatch([
{
@@ -3103,19 +3094,10 @@ describe("thinking indicator event ordering", () => {
]);
});
// Same contract: status set, parts deferred.
// Parts should be applied immediately (no deferred window)
// because this is the first handleMessage call.
await waitFor(() => {
expect(result.current.chatStatus).toBe("running");
expect(result.current.streamState).toBeNull();
expect(result.current.isAwaiting).toBe(true);
});
// Let the deferred parts flush fire.
await act(async () => {
vi.advanceTimersByTime(1);
});
await waitFor(() => {
expect(result.current.streamState).not.toBeNull();
expect(result.current.isAwaiting).toBe(false);
});
@@ -142,6 +142,11 @@ export type ChatStoreState = {
reconnectState: ReconnectState | null;
queuedMessages: readonly TypesGen.ChatQueuedMessage[];
subagentStatusOverrides: Map<string, TypesGen.ChatStatus>;
// True once the WebSocket has connected and delivered its
// initial snapshot for the current chat. Gates rendering of
// active chats so the user sees a loading skeleton instead
// of an incomplete chat missing streaming tool output.
transportReady: boolean;
};
export type ChatStore = {
@@ -175,6 +180,7 @@ export type ChatStore = {
status: TypesGen.ChatStatus,
) => void;
resetTransientState: () => void;
setTransportReady: (ready: boolean) => void;
};
const createInitialState = (): ChatStoreState => ({
@@ -187,6 +193,7 @@ const createInitialState = (): ChatStoreState => ({
reconnectState: null,
queuedMessages: [],
subagentStatusOverrides: new Map(),
transportReady: false,
});
export const createChatStore = (): ChatStore => {
@@ -503,7 +510,8 @@ export const createChatStore = (): ChatStore => {
state.streamError === null &&
state.retryState === null &&
state.reconnectState === null &&
state.subagentStatusOverrides.size === 0
state.subagentStatusOverrides.size === 0 &&
!state.transportReady
) {
return;
}
@@ -514,6 +522,16 @@ export const createChatStore = (): ChatStore => {
retryState: null,
reconnectState: null,
subagentStatusOverrides: new Map(),
transportReady: false,
}));
},
setTransportReady: (ready: boolean) => {
if (state.transportReady === ready) {
return;
}
setState((current) => ({
...current,
transportReady: ready,
}));
},
};
@@ -534,6 +552,8 @@ export const selectSubagentStatusOverrides = (state: ChatStoreState) =>
export const selectRetryState = (state: ChatStoreState) => state.retryState;
export const selectReconnectState = (state: ChatStoreState) =>
state.reconnectState;
export const selectTransportReady = (state: ChatStoreState) =>
state.transportReady;
const selectLatestDurableMessage = (
state: ChatStoreState,
@@ -331,6 +331,11 @@ export const useChatStore = (
// Local disposed flag so the message handler (which lives
// outside the utility) can bail out after cleanup.
let disposed = false;
// Tracks whether we've delivered the initial WebSocket
// snapshot to the store. On the first handleMessage call we
// flush buffered parts synchronously and set transportReady
// so the loading gate in AgentChatPage can release.
let transportReadyFired = false;
// Parts buffer lives at the effect scope so it persists
// across WebSocket messages. A rAF-based flush coalesces
@@ -585,6 +590,25 @@ export const useChatStore = (
}
}
});
// After the first WebSocket message is processed, flush
// any buffered parts synchronously and mark the transport
// as ready. This ensures the first render after the
// loading gate releases has complete stream state.
if (!transportReadyFired) {
transportReadyFired = true;
if (partsBuf.length > 0) {
if (partsFlushTimer !== null) {
clearTimeout(partsFlushTimer);
partsFlushTimer = null;
}
const parts = partsBuf.splice(0);
if (shouldApplyMessagePart()) {
store.applyMessageParts(parts);
}
}
store.setTransportReady(true);
}
};
const disposeSocket = createReconnectingWebSocket({
connect() {