use queue fixes
EllAchE committed Apr 1, 2024
1 parent 5b9dfa4 commit 01a7635
Showing 3 changed files with 35 additions and 40 deletions.
src/fileReader.ts (2 changes: 1 addition & 1 deletion)
@@ -51,7 +51,7 @@ export async function* gameChunks(
// Empty line, do nothing
} else {
// Unknown line, ignore the current game
console.log(`Unknown line: ${line}, game will be ignored.`);
// console.log(`Unknown line: ${line}, game will be ignored.`);
ignoreGame = true;
// Clear the metadata for the current game so that the game is not yielded
metadata = [];
src/queue.ts (31 changes: 13 additions & 18 deletions)
@@ -5,39 +5,34 @@ import * as net from 'net';
export const RESULTS_PATH = `${__dirname}/results.json`;

function launchQueueServer() {
// ensure results.json exists
if (!fs.existsSync(RESULTS_PATH)) {
fs.writeFileSync(RESULTS_PATH, '{}');
}

// Create a queue that writes to results.json with a concurrency of 1
// Possibly the simplest fix would be to run this as a separate process, then we can enforce messages sent to this queue are processed in order
const queue = asyncLib.queue<any>((task, callback) => {
const { analysisKey, results } = task;
console.log('received task', analysisKey);

// read the results from aggregate results.json
let existingResults = {};
if (fs.existsSync(RESULTS_PATH)) {
const fileContent = fs.readFileSync(RESULTS_PATH, 'utf8');
if (fileContent !== '') {
existingResults = JSON.parse(fileContent);
}
}

// TODO: Probably we need to read in the existing results in the queue server and merge them, as when there are multiple items in the queue
// this is going to be out of date
existingResults[analysisKey] = {
'Number of games analyzed': 0,
};
const fileContent = fs.readFileSync(RESULTS_PATH, 'utf8');
const existingResults = JSON.parse(fileContent);
existingResults[analysisKey] = results;

try {
fs.writeFileSync(RESULTS_PATH, JSON.stringify(results, null, 2));
console.log(
`Analysis "${analysisKey}" has been written to ${RESULTS_PATH}`
);
fs.writeFileSync(RESULTS_PATH, JSON.stringify(existingResults, null, 2));
console.log(`"${analysisKey}" written to ${RESULTS_PATH}`);
callback();
} catch (err) {
console.error('Error writing to results.json', err);
callback(err);
}
}, 1);

queue.drain(function () {
console.log('all items have been processed');
console.log('no more tasks to process');
});

// this event listener receives tasks from the parallel processes
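
The comments above describe the design: worker processes run in parallel and hand their results to this single queue server, which serializes writes to results.json. For reference, here is a minimal, hypothetical sketch of the worker side, assuming the server listens on a local TCP port and accepts newline-delimited JSON messages of the form { analysisKey, results }; neither the port nor the message framing is shown in this diff.

import * as net from 'net';

// Hypothetical worker-side helper (not in this commit). Sends one analysis
// result to the queue server as a single newline-terminated JSON message.
export function sendResultToQueueServer(
  analysisKey: string,
  results: Record<string, unknown>,
  port = 8259 // assumed port; the real transport is outside the visible hunk
): Promise<void> {
  return new Promise((resolve, reject) => {
    const socket = net.createConnection({ port }, () => {
      // One JSON object per line so the server can split incoming data on '\n'
      socket.end(JSON.stringify({ analysisKey, results }) + '\n');
    });
    socket.on('close', () => resolve());
    socket.on('error', (err) => reject(err));
  });
}
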
src/zst_decompressor.ts (42 changes: 21 additions & 21 deletions)
@@ -21,7 +21,7 @@ const rl = readline.createInterface({
const promptForSizeLimit = () => {
return new Promise<void>((resolve) => {
rl.question('Enter the SIZE_LIMIT in MB (default is 10MB): ', (input) => {
const inputSizeMB = parseInt(input, 10);
const inputSizeMB = Number(input);
if (!isNaN(inputSizeMB) && inputSizeMB > 0) {
SIZE_LIMIT = inputSizeMB * 1024 * 1024; // Convert MB to bytes
console.log(`Using SIZE_LIMIT of ${SIZE_LIMIT} bytes.`);
@@ -43,7 +43,7 @@ const promptForConcurrentFilesLimit = () => {
rl.question(
'Enter the number of files to analyze concurrently (default is 10): ',
(input) => {
const inputLimit = parseInt(input, 10);
const inputLimit = Number(input);
if (!isNaN(inputLimit) && inputLimit > 0) {
concurrentFilesLimit = inputLimit;
console.log(
@@ -164,8 +164,6 @@ const decompressAndAnalyze = async (file, start = 0) => {
`Starting decompression of chunk number ${total_chunk_counter}.`
);

let startTime = Date.now();

// https://www.npmjs.com/package/node-zstandard#decompressionstreamfromfile-inputfile-callback
zstd.decompressionStreamFromFile(
`${compressedFilePath}/${file}`,
@@ -187,22 +185,20 @@
result.on('data', async (data) => {
decompressedStream.write(data);

const duration = Date.now() - startTime;
const durationFormatted = formatDuration(duration);
fileLength += data.length;
batch_files_total_decompressed_size += data.length;
these_chunks_counter++;

// Check if the file size exceeds the limit, if so we need to make a new file
if (getFileSize(newFilePath) >= SIZE_LIMIT) {
console.log(
`Finished decompression of data starting from byte ${start} and ending on byte ${
start + fileLength
} of ${file} in ${durationFormatted}`
);
console.log(
`Total number of chunks decompressed so far: ${total_chunk_counter}`
);
// console.log(
// `Finished decompression of data starting from byte ${start} and ending on byte ${
// start + fileLength
// } of ${file} in ${durationFormatted}`
// );
// console.log(
// `Total number of chunks decompressed so far: ${total_chunk_counter}`
// );

// Increment the file counter
file_counter++;
@@ -212,7 +208,7 @@ const decompressAndAnalyze = async (file, start = 0) => {
filesProduced.add(newFilePath);

// Switch to a new file
console.log(`Creating file number ${file_counter}`);
// console.log(`Creating file number ${file_counter}`);
decompressedStream = fs.createWriteStream(newFilePath, {
flags: 'a',
});
@@ -222,11 +218,11 @@
total_chunk_counter += these_chunks_counter;
these_chunks_counter = 0;

console.log(
`${these_chunks_counter} chunks decompressed with decompressed size ${
fileLength / 1024 / 1024
} MB`
);
// console.log(
// `${these_chunks_counter} chunks decompressed with decompressed size ${
// fileLength / 1024 / 1024
// } MB`
// );
}

// Stop decompression if the size of the combined decompressed files exceeds the decompressed total combined files size limit
@@ -243,6 +239,7 @@
result.on('end', async () => {
// When all data is decompressed, run the analysis on the produced files concurrently
for (const file of Array.from(filesProduced).slice(0, 10)) {
// TODO: This is debug code, I believe
// TODO: this won't work out of the box for a large number of files as there is no max concurrency. But the sample only produces 4 decompressed files
// I'm slicing to test this with a smaller number of files

@@ -276,7 +273,9 @@

// Function to process all files
const processFiles = async (files: string[]) => {
console.log(`Initiating decompression and analysis of files: ${files}...`);
console.log(
`Initiating decompression and analysis of ${files.length} files: ${files}...`
);
console.time('Final Total Compressed File Analysis Execution Time');
for (const file of files) {
try {
@@ -310,6 +309,7 @@ if (require.main === module) {
promptForSizeLimit().then(() => {
promptForConcurrentFilesLimit().then(() => {
rl.close(); // Close the readline interface after all prompts
// const files = ['lichess_db_standard_rated_2013-01.pgn.zst' /*...*/];
const files = ['lichess_db_standard_rated_2013-01.pgn.zst' /*...*/];
processFiles(files);
});
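
The TODO above points out that the analysis loop has no maximum concurrency and the file list is sliced as a stopgap. Below is a minimal sketch, assuming the async library (already used in queue.ts) is acceptable here, of how the prompted concurrent-files limit could cap in-flight analyses; analyzeWithLimit and runAnalysisOnFile are placeholder names, not functions from this repository.

import * as asyncLib from 'async';

// Hypothetical sketch (not in this commit): analyze decompressed files with a
// bounded number of analyses in flight, instead of slicing the file list.
async function analyzeWithLimit(
  files: string[],
  concurrentFilesLimit: number,
  runAnalysisOnFile: (file: string) => Promise<void> // placeholder for the real analysis entry point
): Promise<void> {
  // eachLimit keeps at most `concurrentFilesLimit` iteratees running at once
  await asyncLib.eachLimit(files, concurrentFilesLimit, async (file: string) => {
    await runAnalysisOnFile(file);
  });
}

// e.g. await analyzeWithLimit(Array.from(filesProduced), concurrentFilesLimit, runAnalysisOnFile);
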
