fix streaming

This commit is contained in:
2026-05-02 23:09:39 -07:00
parent 94565298d8
commit 2313e560e8
6 changed files with 161 additions and 9 deletions

View File

@@ -141,7 +141,7 @@ Event order:
Tool-enabled streaming notes (`openai`/`xai`): Tool-enabled streaming notes (`openai`/`xai`):
- Stream still emits standard `meta`, `delta`, `done|error` events. - Stream still emits standard `meta`, `delta`, `done|error` events.
- Stream may emit `tool_call` events while tool calls are executed. - Stream may emit `tool_call` events while tool calls are executed.
- `delta` events carry assistant text. The backend may buffer model-native text briefly while determining whether a provider round contains tool calls. - `delta` events carry assistant text and are emitted incrementally for normal text rounds. The backend may buffer model-native text briefly while determining whether a provider round contains tool calls.
- OpenAI Responses stream events are normalized by the backend into this SSE contract; clients do not consume OpenAI's raw Responses stream event names. - OpenAI Responses stream events are normalized by the backend into this SSE contract; clients do not consume OpenAI's raw Responses stream event names.
## Persistence + Consistency Model ## Persistence + Consistency Model

View File

@@ -19,8 +19,8 @@ targets:
TARGETED_DEVICE_FAMILY: "1,2" TARGETED_DEVICE_FAMILY: "1,2"
GENERATE_INFOPLIST_FILE: YES GENERATE_INFOPLIST_FILE: YES
ASSETCATALOG_COMPILER_APPICON_NAME: AppIcon ASSETCATALOG_COMPILER_APPICON_NAME: AppIcon
MARKETING_VERSION: 1.3 MARKETING_VERSION: 1.4
CURRENT_PROJECT_VERSION: 4 CURRENT_PROJECT_VERSION: 5
INFOPLIST_KEY_CFBundleDisplayName: Sybil INFOPLIST_KEY_CFBundleDisplayName: Sybil
INFOPLIST_KEY_ITSAppUsesNonExemptEncryption: NO INFOPLIST_KEY_ITSAppUsesNonExemptEncryption: NO
INFOPLIST_KEY_UIApplicationSupportsIndirectInputEvents: YES INFOPLIST_KEY_UIApplicationSupportsIndirectInputEvents: YES

View File

@@ -11,6 +11,7 @@
"prebuild": "node scripts/ensure-prisma-client.mjs", "prebuild": "node scripts/ensure-prisma-client.mjs",
"dev": "node ./node_modules/tsx/dist/cli.mjs watch src/index.ts", "dev": "node ./node_modules/tsx/dist/cli.mjs watch src/index.ts",
"start": "node dist/index.js", "start": "node dist/index.js",
"test": "node --test --import tsx tests/**/*.test.ts",
"build": "node ./node_modules/typescript/bin/tsc -p tsconfig.json", "build": "node ./node_modules/typescript/bin/tsc -p tsconfig.json",
"prisma:generate": "node ./node_modules/prisma/build/index.js generate", "prisma:generate": "node ./node_modules/prisma/build/index.js generate",
"db:migrate": "node ./node_modules/prisma/build/index.js migrate dev", "db:migrate": "node ./node_modules/prisma/build/index.js migrate dev",

View File

@@ -853,6 +853,12 @@ function extractResponsesText(response: any, fallback = "") {
return parts.join("") || fallback; return parts.join("") || fallback;
} }
/**
 * Returns the portion of `finalText` that has not yet been streamed to the client.
 *
 * - Empty final text yields nothing to send.
 * - If nothing was streamed, the whole final text is still pending.
 * - If the streamed prefix matches, only the remaining suffix is pending.
 * - If the streamed text diverges from the final text, nothing extra is sent
 *   (the client already received a different rendering of this round).
 */
function getUnstreamedText(finalText: string, streamedText: string) {
  if (finalText.length === 0) {
    return "";
  }
  if (streamedText.length === 0) {
    return finalText;
  }
  const streamedIsPrefix = finalText.startsWith(streamedText);
  return streamedIsPrefix ? finalText.slice(streamedText.length) : "";
}
function getResponseFailureMessage(response: any) { function getResponseFailureMessage(response: any) {
if (response?.status !== "failed" && response?.status !== "incomplete") return null; if (response?.status !== "failed" && response?.status !== "incomplete") return null;
const errorMessage = typeof response?.error?.message === "string" ? response.error.message : null; const errorMessage = typeof response?.error?.message === "string" ? response.error.message : null;
@@ -1113,6 +1119,9 @@ export async function* runToolAwareOpenAIChatStream(
} as any); } as any);
let roundText = ""; let roundText = "";
let streamedRoundText = "";
let roundHasToolCalls = false;
let canStreamRoundText = false;
let completedResponse: any | null = null; let completedResponse: any | null = null;
const completedOutputItems: any[] = []; const completedOutputItems: any[] = [];
@@ -1121,8 +1130,23 @@ export async function* runToolAwareOpenAIChatStream(
if (event?.type === "response.output_text.delta" && typeof event.delta === "string") { if (event?.type === "response.output_text.delta" && typeof event.delta === "string") {
roundText += event.delta; roundText += event.delta;
if (canStreamRoundText && !roundHasToolCalls && event.delta.length) {
streamedRoundText += event.delta;
yield { type: "delta", text: event.delta };
}
} else if (event?.type === "response.output_item.added" && event.item) {
if (event.item.type === "function_call") {
roundHasToolCalls = true;
canStreamRoundText = false;
} else if (event.item.type === "message" && !roundHasToolCalls) {
canStreamRoundText = true;
}
} else if (event?.type === "response.output_item.done" && event.item) { } else if (event?.type === "response.output_item.done" && event.item) {
completedOutputItems[event.output_index ?? completedOutputItems.length] = event.item; completedOutputItems[event.output_index ?? completedOutputItems.length] = event.item;
if (event.item.type === "function_call") {
roundHasToolCalls = true;
canStreamRoundText = false;
}
} else if (event?.type === "response.completed") { } else if (event?.type === "response.completed") {
completedResponse = event.response; completedResponse = event.response;
sawUsage = mergeResponsesUsage(usageAcc, event.response?.usage) || sawUsage; sawUsage = mergeResponsesUsage(usageAcc, event.response?.usage) || sawUsage;
@@ -1144,13 +1168,18 @@ export async function* runToolAwareOpenAIChatStream(
const normalizedToolCalls = normalizeResponsesToolCalls(responseOutputItems, round); const normalizedToolCalls = normalizeResponsesToolCalls(responseOutputItems, round);
if (!normalizedToolCalls.length) { if (!normalizedToolCalls.length) {
const text = extractResponsesText(completedResponse, roundText); const text = extractResponsesText(completedResponse, roundText);
if (danglingToolIntentRetries < MAX_DANGLING_TOOL_INTENT_RETRIES && looksLikeDanglingToolIntent(text)) { if (
!streamedRoundText &&
danglingToolIntentRetries < MAX_DANGLING_TOOL_INTENT_RETRIES &&
looksLikeDanglingToolIntent(text)
) {
danglingToolIntentRetries += 1; danglingToolIntentRetries += 1;
appendDanglingToolIntentCorrection(input, text); appendDanglingToolIntentCorrection(input, text);
continue; continue;
} }
if (text) { const unstreamedText = getUnstreamedText(text, streamedRoundText);
yield { type: "delta", text }; if (unstreamedText) {
yield { type: "delta", text: unstreamedText };
} }
yield { yield {
type: "done", type: "done",
@@ -1214,6 +1243,8 @@ export async function* runToolAwareChatCompletionsStream(
} as any); } as any);
let roundText = ""; let roundText = "";
let streamedRoundText = "";
let roundHasToolCalls = false;
const roundToolCalls = new Map<number, { id?: string; name?: string; arguments: string }>(); const roundToolCalls = new Map<number, { id?: string; name?: string; arguments: string }>();
for await (const chunk of stream as any as AsyncIterable<any>) { for await (const chunk of stream as any as AsyncIterable<any>) {
@@ -1224,9 +1255,16 @@ export async function* runToolAwareChatCompletionsStream(
const deltaText = choice?.delta?.content ?? ""; const deltaText = choice?.delta?.content ?? "";
if (typeof deltaText === "string" && deltaText.length) { if (typeof deltaText === "string" && deltaText.length) {
roundText += deltaText; roundText += deltaText;
if (!roundHasToolCalls) {
streamedRoundText += deltaText;
yield { type: "delta", text: deltaText };
}
} }
const deltaToolCalls = Array.isArray(choice?.delta?.tool_calls) ? choice.delta.tool_calls : []; const deltaToolCalls = Array.isArray(choice?.delta?.tool_calls) ? choice.delta.tool_calls : [];
if (deltaToolCalls.length) {
roundHasToolCalls = true;
}
for (const toolCall of deltaToolCalls) { for (const toolCall of deltaToolCalls) {
const idx = typeof toolCall?.index === "number" ? toolCall.index : 0; const idx = typeof toolCall?.index === "number" ? toolCall.index : 0;
const entry = roundToolCalls.get(idx) ?? { arguments: "" }; const entry = roundToolCalls.get(idx) ?? { arguments: "" };
@@ -1252,13 +1290,18 @@ export async function* runToolAwareChatCompletionsStream(
})); }));
if (!normalizedToolCalls.length) { if (!normalizedToolCalls.length) {
if (danglingToolIntentRetries < MAX_DANGLING_TOOL_INTENT_RETRIES && looksLikeDanglingToolIntent(roundText)) { if (
!streamedRoundText &&
danglingToolIntentRetries < MAX_DANGLING_TOOL_INTENT_RETRIES &&
looksLikeDanglingToolIntent(roundText)
) {
danglingToolIntentRetries += 1; danglingToolIntentRetries += 1;
appendDanglingToolIntentCorrection(conversation, roundText); appendDanglingToolIntentCorrection(conversation, roundText);
continue; continue;
} }
if (roundText) { const unstreamedText = getUnstreamedText(roundText, streamedRoundText);
yield { type: "delta", text: roundText }; if (unstreamedText) {
yield { type: "delta", text: unstreamedText };
} }
yield { yield {
type: "done", type: "done",

View File

@@ -914,6 +914,7 @@ export async function registerRoutes(app: FastifyInstance) {
} }
reply.raw.writeHead(200, buildSseHeaders(typeof req.headers.origin === "string" ? req.headers.origin : undefined)); reply.raw.writeHead(200, buildSseHeaders(typeof req.headers.origin === "string" ? req.headers.origin : undefined));
reply.raw.flushHeaders();
const send = (event: string, data: any) => { const send = (event: string, data: any) => {
reply.raw.write(`event: ${event}\n`); reply.raw.write(`event: ${event}\n`);

View File

@@ -0,0 +1,107 @@
import assert from "node:assert/strict";
import test from "node:test";
import {
runToolAwareChatCompletionsStream,
runToolAwareOpenAIChatStream,
type ToolAwareStreamingEvent,
} from "../src/llm/chat-tools.js";
/** Replays the given events in order, deferring each yield by one microtask tick. */
async function* streamFrom(events: any[]) {
  let index = 0;
  while (index < events.length) {
    await Promise.resolve();
    yield events[index];
    index += 1;
  }
}
/** Drains an async stream of SSE-style events into an array so tests can assert on it. */
async function collectEvents(iterable: AsyncIterable<ToolAwareStreamingEvent>) {
  const collected: ToolAwareStreamingEvent[] = [];
  for await (const item of iterable) {
    collected.push(item);
  }
  return collected;
}
test("OpenAI Responses stream emits text deltas as they arrive", async () => {
  // The assistant message item as it appears once the Responses round completes.
  const assistantMessage = {
    id: "msg_1",
    type: "message",
    role: "assistant",
    status: "completed",
    content: [{ type: "output_text", text: "Hello" }],
  };
  // Fake OpenAI client that replays a canonical Responses streaming sequence:
  // item added (empty content) → two text deltas → item done → response.completed.
  const fakeClient = {
    responses: {
      create: async () =>
        streamFrom([
          { type: "response.output_item.added", item: { ...assistantMessage, content: [] }, output_index: 0 },
          { type: "response.output_text.delta", delta: "Hel", output_index: 0, content_index: 0 },
          { type: "response.output_text.delta", delta: "lo", output_index: 0, content_index: 0 },
          { type: "response.output_item.done", item: assistantMessage, output_index: 0 },
          {
            type: "response.completed",
            response: {
              status: "completed",
              output_text: "Hello",
              output: [assistantMessage],
              usage: { input_tokens: 2, output_tokens: 1, total_tokens: 3 },
            },
          },
        ]),
    },
  };
  const emitted = await collectEvents(
    runToolAwareOpenAIChatStream({
      client: fakeClient as any,
      model: "gpt-test",
      messages: [{ role: "user", content: "Say hello" }],
    })
  );
  // Text must arrive incrementally: two delta events, then the terminal done event.
  assert.deepEqual(
    emitted.map((event) => event.type),
    ["delta", "delta", "done"]
  );
  assert.deepEqual(
    emitted.filter((event) => event.type === "delta").map((event) => event.text),
    ["Hel", "lo"]
  );
  const finalEvent = emitted.at(-1);
  assert.equal(finalEvent?.type === "done" ? finalEvent.result.text : null, "Hello");
});
test("OpenAI-compatible Chat Completions stream emits text deltas as they arrive", async () => {
  // Fake client that replays a Chat Completions delta sequence:
  // role-only chunk → two content deltas → stop chunk carrying usage.
  const fakeClient = {
    chat: {
      completions: {
        create: async () =>
          streamFrom([
            { choices: [{ delta: { role: "assistant" } }] },
            { choices: [{ delta: { content: "Hel" } }] },
            { choices: [{ delta: { content: "lo" } }] },
            {
              choices: [{ delta: {}, finish_reason: "stop" }],
              usage: { prompt_tokens: 2, completion_tokens: 1, total_tokens: 3 },
            },
          ]),
      },
    },
  };
  const emitted = await collectEvents(
    runToolAwareChatCompletionsStream({
      client: fakeClient as any,
      model: "grok-test",
      messages: [{ role: "user", content: "Say hello" }],
    })
  );
  // Text must arrive incrementally: two delta events, then the terminal done event.
  assert.deepEqual(
    emitted.map((event) => event.type),
    ["delta", "delta", "done"]
  );
  assert.deepEqual(
    emitted.filter((event) => event.type === "delta").map((event) => event.text),
    ["Hel", "lo"]
  );
  const finalEvent = emitted.at(-1);
  assert.equal(finalEvent?.type === "done" ? finalEvent.result.text : null, "Hello");
});