Search: async answer/results

This commit is contained in:
2026-02-14 01:53:34 -08:00
parent bec25aa943
commit 769cd6966a
4 changed files with 540 additions and 66 deletions

View File

@@ -49,6 +49,84 @@ async function storeNonAssistantMessages(chatId: string, messages: IncomingChatM
}); });
} }
const SearchRunBody = z.object({
query: z.string().trim().min(1).optional(),
title: z.string().trim().min(1).optional(),
type: z.enum(["auto", "fast", "deep", "instant"]).optional(),
numResults: z.number().int().min(1).max(25).optional(),
includeDomains: z.array(z.string().trim().min(1)).max(50).optional(),
excludeDomains: z.array(z.string().trim().min(1)).max(50).optional(),
});
function mapSearchResultRow(searchId: string, result: any, index: number) {
return {
searchId,
rank: index,
title: result.title ?? null,
url: result.url,
publishedDate: result.publishedDate ?? null,
author: result.author ?? null,
text: result.text ?? null,
highlights: Array.isArray(result.highlights) ? (result.highlights as any) : null,
highlightScores: Array.isArray(result.highlightScores) ? (result.highlightScores as any) : null,
score: typeof result.score === "number" ? result.score : null,
favicon: result.favicon ?? null,
image: result.image ?? null,
};
}
function mapSearchResultPreview(result: any, index: number) {
return {
id: `preview-result-${index}`,
createdAt: new Date().toISOString(),
rank: index,
title: result.title ?? null,
url: result.url,
publishedDate: result.publishedDate ?? null,
author: result.author ?? null,
text: result.text ?? null,
highlights: Array.isArray(result.highlights) ? (result.highlights as any) : null,
highlightScores: Array.isArray(result.highlightScores) ? (result.highlightScores as any) : null,
score: typeof result.score === "number" ? result.score : null,
favicon: result.favicon ?? null,
image: result.image ?? null,
};
}
function parseAnswerText(answerResponse: any) {
if (typeof answerResponse?.answer === "string") return answerResponse.answer;
if (answerResponse?.answer) return JSON.stringify(answerResponse.answer, null, 2);
return null;
}
function normalizeUrlForMatch(input: string | null | undefined) {
if (!input) return "";
try {
const parsed = new URL(input);
parsed.hash = "";
const normalized = parsed.toString();
return normalized.endsWith("/") ? normalized.slice(0, -1) : normalized;
} catch {
return input.trim().replace(/\/$/, "");
}
}
function buildSseHeaders(originHeader: string | undefined) {
const origin = originHeader && originHeader !== "null" ? originHeader : "*";
const headers: Record<string, string> = {
"Content-Type": "text/event-stream; charset=utf-8",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
"Access-Control-Allow-Origin": origin,
Vary: "Origin",
};
if (origin !== "*") {
headers["Access-Control-Allow-Credentials"] = "true";
}
return headers;
}
export async function registerRoutes(app: FastifyInstance) { export async function registerRoutes(app: FastifyInstance) {
app.get("/health", { logLevel: "silent" }, async () => ({ ok: true })); app.get("/health", { logLevel: "silent" }, async () => ({ ok: true }));
@@ -150,17 +228,9 @@ export async function registerRoutes(app: FastifyInstance) {
app.post("/v1/searches/:searchId/run", async (req) => { app.post("/v1/searches/:searchId/run", async (req) => {
requireAdmin(req); requireAdmin(req);
const Params = z.object({ searchId: z.string() }); const Params = z.object({ searchId: z.string() });
const Body = z.object({
query: z.string().trim().min(1).optional(),
title: z.string().trim().min(1).optional(),
type: z.enum(["auto", "fast", "deep", "instant"]).optional(),
numResults: z.number().int().min(1).max(25).optional(),
includeDomains: z.array(z.string().trim().min(1)).max(50).optional(),
excludeDomains: z.array(z.string().trim().min(1)).max(50).optional(),
});
const { searchId } = Params.parse(req.params); const { searchId } = Params.parse(req.params);
const body = Body.parse(req.body ?? {}); const body = SearchRunBody.parse(req.body ?? {});
const existing = await prisma.search.findUnique({ const existing = await prisma.search.findUnique({
where: { id: searchId }, where: { id: searchId },
@@ -205,26 +275,8 @@ export async function registerRoutes(app: FastifyInstance) {
const latencyMs = Math.round(performance.now() - startedAt); const latencyMs = Math.round(performance.now() - startedAt);
const normalizedTitle = body.title?.trim() || query.slice(0, 80); const normalizedTitle = body.title?.trim() || query.slice(0, 80);
const rows = (searchResponse?.results ?? []).map((result: any, index: number) => ({ const rows = (searchResponse?.results ?? []).map((result: any, index: number) => mapSearchResultRow(searchId, result, index));
searchId, const answerText = parseAnswerText(answerResponse);
rank: index,
title: result.title ?? null,
url: result.url,
publishedDate: result.publishedDate ?? null,
author: result.author ?? null,
text: result.text ?? null,
highlights: Array.isArray(result.highlights) ? (result.highlights as any) : null,
highlightScores: Array.isArray(result.highlightScores) ? (result.highlightScores as any) : null,
score: typeof result.score === "number" ? result.score : null,
favicon: result.favicon ?? null,
image: result.image ?? null,
}));
const answerText =
typeof answerResponse?.answer === "string"
? answerResponse.answer
: answerResponse?.answer
? JSON.stringify(answerResponse.answer, null, 2)
: null;
await prisma.$transaction(async (tx) => { await prisma.$transaction(async (tx) => {
await tx.search.update({ await tx.search.update({
@@ -271,6 +323,179 @@ export async function registerRoutes(app: FastifyInstance) {
} }
}); });
app.post("/v1/searches/:searchId/run/stream", async (req, reply) => {
requireAdmin(req);
const Params = z.object({ searchId: z.string() });
const { searchId } = Params.parse(req.params);
const body = SearchRunBody.parse(req.body ?? {});
const existing = await prisma.search.findUnique({
where: { id: searchId },
select: { id: true, query: true },
});
if (!existing) return app.httpErrors.notFound("search not found");
const query = body.query?.trim() || existing.query?.trim();
if (!query) return app.httpErrors.badRequest("query is required");
const startedAt = performance.now();
const normalizedTitle = body.title?.trim() || query.slice(0, 80);
reply.raw.writeHead(200, buildSseHeaders(typeof req.headers.origin === "string" ? req.headers.origin : undefined));
const send = (event: string, data: any) => {
if (reply.raw.writableEnded) return;
reply.raw.write(`event: ${event}\n`);
reply.raw.write(`data: ${JSON.stringify(data)}\n\n`);
};
try {
const exa = exaClient();
const searchPromise = exa.search(query, {
type: body.type ?? "auto",
numResults: body.numResults ?? 10,
includeDomains: body.includeDomains,
excludeDomains: body.excludeDomains,
moderation: true,
userLocation: "US",
contents: false,
} as any);
const answerPromise = exa.answer(query, {
text: true,
model: "exa",
userLocation: "US",
});
let searchResponse: any | null = null;
let answerResponse: any | null = null;
let enrichedResults: any[] | null = null;
let searchError: string | null = null;
let answerError: string | null = null;
const searchSettled = searchPromise.then(
async (value) => {
searchResponse = value;
const previewResults = (value?.results ?? []).map((result: any, index: number) => mapSearchResultPreview(result, index));
send("search_results", {
requestId: value?.requestId ?? null,
results: previewResults,
});
const urls = (value?.results ?? []).map((result: any) => result?.url).filter((url: string | undefined) => typeof url === "string");
if (!urls.length) return;
try {
const contentsResponse = await exa.getContents(urls, {
text: { maxCharacters: 1200 },
highlights: {
query,
maxCharacters: 320,
numSentences: 2,
highlightsPerUrl: 2,
},
} as any);
const byUrl = new Map<string, any>();
for (const contentItem of contentsResponse?.results ?? []) {
byUrl.set(normalizeUrlForMatch(contentItem?.url), contentItem);
}
enrichedResults = (value?.results ?? []).map((result: any) => {
const contentItem = byUrl.get(normalizeUrlForMatch(result?.url));
if (!contentItem) return result;
return {
...result,
text: contentItem.text ?? result.text ?? null,
highlights: Array.isArray(contentItem.highlights) ? contentItem.highlights : result.highlights ?? null,
highlightScores: Array.isArray(contentItem.highlightScores) ? contentItem.highlightScores : result.highlightScores ?? null,
};
});
send("search_results", {
requestId: value?.requestId ?? null,
results: enrichedResults.map((result: any, index: number) => mapSearchResultPreview(result, index)),
});
} catch {
// keep preview results if content enrichment fails
}
},
(reason) => {
searchError = reason?.message ?? String(reason);
send("search_error", { error: searchError });
}
);
const answerSettled = answerPromise.then(
(value) => {
answerResponse = value;
send("answer", {
answerText: parseAnswerText(value),
answerRequestId: value?.requestId ?? null,
answerCitations: (value?.citations as any) ?? null,
});
},
(reason) => {
answerError = reason?.message ?? String(reason);
send("answer_error", { error: answerError });
}
);
await Promise.all([searchSettled, answerSettled]);
const latencyMs = Math.round(performance.now() - startedAt);
const persistedResults = enrichedResults ?? searchResponse?.results ?? [];
const rows = persistedResults.map((result: any, index: number) => mapSearchResultRow(searchId, result, index));
const answerText = parseAnswerText(answerResponse);
await prisma.$transaction(async (tx) => {
await tx.search.update({
where: { id: searchId },
data: {
query,
title: normalizedTitle,
requestId: searchResponse?.requestId ?? null,
rawResponse: searchResponse as any,
latencyMs,
error: searchError,
answerText,
answerRequestId: answerResponse?.requestId ?? null,
answerCitations: (answerResponse?.citations as any) ?? null,
answerRawResponse: answerResponse as any,
answerError,
},
});
await tx.searchResult.deleteMany({ where: { searchId } });
if (rows.length) {
await tx.searchResult.createMany({ data: rows as any });
}
});
const search = await prisma.search.findUnique({
where: { id: searchId },
include: { results: { orderBy: { rank: "asc" } } },
});
if (!search) {
send("error", { message: "search not found" });
} else {
send("done", { search });
}
} catch (err: any) {
await prisma.search.update({
where: { id: searchId },
data: {
query,
title: normalizedTitle,
latencyMs: Math.round(performance.now() - startedAt),
error: err?.message ?? String(err),
},
});
send("error", { message: err?.message ?? String(err) });
} finally {
reply.raw.end();
}
return reply;
});
app.get("/v1/chats/:chatId", async (req) => { app.get("/v1/chats/:chatId", async (req) => {
requireAdmin(req); requireAdmin(req);
const Params = z.object({ chatId: z.string() }); const Params = z.object({ chatId: z.string() });
@@ -382,11 +607,7 @@ export async function registerRoutes(app: FastifyInstance) {
await storeNonAssistantMessages(body.chatId, body.messages); await storeNonAssistantMessages(body.chatId, body.messages);
} }
reply.raw.writeHead(200, { reply.raw.writeHead(200, buildSseHeaders(typeof req.headers.origin === "string" ? req.headers.origin : undefined));
"Content-Type": "text/event-stream; charset=utf-8",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
});
const send = (event: string, data: any) => { const send = (event: string, data: any) => {
reply.raw.write(`event: ${event}\n`); reply.raw.write(`event: ${event}\n`);

View File

@@ -17,7 +17,7 @@ import {
listChats, listChats,
listSearches, listSearches,
runCompletion, runCompletion,
runSearch, runSearchStream,
type ChatDetail, type ChatDetail,
type ChatSummary, type ChatSummary,
type CompletionRequestMessage, type CompletionRequestMessage,
@@ -119,6 +119,8 @@ export default function App() {
const [error, setError] = useState<string | null>(null); const [error, setError] = useState<string | null>(null);
const transcriptEndRef = useRef<HTMLDivElement>(null); const transcriptEndRef = useRef<HTMLDivElement>(null);
const contextMenuRef = useRef<HTMLDivElement>(null); const contextMenuRef = useRef<HTMLDivElement>(null);
const searchRunAbortRef = useRef<AbortController | null>(null);
const searchRunCounterRef = useRef(0);
const [contextMenu, setContextMenu] = useState<ContextMenuState | null>(null); const [contextMenu, setContextMenu] = useState<ContextMenuState | null>(null);
const sidebarItems = useMemo(() => buildSidebarItems(chats, searches), [chats, searches]); const sidebarItems = useMemo(() => buildSidebarItems(chats, searches), [chats, searches]);
@@ -241,6 +243,13 @@ export default function App() {
transcriptEndRef.current?.scrollIntoView({ behavior: "smooth", block: "end" }); transcriptEndRef.current?.scrollIntoView({ behavior: "smooth", block: "end" });
}, [draftKind, selectedChat?.messages.length, isSending, selectedItem?.kind]); }, [draftKind, selectedChat?.messages.length, isSending, selectedItem?.kind]);
useEffect(() => {
return () => {
searchRunAbortRef.current?.abort();
searchRunAbortRef.current = null;
};
}, []);
const messages = selectedChat?.messages ?? []; const messages = selectedChat?.messages ?? [];
const selectedChatSummary = useMemo(() => { const selectedChatSummary = useMemo(() => {
@@ -411,6 +420,11 @@ export default function App() {
}; };
const handleSendSearch = async (query: string) => { const handleSendSearch = async (query: string) => {
const runId = ++searchRunCounterRef.current;
searchRunAbortRef.current?.abort();
const abortController = new AbortController();
searchRunAbortRef.current = abortController;
let searchId = draftKind === "search" ? null : selectedItem?.kind === "search" ? selectedItem.id : null; let searchId = draftKind === "search" ? null : selectedItem?.kind === "search" ? selectedItem.id : null;
if (!searchId) { if (!searchId) {
@@ -460,15 +474,76 @@ export default function App() {
}; };
}); });
const search = await runSearch(searchId, { try {
query, await runSearchStream(
title: query.slice(0, 80), searchId,
type: "auto", {
numResults: 10, query,
}); title: query.slice(0, 80),
type: "auto",
numResults: 10,
},
{
onSearchResults: (payload) => {
if (runId !== searchRunCounterRef.current) return;
setSelectedSearch((current) => {
if (!current || current.id !== searchId) return current;
return {
...current,
requestId: payload.requestId ?? current.requestId,
error: null,
results: payload.results,
};
});
},
onSearchError: (payload) => {
if (runId !== searchRunCounterRef.current) return;
setSelectedSearch((current) => {
if (!current || current.id !== searchId) return current;
return { ...current, error: payload.error };
});
},
onAnswer: (payload) => {
if (runId !== searchRunCounterRef.current) return;
setSelectedSearch((current) => {
if (!current || current.id !== searchId) return current;
return {
...current,
answerText: payload.answerText,
answerRequestId: payload.answerRequestId,
answerCitations: payload.answerCitations,
answerError: null,
};
});
},
onAnswerError: (payload) => {
if (runId !== searchRunCounterRef.current) return;
setSelectedSearch((current) => {
if (!current || current.id !== searchId) return current;
return { ...current, answerError: payload.error };
});
},
onDone: (payload) => {
if (runId !== searchRunCounterRef.current) return;
setSelectedSearch(payload.search);
setSelectedChat(null);
},
onError: (payload) => {
if (runId !== searchRunCounterRef.current) return;
setError(payload.message);
},
},
{ signal: abortController.signal }
);
} catch (err) {
if (abortController.signal.aborted) return;
throw err;
} finally {
if (runId === searchRunCounterRef.current) {
searchRunAbortRef.current = null;
}
}
setSelectedSearch(search);
setSelectedChat(null);
await refreshCollections({ kind: "search", id: searchId }); await refreshCollections({ kind: "search", id: searchId });
}; };

View File

@@ -68,6 +68,15 @@ export type SearchDetail = {
results: SearchResultItem[]; results: SearchResultItem[];
}; };
export type SearchRunRequest = {
query?: string;
title?: string;
type?: "auto" | "fast" | "deep" | "instant";
numResults?: number;
includeDomains?: string[];
excludeDomains?: string[];
};
export type CompletionRequestMessage = { export type CompletionRequestMessage = {
role: "system" | "user" | "assistant" | "tool"; role: "system" | "user" | "assistant" | "tool";
content: string; content: string;
@@ -172,22 +181,117 @@ export async function deleteSearch(searchId: string) {
await api<{ deleted: true }>(`/v1/searches/${searchId}`, { method: "DELETE" }); await api<{ deleted: true }>(`/v1/searches/${searchId}`, { method: "DELETE" });
} }
export async function runSearch( type RunSearchStreamHandlers = {
onSearchResults?: (payload: { requestId: string | null; results: SearchResultItem[] }) => void;
onSearchError?: (payload: { error: string }) => void;
onAnswer?: (payload: { answerText: string | null; answerRequestId: string | null; answerCitations: SearchDetail["answerCitations"] }) => void;
onAnswerError?: (payload: { error: string }) => void;
onDone?: (payload: { search: SearchDetail }) => void;
onError?: (payload: { message: string }) => void;
};
export async function runSearchStream(
searchId: string, searchId: string,
body: { body: SearchRunRequest,
query?: string; handlers: RunSearchStreamHandlers,
title?: string; options?: { signal?: AbortSignal }
type?: "auto" | "fast" | "deep" | "instant";
numResults?: number;
includeDomains?: string[];
excludeDomains?: string[];
}
) { ) {
const data = await api<{ search: SearchDetail }>(`/v1/searches/${searchId}/run`, { const headers = new Headers({
method: "POST", Accept: "text/event-stream",
body: JSON.stringify(body), "Content-Type": "application/json",
}); });
return data.search; if (authToken) {
headers.set("Authorization", `Bearer ${authToken}`);
}
const response = await fetch(`${API_BASE_URL}/v1/searches/${searchId}/run/stream`, {
method: "POST",
headers,
body: JSON.stringify(body),
signal: options?.signal,
});
if (!response.ok) {
const fallback = `${response.status} ${response.statusText}`;
let message = fallback;
try {
const body = (await response.json()) as { message?: string };
if (body.message) message = body.message;
} catch {
// keep fallback message
}
throw new Error(message);
}
if (!response.body) {
throw new Error("No response stream");
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
let eventName = "message";
let dataLines: string[] = [];
const flushEvent = () => {
if (!dataLines.length) {
eventName = "message";
return;
}
const dataText = dataLines.join("\n");
let payload: any = null;
try {
payload = JSON.parse(dataText);
} catch {
payload = { message: dataText };
}
if (eventName === "search_results") handlers.onSearchResults?.(payload);
else if (eventName === "search_error") handlers.onSearchError?.(payload);
else if (eventName === "answer") handlers.onAnswer?.(payload);
else if (eventName === "answer_error") handlers.onAnswerError?.(payload);
else if (eventName === "done") handlers.onDone?.(payload);
else if (eventName === "error") handlers.onError?.(payload);
dataLines = [];
eventName = "message";
};
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
let newlineIndex = buffer.indexOf("\n");
while (newlineIndex >= 0) {
const rawLine = buffer.slice(0, newlineIndex);
buffer = buffer.slice(newlineIndex + 1);
const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine;
if (!line) {
flushEvent();
} else if (line.startsWith("event:")) {
eventName = line.slice("event:".length).trim();
} else if (line.startsWith("data:")) {
dataLines.push(line.slice("data:".length).trimStart());
}
newlineIndex = buffer.indexOf("\n");
}
}
buffer += decoder.decode();
if (buffer.length) {
const line = buffer.endsWith("\r") ? buffer.slice(0, -1) : buffer;
if (line.startsWith("event:")) {
eventName = line.slice("event:".length).trim();
} else if (line.startsWith("data:")) {
dataLines.push(line.slice("data:".length).trimStart());
}
}
flushEvent();
} }
export async function runCompletion(body: { export async function runCompletion(body: {

View File

@@ -4,7 +4,7 @@ import { AuthScreen } from "@/components/auth/auth-screen";
import { SearchResultsPanel } from "@/components/search/search-results-panel"; import { SearchResultsPanel } from "@/components/search/search-results-panel";
import { Button } from "@/components/ui/button"; import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input"; import { Input } from "@/components/ui/input";
import { createSearch, runSearch, type SearchDetail } from "@/lib/api"; import { createSearch, runSearchStream, type SearchDetail } from "@/lib/api";
import { useSessionAuth } from "@/hooks/use-session-auth"; import { useSessionAuth } from "@/hooks/use-session-auth";
function readQueryFromUrl() { function readQueryFromUrl() {
@@ -41,6 +41,7 @@ export default function SearchRoutePage() {
const [isRunning, setIsRunning] = useState(false); const [isRunning, setIsRunning] = useState(false);
const [error, setError] = useState<string | null>(null); const [error, setError] = useState<string | null>(null);
const requestCounterRef = useRef(0); const requestCounterRef = useRef(0);
const streamAbortRef = useRef<AbortController | null>(null);
useEffect(() => { useEffect(() => {
const onPopState = () => { const onPopState = () => {
@@ -52,6 +53,13 @@ export default function SearchRoutePage() {
return () => window.removeEventListener("popstate", onPopState); return () => window.removeEventListener("popstate", onPopState);
}, []); }, []);
useEffect(() => {
return () => {
streamAbortRef.current?.abort();
streamAbortRef.current = null;
};
}, []);
const runQuery = async (query: string) => { const runQuery = async (query: string) => {
const trimmed = query.trim(); const trimmed = query.trim();
if (!trimmed) { if (!trimmed) {
@@ -61,6 +69,9 @@ export default function SearchRoutePage() {
} }
const requestId = ++requestCounterRef.current; const requestId = ++requestCounterRef.current;
streamAbortRef.current?.abort();
const abortController = new AbortController();
streamAbortRef.current = abortController;
setError(null); setError(null);
setIsRunning(true); setIsRunning(true);
@@ -86,16 +97,78 @@ export default function SearchRoutePage() {
query: trimmed, query: trimmed,
title: trimmed.slice(0, 80), title: trimmed.slice(0, 80),
}); });
const result = await runSearch(created.id, { if (requestId !== requestCounterRef.current) return;
query: trimmed,
title: trimmed.slice(0, 80), setSearch((current) =>
type: "auto", current
numResults: 10, ? {
}); ...current,
if (requestId === requestCounterRef.current) { id: created.id,
setSearch(result); title: created.title,
} query: created.query,
createdAt: created.createdAt,
updatedAt: created.updatedAt,
}
: current
);
await runSearchStream(
created.id,
{
query: trimmed,
title: trimmed.slice(0, 80),
type: "auto",
numResults: 10,
},
{
onSearchResults: (payload) => {
if (requestId !== requestCounterRef.current) return;
setSearch((current) =>
current
? {
...current,
requestId: payload.requestId ?? current.requestId,
error: null,
results: payload.results,
}
: current
);
},
onSearchError: (payload) => {
if (requestId !== requestCounterRef.current) return;
setSearch((current) => (current ? { ...current, error: payload.error } : current));
},
onAnswer: (payload) => {
if (requestId !== requestCounterRef.current) return;
setSearch((current) =>
current
? {
...current,
answerText: payload.answerText,
answerRequestId: payload.answerRequestId,
answerCitations: payload.answerCitations,
answerError: null,
}
: current
);
},
onAnswerError: (payload) => {
if (requestId !== requestCounterRef.current) return;
setSearch((current) => (current ? { ...current, answerError: payload.error } : current));
},
onDone: (payload) => {
if (requestId !== requestCounterRef.current) return;
setSearch(payload.search);
},
onError: (payload) => {
if (requestId !== requestCounterRef.current) return;
setError(payload.message);
},
},
{ signal: abortController.signal }
);
} catch (err) { } catch (err) {
if (abortController.signal.aborted) return;
const message = err instanceof Error ? err.message : String(err); const message = err instanceof Error ? err.message : String(err);
if (message.includes("bearer token")) { if (message.includes("bearer token")) {
handleAuthFailure(message); handleAuthFailure(message);
@@ -104,6 +177,7 @@ export default function SearchRoutePage() {
} }
} finally { } finally {
if (requestId === requestCounterRef.current) { if (requestId === requestCounterRef.current) {
streamAbortRef.current = null;
setIsRunning(false); setIsRunning(false);
} }
} }