diff --git a/server/src/index.ts b/server/src/index.ts
index 962a711..aaa920c 100644
--- a/server/src/index.ts
+++ b/server/src/index.ts
@@ -16,11 +16,31 @@ function isValidDate(value: string): boolean {
     (value.includes('-') || value.includes('/'));
 }
 
-function guessSqlType(value: any): string {
-  if (value === null || value === undefined) return 'TEXT';
-  if (typeof value === 'string' && isValidDate(value)) return 'TIMESTAMP';
-  if (!isNaN(value) && value.toString().includes('.')) return 'NUMERIC';
-  if (!isNaN(value)) return 'INTEGER';
+function guessSqlType(values: any[]): string {
+  let hasText = false;
+  let hasTimestamp = false;
+  let hasNumeric = false;
+  let hasInteger = false;
+
+  for (const value of values) {
+    if (value === null || value === undefined) continue;
+
+    if (typeof value === 'string' && isValidDate(value)) {
+      hasTimestamp = true;
+    } else if (!isNaN(value) && value.toString().includes('.')) {
+      hasNumeric = true;
+    } else if (!isNaN(value)) {
+      hasInteger = true;
+    } else {
+      hasText = true;
+      break; // Text is most general, no need to check further
+    }
+  }
+
+  if (hasText) return 'TEXT';
+  if (hasTimestamp) return 'TIMESTAMP';
+  if (hasNumeric) return 'NUMERIC';
+  if (hasInteger) return 'INTEGER';
   return 'TEXT';
 }
 
@@ -51,7 +71,51 @@ function validateColumnNames(columns: string[]): boolean {
 function validateFilePath(filePath: string, uploadDir: string): boolean {
   const resolvedUploadDir = path.resolve(uploadDir);
   const resolvedFilePath = path.resolve(filePath);
-  return resolvedFilePath.startsWith(resolvedUploadDir);
+  const relative = path.relative(resolvedUploadDir, resolvedFilePath);
+  return !!relative && !relative.startsWith('..') && !path.isAbsolute(relative);
+}
+
+// Table operation locks to prevent race conditions
+const tableOperationLocks = new Map<string, Promise<void>>();
+
+async function acquireTableLock(tableName: string): Promise<() => void> {
+  // Re-check in a loop: several waiters can wake from the same promise.
+  while (tableOperationLocks.has(tableName)) {
+    await tableOperationLocks.get(tableName);
+  }
+
+  let releaseLock: () => void;
+  const lockPromise = new Promise<void>((resolve) => {
+    releaseLock = resolve;
+  });
+
+  tableOperationLocks.set(tableName, lockPromise);
+
+  return () => {
+    tableOperationLocks.delete(tableName);
+    releaseLock!();
+  };
+}
+
+async function insertBatch(records: any[], columns: string[], originalColumns: string[], tableName: string): Promise<void> {
+  const values: any[] = [];
+  const placeholders: string[] = [];
+
+  records.forEach((record, batchIndex) => {
+    const recordValues = originalColumns.map(c => record[c]);
+    values.push(...recordValues);
+    const recordPlaceholders = columns.map((_, colIndex) =>
+      `$${batchIndex * columns.length + colIndex + 1}`
+    ).join(', ');
+    placeholders.push(`(${recordPlaceholders})`);
+  });
+
+  const insertSQL = `
+    INSERT INTO "${tableName}" (${columns.map(c => `"${c}"`).join(', ')})
+    VALUES ${placeholders.join(', ')}
+  `;
+
+  await query(insertSQL, values);
 }
 
 async function startServer() {
@@ -84,41 +148,108 @@ async function startServer() {
       return res.status(400).json({ error: 'Invalid file path' });
     }
 
-    // Single pass CSV processing with sampling and batch insertion
-    const csvStream = createReadStream(req.file.path);
-    const parser = parse({
-      columns: true,
-      skip_empty_lines: true
-    });
-
-    const sampleRows: any[] = [];
-    const columnTypes = new Map();
-    const allRecords: any[] = [];
-    let isInitialized = false;
-    let columns: string[] = [];
+    // Acquire table lock to prevent race conditions
+    const releaseLock = await acquireTableLock(tableName);
 
-    for await (const record of csvStream.pipe(parser)) {
-      allRecords.push(record);
+    try {
+      // Streaming CSV processing with reduced memory usage
+      const csvStream = createReadStream(req.file.path);
+      const parser = parse({
+        columns: true,
+        skip_empty_lines: true
+      });
+
+      const sampleRows: any[] = [];
+      const columnSamples = new Map<string, any[]>();
+      let batchRecords: any[] = [];
+      let isInitialized = false;
+      let columns: string[] = [];
+      let originalColumns: string[] = [];
+      let recordCount = 0;
+      const batchSize = 100;
 
-      // Collect first 10 rows for sampling
-      if (sampleRows.length < 10) {
-        sampleRows.push(record);
+      for await (const record of csvStream.pipe(parser)) {
+        recordCount++;
+
+        // Collect first 10 rows for sampling
+        if (sampleRows.length < 10) {
+          sampleRows.push(record);
+
+          // Collect samples for each column
+          Object.keys(record).forEach(key => {
+            if (!columnSamples.has(key)) {
+              columnSamples.set(key, []);
+            }
+            columnSamples.get(key)!.push(record[key]);
+          });
+        }
+
+        // Initialize table after collecting enough samples
+        if (sampleRows.length === 10 && !isInitialized) {
+          originalColumns = Object.keys(sampleRows[0]);
+          columns = originalColumns.map(normalizeColumnName);
+
+          // Validate column names to prevent SQL injection
+          if (!validateColumnNames(columns)) {
+            return res.status(400).json({ error: 'Invalid column names detected' });
+          }
+
+          if (columns.length === 0) {
+            return res.status(400).json({ error: 'CSV file has no columns' });
+          }
+
+          // Determine column types from sample data using multiple rows
+          const columnTypes = new Map<string, string>();
+          originalColumns.forEach((originalColumn, index) => {
+            const column = columns[index];
+            const samples = columnSamples.get(originalColumn) || [];
+            columnTypes.set(column, guessSqlType(samples));
+          });
+
+          // Drop existing table if it exists
+          await query(`DROP TABLE IF EXISTS "${tableName}"`);
+
+          // Create new table with quoted identifiers
+          const createTableSQL = `
+            CREATE TABLE "${tableName}" (
+              ${columns.map(column => `"${column}" ${columnTypes.get(column)}`).join(',\n')}
+            )
+          `;
+          console.log(createTableSQL);
+          await query(createTableSQL);
+          isInitialized = true;
+        }
+
+        // Add record to batch
+        batchRecords.push(record);
+
+        // Process batch when it reaches batch size
+        if (batchRecords.length >= batchSize && isInitialized) {
+          await insertBatch(batchRecords, columns, originalColumns, tableName);
+          batchRecords = [];
+        }
       }
-
-      // Initialize table after collecting first sample
-      if (sampleRows.length === 1 && !isInitialized) {
-        // Determine column types from sample data
-        columns = Object.keys(sampleRows[0]).map(normalizeColumnName);
+
+      // Handle case where we have fewer than 10 rows but still need to initialize
+      if (!isInitialized && sampleRows.length > 0) {
+        originalColumns = Object.keys(sampleRows[0]);
+        columns = originalColumns.map(normalizeColumnName);
 
         // Validate column names to prevent SQL injection
         if (!validateColumnNames(columns)) {
           return res.status(400).json({ error: 'Invalid column names detected' });
        }
 
-        columns.forEach((column, index) => {
-          const originalColumn = Object.keys(sampleRows[0])[index];
-          const value = sampleRows[0][originalColumn];
-          columnTypes.set(column, guessSqlType(value));
+        if (columns.length === 0) {
+          return res.status(400).json({ error: 'CSV file has no columns' });
+        }
+
+        // Determine column types from available sample data
+        const columnTypes = new Map<string, string>();
+        originalColumns.forEach((originalColumn, index) => {
+          const column = columns[index];
+          const samples = columnSamples.get(originalColumn) || [];
+          columnTypes.set(column, guessSqlType(samples));
         });
 
         // Drop existing table if it exists
@@ -134,65 +265,45 @@ async function startServer() {
         await query(createTableSQL);
         isInitialized = true;
       }
-    }
 
-    if (allRecords.length === 0) {
-      return res.status(400).json({ error: 'CSV file is empty' });
-    }
+      if (recordCount === 0) {
+        return res.status(400).json({ error: 'CSV file is empty' });
+      }
 
-    // Batch insert records
-    const batchSize = 100;
-    const originalColumns = Object.keys(allRecords[0]);
-
-    for (let i = 0; i < allRecords.length; i += batchSize) {
-      const batch = allRecords.slice(i, i + batchSize);
-      const values: any[] = [];
-      const placeholders: string[] = [];
-
-      batch.forEach((record, batchIndex) => {
-        const recordValues = originalColumns.map(c => record[c]);
-        values.push(...recordValues);
-        const recordPlaceholders = columns.map((_, colIndex) =>
-          `$${batchIndex * columns.length + colIndex + 1}`
-        ).join(', ');
-        placeholders.push(`(${recordPlaceholders})`);
-      });
-
-      const insertSQL = `
-        INSERT INTO "${tableName}" (${columns.map(c => `"${c}"`).join(', ')})
-        VALUES ${placeholders.join(', ')}
-      `;
+      if (!isInitialized || columns.length === 0) {
+        return res.status(400).json({ error: 'Failed to initialize table - CSV may be invalid' });
+      }
+
+      // Insert remaining records in batch
+      if (batchRecords.length > 0) {
+        await insertBatch(batchRecords, columns, originalColumns, tableName);
+      }
+
+      // After successful upload, analyze the table and store the results
+      const analysis = await analyzeTable(tableName);
 
-      await query(insertSQL, values);
-    }
+      // Store the analysis in TABLE_SCHEMA
+      await query(
+        `INSERT INTO TABLE_SCHEMA (table_name, analysis)
+         VALUES ($1, $2)
+         ON CONFLICT (table_name)
+         DO UPDATE SET
+           analysis = $2,
+           updated_at = CURRENT_TIMESTAMP`,
+        [tableName, analysis]
+      );
 
-    // After successful upload, analyze the table and store the results
-    const analysis = await analyzeTable(tableName);
-
-    // Validate table name again before storing in schema
-    if (!validateTableName(tableName)) {
-      return res.status(400).json({ error: 'Invalid table name' });
+      res.json({
+        message: 'CSV data successfully imported to database',
+        tableName,
+        recordCount,
+        columnCount: columns.length,
+        analysis
+      });
+    } finally {
+      // Always release the table lock
+      releaseLock();
     }
-
-    // Store the analysis in TABLE_SCHEMA
-    await query(
-      `INSERT INTO TABLE_SCHEMA (table_name, analysis)
-       VALUES ($1, $2)
-       ON CONFLICT (table_name)
-       DO UPDATE SET
-         analysis = $2,
-         updated_at = CURRENT_TIMESTAMP`,
-      [tableName, analysis]
-    );
-
-    res.json({
-      message: 'CSV data successfully imported to database',
-      tableName,
-      recordCount: allRecords.length,
-      columnCount: columns.length,
-      columnTypes: Object.fromEntries(columnTypes),
-      analysis
-    });
   } catch (error) {
     console.error('Error processing CSV:', error);
     res.status(500).json({ error: 'Failed to process CSV file' });
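
For context, a minimal sketch of how the acquireTableLock helper introduced above is expected to serialize concurrent uploads against one table; the demo wrapper, table name, and timeout are illustrative only and not part of this change:

// Two simulated upload requests contending for the same table lock.
async function demoLockSerialization() {
  const upload = async (label: string) => {
    const release = await acquireTableLock('demo_table'); // hypothetical table name
    try {
      console.log(`${label}: lock held`);
      await new Promise<void>((r) => setTimeout(r, 50)); // stand-in for the CSV import work
    } finally {
      release(); // mirrors the route handler's finally block
    }
  };
  // Logs "A: lock held" then "B: lock held"; the two imports never overlap.
  await Promise.all([upload('A'), upload('B')]);
}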