diff --git a/server/src/index.ts b/server/src/index.ts index aaa920c..b42234a 100644 --- a/server/src/index.ts +++ b/server/src/index.ts @@ -75,26 +75,48 @@ function validateFilePath(filePath: string, uploadDir: string): boolean { return !!relative && !relative.startsWith('..') && !path.isAbsolute(relative); } -// Table operation locks to prevent race conditions -const tableOperationLocks = new Map>(); +// Table operation locks to prevent race conditions using proper mutex +interface TableMutex { + locked: boolean; + queue: Array<() => void>; +} + +const tableMutexes = new Map(); async function acquireTableLock(tableName: string): Promise<() => void> { - const existingLock = tableOperationLocks.get(tableName); - if (existingLock) { - await existingLock; + if (!tableMutexes.has(tableName)) { + tableMutexes.set(tableName, { locked: false, queue: [] }); } - let releaseLock: () => void; - const lockPromise = new Promise((resolve) => { - releaseLock = resolve; - }); + const mutex = tableMutexes.get(tableName)!; - tableOperationLocks.set(tableName, lockPromise); + if (!mutex.locked) { + mutex.locked = true; + return () => { + mutex.locked = false; + const next = mutex.queue.shift(); + if (next) { + next(); + } else if (mutex.queue.length === 0) { + tableMutexes.delete(tableName); + } + }; + } - return () => { - tableOperationLocks.delete(tableName); - releaseLock!(); - }; + return new Promise<() => void>((resolve) => { + mutex.queue.push(() => { + mutex.locked = true; + resolve(() => { + mutex.locked = false; + const next = mutex.queue.shift(); + if (next) { + next(); + } else if (mutex.queue.length === 0) { + tableMutexes.delete(tableName); + } + }); + }); + }); } async function insertBatch(records: any[], columns: string[], originalColumns: string[], tableName: string): Promise { @@ -159,6 +181,12 @@ async function startServer() { skip_empty_lines: true }); + // Add error handling for parser + parser.on('error', (err) => { + console.error('CSV parser error:', err); + throw new Error('Invalid or corrupted CSV file'); + }); + const sampleRows: any[] = []; const columnSamples = new Map(); let batchRecords: any[] = []; @@ -166,71 +194,79 @@ async function startServer() { let columns: string[] = []; let originalColumns: string[] = []; let recordCount = 0; - const batchSize = 100; + const batchSize = 500; // Increased batch size for better performance + const sampleSize = 100; // Increased sample size for better schema inference - for await (const record of csvStream.pipe(parser)) { - recordCount++; - - // Collect first 10 rows for sampling - if (sampleRows.length < 10) { - sampleRows.push(record); + try { + for await (const record of csvStream.pipe(parser)) { + recordCount++; - // Collect samples for each column - Object.keys(record).forEach(key => { - if (!columnSamples.has(key)) { - columnSamples.set(key, []); + // Collect sample rows for schema inference (increased from 10 to 100) + if (sampleRows.length < sampleSize) { + sampleRows.push(record); + + // Collect samples for each column + Object.keys(record).forEach(key => { + if (!columnSamples.has(key)) { + columnSamples.set(key, []); + } + columnSamples.get(key)!.push(record[key]); + }); + } + + // Initialize table after collecting enough samples + if (sampleRows.length === sampleSize && !isInitialized) { + originalColumns = Object.keys(sampleRows[0]); + columns = originalColumns.map(normalizeColumnName); + + // Validate column names to prevent SQL injection + if (!validateColumnNames(columns)) { + return res.status(400).json({ error: 'Invalid column names detected' }); } - columnSamples.get(key)!.push(record[key]); - }); - } - - // Initialize table after collecting enough samples - if (sampleRows.length === 10 && !isInitialized) { - originalColumns = Object.keys(sampleRows[0]); - columns = originalColumns.map(normalizeColumnName); + + if (columns.length === 0) { + return res.status(400).json({ error: 'CSV file has no columns' }); + } + + // Determine column types from sample data using multiple rows + const columnTypes = new Map(); + originalColumns.forEach((originalColumn, index) => { + const column = columns[index]; + const samples = columnSamples.get(originalColumn) || []; + columnTypes.set(column, guessSqlType(samples)); + }); + + // Drop existing table if it exists (done only once) + await query(`DROP TABLE IF EXISTS "${tableName}"`); + + // Create new table with quoted identifiers (done only once) + const createTableSQL = ` + CREATE TABLE "${tableName}" ( + ${columns.map(column => `"${column}" ${columnTypes.get(column)}`).join(',\n')} + ) + `; + console.log(createTableSQL); + await query(createTableSQL); + isInitialized = true; + } - // Validate column names to prevent SQL injection - if (!validateColumnNames(columns)) { - return res.status(400).json({ error: 'Invalid column names detected' }); + // Add record to batch only after table is initialized or if still sampling + if (isInitialized || sampleRows.length < sampleSize) { + batchRecords.push(record); } - if (columns.length === 0) { - return res.status(400).json({ error: 'CSV file has no columns' }); + // Process batch when it reaches batch size + if (batchRecords.length >= batchSize && isInitialized) { + await insertBatch(batchRecords, columns, originalColumns, tableName); + batchRecords = []; // Clear batch to reduce memory usage } - - // Determine column types from sample data using multiple rows - const columnTypes = new Map(); - originalColumns.forEach((originalColumn, index) => { - const column = columns[index]; - const samples = columnSamples.get(originalColumn) || []; - columnTypes.set(column, guessSqlType(samples)); - }); - - // Drop existing table if it exists - await query(`DROP TABLE IF EXISTS "${tableName}"`); - - // Create new table with quoted identifiers - const createTableSQL = ` - CREATE TABLE "${tableName}" ( - ${columns.map(column => `"${column}" ${columnTypes.get(column)}`).join(',\n')} - ) - `; - console.log(createTableSQL); - await query(createTableSQL); - isInitialized = true; - } - - // Add record to batch - batchRecords.push(record); - - // Process batch when it reaches batch size - if (batchRecords.length >= batchSize && isInitialized) { - await insertBatch(batchRecords, columns, originalColumns, tableName); - batchRecords = []; } + } catch (err) { + console.error('Error reading CSV:', err); + return res.status(400).json({ error: 'Invalid or corrupted CSV file' }); } - // Handle case where we have fewer than 10 rows but still need to initialize + // Handle case where we have fewer than sampleSize rows but still need to initialize if (!isInitialized && sampleRows.length > 0) { originalColumns = Object.keys(sampleRows[0]); columns = originalColumns.map(normalizeColumnName); @@ -252,10 +288,10 @@ async function startServer() { columnTypes.set(column, guessSqlType(samples)); }); - // Drop existing table if it exists + // Drop existing table if it exists (done only once) await query(`DROP TABLE IF EXISTS "${tableName}"`); - // Create new table with quoted identifiers + // Create new table with quoted identifiers (done only once) const createTableSQL = ` CREATE TABLE "${tableName}" ( ${columns.map(column => `"${column}" ${columnTypes.get(column)}`).join(',\n')} @@ -282,7 +318,9 @@ async function startServer() { // After successful upload, analyze the table and store the results const analysis = await analyzeTable(tableName); - // Store the analysis in TABLE_SCHEMA + // Store the analysis in TABLE_SCHEMA with safe table name handling + // tableName is already validated with validateTableName function + const safeTableName = tableName.replace(/[^a-zA-Z0-9_]/g, ''); await query( `INSERT INTO TABLE_SCHEMA (table_name, analysis) VALUES ($1, $2) @@ -290,7 +328,7 @@ async function startServer() { DO UPDATE SET analysis = $2, updated_at = CURRENT_TIMESTAMP`, - [tableName, analysis] + [safeTableName, analysis] ); res.json({