From 01a7635cce658e4ab55f3b3a0846250c14cf3578 Mon Sep 17 00:00:00 2001
From: EllAchE <26192612+EllAchE@users.noreply.github.com>
Date: Mon, 1 Apr 2024 00:07:06 -0700
Subject: [PATCH] use queue fixes

---
 src/fileReader.ts       |  2 +-
 src/queue.ts            | 31 +++++++++++++------------------
 src/zst_decompressor.ts | 42 +++++++++++++++++++++---------------------
 3 files changed, 35 insertions(+), 40 deletions(-)

diff --git a/src/fileReader.ts b/src/fileReader.ts
index de8f3eb..9ea3b39 100644
--- a/src/fileReader.ts
+++ b/src/fileReader.ts
@@ -51,7 +51,7 @@ export async function* gameChunks(
       // Empty line, do nothing
     } else {
       // Unknown line, ignore the current game
-      console.log(`Unknown line: ${line}, game will be ignored.`);
+      // console.log(`Unknown line: ${line}, game will be ignored.`);
       ignoreGame = true;
       // Clear the metadata for the current game so that the game is not yielded
       metadata = [];
diff --git a/src/queue.ts b/src/queue.ts
index 3a91d73..9033e0b 100644
--- a/src/queue.ts
+++ b/src/queue.ts
@@ -5,6 +5,11 @@ import * as net from 'net';
 export const RESULTS_PATH = `${__dirname}/results.json`;
 
 function launchQueueServer() {
+  // ensure results.json exists
+  if (!fs.existsSync(RESULTS_PATH)) {
+    fs.writeFileSync(RESULTS_PATH, '{}');
+  }
+
   // Create a write to result.json queue with a concurrency of 1
   // Possibly the simplest fix would be to run this as a separate process, then we can enforce messages sent to this queue are processed in order
   const queue = asyncLib.queue((task, callback) => {
@@ -12,32 +17,22 @@ function launchQueueServer() {
     console.log('received task', analysisKey);
 
     // read the results from aggregate results.json
-    let existingResults = {};
-    if (fs.existsSync(RESULTS_PATH)) {
-      const fileContent = fs.readFileSync(RESULTS_PATH, 'utf8');
-      if (fileContent !== '') {
-        existingResults = JSON.parse(fileContent);
-      }
-    }
-
-    // TODO: Probably we need to read in the existing results in the queue server and merge them, as when there are multiple items in the queue
-    // this is going to be out of date
-    existingResults[analysisKey] = {
-      'Number of games analyzed': 0,
-    };
+    const fileContent = fs.readFileSync(RESULTS_PATH, 'utf8');
+    const existingResults = JSON.parse(fileContent);
+    existingResults[analysisKey] = results;
 
     try {
-      fs.writeFileSync(RESULTS_PATH, JSON.stringify(results, null, 2));
-      console.log(
-        `Analysis "${analysisKey}" has been written to ${RESULTS_PATH}`
-      );
+      fs.writeFileSync(RESULTS_PATH, JSON.stringify(existingResults, null, 2));
+      console.log(`"${analysisKey}" written to ${RESULTS_PATH}`);
+      callback();
     } catch (err) {
       console.error('Error writing to results.json', err);
+      callback(err);
     }
   }, 1);
 
   queue.drain(function () {
-    console.log('all items have been processed');
+    console.log('no more tasks to process');
   });
 
   // this event listener receives tasks from the parallel processes
diff --git a/src/zst_decompressor.ts b/src/zst_decompressor.ts
index bfe3bce..d3847e0 100644
--- a/src/zst_decompressor.ts
+++ b/src/zst_decompressor.ts
@@ -21,7 +21,7 @@ const rl = readline.createInterface({
 const promptForSizeLimit = () => {
   return new Promise((resolve) => {
     rl.question('Enter the SIZE_LIMIT in MB (default is 10MB): ', (input) => {
-      const inputSizeMB = parseInt(input, 10);
+      const inputSizeMB = Number(input);
       if (!isNaN(inputSizeMB) && inputSizeMB > 0) {
         SIZE_LIMIT = inputSizeMB * 1024 * 1024; // Convert MB to bytes
         console.log(`Using SIZE_LIMIT of ${SIZE_LIMIT} bytes.`);
@@ -43,7 +43,7 @@ const promptForConcurrentFilesLimit = () => {
     rl.question(
       'Enter the number of files to analyze concurrently (default is 10): ',
       (input) => {
-        const inputLimit = parseInt(input, 10);
+        const inputLimit = Number(input);
         if (!isNaN(inputLimit) && inputLimit > 0) {
           concurrentFilesLimit = inputLimit;
           console.log(
@@ -164,8 +164,6 @@ const decompressAndAnalyze = async (file, start = 0) => {
       `Starting decompression of chunk number ${total_chunk_counter}.`
     );
 
-    let startTime = Date.now();
-
     // https://www.npmjs.com/package/node-zstandard#decompressionstreamfromfile-inputfile-callback
     zstd.decompressionStreamFromFile(
       `${compressedFilePath}/${file}`,
@@ -187,22 +185,20 @@
       result.on('data', async (data) => {
         decompressedStream.write(data);
 
-        const duration = Date.now() - startTime;
-        const durationFormatted = formatDuration(duration);
         fileLength += data.length;
         batch_files_total_decompressed_size += data.length;
         these_chunks_counter++;
 
         // Check if the file size exceeds the limit, if so we need to make a new file
         if (getFileSize(newFilePath) >= SIZE_LIMIT) {
-          console.log(
-            `Finished decompression of data starting from byte ${start} and ending on byte ${
-              start + fileLength
-            } of ${file} in ${durationFormatted}`
-          );
-          console.log(
-            `Total number of chunks decompressed so far: ${total_chunk_counter}`
-          );
+          // console.log(
+          //   `Finished decompression of data starting from byte ${start} and ending on byte ${
+          //     start + fileLength
+          //   } of ${file} in ${durationFormatted}`
+          // );
+          // console.log(
+          //   `Total number of chunks decompressed so far: ${total_chunk_counter}`
+          // );
 
           // Increment the file counter
           file_counter++;
@@ -212,7 +208,7 @@
           filesProduced.add(newFilePath);
 
           // Switch to a new file
-          console.log(`Creating file number ${file_counter}`);
+          // console.log(`Creating file number ${file_counter}`);
           decompressedStream = fs.createWriteStream(newFilePath, {
             flags: 'a',
           });
@@ -222,11 +218,11 @@
           total_chunk_counter += these_chunks_counter;
           these_chunks_counter = 0;
 
-          console.log(
-            `${these_chunks_counter} chunks decompressed with decompressed size ${
-              fileLength / 1024 / 1024
-            } MB`
-          );
+          // console.log(
+          //   `${these_chunks_counter} chunks decompressed with decompressed size ${
+          //     fileLength / 1024 / 1024
+          //   } MB`
+          // );
         }
 
         // Stop decompression if the size of the combined decompressed files exceeds the decompressed total combined files size limit
@@ -243,6 +239,7 @@
       result.on('end', async () => {
         // When all data is decompressed, run the analysis on the produced files concurrently
         for (const file of Array.from(filesProduced).slice(0, 10)) {
+          // TODO: This is debug code, I believe
           // TODO: this won't work out of the box for a large number of files as there is no max concurrency.
           // But the sample only produces 4 decompressed files
           // I'm slicing to test this with a smaller number of files
@@ -276,7 +273,9 @@
 
 // Function to process all files
 const processFiles = async (files: string[]) => {
-  console.log(`Initiating decompression and analysis of files: ${files}...`);
+  console.log(
+    `Initiating decompression and analysis of ${files.length} files: ${files}...`
+  );
   console.time('Final Total Compressed File Analysis Execution Time');
   for (const file of files) {
     try {
@@ -310,6 +309,7 @@ if (require.main === module) {
   promptForSizeLimit().then(() => {
     promptForConcurrentFilesLimit().then(() => {
       rl.close(); // Close the readline interface after all prompts
+      // const files = ['lichess_db_standard_rated_2013-01.pgn.zst' /*...*/];
       const files = ['lichess_db_standard_rated_2013-01.pgn.zst' /*...*/];
       processFiles(files);
     });
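
A note on the serialized-write pattern the src/queue.ts changes rely on: every read-modify-write of results.json now passes through a single async queue with concurrency 1, so parallel worker processes can never interleave their writes, and invoking callback() (or callback(err)) is what tells async the task is finished so the queue can advance. Below is a minimal, self-contained sketch of that pattern, assuming the async v3 API; the Task shape mirrors the analysisKey/results fields used in the worker above, and the sample push at the end is purely illustrative.

import * as fs from 'fs';
import * as asyncLib from 'async';

const RESULTS_PATH = `${__dirname}/results.json`;

// Seed the file so the queue's first readFileSync/JSON.parse cannot fail,
// matching the fs.existsSync guard added in the patch
if (!fs.existsSync(RESULTS_PATH)) {
  fs.writeFileSync(RESULTS_PATH, '{}');
}

interface Task {
  analysisKey: string;
  results: unknown;
}

// Concurrency of 1 makes the read-modify-write below atomic with respect
// to other queued tasks: no two tasks can interleave their file writes
const queue = asyncLib.queue<Task>((task, callback) => {
  try {
    const existing = JSON.parse(fs.readFileSync(RESULTS_PATH, 'utf8'));
    existing[task.analysisKey] = task.results;
    fs.writeFileSync(RESULTS_PATH, JSON.stringify(existing, null, 2));
    callback(); // signal success so the queue moves on to the next task
  } catch (err) {
    callback(err as Error); // signal failure instead of stalling the queue
  }
}, 1);

queue.drain(() => console.log('no more tasks to process'));

// Illustrative usage: each worker pushes its result; the queue serializes the writes
queue.push({ analysisKey: 'sample-run', results: { gamesAnalyzed: 42 } });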
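
One behavioral note on the parseInt-to-Number swap in both prompts: parseInt stops at the first non-numeric character, while Number converts the whole string or yields NaN, so suffixed input like "10MB" now falls through to the default instead of silently parsing as 10. A few concrete cases:

parseInt('10MB', 10); // 10   -> trailing "MB" is silently dropped
Number('10MB');       // NaN  -> fails the !isNaN check, default kept
Number('10');         // 10   -> clean numeric input still accepted
Number('');           // 0    -> not > 0, so the default is kept here too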