Files
Sybil-2/server/src/routes.ts
2026-02-14 21:00:37 -08:00

634 lines
21 KiB
TypeScript

import { performance } from "node:perf_hooks";
import { z } from "zod";
import type { FastifyInstance } from "fastify";
import { prisma } from "./db.js";
import { requireAdmin } from "./auth.js";
import { env } from "./env.js";
import { runMultiplex } from "./llm/multiplexer.js";
import { runMultiplexStream } from "./llm/streaming.js";
import { getModelCatalogSnapshot } from "./llm/model-catalog.js";
import { exaClient } from "./search/exa.js";
/**
 * A chat message as it appears in request bodies (the `messages` array of the
 * chat-completion routes). Stored rows are also cast to this shape for comparison
 * in storeNonAssistantMessages.
 */
type IncomingChatMessage = {
// Author of the message.
role: "system" | "user" | "assistant" | "tool";
// Message text.
content: string;
// Optional participant name; sameMessage treats a missing name and null as equal.
name?: string;
};
/**
 * Structural equality for chat messages: identical role, identical content, and identical
 * name, where an absent (`undefined`) name and a `null` name are considered the same.
 */
function sameMessage(a: IncomingChatMessage, b: IncomingChatMessage) {
  if (a.role !== b.role) return false;
  if (a.content !== b.content) return false;
  const leftName = a.name ?? null;
  const rightName = b.name ?? null;
  return leftName === rightName;
}
/**
 * Append the non-assistant part of an incoming transcript to a chat's stored history,
 * skipping whatever is already stored. Assistant messages are never written here.
 *
 * Incoming and stored non-assistant messages (stored ones oldest first) are compared
 * position by position with `sameMessage`:
 * - every incoming message matches the start of the stored history -> nothing is written;
 * - the whole stored history matches the start of the incoming list -> only the new tail is written;
 * - the two diverge -> the entire incoming list is written again.
 *   NOTE(review): that last case re-inserts the shared prefix as new rows — confirm it is intended.
 */
async function storeNonAssistantMessages(chatId: string, messages: IncomingChatMessage[]) {
  const pending = messages.filter((message) => message.role !== "assistant");
  if (pending.length === 0) return;

  const storedRows = await prisma.message.findMany({
    where: { chatId },
    orderBy: { createdAt: "asc" },
    select: { role: true, content: true, name: true },
  });
  const stored = storedRows.filter((row) => row.role !== "assistant");

  // Length of the common prefix shared by `stored` and `pending`.
  const limit = Math.min(stored.length, pending.length);
  let matched = 0;
  for (; matched < limit; matched += 1) {
    if (!sameMessage(stored[matched] as IncomingChatMessage, pending[matched])) break;
  }

  if (matched === pending.length) return;

  const toWrite = matched === stored.length ? pending.slice(matched) : pending;
  if (toWrite.length === 0) return;

  await prisma.message.createMany({
    data: toWrite.map(({ role, content, name }) => ({
      chatId,
      role: role as any,
      content,
      name,
    })),
  });
}
/** Domain allow/deny list for a search run: at most 50 trimmed, non-blank entries. */
const searchDomainFilter = z.array(z.string().trim().min(1)).max(50).optional();

/**
 * Body of the search-run endpoints (`/run` and `/run/stream`). Every field is optional:
 * `query` and `title` are trimmed and must be non-blank when present, `numResults` is an
 * integer in 1..25, and the domain lists share the filter schema above.
 */
const SearchRunBody = z.object({
  query: z.string().trim().min(1).optional(),
  title: z.string().trim().min(1).optional(),
  type: z.enum(["auto", "fast", "deep", "instant"]).optional(),
  numResults: z.number().int().min(1).max(25).optional(),
  includeDomains: searchDomainFilter,
  excludeDomains: searchDomainFilter,
});
/**
 * Convert one Exa search hit into a `searchResult` row for `searchId`, using `index` as its rank.
 * Missing scalar fields become `null`. `highlights`/`highlightScores` are kept only when they are
 * arrays, and `score` only when it is a number.
 */
function mapSearchResultRow(searchId: string, result: any, index: number) {
  const arrayOrNull = (value: any): any => (Array.isArray(value) ? value : null);
  const { title, url, publishedDate, author, text, highlights, highlightScores, score, favicon, image } = result;
  return {
    searchId,
    rank: index,
    title: title ?? null,
    url,
    publishedDate: publishedDate ?? null,
    author: author ?? null,
    text: text ?? null,
    highlights: arrayOrNull(highlights),
    highlightScores: arrayOrNull(highlightScores),
    score: typeof score === "number" ? score : null,
    favicon: favicon ?? null,
    image: image ?? null,
  };
}
/**
 * Shape one Exa search hit like a stored `searchResult` for streaming to the client before
 * anything is persisted. It has a synthetic `preview-result-<index>` id and a `createdAt`
 * stamped with the current time. Field normalization is the same as `mapSearchResultRow`.
 */
function mapSearchResultPreview(result: any, index: number) {
  const arrayOrNull = (value: any): any => (Array.isArray(value) ? value : null);
  const { title, url, publishedDate, author, text, highlights, highlightScores, score, favicon, image } = result;
  const createdAt = new Date().toISOString();
  return {
    id: `preview-result-${index}`,
    createdAt,
    rank: index,
    title: title ?? null,
    url,
    publishedDate: publishedDate ?? null,
    author: author ?? null,
    text: text ?? null,
    highlights: arrayOrNull(highlights),
    highlightScores: arrayOrNull(highlightScores),
    score: typeof score === "number" ? score : null,
    favicon: favicon ?? null,
    image: image ?? null,
  };
}
/**
 * Extract Exa's answer as text. A string `answer` is returned as-is, any other truthy `answer`
 * is pretty-printed as JSON, and a missing or falsy answer (or response) yields null.
 */
function parseAnswerText(answerResponse: any) {
  const answer = answerResponse?.answer;
  if (typeof answer === "string") return answer;
  return answer ? JSON.stringify(answer, null, 2) : null;
}
/**
 * Produce a comparison key for a URL so the same page matches across Exa search and contents
 * responses. Parseable URLs are serialized by the WHATWG `URL` parser with the fragment
 * removed. Unparseable input is only trimmed. In both cases a single trailing `/` is dropped.
 * Nullish or empty input yields "".
 */
function normalizeUrlForMatch(input: string | null | undefined) {
  if (!input) return "";
  const stripTrailingSlash = (value: string) => value.replace(/\/$/, "");
  try {
    const url = new URL(input);
    url.hash = "";
    return stripTrailingSlash(url.toString());
  } catch {
    return stripTrailingSlash(input.trim());
  }
}
/**
 * Response headers for a server-sent-events stream written directly to the raw response.
 *
 * A real `Origin` request header is echoed back in `Access-Control-Allow-Origin`, and in that case
 * `Access-Control-Allow-Credentials: true` is added as well. An absent, empty, or literal "null"
 * origin gets `*` and no credentials header. `Vary: Origin` is always sent because the CORS value
 * depends on the request.
 *
 * NOTE(review): any origin is echoed with credentials allowed. If requireAdmin accepts ambient
 * browser credentials (e.g. cookies), consider restricting this to an origin allow-list.
 */
function buildSseHeaders(originHeader: string | undefined) {
  const allowOrigin = !originHeader || originHeader === "null" ? "*" : originHeader;
  const headers: Record<string, string> = {
    "Content-Type": "text/event-stream; charset=utf-8",
    "Cache-Control": "no-cache, no-transform",
    Connection: "keep-alive",
    "X-Accel-Buffering": "no",
    "Access-Control-Allow-Origin": allowOrigin,
    Vary: "Origin",
    ...(allowOrigin === "*" ? {} : { "Access-Control-Allow-Credentials": "true" }),
  };
  return headers;
}
/** Route params of the `/v1/chats/:chatId...` endpoints. */
const ChatIdParams = z.object({ chatId: z.string() });

/** Route params of the `/v1/searches/:searchId...` endpoints. */
const SearchIdParams = z.object({ searchId: z.string() });

/** Body of `POST /v1/chats/:chatId/messages`. */
const ChatMessageBody = z.object({
  role: z.enum(["system", "user", "assistant", "tool"]),
  content: z.string(),
  name: z.string().optional(),
  metadata: z.unknown().optional(),
});

/** Body shared by `POST /v1/chat-completions` and `POST /v1/chat-completions/stream`. */
const ChatCompletionBody = z.object({
  chatId: z.string().optional(),
  provider: z.enum(["openai", "anthropic", "xai"]),
  model: z.string().min(1),
  messages: z.array(
    z.object({
      role: z.enum(["system", "user", "assistant", "tool"]),
      content: z.string(),
      name: z.string().optional(),
    })
  ),
  temperature: z.number().min(0).max(2).optional(),
  maxTokens: z.number().int().positive().optional(),
});

/** `runMultiplexStream` event types relayed to the SSE client (event name = `ev.type`); others are dropped. */
const FORWARDED_CHAT_STREAM_EVENTS = new Set(["meta", "delta", "done", "error"]);

/** Parsed body of a search run (see `SearchRunBody`). */
type SearchRunInput = z.infer<typeof SearchRunBody>;

/** The part of a raw HTTP response that SSE writing needs; `reply.raw` satisfies it. */
type SseSink = {
  readonly writableEnded: boolean;
  readonly destroyed: boolean;
  write(chunk: string): unknown;
};

/**
 * Write one server-sent event frame (`event:` line plus a JSON `data:` line).
 * The frame is dropped once the response has been ended or destroyed (e.g. the client went away),
 * so late async callbacks cannot write past `end()`.
 */
function writeSseEvent(sink: SseSink, event: string, data: unknown) {
  if (sink.writableEnded || sink.destroyed) return;
  sink.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
}

/** Message for a thrown/rejected value: its string `message` when present, else `String(err)`. */
function errorMessage(err: unknown): string {
  const message = (err as { message?: unknown } | null | undefined)?.message;
  return typeof message === "string" ? message : String(err);
}

/** Whether a chat with this id exists. */
async function chatExists(chatId: string) {
  const chat = await prisma.chat.findUnique({ where: { id: chatId }, select: { id: true } });
  return chat !== null;
}

/** Load a search with its results ordered by rank; resolves to null when it does not exist. */
function findSearchWithResults(searchId: string) {
  return prisma.search.findUnique({
    where: { id: searchId },
    include: { results: { orderBy: { rank: "asc" } } },
  });
}

/** Exa search options shared by the blocking and streaming search runs. */
function searchRequestOptions(body: SearchRunInput) {
  return {
    type: body.type ?? "auto",
    numResults: body.numResults ?? 10,
    includeDomains: body.includeDomains,
    excludeDomains: body.excludeDomains,
    moderation: true,
    userLocation: "US",
  };
}

/** Exa content options: page text plus query-focused highlights, as stored for each result. */
function searchContentOptions(query: string) {
  return {
    text: { maxCharacters: 1200 },
    highlights: {
      query,
      maxCharacters: 320,
      numSentences: 2,
      highlightsPerUrl: 2,
    },
  };
}

/** Outcome of one search run as written back to the database. */
type SearchRunRecord = {
  query: string;
  title: string;
  latencyMs: number;
  /** Raw Exa search response, or null when the search call failed. */
  searchResponse: any;
  /** Raw Exa answer response, or null when the answer call failed. */
  answerResponse: any;
  searchError: string | null;
  answerError: string | null;
  /** Results to store; array position becomes `rank`. */
  results: any[];
};

/**
 * Persist a finished run in one transaction: update the search row, then replace all of its
 * result rows with `run.results`.
 */
async function persistSearchRun(searchId: string, run: SearchRunRecord) {
  const rows = run.results.map((result: any, index: number) => mapSearchResultRow(searchId, result, index));
  const answerText = parseAnswerText(run.answerResponse);
  await prisma.$transaction(async (tx) => {
    await tx.search.update({
      where: { id: searchId },
      data: {
        query: run.query,
        title: run.title,
        requestId: run.searchResponse?.requestId ?? null,
        rawResponse: run.searchResponse as any,
        latencyMs: run.latencyMs,
        error: run.searchError,
        answerText,
        answerRequestId: run.answerResponse?.requestId ?? null,
        answerCitations: (run.answerResponse?.citations as any) ?? null,
        answerRawResponse: run.answerResponse as any,
        answerError: run.answerError,
      },
    });
    await tx.searchResult.deleteMany({ where: { searchId } });
    if (rows.length) {
      await tx.searchResult.createMany({ data: rows as any });
    }
  });
}

/**
 * Best-effort: record a failed run on the search row. If this write itself fails (for instance
 * because the search was deleted meanwhile), it is logged and swallowed. That way it cannot
 * replace the error being reported, or reject after an SSE response has already ended.
 */
async function recordSearchFailure(
  log: FastifyInstance["log"],
  searchId: string,
  data: { query?: string; title?: string; latencyMs: number; error: string }
) {
  try {
    await prisma.search.update({ where: { id: searchId }, data });
  } catch (persistErr) {
    log.error({ err: persistErr, searchId }, "failed to record search run failure");
  }
}

/**
 * Register the `/health` and `/v1/*` HTTP routes on `app`.
 *
 * Every route except `/health` calls `requireAdmin(req)` first.
 * NOTE(review): `requireAdmin` is called without `await`; that is only safe if it is synchronous
 * and throws on failure — confirm in ./auth.js.
 *
 * Error responses use `app.httpErrors` (presumably provided by @fastify/sensible — confirm).
 * The two `/stream` routes write server-sent events straight to `reply.raw` and always end the
 * response themselves. Failures after the headers are sent are reported as an `error` event.
 */
export async function registerRoutes(app: FastifyInstance) {
  app.get("/health", { logLevel: "silent" }, async () => ({ ok: true }));

  app.get("/v1/auth/session", async (req) => {
    requireAdmin(req);
    return { authenticated: true, mode: env.ADMIN_TOKEN ? "token" : "open" };
  });

  app.get("/v1/models", async (req) => {
    requireAdmin(req);
    return { providers: getModelCatalogSnapshot() };
  });

  app.get("/v1/chats", async (req) => {
    requireAdmin(req);
    const chats = await prisma.chat.findMany({
      orderBy: { updatedAt: "desc" },
      take: 100,
      select: { id: true, title: true, createdAt: true, updatedAt: true },
    });
    return { chats };
  });

  app.post("/v1/chats", async (req) => {
    requireAdmin(req);
    const Body = z.object({ title: z.string().optional() });
    const body = Body.parse(req.body ?? {});
    const chat = await prisma.chat.create({ data: { title: body.title } });
    return { chat };
  });

  app.delete("/v1/chats/:chatId", async (req) => {
    requireAdmin(req);
    const { chatId } = ChatIdParams.parse(req.params);
    req.log.info({ chatId }, "delete chat requested");
    const result = await prisma.chat.deleteMany({ where: { id: chatId } });
    if (result.count === 0) {
      req.log.warn({ chatId }, "delete chat target not found");
      return app.httpErrors.notFound("chat not found");
    }
    req.log.info({ chatId }, "chat deleted");
    return { deleted: true };
  });

  app.get("/v1/searches", async (req) => {
    requireAdmin(req);
    const searches = await prisma.search.findMany({
      orderBy: { updatedAt: "desc" },
      take: 100,
      select: { id: true, title: true, query: true, createdAt: true, updatedAt: true },
    });
    return { searches };
  });

  app.post("/v1/searches", async (req) => {
    requireAdmin(req);
    const Body = z.object({ title: z.string().optional(), query: z.string().optional() });
    const body = Body.parse(req.body ?? {});
    const title = body.title?.trim() || body.query?.trim()?.slice(0, 80);
    const query = body.query?.trim() || null;
    const search = await prisma.search.create({
      data: {
        title: title || null,
        query,
      },
      select: { id: true, title: true, query: true, createdAt: true, updatedAt: true },
    });
    return { search };
  });

  app.delete("/v1/searches/:searchId", async (req) => {
    requireAdmin(req);
    const { searchId } = SearchIdParams.parse(req.params);
    req.log.info({ searchId }, "delete search requested");
    const result = await prisma.search.deleteMany({ where: { id: searchId } });
    if (result.count === 0) {
      req.log.warn({ searchId }, "delete search target not found");
      return app.httpErrors.notFound("search not found");
    }
    req.log.info({ searchId }, "search deleted");
    return { deleted: true };
  });

  app.get("/v1/searches/:searchId", async (req) => {
    requireAdmin(req);
    const { searchId } = SearchIdParams.parse(req.params);
    const search = await findSearchWithResults(searchId);
    if (!search) return app.httpErrors.notFound("search not found");
    return { search };
  });

  // Blocking run: Exa search (with contents) and answer run in parallel. Both outcomes are
  // persisted; the request fails with 502 only when both calls failed.
  app.post("/v1/searches/:searchId/run", async (req) => {
    requireAdmin(req);
    const { searchId } = SearchIdParams.parse(req.params);
    const body = SearchRunBody.parse(req.body ?? {});
    const existing = await prisma.search.findUnique({
      where: { id: searchId },
      select: { id: true, query: true },
    });
    if (!existing) return app.httpErrors.notFound("search not found");
    const query = body.query?.trim() || existing.query?.trim();
    if (!query) return app.httpErrors.badRequest("query is required");
    const title = body.title?.trim() || query.slice(0, 80);
    const startedAt = performance.now();
    try {
      const exa = exaClient();
      const [searchOutcome, answerOutcome] = await Promise.allSettled([
        exa.searchAndContents(query, { ...searchRequestOptions(body), ...searchContentOptions(query) } as any),
        exa.answer(query, {
          text: true,
          model: "exa",
          userLocation: "US",
        }),
      ]);
      const searchResponse = searchOutcome.status === "fulfilled" ? searchOutcome.value : null;
      const answerResponse = answerOutcome.status === "fulfilled" ? answerOutcome.value : null;
      const searchError = searchOutcome.status === "rejected" ? errorMessage(searchOutcome.reason) : null;
      const answerError = answerOutcome.status === "rejected" ? errorMessage(answerOutcome.reason) : null;
      await persistSearchRun(searchId, {
        query,
        title,
        latencyMs: Math.round(performance.now() - startedAt),
        searchResponse,
        answerResponse,
        searchError,
        answerError,
        results: searchResponse?.results ?? [],
      });
      if (searchError && answerError) {
        throw app.httpErrors.badGateway(`Exa search and answer failed: ${searchError}; ${answerError}`);
      }
      const search = await findSearchWithResults(searchId);
      if (!search) return app.httpErrors.notFound("search not found");
      return { search };
    } catch (err) {
      await recordSearchFailure(req.log, searchId, {
        latencyMs: Math.round(performance.now() - startedAt),
        error: errorMessage(err),
      });
      throw err;
    }
  });

  // Streaming run. Events:
  // - `search_results`: sent first with preview rows, then again with text/highlights once
  //   contents arrive;
  // - `answer`, or `search_error` / `answer_error` when a call fails;
  // - `done` with the persisted search, or `error`.
  app.post("/v1/searches/:searchId/run/stream", async (req, reply) => {
    requireAdmin(req);
    const { searchId } = SearchIdParams.parse(req.params);
    const body = SearchRunBody.parse(req.body ?? {});
    const existing = await prisma.search.findUnique({
      where: { id: searchId },
      select: { id: true, query: true },
    });
    if (!existing) return app.httpErrors.notFound("search not found");
    const query = body.query?.trim() || existing.query?.trim();
    if (!query) return app.httpErrors.badRequest("query is required");
    const startedAt = performance.now();
    const title = body.title?.trim() || query.slice(0, 80);
    reply.raw.writeHead(200, buildSseHeaders(typeof req.headers.origin === "string" ? req.headers.origin : undefined));
    const send = (event: string, data: unknown) => writeSseEvent(reply.raw, event, data);
    try {
      const exa = exaClient();
      // Search without contents so previews can be sent quickly; contents are fetched afterwards.
      const searchPromise = exa.search(query, { ...searchRequestOptions(body), contents: false } as any);
      const answerPromise = exa.answer(query, {
        text: true,
        model: "exa",
        userLocation: "US",
      });
      let searchResponse: any | null = null;
      let answerResponse: any | null = null;
      let enrichedResults: any[] | null = null;
      let searchError: string | null = null;
      let answerError: string | null = null;
      const searchSettled = searchPromise.then(
        async (value) => {
          searchResponse = value;
          const baseResults: any[] = value?.results ?? [];
          send("search_results", {
            requestId: value?.requestId ?? null,
            results: baseResults.map((result, index) => mapSearchResultPreview(result, index)),
          });
          const urls = baseResults.map((result) => result?.url).filter((url): url is string => typeof url === "string");
          if (!urls.length) return;
          try {
            const contentsResponse = await exa.getContents(urls, searchContentOptions(query) as any);
            // URLs may be formatted differently between the two responses; match on a normalized key.
            const byUrl = new Map<string, any>();
            for (const contentItem of contentsResponse?.results ?? []) {
              byUrl.set(normalizeUrlForMatch(contentItem?.url), contentItem);
            }
            const merged = baseResults.map((result) => {
              const contentItem = byUrl.get(normalizeUrlForMatch(result?.url));
              if (!contentItem) return result;
              return {
                ...result,
                text: contentItem.text ?? result.text ?? null,
                highlights: Array.isArray(contentItem.highlights) ? contentItem.highlights : result.highlights ?? null,
                highlightScores: Array.isArray(contentItem.highlightScores) ? contentItem.highlightScores : result.highlightScores ?? null,
              };
            });
            enrichedResults = merged;
            send("search_results", {
              requestId: value?.requestId ?? null,
              results: merged.map((result, index) => mapSearchResultPreview(result, index)),
            });
          } catch (err) {
            // Keep the preview results if content enrichment fails, but leave a trace.
            req.log.warn({ err, searchId }, "search content enrichment failed");
          }
        },
        (reason) => {
          searchError = errorMessage(reason);
          send("search_error", { error: searchError });
        }
      );
      const answerSettled = answerPromise.then(
        (value) => {
          answerResponse = value;
          send("answer", {
            answerText: parseAnswerText(value),
            answerRequestId: value?.requestId ?? null,
            answerCitations: (value?.citations as any) ?? null,
          });
        },
        (reason) => {
          answerError = errorMessage(reason);
          send("answer_error", { error: answerError });
        }
      );
      await Promise.all([searchSettled, answerSettled]);
      await persistSearchRun(searchId, {
        query,
        title,
        latencyMs: Math.round(performance.now() - startedAt),
        searchResponse,
        answerResponse,
        searchError,
        answerError,
        results: enrichedResults ?? searchResponse?.results ?? [],
      });
      const search = await findSearchWithResults(searchId);
      if (!search) {
        send("error", { message: "search not found" });
      } else {
        send("done", { search });
      }
    } catch (err) {
      await recordSearchFailure(req.log, searchId, {
        query,
        title,
        latencyMs: Math.round(performance.now() - startedAt),
        error: errorMessage(err),
      });
      send("error", { message: errorMessage(err) });
    } finally {
      reply.raw.end();
    }
    return reply;
  });

  app.get("/v1/chats/:chatId", async (req) => {
    requireAdmin(req);
    const { chatId } = ChatIdParams.parse(req.params);
    const chat = await prisma.chat.findUnique({
      where: { id: chatId },
      include: { messages: { orderBy: { createdAt: "asc" } }, calls: { orderBy: { createdAt: "desc" } } },
    });
    if (!chat) return app.httpErrors.notFound("chat not found");
    return { chat };
  });

  app.post("/v1/chats/:chatId/messages", async (req) => {
    requireAdmin(req);
    const { chatId } = ChatIdParams.parse(req.params);
    const body = ChatMessageBody.parse(req.body);
    // Check first so a missing chat is a 404 rather than a database error on insert
    // (presumably a foreign-key violation).
    if (!(await chatExists(chatId))) return app.httpErrors.notFound("chat not found");
    const msg = await prisma.message.create({
      data: {
        chatId,
        role: body.role as any,
        content: body.content,
        name: body.name,
        metadata: body.metadata as any,
      },
    });
    return { message: msg };
  });

  // Main: create a completion via provider+model and store everything.
  app.post("/v1/chat-completions", async (req) => {
    requireAdmin(req);
    const body = ChatCompletionBody.parse(req.body);
    if (body.chatId) {
      if (!(await chatExists(body.chatId))) return app.httpErrors.notFound("chat not found");
      // Store only new non-assistant messages to avoid duplicate history entries.
      await storeNonAssistantMessages(body.chatId, body.messages);
    }
    const result = await runMultiplex(body);
    return {
      chatId: body.chatId ?? null,
      ...result,
    };
  });

  // Streaming SSE endpoint. Relays `runMultiplexStream` events. If the generator throws, an
  // `error` event is sent; either way the response is always ended.
  app.post("/v1/chat-completions/stream", async (req, reply) => {
    requireAdmin(req);
    const body = ChatCompletionBody.parse(req.body);
    if (body.chatId) {
      if (!(await chatExists(body.chatId))) return app.httpErrors.notFound("chat not found");
      // Store only new non-assistant messages to avoid duplicate history entries.
      await storeNonAssistantMessages(body.chatId, body.messages);
    }
    reply.raw.writeHead(200, buildSseHeaders(typeof req.headers.origin === "string" ? req.headers.origin : undefined));
    try {
      for await (const ev of runMultiplexStream(body)) {
        if (FORWARDED_CHAT_STREAM_EVENTS.has(ev.type)) writeSseEvent(reply.raw, ev.type, ev);
      }
    } catch (err) {
      // Headers are already sent, so Fastify cannot turn this into an HTTP error; report in-band.
      // NOTE(review): payload shape chosen to mirror the search stream's `error` event — align with
      // runMultiplexStream's own error events if they differ.
      req.log.error({ err }, "chat completion stream failed");
      writeSseEvent(reply.raw, "error", { type: "error", message: errorMessage(err) });
    } finally {
      reply.raw.end();
    }
    return reply;
  });
}