Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 151 additions & 33 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
const slowDown = require("express-slow-down");
const helmet = require("helmet");
const jwt = require("jsonwebtoken");
const { pipeline } = require("stream/promises");
const { createReadStream, createWriteStream } = require("fs");
const {
askSchema,
askCredentialSchema,
Expand All @@ -19,7 +21,7 @@
summarizeSchema,
summarizeCredentialSchema,
sessionsLookupSchema,
knowledgeGapsSchema,

Check warning on line 24 in server.js

View workflow job for this annotation

GitHub Actions / Backend Tests (Node.js)

'knowledgeGapsSchema' is assigned a value but never used
MAX_QUESTION_LENGTH,
} = require("./validators/schemas");
const { clientIpFromRequest } = require("./security/ip");
Expand Down Expand Up @@ -526,6 +528,11 @@
const CLEANUP_INTERVAL_MS = parseInt(process.env.CLEANUP_INTERVAL_MS || "3600000", 10);

const startUploadsCleanup = () => {
// Skip cleanup interval during tests to prevent ENOENT errors after test completion
if (process.env.NODE_TEST === "1") {
return;
}

const intervalId = setInterval(async () => {
try {
const files = await fsPromises.readdir(UPLOADS_DIR);
Expand Down Expand Up @@ -1193,8 +1200,9 @@
}

// All validation passed — safe to open the file stream for forwarding.
const fileStream = fs.createReadStream(absoluteFilePath);
const formData = {
file: fs.createReadStream(absoluteFilePath),
file: fileStream,
original_filename: req.file.originalname,
};
if (sessionId && sessionSecret) {
Expand All @@ -1205,7 +1213,8 @@
const controller = new AbortController();
const onClientDisconnect = () => {
controller.abort();
cleanupFile(uploadedFilePath);
fileStream.destroy();
void cleanupFile(uploadedFilePath);
};
req.on("close", onClientDisconnect);

Expand Down Expand Up @@ -1323,66 +1332,175 @@
.replace(/[^a-zA-Z0-9._\- ]/g, "_")
.slice(0, 200);

let tempFilePath = null;
let downloadController = null;

try {
// Download the PDF from the remote URL into a Buffer
let pdfBuffer;
try {
const downloadUrl = new URL(trustedSupabaseOrigin);
downloadUrl.pathname = parsedUrl.pathname;
downloadUrl.search = parsedUrl.search;

const dlResponse = await axios.get(downloadUrl.toString(), {
responseType: "arraybuffer",
timeout: 30000,
maxContentLength: 50 * 1024 * 1024, // 50 MB cap
});
pdfBuffer = Buffer.from(dlResponse.data);
} catch (dlErr) {
console.error("Failed to download PDF from URL:", dlErr.message);
return res.status(502).json({ error: "Could not download PDF from the provided URL." });
// Generate a unique temporary file path
const tempId = crypto.randomUUID();
tempFilePath = path.join(UPLOADS_DIR, `${tempId}.pdf`);

// Download the PDF from the remote URL using streaming
const downloadUrl = new URL(trustedSupabaseOrigin);
downloadUrl.pathname = parsedUrl.pathname;
downloadUrl.search = parsedUrl.search;

downloadController = new AbortController();
const downloadTimeout = setTimeout(() => {
downloadController.abort();
}, 30000); // 30 second timeout for download

let bytesDownloaded = 0;
const MAX_DOWNLOAD_SIZE = MAX_PDF_SIZE_BYTES;

// Create a size-tracking transform stream
const { Transform } = require('stream');
const sizeTracker = new Transform({
transform(chunk, encoding, callback) {
bytesDownloaded += chunk.length;
if (bytesDownloaded > MAX_DOWNLOAD_SIZE) {
callback(new Error(`File size exceeds maximum allowed size of ${MAX_DOWNLOAD_SIZE} bytes`));
return;
}
callback(null, chunk);
}
});

// Stream the download to a temporary file
const dlResponse = await axios.get(downloadUrl.toString(), {
responseType: "stream",
signal: downloadController.signal,
timeout: 30000,
maxRedirects: 5,
});

clearTimeout(downloadTimeout);

// Verify PDF magic bytes from the first chunk
const magicByteValidator = new Transform({
transform(chunk, encoding, callback) {
// Only check the first chunk
if (!this._validated) {
this._validated = true;
if (chunk.length < 4 || chunk.slice(0, 4).toString() !== "%PDF") {
callback(new Error("The file at the provided URL is not a valid PDF."));
return;
}
}
callback(null, chunk);
}
});

// Pipeline: download stream -> magic byte validator -> size tracker -> file
await pipeline(
dlResponse.data,
magicByteValidator,
sizeTracker,
createWriteStream(tempFilePath),
{ end: true }
);

// Verify the file was downloaded successfully
const stats = await fsPromises.stat(tempFilePath);
if (stats.size === 0) {
throw new Error("Downloaded file is empty");
}

// Verify PDF magic bytes
if (pdfBuffer.slice(0, 4).toString() !== "%PDF") {
return res.status(415).json({ error: "The file at the provided URL is not a valid PDF." });
if (stats.size > MAX_DOWNLOAD_SIZE) {
throw new Error(`File size exceeds maximum allowed size of ${MAX_DOWNLOAD_SIZE} bytes`);
}

// Build multipart form and forward to RAG service
// Uses axios.postForm with a FormData blob — no extra form-data package needed
const form = new FormData();
const pdfBlob = new Blob([pdfBuffer], { type: 'application/pdf' });
form.append("file", pdfBlob, safeFilename);
form.append("original_filename", safeFilename);
// Use streaming approach with file path instead of loading into memory
const formData = {
file: createReadStream(tempFilePath),
original_filename: safeFilename,
};

// Optionally extend an existing session
const resolvedSessionSecret = await resolveSessionSecret(req, session_id, session_secret);
if (session_id && resolvedSessionSecret) {
form.append("session_id", session_id);
form.append("session_secret", resolvedSessionSecret);
formData.session_id = session_id;
formData.session_secret = resolvedSessionSecret;
}

const ragResponse = await axios.post(
const ragController = new AbortController();
const ragTimeout = setTimeout(() => {
ragController.abort();
}, 120000); // 2 min timeout for RAG processing

const ragResponse = await axios.postForm(
`${RAG_SERVICE_URL}/process-pdf`,
form,
formData,
{
headers: ragAuthHeaders(),
timeout: 120000, // 2 min — embedding generation can be slow
timeout: 120000,
signal: ragController.signal,
maxContentLength: Infinity,
maxBodyLength: Infinity,
}
);

clearTimeout(ragTimeout);

await setSessionSecretCookie(res, ragResponse.data.session_id || session_id, ragResponse.data.session_secret || resolvedSessionSecret);

// Clean up the temporary file
await cleanupFile(tempFilePath);

return res.json({
message: "PDF processed and indexed successfully.",
session_id: ragResponse.data.session_id,
document: ragResponse.data.document,
documents: ragResponse.data.documents || [],
});
} catch (err) {
const statusCode =
err.response?.status || (err.code === "ECONNREFUSED" ? 502 : 500);
// Clean up the temporary file on error
if (tempFilePath) {
try {
await cleanupFile(tempFilePath);
} catch (cleanupErr) {
console.error("Failed to clean up temporary file:", cleanupErr.message);
}
}

// Handle specific error cases
if (err.code === 'ABORTED') {
console.error("process-from-url failed: Request timed out");
return res.status(504).json({ error: "Request timed out during download or processing." });
}

if (err.message && err.message.includes('File size exceeds')) {
console.error("process-from-url failed: File too large");
return res.status(413).json({ error: err.message });
}

if (err.message && err.message.includes('not a valid PDF')) {
console.error("process-from-url failed: Invalid PDF format");
return res.status(415).json({ error: "The file at the provided URL is not a valid PDF." });
}

if (err.code === 'ECONNABORTED' || err.code === 'ETIMEDOUT') {
console.error("process-from-url failed: Connection timeout");
return res.status(504).json({ error: "Connection timeout while downloading PDF." });
}

if (err.code === 'ENOTFOUND' || err.code === 'ECONNREFUSED') {
console.error("process-from-url failed: Cannot reach remote server");
return res.status(502).json({ error: "Could not download PDF from the provided URL." });
}

if (err.response) {
const statusCode = err.response.status;
const details = extractServiceDetails(err, "RAG processing failed");
console.error("process-from-url failed:", details);
return res.status(statusCode).json({
error: typeof details === "string" ? details : "PDF processing failed",
details: isDevelopment ? details : "Internal processing error",
});
}

const statusCode = err.code === "ECONNREFUSED" ? 502 : 500;
const details = extractServiceDetails(err, "RAG processing failed");
console.error("process-from-url failed:", details);

Expand Down
Loading
Loading