Skip to content

Commit 281a07b

Browse files
committed
feat(upload): commit and history API routes with batch upsert
1 parent ce149bd commit 281a07b

2 files changed

Lines changed: 227 additions & 0 deletions

File tree

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
import { NextRequest, NextResponse } from "next/server"
2+
import { parseFileBuffer, getFileType, validateFileSize } from "@/lib/upload-parser"
3+
import { SCHEMAS, normalizeHeader, type UploadSchema, type ColumnMapping } from "@/lib/upload-schemas"
4+
import { getPool } from "@/lib/db"
5+
6+
// Number of parsed rows sliced per iteration in upsertRows' outer loop.
const BATCH_SIZE = 500
7+
8+
export async function POST(request: NextRequest) {
9+
const userId = request.headers.get("x-user-id") ?? ""
10+
const userEmail = request.headers.get("x-user-email") ?? ""
11+
12+
try {
13+
const formData = await request.formData()
14+
const file = formData.get("file") as File | null
15+
const schemaId = formData.get("schemaId") as string | null
16+
const mappingJson = formData.get("columnMapping") as string | null
17+
18+
if (!file || !schemaId || !mappingJson) {
19+
return NextResponse.json(
20+
{ error: "Missing file, schemaId, or columnMapping" },
21+
{ status: 400 }
22+
)
23+
}
24+
25+
if (!validateFileSize(file.size)) {
26+
return NextResponse.json({ error: "File exceeds 50 MB limit" }, { status: 413 })
27+
}
28+
29+
const schema = SCHEMAS.find((s) => s.id === schemaId)
30+
if (!schema) {
31+
return NextResponse.json({ error: `Unknown schema: ${schemaId}` }, { status: 400 })
32+
}
33+
34+
const columnMapping: ColumnMapping[] = JSON.parse(mappingJson)
35+
36+
const fileType = getFileType(file.name)
37+
if (!fileType) {
38+
return NextResponse.json({ error: "Unsupported file type" }, { status: 400 })
39+
}
40+
41+
const buffer = Buffer.from(await file.arrayBuffer())
42+
const { rows } = await parseFileBuffer(buffer, fileType)
43+
44+
const result = await upsertRows(rows, columnMapping, schema)
45+
46+
// Log to upload_history
47+
const pool = getPool()
48+
const status =
49+
result.errors.length > 0 && result.inserted === 0
50+
? "failed"
51+
: result.errors.length > 0
52+
? "partial"
53+
: "success"
54+
55+
const { rows: historyRows } = await pool.query(
56+
`INSERT INTO upload_history (user_id, user_email, filename, file_type, rows_inserted, rows_skipped, error_count, status)
57+
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id`,
58+
[userId, userEmail, file.name, schemaId, result.inserted, result.skipped, result.errors.length, status]
59+
)
60+
61+
return NextResponse.json({
62+
inserted: result.inserted,
63+
skipped: result.skipped,
64+
errors: result.errors.slice(0, 50),
65+
uploadId: historyRows[0].id,
66+
})
67+
} catch (err) {
68+
console.error("Upload commit error:", err)
69+
return NextResponse.json(
70+
{ error: `Upload failed: ${(err as Error).message}` },
71+
{ status: 500 }
72+
)
73+
}
74+
}
75+
76+
// Aggregate outcome of a batch upsert run.
interface UpsertResult {
  // Rows whose INSERT ... ON CONFLICT statement executed without error.
  inserted: number
  // Rows skipped for empty required fields or per-row query failures.
  skipped: number
  // Row-level problems; row is 1-based for user display (0 = mapping-level error).
  errors: Array<{ row: number; column?: string; message: string }>
}
81+
82+
async function upsertRows(
83+
rows: Record<string, string>[],
84+
columnMapping: ColumnMapping[],
85+
schema: UploadSchema
86+
): Promise<UpsertResult> {
87+
const pool = getPool()
88+
let inserted = 0
89+
let skipped = 0
90+
const errors: UpsertResult["errors"] = []
91+
92+
// Build the header→dbColumn map from the user-confirmed mapping
93+
const headerToDb = new Map<string, string>()
94+
for (const col of columnMapping) {
95+
if (col.mappedTo) {
96+
headerToDb.set(col.header, col.mappedTo)
97+
}
98+
}
99+
100+
// Build transform lookup from schema
101+
const transforms = new Map<string, (v: string) => string>()
102+
for (const col of schema.columns) {
103+
if (col.transform) {
104+
transforms.set(col.name, col.transform)
105+
}
106+
}
107+
108+
// Check required columns are mapped
109+
const mappedDbCols = new Set(headerToDb.values())
110+
for (const col of schema.columns) {
111+
if (col.required && !mappedDbCols.has(col.name)) {
112+
errors.push({ row: 0, column: col.name, message: `Required column not mapped: ${col.name}` })
113+
}
114+
}
115+
if (errors.length > 0) return { inserted: 0, skipped: 0, errors }
116+
117+
// Process in batches
118+
for (let i = 0; i < rows.length; i += BATCH_SIZE) {
119+
const batch = rows.slice(i, i + BATCH_SIZE)
120+
121+
for (let j = 0; j < batch.length; j++) {
122+
const row = batch[j]
123+
const rowIndex = i + j + 1 // 1-based for user display
124+
125+
try {
126+
const dbRow: Record<string, string> = {}
127+
for (const [header, dbCol] of headerToDb) {
128+
let value = row[header] ?? ""
129+
const transform = transforms.get(dbCol)
130+
if (transform) value = transform(value)
131+
dbRow[dbCol] = value
132+
}
133+
134+
// Check required fields have values
135+
const missingRequired = schema.columns
136+
.filter((c) => c.required && (!dbRow[c.name] || dbRow[c.name].trim() === ""))
137+
if (missingRequired.length > 0) {
138+
skipped++
139+
errors.push({
140+
row: rowIndex,
141+
column: missingRequired[0].name,
142+
message: `Empty required field: ${missingRequired[0].name}`,
143+
})
144+
continue
145+
}
146+
147+
const cols = Object.keys(dbRow)
148+
const vals = Object.values(dbRow)
149+
const placeholders = cols.map((_, idx) => `$${idx + 1}`)
150+
const updateSet = cols
151+
.filter((c) => !schema.upsertKey.includes(c))
152+
.map((c) => `${c} = EXCLUDED.${c}`)
153+
.join(", ")
154+
155+
const conflictClause = schema.upsertKey.join(", ")
156+
const sql = updateSet
157+
? `INSERT INTO ${schema.targetTable} (${cols.join(", ")})
158+
VALUES (${placeholders.join(", ")})
159+
ON CONFLICT (${conflictClause}) DO UPDATE SET ${updateSet}`
160+
: `INSERT INTO ${schema.targetTable} (${cols.join(", ")})
161+
VALUES (${placeholders.join(", ")})
162+
ON CONFLICT (${conflictClause}) DO NOTHING`
163+
164+
await pool.query(sql, vals)
165+
inserted++
166+
} catch (err) {
167+
skipped++
168+
errors.push({
169+
row: rowIndex,
170+
message: (err as Error).message,
171+
})
172+
}
173+
}
174+
}
175+
176+
return { inserted, skipped, errors }
177+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import { NextRequest, NextResponse } from "next/server"
2+
import { getPool } from "@/lib/db"
3+
4+
export async function GET(request: NextRequest) {
5+
try {
6+
const { searchParams } = new URL(request.url)
7+
const page = Math.max(1, parseInt(searchParams.get("page") ?? "1"))
8+
const pageSize = Math.min(50, Math.max(1, parseInt(searchParams.get("pageSize") ?? "20")))
9+
const offset = (page - 1) * pageSize
10+
11+
const pool = getPool()
12+
13+
const [dataResult, countResult] = await Promise.all([
14+
pool.query(
15+
`SELECT id, user_email, filename, file_type, rows_inserted, rows_skipped,
16+
error_count, status, uploaded_at
17+
FROM upload_history
18+
ORDER BY uploaded_at DESC
19+
LIMIT $1 OFFSET $2`,
20+
[pageSize, offset]
21+
),
22+
pool.query(`SELECT COUNT(*)::int AS total FROM upload_history`),
23+
])
24+
25+
const total = countResult.rows[0].total
26+
27+
return NextResponse.json({
28+
data: dataResult.rows.map((row) => ({
29+
id: row.id,
30+
userEmail: row.user_email,
31+
filename: row.filename,
32+
fileType: row.file_type,
33+
rowsInserted: row.rows_inserted,
34+
rowsSkipped: row.rows_skipped,
35+
errorCount: row.error_count,
36+
status: row.status,
37+
uploadedAt: row.uploaded_at,
38+
})),
39+
total,
40+
page,
41+
pageSize,
42+
})
43+
} catch (err) {
44+
console.error("Upload history error:", err)
45+
return NextResponse.json(
46+
{ error: `Failed to fetch upload history: ${(err as Error).message}` },
47+
{ status: 500 }
48+
)
49+
}
50+
}

0 commit comments

Comments
 (0)