From 2313e560e8e9e975cd35cb3f5a74e2480e7da489 Mon Sep 17 00:00:00 2001 From: James Magahern Date: Sat, 2 May 2026 23:09:39 -0700 Subject: [PATCH] fix streaming --- docs/api/streaming-chat.md | 2 +- ios/Apps/Sybil/project.yml | 4 +- server/package.json | 1 + server/src/llm/chat-tools.ts | 55 +++++++++-- server/src/routes.ts | 1 + server/tests/chat-tools-streaming.test.ts | 107 ++++++++++++++++++++++ 6 files changed, 161 insertions(+), 9 deletions(-) create mode 100644 server/tests/chat-tools-streaming.test.ts diff --git a/docs/api/streaming-chat.md b/docs/api/streaming-chat.md index 456a1f2..2c08034 100644 --- a/docs/api/streaming-chat.md +++ b/docs/api/streaming-chat.md @@ -141,7 +141,7 @@ Event order: Tool-enabled streaming notes (`openai`/`xai`): - Stream still emits standard `meta`, `delta`, `done|error` events. - Stream may emit `tool_call` events while tool calls are executed. -- `delta` events carry assistant text. The backend may buffer model-native text briefly while determining whether a provider round contains tool calls. +- `delta` events carry assistant text and are emitted incrementally for normal text rounds. The backend may buffer model-native text briefly while determining whether a provider round contains tool calls. - OpenAI Responses stream events are normalized by the backend into this SSE contract; clients do not consume OpenAI's raw Responses stream event names. ## Persistence + Consistency Model diff --git a/ios/Apps/Sybil/project.yml b/ios/Apps/Sybil/project.yml index 80e5b2c..5861c2c 100644 --- a/ios/Apps/Sybil/project.yml +++ b/ios/Apps/Sybil/project.yml @@ -19,8 +19,8 @@ targets: TARGETED_DEVICE_FAMILY: "1,2" GENERATE_INFOPLIST_FILE: YES ASSETCATALOG_COMPILER_APPICON_NAME: AppIcon - MARKETING_VERSION: 1.3 - CURRENT_PROJECT_VERSION: 4 + MARKETING_VERSION: 1.4 + CURRENT_PROJECT_VERSION: 5 INFOPLIST_KEY_CFBundleDisplayName: Sybil INFOPLIST_KEY_ITSAppUsesNonExemptEncryption: NO INFOPLIST_KEY_UIApplicationSupportsIndirectInputEvents: YES diff --git a/server/package.json b/server/package.json index 89c08c7..ab59cba 100644 --- a/server/package.json +++ b/server/package.json @@ -11,6 +11,7 @@ "prebuild": "node scripts/ensure-prisma-client.mjs", "dev": "node ./node_modules/tsx/dist/cli.mjs watch src/index.ts", "start": "node dist/index.js", + "test": "node --test --import tsx tests/**/*.test.ts", "build": "node ./node_modules/typescript/bin/tsc -p tsconfig.json", "prisma:generate": "node ./node_modules/prisma/build/index.js generate", "db:migrate": "node ./node_modules/prisma/build/index.js migrate dev", diff --git a/server/src/llm/chat-tools.ts b/server/src/llm/chat-tools.ts index ed6a304..c663a73 100644 --- a/server/src/llm/chat-tools.ts +++ b/server/src/llm/chat-tools.ts @@ -853,6 +853,12 @@ function extractResponsesText(response: any, fallback = "") { return parts.join("") || fallback; } +function getUnstreamedText(finalText: string, streamedText: string) { + if (!finalText) return ""; + if (!streamedText) return finalText; + return finalText.startsWith(streamedText) ? finalText.slice(streamedText.length) : ""; +} + function getResponseFailureMessage(response: any) { if (response?.status !== "failed" && response?.status !== "incomplete") return null; const errorMessage = typeof response?.error?.message === "string" ? response.error.message : null; @@ -1113,6 +1119,9 @@ export async function* runToolAwareOpenAIChatStream( } as any); let roundText = ""; + let streamedRoundText = ""; + let roundHasToolCalls = false; + let canStreamRoundText = false; let completedResponse: any | null = null; const completedOutputItems: any[] = []; @@ -1121,8 +1130,23 @@ export async function* runToolAwareOpenAIChatStream( if (event?.type === "response.output_text.delta" && typeof event.delta === "string") { roundText += event.delta; + if (canStreamRoundText && !roundHasToolCalls && event.delta.length) { + streamedRoundText += event.delta; + yield { type: "delta", text: event.delta }; + } + } else if (event?.type === "response.output_item.added" && event.item) { + if (event.item.type === "function_call") { + roundHasToolCalls = true; + canStreamRoundText = false; + } else if (event.item.type === "message" && !roundHasToolCalls) { + canStreamRoundText = true; + } } else if (event?.type === "response.output_item.done" && event.item) { completedOutputItems[event.output_index ?? completedOutputItems.length] = event.item; + if (event.item.type === "function_call") { + roundHasToolCalls = true; + canStreamRoundText = false; + } } else if (event?.type === "response.completed") { completedResponse = event.response; sawUsage = mergeResponsesUsage(usageAcc, event.response?.usage) || sawUsage; @@ -1144,13 +1168,18 @@ export async function* runToolAwareOpenAIChatStream( const normalizedToolCalls = normalizeResponsesToolCalls(responseOutputItems, round); if (!normalizedToolCalls.length) { const text = extractResponsesText(completedResponse, roundText); - if (danglingToolIntentRetries < MAX_DANGLING_TOOL_INTENT_RETRIES && looksLikeDanglingToolIntent(text)) { + if ( + !streamedRoundText && + danglingToolIntentRetries < MAX_DANGLING_TOOL_INTENT_RETRIES && + looksLikeDanglingToolIntent(text) + ) { danglingToolIntentRetries += 1; appendDanglingToolIntentCorrection(input, text); continue; } - if (text) { - yield { type: "delta", text }; + const unstreamedText = getUnstreamedText(text, streamedRoundText); + if (unstreamedText) { + yield { type: "delta", text: unstreamedText }; } yield { type: "done", @@ -1214,6 +1243,8 @@ export async function* runToolAwareChatCompletionsStream( } as any); let roundText = ""; + let streamedRoundText = ""; + let roundHasToolCalls = false; const roundToolCalls = new Map(); for await (const chunk of stream as any as AsyncIterable) { @@ -1224,9 +1255,16 @@ export async function* runToolAwareChatCompletionsStream( const deltaText = choice?.delta?.content ?? ""; if (typeof deltaText === "string" && deltaText.length) { roundText += deltaText; + if (!roundHasToolCalls) { + streamedRoundText += deltaText; + yield { type: "delta", text: deltaText }; + } } const deltaToolCalls = Array.isArray(choice?.delta?.tool_calls) ? choice.delta.tool_calls : []; + if (deltaToolCalls.length) { + roundHasToolCalls = true; + } for (const toolCall of deltaToolCalls) { const idx = typeof toolCall?.index === "number" ? toolCall.index : 0; const entry = roundToolCalls.get(idx) ?? { arguments: "" }; @@ -1252,13 +1290,18 @@ export async function* runToolAwareChatCompletionsStream( })); if (!normalizedToolCalls.length) { - if (danglingToolIntentRetries < MAX_DANGLING_TOOL_INTENT_RETRIES && looksLikeDanglingToolIntent(roundText)) { + if ( + !streamedRoundText && + danglingToolIntentRetries < MAX_DANGLING_TOOL_INTENT_RETRIES && + looksLikeDanglingToolIntent(roundText) + ) { danglingToolIntentRetries += 1; appendDanglingToolIntentCorrection(conversation, roundText); continue; } - if (roundText) { - yield { type: "delta", text: roundText }; + const unstreamedText = getUnstreamedText(roundText, streamedRoundText); + if (unstreamedText) { + yield { type: "delta", text: unstreamedText }; } yield { type: "done", diff --git a/server/src/routes.ts b/server/src/routes.ts index fbb2395..dc25184 100644 --- a/server/src/routes.ts +++ b/server/src/routes.ts @@ -914,6 +914,7 @@ export async function registerRoutes(app: FastifyInstance) { } reply.raw.writeHead(200, buildSseHeaders(typeof req.headers.origin === "string" ? req.headers.origin : undefined)); + reply.raw.flushHeaders(); const send = (event: string, data: any) => { reply.raw.write(`event: ${event}\n`); diff --git a/server/tests/chat-tools-streaming.test.ts b/server/tests/chat-tools-streaming.test.ts new file mode 100644 index 0000000..e727f88 --- /dev/null +++ b/server/tests/chat-tools-streaming.test.ts @@ -0,0 +1,107 @@ +import assert from "node:assert/strict"; +import test from "node:test"; +import { + runToolAwareChatCompletionsStream, + runToolAwareOpenAIChatStream, + type ToolAwareStreamingEvent, +} from "../src/llm/chat-tools.js"; + +async function* streamFrom(events: any[]) { + for (const event of events) { + await Promise.resolve(); + yield event; + } +} + +async function collectEvents(iterable: AsyncIterable) { + const events: ToolAwareStreamingEvent[] = []; + for await (const event of iterable) { + events.push(event); + } + return events; +} + +test("OpenAI Responses stream emits text deltas as they arrive", async () => { + const outputMessage = { + id: "msg_1", + type: "message", + role: "assistant", + status: "completed", + content: [{ type: "output_text", text: "Hello" }], + }; + const client = { + responses: { + create: async () => + streamFrom([ + { type: "response.output_item.added", item: { ...outputMessage, content: [] }, output_index: 0 }, + { type: "response.output_text.delta", delta: "Hel", output_index: 0, content_index: 0 }, + { type: "response.output_text.delta", delta: "lo", output_index: 0, content_index: 0 }, + { type: "response.output_item.done", item: outputMessage, output_index: 0 }, + { + type: "response.completed", + response: { + status: "completed", + output_text: "Hello", + output: [outputMessage], + usage: { input_tokens: 2, output_tokens: 1, total_tokens: 3 }, + }, + }, + ]), + }, + }; + + const events = await collectEvents( + runToolAwareOpenAIChatStream({ + client: client as any, + model: "gpt-test", + messages: [{ role: "user", content: "Say hello" }], + }) + ); + + assert.deepEqual( + events.map((event) => event.type), + ["delta", "delta", "done"] + ); + assert.deepEqual( + events.filter((event) => event.type === "delta").map((event) => event.text), + ["Hel", "lo"] + ); + assert.equal(events.at(-1)?.type === "done" ? events.at(-1)?.result.text : null, "Hello"); +}); + +test("OpenAI-compatible Chat Completions stream emits text deltas as they arrive", async () => { + const client = { + chat: { + completions: { + create: async () => + streamFrom([ + { choices: [{ delta: { role: "assistant" } }] }, + { choices: [{ delta: { content: "Hel" } }] }, + { choices: [{ delta: { content: "lo" } }] }, + { + choices: [{ delta: {}, finish_reason: "stop" }], + usage: { prompt_tokens: 2, completion_tokens: 1, total_tokens: 3 }, + }, + ]), + }, + }, + }; + + const events = await collectEvents( + runToolAwareChatCompletionsStream({ + client: client as any, + model: "grok-test", + messages: [{ role: "user", content: "Say hello" }], + }) + ); + + assert.deepEqual( + events.map((event) => event.type), + ["delta", "delta", "done"] + ); + assert.deepEqual( + events.filter((event) => event.type === "delta").map((event) => event.text), + ["Hel", "lo"] + ); + assert.equal(events.at(-1)?.type === "done" ? events.at(-1)?.result.text : null, "Hello"); +});