Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3e714f0510 |
@@ -51,7 +51,9 @@ import {
|
||||
getWorkspaceAgent,
|
||||
} from "./components/ChatConversation/chatHelpers";
|
||||
import {
|
||||
isActiveChatStatus,
|
||||
selectChatStatus,
|
||||
selectTransportReady,
|
||||
useChatSelector,
|
||||
useChatStore,
|
||||
} from "./components/ChatConversation/chatStore";
|
||||
@@ -662,6 +664,7 @@ const AgentChatPage: FC = () => {
|
||||
});
|
||||
const liveChatStatus =
|
||||
useChatSelector(store, selectChatStatus) ?? chatRecord?.status ?? null;
|
||||
const transportReady = useChatSelector(store, selectTransportReady);
|
||||
const persistedError = getPersistedDetailError({
|
||||
chatStatus: liveChatStatus,
|
||||
chatRecord,
|
||||
@@ -1072,7 +1075,17 @@ const AgentChatPage: FC = () => {
|
||||
onRegenerateTitle(agentId);
|
||||
};
|
||||
|
||||
if (chatQuery.isLoading || chatMessagesQuery.isLoading) {
|
||||
// Gate on REST queries AND on the WebSocket snapshot for
|
||||
// active chats. Without this, active chats briefly render
|
||||
// without streaming tool output while the WebSocket
|
||||
// connects and delivers the server's buffered state.
|
||||
const isWaitingForTransport =
|
||||
isActiveChatStatus(liveChatStatus) && !transportReady;
|
||||
if (
|
||||
chatQuery.isLoading ||
|
||||
chatMessagesQuery.isLoading ||
|
||||
isWaitingForTransport
|
||||
) {
|
||||
return (
|
||||
<AgentChatPageLoadingView
|
||||
titleElement={titleElement}
|
||||
|
||||
@@ -486,7 +486,7 @@ describe("applyMessagePart / applyMessageParts", () => {
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("resetTransientState", () => {
|
||||
it("clears streamState, streamError, retryState, reconnectState, and subagentOverrides", () => {
|
||||
it("clears streamState, streamError, retryState, reconnectState, subagentOverrides, and transportReady", () => {
|
||||
const store = createChatStore();
|
||||
store.applyMessagePart({ type: "text", text: "stream" });
|
||||
store.setStreamError({
|
||||
@@ -507,6 +507,7 @@ describe("resetTransientState", () => {
|
||||
retryingAt: "2025-01-01T00:00:01.000Z",
|
||||
});
|
||||
store.setSubagentStatusOverride("sub-1", "error");
|
||||
store.setTransportReady(true);
|
||||
|
||||
store.resetTransientState();
|
||||
|
||||
@@ -516,6 +517,7 @@ describe("resetTransientState", () => {
|
||||
expect(state.retryState).toBeNull();
|
||||
expect(state.reconnectState).toBeNull();
|
||||
expect(state.subagentStatusOverrides.size).toBe(0);
|
||||
expect(state.transportReady).toBe(false);
|
||||
});
|
||||
|
||||
it("preserves messages and queued messages", () => {
|
||||
|
||||
@@ -2956,7 +2956,7 @@ describe("useChatStore", () => {
|
||||
});
|
||||
|
||||
describe("thinking indicator event ordering", () => {
|
||||
it("shows starting phase when message_part arrives before status:running in same batch", async () => {
|
||||
it("flushes parts synchronously on initial snapshot when message_part arrives before status:running", async () => {
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true });
|
||||
immediateAnimationFrame();
|
||||
|
||||
@@ -3001,8 +3001,9 @@ describe("thinking indicator event ordering", () => {
|
||||
});
|
||||
|
||||
// Server sends message_part BEFORE status:running in the same
|
||||
// WebSocket frame. This is the event ordering that previously
|
||||
// caused the "Thinking..." indicator to be skipped.
|
||||
// WebSocket frame. On the initial snapshot delivery, parts are
|
||||
// flushed synchronously (not deferred) so the loading gate can
|
||||
// release with complete stream state.
|
||||
act(() => {
|
||||
mockSocket.emitDataBatch([
|
||||
{
|
||||
@@ -3020,28 +3021,16 @@ describe("thinking indicator event ordering", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
// After the batch, the status should be "running" but stream
|
||||
// parts should NOT have been applied yet (deferred to
|
||||
// setTimeout). This is the window where "Thinking..." shows.
|
||||
// Parts should be applied immediately (no deferred window)
|
||||
// because this is the first handleMessage call.
|
||||
await waitFor(() => {
|
||||
expect(result.current.chatStatus).toBe("running");
|
||||
expect(result.current.streamState).toBeNull();
|
||||
expect(result.current.isAwaiting).toBe(true);
|
||||
});
|
||||
|
||||
// Let the deferred parts flush fire (setTimeout 0).
|
||||
await act(async () => {
|
||||
vi.advanceTimersByTime(1);
|
||||
});
|
||||
|
||||
// Now stream state should be populated.
|
||||
await waitFor(() => {
|
||||
expect(result.current.streamState).not.toBeNull();
|
||||
expect(result.current.isAwaiting).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
it("shows starting phase when status:running arrives before message_part in same batch", async () => {
|
||||
it("flushes parts synchronously on initial snapshot when status:running arrives before message_part", async () => {
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true });
|
||||
immediateAnimationFrame();
|
||||
|
||||
@@ -3085,7 +3074,9 @@ describe("thinking indicator event ordering", () => {
|
||||
expect(watchChat).toHaveBeenCalledWith(chatID, 1);
|
||||
});
|
||||
|
||||
// Server sends status:running BEFORE message_part (the "good" order).
|
||||
// Server sends status:running BEFORE message_part. On the
|
||||
// initial snapshot delivery, parts are flushed synchronously
|
||||
// so the loading gate can release with complete stream state.
|
||||
act(() => {
|
||||
mockSocket.emitDataBatch([
|
||||
{
|
||||
@@ -3103,19 +3094,10 @@ describe("thinking indicator event ordering", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
// Same contract: status set, parts deferred.
|
||||
// Parts should be applied immediately (no deferred window)
|
||||
// because this is the first handleMessage call.
|
||||
await waitFor(() => {
|
||||
expect(result.current.chatStatus).toBe("running");
|
||||
expect(result.current.streamState).toBeNull();
|
||||
expect(result.current.isAwaiting).toBe(true);
|
||||
});
|
||||
|
||||
// Let the deferred parts flush fire.
|
||||
await act(async () => {
|
||||
vi.advanceTimersByTime(1);
|
||||
});
|
||||
|
||||
await waitFor(() => {
|
||||
expect(result.current.streamState).not.toBeNull();
|
||||
expect(result.current.isAwaiting).toBe(false);
|
||||
});
|
||||
|
||||
@@ -142,6 +142,11 @@ export type ChatStoreState = {
|
||||
reconnectState: ReconnectState | null;
|
||||
queuedMessages: readonly TypesGen.ChatQueuedMessage[];
|
||||
subagentStatusOverrides: Map<string, TypesGen.ChatStatus>;
|
||||
// True once the WebSocket has connected and delivered its
|
||||
// initial snapshot for the current chat. Gates rendering of
|
||||
// active chats so the user sees a loading skeleton instead
|
||||
// of an incomplete chat missing streaming tool output.
|
||||
transportReady: boolean;
|
||||
};
|
||||
|
||||
export type ChatStore = {
|
||||
@@ -175,6 +180,7 @@ export type ChatStore = {
|
||||
status: TypesGen.ChatStatus,
|
||||
) => void;
|
||||
resetTransientState: () => void;
|
||||
setTransportReady: (ready: boolean) => void;
|
||||
};
|
||||
|
||||
const createInitialState = (): ChatStoreState => ({
|
||||
@@ -187,6 +193,7 @@ const createInitialState = (): ChatStoreState => ({
|
||||
reconnectState: null,
|
||||
queuedMessages: [],
|
||||
subagentStatusOverrides: new Map(),
|
||||
transportReady: false,
|
||||
});
|
||||
|
||||
export const createChatStore = (): ChatStore => {
|
||||
@@ -503,7 +510,8 @@ export const createChatStore = (): ChatStore => {
|
||||
state.streamError === null &&
|
||||
state.retryState === null &&
|
||||
state.reconnectState === null &&
|
||||
state.subagentStatusOverrides.size === 0
|
||||
state.subagentStatusOverrides.size === 0 &&
|
||||
!state.transportReady
|
||||
) {
|
||||
return;
|
||||
}
|
||||
@@ -514,6 +522,16 @@ export const createChatStore = (): ChatStore => {
|
||||
retryState: null,
|
||||
reconnectState: null,
|
||||
subagentStatusOverrides: new Map(),
|
||||
transportReady: false,
|
||||
}));
|
||||
},
|
||||
setTransportReady: (ready: boolean) => {
|
||||
if (state.transportReady === ready) {
|
||||
return;
|
||||
}
|
||||
setState((current) => ({
|
||||
...current,
|
||||
transportReady: ready,
|
||||
}));
|
||||
},
|
||||
};
|
||||
@@ -534,6 +552,8 @@ export const selectSubagentStatusOverrides = (state: ChatStoreState) =>
|
||||
export const selectRetryState = (state: ChatStoreState) => state.retryState;
|
||||
export const selectReconnectState = (state: ChatStoreState) =>
|
||||
state.reconnectState;
|
||||
export const selectTransportReady = (state: ChatStoreState) =>
|
||||
state.transportReady;
|
||||
|
||||
const selectLatestDurableMessage = (
|
||||
state: ChatStoreState,
|
||||
|
||||
@@ -331,6 +331,11 @@ export const useChatStore = (
|
||||
// Local disposed flag so the message handler (which lives
|
||||
// outside the utility) can bail out after cleanup.
|
||||
let disposed = false;
|
||||
// Tracks whether we've delivered the initial WebSocket
|
||||
// snapshot to the store. On the first handleMessage call we
|
||||
// flush buffered parts synchronously and set transportReady
|
||||
// so the loading gate in AgentChatPage can release.
|
||||
let transportReadyFired = false;
|
||||
|
||||
// Parts buffer lives at the effect scope so it persists
|
||||
// across WebSocket messages. A rAF-based flush coalesces
|
||||
@@ -585,6 +590,25 @@ export const useChatStore = (
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// After the first WebSocket message is processed, flush
|
||||
// any buffered parts synchronously and mark the transport
|
||||
// as ready. This ensures the first render after the
|
||||
// loading gate releases has complete stream state.
|
||||
if (!transportReadyFired) {
|
||||
transportReadyFired = true;
|
||||
if (partsBuf.length > 0) {
|
||||
if (partsFlushTimer !== null) {
|
||||
clearTimeout(partsFlushTimer);
|
||||
partsFlushTimer = null;
|
||||
}
|
||||
const parts = partsBuf.splice(0);
|
||||
if (shouldApplyMessagePart()) {
|
||||
store.applyMessageParts(parts);
|
||||
}
|
||||
}
|
||||
store.setTransportReady(true);
|
||||
}
|
||||
};
|
||||
const disposeSocket = createReconnectingWebSocket({
|
||||
connect() {
|
||||
|
||||
Reference in New Issue
Block a user