diff --git a/server/src/routes.ts b/server/src/routes.ts index 59a2dfa..0976e4c 100644 --- a/server/src/routes.ts +++ b/server/src/routes.ts @@ -49,6 +49,84 @@ async function storeNonAssistantMessages(chatId: string, messages: IncomingChatM }); } +const SearchRunBody = z.object({ + query: z.string().trim().min(1).optional(), + title: z.string().trim().min(1).optional(), + type: z.enum(["auto", "fast", "deep", "instant"]).optional(), + numResults: z.number().int().min(1).max(25).optional(), + includeDomains: z.array(z.string().trim().min(1)).max(50).optional(), + excludeDomains: z.array(z.string().trim().min(1)).max(50).optional(), +}); + +function mapSearchResultRow(searchId: string, result: any, index: number) { + return { + searchId, + rank: index, + title: result.title ?? null, + url: result.url, + publishedDate: result.publishedDate ?? null, + author: result.author ?? null, + text: result.text ?? null, + highlights: Array.isArray(result.highlights) ? (result.highlights as any) : null, + highlightScores: Array.isArray(result.highlightScores) ? (result.highlightScores as any) : null, + score: typeof result.score === "number" ? result.score : null, + favicon: result.favicon ?? null, + image: result.image ?? null, + }; +} + +function mapSearchResultPreview(result: any, index: number) { + return { + id: `preview-result-${index}`, + createdAt: new Date().toISOString(), + rank: index, + title: result.title ?? null, + url: result.url, + publishedDate: result.publishedDate ?? null, + author: result.author ?? null, + text: result.text ?? null, + highlights: Array.isArray(result.highlights) ? (result.highlights as any) : null, + highlightScores: Array.isArray(result.highlightScores) ? (result.highlightScores as any) : null, + score: typeof result.score === "number" ? result.score : null, + favicon: result.favicon ?? null, + image: result.image ?? null, + }; +} + +function parseAnswerText(answerResponse: any) { + if (typeof answerResponse?.answer === "string") return answerResponse.answer; + if (answerResponse?.answer) return JSON.stringify(answerResponse.answer, null, 2); + return null; +} + +function normalizeUrlForMatch(input: string | null | undefined) { + if (!input) return ""; + try { + const parsed = new URL(input); + parsed.hash = ""; + const normalized = parsed.toString(); + return normalized.endsWith("/") ? normalized.slice(0, -1) : normalized; + } catch { + return input.trim().replace(/\/$/, ""); + } +} + +function buildSseHeaders(originHeader: string | undefined) { + const origin = originHeader && originHeader !== "null" ? originHeader : "*"; + const headers: Record = { + "Content-Type": "text/event-stream; charset=utf-8", + "Cache-Control": "no-cache, no-transform", + Connection: "keep-alive", + "X-Accel-Buffering": "no", + "Access-Control-Allow-Origin": origin, + Vary: "Origin", + }; + if (origin !== "*") { + headers["Access-Control-Allow-Credentials"] = "true"; + } + return headers; +} + export async function registerRoutes(app: FastifyInstance) { app.get("/health", { logLevel: "silent" }, async () => ({ ok: true })); @@ -150,17 +228,9 @@ export async function registerRoutes(app: FastifyInstance) { app.post("/v1/searches/:searchId/run", async (req) => { requireAdmin(req); const Params = z.object({ searchId: z.string() }); - const Body = z.object({ - query: z.string().trim().min(1).optional(), - title: z.string().trim().min(1).optional(), - type: z.enum(["auto", "fast", "deep", "instant"]).optional(), - numResults: z.number().int().min(1).max(25).optional(), - includeDomains: z.array(z.string().trim().min(1)).max(50).optional(), - excludeDomains: z.array(z.string().trim().min(1)).max(50).optional(), - }); const { searchId } = Params.parse(req.params); - const body = Body.parse(req.body ?? {}); + const body = SearchRunBody.parse(req.body ?? {}); const existing = await prisma.search.findUnique({ where: { id: searchId }, @@ -205,26 +275,8 @@ export async function registerRoutes(app: FastifyInstance) { const latencyMs = Math.round(performance.now() - startedAt); const normalizedTitle = body.title?.trim() || query.slice(0, 80); - const rows = (searchResponse?.results ?? []).map((result: any, index: number) => ({ - searchId, - rank: index, - title: result.title ?? null, - url: result.url, - publishedDate: result.publishedDate ?? null, - author: result.author ?? null, - text: result.text ?? null, - highlights: Array.isArray(result.highlights) ? (result.highlights as any) : null, - highlightScores: Array.isArray(result.highlightScores) ? (result.highlightScores as any) : null, - score: typeof result.score === "number" ? result.score : null, - favicon: result.favicon ?? null, - image: result.image ?? null, - })); - const answerText = - typeof answerResponse?.answer === "string" - ? answerResponse.answer - : answerResponse?.answer - ? JSON.stringify(answerResponse.answer, null, 2) - : null; + const rows = (searchResponse?.results ?? []).map((result: any, index: number) => mapSearchResultRow(searchId, result, index)); + const answerText = parseAnswerText(answerResponse); await prisma.$transaction(async (tx) => { await tx.search.update({ @@ -271,6 +323,179 @@ export async function registerRoutes(app: FastifyInstance) { } }); + app.post("/v1/searches/:searchId/run/stream", async (req, reply) => { + requireAdmin(req); + const Params = z.object({ searchId: z.string() }); + const { searchId } = Params.parse(req.params); + const body = SearchRunBody.parse(req.body ?? {}); + + const existing = await prisma.search.findUnique({ + where: { id: searchId }, + select: { id: true, query: true }, + }); + if (!existing) return app.httpErrors.notFound("search not found"); + + const query = body.query?.trim() || existing.query?.trim(); + if (!query) return app.httpErrors.badRequest("query is required"); + + const startedAt = performance.now(); + const normalizedTitle = body.title?.trim() || query.slice(0, 80); + + reply.raw.writeHead(200, buildSseHeaders(typeof req.headers.origin === "string" ? req.headers.origin : undefined)); + + const send = (event: string, data: any) => { + if (reply.raw.writableEnded) return; + reply.raw.write(`event: ${event}\n`); + reply.raw.write(`data: ${JSON.stringify(data)}\n\n`); + }; + + try { + const exa = exaClient(); + const searchPromise = exa.search(query, { + type: body.type ?? "auto", + numResults: body.numResults ?? 10, + includeDomains: body.includeDomains, + excludeDomains: body.excludeDomains, + moderation: true, + userLocation: "US", + contents: false, + } as any); + const answerPromise = exa.answer(query, { + text: true, + model: "exa", + userLocation: "US", + }); + + let searchResponse: any | null = null; + let answerResponse: any | null = null; + let enrichedResults: any[] | null = null; + let searchError: string | null = null; + let answerError: string | null = null; + + const searchSettled = searchPromise.then( + async (value) => { + searchResponse = value; + const previewResults = (value?.results ?? []).map((result: any, index: number) => mapSearchResultPreview(result, index)); + send("search_results", { + requestId: value?.requestId ?? null, + results: previewResults, + }); + + const urls = (value?.results ?? []).map((result: any) => result?.url).filter((url: string | undefined) => typeof url === "string"); + if (!urls.length) return; + + try { + const contentsResponse = await exa.getContents(urls, { + text: { maxCharacters: 1200 }, + highlights: { + query, + maxCharacters: 320, + numSentences: 2, + highlightsPerUrl: 2, + }, + } as any); + const byUrl = new Map(); + for (const contentItem of contentsResponse?.results ?? []) { + byUrl.set(normalizeUrlForMatch(contentItem?.url), contentItem); + } + + enrichedResults = (value?.results ?? []).map((result: any) => { + const contentItem = byUrl.get(normalizeUrlForMatch(result?.url)); + if (!contentItem) return result; + return { + ...result, + text: contentItem.text ?? result.text ?? null, + highlights: Array.isArray(contentItem.highlights) ? contentItem.highlights : result.highlights ?? null, + highlightScores: Array.isArray(contentItem.highlightScores) ? contentItem.highlightScores : result.highlightScores ?? null, + }; + }); + + send("search_results", { + requestId: value?.requestId ?? null, + results: enrichedResults.map((result: any, index: number) => mapSearchResultPreview(result, index)), + }); + } catch { + // keep preview results if content enrichment fails + } + }, + (reason) => { + searchError = reason?.message ?? String(reason); + send("search_error", { error: searchError }); + } + ); + + const answerSettled = answerPromise.then( + (value) => { + answerResponse = value; + send("answer", { + answerText: parseAnswerText(value), + answerRequestId: value?.requestId ?? null, + answerCitations: (value?.citations as any) ?? null, + }); + }, + (reason) => { + answerError = reason?.message ?? String(reason); + send("answer_error", { error: answerError }); + } + ); + + await Promise.all([searchSettled, answerSettled]); + + const latencyMs = Math.round(performance.now() - startedAt); + const persistedResults = enrichedResults ?? searchResponse?.results ?? []; + const rows = persistedResults.map((result: any, index: number) => mapSearchResultRow(searchId, result, index)); + const answerText = parseAnswerText(answerResponse); + + await prisma.$transaction(async (tx) => { + await tx.search.update({ + where: { id: searchId }, + data: { + query, + title: normalizedTitle, + requestId: searchResponse?.requestId ?? null, + rawResponse: searchResponse as any, + latencyMs, + error: searchError, + answerText, + answerRequestId: answerResponse?.requestId ?? null, + answerCitations: (answerResponse?.citations as any) ?? null, + answerRawResponse: answerResponse as any, + answerError, + }, + }); + await tx.searchResult.deleteMany({ where: { searchId } }); + if (rows.length) { + await tx.searchResult.createMany({ data: rows as any }); + } + }); + + const search = await prisma.search.findUnique({ + where: { id: searchId }, + include: { results: { orderBy: { rank: "asc" } } }, + }); + if (!search) { + send("error", { message: "search not found" }); + } else { + send("done", { search }); + } + } catch (err: any) { + await prisma.search.update({ + where: { id: searchId }, + data: { + query, + title: normalizedTitle, + latencyMs: Math.round(performance.now() - startedAt), + error: err?.message ?? String(err), + }, + }); + send("error", { message: err?.message ?? String(err) }); + } finally { + reply.raw.end(); + } + + return reply; + }); + app.get("/v1/chats/:chatId", async (req) => { requireAdmin(req); const Params = z.object({ chatId: z.string() }); @@ -382,11 +607,7 @@ export async function registerRoutes(app: FastifyInstance) { await storeNonAssistantMessages(body.chatId, body.messages); } - reply.raw.writeHead(200, { - "Content-Type": "text/event-stream; charset=utf-8", - "Cache-Control": "no-cache, no-transform", - Connection: "keep-alive", - }); + reply.raw.writeHead(200, buildSseHeaders(typeof req.headers.origin === "string" ? req.headers.origin : undefined)); const send = (event: string, data: any) => { reply.raw.write(`event: ${event}\n`); diff --git a/web/src/App.tsx b/web/src/App.tsx index a4eba44..a0d9628 100644 --- a/web/src/App.tsx +++ b/web/src/App.tsx @@ -17,7 +17,7 @@ import { listChats, listSearches, runCompletion, - runSearch, + runSearchStream, type ChatDetail, type ChatSummary, type CompletionRequestMessage, @@ -119,6 +119,8 @@ export default function App() { const [error, setError] = useState(null); const transcriptEndRef = useRef(null); const contextMenuRef = useRef(null); + const searchRunAbortRef = useRef(null); + const searchRunCounterRef = useRef(0); const [contextMenu, setContextMenu] = useState(null); const sidebarItems = useMemo(() => buildSidebarItems(chats, searches), [chats, searches]); @@ -241,6 +243,13 @@ export default function App() { transcriptEndRef.current?.scrollIntoView({ behavior: "smooth", block: "end" }); }, [draftKind, selectedChat?.messages.length, isSending, selectedItem?.kind]); + useEffect(() => { + return () => { + searchRunAbortRef.current?.abort(); + searchRunAbortRef.current = null; + }; + }, []); + const messages = selectedChat?.messages ?? []; const selectedChatSummary = useMemo(() => { @@ -411,6 +420,11 @@ export default function App() { }; const handleSendSearch = async (query: string) => { + const runId = ++searchRunCounterRef.current; + searchRunAbortRef.current?.abort(); + const abortController = new AbortController(); + searchRunAbortRef.current = abortController; + let searchId = draftKind === "search" ? null : selectedItem?.kind === "search" ? selectedItem.id : null; if (!searchId) { @@ -460,15 +474,76 @@ export default function App() { }; }); - const search = await runSearch(searchId, { - query, - title: query.slice(0, 80), - type: "auto", - numResults: 10, - }); + try { + await runSearchStream( + searchId, + { + query, + title: query.slice(0, 80), + type: "auto", + numResults: 10, + }, + { + onSearchResults: (payload) => { + if (runId !== searchRunCounterRef.current) return; + setSelectedSearch((current) => { + if (!current || current.id !== searchId) return current; + return { + ...current, + requestId: payload.requestId ?? current.requestId, + error: null, + results: payload.results, + }; + }); + }, + onSearchError: (payload) => { + if (runId !== searchRunCounterRef.current) return; + setSelectedSearch((current) => { + if (!current || current.id !== searchId) return current; + return { ...current, error: payload.error }; + }); + }, + onAnswer: (payload) => { + if (runId !== searchRunCounterRef.current) return; + setSelectedSearch((current) => { + if (!current || current.id !== searchId) return current; + return { + ...current, + answerText: payload.answerText, + answerRequestId: payload.answerRequestId, + answerCitations: payload.answerCitations, + answerError: null, + }; + }); + }, + onAnswerError: (payload) => { + if (runId !== searchRunCounterRef.current) return; + setSelectedSearch((current) => { + if (!current || current.id !== searchId) return current; + return { ...current, answerError: payload.error }; + }); + }, + onDone: (payload) => { + if (runId !== searchRunCounterRef.current) return; + setSelectedSearch(payload.search); + setSelectedChat(null); + }, + onError: (payload) => { + if (runId !== searchRunCounterRef.current) return; + setError(payload.message); + }, + }, + { signal: abortController.signal } + ); + } catch (err) { + if (abortController.signal.aborted) return; + throw err; + } finally { + if (runId === searchRunCounterRef.current) { + searchRunAbortRef.current = null; + } + } - setSelectedSearch(search); - setSelectedChat(null); await refreshCollections({ kind: "search", id: searchId }); }; diff --git a/web/src/lib/api.ts b/web/src/lib/api.ts index 7db620d..218e1f0 100644 --- a/web/src/lib/api.ts +++ b/web/src/lib/api.ts @@ -68,6 +68,15 @@ export type SearchDetail = { results: SearchResultItem[]; }; +export type SearchRunRequest = { + query?: string; + title?: string; + type?: "auto" | "fast" | "deep" | "instant"; + numResults?: number; + includeDomains?: string[]; + excludeDomains?: string[]; +}; + export type CompletionRequestMessage = { role: "system" | "user" | "assistant" | "tool"; content: string; @@ -172,22 +181,117 @@ export async function deleteSearch(searchId: string) { await api<{ deleted: true }>(`/v1/searches/${searchId}`, { method: "DELETE" }); } -export async function runSearch( +type RunSearchStreamHandlers = { + onSearchResults?: (payload: { requestId: string | null; results: SearchResultItem[] }) => void; + onSearchError?: (payload: { error: string }) => void; + onAnswer?: (payload: { answerText: string | null; answerRequestId: string | null; answerCitations: SearchDetail["answerCitations"] }) => void; + onAnswerError?: (payload: { error: string }) => void; + onDone?: (payload: { search: SearchDetail }) => void; + onError?: (payload: { message: string }) => void; +}; + +export async function runSearchStream( searchId: string, - body: { - query?: string; - title?: string; - type?: "auto" | "fast" | "deep" | "instant"; - numResults?: number; - includeDomains?: string[]; - excludeDomains?: string[]; - } + body: SearchRunRequest, + handlers: RunSearchStreamHandlers, + options?: { signal?: AbortSignal } ) { - const data = await api<{ search: SearchDetail }>(`/v1/searches/${searchId}/run`, { - method: "POST", - body: JSON.stringify(body), + const headers = new Headers({ + Accept: "text/event-stream", + "Content-Type": "application/json", }); - return data.search; + if (authToken) { + headers.set("Authorization", `Bearer ${authToken}`); + } + + const response = await fetch(`${API_BASE_URL}/v1/searches/${searchId}/run/stream`, { + method: "POST", + headers, + body: JSON.stringify(body), + signal: options?.signal, + }); + + if (!response.ok) { + const fallback = `${response.status} ${response.statusText}`; + let message = fallback; + try { + const body = (await response.json()) as { message?: string }; + if (body.message) message = body.message; + } catch { + // keep fallback message + } + throw new Error(message); + } + + if (!response.body) { + throw new Error("No response stream"); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + let eventName = "message"; + let dataLines: string[] = []; + + const flushEvent = () => { + if (!dataLines.length) { + eventName = "message"; + return; + } + + const dataText = dataLines.join("\n"); + let payload: any = null; + try { + payload = JSON.parse(dataText); + } catch { + payload = { message: dataText }; + } + + if (eventName === "search_results") handlers.onSearchResults?.(payload); + else if (eventName === "search_error") handlers.onSearchError?.(payload); + else if (eventName === "answer") handlers.onAnswer?.(payload); + else if (eventName === "answer_error") handlers.onAnswerError?.(payload); + else if (eventName === "done") handlers.onDone?.(payload); + else if (eventName === "error") handlers.onError?.(payload); + + dataLines = []; + eventName = "message"; + }; + + while (true) { + const { value, done } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + let newlineIndex = buffer.indexOf("\n"); + + while (newlineIndex >= 0) { + const rawLine = buffer.slice(0, newlineIndex); + buffer = buffer.slice(newlineIndex + 1); + const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine; + + if (!line) { + flushEvent(); + } else if (line.startsWith("event:")) { + eventName = line.slice("event:".length).trim(); + } else if (line.startsWith("data:")) { + dataLines.push(line.slice("data:".length).trimStart()); + } + + newlineIndex = buffer.indexOf("\n"); + } + } + + buffer += decoder.decode(); + if (buffer.length) { + const line = buffer.endsWith("\r") ? buffer.slice(0, -1) : buffer; + if (line.startsWith("event:")) { + eventName = line.slice("event:".length).trim(); + } else if (line.startsWith("data:")) { + dataLines.push(line.slice("data:".length).trimStart()); + } + } + flushEvent(); } export async function runCompletion(body: { diff --git a/web/src/pages/search-route-page.tsx b/web/src/pages/search-route-page.tsx index 77c1612..f732ff5 100644 --- a/web/src/pages/search-route-page.tsx +++ b/web/src/pages/search-route-page.tsx @@ -4,7 +4,7 @@ import { AuthScreen } from "@/components/auth/auth-screen"; import { SearchResultsPanel } from "@/components/search/search-results-panel"; import { Button } from "@/components/ui/button"; import { Input } from "@/components/ui/input"; -import { createSearch, runSearch, type SearchDetail } from "@/lib/api"; +import { createSearch, runSearchStream, type SearchDetail } from "@/lib/api"; import { useSessionAuth } from "@/hooks/use-session-auth"; function readQueryFromUrl() { @@ -41,6 +41,7 @@ export default function SearchRoutePage() { const [isRunning, setIsRunning] = useState(false); const [error, setError] = useState(null); const requestCounterRef = useRef(0); + const streamAbortRef = useRef(null); useEffect(() => { const onPopState = () => { @@ -52,6 +53,13 @@ export default function SearchRoutePage() { return () => window.removeEventListener("popstate", onPopState); }, []); + useEffect(() => { + return () => { + streamAbortRef.current?.abort(); + streamAbortRef.current = null; + }; + }, []); + const runQuery = async (query: string) => { const trimmed = query.trim(); if (!trimmed) { @@ -61,6 +69,9 @@ export default function SearchRoutePage() { } const requestId = ++requestCounterRef.current; + streamAbortRef.current?.abort(); + const abortController = new AbortController(); + streamAbortRef.current = abortController; setError(null); setIsRunning(true); @@ -86,16 +97,78 @@ export default function SearchRoutePage() { query: trimmed, title: trimmed.slice(0, 80), }); - const result = await runSearch(created.id, { - query: trimmed, - title: trimmed.slice(0, 80), - type: "auto", - numResults: 10, - }); - if (requestId === requestCounterRef.current) { - setSearch(result); - } + if (requestId !== requestCounterRef.current) return; + + setSearch((current) => + current + ? { + ...current, + id: created.id, + title: created.title, + query: created.query, + createdAt: created.createdAt, + updatedAt: created.updatedAt, + } + : current + ); + + await runSearchStream( + created.id, + { + query: trimmed, + title: trimmed.slice(0, 80), + type: "auto", + numResults: 10, + }, + { + onSearchResults: (payload) => { + if (requestId !== requestCounterRef.current) return; + setSearch((current) => + current + ? { + ...current, + requestId: payload.requestId ?? current.requestId, + error: null, + results: payload.results, + } + : current + ); + }, + onSearchError: (payload) => { + if (requestId !== requestCounterRef.current) return; + setSearch((current) => (current ? { ...current, error: payload.error } : current)); + }, + onAnswer: (payload) => { + if (requestId !== requestCounterRef.current) return; + setSearch((current) => + current + ? { + ...current, + answerText: payload.answerText, + answerRequestId: payload.answerRequestId, + answerCitations: payload.answerCitations, + answerError: null, + } + : current + ); + }, + onAnswerError: (payload) => { + if (requestId !== requestCounterRef.current) return; + setSearch((current) => (current ? { ...current, answerError: payload.error } : current)); + }, + onDone: (payload) => { + if (requestId !== requestCounterRef.current) return; + setSearch(payload.search); + }, + onError: (payload) => { + if (requestId !== requestCounterRef.current) return; + setError(payload.message); + }, + }, + { signal: abortController.signal } + ); } catch (err) { + if (abortController.signal.aborted) return; const message = err instanceof Error ? err.message : String(err); if (message.includes("bearer token")) { handleAuthFailure(message); @@ -104,6 +177,7 @@ export default function SearchRoutePage() { } } finally { if (requestId === requestCounterRef.current) { + streamAbortRef.current = null; setIsRunning(false); } }