backend, web: support for resuming streams

This commit is contained in:
2026-05-04 09:12:31 -07:00
parent 70a60edf1c
commit f514c42de6
7 changed files with 1186 additions and 406 deletions

View File

@@ -39,6 +39,22 @@ Chat upload limits:
```
- OpenAI model lists are filtered to models that are expected to work with the backend's Responses API implementation.
## Active Runs
### `GET /v1/active-runs`
- Response:
```json
{
"chats": ["chat-id-with-active-stream"],
"searches": ["search-id-with-active-stream"]
}
```
Behavior notes:
- Lists in-memory chat/search streams that are still running on this server process.
- Clients should use this after app start or page refresh to restore per-row generating indicators.
- The lists are not durable across server restarts.
## Chats
### `GET /v1/chats`
@@ -260,6 +276,32 @@ Search run notes:
- Persists answer text/citations + ranked results.
- If both search and answer fail, endpoint returns an error.
### `POST /v1/searches/:searchId/run/stream`
- Body: same as `POST /v1/searches/:searchId/run`
- Response: `text/event-stream`
Events:
- `search_results`: `{ "requestId": string|null, "results": SearchResultItem[] }`
- `search_error`: `{ "error": string }`
- `answer`: `{ "answerText": string|null, "answerRequestId": string|null, "answerCitations": SearchDetail["answerCitations"] }`
- `answer_error`: `{ "error": string }`
- terminal `done`: `{ "search": SearchDetail }`
- terminal `error`: `{ "message": string }`
Behavior notes:
- The stream is owned by the backend after it starts. If the original HTTP client disconnects, the backend keeps running and persists the final search state.
- While a search stream is active, `GET /v1/active-runs` includes the `searchId`.
- If a stream is already active for the same `searchId`, this endpoint attaches to the existing stream instead of starting a second run.
### `POST /v1/searches/:searchId/run/stream/attach`
- Body: none
- Response: `text/event-stream` with the same event names as `POST /v1/searches/:searchId/run/stream`
- Not found: `404 { "message": "active search stream not found" }`
Behavior notes:
- Replays buffered events for the active in-memory stream, then emits new events until `done` or `error`.
- Intended for clients that discovered a pending search via `GET /v1/active-runs`, such as after browser refresh.
## Type Shapes
`ChatSummary`

View File

@@ -4,6 +4,7 @@ This document defines the server-sent events (SSE) contract for chat completions
Endpoint:
- `POST /v1/chat-completions/stream`
- `POST /v1/chats/:chatId/stream/attach`
Transport:
- HTTP response uses `Content-Type: text/event-stream; charset=utf-8`
@@ -61,6 +62,23 @@ Notes:
- For persisted streams, backend stores only new non-assistant input history rows to avoid duplicates.
- Attachments are optional and are persisted under `message.metadata.attachments` on stored user messages when `persist` is `true`.
Persisted chat streams with a `chatId` are backend-owned active runs:
- Once started, the backend keeps the stream running even if the HTTP client disconnects or refreshes.
- While running, `GET /v1/active-runs` includes the `chatId`.
- Starting a second persisted stream for the same active `chatId` returns `409`.
- Clients can reattach with `POST /v1/chats/:chatId/stream/attach`.
## Attach Endpoint
`POST /v1/chats/:chatId/stream/attach`
- Body: none.
- Response uses the same `text/event-stream` transport and event names as `POST /v1/chat-completions/stream`.
- Replays buffered events for the active in-memory stream, then emits new events until `done` or `error`.
- Returns `404 { "message": "active chat stream not found" }` if no stream is currently active for that chat.
- Authentication is the same as all other API endpoints.
This endpoint is intended for clients that restored an active `chatId` from `GET /v1/active-runs`, especially after browser refresh. Replayed `delta` events may include text that was originally emitted before the client attached.
## Event Stream Contract
Event order:

View File

@@ -0,0 +1,59 @@
export type SseStreamEvent = {
  event: string;
  data: unknown;
};

type SseStreamListener = (event: SseStreamEvent) => void;

/**
 * Buffered, replayable fan-out for one backend-owned SSE run.
 *
 * Every emitted event is recorded so that late subscribers (e.g. a client
 * re-attaching after a page refresh) receive the full backlog before any
 * live events. Once `complete()` is called the stream is frozen: further
 * emits are dropped, live subscribers are released, and `done` resolves.
 */
export class ActiveSseStream {
  /** Every event emitted so far, in order, replayed to late subscribers. */
  private readonly buffer: SseStreamEvent[] = [];
  /** Live subscribers; cleared when the stream completes. */
  private readonly subscribers = new Set<SseStreamListener>();
  private finished = false;
  private settleDone!: () => void;
  /** Resolves exactly once, when `complete()` is called. */
  readonly done: Promise<void>;

  constructor() {
    this.done = new Promise<void>((resolve) => {
      this.settleDone = resolve;
    });
  }

  get isCompleted(): boolean {
    return this.finished;
  }

  /** Record one event and notify current subscribers; no-op after completion. */
  emit(event: string, data: unknown): void {
    if (this.finished) return;
    const record: SseStreamEvent = { event, data };
    this.buffer.push(record);
    this.subscribers.forEach((notify) => notify(record));
  }

  /** Optionally emit a terminal event, then freeze the stream and resolve `done`. */
  complete(finalEvent?: SseStreamEvent): void {
    if (this.finished) return;
    if (finalEvent !== undefined) {
      this.emit(finalEvent.event, finalEvent.data);
    }
    this.finished = true;
    this.subscribers.clear();
    this.settleDone();
  }

  /**
   * Replay the buffered backlog synchronously, then (if still running)
   * register for live events. Returns an unsubscribe function; for an
   * already-completed stream the returned function is a no-op.
   */
  subscribe(listener: SseStreamListener): () => void {
    this.buffer.forEach((event) => listener(event));
    if (this.finished) {
      return () => {};
    }
    this.subscribers.add(listener);
    return () => {
      this.subscribers.delete(listener);
    };
  }
}

View File

@@ -1,12 +1,13 @@
import { performance } from "node:perf_hooks";
import { z } from "zod";
import type { FastifyInstance } from "fastify";
import type { FastifyInstance, FastifyReply, FastifyRequest } from "fastify";
import { ActiveSseStream, type SseStreamEvent } from "./active-streams.js";
import { prisma } from "./db.js";
import { requireAdmin } from "./auth.js";
import { env } from "./env.js";
import { buildComparableAttachments } from "./llm/message-content.js";
import { runMultiplex } from "./llm/multiplexer.js";
import { runMultiplexStream } from "./llm/streaming.js";
import { runMultiplexStream, type StreamEvent } from "./llm/streaming.js";
import { getModelCatalogSnapshot } from "./llm/model-catalog.js";
import { openaiClient } from "./llm/providers.js";
import { exaClient } from "./search/exa.js";
@@ -120,6 +121,26 @@ const CompletionMessageSchema = z
}
});
// Body schema for streaming chat completions (POST /v1/chat-completions/stream).
const CompletionStreamBody = z
  .object({
    // Target persisted chat; only meaningful when persist !== false.
    chatId: z.string().optional(),
    persist: z.boolean().optional(),
    provider: z.enum(["openai", "anthropic", "xai"]),
    model: z.string().min(1),
    messages: z.array(CompletionMessageSchema),
    temperature: z.number().min(0).max(2).optional(),
    maxTokens: z.number().int().positive().optional(),
  })
  .superRefine((value, ctx) => {
    // An ephemeral run (persist: false) cannot also name a chat to persist into.
    if (value.persist === false && value.chatId) {
      ctx.addIssue({
        code: z.ZodIssueCode.custom,
        message: "chatId must be omitted when persist is false",
        path: ["chatId"],
      });
    }
  });
function mergeAttachmentsIntoMetadata(metadata: unknown, attachments?: ChatAttachment[]) {
if (!attachments?.length) return metadata as any;
if (!metadata || typeof metadata !== "object" || Array.isArray(metadata)) {
@@ -293,6 +314,246 @@ function buildSseHeaders(originHeader: string | undefined) {
return headers;
}
// Request body type for search runs, inferred from the zod schema.
type SearchRunRequest = z.infer<typeof SearchRunBody>;
// In-memory registries of backend-owned SSE runs, keyed by chat/search id.
// Surfaced via GET /v1/active-runs; not durable across server restarts.
const activeChatStreams = new Map<string, ActiveSseStream>();
const activeSearchStreams = new Map<string, ActiveSseStream>();
/** Normalize an unknown thrown value into a human-readable message string. */
function getErrorMessage(err: unknown) {
  if (err instanceof Error) return err.message;
  return String(err);
}
/**
 * Write one SSE frame (`event:` line + JSON `data:` line + blank separator)
 * to the raw response, silently skipping clients that have already gone away.
 */
function writeSseEvent(reply: FastifyReply, event: SseStreamEvent) {
  const raw = reply.raw;
  if (raw.destroyed || raw.writableEnded) return;
  raw.write(`event: ${event.event}\ndata: ${JSON.stringify(event.data)}\n\n`);
}
/**
 * Pipe an in-memory ActiveSseStream to an HTTP response as server-sent events.
 *
 * Subscribing replays any buffered events first, then forwards live events
 * until either the stream completes or the client disconnects. The underlying
 * run keeps executing if the client goes away; only this response ends.
 */
async function streamActiveRun(req: FastifyRequest, reply: FastifyReply, stream: ActiveSseStream) {
  reply.raw.writeHead(200, buildSseHeaders(typeof req.headers.origin === "string" ? req.headers.origin : undefined));
  reply.raw.flushHeaders?.();
  // Placeholder so onClose can safely call unsubscribe even if the client
  // disconnects before subscribe() below has returned the real teardown.
  let unsubscribe = () => {};
  let closed = false;
  const closedPromise = new Promise<void>((resolve) => {
    const onClose = () => {
      closed = true;
      unsubscribe();
      reply.raw.off("close", onClose);
      resolve();
    };
    reply.raw.on("close", onClose);
    // If the run finishes first, drop the close listener so it cannot leak.
    stream.done.finally(() => {
      reply.raw.off("close", onClose);
    });
  });
  // Replays the backlog synchronously, then streams new events as they arrive.
  unsubscribe = stream.subscribe((event) => writeSseEvent(reply, event));
  await Promise.race([stream.done, closedPromise]);
  unsubscribe();
  // Only end the response if the client is still connected and writable.
  if (!closed && !reply.raw.destroyed && !reply.raw.writableEnded) {
    reply.raw.end();
  }
  return reply;
}
/** Convert an internal multiplexer event into the wire-level SSE event shape. */
function mapChatStreamEvent(ev: StreamEvent): SseStreamEvent {
  // tool_call frames carry their payload under `event`; all others are sent whole.
  return ev.type === "tool_call"
    ? { event: "tool_call", data: ev.event }
    : { event: ev.type, data: ev };
}
/**
 * Start a backend-owned chat completion run and register it as active.
 *
 * The run continues even if no HTTP client is attached; events accumulate in
 * the returned ActiveSseStream so late attachers can replay them. The stream
 * always terminates with a `done` or `error` event, and the chat id is removed
 * from the active registry when the run settles.
 */
function startActiveChatStream(chatId: string, body: z.infer<typeof CompletionStreamBody>) {
  const stream = new ActiveSseStream();
  activeChatStreams.set(chatId, stream);
  const run = async () => {
    let terminated = false;
    try {
      for await (const raw of runMultiplexStream(body)) {
        const mapped = mapChatStreamEvent(raw);
        if (raw.type === "done" || raw.type === "error") {
          terminated = true;
          stream.complete(mapped);
          break;
        }
        stream.emit(mapped.event, mapped.data);
      }
      // Guard against a provider iterator that ends without a terminal event.
      if (!terminated) {
        stream.complete({ event: "error", data: { message: "chat stream ended unexpectedly" } });
      }
    } catch (err) {
      stream.complete({ event: "error", data: { message: getErrorMessage(err) } });
    } finally {
      activeChatStreams.delete(chatId);
    }
  };
  void run();
  return stream;
}
/**
 * Run an Exa search + answer for `searchId`, emitting progress into `stream`
 * and persisting the final state, regardless of whether a client is attached.
 *
 * Emits: `search_results` (preview, then optionally enriched), `search_error`,
 * `answer`, `answer_error`, and a terminal `done` (full SearchDetail) or
 * `error`. Always removes the search id from the active registry when done.
 */
async function executeSearchRunStream(searchId: string, body: SearchRunRequest, stream: ActiveSseStream) {
  const startedAt = performance.now();
  const query = body.query?.trim();
  if (!query) {
    stream.complete({ event: "error", data: { message: "query is required" } });
    return;
  }
  const normalizedTitle = body.title?.trim() || query.slice(0, 80);
  try {
    const exa = exaClient();
    // Kick off search and answer in parallel; each settles independently.
    const searchPromise = exa.search(query, {
      type: body.type ?? "auto",
      numResults: body.numResults ?? 10,
      includeDomains: body.includeDomains,
      excludeDomains: body.excludeDomains,
      moderation: true,
      userLocation: "US",
      contents: false,
    } as any);
    const answerPromise = exa.answer(query, {
      text: true,
      model: "exa",
      userLocation: "US",
    });
    let searchResponse: any | null = null;
    let answerResponse: any | null = null;
    let enrichedResults: any[] | null = null;
    let searchError: string | null = null;
    let answerError: string | null = null;
    const searchSettled = searchPromise.then(
      async (value) => {
        searchResponse = value;
        // Emit a fast preview first, before content enrichment.
        const previewResults = (value?.results ?? []).map((result: any, index: number) => mapSearchResultPreview(result, index));
        stream.emit("search_results", {
          requestId: value?.requestId ?? null,
          results: previewResults,
        });
        const urls = (value?.results ?? []).map((result: any) => result?.url).filter((url: string | undefined) => typeof url === "string");
        if (!urls.length) return;
        try {
          // Second pass: fetch page text/highlights and merge onto the results.
          const contentsResponse = await exa.getContents(urls, {
            text: { maxCharacters: 1200 },
            highlights: {
              query,
              maxCharacters: 320,
              numSentences: 2,
              highlightsPerUrl: 2,
            },
          } as any);
          const byUrl = new Map<string, any>();
          for (const contentItem of contentsResponse?.results ?? []) {
            byUrl.set(normalizeUrlForMatch(contentItem?.url), contentItem);
          }
          enrichedResults = (value?.results ?? []).map((result: any) => {
            const contentItem = byUrl.get(normalizeUrlForMatch(result?.url));
            if (!contentItem) return result;
            return {
              ...result,
              text: contentItem.text ?? result.text ?? null,
              highlights: Array.isArray(contentItem.highlights) ? contentItem.highlights : result.highlights ?? null,
              highlightScores: Array.isArray(contentItem.highlightScores) ? contentItem.highlightScores : result.highlightScores ?? null,
            };
          });
          // Re-emit the same event name with enriched payloads.
          stream.emit("search_results", {
            requestId: value?.requestId ?? null,
            results: enrichedResults.map((result: any, index: number) => mapSearchResultPreview(result, index)),
          });
        } catch {
          // keep preview results if content enrichment fails
        }
      },
      (reason) => {
        searchError = reason?.message ?? String(reason);
        stream.emit("search_error", { error: searchError });
      }
    );
    const answerSettled = answerPromise.then(
      (value) => {
        answerResponse = value;
        stream.emit("answer", {
          answerText: parseAnswerText(value),
          answerRequestId: value?.requestId ?? null,
          answerCitations: (value?.citations as any) ?? null,
        });
      },
      (reason) => {
        answerError = reason?.message ?? String(reason);
        stream.emit("answer_error", { error: answerError });
      }
    );
    // Both handlers above swallow their rejections, so this cannot throw
    // for search/answer failures; those are captured in the error vars.
    await Promise.all([searchSettled, answerSettled]);
    const latencyMs = Math.round(performance.now() - startedAt);
    // Prefer enriched results; fall back to the raw search results, if any.
    const persistedResults = enrichedResults ?? searchResponse?.results ?? [];
    const rows = persistedResults.map((result: any, index: number) => mapSearchResultRow(searchId, result, index));
    const answerText = parseAnswerText(answerResponse);
    // Atomically replace the search row's state and its ranked results.
    await prisma.$transaction(async (tx) => {
      await tx.search.update({
        where: { id: searchId },
        data: {
          query,
          title: normalizedTitle,
          requestId: searchResponse?.requestId ?? null,
          rawResponse: searchResponse as any,
          latencyMs,
          error: searchError,
          answerText,
          answerRequestId: answerResponse?.requestId ?? null,
          answerCitations: (answerResponse?.citations as any) ?? null,
          answerRawResponse: answerResponse as any,
          answerError,
        },
      });
      await tx.searchResult.deleteMany({ where: { searchId } });
      if (rows.length) {
        await tx.searchResult.createMany({ data: rows as any });
      }
    });
    // Re-read so the terminal `done` event carries the persisted SearchDetail.
    const search = await prisma.search.findUnique({
      where: { id: searchId },
      include: { results: { orderBy: { rank: "asc" } } },
    });
    if (!search) {
      stream.complete({ event: "error", data: { message: "search not found" } });
    } else {
      stream.complete({ event: "done", data: { search } });
    }
  } catch (err) {
    const message = getErrorMessage(err);
    try {
      // Best-effort: record the failure on the search row before terminating.
      await prisma.search.update({
        where: { id: searchId },
        data: {
          query,
          title: normalizedTitle,
          latencyMs: Math.round(performance.now() - startedAt),
          error: message,
        },
      });
    } catch {
      // keep the stream terminal event even if the backing search row disappeared
    }
    stream.complete({ event: "error", data: { message } });
  } finally {
    activeSearchStreams.delete(searchId);
  }
}
export async function registerRoutes(app: FastifyInstance) {
app.get("/health", { logLevel: "silent" }, async () => ({ ok: true }));
@@ -306,6 +567,14 @@ export async function registerRoutes(app: FastifyInstance) {
return { providers: getModelCatalogSnapshot() };
});
app.get("/v1/active-runs", async (req) => {
requireAdmin(req);
return {
chats: Array.from(activeChatStreams.keys()),
searches: Array.from(activeSearchStreams.keys()),
};
});
app.get("/v1/chats", async (req) => {
requireAdmin(req);
const chats = await prisma.chat.findMany({
@@ -695,162 +964,24 @@ export async function registerRoutes(app: FastifyInstance) {
const query = body.query?.trim() || existing.query?.trim();
if (!query) return app.httpErrors.badRequest("query is required");
const startedAt = performance.now();
const normalizedTitle = body.title?.trim() || query.slice(0, 80);
reply.raw.writeHead(200, buildSseHeaders(typeof req.headers.origin === "string" ? req.headers.origin : undefined));
const send = (event: string, data: any) => {
if (reply.raw.writableEnded) return;
reply.raw.write(`event: ${event}\n`);
reply.raw.write(`data: ${JSON.stringify(data)}\n\n`);
};
try {
const exa = exaClient();
const searchPromise = exa.search(query, {
type: body.type ?? "auto",
numResults: body.numResults ?? 10,
includeDomains: body.includeDomains,
excludeDomains: body.excludeDomains,
moderation: true,
userLocation: "US",
contents: false,
} as any);
const answerPromise = exa.answer(query, {
text: true,
model: "exa",
userLocation: "US",
});
let searchResponse: any | null = null;
let answerResponse: any | null = null;
let enrichedResults: any[] | null = null;
let searchError: string | null = null;
let answerError: string | null = null;
const searchSettled = searchPromise.then(
async (value) => {
searchResponse = value;
const previewResults = (value?.results ?? []).map((result: any, index: number) => mapSearchResultPreview(result, index));
send("search_results", {
requestId: value?.requestId ?? null,
results: previewResults,
});
const urls = (value?.results ?? []).map((result: any) => result?.url).filter((url: string | undefined) => typeof url === "string");
if (!urls.length) return;
try {
const contentsResponse = await exa.getContents(urls, {
text: { maxCharacters: 1200 },
highlights: {
query,
maxCharacters: 320,
numSentences: 2,
highlightsPerUrl: 2,
},
} as any);
const byUrl = new Map<string, any>();
for (const contentItem of contentsResponse?.results ?? []) {
byUrl.set(normalizeUrlForMatch(contentItem?.url), contentItem);
}
enrichedResults = (value?.results ?? []).map((result: any) => {
const contentItem = byUrl.get(normalizeUrlForMatch(result?.url));
if (!contentItem) return result;
return {
...result,
text: contentItem.text ?? result.text ?? null,
highlights: Array.isArray(contentItem.highlights) ? contentItem.highlights : result.highlights ?? null,
highlightScores: Array.isArray(contentItem.highlightScores) ? contentItem.highlightScores : result.highlightScores ?? null,
};
});
send("search_results", {
requestId: value?.requestId ?? null,
results: enrichedResults.map((result: any, index: number) => mapSearchResultPreview(result, index)),
});
} catch {
// keep preview results if content enrichment fails
}
},
(reason) => {
searchError = reason?.message ?? String(reason);
send("search_error", { error: searchError });
}
);
const answerSettled = answerPromise.then(
(value) => {
answerResponse = value;
send("answer", {
answerText: parseAnswerText(value),
answerRequestId: value?.requestId ?? null,
answerCitations: (value?.citations as any) ?? null,
});
},
(reason) => {
answerError = reason?.message ?? String(reason);
send("answer_error", { error: answerError });
}
);
await Promise.all([searchSettled, answerSettled]);
const latencyMs = Math.round(performance.now() - startedAt);
const persistedResults = enrichedResults ?? searchResponse?.results ?? [];
const rows = persistedResults.map((result: any, index: number) => mapSearchResultRow(searchId, result, index));
const answerText = parseAnswerText(answerResponse);
await prisma.$transaction(async (tx) => {
await tx.search.update({
where: { id: searchId },
data: {
query,
title: normalizedTitle,
requestId: searchResponse?.requestId ?? null,
rawResponse: searchResponse as any,
latencyMs,
error: searchError,
answerText,
answerRequestId: answerResponse?.requestId ?? null,
answerCitations: (answerResponse?.citations as any) ?? null,
answerRawResponse: answerResponse as any,
answerError,
},
});
await tx.searchResult.deleteMany({ where: { searchId } });
if (rows.length) {
await tx.searchResult.createMany({ data: rows as any });
}
});
const search = await prisma.search.findUnique({
where: { id: searchId },
include: { results: { orderBy: { rank: "asc" } } },
});
if (!search) {
send("error", { message: "search not found" });
} else {
send("done", { search });
}
} catch (err: any) {
await prisma.search.update({
where: { id: searchId },
data: {
query,
title: normalizedTitle,
latencyMs: Math.round(performance.now() - startedAt),
error: err?.message ?? String(err),
},
});
send("error", { message: err?.message ?? String(err) });
} finally {
reply.raw.end();
const existingStream = activeSearchStreams.get(searchId);
if (existingStream) {
return streamActiveRun(req, reply, existingStream);
}
return reply;
const stream = new ActiveSseStream();
activeSearchStreams.set(searchId, stream);
void executeSearchRunStream(searchId, { ...body, query }, stream);
return streamActiveRun(req, reply, stream);
});
app.post("/v1/searches/:searchId/run/stream/attach", async (req, reply) => {
requireAdmin(req);
const Params = z.object({ searchId: z.string() });
const { searchId } = Params.parse(req.params);
const stream = activeSearchStreams.get(searchId);
if (!stream) return app.httpErrors.notFound("active search stream not found");
return streamActiveRun(req, reply, stream);
});
app.get("/v1/chats/:chatId", async (req) => {
@@ -895,6 +1026,15 @@ export async function registerRoutes(app: FastifyInstance) {
return { message: msg };
});
app.post("/v1/chats/:chatId/stream/attach", async (req, reply) => {
requireAdmin(req);
const Params = z.object({ chatId: z.string() });
const { chatId } = Params.parse(req.params);
const stream = activeChatStreams.get(chatId);
if (!stream) return app.httpErrors.notFound("active chat stream not found");
return streamActiveRun(req, reply, stream);
});
// Main: create a completion via provider+model and store everything.
app.post("/v1/chat-completions", async (req) => {
requireAdmin(req);
@@ -935,27 +1075,7 @@ export async function registerRoutes(app: FastifyInstance) {
app.post("/v1/chat-completions/stream", async (req, reply) => {
requireAdmin(req);
const Body = z
.object({
chatId: z.string().optional(),
persist: z.boolean().optional(),
provider: z.enum(["openai", "anthropic", "xai"]),
model: z.string().min(1),
messages: z.array(CompletionMessageSchema),
temperature: z.number().min(0).max(2).optional(),
maxTokens: z.number().int().positive().optional(),
})
.superRefine((value, ctx) => {
if (value.persist === false && value.chatId) {
ctx.addIssue({
code: z.ZodIssueCode.custom,
message: "chatId must be omitted when persist is false",
path: ["chatId"],
});
}
});
const parsed = Body.safeParse(req.body);
const parsed = CompletionStreamBody.safeParse(req.body);
if (!parsed.success) return app.httpErrors.badRequest(parsed.error.message);
const body = parsed.data;
@@ -970,23 +1090,24 @@ export async function registerRoutes(app: FastifyInstance) {
await storeNonAssistantMessages(body.chatId, body.messages);
}
if (body.persist !== false && body.chatId) {
if (activeChatStreams.has(body.chatId)) {
return app.httpErrors.conflict("chat completion already running");
}
const stream = startActiveChatStream(body.chatId, body);
return streamActiveRun(req, reply, stream);
}
reply.raw.writeHead(200, buildSseHeaders(typeof req.headers.origin === "string" ? req.headers.origin : undefined));
reply.raw.flushHeaders();
const send = (event: string, data: any) => {
reply.raw.write(`event: ${event}\n`);
reply.raw.write(`data: ${JSON.stringify(data)}\n\n`);
};
for await (const ev of runMultiplexStream(body)) {
if (ev.type === "meta") send("meta", ev);
else if (ev.type === "tool_call") send("tool_call", ev.event);
else if (ev.type === "delta") send("delta", ev);
else if (ev.type === "done") send("done", ev);
else if (ev.type === "error") send("error", ev);
writeSseEvent(reply, mapChatStreamEvent(ev));
}
reply.raw.end();
if (!reply.raw.destroyed && !reply.raw.writableEnded) {
reply.raw.end();
}
return reply;
});
}

View File

@@ -0,0 +1,34 @@
import assert from "node:assert/strict";
import test from "node:test";
import { ActiveSseStream, type SseStreamEvent } from "../src/active-streams.js";
test("ActiveSseStream replays buffered events to late subscribers", () => {
  const stream = new ActiveSseStream();
  // Emit before anyone is listening; events should land in the buffer.
  stream.emit("delta", { text: "hel" });
  stream.emit("delta", { text: "lo" });
  const received: SseStreamEvent[] = [];
  const stop = stream.subscribe((event) => received.push(event));
  stop();
  // The late subscriber saw the full backlog, in emission order.
  const expected: SseStreamEvent[] = [
    { event: "delta", data: { text: "hel" } },
    { event: "delta", data: { text: "lo" } },
  ];
  assert.deepEqual(received, expected);
});
test("ActiveSseStream replays terminal events after completion", async () => {
  const stream = new ActiveSseStream();
  stream.emit("delta", { text: "done" });
  stream.complete({ event: "done", data: { text: "done" } });
  await stream.done;
  const replayed: SseStreamEvent[] = [];
  stream.subscribe((event) => replayed.push(event));
  assert.equal(stream.isCompleted, true);
  // Both the buffered delta and the terminal event replay post-completion.
  const expected: SseStreamEvent[] = [
    { event: "delta", data: { text: "done" } },
    { event: "done", data: { text: "done" } },
  ];
  assert.deepEqual(replayed, expected);
});

File diff suppressed because it is too large Load Diff

View File

@@ -139,6 +139,11 @@ export type ModelCatalogResponse = {
providers: Record<Provider, ProviderModelInfo>;
};
/** Shape of `GET /v1/active-runs`: ids of streams still running server-side. */
export type ActiveRunsResponse = {
  chats: string[];
  searches: string[];
};
type CompletionResponse = {
chatId: string | null;
message: {
@@ -217,6 +222,10 @@ export async function listModels() {
return api<ModelCatalogResponse>("/v1/models");
}
/** Fetch ids of chat/search streams that are still running on the backend. */
export async function getActiveRuns(): Promise<ActiveRunsResponse> {
  return api<ActiveRunsResponse>("/v1/active-runs");
}
export async function createChat(input?: string | CreateChatRequest) {
const body = typeof input === "string" ? { title: input } : input ?? {};
const data = await api<{ chat: ChatSummary }>("/v1/chats", {
@@ -333,6 +342,85 @@ type RunSearchStreamHandlers = {
onError?: (payload: { message: string }) => void;
};
/**
 * Consume a `text/event-stream` fetch Response, invoking `dispatch` for each
 * SSE event. Non-OK responses are turned into a thrown Error, preferring the
 * server's JSON `message` over the raw status line. Data payloads that fail
 * JSON parsing are delivered as `{ message: <raw text> }`.
 */
async function readSseStream(response: Response, dispatch: (eventName: string, payload: any) => void) {
  if (!response.ok) {
    let message = `${response.status} ${response.statusText}`;
    try {
      const body = (await response.json()) as { message?: string };
      if (body.message) message = body.message;
    } catch {
      // non-JSON error body: keep the status-line fallback
    }
    throw new Error(message);
  }
  if (!response.body) {
    throw new Error("No response stream");
  }
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let pending = "";
  let currentEvent = "message";
  let currentData: string[] = [];

  // Dispatch the accumulated event, if any, and reset to the default name.
  const dispatchPending = () => {
    if (currentData.length) {
      const text = currentData.join("\n");
      let parsed: any;
      try {
        parsed = JSON.parse(text);
      } catch {
        parsed = { message: text };
      }
      dispatch(currentEvent, parsed);
      currentData = [];
    }
    currentEvent = "message";
  };

  // Interpret one SSE line (CR-stripped): blank ends an event; `event:` and
  // `data:` accumulate; anything else (comments, unknown fields) is ignored.
  const handleLine = (rawLine: string) => {
    const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine;
    if (!line) {
      dispatchPending();
    } else if (line.startsWith("event:")) {
      currentEvent = line.slice("event:".length).trim();
    } else if (line.startsWith("data:")) {
      currentData.push(line.slice("data:".length).trimStart());
    }
  };

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    pending += decoder.decode(value, { stream: true });
    let newlineAt = pending.indexOf("\n");
    while (newlineAt >= 0) {
      handleLine(pending.slice(0, newlineAt));
      pending = pending.slice(newlineAt + 1);
      newlineAt = pending.indexOf("\n");
    }
  }
  // Flush any bytes the decoder buffered, then the final unterminated line.
  pending += decoder.decode();
  if (pending.length) {
    handleLine(pending);
  }
  dispatchPending();
}
export async function runSearchStream(
searchId: string,
body: SearchRunRequest,
@@ -437,6 +525,30 @@ export async function runSearchStream(
flushEvent();
}
/**
 * Attach to an already-running backend search stream, replaying buffered
 * events and then following live events until the run terminates. Rejects
 * (via readSseStream) when no stream is active for `searchId`.
 */
export async function attachSearchStream(searchId: string, handlers: RunSearchStreamHandlers, options?: { signal?: AbortSignal }) {
  const headers = new Headers({ Accept: "text/event-stream" });
  if (authToken) {
    headers.set("Authorization", `Bearer ${authToken}`);
  }
  const response = await fetch(`${API_BASE_URL}/v1/searches/${searchId}/run/stream/attach`, {
    method: "POST",
    headers,
    signal: options?.signal,
  });
  await readSseStream(response, (eventName, payload) => {
    switch (eventName) {
      case "search_results":
        handlers.onSearchResults?.(payload);
        break;
      case "search_error":
        handlers.onSearchError?.(payload);
        break;
      case "answer":
        handlers.onAnswer?.(payload);
        break;
      case "answer_error":
        handlers.onAnswerError?.(payload);
        break;
      case "done":
        handlers.onDone?.(payload);
        break;
      case "error":
        handlers.onError?.(payload);
        break;
    }
  });
}
export async function runCompletion(body: {
chatId: string;
provider: Provider;
@@ -556,3 +668,26 @@ export async function runCompletionStream(
}
flushEvent();
}
/**
 * Attach to an already-running backend chat completion stream, replaying
 * buffered events (which may include earlier deltas) and then following live
 * events. Rejects (via readSseStream) when no stream is active for `chatId`.
 */
export async function attachCompletionStream(chatId: string, handlers: CompletionStreamHandlers, options?: { signal?: AbortSignal }) {
  const headers = new Headers({ Accept: "text/event-stream" });
  if (authToken) {
    headers.set("Authorization", `Bearer ${authToken}`);
  }
  const response = await fetch(`${API_BASE_URL}/v1/chats/${chatId}/stream/attach`, {
    method: "POST",
    headers,
    signal: options?.signal,
  });
  await readSseStream(response, (eventName, payload) => {
    switch (eventName) {
      case "meta":
        handlers.onMeta?.(payload);
        break;
      case "tool_call":
        handlers.onToolCall?.(payload);
        break;
      case "delta":
        handlers.onDelta?.(payload);
        break;
      case "done":
        handlers.onDone?.(payload);
        break;
      case "error":
        handlers.onError?.(payload);
        break;
    }
  });
}