Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 09d799606b | |||
| 6e5ac02c96 |
@@ -623,7 +623,7 @@ describe("useChatStore", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("ignores message_part updates while chat is pending", async () => {
|
||||
it("applies message_part updates while chat is pending", async () => {
|
||||
immediateAnimationFrame();
|
||||
|
||||
const chatID = "chat-1";
|
||||
@@ -683,6 +683,7 @@ describe("useChatStore", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
// Transitioning to "pending" clears stream state.
|
||||
act(() => {
|
||||
mockSocket.emitData({
|
||||
type: "status",
|
||||
@@ -695,6 +696,9 @@ describe("useChatStore", () => {
|
||||
expect(result.current.streamState).toBeNull();
|
||||
});
|
||||
|
||||
// Parts arriving during "pending" are now applied
|
||||
// (Bug A fix: removed "pending" guard from
|
||||
// shouldApplyMessagePart).
|
||||
act(() => {
|
||||
mockSocket.emitData({
|
||||
type: "message_part",
|
||||
@@ -710,10 +714,11 @@ describe("useChatStore", () => {
|
||||
});
|
||||
|
||||
await waitFor(() => {
|
||||
expect(result.current.streamState).toBeNull();
|
||||
expect(result.current.streamState?.blocks).toEqual([
|
||||
{ type: "response", text: "late" },
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
it("does not restore stale queued messages after a stream queue_update", async () => {
|
||||
const chatID = "chat-1";
|
||||
const existingMessage = makeMessage(chatID, 1, "user", "hello");
|
||||
|
||||
@@ -452,7 +452,6 @@ export const useChatStore = (
|
||||
|
||||
const queryClient = useQueryClient();
|
||||
const storeRef = useRef<ChatStore>(createChatStore());
|
||||
const streamResetFrameRef = useRef<number | null>(null);
|
||||
const queuedMessagesHydratedChatIDRef = useRef<string | null>(null);
|
||||
// Tracks whether the WebSocket has delivered a queue_update for the
|
||||
// current chat. When true, the stream is the authoritative source
|
||||
@@ -497,22 +496,6 @@ export const useChatStore = (
|
||||
[chatID, queryClient],
|
||||
);
|
||||
|
||||
const cancelScheduledStreamReset = useCallback(() => {
|
||||
if (streamResetFrameRef.current === null) {
|
||||
return;
|
||||
}
|
||||
window.cancelAnimationFrame(streamResetFrameRef.current);
|
||||
streamResetFrameRef.current = null;
|
||||
}, []);
|
||||
|
||||
const scheduleStreamReset = useCallback(() => {
|
||||
cancelScheduledStreamReset();
|
||||
streamResetFrameRef.current = window.requestAnimationFrame(() => {
|
||||
store.clearStreamState();
|
||||
streamResetFrameRef.current = null;
|
||||
});
|
||||
}, [cancelScheduledStreamReset, store]);
|
||||
|
||||
const updateChatQueuedMessages = useCallback(
|
||||
(queuedMessages: readonly TypesGen.ChatQueuedMessage[] | undefined) => {
|
||||
if (!chatID) {
|
||||
@@ -587,7 +570,6 @@ export const useChatStore = (
|
||||
}, [chatMessagesData, chatID, chatQueuedMessages, store]);
|
||||
|
||||
useEffect(() => {
|
||||
cancelScheduledStreamReset();
|
||||
store.resetTransientState();
|
||||
activeChatIDRef.current = chatID ?? null;
|
||||
|
||||
@@ -622,7 +604,7 @@ export const useChatStore = (
|
||||
|
||||
const shouldApplyMessagePart = (): boolean => {
|
||||
const currentStatus = store.getSnapshot().chatStatus;
|
||||
return currentStatus !== "pending" && currentStatus !== "waiting";
|
||||
return currentStatus !== "waiting";
|
||||
};
|
||||
|
||||
const pendingMessageParts: Record<string, unknown>[] = [];
|
||||
@@ -630,7 +612,6 @@ export const useChatStore = (
|
||||
if (pendingMessageParts.length === 0) {
|
||||
return;
|
||||
}
|
||||
cancelScheduledStreamReset();
|
||||
const parts = pendingMessageParts.splice(0, pendingMessageParts.length);
|
||||
const currentChatID = chatID;
|
||||
startTransition(() => {
|
||||
@@ -658,7 +639,6 @@ export const useChatStore = (
|
||||
}
|
||||
const part = asRecord(streamEvent.message_part?.part);
|
||||
if (part) {
|
||||
cancelScheduledStreamReset();
|
||||
pendingMessageParts.push(part);
|
||||
}
|
||||
continue;
|
||||
@@ -688,7 +668,7 @@ export const useChatStore = (
|
||||
lastMessageIdRef.current = message.id;
|
||||
}
|
||||
if (changed) {
|
||||
scheduleStreamReset();
|
||||
store.clearStreamState();
|
||||
}
|
||||
// Do not update updated_at here. The global
|
||||
// chat-list WebSocket delivers the authoritative
|
||||
@@ -814,14 +794,11 @@ export const useChatStore = (
|
||||
return () => {
|
||||
disposed = true;
|
||||
disposeSocket();
|
||||
cancelScheduledStreamReset();
|
||||
activeChatIDRef.current = null;
|
||||
};
|
||||
}, [
|
||||
cancelScheduledStreamReset,
|
||||
chatID,
|
||||
clearChatErrorReason,
|
||||
scheduleStreamReset,
|
||||
setChatErrorReason,
|
||||
store,
|
||||
updateChatQueuedMessages,
|
||||
|
||||
@@ -506,3 +506,82 @@ describe("subscribe", () => {
|
||||
expect(countB).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Bug A: Parts dropped during "pending" status
|
||||
//
|
||||
// In production, the useChatStore hook defines a closure called
|
||||
// shouldApplyMessagePart() (ChatContext.ts lines 623-626) that returns
|
||||
// false when chatStatus is "pending" or "waiting". When it returns false,
|
||||
// message_part stream events are silently dropped (line 656-658 `continue`)
|
||||
// and never queued, buffered, or replayed. There is no recovery path.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("Bug A: parts arriving during pending status are lost", () => {
|
||||
it("store.applyMessagePart accepts parts regardless of chatStatus", () => {
|
||||
// Documents that the store itself has no status guard.
|
||||
// The filtering is solely in the hook's shouldApplyMessagePart
|
||||
// closure, not in the store.
|
||||
const store = createChatStore();
|
||||
store.setChatStatus("pending");
|
||||
|
||||
store.applyMessagePart({ type: "text", text: "during pending" });
|
||||
|
||||
expect(store.getSnapshot().streamState?.blocks).toEqual([
|
||||
{ type: "response", text: "during pending" },
|
||||
]);
|
||||
});
|
||||
|
||||
it("parts arriving during pending are applied after fix", () => {
|
||||
// After removing the "pending" guard from shouldApplyMessagePart,
|
||||
// the hook now forwards parts during "pending" to the store.
|
||||
const store = createChatStore();
|
||||
store.setChatStatus("pending");
|
||||
|
||||
// Parts arriving while status is "pending" are now applied.
|
||||
store.applyMessagePart({ type: "text", text: "Hello " });
|
||||
store.applyMessagePart({
|
||||
type: "tool-call",
|
||||
tool_name: "execute",
|
||||
tool_call_id: "tc-1",
|
||||
args: { command: "ls" },
|
||||
});
|
||||
|
||||
// Status transitions to "running".
|
||||
store.setChatStatus("running");
|
||||
|
||||
// Parts arriving after the transition are applied normally.
|
||||
store.applyMessagePart({ type: "text", text: "world" });
|
||||
|
||||
const state = store.getSnapshot();
|
||||
|
||||
// All parts are present, including those from the pending phase.
|
||||
expect(state.streamState?.blocks).toEqual([
|
||||
{ type: "response", text: "Hello " },
|
||||
{ type: "tool", id: "tc-1" },
|
||||
{ type: "response", text: "world" },
|
||||
]);
|
||||
});
|
||||
|
||||
it("parts are still blocked during waiting status", () => {
|
||||
// The "waiting" guard in shouldApplyMessagePart() is preserved.
|
||||
// "waiting" means the chat is paused for user input, so parts
|
||||
// should not be applied. We model the hook behavior by not
|
||||
// calling applyMessagePart during "waiting".
|
||||
const store = createChatStore();
|
||||
store.setChatStatus("waiting");
|
||||
|
||||
// Hook skips parts during "waiting" (not calling applyMessagePart).
|
||||
|
||||
// Status transitions to "running".
|
||||
store.setChatStatus("running");
|
||||
|
||||
// Parts arriving after the transition are applied normally.
|
||||
store.applyMessagePart({ type: "text", text: "after waiting" });
|
||||
|
||||
const state = store.getSnapshot();
|
||||
expect(state.streamState?.blocks).toEqual([
|
||||
{ type: "response", text: "after waiting" },
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -301,3 +301,76 @@ describe("buildStreamTools", () => {
|
||||
expect(tools[0].status).toBe("completed");
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Bug F: Stream state isolation between steps
|
||||
//
|
||||
// When a durable "message" event arrives, the orchestration layer
|
||||
// (ChatContext.ts) calls clearStreamState() synchronously. This
|
||||
// ensures tool calls from the previous step do not persist into
|
||||
// the next one. The reducer itself (applyMessagePartToStreamState)
|
||||
// is a pure accumulator and has no clearing logic.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("Bug F: stream state isolation between steps", () => {
|
||||
it("accumulates tool calls from consecutive steps without clearing", () => {
|
||||
// applyMessagePartToStreamState is a pure reducer. It
|
||||
// accumulates all tool calls it receives. Isolation between
|
||||
// steps is the responsibility of the orchestration layer
|
||||
// (calling clearStreamState between steps).
|
||||
let state: StreamState | null = null;
|
||||
|
||||
state = applyMessagePartToStreamState(state, {
|
||||
type: "tool-call",
|
||||
tool_name: "execute",
|
||||
tool_call_id: "tc-1",
|
||||
args: { command: "ls" },
|
||||
});
|
||||
|
||||
state = applyMessagePartToStreamState(state, {
|
||||
type: "tool-call",
|
||||
tool_name: "readFile",
|
||||
tool_call_id: "tc-2",
|
||||
args: { path: "/tmp/file.txt" },
|
||||
});
|
||||
|
||||
const tools = buildStreamTools(state);
|
||||
expect(tools).toHaveLength(2);
|
||||
expect(tools).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({ id: "tc-1", name: "execute" }),
|
||||
expect.objectContaining({ id: "tc-2", name: "readFile" }),
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
it("clearing between steps isolates tool calls", () => {
|
||||
let state: StreamState | null = null;
|
||||
|
||||
// Step N: tool call arrives.
|
||||
state = applyMessagePartToStreamState(state, {
|
||||
type: "tool-call",
|
||||
tool_name: "execute",
|
||||
tool_call_id: "tc-1",
|
||||
args: { command: "ls" },
|
||||
});
|
||||
expect(buildStreamTools(state)).toHaveLength(1);
|
||||
|
||||
// Orchestration layer clears state between steps.
|
||||
state = null;
|
||||
|
||||
// Step N+1: different tool call on fresh state.
|
||||
state = applyMessagePartToStreamState(state, {
|
||||
type: "tool-call",
|
||||
tool_name: "readFile",
|
||||
tool_call_id: "tc-2",
|
||||
args: { path: "/tmp/file.txt" },
|
||||
});
|
||||
|
||||
const tools = buildStreamTools(state);
|
||||
expect(tools).toHaveLength(1);
|
||||
expect(tools[0]).toEqual(
|
||||
expect.objectContaining({ id: "tc-2", name: "readFile" }),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user