From f514c42de672560b4029bbfd08aabe813014fdfb Mon Sep 17 00:00:00 2001 From: James Magahern Date: Mon, 4 May 2026 09:12:31 -0700 Subject: [PATCH] backend, web: support for resuming streams --- docs/api/rest.md | 42 ++ docs/api/streaming-chat.md | 18 + server/src/active-streams.ts | 59 ++ server/src/routes.ts | 497 ++++++++++------- server/tests/active-streams.test.ts | 34 ++ web/src/App.tsx | 807 ++++++++++++++++++++-------- web/src/lib/api.ts | 135 +++++ 7 files changed, 1186 insertions(+), 406 deletions(-) create mode 100644 server/src/active-streams.ts create mode 100644 server/tests/active-streams.test.ts diff --git a/docs/api/rest.md b/docs/api/rest.md index daf0c54..bae8e1b 100644 --- a/docs/api/rest.md +++ b/docs/api/rest.md @@ -39,6 +39,22 @@ Chat upload limits: ``` - OpenAI model lists are filtered to models that are expected to work with the backend's Responses API implementation. +## Active Runs + +### `GET /v1/active-runs` +- Response: +```json +{ + "chats": ["chat-id-with-active-stream"], + "searches": ["search-id-with-active-stream"] +} +``` + +Behavior notes: +- Lists in-memory chat/search streams that are still running on this server process. +- Clients should use this after app start or page refresh to restore per-row generating indicators. +- The lists are not durable across server restarts. + ## Chats ### `GET /v1/chats` @@ -260,6 +276,32 @@ Search run notes: - Persists answer text/citations + ranked results. - If both search and answer fail, endpoint returns an error. +### `POST /v1/searches/:searchId/run/stream` +- Body: same as `POST /v1/searches/:searchId/run` +- Response: `text/event-stream` + +Events: +- `search_results`: `{ "requestId": string|null, "results": SearchResultItem[] }` +- `search_error`: `{ "error": string }` +- `answer`: `{ "answerText": string|null, "answerRequestId": string|null, "answerCitations": SearchDetail["answerCitations"] }` +- `answer_error`: `{ "error": string }` +- terminal `done`: `{ "search": SearchDetail }` +- terminal `error`: `{ "message": string }` + +Behavior notes: +- The stream is owned by the backend after it starts. If the original HTTP client disconnects, the backend keeps running and persists the final search state. +- While a search stream is active, `GET /v1/active-runs` includes the `searchId`. +- If a stream is already active for the same `searchId`, this endpoint attaches to the existing stream instead of starting a second run. + +### `POST /v1/searches/:searchId/run/stream/attach` +- Body: none +- Response: `text/event-stream` with the same event names as `POST /v1/searches/:searchId/run/stream` +- Not found: `404 { "message": "active search stream not found" }` + +Behavior notes: +- Replays buffered events for the active in-memory stream, then emits new events until `done` or `error`. +- Intended for clients that discovered a pending search via `GET /v1/active-runs`, such as after browser refresh. + ## Type Shapes `ChatSummary` diff --git a/docs/api/streaming-chat.md b/docs/api/streaming-chat.md index cd2e2dc..c6dc6f4 100644 --- a/docs/api/streaming-chat.md +++ b/docs/api/streaming-chat.md @@ -4,6 +4,7 @@ This document defines the server-sent events (SSE) contract for chat completions Endpoint: - `POST /v1/chat-completions/stream` +- `POST /v1/chats/:chatId/stream/attach` Transport: - HTTP response uses `Content-Type: text/event-stream; charset=utf-8` @@ -61,6 +62,23 @@ Notes: - For persisted streams, backend stores only new non-assistant input history rows to avoid duplicates. - Attachments are optional and are persisted under `message.metadata.attachments` on stored user messages when `persist` is `true`. +Persisted chat streams with a `chatId` are backend-owned active runs: +- Once started, the backend keeps the stream running even if the HTTP client disconnects or refreshes. +- While running, `GET /v1/active-runs` includes the `chatId`. +- Starting a second persisted stream for the same active `chatId` returns `409`. +- Clients can reattach with `POST /v1/chats/:chatId/stream/attach`. + +## Attach Endpoint + +`POST /v1/chats/:chatId/stream/attach` +- Body: none. +- Response uses the same `text/event-stream` transport and event names as `POST /v1/chat-completions/stream`. +- Replays buffered events for the active in-memory stream, then emits new events until `done` or `error`. +- Returns `404 { "message": "active chat stream not found" }` if no stream is currently active for that chat. +- Authentication is the same as all other API endpoints. + +This endpoint is intended for clients that restored an active `chatId` from `GET /v1/active-runs`, especially after browser refresh. Replayed `delta` events may include text that was originally emitted before the client attached. + ## Event Stream Contract Event order: diff --git a/server/src/active-streams.ts b/server/src/active-streams.ts new file mode 100644 index 0000000..c405976 --- /dev/null +++ b/server/src/active-streams.ts @@ -0,0 +1,59 @@ +export type SseStreamEvent = { + event: string; + data: unknown; +}; + +type SseStreamListener = (event: SseStreamEvent) => void; + +export class ActiveSseStream { + private readonly events: SseStreamEvent[] = []; + private readonly listeners = new Set(); + private completed = false; + private resolveDone!: () => void; + + readonly done: Promise; + + constructor() { + this.done = new Promise((resolve) => { + this.resolveDone = resolve; + }); + } + + get isCompleted() { + return this.completed; + } + + emit(event: string, data: unknown) { + if (this.completed) return; + const entry = { event, data }; + this.events.push(entry); + for (const listener of this.listeners) { + listener(entry); + } + } + + complete(finalEvent?: SseStreamEvent) { + if (this.completed) return; + if (finalEvent) { + this.emit(finalEvent.event, finalEvent.data); + } + this.completed = true; + this.listeners.clear(); + this.resolveDone(); + } + + subscribe(listener: SseStreamListener) { + for (const event of this.events) { + listener(event); + } + + if (this.completed) { + return () => {}; + } + + this.listeners.add(listener); + return () => { + this.listeners.delete(listener); + }; + } +} diff --git a/server/src/routes.ts b/server/src/routes.ts index c61aedb..338143b 100644 --- a/server/src/routes.ts +++ b/server/src/routes.ts @@ -1,12 +1,13 @@ import { performance } from "node:perf_hooks"; import { z } from "zod"; -import type { FastifyInstance } from "fastify"; +import type { FastifyInstance, FastifyReply, FastifyRequest } from "fastify"; +import { ActiveSseStream, type SseStreamEvent } from "./active-streams.js"; import { prisma } from "./db.js"; import { requireAdmin } from "./auth.js"; import { env } from "./env.js"; import { buildComparableAttachments } from "./llm/message-content.js"; import { runMultiplex } from "./llm/multiplexer.js"; -import { runMultiplexStream } from "./llm/streaming.js"; +import { runMultiplexStream, type StreamEvent } from "./llm/streaming.js"; import { getModelCatalogSnapshot } from "./llm/model-catalog.js"; import { openaiClient } from "./llm/providers.js"; import { exaClient } from "./search/exa.js"; @@ -120,6 +121,26 @@ const CompletionMessageSchema = z } }); +const CompletionStreamBody = z + .object({ + chatId: z.string().optional(), + persist: z.boolean().optional(), + provider: z.enum(["openai", "anthropic", "xai"]), + model: z.string().min(1), + messages: z.array(CompletionMessageSchema), + temperature: z.number().min(0).max(2).optional(), + maxTokens: z.number().int().positive().optional(), + }) + .superRefine((value, ctx) => { + if (value.persist === false && value.chatId) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: "chatId must be omitted when persist is false", + path: ["chatId"], + }); + } + }); + function mergeAttachmentsIntoMetadata(metadata: unknown, attachments?: ChatAttachment[]) { if (!attachments?.length) return metadata as any; if (!metadata || typeof metadata !== "object" || Array.isArray(metadata)) { @@ -293,6 +314,246 @@ function buildSseHeaders(originHeader: string | undefined) { return headers; } +type SearchRunRequest = z.infer; + +const activeChatStreams = new Map(); +const activeSearchStreams = new Map(); + +function getErrorMessage(err: unknown) { + return err instanceof Error ? err.message : String(err); +} + +function writeSseEvent(reply: FastifyReply, event: SseStreamEvent) { + if (reply.raw.destroyed || reply.raw.writableEnded) return; + reply.raw.write(`event: ${event.event}\n`); + reply.raw.write(`data: ${JSON.stringify(event.data)}\n\n`); +} + +async function streamActiveRun(req: FastifyRequest, reply: FastifyReply, stream: ActiveSseStream) { + reply.raw.writeHead(200, buildSseHeaders(typeof req.headers.origin === "string" ? req.headers.origin : undefined)); + reply.raw.flushHeaders?.(); + + let unsubscribe = () => {}; + let closed = false; + const closedPromise = new Promise((resolve) => { + const onClose = () => { + closed = true; + unsubscribe(); + reply.raw.off("close", onClose); + resolve(); + }; + reply.raw.on("close", onClose); + stream.done.finally(() => { + reply.raw.off("close", onClose); + }); + }); + + unsubscribe = stream.subscribe((event) => writeSseEvent(reply, event)); + await Promise.race([stream.done, closedPromise]); + unsubscribe(); + + if (!closed && !reply.raw.destroyed && !reply.raw.writableEnded) { + reply.raw.end(); + } + + return reply; +} + +function mapChatStreamEvent(ev: StreamEvent): SseStreamEvent { + if (ev.type === "tool_call") return { event: "tool_call", data: ev.event }; + return { event: ev.type, data: ev }; +} + +function startActiveChatStream(chatId: string, body: z.infer) { + const stream = new ActiveSseStream(); + activeChatStreams.set(chatId, stream); + + void (async () => { + let sawTerminalEvent = false; + try { + for await (const ev of runMultiplexStream(body)) { + const event = mapChatStreamEvent(ev); + if (ev.type === "done" || ev.type === "error") { + sawTerminalEvent = true; + stream.complete(event); + break; + } + stream.emit(event.event, event.data); + } + + if (!sawTerminalEvent) { + stream.complete({ event: "error", data: { message: "chat stream ended unexpectedly" } }); + } + } catch (err) { + stream.complete({ event: "error", data: { message: getErrorMessage(err) } }); + } finally { + activeChatStreams.delete(chatId); + } + })(); + + return stream; +} + +async function executeSearchRunStream(searchId: string, body: SearchRunRequest, stream: ActiveSseStream) { + const startedAt = performance.now(); + const query = body.query?.trim(); + if (!query) { + stream.complete({ event: "error", data: { message: "query is required" } }); + return; + } + + const normalizedTitle = body.title?.trim() || query.slice(0, 80); + + try { + const exa = exaClient(); + const searchPromise = exa.search(query, { + type: body.type ?? "auto", + numResults: body.numResults ?? 10, + includeDomains: body.includeDomains, + excludeDomains: body.excludeDomains, + moderation: true, + userLocation: "US", + contents: false, + } as any); + const answerPromise = exa.answer(query, { + text: true, + model: "exa", + userLocation: "US", + }); + + let searchResponse: any | null = null; + let answerResponse: any | null = null; + let enrichedResults: any[] | null = null; + let searchError: string | null = null; + let answerError: string | null = null; + + const searchSettled = searchPromise.then( + async (value) => { + searchResponse = value; + const previewResults = (value?.results ?? []).map((result: any, index: number) => mapSearchResultPreview(result, index)); + stream.emit("search_results", { + requestId: value?.requestId ?? null, + results: previewResults, + }); + + const urls = (value?.results ?? []).map((result: any) => result?.url).filter((url: string | undefined) => typeof url === "string"); + if (!urls.length) return; + + try { + const contentsResponse = await exa.getContents(urls, { + text: { maxCharacters: 1200 }, + highlights: { + query, + maxCharacters: 320, + numSentences: 2, + highlightsPerUrl: 2, + }, + } as any); + const byUrl = new Map(); + for (const contentItem of contentsResponse?.results ?? []) { + byUrl.set(normalizeUrlForMatch(contentItem?.url), contentItem); + } + + enrichedResults = (value?.results ?? []).map((result: any) => { + const contentItem = byUrl.get(normalizeUrlForMatch(result?.url)); + if (!contentItem) return result; + return { + ...result, + text: contentItem.text ?? result.text ?? null, + highlights: Array.isArray(contentItem.highlights) ? contentItem.highlights : result.highlights ?? null, + highlightScores: Array.isArray(contentItem.highlightScores) ? contentItem.highlightScores : result.highlightScores ?? null, + }; + }); + + stream.emit("search_results", { + requestId: value?.requestId ?? null, + results: enrichedResults.map((result: any, index: number) => mapSearchResultPreview(result, index)), + }); + } catch { + // keep preview results if content enrichment fails + } + }, + (reason) => { + searchError = reason?.message ?? String(reason); + stream.emit("search_error", { error: searchError }); + } + ); + + const answerSettled = answerPromise.then( + (value) => { + answerResponse = value; + stream.emit("answer", { + answerText: parseAnswerText(value), + answerRequestId: value?.requestId ?? null, + answerCitations: (value?.citations as any) ?? null, + }); + }, + (reason) => { + answerError = reason?.message ?? String(reason); + stream.emit("answer_error", { error: answerError }); + } + ); + + await Promise.all([searchSettled, answerSettled]); + + const latencyMs = Math.round(performance.now() - startedAt); + const persistedResults = enrichedResults ?? searchResponse?.results ?? []; + const rows = persistedResults.map((result: any, index: number) => mapSearchResultRow(searchId, result, index)); + const answerText = parseAnswerText(answerResponse); + + await prisma.$transaction(async (tx) => { + await tx.search.update({ + where: { id: searchId }, + data: { + query, + title: normalizedTitle, + requestId: searchResponse?.requestId ?? null, + rawResponse: searchResponse as any, + latencyMs, + error: searchError, + answerText, + answerRequestId: answerResponse?.requestId ?? null, + answerCitations: (answerResponse?.citations as any) ?? null, + answerRawResponse: answerResponse as any, + answerError, + }, + }); + await tx.searchResult.deleteMany({ where: { searchId } }); + if (rows.length) { + await tx.searchResult.createMany({ data: rows as any }); + } + }); + + const search = await prisma.search.findUnique({ + where: { id: searchId }, + include: { results: { orderBy: { rank: "asc" } } }, + }); + if (!search) { + stream.complete({ event: "error", data: { message: "search not found" } }); + } else { + stream.complete({ event: "done", data: { search } }); + } + } catch (err) { + const message = getErrorMessage(err); + try { + await prisma.search.update({ + where: { id: searchId }, + data: { + query, + title: normalizedTitle, + latencyMs: Math.round(performance.now() - startedAt), + error: message, + }, + }); + } catch { + // keep the stream terminal event even if the backing search row disappeared + } + stream.complete({ event: "error", data: { message } }); + } finally { + activeSearchStreams.delete(searchId); + } +} + export async function registerRoutes(app: FastifyInstance) { app.get("/health", { logLevel: "silent" }, async () => ({ ok: true })); @@ -306,6 +567,14 @@ export async function registerRoutes(app: FastifyInstance) { return { providers: getModelCatalogSnapshot() }; }); + app.get("/v1/active-runs", async (req) => { + requireAdmin(req); + return { + chats: Array.from(activeChatStreams.keys()), + searches: Array.from(activeSearchStreams.keys()), + }; + }); + app.get("/v1/chats", async (req) => { requireAdmin(req); const chats = await prisma.chat.findMany({ @@ -695,162 +964,24 @@ export async function registerRoutes(app: FastifyInstance) { const query = body.query?.trim() || existing.query?.trim(); if (!query) return app.httpErrors.badRequest("query is required"); - const startedAt = performance.now(); - const normalizedTitle = body.title?.trim() || query.slice(0, 80); - - reply.raw.writeHead(200, buildSseHeaders(typeof req.headers.origin === "string" ? req.headers.origin : undefined)); - - const send = (event: string, data: any) => { - if (reply.raw.writableEnded) return; - reply.raw.write(`event: ${event}\n`); - reply.raw.write(`data: ${JSON.stringify(data)}\n\n`); - }; - - try { - const exa = exaClient(); - const searchPromise = exa.search(query, { - type: body.type ?? "auto", - numResults: body.numResults ?? 10, - includeDomains: body.includeDomains, - excludeDomains: body.excludeDomains, - moderation: true, - userLocation: "US", - contents: false, - } as any); - const answerPromise = exa.answer(query, { - text: true, - model: "exa", - userLocation: "US", - }); - - let searchResponse: any | null = null; - let answerResponse: any | null = null; - let enrichedResults: any[] | null = null; - let searchError: string | null = null; - let answerError: string | null = null; - - const searchSettled = searchPromise.then( - async (value) => { - searchResponse = value; - const previewResults = (value?.results ?? []).map((result: any, index: number) => mapSearchResultPreview(result, index)); - send("search_results", { - requestId: value?.requestId ?? null, - results: previewResults, - }); - - const urls = (value?.results ?? []).map((result: any) => result?.url).filter((url: string | undefined) => typeof url === "string"); - if (!urls.length) return; - - try { - const contentsResponse = await exa.getContents(urls, { - text: { maxCharacters: 1200 }, - highlights: { - query, - maxCharacters: 320, - numSentences: 2, - highlightsPerUrl: 2, - }, - } as any); - const byUrl = new Map(); - for (const contentItem of contentsResponse?.results ?? []) { - byUrl.set(normalizeUrlForMatch(contentItem?.url), contentItem); - } - - enrichedResults = (value?.results ?? []).map((result: any) => { - const contentItem = byUrl.get(normalizeUrlForMatch(result?.url)); - if (!contentItem) return result; - return { - ...result, - text: contentItem.text ?? result.text ?? null, - highlights: Array.isArray(contentItem.highlights) ? contentItem.highlights : result.highlights ?? null, - highlightScores: Array.isArray(contentItem.highlightScores) ? contentItem.highlightScores : result.highlightScores ?? null, - }; - }); - - send("search_results", { - requestId: value?.requestId ?? null, - results: enrichedResults.map((result: any, index: number) => mapSearchResultPreview(result, index)), - }); - } catch { - // keep preview results if content enrichment fails - } - }, - (reason) => { - searchError = reason?.message ?? String(reason); - send("search_error", { error: searchError }); - } - ); - - const answerSettled = answerPromise.then( - (value) => { - answerResponse = value; - send("answer", { - answerText: parseAnswerText(value), - answerRequestId: value?.requestId ?? null, - answerCitations: (value?.citations as any) ?? null, - }); - }, - (reason) => { - answerError = reason?.message ?? String(reason); - send("answer_error", { error: answerError }); - } - ); - - await Promise.all([searchSettled, answerSettled]); - - const latencyMs = Math.round(performance.now() - startedAt); - const persistedResults = enrichedResults ?? searchResponse?.results ?? []; - const rows = persistedResults.map((result: any, index: number) => mapSearchResultRow(searchId, result, index)); - const answerText = parseAnswerText(answerResponse); - - await prisma.$transaction(async (tx) => { - await tx.search.update({ - where: { id: searchId }, - data: { - query, - title: normalizedTitle, - requestId: searchResponse?.requestId ?? null, - rawResponse: searchResponse as any, - latencyMs, - error: searchError, - answerText, - answerRequestId: answerResponse?.requestId ?? null, - answerCitations: (answerResponse?.citations as any) ?? null, - answerRawResponse: answerResponse as any, - answerError, - }, - }); - await tx.searchResult.deleteMany({ where: { searchId } }); - if (rows.length) { - await tx.searchResult.createMany({ data: rows as any }); - } - }); - - const search = await prisma.search.findUnique({ - where: { id: searchId }, - include: { results: { orderBy: { rank: "asc" } } }, - }); - if (!search) { - send("error", { message: "search not found" }); - } else { - send("done", { search }); - } - } catch (err: any) { - await prisma.search.update({ - where: { id: searchId }, - data: { - query, - title: normalizedTitle, - latencyMs: Math.round(performance.now() - startedAt), - error: err?.message ?? String(err), - }, - }); - send("error", { message: err?.message ?? String(err) }); - } finally { - reply.raw.end(); + const existingStream = activeSearchStreams.get(searchId); + if (existingStream) { + return streamActiveRun(req, reply, existingStream); } - return reply; + const stream = new ActiveSseStream(); + activeSearchStreams.set(searchId, stream); + void executeSearchRunStream(searchId, { ...body, query }, stream); + return streamActiveRun(req, reply, stream); + }); + + app.post("/v1/searches/:searchId/run/stream/attach", async (req, reply) => { + requireAdmin(req); + const Params = z.object({ searchId: z.string() }); + const { searchId } = Params.parse(req.params); + const stream = activeSearchStreams.get(searchId); + if (!stream) return app.httpErrors.notFound("active search stream not found"); + return streamActiveRun(req, reply, stream); }); app.get("/v1/chats/:chatId", async (req) => { @@ -895,6 +1026,15 @@ export async function registerRoutes(app: FastifyInstance) { return { message: msg }; }); + app.post("/v1/chats/:chatId/stream/attach", async (req, reply) => { + requireAdmin(req); + const Params = z.object({ chatId: z.string() }); + const { chatId } = Params.parse(req.params); + const stream = activeChatStreams.get(chatId); + if (!stream) return app.httpErrors.notFound("active chat stream not found"); + return streamActiveRun(req, reply, stream); + }); + // Main: create a completion via provider+model and store everything. app.post("/v1/chat-completions", async (req) => { requireAdmin(req); @@ -935,27 +1075,7 @@ export async function registerRoutes(app: FastifyInstance) { app.post("/v1/chat-completions/stream", async (req, reply) => { requireAdmin(req); - const Body = z - .object({ - chatId: z.string().optional(), - persist: z.boolean().optional(), - provider: z.enum(["openai", "anthropic", "xai"]), - model: z.string().min(1), - messages: z.array(CompletionMessageSchema), - temperature: z.number().min(0).max(2).optional(), - maxTokens: z.number().int().positive().optional(), - }) - .superRefine((value, ctx) => { - if (value.persist === false && value.chatId) { - ctx.addIssue({ - code: z.ZodIssueCode.custom, - message: "chatId must be omitted when persist is false", - path: ["chatId"], - }); - } - }); - - const parsed = Body.safeParse(req.body); + const parsed = CompletionStreamBody.safeParse(req.body); if (!parsed.success) return app.httpErrors.badRequest(parsed.error.message); const body = parsed.data; @@ -970,23 +1090,24 @@ export async function registerRoutes(app: FastifyInstance) { await storeNonAssistantMessages(body.chatId, body.messages); } + if (body.persist !== false && body.chatId) { + if (activeChatStreams.has(body.chatId)) { + return app.httpErrors.conflict("chat completion already running"); + } + const stream = startActiveChatStream(body.chatId, body); + return streamActiveRun(req, reply, stream); + } + reply.raw.writeHead(200, buildSseHeaders(typeof req.headers.origin === "string" ? req.headers.origin : undefined)); reply.raw.flushHeaders(); - const send = (event: string, data: any) => { - reply.raw.write(`event: ${event}\n`); - reply.raw.write(`data: ${JSON.stringify(data)}\n\n`); - }; - for await (const ev of runMultiplexStream(body)) { - if (ev.type === "meta") send("meta", ev); - else if (ev.type === "tool_call") send("tool_call", ev.event); - else if (ev.type === "delta") send("delta", ev); - else if (ev.type === "done") send("done", ev); - else if (ev.type === "error") send("error", ev); + writeSseEvent(reply, mapChatStreamEvent(ev)); } - reply.raw.end(); + if (!reply.raw.destroyed && !reply.raw.writableEnded) { + reply.raw.end(); + } return reply; }); } diff --git a/server/tests/active-streams.test.ts b/server/tests/active-streams.test.ts new file mode 100644 index 0000000..f4e42eb --- /dev/null +++ b/server/tests/active-streams.test.ts @@ -0,0 +1,34 @@ +import assert from "node:assert/strict"; +import test from "node:test"; +import { ActiveSseStream, type SseStreamEvent } from "../src/active-streams.js"; + +test("ActiveSseStream replays buffered events to late subscribers", () => { + const stream = new ActiveSseStream(); + stream.emit("delta", { text: "hel" }); + stream.emit("delta", { text: "lo" }); + + const events: SseStreamEvent[] = []; + const unsubscribe = stream.subscribe((event) => events.push(event)); + unsubscribe(); + + assert.deepEqual(events, [ + { event: "delta", data: { text: "hel" } }, + { event: "delta", data: { text: "lo" } }, + ]); +}); + +test("ActiveSseStream replays terminal events after completion", async () => { + const stream = new ActiveSseStream(); + stream.emit("delta", { text: "done" }); + stream.complete({ event: "done", data: { text: "done" } }); + await stream.done; + + const events: SseStreamEvent[] = []; + stream.subscribe((event) => events.push(event)); + + assert.equal(stream.isCompleted, true); + assert.deepEqual(events, [ + { event: "delta", data: { text: "done" } }, + { event: "done", data: { text: "done" } }, + ]); +}); diff --git a/web/src/App.tsx b/web/src/App.tsx index e300bdb..555ef49 100644 --- a/web/src/App.tsx +++ b/web/src/App.tsx @@ -1,5 +1,5 @@ import { useEffect, useMemo, useRef, useState } from "preact/hooks"; -import { Check, ChevronDown, Globe2, Menu, MessageSquare, Paperclip, Plus, Rabbit, Search, SendHorizontal, Trash2, X } from "lucide-preact"; +import { Check, ChevronDown, Globe2, LoaderCircle, Menu, MessageSquare, Paperclip, Plus, Rabbit, Search, SendHorizontal, Trash2, X } from "lucide-preact"; import { Button } from "@/components/ui/button"; import { Textarea } from "@/components/ui/textarea"; import { Separator } from "@/components/ui/separator"; @@ -14,6 +14,9 @@ import { createSearch, deleteChat, deleteSearch, + attachCompletionStream, + attachSearchStream, + getActiveRuns, getChat, listModels, getSearch, @@ -24,6 +27,7 @@ import { suggestChatTitle, getMessageAttachments, type ChatAttachment, + type ActiveRunsResponse, type ModelCatalogResponse, type Provider, type ChatDetail, @@ -53,6 +57,13 @@ type ContextMenuState = { x: number; y: number; }; +type PendingChatState = { + messages: Message[]; +}; +type ActiveRunsState = { + chats: Record; + searches: Record; +}; function readSidebarSelectionFromUrl(): SidebarSelection | null { if (typeof window === "undefined") return null; @@ -107,6 +118,10 @@ const EMPTY_MODEL_PREFERENCES: ProviderModelPreferences = { anthropic: null, xai: null, }; +const EMPTY_ACTIVE_RUNS: ActiveRunsState = { + chats: {}, + searches: {}, +}; const TRANSCRIPT_BOTTOM_GAP = 20; const REPLY_SCROLL_BUFFER_MIN = 288; @@ -588,6 +603,13 @@ function buildSidebarItems(chats: ChatSummary[], searches: SearchSummary[]): Sid return items.sort((a, b) => new Date(b.updatedAt).getTime() - new Date(a.updatedAt).getTime()); } +function buildActiveRunsState(activeRuns: ActiveRunsResponse): ActiveRunsState { + return { + chats: Object.fromEntries(activeRuns.chats.map((id) => [id, true])), + searches: Object.fromEntries(activeRuns.searches.map((id) => [id, true])), + }; +} + function formatDate(value: string) { return new Intl.DateTimeFormat(undefined, { month: "short", @@ -645,9 +667,10 @@ export default function App() { const [draftKind, setDraftKind] = useState(null); const [isLoadingCollections, setIsLoadingCollections] = useState(false); const [isLoadingSelection, setIsLoadingSelection] = useState(false); - const [isSending, setIsSending] = useState(false); const [isStartingSearchChat, setIsStartingSearchChat] = useState(false); - const [pendingChatState, setPendingChatState] = useState<{ chatId: string | null; messages: Message[] } | null>(null); + const [pendingChatStates, setPendingChatStates] = useState>({}); + const [runningSearchStates, setRunningSearchStates] = useState>({}); + const [activeRuns, setActiveRuns] = useState(EMPTY_ACTIVE_RUNS); const [composer, setComposer] = useState(""); const [pendingAttachments, setPendingAttachments] = useState([]); const [isComposerDropActive, setIsComposerDropActive] = useState(false); @@ -684,9 +707,10 @@ export default function App() { const pendingAttachmentsRef = useRef([]); const selectedItemRef = useRef(null); const pendingTitleGenerationRef = useRef>(new Set()); - const searchRunAbortRef = useRef(null); + const chatStreamAbortRefs = useRef>(new Map()); + const searchRunAbortRefs = useRef>(new Map()); const quickQuestionAbortRef = useRef(null); - const searchRunCounterRef = useRef(0); + const searchRunCountersRef = useRef>(new Map()); const shouldAutoScrollRef = useRef(true); const wasSendingRef = useRef(false); const pendingReplyScrollRef = useRef(false); @@ -765,7 +789,18 @@ export default function App() { setSelectedChat(null); setSelectedSearch(null); setDraftKind(null); - setPendingChatState(null); + setPendingChatStates({}); + setRunningSearchStates({}); + setActiveRuns(EMPTY_ACTIVE_RUNS); + for (const controller of chatStreamAbortRefs.current.values()) { + controller.abort(); + } + chatStreamAbortRefs.current.clear(); + for (const controller of searchRunAbortRefs.current.values()) { + controller.abort(); + } + searchRunAbortRefs.current.clear(); + searchRunCountersRef.current.clear(); setComposer(""); setPendingAttachments([]); setIsQuickQuestionOpen(false); @@ -831,6 +866,18 @@ export default function App() { } }; + const refreshActiveRuns = async () => { + try { + const data = await getActiveRuns(); + setActiveRuns(buildActiveRunsState(data)); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + if (message.includes("bearer token")) { + handleAuthFailure(message); + } + } + }; + const refreshChat = async (chatId: string) => { setIsLoadingSelection(true); try { @@ -871,7 +918,15 @@ export default function App() { if (!isAuthenticated) return; const preferredSelection = initialRouteSelectionRef.current; initialRouteSelectionRef.current = null; - void Promise.all([refreshCollections(preferredSelection ?? undefined), refreshModels()]); + void Promise.all([refreshCollections(preferredSelection ?? undefined), refreshModels(), refreshActiveRuns()]); + }, [isAuthenticated]); + + useEffect(() => { + if (!isAuthenticated) return; + const interval = window.setInterval(() => { + void refreshActiveRuns(); + }, 3000); + return () => window.clearInterval(interval); }, [isAuthenticated]); useEffect(() => { @@ -962,19 +1017,32 @@ export default function App() { }, [quickPrompt, isQuickQuestionOpen]); const selectedKey = selectedItem ? `${selectedItem.kind}:${selectedItem.id}` : null; - const isChatReplyStreamingInView = - isSending && - draftKind !== "search" && - selectedItem?.kind !== "search" && - !!pendingChatState && - (!pendingChatState.chatId || (selectedItem?.kind === "chat" && selectedItem.id === pendingChatState.chatId)); + const selectedChatPendingState = selectedItem?.kind === "chat" ? pendingChatStates[selectedItem.id] ?? null : null; + const selectedSearchRunState = selectedItem?.kind === "search" ? runningSearchStates[selectedItem.id] ?? null : null; + const selectedChatIsActive = selectedItem?.kind === "chat" && (!!selectedChatPendingState || !!activeRuns.chats[selectedItem.id]); + const selectedSearchIsActive = selectedItem?.kind === "search" && (!!selectedSearchRunState || !!activeRuns.searches[selectedItem.id]); + const isSearchMode = draftKind ? draftKind === "search" : selectedItem?.kind === "search"; + const isSearchRunning = !!selectedSearchIsActive; + const isSendingActiveChat = draftKind !== "search" && !!selectedChatIsActive; + const isActiveSelectionSending = isSearchMode ? isSearchRunning : isSendingActiveChat; + const selectedSearchForView = + selectedItem?.kind === "search" + ? selectedSearchRunState ?? (selectedSearch?.id === selectedItem.id ? selectedSearch : null) + : selectedSearch; + + const isItemRunning = (item: SidebarSelection) => + item.kind === "chat" ? !!pendingChatStates[item.id] || !!activeRuns.chats[item.id] : !!runningSearchStates[item.id] || !!activeRuns.searches[item.id]; + const isCurrentSelection = (item: SidebarSelection) => { + const current = selectedItemRef.current; + return current?.kind === item.kind && current.id === item.id; + }; useEffect(() => { shouldAutoScrollRef.current = true; - if (!isSending || !isChatReplyStreamingInView) { + if (!isSendingActiveChat) { setTranscriptTailSpacer(TRANSCRIPT_BOTTOM_GAP); } - }, [draftKind, selectedItem?.kind, selectedKey]); + }, [isSendingActiveChat, selectedKey]); useEffect(() => { selectedItemRef.current = selectedItem; @@ -1002,18 +1070,18 @@ export default function App() { useEffect(() => { if (draftKind === "search" || selectedItem?.kind === "search") return; const wasSending = wasSendingRef.current; - wasSendingRef.current = isSending; - if (isSending) return; + wasSendingRef.current = isSendingActiveChat; + if (isSendingActiveChat) return; if (wasSending) { shouldAutoScrollRef.current = false; return; } if (!shouldAutoScrollRef.current) return; transcriptEndRef.current?.scrollIntoView({ behavior: "auto", block: "end" }); - }, [draftKind, selectedChat?.messages.length, isSending, selectedItem?.kind, selectedKey]); + }, [draftKind, selectedChat?.messages.length, isSendingActiveChat, selectedItem?.kind, selectedKey]); useEffect(() => { - if (!isChatReplyStreamingInView || !pendingReplyScrollRef.current) return; + if (!isSendingActiveChat || !pendingReplyScrollRef.current) return; pendingReplyScrollRef.current = false; shouldAutoScrollRef.current = true; @@ -1022,28 +1090,31 @@ export default function App() { if (!container) return; container.scrollTo({ top: container.scrollHeight, behavior: "smooth" }); }); - }, [isChatReplyStreamingInView, pendingChatState?.chatId]); + }, [isSendingActiveChat, selectedKey]); useEffect(() => { - if (isSending) return; + if (isActiveSelectionSending) return; const hasWorkspaceSelection = Boolean(selectedItem) || draftKind !== null; if (!hasWorkspaceSelection) return; focusComposer(); - }, [draftKind, isSending, selectedKey]); + }, [draftKind, isActiveSelectionSending, selectedKey]); useEffect(() => { return () => { - searchRunAbortRef.current?.abort(); - searchRunAbortRef.current = null; + for (const controller of chatStreamAbortRefs.current.values()) { + controller.abort(); + } + chatStreamAbortRefs.current.clear(); + for (const controller of searchRunAbortRefs.current.values()) { + controller.abort(); + } + searchRunAbortRefs.current.clear(); quickQuestionAbortRef.current?.abort(); quickQuestionAbortRef.current = null; }; }, []); const messages = selectedChat?.messages ?? []; - const isSearchMode = draftKind ? draftKind === "search" : selectedItem?.kind === "search"; - const isSearchRunning = isSending && isSearchMode; - const isSendingActiveChat = isChatReplyStreamingInView; useEffect(() => { if (isSearchMode && pendingAttachments.length) { @@ -1055,15 +1126,9 @@ export default function App() { } }, [isSearchMode, pendingAttachments.length]); const displayMessages = useMemo(() => { - if (!pendingChatState) return messages.filter(isDisplayableMessage); - if (pendingChatState.chatId) { - if (selectedItem?.kind === "chat" && selectedItem.id === pendingChatState.chatId) { - return pendingChatState.messages.filter(isDisplayableMessage); - } - return messages.filter(isDisplayableMessage); - } - return (isSearchMode ? messages : pendingChatState.messages).filter(isDisplayableMessage); - }, [isSearchMode, messages, pendingChatState, selectedItem]); + if (selectedChatPendingState) return selectedChatPendingState.messages.filter(isDisplayableMessage); + return messages.filter(isDisplayableMessage); + }, [messages, selectedChatPendingState]); const quickAnswerText = useMemo(() => { for (let index = quickQuestionMessages.length - 1; index >= 0; index -= 1) { const message = quickQuestionMessages[index]; @@ -1106,10 +1171,10 @@ export default function App() { if (selectedChatSummary) return getChatTitle(selectedChatSummary); return "New chat"; } - if (selectedSearch) return getSearchTitle(selectedSearch); + if (selectedSearchForView) return getSearchTitle(selectedSearchForView); if (selectedSearchSummary) return getSearchTitle(selectedSearchSummary); return "New search"; - }, [draftKind, selectedChat, selectedChatSummary, selectedItem, selectedSearch, selectedSearchSummary]); + }, [draftKind, selectedChat, selectedChatSummary, selectedItem, selectedSearchForView, selectedSearchSummary]); const pageTitle = useMemo(() => { if (draftKind || !selectedItem) return "Sybil"; @@ -1118,12 +1183,12 @@ export default function App() { if (selectedChatSummary) return `${getChatTitle(selectedChatSummary)} — Sybil`; return "Sybil"; } - const searchQuery = selectedSearch?.query?.trim() || selectedSearchSummary?.query?.trim(); + const searchQuery = selectedSearchForView?.query?.trim() || selectedSearchSummary?.query?.trim(); if (searchQuery) return `${searchQuery} — Sybil`; - if (selectedSearch) return `${getSearchTitle(selectedSearch)} — Sybil`; + if (selectedSearchForView) return `${getSearchTitle(selectedSearchForView)} — Sybil`; if (selectedSearchSummary) return `${getSearchTitle(selectedSearchSummary)} — Sybil`; return "Sybil"; - }, [draftKind, selectedChat, selectedChatSummary, selectedItem, selectedSearch, selectedSearchSummary]); + }, [draftKind, selectedChat, selectedChatSummary, selectedItem, selectedSearchForView, selectedSearchSummary]); const primaryShortcutModifier = useMemo(() => { if (typeof navigator === "undefined") return "Ctrl"; return /Mac|iPhone|iPad|iPod/i.test(navigator.platform) ? "Cmd" : "Ctrl"; @@ -1231,7 +1296,7 @@ export default function App() { }; const handleDeleteFromContextMenu = async () => { - if (!contextMenu || isSending) return; + if (!contextMenu || isItemRunning(contextMenu.item)) return; const target = contextMenu.item; setContextMenu(null); setError(null); @@ -1361,7 +1426,45 @@ export default function App() { await appendPendingAttachments(getFilesFromDataTransfer(event.dataTransfer)); }; - const handleSendChat = async (content: string, attachments: ChatAttachment[]) => { + const removePendingChatState = (chatId: string) => { + setPendingChatStates((current) => { + if (!current[chatId]) return current; + const next = { ...current }; + delete next[chatId]; + return next; + }); + }; + + const removeActiveRun = (kind: SidebarSelection["kind"], id: string) => { + setActiveRuns((current) => { + const bucket = current[kind === "chat" ? "chats" : "searches"]; + if (!bucket[id]) return current; + const nextBucket = { ...bucket }; + delete nextBucket[id]; + return kind === "chat" ? { ...current, chats: nextBucket } : { ...current, searches: nextBucket }; + }); + }; + + const addActiveRun = (kind: SidebarSelection["kind"], id: string) => { + setActiveRuns((current) => { + const bucket = current[kind === "chat" ? "chats" : "searches"]; + if (bucket[id]) return current; + return kind === "chat" + ? { ...current, chats: { ...current.chats, [id]: true } } + : { ...current, searches: { ...current.searches, [id]: true } }; + }); + }; + + const removeRunningSearchState = (searchId: string) => { + setRunningSearchStates((current) => { + if (!current[searchId]) return current; + const next = { ...current }; + delete next[searchId]; + return next; + }); + }; + + const handleSendChat = async (content: string, attachments: ChatAttachment[]): Promise => { pendingReplyScrollRef.current = true; expandTranscriptTailSpacer(getReplyScrollBufferHeight()); @@ -1383,11 +1486,6 @@ export default function App() { metadata: null, }; - setPendingChatState({ - chatId: selectedItem?.kind === "chat" ? selectedItem.id : null, - messages: (selectedChat?.messages ?? []).concat(optimisticUserMessage, optimisticAssistantMessage), - }); - let chatId = draftKind === "chat" ? null : selectedItem?.kind === "chat" ? selectedItem.id : null; if (!chatId) { @@ -1399,7 +1497,6 @@ export default function App() { return [chat, ...withoutExisting]; }); setSelectedItem({ kind: "chat", id: chatId }); - setPendingChatState((current) => (current ? { ...current, chatId } : current)); setSelectedChat({ id: chat.id, title: chat.title, @@ -1418,11 +1515,23 @@ export default function App() { throw new Error("Unable to initialize chat"); } + if (pendingChatStates[chatId]) { + throw new Error("This chat is already generating a response."); + } + let baseChat = selectedChat; if (!baseChat || baseChat.id !== chatId) { baseChat = await getChat(chatId); } + setPendingChatStates((current) => ({ + ...current, + [chatId]: { + messages: baseChat.messages.concat(optimisticUserMessage, optimisticAssistantMessage), + }, + })); + addActiveRun("chat", chatId); + const requestMessages: CompletionRequestMessage[] = [ ...baseChat.messages .filter((message) => !isToolCallLogMessage(message)) @@ -1472,99 +1581,109 @@ export default function App() { let streamErrorMessage: string | null = null; - await runCompletionStream( - { - chatId, - provider, - model: selectedModel, - messages: requestMessages, - }, - { - onMeta: (payload) => { - if (payload.chatId !== chatId) return; - setPendingChatState((current) => (current ? { ...current, chatId: payload.chatId } : current)); + try { + await runCompletionStream( + { + chatId, + provider, + model: selectedModel, + messages: requestMessages, }, - onToolCall: (payload) => { - setPendingChatState((current) => { - if (!current) return current; - if ( - current.messages.some( - (message) => - asToolLogMetadata(message.metadata)?.toolCallId === payload.toolCallId || message.id === `temp-tool-${payload.toolCallId}` - ) - ) { - return current; - } + { + onMeta: (payload) => { + if (payload.chatId !== chatId) return; + }, + onToolCall: (payload) => { + setPendingChatStates((current) => { + const pendingState = current[chatId]; + if (!pendingState) return current; + if ( + pendingState.messages.some( + (message) => + asToolLogMetadata(message.metadata)?.toolCallId === payload.toolCallId || message.id === `temp-tool-${payload.toolCallId}` + ) + ) { + return current; + } - const toolMessage = buildOptimisticToolMessage(payload); - const assistantIndex = current.messages.findIndex( - (message, index, all) => index === all.length - 1 && message.id.startsWith("temp-assistant-") - ); - if (assistantIndex < 0) { - return { ...current, messages: current.messages.concat(toolMessage) }; - } - return { - ...current, - messages: [ - ...current.messages.slice(0, assistantIndex), - toolMessage, - ...current.messages.slice(assistantIndex), - ], - }; - }); - }, - onDelta: (payload) => { - if (!payload.text) return; - setPendingChatState((current) => { - if (!current) return current; - let updated = false; - const nextMessages = current.messages.map((message, index, all) => { - const isTarget = index === all.length - 1 && message.id.startsWith("temp-assistant-"); - if (!isTarget) return message; - updated = true; - return { ...message, content: message.content + payload.text }; + const toolMessage = buildOptimisticToolMessage(payload); + const assistantIndex = pendingState.messages.findIndex( + (message, index, all) => index === all.length - 1 && message.id.startsWith("temp-assistant-") + ); + if (assistantIndex < 0) { + return { + ...current, + [chatId]: { messages: pendingState.messages.concat(toolMessage) }, + }; + } + return { + ...current, + [chatId]: { + messages: [ + ...pendingState.messages.slice(0, assistantIndex), + toolMessage, + ...pendingState.messages.slice(assistantIndex), + ], + }, + }; }); - return updated ? { ...current, messages: nextMessages } : current; - }); - }, - onDone: (payload) => { - setPendingChatState((current) => { - if (!current) return current; - let updated = false; - const nextMessages = current.messages.map((message, index, all) => { - const isTarget = index === all.length - 1 && message.id.startsWith("temp-assistant-"); - if (!isTarget) return message; - updated = true; - return { ...message, content: payload.text }; + }, + onDelta: (payload) => { + if (!payload.text) return; + setPendingChatStates((current) => { + const pendingState = current[chatId]; + if (!pendingState) return current; + let updated = false; + const nextMessages = pendingState.messages.map((message, index, all) => { + const isTarget = index === all.length - 1 && message.id.startsWith("temp-assistant-"); + if (!isTarget) return message; + updated = true; + return { ...message, content: message.content + payload.text }; + }); + return updated ? { ...current, [chatId]: { messages: nextMessages } } : current; }); - return updated ? { ...current, messages: nextMessages } : current; - }); - }, - onError: (payload) => { - streamErrorMessage = payload.message; - }, + }, + onDone: (payload) => { + setPendingChatStates((current) => { + const pendingState = current[chatId]; + if (!pendingState) return current; + let updated = false; + const nextMessages = pendingState.messages.map((message, index, all) => { + const isTarget = index === all.length - 1 && message.id.startsWith("temp-assistant-"); + if (!isTarget) return message; + updated = true; + return { ...message, content: payload.text }; + }); + return updated ? { ...current, [chatId]: { messages: nextMessages } } : current; + }); + }, + onError: (payload) => { + streamErrorMessage = payload.message; + }, + } + ); + + if (streamErrorMessage) { + throw new Error(streamErrorMessage); } - ); - if (streamErrorMessage) { - throw new Error(streamErrorMessage); + await refreshCollections(); + const currentSelection = selectedItemRef.current; + if (currentSelection?.kind === "chat" && currentSelection.id === chatId) { + await refreshChat(chatId); + } + settleTranscriptTailSpacer(); + removePendingChatState(chatId); + removeActiveRun("chat", chatId); + return { kind: "chat", id: chatId }; + } catch (err) { + removePendingChatState(chatId); + removeActiveRun("chat", chatId); + throw err; } - - await refreshCollections(); - const currentSelection = selectedItemRef.current; - if (currentSelection?.kind === "chat" && currentSelection.id === chatId) { - await refreshChat(chatId); - } - settleTranscriptTailSpacer(); - setPendingChatState(null); }; - const handleSendSearch = async (query: string) => { - const runId = ++searchRunCounterRef.current; - searchRunAbortRef.current?.abort(); - const abortController = new AbortController(); - searchRunAbortRef.current = abortController; - + const handleSendSearch = async (query: string): Promise => { let searchId = draftKind === "search" ? null : selectedItem?.kind === "search" ? selectedItem.id : null; if (!searchId) { @@ -1581,42 +1700,50 @@ export default function App() { throw new Error("Unable to initialize search"); } + const targetSearchId = searchId; + const target: SidebarSelection = { kind: "search", id: targetSearchId }; + const runId = (searchRunCountersRef.current.get(targetSearchId) ?? 0) + 1; + searchRunCountersRef.current.set(targetSearchId, runId); + searchRunAbortRefs.current.get(targetSearchId)?.abort(); + const abortController = new AbortController(); + searchRunAbortRefs.current.set(targetSearchId, abortController); + const isCurrentRun = () => searchRunCountersRef.current.get(targetSearchId) === runId; + const nowIso = new Date().toISOString(); - setSelectedSearch((current) => { - if (!current || current.id !== searchId) { - return { - id: searchId, - title: query.slice(0, 80), - query, - createdAt: nowIso, - updatedAt: nowIso, - requestId: null, - latencyMs: null, - error: null, - answerText: null, - answerRequestId: null, - answerCitations: null, - answerError: null, - results: [], - }; - } - return { - ...current, - title: query.slice(0, 80), - query, - error: null, - latencyMs: null, - answerText: null, - answerRequestId: null, - answerCitations: null, - answerError: null, - results: [], - }; - }); + const currentSearch = selectedSearchForView?.id === targetSearchId ? selectedSearchForView : null; + const optimisticSearch: SearchDetail = { + id: targetSearchId, + title: query.slice(0, 80), + query, + createdAt: currentSearch?.createdAt ?? nowIso, + updatedAt: nowIso, + requestId: null, + latencyMs: null, + error: null, + answerText: null, + answerRequestId: null, + answerCitations: null, + answerError: null, + results: [], + }; + setRunningSearchStates((current) => ({ ...current, [targetSearchId]: optimisticSearch })); + addActiveRun("search", targetSearchId); + if (isCurrentSelection(target)) { + setSelectedSearch(optimisticSearch); + setSelectedChat(null); + } + + const updateRunningSearch = (updater: (search: SearchDetail) => SearchDetail) => { + setRunningSearchStates((current) => { + const search = current[targetSearchId]; + if (!search) return current; + return { ...current, [targetSearchId]: updater(search) }; + }); + }; try { await runSearchStream( - searchId, + targetSearchId, { query, title: query.slice(0, 80), @@ -1625,9 +1752,8 @@ export default function App() { }, { onSearchResults: (payload) => { - if (runId !== searchRunCounterRef.current) return; - setSelectedSearch((current) => { - if (!current || current.id !== searchId) return current; + if (!isCurrentRun()) return; + updateRunningSearch((current) => { return { ...current, requestId: payload.requestId ?? current.requestId, @@ -1637,16 +1763,12 @@ export default function App() { }); }, onSearchError: (payload) => { - if (runId !== searchRunCounterRef.current) return; - setSelectedSearch((current) => { - if (!current || current.id !== searchId) return current; - return { ...current, error: payload.error }; - }); + if (!isCurrentRun()) return; + updateRunningSearch((current) => ({ ...current, error: payload.error })); }, onAnswer: (payload) => { - if (runId !== searchRunCounterRef.current) return; - setSelectedSearch((current) => { - if (!current || current.id !== searchId) return current; + if (!isCurrentRun()) return; + updateRunningSearch((current) => { return { ...current, answerText: payload.answerText, @@ -1657,45 +1779,285 @@ export default function App() { }); }, onAnswerError: (payload) => { - if (runId !== searchRunCounterRef.current) return; - setSelectedSearch((current) => { - if (!current || current.id !== searchId) return current; - return { ...current, answerError: payload.error }; - }); + if (!isCurrentRun()) return; + updateRunningSearch((current) => ({ ...current, answerError: payload.error })); }, onDone: (payload) => { - if (runId !== searchRunCounterRef.current) return; - setSelectedSearch(payload.search); - setSelectedChat(null); + if (!isCurrentRun()) return; + setRunningSearchStates((current) => ({ ...current, [targetSearchId]: payload.search })); + if (isCurrentSelection(target)) { + setSelectedSearch(payload.search); + setSelectedChat(null); + } }, onError: (payload) => { - if (runId !== searchRunCounterRef.current) return; - setError(payload.message); + if (!isCurrentRun()) return; + updateRunningSearch((current) => ({ ...current, error: payload.message })); + if (isCurrentSelection(target)) { + setSelectedSearch((current) => (current?.id === targetSearchId ? { ...current, error: payload.message } : current)); + setError(payload.message); + } }, }, { signal: abortController.signal } ); } catch (err) { - if (abortController.signal.aborted) return; + if (abortController.signal.aborted) return target; throw err; } finally { - if (runId === searchRunCounterRef.current) { - searchRunAbortRef.current = null; + if (isCurrentRun()) { + searchRunAbortRefs.current.delete(targetSearchId); + searchRunCountersRef.current.delete(targetSearchId); + removeRunningSearchState(targetSearchId); + removeActiveRun("search", targetSearchId); } } - await refreshCollections({ kind: "search", id: searchId }); + await refreshCollections(target); + return target; }; + const attachToActiveChatStream = async (chatId: string) => { + if (chatStreamAbortRefs.current.has(chatId)) return; + const target: SidebarSelection = { kind: "chat", id: chatId }; + const abortController = new AbortController(); + chatStreamAbortRefs.current.set(chatId, abortController); + addActiveRun("chat", chatId); + + let streamErrorMessage: string | null = null; + + try { + const baseChat = selectedChat?.id === chatId ? selectedChat : await getChat(chatId); + setPendingChatStates((current) => { + if (current[chatId]) return current; + return { + ...current, + [chatId]: { + messages: baseChat.messages.concat({ + id: `temp-assistant-attach-${chatId}-${Date.now()}`, + createdAt: new Date().toISOString(), + role: "assistant", + content: "", + name: null, + metadata: null, + }), + }, + }; + }); + + await attachCompletionStream( + chatId, + { + onToolCall: (payload) => { + setPendingChatStates((current) => { + const pendingState = current[chatId]; + if (!pendingState) return current; + if ( + pendingState.messages.some( + (message) => + asToolLogMetadata(message.metadata)?.toolCallId === payload.toolCallId || message.id === `temp-tool-${payload.toolCallId}` + ) + ) { + return current; + } + + const toolMessage = buildOptimisticToolMessage(payload); + const assistantIndex = pendingState.messages.findIndex( + (message, index, all) => index === all.length - 1 && message.id.startsWith("temp-assistant-") + ); + if (assistantIndex < 0) { + return { ...current, [chatId]: { messages: pendingState.messages.concat(toolMessage) } }; + } + return { + ...current, + [chatId]: { + messages: [ + ...pendingState.messages.slice(0, assistantIndex), + toolMessage, + ...pendingState.messages.slice(assistantIndex), + ], + }, + }; + }); + }, + onDelta: (payload) => { + if (!payload.text) return; + setPendingChatStates((current) => { + const pendingState = current[chatId]; + if (!pendingState) return current; + let updated = false; + const nextMessages = pendingState.messages.map((message, index, all) => { + const isTarget = index === all.length - 1 && message.id.startsWith("temp-assistant-"); + if (!isTarget) return message; + updated = true; + return { ...message, content: message.content + payload.text }; + }); + return updated ? { ...current, [chatId]: { messages: nextMessages } } : current; + }); + }, + onDone: (payload) => { + setPendingChatStates((current) => { + const pendingState = current[chatId]; + if (!pendingState) return current; + let updated = false; + const nextMessages = pendingState.messages.map((message, index, all) => { + const isTarget = index === all.length - 1 && message.id.startsWith("temp-assistant-"); + if (!isTarget) return message; + updated = true; + return { ...message, content: payload.text }; + }); + return updated ? { ...current, [chatId]: { messages: nextMessages } } : current; + }); + }, + onError: (payload) => { + streamErrorMessage = payload.message; + }, + }, + { signal: abortController.signal } + ); + + if (streamErrorMessage) { + throw new Error(streamErrorMessage); + } + + await refreshCollections(); + if (isCurrentSelection(target)) { + await refreshChat(chatId); + } + } catch (err) { + if (abortController.signal.aborted) return; + const message = err instanceof Error ? err.message : String(err); + if (message.includes("active chat stream not found")) { + await refreshActiveRuns(); + if (isCurrentSelection(target)) await refreshChat(chatId); + } else if (message.includes("bearer token")) { + handleAuthFailure(message); + } else if (isCurrentSelection(target)) { + setError(message); + } + } finally { + chatStreamAbortRefs.current.delete(chatId); + removePendingChatState(chatId); + removeActiveRun("chat", chatId); + } + }; + + const attachToActiveSearchStream = async (searchId: string) => { + if (searchRunAbortRefs.current.has(searchId)) return; + const target: SidebarSelection = { kind: "search", id: searchId }; + const runId = (searchRunCountersRef.current.get(searchId) ?? 0) + 1; + searchRunCountersRef.current.set(searchId, runId); + const abortController = new AbortController(); + searchRunAbortRefs.current.set(searchId, abortController); + addActiveRun("search", searchId); + const isCurrentRun = () => searchRunCountersRef.current.get(searchId) === runId; + + try { + const baseSearch = selectedSearch?.id === searchId ? selectedSearch : await getSearch(searchId); + setRunningSearchStates((current) => ({ ...current, [searchId]: baseSearch })); + + const updateRunningSearch = (updater: (search: SearchDetail) => SearchDetail) => { + setRunningSearchStates((current) => { + const search = current[searchId]; + if (!search) return current; + return { ...current, [searchId]: updater(search) }; + }); + }; + + await attachSearchStream( + searchId, + { + onSearchResults: (payload) => { + if (!isCurrentRun()) return; + updateRunningSearch((current) => ({ + ...current, + requestId: payload.requestId ?? current.requestId, + error: null, + results: payload.results, + })); + }, + onSearchError: (payload) => { + if (!isCurrentRun()) return; + updateRunningSearch((current) => ({ ...current, error: payload.error })); + }, + onAnswer: (payload) => { + if (!isCurrentRun()) return; + updateRunningSearch((current) => ({ + ...current, + answerText: payload.answerText, + answerRequestId: payload.answerRequestId, + answerCitations: payload.answerCitations, + answerError: null, + })); + }, + onAnswerError: (payload) => { + if (!isCurrentRun()) return; + updateRunningSearch((current) => ({ ...current, answerError: payload.error })); + }, + onDone: (payload) => { + if (!isCurrentRun()) return; + setRunningSearchStates((current) => ({ ...current, [searchId]: payload.search })); + if (isCurrentSelection(target)) { + setSelectedSearch(payload.search); + setSelectedChat(null); + } + }, + onError: (payload) => { + if (!isCurrentRun()) return; + updateRunningSearch((current) => ({ ...current, error: payload.message })); + if (isCurrentSelection(target)) { + setSelectedSearch((current) => (current?.id === searchId ? { ...current, error: payload.message } : current)); + setError(payload.message); + } + }, + }, + { signal: abortController.signal } + ); + + await refreshCollections(target); + if (isCurrentSelection(target)) { + await refreshSearch(searchId); + } + } catch (err) { + if (abortController.signal.aborted) return; + const message = err instanceof Error ? err.message : String(err); + if (message.includes("active search stream not found")) { + await refreshActiveRuns(); + if (isCurrentSelection(target)) await refreshSearch(searchId); + } else if (message.includes("bearer token")) { + handleAuthFailure(message); + } else if (isCurrentSelection(target)) { + setError(message); + } + } finally { + if (isCurrentRun()) { + searchRunAbortRefs.current.delete(searchId); + searchRunCountersRef.current.delete(searchId); + removeRunningSearchState(searchId); + removeActiveRun("search", searchId); + } + } + }; + + useEffect(() => { + if (!isAuthenticated || !selectedItem || draftKind) return; + if (selectedItem.kind === "chat" && activeRuns.chats[selectedItem.id] && !pendingChatStates[selectedItem.id]) { + void attachToActiveChatStream(selectedItem.id); + } + if (selectedItem.kind === "search" && activeRuns.searches[selectedItem.id] && !runningSearchStates[selectedItem.id]) { + void attachToActiveSearchStream(selectedItem.id); + } + }, [activeRuns, draftKind, isAuthenticated, pendingChatStates, runningSearchStates, selectedKey]); + const handleStartChatFromSearch = async () => { - if (!selectedSearch || isStartingSearchChat || isSending) return; + const sourceSearch = selectedSearchForView; + if (!sourceSearch || isStartingSearchChat || isSearchRunning) return; setError(null); setIsStartingSearchChat(true); try { - const chat = await createChatFromSearch(selectedSearch.id); + const chat = await createChatFromSearch(sourceSearch.id); setDraftKind(null); - setPendingChatState(null); setComposer(""); setPendingAttachments([]); setChats((current) => { @@ -1868,7 +2230,6 @@ export default function App() { }); setDraftKind(null); - setPendingChatState(null); setComposer(""); setPendingAttachments([]); setIsQuickQuestionOpen(false); @@ -1908,7 +2269,7 @@ export default function App() { const handleSend = async () => { const content = composer.trim(); const attachments = pendingAttachments; - if ((!content && !attachments.length) || isSending) return; + if ((!content && !attachments.length) || isActiveSelectionSending) return; if (isSearchMode && attachments.length) { setError("Attachments are only available in chat mode."); return; @@ -1917,37 +2278,38 @@ export default function App() { setComposer(""); setPendingAttachments([]); setError(null); - setIsSending(true); + let sentTarget: SidebarSelection | null = null; + const sentAsSearch = isSearchMode; try { if (isSearchMode) { - await handleSendSearch(content); + sentTarget = await handleSendSearch(content); } else { - await handleSendChat(content, attachments); + sentTarget = await handleSendChat(content, attachments); } } catch (err) { const message = err instanceof Error ? err.message : String(err); if (message.includes("bearer token")) { handleAuthFailure(message); - } else { + } else if (!sentTarget || isCurrentSelection(sentTarget)) { setError(message); } - if (!isSearchMode) { + if (!sentAsSearch && (!sentTarget || isCurrentSelection(sentTarget))) { setComposer(content); setPendingAttachments(attachments); - setPendingChatState(null); } - if (selectedItem?.kind === "chat") { - await refreshChat(selectedItem.id); + if (sentTarget && isCurrentSelection(sentTarget) && sentTarget.kind === "chat") { + await refreshChat(sentTarget.id); } - if (selectedItem?.kind === "search") { - await refreshSearch(selectedItem.id); + if (sentTarget && isCurrentSelection(sentTarget) && sentTarget.kind === "search") { + await refreshSearch(sentTarget.id); } } finally { - setIsSending(false); - focusComposer(); + if (!sentTarget || isCurrentSelection(sentTarget)) { + focusComposer(); + } } }; @@ -2002,7 +2364,7 @@ export default function App() { @@ -2065,6 +2427,7 @@ export default function App() {

{section.label}

{section.items.map((item) => { const active = selectedItem?.kind === item.kind && selectedItem.id === item.id; + const itemIsRunning = isItemRunning(item); const initiatedLabel = item.kind === "chat" && item.initiatedModel ? `${getProviderLabel(item.initiatedProvider)}${item.initiatedProvider ? " · " : ""}${item.initiatedModel}` : null; @@ -2095,8 +2458,16 @@ export default function App() { > {item.kind === "chat" ? : } -

{item.title}

-

{formatDate(item.updatedAt)}

+ + {item.title} + {itemIsRunning ? ( + + ) : null} + + {formatDate(item.updatedAt)} {initiatedLabel ? (

{initiatedLabel}

@@ -2139,7 +2510,7 @@ export default function App() { const options = getModelOptions(modelCatalog, nextProvider); setModel(pickProviderModel(options, providerModelPreferences[nextProvider])); }} - disabled={isSending} + disabled={isActiveSelectionSending} > @@ -2148,7 +2519,7 @@ export default function App() { { const normalizedModel = nextModel.trim(); setModel(normalizedModel); @@ -2182,11 +2553,11 @@ export default function App() { ) : ( )}
{error ?

{error}

: null} @@ -2265,7 +2636,7 @@ export default function App() { onClick={handleOpenAttachmentPicker} size="icon" variant="secondary" - disabled={isSending || pendingAttachments.length >= MAX_CHAT_ATTACHMENTS} + disabled={isActiveSelectionSending || pendingAttachments.length >= MAX_CHAT_ATTACHMENTS} aria-label="Attach files" > @@ -2275,7 +2646,7 @@ export default function App() { className="h-10 w-10 rounded-lg" onClick={() => void handleSend()} size="icon" - disabled={isSending || (!composer.trim() && !pendingAttachments.length)} + disabled={isActiveSelectionSending || (!composer.trim() && !pendingAttachments.length)} > {isSearchMode ? : } @@ -2295,7 +2666,7 @@ export default function App() { type="button" className="flex w-full items-center gap-2 rounded-md px-2 py-1.5 text-left text-sm text-rose-300 transition hover:bg-rose-500/12 disabled:text-muted-foreground" onClick={() => void handleDeleteFromContextMenu()} - disabled={isSending} + disabled={isItemRunning(contextMenu.item)} > Delete diff --git a/web/src/lib/api.ts b/web/src/lib/api.ts index 53a6de0..66103ac 100644 --- a/web/src/lib/api.ts +++ b/web/src/lib/api.ts @@ -139,6 +139,11 @@ export type ModelCatalogResponse = { providers: Record; }; +export type ActiveRunsResponse = { + chats: string[]; + searches: string[]; +}; + type CompletionResponse = { chatId: string | null; message: { @@ -217,6 +222,10 @@ export async function listModels() { return api("/v1/models"); } +export async function getActiveRuns() { + return api("/v1/active-runs"); +} + export async function createChat(input?: string | CreateChatRequest) { const body = typeof input === "string" ? { title: input } : input ?? {}; const data = await api<{ chat: ChatSummary }>("/v1/chats", { @@ -333,6 +342,85 @@ type RunSearchStreamHandlers = { onError?: (payload: { message: string }) => void; }; +async function readSseStream(response: Response, dispatch: (eventName: string, payload: any) => void) { + if (!response.ok) { + const fallback = `${response.status} ${response.statusText}`; + let message = fallback; + try { + const body = (await response.json()) as { message?: string }; + if (body.message) message = body.message; + } catch { + // keep fallback message + } + throw new Error(message); + } + + if (!response.body) { + throw new Error("No response stream"); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + let eventName = "message"; + let dataLines: string[] = []; + + const flushEvent = () => { + if (!dataLines.length) { + eventName = "message"; + return; + } + + const dataText = dataLines.join("\n"); + let payload: any = null; + try { + payload = JSON.parse(dataText); + } catch { + payload = { message: dataText }; + } + + dispatch(eventName, payload); + + dataLines = []; + eventName = "message"; + }; + + while (true) { + const { value, done } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + let newlineIndex = buffer.indexOf("\n"); + + while (newlineIndex >= 0) { + const rawLine = buffer.slice(0, newlineIndex); + buffer = buffer.slice(newlineIndex + 1); + const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine; + + if (!line) { + flushEvent(); + } else if (line.startsWith("event:")) { + eventName = line.slice("event:".length).trim(); + } else if (line.startsWith("data:")) { + dataLines.push(line.slice("data:".length).trimStart()); + } + + newlineIndex = buffer.indexOf("\n"); + } + } + + buffer += decoder.decode(); + if (buffer.length) { + const line = buffer.endsWith("\r") ? buffer.slice(0, -1) : buffer; + if (line.startsWith("event:")) { + eventName = line.slice("event:".length).trim(); + } else if (line.startsWith("data:")) { + dataLines.push(line.slice("data:".length).trimStart()); + } + } + flushEvent(); +} + export async function runSearchStream( searchId: string, body: SearchRunRequest, @@ -437,6 +525,30 @@ export async function runSearchStream( flushEvent(); } +export async function attachSearchStream(searchId: string, handlers: RunSearchStreamHandlers, options?: { signal?: AbortSignal }) { + const headers = new Headers({ + Accept: "text/event-stream", + }); + if (authToken) { + headers.set("Authorization", `Bearer ${authToken}`); + } + + const response = await fetch(`${API_BASE_URL}/v1/searches/${searchId}/run/stream/attach`, { + method: "POST", + headers, + signal: options?.signal, + }); + + await readSseStream(response, (eventName, payload) => { + if (eventName === "search_results") handlers.onSearchResults?.(payload); + else if (eventName === "search_error") handlers.onSearchError?.(payload); + else if (eventName === "answer") handlers.onAnswer?.(payload); + else if (eventName === "answer_error") handlers.onAnswerError?.(payload); + else if (eventName === "done") handlers.onDone?.(payload); + else if (eventName === "error") handlers.onError?.(payload); + }); +} + export async function runCompletion(body: { chatId: string; provider: Provider; @@ -556,3 +668,26 @@ export async function runCompletionStream( } flushEvent(); } + +export async function attachCompletionStream(chatId: string, handlers: CompletionStreamHandlers, options?: { signal?: AbortSignal }) { + const headers = new Headers({ + Accept: "text/event-stream", + }); + if (authToken) { + headers.set("Authorization", `Bearer ${authToken}`); + } + + const response = await fetch(`${API_BASE_URL}/v1/chats/${chatId}/stream/attach`, { + method: "POST", + headers, + signal: options?.signal, + }); + + await readSseStream(response, (eventName, payload) => { + if (eventName === "meta") handlers.onMeta?.(payload); + else if (eventName === "tool_call") handlers.onToolCall?.(payload); + else if (eventName === "delta") handlers.onDelta?.(payload); + else if (eventName === "done") handlers.onDone?.(payload); + else if (eventName === "error") handlers.onError?.(payload); + }); +}