fix: upsert traces to handle duplicate IDs from intermediate flushes
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-Claude) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
@@ -184,33 +184,42 @@ export async function POST(request: NextRequest) {
|
||||
}
|
||||
}
|
||||
|
||||
// Insert traces using interactive transaction to control insert order.
|
||||
// Spans must be inserted before decision points due to the
|
||||
// DecisionPoint.parentSpanId FK referencing Span.id.
|
||||
// Upsert traces using interactive transaction to control insert order.
|
||||
// If a trace ID already exists, update the trace and replace all child
|
||||
// records (spans, decision points, events) so intermediate flushes and
|
||||
// final flushes both work seamlessly.
|
||||
const result = await prisma.$transaction(async (tx) => {
|
||||
const created: string[] = [];
|
||||
const upserted: string[] = [];
|
||||
|
||||
for (const trace of body.traces) {
|
||||
// 1. Create the trace record (no nested relations)
|
||||
await tx.trace.create({
|
||||
data: {
|
||||
id: trace.id,
|
||||
name: trace.name,
|
||||
sessionId: trace.sessionId,
|
||||
status: trace.status,
|
||||
tags: trace.tags,
|
||||
metadata: trace.metadata as Prisma.InputJsonValue,
|
||||
totalCost: trace.totalCost,
|
||||
totalTokens: trace.totalTokens,
|
||||
totalDuration: trace.totalDuration,
|
||||
startedAt: new Date(trace.startedAt),
|
||||
endedAt: trace.endedAt ? new Date(trace.endedAt) : null,
|
||||
},
|
||||
const traceData = {
|
||||
name: trace.name,
|
||||
sessionId: trace.sessionId,
|
||||
status: trace.status,
|
||||
tags: trace.tags,
|
||||
metadata: trace.metadata as Prisma.InputJsonValue,
|
||||
totalCost: trace.totalCost,
|
||||
totalTokens: trace.totalTokens,
|
||||
totalDuration: trace.totalDuration,
|
||||
startedAt: new Date(trace.startedAt),
|
||||
endedAt: trace.endedAt ? new Date(trace.endedAt) : null,
|
||||
};
|
||||
|
||||
// 1. Upsert the trace record
|
||||
await tx.trace.upsert({
|
||||
where: { id: trace.id },
|
||||
create: { id: trace.id, ...traceData },
|
||||
update: traceData,
|
||||
});
|
||||
|
||||
// 2. Create spans FIRST (decision points may reference them via parentSpanId)
|
||||
// 2. Delete existing child records (order matters for FK constraints:
|
||||
// decision points reference spans, so delete decisions first)
|
||||
await tx.decisionPoint.deleteMany({ where: { traceId: trace.id } });
|
||||
await tx.event.deleteMany({ where: { traceId: trace.id } });
|
||||
await tx.span.deleteMany({ where: { traceId: trace.id } });
|
||||
|
||||
// 3. Recreate spans (parents before children for self-referencing FK)
|
||||
if (trace.spans.length > 0) {
|
||||
// Sort spans so parents come before children
|
||||
const spanOrder = topologicalSortSpans(trace.spans);
|
||||
for (const span of spanOrder) {
|
||||
await tx.span.create({
|
||||
@@ -235,9 +244,8 @@ export async function POST(request: NextRequest) {
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Create decision points AFTER spans exist
|
||||
// 4. Recreate decision points
|
||||
if (trace.decisionPoints.length > 0) {
|
||||
// Collect valid span IDs so we can null-out invalid parentSpanId refs
|
||||
const validSpanIds = new Set(trace.spans.map((s) => s.id));
|
||||
|
||||
await tx.decisionPoint.createMany({
|
||||
@@ -257,7 +265,7 @@ export async function POST(request: NextRequest) {
|
||||
});
|
||||
}
|
||||
|
||||
// 4. Create events
|
||||
// 5. Recreate events
|
||||
if (trace.events.length > 0) {
|
||||
await tx.event.createMany({
|
||||
data: trace.events.map((event) => ({
|
||||
@@ -272,10 +280,10 @@ export async function POST(request: NextRequest) {
|
||||
});
|
||||
}
|
||||
|
||||
created.push(trace.id);
|
||||
upserted.push(trace.id);
|
||||
}
|
||||
|
||||
return created;
|
||||
return upserted;
|
||||
});
|
||||
|
||||
return NextResponse.json({ success: true, count: result.length }, { status: 200 });
|
||||
@@ -284,11 +292,6 @@ export async function POST(request: NextRequest) {
|
||||
return NextResponse.json({ error: "Invalid JSON in request body" }, { status: 400 });
|
||||
}
|
||||
|
||||
// Handle unique constraint violations
|
||||
if (error instanceof Error && error.message.includes("Unique constraint")) {
|
||||
return NextResponse.json({ error: "Duplicate trace ID detected" }, { status: 409 });
|
||||
}
|
||||
|
||||
console.error("Error processing traces:", error);
|
||||
return NextResponse.json({ error: "Internal server error" }, { status: 500 });
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user