import { performance } from "node:perf_hooks";
import { z } from "zod";
import type { FastifyInstance } from "fastify";
import { prisma } from "./db.js";
import { requireAdmin } from "./auth.js";
import { env } from "./env.js";
import { runMultiplex } from "./llm/multiplexer.js";
import { runMultiplexStream } from "./llm/streaming.js";
import { exaClient } from "./search/exa.js";

type IncomingChatMessage = {
  role: "system" | "user" | "assistant" | "tool";
  content: string;
  name?: string;
};

/** Minimal shape needed to compare an incoming message with a stored row. */
type ComparableMessage = {
  role: string;
  content: string | null;
  name?: string | null;
};

// ---------------------------------------------------------------------------
// Request schemas (built once at module load instead of on every request).
// ---------------------------------------------------------------------------

const ChatRole = z.enum(["system", "user", "assistant", "tool"]);

const ChatIdParams = z.object({ chatId: z.string() });
const SearchIdParams = z.object({ searchId: z.string() });

const CreateChatBody = z.object({ title: z.string().optional() });

const CreateSearchBody = z.object({ title: z.string().optional(), query: z.string().optional() });

const RunSearchBody = z.object({
  query: z.string().trim().min(1).optional(),
  title: z.string().trim().min(1).optional(),
  type: z.enum(["auto", "fast", "deep", "instant"]).optional(),
  numResults: z.number().int().min(1).max(25).optional(),
  includeDomains: z.array(z.string().trim().min(1)).max(50).optional(),
  excludeDomains: z.array(z.string().trim().min(1)).max(50).optional(),
});

const CreateMessageBody = z.object({
  role: ChatRole,
  content: z.string(),
  name: z.string().optional(),
  metadata: z.unknown().optional(),
});

/**
 * Body shared by `/v1/chat-completions` and `/v1/chat-completions/stream`.
 * Kept as a single schema so the two endpoints cannot drift apart.
 */
const ChatCompletionBody = z.object({
  chatId: z.string().optional(),
  provider: z.enum(["openai", "anthropic", "xai"]),
  model: z.string().min(1),
  messages: z.array(
    z.object({
      role: ChatRole,
      content: z.string(),
      name: z.string().optional(),
    })
  ),
  temperature: z.number().min(0).max(2).optional(),
  maxTokens: z.number().int().positive().optional(),
});

/** Event types from `runMultiplexStream` that are forwarded to the SSE client. */
const FORWARDED_STREAM_EVENTS = new Set(["meta", "delta", "done", "error"]);

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/** True when two messages have the same role, content and name (missing name == null). */
function sameMessage(a: ComparableMessage, b: ComparableMessage) {
  return a.role === b.role && a.content === b.content && (a.name ?? null) === (b.name ?? null);
}

/** Extracts a human-readable message from an arbitrary thrown/rejected value. */
function errorMessage(err: unknown): string {
  const message = (err as { message?: unknown } | null | undefined)?.message;
  return typeof message === "string" ? message : String(err);
}

/** Returns whether a chat with the given id exists. */
async function chatExists(chatId: string): Promise<boolean> {
  const chat = await prisma.chat.findUnique({ where: { id: chatId }, select: { id: true } });
  return chat !== null;
}

/** Loads a search together with its results ordered by ascending rank. */
function findSearchWithResults(searchId: string) {
  return prisma.search.findUnique({
    where: { id: searchId },
    include: { results: { orderBy: { rank: "asc" } } },
  });
}

/**
 * Maps Exa search results to `searchResult` rows. `rank` is the result's
 * index in the response. Optional fields fall back to `null`.
 */
function toSearchResultRows(searchId: string, results: any[]) {
  return results.map((result: any, index: number) => ({
    searchId,
    rank: index,
    title: result.title ?? null,
    url: result.url,
    publishedDate: result.publishedDate ?? null,
    author: result.author ?? null,
    text: result.text ?? null,
    highlights: Array.isArray(result.highlights) ? (result.highlights as any) : null,
    highlightScores: Array.isArray(result.highlightScores) ? (result.highlightScores as any) : null,
    score: typeof result.score === "number" ? result.score : null,
    favicon: result.favicon ?? null,
    image: result.image ?? null,
  }));
}

/** Strings are kept verbatim, other truthy answers are pretty-printed JSON, falsy non-strings become null. */
function answerToText(answer: unknown): string | null {
  if (typeof answer === "string") return answer;
  return answer ? JSON.stringify(answer, null, 2) : null;
}

/**
 * Persists the non-assistant messages of an incoming completion request,
 * avoiding re-insertion of history the chat already has.
 *
 * Assistant messages are never stored here. The incoming non-assistant
 * messages are compared, in order, with the chat's stored non-assistant
 * messages (ordered by `createdAt`):
 * - every incoming message already stored as a prefix → nothing is written;
 * - stored messages are a prefix of the incoming ones → only the new tail is inserted;
 * - otherwise (histories diverge) the entire incoming list is inserted.
 */
async function storeNonAssistantMessages(chatId: string, messages: IncomingChatMessage[]) {
  const incoming = messages.filter((m) => m.role !== "assistant");
  if (incoming.length === 0) return;

  const stored = await prisma.message.findMany({
    where: { chatId },
    orderBy: { createdAt: "asc" },
    select: { role: true, content: true, name: true },
  });
  const storedNonAssistant = stored.filter((m) => m.role !== "assistant");

  let shared = 0;
  const limit = Math.min(storedNonAssistant.length, incoming.length);
  while (shared < limit && sameMessage(storedNonAssistant[shared], incoming[shared])) {
    shared += 1;
  }

  // Everything the client sent is already persisted.
  if (shared === incoming.length) return;

  // Stored history is a prefix of the request → append only the new tail.
  // NOTE(review): in the divergent case the whole request history is inserted
  // again, duplicating any leading messages it shares with the stored history;
  // confirm this is the intended policy. The result is never empty here.
  const toInsert = shared === storedNonAssistant.length ? incoming.slice(shared) : incoming;

  await prisma.message.createMany({
    data: toInsert.map((m) => ({
      chatId,
      role: m.role as any,
      content: m.content,
      name: m.name,
    })),
  });
}

// ---------------------------------------------------------------------------
// Routes
// ---------------------------------------------------------------------------

/**
 * Registers the HTTP API on `app`. Every route except `/health` first calls
 * `requireAdmin(req)`.
 */
export async function registerRoutes(app: FastifyInstance) {
  app.get("/health", async () => ({ ok: true }));

  app.get("/v1/auth/session", async (req) => {
    requireAdmin(req);
    return { authenticated: true, mode: env.ADMIN_TOKEN ? "token" : "open" };
  });

  app.get("/v1/chats", async (req) => {
    requireAdmin(req);
    const chats = await prisma.chat.findMany({
      orderBy: { updatedAt: "desc" },
      take: 100,
      select: { id: true, title: true, createdAt: true, updatedAt: true },
    });
    return { chats };
  });

  app.post("/v1/chats", async (req) => {
    requireAdmin(req);
    const body = CreateChatBody.parse(req.body ?? {});
    const chat = await prisma.chat.create({ data: { title: body.title } });
    return { chat };
  });

  app.get("/v1/searches", async (req) => {
    requireAdmin(req);
    const searches = await prisma.search.findMany({
      orderBy: { updatedAt: "desc" },
      take: 100,
      select: { id: true, title: true, query: true, createdAt: true, updatedAt: true },
    });
    return { searches };
  });

  app.post("/v1/searches", async (req) => {
    requireAdmin(req);
    const body = CreateSearchBody.parse(req.body ?? {});
    // Title falls back to the first 80 characters of the query.
    const title = body.title?.trim() || body.query?.trim()?.slice(0, 80);
    const query = body.query?.trim() || null;
    const search = await prisma.search.create({
      data: {
        title: title || null,
        query,
      },
      select: { id: true, title: true, query: true, createdAt: true, updatedAt: true },
    });
    return { search };
  });

  app.get("/v1/searches/:searchId", async (req) => {
    requireAdmin(req);
    const { searchId } = SearchIdParams.parse(req.params);
    const search = await findSearchWithResults(searchId);
    if (!search) return app.httpErrors.notFound("search not found");
    return { search };
  });

  // Runs Exa search + answer in parallel and replaces the stored results.
  // Responds 502 only when both calls fail; a single failure is recorded on
  // the search row and the partial result is returned.
  app.post("/v1/searches/:searchId/run", async (req) => {
    requireAdmin(req);
    const { searchId } = SearchIdParams.parse(req.params);
    const body = RunSearchBody.parse(req.body ?? {});

    const existing = await prisma.search.findUnique({
      where: { id: searchId },
      select: { id: true, query: true },
    });
    if (!existing) return app.httpErrors.notFound("search not found");

    const query = body.query?.trim() || existing.query?.trim();
    if (!query) return app.httpErrors.badRequest("query is required");

    const startedAt = performance.now();
    try {
      const exa = exaClient();
      const [searchOutcome, answerOutcome] = await Promise.allSettled([
        exa.searchAndContents(query, {
          type: body.type ?? "auto",
          numResults: body.numResults ?? 10,
          includeDomains: body.includeDomains,
          excludeDomains: body.excludeDomains,
          text: { maxCharacters: 1200 },
          highlights: {
            query,
            maxCharacters: 320,
            numSentences: 2,
            highlightsPerUrl: 2,
          },
          moderation: true,
          userLocation: "US",
          // NOTE(review): cast kept from the original; presumably some options are
          // missing from the SDK's option typings — confirm.
        } as any),
        exa.answer(query, {
          text: true,
          model: "exa",
          userLocation: "US",
        }),
      ]);

      const searchResponse = searchOutcome.status === "fulfilled" ? searchOutcome.value : null;
      const answerResponse = answerOutcome.status === "fulfilled" ? answerOutcome.value : null;
      const searchError = searchOutcome.status === "rejected" ? errorMessage(searchOutcome.reason) : null;
      const answerError = answerOutcome.status === "rejected" ? errorMessage(answerOutcome.reason) : null;

      const latencyMs = Math.round(performance.now() - startedAt);
      const normalizedTitle = body.title?.trim() || query.slice(0, 80);
      const rows = toSearchResultRows(searchId, searchResponse?.results ?? []);
      const answerText = answerToText(answerResponse?.answer);

      // Update the search row and replace its results atomically.
      // NOTE(review): rawResponse / answerRawResponse / answerCitations receive a
      // plain `null` when the corresponding Exa call failed. If these are Prisma
      // Json columns, Prisma may reject plain null (expects Prisma.DbNull /
      // Prisma.JsonNull) — confirm the partial-failure path.
      await prisma.$transaction(async (tx) => {
        await tx.search.update({
          where: { id: searchId },
          data: {
            query,
            title: normalizedTitle,
            requestId: searchResponse?.requestId ?? null,
            rawResponse: searchResponse as any,
            latencyMs,
            error: searchError,
            answerText,
            answerRequestId: answerResponse?.requestId ?? null,
            answerCitations: (answerResponse?.citations as any) ?? null,
            answerRawResponse: answerResponse as any,
            answerError,
          },
        });
        await tx.searchResult.deleteMany({ where: { searchId } });
        if (rows.length) {
          await tx.searchResult.createMany({ data: rows as any });
        }
      });

      if (searchError && answerError) {
        throw app.httpErrors.badGateway(`Exa search and answer failed: ${searchError}; ${answerError}`);
      }

      const search = await findSearchWithResults(searchId);
      if (!search) return app.httpErrors.notFound("search not found");
      return { search };
    } catch (err: unknown) {
      // Recording the failure is best-effort: if it fails too, log it and
      // still surface the original error rather than the bookkeeping one.
      try {
        await prisma.search.update({
          where: { id: searchId },
          data: {
            latencyMs: Math.round(performance.now() - startedAt),
            error: errorMessage(err),
          },
        });
      } catch (recordErr: unknown) {
        req.log.error({ err: recordErr, searchId }, "failed to record search error");
      }
      throw err;
    }
  });

  app.get("/v1/chats/:chatId", async (req) => {
    requireAdmin(req);
    const { chatId } = ChatIdParams.parse(req.params);
    const chat = await prisma.chat.findUnique({
      where: { id: chatId },
      include: { messages: { orderBy: { createdAt: "asc" } }, calls: { orderBy: { createdAt: "desc" } } },
    });
    if (!chat) return app.httpErrors.notFound("chat not found");
    return { chat };
  });

  app.post("/v1/chats/:chatId/messages", async (req) => {
    requireAdmin(req);
    const { chatId } = ChatIdParams.parse(req.params);
    const body = CreateMessageBody.parse(req.body);
    const msg = await prisma.message.create({
      data: {
        chatId,
        role: body.role as any,
        content: body.content,
        name: body.name,
        metadata: body.metadata as any,
      },
    });
    return { message: msg };
  });

  // Main: create a completion via provider+model and store everything.
  app.post("/v1/chat-completions", async (req) => {
    requireAdmin(req);
    const body = ChatCompletionBody.parse(req.body);

    if (body.chatId) {
      if (!(await chatExists(body.chatId))) return app.httpErrors.notFound("chat not found");
      // Store only new non-assistant messages to avoid duplicate history entries.
      await storeNonAssistantMessages(body.chatId, body.messages);
    }

    const result = await runMultiplex(body);
    return {
      chatId: body.chatId ?? null,
      ...result,
    };
  });

  // Streaming SSE endpoint.
  app.post("/v1/chat-completions/stream", async (req, reply) => {
    requireAdmin(req);
    const body = ChatCompletionBody.parse(req.body);

    if (body.chatId) {
      if (!(await chatExists(body.chatId))) return app.httpErrors.notFound("chat not found");
      // Store only new non-assistant messages to avoid duplicate history entries.
      await storeNonAssistantMessages(body.chatId, body.messages);
    }

    // From here on the raw Node response is written directly; hijacking tells
    // Fastify not to send (or error-handle) a reply of its own once headers
    // are already on the wire.
    reply.hijack();
    const res = reply.raw;
    res.writeHead(200, {
      "Content-Type": "text/event-stream; charset=utf-8",
      "Cache-Control": "no-cache, no-transform",
      Connection: "keep-alive",
    });

    const send = (event: string, data: unknown) => {
      // Drop writes once the client has disconnected or the stream was ended.
      if (res.destroyed || res.writableEnded) return;
      res.write(`event: ${event}\n`);
      res.write(`data: ${JSON.stringify(data)}\n\n`);
    };

    try {
      for await (const ev of runMultiplexStream(body)) {
        if (FORWARDED_STREAM_EVENTS.has(ev.type)) send(ev.type, ev);
      }
    } catch (err: unknown) {
      // Headers are already sent, so report the failure in-band as an SSE event.
      // NOTE(review): payload shape may differ from error events produced by
      // runMultiplexStream itself — align with the client if needed.
      req.log.error({ err }, "chat completion stream failed");
      send("error", { type: "error", error: errorMessage(err) });
    } finally {
      if (!res.writableEnded) res.end();
    }
    return reply;
  });
}