Compare commits

...

2 Commits

Author SHA1 Message Date
Mathias Fredriksson 09d799606b fix(site): apply stream parts during pending and clear synchronously
shouldApplyMessagePart dropped parts when chatStatus was "pending",
causing early LLM response content to disappear when pubsub status
delivery was slower than message_part delivery. Remove the "pending"
guard since stream state is already cleared on transition to pending.

Replace RAF-deferred stream reset with synchronous clearStreamState
on durable message arrival. The RAF approach allowed stale tool calls
to persist when new parts canceled the scheduled reset.
2026-03-13 23:30:54 +00:00
Mathias Fredriksson 6e5ac02c96 test(site): assert correct stream state behavior during pending and step transitions
Bug A: Parts arriving during 'pending' chatStatus are silently dropped
by the hook's shouldApplyMessagePart closure and never recovered.

Bug F: Tool calls from a previous step persist when
cancelScheduledStreamReset prevents clearStreamState between steps.
2026-03-13 22:36:25 +00:00
4 changed files with 162 additions and 28 deletions
@@ -623,7 +623,7 @@ describe("useChatStore", () => {
});
});
it("ignores message_part updates while chat is pending", async () => {
it("applies message_part updates while chat is pending", async () => {
immediateAnimationFrame();
const chatID = "chat-1";
@@ -683,6 +683,7 @@ describe("useChatStore", () => {
]);
});
// Transitioning to "pending" clears stream state.
act(() => {
mockSocket.emitData({
type: "status",
@@ -695,6 +696,9 @@ describe("useChatStore", () => {
expect(result.current.streamState).toBeNull();
});
// Parts arriving during "pending" are now applied
// (Bug A fix: removed "pending" guard from
// shouldApplyMessagePart).
act(() => {
mockSocket.emitData({
type: "message_part",
@@ -710,10 +714,11 @@ describe("useChatStore", () => {
});
await waitFor(() => {
expect(result.current.streamState).toBeNull();
expect(result.current.streamState?.blocks).toEqual([
{ type: "response", text: "late" },
]);
});
});
it("does not restore stale queued messages after a stream queue_update", async () => {
const chatID = "chat-1";
const existingMessage = makeMessage(chatID, 1, "user", "hello");
@@ -452,7 +452,6 @@ export const useChatStore = (
const queryClient = useQueryClient();
const storeRef = useRef<ChatStore>(createChatStore());
const streamResetFrameRef = useRef<number | null>(null);
const queuedMessagesHydratedChatIDRef = useRef<string | null>(null);
// Tracks whether the WebSocket has delivered a queue_update for the
// current chat. When true, the stream is the authoritative source
@@ -497,22 +496,6 @@ export const useChatStore = (
[chatID, queryClient],
);
const cancelScheduledStreamReset = useCallback(() => {
if (streamResetFrameRef.current === null) {
return;
}
window.cancelAnimationFrame(streamResetFrameRef.current);
streamResetFrameRef.current = null;
}, []);
const scheduleStreamReset = useCallback(() => {
cancelScheduledStreamReset();
streamResetFrameRef.current = window.requestAnimationFrame(() => {
store.clearStreamState();
streamResetFrameRef.current = null;
});
}, [cancelScheduledStreamReset, store]);
const updateChatQueuedMessages = useCallback(
(queuedMessages: readonly TypesGen.ChatQueuedMessage[] | undefined) => {
if (!chatID) {
@@ -587,7 +570,6 @@ export const useChatStore = (
}, [chatMessagesData, chatID, chatQueuedMessages, store]);
useEffect(() => {
cancelScheduledStreamReset();
store.resetTransientState();
activeChatIDRef.current = chatID ?? null;
@@ -622,7 +604,7 @@ export const useChatStore = (
const shouldApplyMessagePart = (): boolean => {
const currentStatus = store.getSnapshot().chatStatus;
return currentStatus !== "pending" && currentStatus !== "waiting";
return currentStatus !== "waiting";
};
const pendingMessageParts: Record<string, unknown>[] = [];
@@ -630,7 +612,6 @@ export const useChatStore = (
if (pendingMessageParts.length === 0) {
return;
}
cancelScheduledStreamReset();
const parts = pendingMessageParts.splice(0, pendingMessageParts.length);
const currentChatID = chatID;
startTransition(() => {
@@ -658,7 +639,6 @@ export const useChatStore = (
}
const part = asRecord(streamEvent.message_part?.part);
if (part) {
cancelScheduledStreamReset();
pendingMessageParts.push(part);
}
continue;
@@ -688,7 +668,7 @@ export const useChatStore = (
lastMessageIdRef.current = message.id;
}
if (changed) {
scheduleStreamReset();
store.clearStreamState();
}
// Do not update updated_at here. The global
// chat-list WebSocket delivers the authoritative
@@ -814,14 +794,11 @@ export const useChatStore = (
return () => {
disposed = true;
disposeSocket();
cancelScheduledStreamReset();
activeChatIDRef.current = null;
};
}, [
cancelScheduledStreamReset,
chatID,
clearChatErrorReason,
scheduleStreamReset,
setChatErrorReason,
store,
updateChatQueuedMessages,
@@ -506,3 +506,82 @@ describe("subscribe", () => {
expect(countB).toBe(1);
});
});
// ---------------------------------------------------------------------------
// Bug A: Parts dropped during "pending" status
//
// Before the fix, the useChatStore hook defined a closure called
// shouldApplyMessagePart() (ChatContext.ts lines 623-626) that returned
// false when chatStatus was "pending" or "waiting". When it returned false,
// message_part stream events were silently dropped (line 656-658 `continue`)
// and never queued, buffered, or replayed, with no recovery path. The fix
// removes the "pending" guard; the "waiting" guard is preserved.
// ---------------------------------------------------------------------------
describe("Bug A: parts arriving during pending status are lost", () => {
  it("store.applyMessagePart accepts parts regardless of chatStatus", () => {
    // The store itself never inspects chatStatus: the filtering lives
    // exclusively in the hook's shouldApplyMessagePart closure, not here.
    const chatStore = createChatStore();
    chatStore.setChatStatus("pending");
    chatStore.applyMessagePart({ type: "text", text: "during pending" });
    const { streamState } = chatStore.getSnapshot();
    expect(streamState?.blocks).toEqual([
      { type: "response", text: "during pending" },
    ]);
  });

  it("parts arriving during pending are applied after fix", () => {
    // With the "pending" guard gone from shouldApplyMessagePart, the
    // hook forwards parts to the store while status is "pending".
    const chatStore = createChatStore();
    chatStore.setChatStatus("pending");
    const partsDuringPending = [
      { type: "text", text: "Hello " },
      {
        type: "tool-call",
        tool_name: "execute",
        tool_call_id: "tc-1",
        args: { command: "ls" },
      },
    ];
    for (const part of partsDuringPending) {
      chatStore.applyMessagePart(part);
    }
    // Once the status reaches "running", later parts apply as usual.
    chatStore.setChatStatus("running");
    chatStore.applyMessagePart({ type: "text", text: "world" });
    // Every part is retained, including those from the pending phase.
    expect(chatStore.getSnapshot().streamState?.blocks).toEqual([
      { type: "response", text: "Hello " },
      { type: "tool", id: "tc-1" },
      { type: "response", text: "world" },
    ]);
  });

  it("parts are still blocked during waiting status", () => {
    // The "waiting" guard in shouldApplyMessagePart() is preserved:
    // "waiting" means the chat is paused for user input, so parts must
    // not be applied. We model the hook's behavior here by simply not
    // calling applyMessagePart while the status is "waiting".
    const chatStore = createChatStore();
    chatStore.setChatStatus("waiting");
    // (hook drops parts during "waiting" — nothing is applied)
    chatStore.setChatStatus("running");
    // Parts arriving after the transition are applied normally.
    chatStore.applyMessagePart({ type: "text", text: "after waiting" });
    expect(chatStore.getSnapshot().streamState?.blocks).toEqual([
      { type: "response", text: "after waiting" },
    ]);
  });
});
@@ -301,3 +301,76 @@ describe("buildStreamTools", () => {
expect(tools[0].status).toBe("completed");
});
});
// ---------------------------------------------------------------------------
// Bug F: Stream state isolation between steps
//
// When a durable "message" event arrives, the orchestration layer
// (ChatContext.ts) calls clearStreamState() synchronously. This
// ensures tool calls from the previous step do not persist into
// the next one. The reducer itself (applyMessagePartToStreamState)
// is a pure accumulator and has no clearing logic.
// ---------------------------------------------------------------------------
describe("Bug F: stream state isolation between steps", () => {
  it("accumulates tool calls from consecutive steps without clearing", () => {
    // applyMessagePartToStreamState is a pure reducer with no clearing
    // logic: it accumulates every tool call it is given. Isolating one
    // step from the next is the orchestration layer's job (it calls
    // clearStreamState between steps).
    const afterFirstCall = applyMessagePartToStreamState(null, {
      type: "tool-call",
      tool_name: "execute",
      tool_call_id: "tc-1",
      args: { command: "ls" },
    });
    const afterSecondCall = applyMessagePartToStreamState(afterFirstCall, {
      type: "tool-call",
      tool_name: "readFile",
      tool_call_id: "tc-2",
      args: { path: "/tmp/file.txt" },
    });
    const tools = buildStreamTools(afterSecondCall);
    expect(tools).toHaveLength(2);
    expect(tools).toEqual(
      expect.arrayContaining([
        expect.objectContaining({ id: "tc-1", name: "execute" }),
        expect.objectContaining({ id: "tc-2", name: "readFile" }),
      ]),
    );
  });

  it("clearing between steps isolates tool calls", () => {
    // Step N: a single tool call arrives.
    let state: StreamState | null = applyMessagePartToStreamState(null, {
      type: "tool-call",
      tool_name: "execute",
      tool_call_id: "tc-1",
      args: { command: "ls" },
    });
    expect(buildStreamTools(state)).toHaveLength(1);
    // The orchestration layer resets the stream between steps.
    state = null;
    // Step N+1: a different tool call lands on the fresh state.
    state = applyMessagePartToStreamState(state, {
      type: "tool-call",
      tool_name: "readFile",
      tool_call_id: "tc-2",
      args: { path: "/tmp/file.txt" },
    });
    const tools = buildStreamTools(state);
    expect(tools).toHaveLength(1);
    expect(tools[0]).toEqual(
      expect.objectContaining({ id: "tc-2", name: "readFile" }),
    );
  });
});