import { NextRequest, NextResponse } from "next/server";
import { prisma } from "@/lib/prisma";
import { Prisma } from "@agentlens/database";
import { validateApiKey } from "@/lib/api-key";
import { auth } from "@/auth";

// ---------------------------------------------------------------------------
// Types — shape of the JSON payload accepted by POST /api/traces
// ---------------------------------------------------------------------------

/** A decision point as sent in the ingest payload. */
interface DecisionPointPayload {
  id: string;
  type:
    | "TOOL_SELECTION"
    | "ROUTING"
    | "RETRY"
    | "ESCALATION"
    | "MEMORY_RETRIEVAL"
    | "PLANNING"
    | "CUSTOM";
  reasoning?: string;
  chosen: Prisma.JsonValue;
  alternatives: Prisma.JsonValue[];
  contextSnapshot?: Prisma.JsonValue;
  durationMs?: number;
  costUsd?: number;
  parentSpanId?: string;
  timestamp: string;
}

/** A span as sent in the ingest payload; `parentSpanId` links it to another span. */
interface SpanPayload {
  id: string;
  parentSpanId?: string;
  name: string;
  type: "LLM_CALL" | "TOOL_CALL" | "MEMORY_OP" | "CHAIN" | "AGENT" | "CUSTOM";
  input?: Prisma.JsonValue;
  output?: Prisma.JsonValue;
  tokenCount?: number;
  costUsd?: number;
  durationMs?: number;
  status: "RUNNING" | "COMPLETED" | "ERROR";
  statusMessage?: string;
  startedAt: string;
  endedAt?: string;
  metadata?: Prisma.JsonValue;
}

/** An event as sent in the ingest payload. */
interface EventPayload {
  id: string;
  spanId?: string;
  type: "ERROR" | "RETRY" | "FALLBACK" | "CONTEXT_OVERFLOW" | "USER_FEEDBACK" | "CUSTOM";
  name: string;
  metadata?: Prisma.JsonValue;
  timestamp: string;
}

/** A trace together with all of its child records. */
interface TracePayload {
  id: string;
  name: string;
  sessionId?: string;
  status: "RUNNING" | "COMPLETED" | "ERROR";
  tags: string[];
  metadata?: Prisma.JsonValue;
  totalCost?: number;
  totalTokens?: number;
  totalDuration?: number;
  startedAt: string;
  endedAt?: string;
  decisionPoints: DecisionPointPayload[];
  spans: SpanPayload[];
  events: EventPayload[];
}

/** Request body of POST /api/traces. */
interface BatchTracesRequest {
  traces: TracePayload[];
}

// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------

/** Largest request body accepted by POST, judged by the Content-Length header. */
const MAX_BODY_SIZE = 10 * 1024 * 1024;

const TRACE_STATUSES: readonly string[] = ["RUNNING", "COMPLETED", "ERROR"];
const SPAN_STATUSES: readonly string[] = ["RUNNING", "COMPLETED", "ERROR"];
const DECISION_POINT_TYPES: readonly string[] = [
  "TOOL_SELECTION",
  "ROUTING",
  "RETRY",
  "ESCALATION",
  "MEMORY_RETRIEVAL",
  "PLANNING",
  "CUSTOM",
];
const SPAN_TYPES: readonly string[] = [
  "LLM_CALL",
  "TOOL_CALL",
  "MEMORY_OP",
  "CHAIN",
  "AGENT",
  "CUSTOM",
];
const EVENT_TYPES: readonly string[] = [
  "ERROR",
  "RETRY",
  "FALLBACK",
  "CONTEXT_OVERFLOW",
  "USER_FEEDBACK",
  "CUSTOM",
];
const SORT_OPTIONS: readonly string[] = ["newest", "oldest", "longest", "shortest", "costliest"];

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/** Builds the `{ error: message }` JSON response used for every failure path. */
function errorResponse(message: string, status: number) {
  return NextResponse.json({ error: message }, { status });
}

/** True when `new Date(value)` yields a real date rather than `Invalid Date`. */
function isParseableDate(value: string): boolean {
  return !Number.isNaN(new Date(value).getTime());
}

/**
 * Orders spans so that every span appears after its parent (when the parent is
 * part of the same list). Spans whose parent is absent from the list are kept
 * and treated as roots. The walk is iterative, so arbitrarily deep parent
 * chains cannot overflow the call stack, and the `visited` set makes cyclic
 * parent links terminate.
 *
 * @param spans Spans of a single trace, in any order.
 * @returns A new array containing each distinct span id once, parents first.
 */
function topologicalSortSpans(spans: SpanPayload[]): SpanPayload[] {
  const byId = new Map(spans.map((s) => [s.id, s]));
  const sorted: SpanPayload[] = [];
  const visited = new Set<string>();

  for (const span of spans) {
    // Collect the not-yet-emitted ancestor chain: span, parent, grandparent, …
    const chain: SpanPayload[] = [];
    let current: SpanPayload | undefined = span;
    while (current && !visited.has(current.id)) {
      visited.add(current.id);
      chain.push(current);
      current = current.parentSpanId ? byId.get(current.parentSpanId) : undefined;
    }
    // Emit from the top-most ancestor down so parents precede children.
    for (let i = chain.length - 1; i >= 0; i--) {
      sorted.push(chain[i]);
    }
  }

  return sorted;
}

/** Structural checks for one decision point; returns an error message or null. */
function validateDecisionPoint(dp: DecisionPointPayload): string | null {
  if (dp == null || !dp.id || typeof dp.id !== "string") {
    return "Each decision point must have a valid 'id' (string)";
  }
  if (!DECISION_POINT_TYPES.includes(dp.type)) {
    return `Invalid decision point type: ${dp.type}`;
  }
  if (!dp.timestamp || typeof dp.timestamp !== "string") {
    return "Each decision point must have a valid 'timestamp' (ISO date string)";
  }
  if (!Array.isArray(dp.alternatives)) {
    return "Decision point alternatives must be an array";
  }
  return null;
}

/** Structural checks for one span; returns an error message or null. */
function validateSpan(span: SpanPayload): string | null {
  if (span == null || !span.id || typeof span.id !== "string") {
    return "Each span must have a valid 'id' (string)";
  }
  if (!span.name || typeof span.name !== "string") {
    return "Each span must have a valid 'name' (string)";
  }
  if (!SPAN_TYPES.includes(span.type)) {
    return `Invalid span type: ${span.type}`;
  }
  if (!span.startedAt || typeof span.startedAt !== "string") {
    return "Each span must have a valid 'startedAt' (ISO date string)";
  }
  if (!SPAN_STATUSES.includes(span.status)) {
    return `Invalid span status: ${span.status}`;
  }
  return null;
}

/** Structural checks for one event; returns an error message or null. */
function validateEvent(event: EventPayload): string | null {
  if (event == null || !event.id || typeof event.id !== "string") {
    return "Each event must have a valid 'id' (string)";
  }
  if (!event.name || typeof event.name !== "string") {
    return "Each event must have a valid 'name' (string)";
  }
  if (!EVENT_TYPES.includes(event.type)) {
    return `Invalid event type: ${event.type}`;
  }
  if (!event.timestamp || typeof event.timestamp !== "string") {
    return "Each event must have a valid 'timestamp' (ISO date string)";
  }
  return null;
}

/**
 * Structural checks for one trace and all of its children. Checks run in a
 * fixed order and the first failure wins.
 *
 * @returns The error message for the first problem found, or null when valid.
 */
function validateTrace(trace: TracePayload): string | null {
  // `== null` guards keep a null array element from throwing on property access.
  if (trace == null || !trace.id || typeof trace.id !== "string") {
    return "Each trace must have a valid 'id' (string)";
  }
  if (!trace.name || typeof trace.name !== "string") {
    return "Each trace must have a valid 'name' (string)";
  }
  if (!trace.startedAt || typeof trace.startedAt !== "string") {
    return "Each trace must have a valid 'startedAt' (ISO date string)";
  }
  if (!TRACE_STATUSES.includes(trace.status)) {
    return `Invalid trace status: ${trace.status}`;
  }
  if (!Array.isArray(trace.tags)) {
    return "Trace tags must be an array";
  }
  if (!Array.isArray(trace.decisionPoints)) {
    return "Trace decisionPoints must be an array";
  }
  if (!Array.isArray(trace.spans)) {
    return "Trace spans must be an array";
  }
  if (!Array.isArray(trace.events)) {
    return "Trace events must be an array";
  }

  for (const dp of trace.decisionPoints) {
    const problem = validateDecisionPoint(dp);
    if (problem !== null) return problem;
  }
  for (const span of trace.spans) {
    const problem = validateSpan(span);
    if (problem !== null) return problem;
  }
  for (const event of trace.events) {
    const problem = validateEvent(event);
    if (problem !== null) return problem;
  }
  return null;
}

/**
 * Verifies that every date string the handler will pass to `new Date(...)`
 * actually parses. Must only be called on a trace that already passed
 * `validateTrace`. Without this check an unparseable value becomes an
 * `Invalid Date` and only fails later, inside the database transaction.
 *
 * @returns The error message for the first unparseable date, or null.
 */
function validateTraceDates(trace: TracePayload): string | null {
  if (!isParseableDate(trace.startedAt)) {
    return "Each trace must have a valid 'startedAt' (ISO date string)";
  }
  if (trace.endedAt && !isParseableDate(trace.endedAt)) {
    return "Trace 'endedAt' must be a valid ISO date string";
  }
  for (const span of trace.spans) {
    if (!isParseableDate(span.startedAt)) {
      return "Each span must have a valid 'startedAt' (ISO date string)";
    }
    if (span.endedAt && !isParseableDate(span.endedAt)) {
      return "Span 'endedAt' must be a valid ISO date string";
    }
  }
  for (const dp of trace.decisionPoints) {
    if (!isParseableDate(dp.timestamp)) {
      return "Each decision point must have a valid 'timestamp' (ISO date string)";
    }
  }
  for (const event of trace.events) {
    if (!isParseableDate(event.timestamp)) {
      return "Each event must have a valid 'timestamp' (ISO date string)";
    }
  }
  return null;
}

// ---------------------------------------------------------------------------
// POST /api/traces — Batch ingest traces from SDK
// ---------------------------------------------------------------------------

/**
 * Ingests a batch of traces authenticated by an API key sent as
 * `Authorization: Bearer <key>`.
 *
 * Responses:
 * - 200 `{ success: true, count }` — `count` is the number of traces written
 *   (traces owned by another user are skipped and not counted).
 * - 400 malformed JSON or a payload that fails validation.
 * - 401 missing/invalid Authorization header or API key.
 * - 403 the key's user has no subscription.
 * - 413 Content-Length above 10MB.
 * - 429 plan quota reached.
 * - 500 any other failure.
 */
export async function POST(request: NextRequest) {
  try {
    // Validate Authorization header
    const authHeader = request.headers.get("authorization");
    if (!authHeader || !authHeader.startsWith("Bearer ")) {
      return errorResponse("Missing or invalid Authorization header", 401);
    }

    // Reject oversized bodies up front. This relies on the declared
    // Content-Length only; a missing or non-numeric header passes the check.
    const contentLength = parseInt(request.headers.get("content-length") ?? "0", 10);
    if (contentLength > MAX_BODY_SIZE) {
      return errorResponse("Request body too large (max 10MB)", 413);
    }

    const rawApiKey = authHeader.slice(7); // strip the "Bearer " prefix
    if (!rawApiKey) {
      return errorResponse("Missing API key in Authorization header", 401);
    }

    const keyValidation = await validateApiKey(rawApiKey);
    if (!keyValidation) {
      return errorResponse("Invalid API key", 401);
    }

    const { userId, subscription } = keyValidation;
    if (!subscription) {
      return errorResponse("No subscription found for this user", 403);
    }

    const tier = subscription.tier;
    const sessionsLimit = subscription.sessionsLimit;

    // Quota check. FREE is limited per UTC day by counting stored traces;
    // every other tier compares the subscription's running `sessionsUsed`.
    // NOTE(review): the check looks only at usage so far, not at the size of
    // this batch, so one request can push usage past the limit — confirm intended.
    if (tier === "FREE") {
      const startOfToday = new Date();
      startOfToday.setUTCHours(0, 0, 0, 0);

      const dailyCount = await prisma.trace.count({
        where: {
          userId,
          createdAt: { gte: startOfToday },
        },
      });

      if (dailyCount >= sessionsLimit) {
        return errorResponse(
          `Rate limit exceeded. Current plan: ${tier}. Limit: ${sessionsLimit}/day. Upgrade at /settings/billing`,
          429
        );
      }
    } else if (subscription.sessionsUsed >= sessionsLimit) {
      return errorResponse(
        `Rate limit exceeded. Current plan: ${tier}. Limit: ${sessionsLimit}/month. Upgrade at /settings/billing`,
        429
      );
    }

    // Parse and validate request body. A literal JSON `null` parses fine, so
    // guard against it before touching `.traces`.
    const body: BatchTracesRequest | null = await request.json();
    if (!body || !Array.isArray(body.traces)) {
      return errorResponse("Request body must contain a 'traces' array", 400);
    }
    const traces = body.traces;

    if (traces.length === 0) {
      return errorResponse("Traces array cannot be empty", 400);
    }

    // Pass 1: structural validation of each trace payload.
    for (const trace of traces) {
      const problem = validateTrace(trace);
      if (problem !== null) {
        return errorResponse(problem, 400);
      }
    }

    // Pass 2: every date string must parse. Run after pass 1 so structural
    // errors keep precedence over date errors.
    for (const trace of traces) {
      const problem = validateTraceDates(trace);
      if (problem !== null) {
        return errorResponse(problem, 400);
      }
    }

    // Upsert traces using interactive transaction to control insert order.
    // If a trace ID already exists, update the trace and replace all child
    // records (spans, decision points, events) so intermediate flushes and
    // final flushes both work seamlessly.
    const result = await prisma.$transaction(async (tx) => {
      const upserted: string[] = [];
      let newTraceCount = 0;

      for (const trace of traces) {
        const existing = await tx.trace.findUnique({
          where: { id: trace.id },
          select: { id: true, userId: true },
        });

        // Security: prevent cross-user trace overwrite
        if (existing && existing.userId !== userId) {
          continue; // skip traces owned by other users
        }

        // 1. Create or update the trace row itself.
        const traceData = {
          name: trace.name,
          sessionId: trace.sessionId,
          status: trace.status,
          tags: trace.tags,
          metadata: trace.metadata as Prisma.InputJsonValue,
          totalCost: trace.totalCost,
          totalTokens: trace.totalTokens,
          totalDuration: trace.totalDuration,
          startedAt: new Date(trace.startedAt),
          endedAt: trace.endedAt ? new Date(trace.endedAt) : null,
        };

        await tx.trace.upsert({
          where: { id: trace.id },
          create: { id: trace.id, userId, ...traceData },
          update: traceData,
        });

        if (!existing) {
          newTraceCount++;
        }

        // 2. Delete existing child records (order matters for FK constraints:
        //    decision points reference spans, so delete decisions first)
        await tx.decisionPoint.deleteMany({ where: { traceId: trace.id } });
        await tx.event.deleteMany({ where: { traceId: trace.id } });
        await tx.span.deleteMany({ where: { traceId: trace.id } });

        // 3. Recreate spans (parents before children for self-referencing FK)
        // NOTE(review): `parentSpanId` is stored as sent even when the parent
        // is not part of this payload — confirm the FK tolerates that.
        if (trace.spans.length > 0) {
          const spanOrder = topologicalSortSpans(trace.spans);
          for (const span of spanOrder) {
            await tx.span.create({
              data: {
                id: span.id,
                traceId: trace.id,
                parentSpanId: span.parentSpanId,
                name: span.name,
                type: span.type,
                input: span.input as Prisma.InputJsonValue | undefined,
                output: span.output as Prisma.InputJsonValue | undefined,
                tokenCount: span.tokenCount,
                costUsd: span.costUsd,
                durationMs: span.durationMs,
                status: span.status,
                statusMessage: span.statusMessage,
                startedAt: new Date(span.startedAt),
                endedAt: span.endedAt ? new Date(span.endedAt) : null,
                metadata: span.metadata as Prisma.InputJsonValue | undefined,
              },
            });
          }
        }

        // 4. Recreate decision points. A parentSpanId that does not match a
        //    span in this payload is stored as null.
        if (trace.decisionPoints.length > 0) {
          const validSpanIds = new Set(trace.spans.map((s) => s.id));
          await tx.decisionPoint.createMany({
            data: trace.decisionPoints.map((dp) => ({
              id: dp.id,
              traceId: trace.id,
              type: dp.type,
              reasoning: dp.reasoning,
              chosen: dp.chosen as Prisma.InputJsonValue,
              alternatives: dp.alternatives as Prisma.InputJsonValue[],
              contextSnapshot: dp.contextSnapshot as Prisma.InputJsonValue | undefined,
              durationMs: dp.durationMs,
              costUsd: dp.costUsd,
              parentSpanId:
                dp.parentSpanId && validSpanIds.has(dp.parentSpanId) ? dp.parentSpanId : null,
              timestamp: new Date(dp.timestamp),
            })),
          });
        }

        // 5. Recreate events
        // NOTE(review): unlike decision points, `spanId` is not checked
        // against the spans in this payload — confirm whether it should be.
        if (trace.events.length > 0) {
          await tx.event.createMany({
            data: trace.events.map((event) => ({
              id: event.id,
              traceId: trace.id,
              spanId: event.spanId,
              type: event.type,
              name: event.name,
              metadata: event.metadata as Prisma.InputJsonValue | undefined,
              timestamp: new Date(event.timestamp),
            })),
          });
        }

        upserted.push(trace.id);
      }

      // Only newly created traces count against the quota, and only for
      // tiers other than FREE (FREE is enforced by counting rows instead).
      if (newTraceCount > 0 && tier !== "FREE") {
        await tx.subscription.update({
          where: { id: subscription.id },
          data: { sessionsUsed: { increment: newTraceCount } },
        });
      }

      return upserted;
    });

    return NextResponse.json({ success: true, count: result.length }, { status: 200 });
  } catch (error) {
    // `request.json()` signals malformed JSON with a SyntaxError.
    if (error instanceof SyntaxError) {
      return errorResponse("Invalid JSON in request body", 400);
    }

    console.error("Error processing traces:", error);
    return errorResponse("Internal server error", 500);
  }
}

// ---------------------------------------------------------------------------
// GET /api/traces — List traces with pagination
// ---------------------------------------------------------------------------

/**
 * Lists the signed-in user's traces.
 *
 * Query parameters: `page` (default 1), `limit` (default 20, max 100),
 * `status`, `search` (case-insensitive substring of the trace name),
 * `sessionId`, `tags` (comma-separated, matches any), `sort`
 * (newest | oldest | longest | shortest | costliest), `dateFrom`, `dateTo`
 * (both applied to `createdAt`).
 *
 * Responds 200 with `{ traces, total, page, limit, totalPages }`, 400 for an
 * invalid parameter, 401 without a session, 500 on any other failure.
 */
export async function GET(request: NextRequest) {
  try {
    const session = await auth();
    if (!session?.user?.id) {
      return errorResponse("Unauthorized", 401);
    }

    const { searchParams } = new URL(request.url);
    const page = parseInt(searchParams.get("page") ?? "1", 10);
    const limit = parseInt(searchParams.get("limit") ?? "20", 10);
    const status = searchParams.get("status");
    const search = searchParams.get("search");
    const sessionId = searchParams.get("sessionId");
    const tags = searchParams.get("tags");
    const sort = searchParams.get("sort") ?? "newest";
    const dateFrom = searchParams.get("dateFrom");
    const dateTo = searchParams.get("dateTo");

    // Validate pagination parameters
    if (isNaN(page) || page < 1) {
      return errorResponse("Invalid page parameter. Must be a positive integer.", 400);
    }
    if (isNaN(limit) || limit < 1 || limit > 100) {
      return errorResponse("Invalid limit parameter. Must be between 1 and 100.", 400);
    }

    // Validate status parameter if provided
    if (status && !TRACE_STATUSES.includes(status)) {
      return errorResponse(`Invalid status. Must be one of: ${TRACE_STATUSES.join(", ")}`, 400);
    }

    // Validate sort parameter (an empty `sort=` falls through to the default order)
    if (sort && !SORT_OPTIONS.includes(sort)) {
      return errorResponse(`Invalid sort. Must be one of: ${SORT_OPTIONS.join(", ")}`, 400);
    }

    // Validate date parameters
    if (dateFrom && isNaN(Date.parse(dateFrom))) {
      return errorResponse("Invalid dateFrom parameter. Must be a valid ISO date string.", 400);
    }
    if (dateTo && isNaN(Date.parse(dateTo))) {
      return errorResponse("Invalid dateTo parameter. Must be a valid ISO date string.", 400);
    }

    // Build the filter; results are always scoped to the signed-in user.
    const where: Record<string, unknown> = { userId: session.user.id };

    if (status) {
      where.status = status;
    }
    if (search) {
      where.name = {
        contains: search,
        mode: "insensitive",
      };
    }
    if (sessionId) {
      where.sessionId = sessionId;
    }
    if (tags) {
      const tagList = tags
        .split(",")
        .map((t) => t.trim())
        .filter(Boolean);
      if (tagList.length > 0) {
        where.tags = {
          hasSome: tagList,
        };
      }
    }
    if (dateFrom || dateTo) {
      const createdAt: { gte?: Date; lte?: Date } = {};
      if (dateFrom) {
        createdAt.gte = new Date(dateFrom);
      }
      if (dateTo) {
        createdAt.lte = new Date(dateTo);
      }
      where.createdAt = createdAt;
    }

    // Build order by clause based on sort parameter
    let orderBy: Prisma.TraceOrderByWithRelationInput;
    switch (sort) {
      case "oldest":
        orderBy = { startedAt: "asc" };
        break;
      case "longest":
        orderBy = { totalDuration: "desc" };
        break;
      case "shortest":
        orderBy = { totalDuration: "asc" };
        break;
      case "costliest":
        orderBy = { totalCost: "desc" };
        break;
      case "newest":
      default:
        orderBy = { startedAt: "desc" };
        break;
    }

    // Count total traces
    const total = await prisma.trace.count({ where });

    // Calculate pagination
    const skip = (page - 1) * limit;
    const totalPages = Math.ceil(total / limit);

    // Fetch traces with pagination, including per-trace child counts
    const traces = await prisma.trace.findMany({
      where,
      include: {
        _count: {
          select: {
            decisionPoints: true,
            spans: true,
            events: true,
          },
        },
      },
      orderBy,
      skip,
      take: limit,
    });

    return NextResponse.json(
      {
        traces,
        total,
        page,
        limit,
        totalPages,
      },
      { status: 200 }
    );
  } catch (error) {
    console.error("Error listing traces:", error);
    return errorResponse("Internal server error", 500);
  }
}