diff --git a/ClusterOperator/Backlog.js b/ClusterOperator/Backlog.js index c21a4f7..b6c98ca 100644 --- a/ClusterOperator/Backlog.js +++ b/ClusterOperator/Backlog.js @@ -1,631 +1,647 @@ -/* eslint-disable no-else-return */ -/* eslint-disable no-restricted-syntax */ -// const timer = require('timers/promises'); -const queryCache = require('memory-cache'); -const fs = require('fs'); -const path = require('path'); -const dbClient = require('./DBClient'); -const config = require('./config'); -const log = require('../lib/log'); -const Security = require('./Security'); -const ConnectionPool = require('../lib/ConnectionPool'); -const utill = require('../lib/utill'); -const mysqldump = require('../modules/mysqldump'); - -class BackLog { - static buffer = []; - - static sequenceNumber = 0; - - static bufferSequenceNumber = 0; - - static bufferStartSequenceNumber = 0; - - static BLClient = null; - - static UserDBClient = null; - - static writeLock = false; - - static executeLogs = true; - - static BLqueryCache = queryCache; - - /** - * [createBacklog] - * @param {object} params [description] - */ - static async createBacklog(UserDBClient) { - this.BLClient = await dbClient.createClient(); - this.UserDBClient = UserDBClient; - try { - if (config.dbType === 'mysql') { - const dbList = await this.BLClient.query(`SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '${config.dbBacklog}'`); - if (dbList.length === 0) { - log.info('Backlog DB not defined yet, creating backlog DB...'); - await this.BLClient.createDB(config.dbBacklog); - } else { - log.info('Backlog DB already exists, moving on...'); - } - await this.BLClient.setDB(config.dbBacklog); - let tableList = await this.BLClient.query(`SELECT * FROM INFORMATION_SCHEMA.tables - WHERE table_schema = '${config.dbBacklog}' and table_name = '${config.dbBacklogCollection}'`); - if (tableList.length === 0) { - log.info('Backlog table not defined yet, creating backlog table...'); - await this.BLClient.query(`CREATE TABLE ${config.dbBacklogCollection} (seq bigint, query longtext, timestamp bigint) ENGINE=InnoDB;`); - await this.BLClient.query(`ALTER TABLE \`${config.dbBacklog}\`.\`${config.dbBacklogCollection}\` - MODIFY COLUMN \`seq\` bigint(0) UNSIGNED NOT NULL FIRST, - ADD PRIMARY KEY (\`seq\`), - ADD UNIQUE INDEX \`seq\`(\`seq\`);`); - } else { - log.info('Backlog table already exists, moving on...'); - this.sequenceNumber = await this.getLastSequenceNumber(); - } - tableList = await this.BLClient.query(`SELECT * FROM INFORMATION_SCHEMA.tables - WHERE table_schema = '${config.dbBacklog}' and table_name = '${config.dbBacklogBuffer}'`); - if (tableList.length === 0) { - log.info('Backlog buffer table not defined yet, creating buffer table...'); - await this.BLClient.query(`CREATE TABLE ${config.dbBacklogBuffer} (seq bigint, query longtext, timestamp bigint) ENGINE=InnoDB;`); - await this.BLClient.query(`ALTER TABLE \`${config.dbBacklog}\`.\`${config.dbBacklogBuffer}\` - MODIFY COLUMN \`seq\` bigint(0) UNSIGNED NOT NULL FIRST, - ADD PRIMARY KEY (\`seq\`), - ADD UNIQUE INDEX \`seq\`(\`seq\`);`); - } else { - log.info('Backlog buffer table already exists, moving on...'); - } - tableList = await this.BLClient.query(`SELECT * FROM INFORMATION_SCHEMA.tables - WHERE table_schema = '${config.dbBacklog}' and table_name = '${config.dbOptions}'`); - if (tableList.length === 0) { - log.info('Backlog options table not defined yet, creating options table...'); - await this.BLClient.query(`CREATE TABLE ${config.dbOptions} (k varchar(64), value text, PRIMARY KEY (k)) ENGINE=InnoDB;`); - } else { - log.info('Backlog options table already exists, moving on...'); - } - log.info(`Last Seq No: ${this.sequenceNumber}`); - } - } catch (e) { - log.error(`Error creating backlog: ${e}`); - } - } - - /** - * [pushQuery] - * @param {string} query [description] - * @param {int} timestamp [description] - * @return {Array} - */ - static async pushQuery(query, seq = 0, timestamp, buffer = false, connId = false, fullQuery = '') { - // eslint-disable-next-line no-param-reassign - if (timestamp === undefined) timestamp = Date.now(); - if (!this.BLClient) { - log.error('Backlog not created yet. Call createBacklog() first.'); - return []; - } - try { - if (config.dbType === 'mysql') { - if (buffer) { - if (this.bufferStartSequenceNumber === 0) this.bufferStartSequenceNumber = seq; - this.bufferSequenceNumber = seq; - await this.BLClient.execute( - `INSERT INTO ${config.dbBacklogBuffer} (seq, query, timestamp) VALUES (?,?,?)`, - [seq, query, timestamp], - ); - return [null, seq, timestamp]; - } else { - this.writeLock = true; - let result = null; - if (seq === 0) { this.sequenceNumber += 1; } else { this.sequenceNumber = seq; } - const seqForThis = this.sequenceNumber; - const BLResult = await this.BLClient.execute( - `INSERT INTO ${config.dbBacklogCollection} (seq, query, timestamp) VALUES (?,?,?)`, - [seqForThis, query, timestamp], - ); - if (this.executeLogs) log.info(`executed ${seqForThis}`); - this.BLqueryCache.put(seqForThis, { - query, seq: seqForThis, timestamp, connId, ip: false, - }, 1000 * 30); - this.writeLock = false; - // Abort query execution if there is an error in backlog insert - if (Array.isArray(BLResult) && BLResult[2]) { - log.error(`Error in SQL: ${JSON.stringify(BLResult[2])}`); - } else { - if (connId === false) { - // most beckup softwares generate wrong sql statements for mysql v8, this prevents sql_mode related sql errors on v8. - if (!query.toLowerCase().startsWith('SET SESSION')) await this.UserDBClient.query("SET SESSION sql_mode='IGNORE_SPACE,NO_ZERO_IN_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'", false, fullQuery); - result = await this.UserDBClient.query(query, false, fullQuery); - } else if (connId >= 0) { - result = await ConnectionPool.getConnectionById(connId).query(query, false, fullQuery); - } - if (Array.isArray(result) && result[2]) { - log.error(`Error in SQL: ${JSON.stringify(result[2])}`); - } - } - return [result, seqForThis, timestamp]; - } - /* - if (seq === 0 || this.sequenceNumber + 1 === seq) { - - while (this.writeLock) await timer.setTimeout(10); - this.writeLock = true; - if (seq === 0) { this.sequenceNumber += 1; } else { this.sequenceNumber = seq; } - const seqForThis = this.sequenceNumber; - let result2 = null; - if (connId === false) { - result2 = await this.UserDBClient.query(query); - } else { - result2 = await ConnectionPool.getConnectionById(connId).query(query); - } - await this.BLClient.execute( - `INSERT INTO ${config.dbBacklogCollection} (seq, query, timestamp) VALUES (?,?,?)`, - [seqForThis, query, timestamp], - ); - this.writeLock = false; - return [result2, seqForThis, timestamp]; - } else if (this.bufferStartSequenceNumber === this.sequenceNumber + 1) { - await this.moveBufferToBacklog(); - return await this.pushQuery(query, seq, timestamp, buffer, connId); - } else { - if (this.sequenceNumber + 1 < seq) { - log.error(`Wrong query order, ${this.sequenceNumber + 1} < ${seq}. pushing to buffer.`); - if (this.bufferStartSequenceNumber === 0) this.bufferStartSequenceNumber = seq; - this.bufferSequenceNumber = seq; - await this.BLClient.execute( - `INSERT INTO ${config.dbBacklogBuffer} (seq, query, timestamp) VALUES (?,?,?)`, - [seq, query, timestamp], - ); - } - return []; - } */ - } - } catch (e) { - this.writeLock = false; - log.error(`error executing query, ${query}, ${seq}`); - log.error(e); - } - return []; - } - - /** - * [getLogs] - * @param {int} startFrom [description] - * @param {int} pageSize [description] - * @return {Array} - */ - static async getLogs(startFrom, pageSize) { - if (!this.BLClient) { - log.error('Backlog not created yet. Call createBacklog() first.'); - return []; - } - try { - if (config.dbType === 'mysql') { - const totalRecords = await this.BLClient.query(`SELECT * FROM ${config.dbBacklogCollection} WHERE seq >= ${startFrom} ORDER BY seq LIMIT ${pageSize}`); - const trimedRecords = utill.trimArrayToSize(totalRecords, 3 * 1024 * 1024); - log.info(`sending backlog records ${startFrom},${pageSize}, records: ${trimedRecords.length}`); - return trimedRecords; - } - } catch (e) { - log.error(e); - } - return []; - } - - /** - * [getLogsByTime] - * @param {int} startFrom [description] - * @param {int} length [description] - * @return {Array} - */ - static async getLogsByTime(startFrom, length) { - if (!this.BLClient) { - log.error('Backlog not created yet. Call createBacklog() first.'); - return []; - } - try { - if (config.dbType === 'mysql') { - const totalRecords = await this.BLClient.execute(`SELECT seq, LEFT(query,10) as query, timestamp FROM ${config.dbBacklogCollection} WHERE timestamp >= ? AND timestamp < ? ORDER BY seq`, [startFrom, Number(startFrom) + Number(length)]); - return totalRecords; - } - } catch (e) { - log.error(e); - } - return []; - } - - /** - * [getLogs] - * @param {int} index [description] - * @return {object} - */ - static async getLog(index) { - if (!this.BLClient) { - log.error('Backlog not created yet. Call createBacklog() first.'); - return []; - } - try { - if (config.dbType === 'mysql') { - const record = await this.BLClient.query(`SELECT * FROM ${config.dbBacklogCollection} WHERE seq=${index}`); - // log.info(`backlog records ${startFrom},${pageSize}:${JSON.stringify(totalRecords)}`); - return record; - } - } catch (e) { - log.error(e); - } - return []; - } - - /** - * [getDateRange] - * @return {object} - */ - static async getDateRange() { - if (!this.BLClient) { - log.error('Backlog not created yet. Call createBacklog() first.'); - return []; - } - try { - if (config.dbType === 'mysql') { - const record = await this.BLClient.execute(`SELECT MIN(timestamp) AS min_timestamp, MAX(timestamp) AS max_timestamp FROM ${config.dbBacklogCollection}`); - log.info(record); - return record[0]; - } - } catch (e) { - log.error(e); - } - return []; - } - - /** - * [getTotalLogsCount] - * @return {int} - */ - static async getTotalLogsCount() { - if (!this.BLClient) { - log.error('Backlog not created yet. Call createBacklog() first.'); - } else { - try { - if (config.dbType === 'mysql') { - const totalRecords = await this.BLClient.query(`SELECT count(*) as total FROM ${config.dbBacklogCollection}`); - log.info(`Total Records: ${JSON.stringify(totalRecords)}`); - return totalRecords[0].total; - } - } catch (e) { - log.error(e); - } - } - return 0; - } - - /** - * [getLastSequenceNumber] - * @return {int} - */ - static async getLastSequenceNumber(buffer = false) { - if (!this.BLClient) { - log.error('Backlog not created yet. Call createBacklog() first.'); - } else { - try { - if (config.dbType === 'mysql') { - let records = []; - if (buffer) { - records = await this.BLClient.query(`SELECT seq as seqNo FROM ${config.dbBacklogBuffer} ORDER BY seq DESC LIMIT 1`); - } else { - records = await this.BLClient.query(`SELECT seq as seqNo FROM ${config.dbBacklogCollection} ORDER BY seq DESC LIMIT 1`); - } - if (records.length) return records[0].seqNo; - } - } catch (e) { - log.error(e); - } - } - return 0; - } - - /** - * [keepConnections] - */ - static async keepConnections() { - if (config.dbType === 'mysql' && this.BLClient) { - await this.BLClient.setDB(config.dbBacklog); - await this.UserDBClient.setDB(config.dbInitDB); - } - } - - /** - * [clearLogs] - */ - static async clearLogs() { - if (!this.BLClient) { - this.BLClient = await dbClient.createClient(); - if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); - } - try { - if (config.dbType === 'mysql') { - await this.BLClient.query(`DELETE FROM ${config.dbBacklogCollection}`); - this.sequenceNumber = 0; - } - } catch (e) { - log.error(e); - } - log.info('All backlog data removed successfully.'); - } - - /** - * [rebuildDatabase] - */ - static async rebuildDatabase(seqNo) { - if (!this.BLClient) { - this.BLClient = await dbClient.createClient(); - if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); - } - try { - if (config.dbType === 'mysql') { - await this.BLClient.query(`DROP DATABASE ${config.dbInitDB}`); - await this.BLClient.createDB(config.dbInitDB); - this.UserDBClient.setDB(config.dbInitDB); - await this.BLClient.setDB(config.dbBacklog); - const records = await this.BLClient.execute('SELECT * FROM backlog WHERE seq<=? ORDER BY seq', [seqNo]); - // console.log(records); - for (const record of records) { - log.info(`executing seq(${record.seq})`); - try { - // eslint-disable-next-line no-await-in-loop, no-unused-vars - const result = await this.UserDBClient.query(record.query); - } catch (e) { - log.error(e); - } - // eslint-disable-next-line no-await-in-loop - } - await this.BLClient.execute('DELETE FROM backlog WHERE seq>?', [seqNo]); - await this.clearBuffer(); - } - } catch (e) { - log.error(e); - } - this.buffer = []; - log.info(`DB and backlog rolled back to ${seqNo}`); - } - - /** - * [destroyBacklog] - */ - static async destroyBacklog() { - if (!this.BLClient) this.BLClient = await dbClient.createClient(); - try { - if (config.dbType === 'mysql') { - await this.BLClient.query(`DROP DATABASE ${config.dbBacklog}`); - this.sequenceNumber = 0; - } - } catch (e) { - log.error(e); - } - log.info(`${config.dbBacklog} database and all it's data erased successfully.`); - } - - /** - * [clearBuffer] - */ - static async clearBuffer() { - if (!this.BLClient) { - this.BLClient = await dbClient.createClient(); - if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); - } - try { - if (config.dbType === 'mysql') { - await this.BLClient.query(`DELETE FROM ${config.dbBacklogBuffer}`); - this.bufferSequenceNumber = 0; - this.bufferStartSequenceNumber = 0; - } - } catch (e) { - log.error(e); - } - this.buffer = []; - log.info('All buffer data removed successfully.'); - } - - /** - * [moveBufferToBacklog] - */ - static async moveBufferToBacklog() { - if (!this.BLClient) { - this.BLClient = await dbClient.createClient(); - if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); - } - - if (config.dbType === 'mysql') { - const records = await this.BLClient.query(`SELECT * FROM ${config.dbBacklogBuffer} ORDER BY seq`); - for (const record of records) { - log.info(`copying seq(${record.seq}) from buffer`); - try { - // eslint-disable-next-line no-await-in-loop - await this.pushQuery(record.query, record.seq, record.timestamp); - } catch (e) { - log.error(e); - } - // eslint-disable-next-line no-await-in-loop - await this.BLClient.execute(`DELETE FROM ${config.dbBacklogBuffer} WHERE seq=?`, [record.seq]); - } - const records2 = await this.BLClient.query(`SELECT * FROM ${config.dbBacklogBuffer} ORDER BY seq`); - if (records2.length > 0) { - this.bufferStartSequenceNumber = records2[0].seq; - } else { - this.bufferStartSequenceNumber = 0; - } - } - // this.clearBuffer(); - log.info('All buffer data moved to backlog successfully.'); - } - - /** - * [pushKey] - */ - static async pushKey(key, value, encrypt = true) { - const encryptedValue = (encrypt) ? Security.encrypt(value) : value; - if (!this.BLClient) { - this.BLClient = await dbClient.createClient(); - if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); - } - try { - if (config.dbType === 'mysql') { - const record = await this.BLClient.execute(`SELECT * FROM ${config.dbOptions} WHERE k=?`, [key]); - if (record.length) { - await this.BLClient.execute(`UPDATE ${config.dbOptions} SET value=? WHERE k=?`, [encryptedValue, key]); - } else { - await this.BLClient.execute(`INSERT INTO ${config.dbOptions} (k, value) VALUES (?,?)`, [key, encryptedValue]); - } - } - } catch (e) { - log.error(e); - } - this.buffer = []; - // log.info('Key pushed.'); - } - - /** - * [getKey] - */ - static async getKey(key, decrypt = true) { - if (!this.BLClient) { - this.BLClient = await dbClient.createClient(); - if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); - } - try { - if (config.dbType === 'mysql') { - const records = await this.BLClient.execute(`SELECT * FROM ${config.dbOptions} WHERE k=?`, [key]); - if (records.length) { - return (decrypt) ? Security.encryptComm(Security.decrypt(records[0].value)) : records[0].value; - } - } - } catch (e) { - log.error(e); - } - return null; - } - - /** - * [removeKey] - */ - static async removeKey(key) { - if (!this.BLClient) { - this.BLClient = await dbClient.createClient(); - if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); - } - try { - if (config.dbType === 'mysql') { - const records = await this.BLClient.execute(`DELETE FROM ${config.dbOptions} WHERE k=?`, [key]); - if (records.length) { - return true; - } - } - } catch (e) { - log.error(e); - } - return false; - } - - /** - * [getAllKeys] - */ - static async getAllKeys() { - const keys = {}; - if (!this.BLClient) { - this.BLClient = await dbClient.createClient(); - if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); - } - try { - if (config.dbType === 'mysql') { - const records = await this.BLClient.execute(`SELECT * FROM ${config.dbOptions}`); - for (const record of records) { - keys[record.k] = Security.encryptComm(Security.decrypt(record.value)); - } - } - } catch (e) { - log.error(e); - } - return keys; - } - - /** - * [dumpBackup] - */ - static async dumpBackup() { - const timestamp = new Date().getTime(); - if (!this.BLClient) { - this.BLClient = await dbClient.createClient(); - if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); - } - if (this.BLClient) { - const startTime = Date.now(); // Record the start time - await mysqldump({ - connection: { - host: config.dbHost, - port: config.dbPort, - user: config.dbUser, - password: Security.getKey(), - database: config.dbInitDB, - }, - dump: { - schema: { - table: { - dropIfExist: true, - }, - }, - data: { - verbose: false, - }, - }, - dumpToFile: `./dumps/BU_${timestamp}.sql`, - }); - const endTime = Date.now(); // Record the end time - log.info(`Backup file created in (${endTime - startTime} ms): BU_${timestamp}.sql`); - } else { - log.info('Can not connect to the DB'); - } - } - - /** - * [listSqlFiles] - */ - static async listSqlFiles() { - const folderPath = './dumps/'; - try { - const files = fs.readdirSync(folderPath); - - const sqlFilesInfo = files.map((file) => { - const filePath = path.join(folderPath, file); - const fileStats = fs.statSync(filePath); - const isSqlFile = path.extname(file) === '.sql'; - - if (isSqlFile) { - return { - fileName: file, - fileSize: fileStats.size, // in bytes - createdDateTime: fileStats.birthtime, // creation date/time - }; - } else { - return null; // Ignore non-SQL files - } - }); - - // Filter out null entries (non-SQL files) and return the result - return sqlFilesInfo.filter((info) => info !== null); - } catch (error) { - log.error(`Error reading folder: ${error}`); - return []; - } - } - - /** - * [deleteBackupFile] - */ - static async deleteBackupFile(fileName) { - try { - fs.unlinkSync(`./dumps/${fileName}.sql`); - log.info(`File "${fileName}.sql" has been deleted.`); - } catch (error) { - log.error(`Error deleting file "${fileName}": ${error.message}`); - } - } -}// end class - -// eslint-disable-next-line func-names -module.exports = BackLog; +/* eslint-disable no-else-return */ +/* eslint-disable no-restricted-syntax */ +// const timer = require('timers/promises'); +const queryCache = require('memory-cache'); +const fs = require('fs'); +const path = require('path'); +const dbClient = require('./DBClient'); +const config = require('./config'); +const log = require('../lib/log'); +const Security = require('./Security'); +const ConnectionPool = require('../lib/ConnectionPool'); +const utill = require('../lib/utill'); +const mysqldump = require('../modules/mysqldump'); + +class BackLog { + static buffer = []; + + static sequenceNumber = 0; + + static bufferSequenceNumber = 0; + + static bufferStartSequenceNumber = 0; + + static BLClient = null; + + static UserDBClient = null; + + static writeLock = false; + + static executeLogs = true; + + static BLqueryCache = queryCache; + + /** + * [createBacklog] + * @param {object} params [description] + */ + static async createBacklog(UserDBClient) { + this.BLClient = await dbClient.createClient(); + this.UserDBClient = UserDBClient; + try { + if (config.dbType === 'mysql') { + const dbList = await this.BLClient.query(`SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '${config.dbBacklog}'`); + if (dbList.length === 0) { + log.info('Backlog DB not defined yet, creating backlog DB...'); + await this.BLClient.createDB(config.dbBacklog); + } else { + log.info('Backlog DB already exists, moving on...'); + } + await this.BLClient.setDB(config.dbBacklog); + let tableList = await this.BLClient.query(`SELECT * FROM INFORMATION_SCHEMA.tables + WHERE table_schema = '${config.dbBacklog}' and table_name = '${config.dbBacklogCollection}'`); + if (tableList.length === 0) { + log.info('Backlog table not defined yet, creating backlog table...'); + await this.BLClient.query(`CREATE TABLE ${config.dbBacklogCollection} (seq bigint, query longtext, timestamp bigint) ENGINE=InnoDB;`); + await this.BLClient.query(`ALTER TABLE \`${config.dbBacklog}\`.\`${config.dbBacklogCollection}\` + MODIFY COLUMN \`seq\` bigint(0) UNSIGNED NOT NULL FIRST, + ADD PRIMARY KEY (\`seq\`), + ADD UNIQUE INDEX \`seq\`(\`seq\`);`); + } else { + log.info('Backlog table already exists, moving on...'); + this.sequenceNumber = await this.getLastSequenceNumber(); + } + tableList = await this.BLClient.query(`SELECT * FROM INFORMATION_SCHEMA.tables + WHERE table_schema = '${config.dbBacklog}' and table_name = '${config.dbBacklogBuffer}'`); + if (tableList.length === 0) { + log.info('Backlog buffer table not defined yet, creating buffer table...'); + await this.BLClient.query(`CREATE TABLE ${config.dbBacklogBuffer} (seq bigint, query longtext, timestamp bigint) ENGINE=InnoDB;`); + await this.BLClient.query(`ALTER TABLE \`${config.dbBacklog}\`.\`${config.dbBacklogBuffer}\` + MODIFY COLUMN \`seq\` bigint(0) UNSIGNED NOT NULL FIRST, + ADD PRIMARY KEY (\`seq\`), + ADD UNIQUE INDEX \`seq\`(\`seq\`);`); + } else { + log.info('Backlog buffer table already exists, moving on...'); + } + tableList = await this.BLClient.query(`SELECT * FROM INFORMATION_SCHEMA.tables + WHERE table_schema = '${config.dbBacklog}' and table_name = '${config.dbOptions}'`); + if (tableList.length === 0) { + log.info('Backlog options table not defined yet, creating options table...'); + await this.BLClient.query(`CREATE TABLE ${config.dbOptions} (k varchar(64), value text, PRIMARY KEY (k)) ENGINE=InnoDB;`); + } else { + log.info('Backlog options table already exists, moving on...'); + } + log.info(`Last Seq No: ${this.sequenceNumber}`); + } + } catch (e) { + log.error(`Error creating backlog: ${e}`); + } + } + + /** + * [pushQuery] + * @param {string} query [description] + * @param {int} timestamp [description] + * @return {Array} + */ + static async pushQuery(query, seq = 0, timestamp, buffer = false, connId = false, fullQuery = '') { + // eslint-disable-next-line no-param-reassign + if (timestamp === undefined) timestamp = Date.now(); + if (!this.BLClient) { + log.error('Backlog not created yet. Call createBacklog() first.'); + return []; + } + try { + if (config.dbType === 'mysql') { + if (buffer) { + if (this.bufferStartSequenceNumber === 0) this.bufferStartSequenceNumber = seq; + this.bufferSequenceNumber = seq; + await this.BLClient.execute( + `INSERT INTO ${config.dbBacklogBuffer} (seq, query, timestamp) VALUES (?,?,?)`, + [seq, query, timestamp], + ); + return [null, seq, timestamp]; + } else { + this.writeLock = true; + let result = null; + if (seq === 0) { this.sequenceNumber += 1; } else { this.sequenceNumber = seq; } + const seqForThis = this.sequenceNumber; + const BLResult = await this.BLClient.execute( + `INSERT INTO ${config.dbBacklogCollection} (seq, query, timestamp) VALUES (?,?,?)`, + [seqForThis, query, timestamp], + ); + if (this.executeLogs) log.info(`executed ${seqForThis}`); + this.BLqueryCache.put(seqForThis, { + query, seq: seqForThis, timestamp, connId, ip: false, + }, 1000 * 30); + this.writeLock = false; + // Abort query execution if there is an error in backlog insert + if (Array.isArray(BLResult) && BLResult[2]) { + log.error(`Error in SQL: ${JSON.stringify(BLResult[2])}`); + } else { + if (connId === false) { + result = await this.UserDBClient.query(query, false, fullQuery); + } else if (connId >= 0) { + result = await ConnectionPool.getConnectionById(connId).query(query, false, fullQuery); + } + if (Array.isArray(result) && result[2]) { + log.error(`Error in SQL: ${JSON.stringify(result[2])}`); + } + } + return [result, seqForThis, timestamp]; + } + /* + if (seq === 0 || this.sequenceNumber + 1 === seq) { + + while (this.writeLock) await timer.setTimeout(10); + this.writeLock = true; + if (seq === 0) { this.sequenceNumber += 1; } else { this.sequenceNumber = seq; } + const seqForThis = this.sequenceNumber; + let result2 = null; + if (connId === false) { + result2 = await this.UserDBClient.query(query); + } else { + result2 = await ConnectionPool.getConnectionById(connId).query(query); + } + await this.BLClient.execute( + `INSERT INTO ${config.dbBacklogCollection} (seq, query, timestamp) VALUES (?,?,?)`, + [seqForThis, query, timestamp], + ); + this.writeLock = false; + return [result2, seqForThis, timestamp]; + } else if (this.bufferStartSequenceNumber === this.sequenceNumber + 1) { + await this.moveBufferToBacklog(); + return await this.pushQuery(query, seq, timestamp, buffer, connId); + } else { + if (this.sequenceNumber + 1 < seq) { + log.error(`Wrong query order, ${this.sequenceNumber + 1} < ${seq}. pushing to buffer.`); + if (this.bufferStartSequenceNumber === 0) this.bufferStartSequenceNumber = seq; + this.bufferSequenceNumber = seq; + await this.BLClient.execute( + `INSERT INTO ${config.dbBacklogBuffer} (seq, query, timestamp) VALUES (?,?,?)`, + [seq, query, timestamp], + ); + } + return []; + } */ + } + } catch (e) { + this.writeLock = false; + log.error(`error executing query, ${query}, ${seq}`); + log.error(e); + } + return []; + } + + /** + * [getLogs] + * @param {int} startFrom [description] + * @param {int} pageSize [description] + * @return {Array} + */ + static async getLogs(startFrom, pageSize) { + if (!this.BLClient) { + log.error('Backlog not created yet. Call createBacklog() first.'); + return []; + } + try { + if (config.dbType === 'mysql') { + const totalRecords = await this.BLClient.query(`SELECT * FROM ${config.dbBacklogCollection} WHERE seq >= ${startFrom} ORDER BY seq LIMIT ${pageSize}`); + const trimedRecords = utill.trimArrayToSize(totalRecords, 3 * 1024 * 1024); + log.info(`sending backlog records ${startFrom},${pageSize}, records: ${trimedRecords.length}`); + return trimedRecords; + } + } catch (e) { + log.error(e); + } + return []; + } + + /** + * [getLogsByTime] + * @param {int} startFrom [description] + * @param {int} length [description] + * @return {Array} + */ + static async getLogsByTime(startFrom, length) { + if (!this.BLClient) { + log.error('Backlog not created yet. Call createBacklog() first.'); + return []; + } + try { + if (config.dbType === 'mysql') { + const totalRecords = await this.BLClient.execute(`SELECT seq, LEFT(query,10) as query, timestamp FROM ${config.dbBacklogCollection} WHERE timestamp >= ? AND timestamp < ? ORDER BY seq`, [startFrom, Number(startFrom) + Number(length)]); + return totalRecords; + } + } catch (e) { + log.error(e); + } + return []; + } + + /** + * [getLogs] + * @param {int} index [description] + * @return {object} + */ + static async getLog(index) { + if (!this.BLClient) { + log.error('Backlog not created yet. Call createBacklog() first.'); + return []; + } + try { + if (config.dbType === 'mysql') { + const record = await this.BLClient.query(`SELECT * FROM ${config.dbBacklogCollection} WHERE seq=${index}`); + // log.info(`backlog records ${startFrom},${pageSize}:${JSON.stringify(totalRecords)}`); + return record; + } + } catch (e) { + log.error(e); + } + return []; + } + + /** + * [getDateRange] + * @return {object} + */ + static async getDateRange() { + if (!this.BLClient) { + log.error('Backlog not created yet. Call createBacklog() first.'); + return []; + } + try { + if (config.dbType === 'mysql') { + const record = await this.BLClient.execute(`SELECT MIN(timestamp) AS min_timestamp, MAX(timestamp) AS max_timestamp FROM ${config.dbBacklogCollection}`); + log.info(record); + return record[0]; + } + } catch (e) { + log.error(e); + } + return []; + } + + /** + * [getTotalLogsCount] + * @return {int} + */ + static async getTotalLogsCount() { + if (!this.BLClient) { + log.error('Backlog not created yet. Call createBacklog() first.'); + } else { + try { + if (config.dbType === 'mysql') { + const totalRecords = await this.BLClient.query(`SELECT count(*) as total FROM ${config.dbBacklogCollection}`); + log.info(`Total Records: ${JSON.stringify(totalRecords)}`); + return totalRecords[0].total; + } + } catch (e) { + log.error(e); + } + } + return 0; + } + + /** + * [getLastSequenceNumber] + * @return {int} + */ + static async getLastSequenceNumber(buffer = false) { + if (!this.BLClient) { + log.error('Backlog not created yet. Call createBacklog() first.'); + } else { + try { + if (config.dbType === 'mysql') { + let records = []; + if (buffer) { + records = await this.BLClient.query(`SELECT seq as seqNo FROM ${config.dbBacklogBuffer} ORDER BY seq DESC LIMIT 1`); + } else { + records = await this.BLClient.query(`SELECT seq as seqNo FROM ${config.dbBacklogCollection} ORDER BY seq DESC LIMIT 1`); + } + if (records.length) return records[0].seqNo; + } + } catch (e) { + log.error(e); + } + } + return 0; + } + + /** + * [keepConnections] + */ + static async keepConnections() { + if (config.dbType === 'mysql' && this.BLClient) { + await this.BLClient.setDB(config.dbBacklog); + await this.UserDBClient.setDB(config.dbInitDB); + } + } + + /** + * [clearLogs] + */ + static async clearLogs() { + if (!this.BLClient) { + this.BLClient = await dbClient.createClient(); + if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); + } + try { + if (config.dbType === 'mysql') { + await this.BLClient.query(`DELETE FROM ${config.dbBacklogCollection}`); + this.sequenceNumber = 0; + } + } catch (e) { + log.error(e); + } + log.info('All backlog data removed successfully.'); + } + + /** + * [rebuildDatabase] + */ + static async rebuildDatabase(seqNo) { + if (!this.BLClient) { + this.BLClient = await dbClient.createClient(); + if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); + } + try { + if (config.dbType === 'mysql') { + await this.BLClient.query(`DROP DATABASE ${config.dbInitDB}`); + await this.BLClient.createDB(config.dbInitDB); + this.UserDBClient.setDB(config.dbInitDB); + await this.BLClient.setDB(config.dbBacklog); + const records = await this.BLClient.execute('SELECT * FROM backlog WHERE seq<=? ORDER BY seq', [seqNo]); + // console.log(records); + for (const record of records) { + log.info(`executing seq(${record.seq})`); + try { + // eslint-disable-next-line no-await-in-loop, no-unused-vars + const result = await this.UserDBClient.query(record.query); + } catch (e) { + log.error(e); + } + // eslint-disable-next-line no-await-in-loop + } + await this.BLClient.execute('DELETE FROM backlog WHERE seq>?', [seqNo]); + await this.clearBuffer(); + } + } catch (e) { + log.error(e); + } + this.buffer = []; + log.info(`DB and backlog rolled back to ${seqNo}`); + } + + /** + * [destroyBacklog] + */ + static async destroyBacklog() { + if (!this.BLClient) this.BLClient = await dbClient.createClient(); + try { + if (config.dbType === 'mysql') { + await this.BLClient.query(`DROP DATABASE ${config.dbBacklog}`); + this.sequenceNumber = 0; + } + } catch (e) { + log.error(e); + } + log.info(`${config.dbBacklog} database and all it's data erased successfully.`); + } + + /** + * [clearBuffer] + */ + static async clearBuffer() { + if (!this.BLClient) { + this.BLClient = await dbClient.createClient(); + if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); + } + try { + if (config.dbType === 'mysql') { + await this.BLClient.query(`DELETE FROM ${config.dbBacklogBuffer}`); + this.bufferSequenceNumber = 0; + this.bufferStartSequenceNumber = 0; + } + } catch (e) { + log.error(e); + } + this.buffer = []; + log.info('All buffer data removed successfully.'); + } + + /** + * [moveBufferToBacklog] + */ + static async moveBufferToBacklog() { + if (!this.BLClient) { + this.BLClient = await dbClient.createClient(); + if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); + } + + if (config.dbType === 'mysql') { + const records = await this.BLClient.query(`SELECT * FROM ${config.dbBacklogBuffer} ORDER BY seq`); + for (const record of records) { + log.info(`copying seq(${record.seq}) from buffer`); + try { + // eslint-disable-next-line no-await-in-loop + await this.pushQuery(record.query, record.seq, record.timestamp); + } catch (e) { + log.error(e); + } + // eslint-disable-next-line no-await-in-loop + await this.BLClient.execute(`DELETE FROM ${config.dbBacklogBuffer} WHERE seq=?`, [record.seq]); + } + const records2 = await this.BLClient.query(`SELECT * FROM ${config.dbBacklogBuffer} ORDER BY seq`); + if (records2.length > 0) { + this.bufferStartSequenceNumber = records2[0].seq; + } else { + this.bufferStartSequenceNumber = 0; + } + } + // this.clearBuffer(); + log.info('All buffer data moved to backlog successfully.'); + } + + /** + * [pushKey] + */ + static async pushKey(key, value, encrypt = true) { + const encryptedValue = (encrypt) ? Security.encrypt(value) : value; + if (!this.BLClient) { + this.BLClient = await dbClient.createClient(); + if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); + } + try { + if (config.dbType === 'mysql') { + const record = await this.BLClient.execute(`SELECT * FROM ${config.dbOptions} WHERE k=?`, [key]); + if (record.length) { + await this.BLClient.execute(`UPDATE ${config.dbOptions} SET value=? WHERE k=?`, [encryptedValue, key]); + } else { + await this.BLClient.execute(`INSERT INTO ${config.dbOptions} (k, value) VALUES (?,?)`, [key, encryptedValue]); + } + } + } catch (e) { + log.error(e); + } + this.buffer = []; + // log.info('Key pushed.'); + } + + /** + * [getKey] + */ + static async getKey(key, decrypt = true) { + if (!this.BLClient) { + this.BLClient = await dbClient.createClient(); + if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); + } + try { + if (config.dbType === 'mysql') { + const records = await this.BLClient.execute(`SELECT * FROM ${config.dbOptions} WHERE k=?`, [key]); + if (records.length) { + return (decrypt) ? Security.encryptComm(Security.decrypt(records[0].value)) : records[0].value; + } + } + } catch (e) { + log.error(e); + } + return null; + } + + /** + * [removeKey] + */ + static async removeKey(key) { + if (!this.BLClient) { + this.BLClient = await dbClient.createClient(); + if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); + } + try { + if (config.dbType === 'mysql') { + const records = await this.BLClient.execute(`DELETE FROM ${config.dbOptions} WHERE k=?`, [key]); + if (records.length) { + return true; + } + } + } catch (e) { + log.error(e); + } + return false; + } + + /** + * [getAllKeys] + */ + static async getAllKeys() { + const keys = {}; + if (!this.BLClient) { + this.BLClient = await dbClient.createClient(); + if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); + } + try { + if (config.dbType === 'mysql') { + const records = await this.BLClient.execute(`SELECT * FROM ${config.dbOptions}`); + for (const record of records) { + keys[record.k] = Security.encryptComm(Security.decrypt(record.value)); + } + } + } catch (e) { + log.error(e); + } + return keys; + } + + /** + * [dumpBackup] + */ + static async dumpBackup() { + const timestamp = new Date().getTime(); + if (!this.BLClient) { + this.BLClient = await dbClient.createClient(); + if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); + } + if (this.BLClient) { + const startTime = Date.now(); // Record the start time + await mysqldump({ + connection: { + host: config.dbHost, + port: config.dbPort, + user: config.dbUser, + password: Security.getKey(), + database: config.dbInitDB, + }, + dump: { + schema: { + table: { + dropIfExist: true, + }, + }, + data: { + verbose: false, + }, + }, + dumpToFile: `./dumps/BU_${timestamp}.sql`, + }); + const endTime = Date.now(); // Record the end time + log.info(`Backup file created in (${endTime - startTime} ms): BU_${timestamp}.sql`); + } else { + log.info('Can not connect to the DB'); + } + } + + /** + * [listSqlFiles] + */ + static async listSqlFiles() { + const folderPath = './dumps/'; + try { + const files = fs.readdirSync(folderPath); + + const sqlFilesInfo = files.map((file) => { + const filePath = path.join(folderPath, file); + const fileStats = fs.statSync(filePath); + const isSqlFile = path.extname(file) === '.sql'; + + if (isSqlFile) { + return { + fileName: file, + fileSize: fileStats.size, // in bytes + createdDateTime: fileStats.birthtime, // creation date/time + }; + } else { + return null; // Ignore non-SQL files + } + }); + + // Filter out null entries (non-SQL files) and return the result + return sqlFilesInfo.filter((info) => info !== null); + } catch (error) { + log.error(`Error reading folder: ${error}`); + return []; + } + } + + /** + * [deleteBackupFile] + */ + static async deleteBackupFile(fileName) { + try { + fs.unlinkSync(`./dumps/${fileName}.sql`); + log.info(`File "${fileName}.sql" has been deleted.`); + } catch (error) { + log.error(`Error deleting file "${fileName}": ${error.message}`); + } + } + + /** + * [purgeBinLogs] + */ + static async purgeBinLogs() { + if (!this.BLClient) { + this.BLClient = await dbClient.createClient(); + if (this.BLClient && config.dbType === 'mysql') await this.BLClient.setDB(config.dbBacklog); + } + try { + if (config.dbType === 'mysql') { + await this.BLClient.execute('FLUSH LOGS'); + await this.BLClient.execute("PURGE BINARY LOGS BEFORE '2026-04-03'"); + } + } catch (e) { + log.error(e); + } + } +}// end class + +// eslint-disable-next-line func-names +module.exports = BackLog; diff --git a/ClusterOperator/config.js b/ClusterOperator/config.js index cc47a93..1f41879 100644 --- a/ClusterOperator/config.js +++ b/ClusterOperator/config.js @@ -1,24 +1,24 @@ -module.exports = { - dbHost: process.env.DB_COMPONENT_NAME || 'localhost', - dbType: process.env.DB_TYPE || 'mysql', - dbUser: 'root', - dbPass: process.env.DB_INIT_PASS || 'secret', - dbPort: 3306, - dbBacklog: 'flux_backlog', - dbBacklogCollection: 'backlog', - dbBacklogBuffer: 'backlog_buffer', - dbOptions: 'options', - dbInitDB: process.env.INIT_DB_NAME || 'test_db', - externalDBPort: process.env.EXT_DB_PORT || 3307, - apiPort: 7071, - debugUIPort: 8008, - containerDBPort: String(process.env.DB_PORT || 33949).trim(), - containerApiPort: String(process.env.API_PORT || 33950).trim(), - DBAppName: process.env.DB_APPNAME || 'wordpressonflux', - AppName: process.env.CLIENT_APPNAME || 'explorer', - version: '1.2.3', - whiteListedIps: process.env.WHITELIST || '127.0.0.1', - debugMode: true, - authMasterOnly: process.env.AUTH_MASTER_ONLY || false, - ssl: false, -}; +module.exports = { + dbHost: process.env.DB_COMPONENT_NAME || 'localhost', + dbType: process.env.DB_TYPE || 'mysql', + dbUser: 'root', + dbPass: process.env.DB_INIT_PASS || 'secret', + dbPort: 3306, + dbBacklog: 'flux_backlog', + dbBacklogCollection: 'backlog', + dbBacklogBuffer: 'backlog_buffer', + dbOptions: 'options', + dbInitDB: process.env.INIT_DB_NAME || 'test_db', + externalDBPort: process.env.EXT_DB_PORT || 3307, + apiPort: 7071, + debugUIPort: 8008, + containerDBPort: String(process.env.DB_PORT || 33949).trim(), + containerApiPort: String(process.env.API_PORT || 33950).trim(), + DBAppName: process.env.DB_APPNAME || 'wordpressonflux', + AppName: process.env.CLIENT_APPNAME || 'explorer', + version: '1.2.4', + whiteListedIps: process.env.WHITELIST || '127.0.0.1', + debugMode: true, + authMasterOnly: process.env.AUTH_MASTER_ONLY || false, + ssl: false, +}; diff --git a/ClusterOperator/server.js b/ClusterOperator/server.js index e23554b..e2afac1 100644 --- a/ClusterOperator/server.js +++ b/ClusterOperator/server.js @@ -1,672 +1,674 @@ -/* eslint-disable consistent-return */ -/* eslint-disable no-await-in-loop */ -/* eslint-disable no-restricted-syntax */ -/* eslint-disable no-unused-vars */ -const { Server } = require('socket.io'); -const https = require('https'); -const timer = require('timers/promises'); -const express = require('express'); -const RateLimit = require('express-rate-limit'); -const fileUpload = require('express-fileupload'); -const bodyParser = require('body-parser'); -const cookieParser = require('cookie-parser'); -const cors = require('cors'); -const path = require('path'); -const fs = require('fs'); -const qs = require('qs'); -const sanitize = require('sanitize-filename'); -const queryCache = require('memory-cache'); -const Operator = require('./Operator'); -const BackLog = require('./Backlog'); -const IdService = require('./IdService'); -const log = require('../lib/log'); -const utill = require('../lib/utill'); -const config = require('./config'); -const Security = require('./Security'); -const fluxAPI = require('../lib/fluxAPI'); -const SqlImporter = require('../modules/mysql-import'); - -/** -* [auth] -* @param {string} ip [description] -*/ -function auth(ip) { - const whiteList = config.whiteListedIps.split(','); - if (whiteList.length && whiteList.includes(ip)) return true; - // only operator nodes can connect - const idx = Operator.OpNodes.findIndex((item) => item.ip === ip); - if (idx === -1) return false; - return true; -} -/** -* [authUser] -*/ -function authUser(req) { - let remoteIp = utill.convertIP(req.ip); - if (!remoteIp) remoteIp = req.socket.address().address; - let loginphrase = false; - if (req.headers.loginphrase) { - loginphrase = req.headers.loginphrase; - } else { - loginphrase = req.cookies.loginphrase; - } - if (loginphrase && IdService.verifySession(loginphrase, remoteIp)) { - return true; - } - return false; -} -/** - * To check if a parameter is an object and if not, return an empty object. - * @param {*} parameter Parameter of any type. - * @returns {object} Returns the original parameter if it is an object or returns an empty object. - */ -function ensureObject(parameter) { - if (typeof parameter === 'object') { - return parameter; - } - if (!parameter) { - return {}; - } - let param; - try { - param = JSON.parse(parameter); - } catch (e) { - param = qs.parse(parameter); - } - if (typeof param !== 'object') { - return {}; - } - return param; -} -/** -* Starts UI service -*/ -function startUI() { - const app = express(); - app.use(cors()); - app.use(cookieParser()); - app.use(bodyParser.json()); - app.use(bodyParser.urlencoded({ extended: false })); - app.use(fileUpload()); - const limiter = RateLimit({ - windowMs: 15 * 60 * 1000, // 15 minutes - max: 100, // max 100 requests per windowMs - }); - fs.writeFileSync('errors.txt', `version: ${config.version}
`); - fs.writeFileSync('warnings.txt', `version: ${config.version}
`); - fs.writeFileSync('info.txt', `version: ${config.version}
`); - fs.writeFileSync('query.txt', `version: ${config.version}
`); - fs.appendFileSync('debug.txt', `------------------------------------------------------
version: ${config.version}
`); - - app.options('/*', (req, res, next) => { - res.header('Access-Control-Allow-Origin', '*'); - res.header('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS'); - res.header('Access-Control-Allow-Headers', 'Content-Type, Authorization, Content-Length, X-Requested-With'); - res.send(200); - }); - app.get('/', (req, res) => { - const { host } = req.headers; - if (host) { - if (authUser(req)) { - res.sendFile(path.join(__dirname, '../ui/index.html')); - } else { - res.sendFile(path.join(__dirname, '../ui/login.html')); - } - } else if (Operator.IamMaster) { // request comming from fdm - res.send('OK'); - } else { - res.status(500).send('Bad Request'); - } - }); - - app.get('/assets/zelID.svg', (req, res) => { - res.sendFile(path.join(__dirname, '../ui/assets/zelID.svg')); - }); - app.get('/assets/Flux_white-blue.svg', (req, res) => { - res.sendFile(path.join(__dirname, '../ui/assets/Flux_white-blue.svg')); - }); - // apply rate limiter to all requests below - app.use(limiter); - app.get('/logs/:file?', (req, res) => { - const remoteIp = utill.convertIP(req.ip); - let { file } = req.params; - file = file || req.query.file; - const whiteList = config.whiteListedIps.split(','); - let logFile = 'errors.txt'; - switch (file) { - case 'info': - logFile = 'info.txt'; - break; - case 'warnings': - logFile = 'warnings.txt'; - break; - case 'debug': - logFile = 'debug.txt'; - break; - case 'query': - logFile = 'query.txt'; - break; - default: - logFile = 'errors.txt'; - break; - } - - if (whiteList.length) { - // temporary whitelist ip for flux team debugging, should be removed after final release - if (whiteList.includes(remoteIp) || remoteIp === '167.235.234.45' || remoteIp === '45.89.52.198') { - res.send(`FluxDB Debug Screen
${fs.readFileSync(logFile).toString()}`); - } - } - }); - - app.post('/rollback', async (req, res) => { - if (authUser(req)) { - let { seqNo } = req.body; - seqNo = seqNo || req.query.seqNo; - if (seqNo) { - await Operator.rollBack(seqNo); - res.send({ status: 'OK' }); - } - } - }); - - app.get('/nodelist', (req, res) => { - if (authUser(req)) { - res.send(Operator.OpNodes); - res.end(); - } else { - res.status(403).send('Bad Request'); - } - }); - app.get('/getstatus', async (req, res) => { - res.setHeader('Content-Type', 'text/event-stream'); - res.setHeader('Cache-Control', 'no-cache'); - res.setHeader('Connection', 'keep-alive'); - res.setHeader('Access-Control-Allow-Origin', '*'); - res.setHeader('X-Accel-Buffering', 'no'); - let count = 1; - while (true) { - res.write(`${JSON.stringify({ - type: 'stream', - chunk: count++, - })}\r\n\r\n`); - await timer.setTimeout(2000); - //console.log(count); - } - }); - - app.get('/status', (req, res) => { - res.header('Access-Control-Allow-Origin', '*'); - res.header('Access-Control-Allow-Headers', 'X-Requested-With'); - res.send({ - status: Operator.status, - sequenceNumber: BackLog.sequenceNumber, - masterIP: Operator.getMaster(), - }); - res.end(); - }); - - app.get('/getLogDateRange', async (req, res) => { - if (authUser(req)) { - res.send(await BackLog.getDateRange()); - res.end(); - } else { - res.status(403).send('Bad Request'); - } - }); - - app.get('/getLogsByTime', async (req, res) => { - if (authUser(req)) { - const { starttime } = req.query; - const { length } = req.query; - res.send(await BackLog.getLogsByTime(starttime, length)); - res.end(); - } else { - res.status(403).send('Bad Request'); - } - }); - - app.get('/disableWrites', (req, res) => { - const remoteIp = utill.convertIP(req.ip); - const whiteList = config.whiteListedIps.split(','); - if (whiteList.length) { - if (whiteList.includes(remoteIp)) { - log.info('Writes Disabled'); - Operator.status = 'DISABLED'; - } - } - }); - - app.get('/enableWrites', (req, res) => { - const remoteIp = utill.convertIP(req.ip); - const whiteList = config.whiteListedIps.split(','); - if (whiteList.length) { - if (whiteList.includes(remoteIp)) { - log.info('Writes Enabled'); - Operator.status = 'OK'; - } - } - }); - - app.get('/secret/:key', (req, res) => { - const remoteIp = utill.convertIP(req.ip); - const whiteList = config.whiteListedIps.split(','); - if (whiteList.length) { - if (whiteList.includes(remoteIp)) { - const { key } = req.params; - const value = BackLog.getKey(`_sk${key}`); - if (value) { - res.send(value); - } else { - res.status(404).send('Key not found'); - } - } - } - }); - - app.post('/secret/', (req, res) => { - const remoteIp = utill.convertIP(req.ip); - const whiteList = config.whiteListedIps.split(','); - let secret = req.body; - BackLog.pushKey(`_sk${secret.key}`, secret.value, true); - // console.log(secret.key); - if (whiteList.length) { - if (whiteList.includes(remoteIp)) { - secret = req.body; - value = BackLog.pushKey(`_sk${secret.key}`, secret.value, true); - if (value) { - res.send('OK'); - } - } - } - res.status(404).send('Key not found'); - }); - - app.delete('/secret/:key', async (req, res) => { - const remoteIp = utill.convertIP(req.ip); - const whiteList = config.whiteListedIps.split(','); - if (whiteList.length) { - if (whiteList.includes(remoteIp)) { - const { key } = req.params; - if (await BackLog.removeKey(`_sk${key}`)) { - res.send('OK'); - } - } - } - res.status(404).send('Key not found'); - }); - app.get('/listbackups', async (req, res) => { - if (authUser(req)) { - res.send(await BackLog.listSqlFiles()); - res.end(); - } else { - res.status(403).send('Bad Request'); - } - }); - app.get('/getbackupfile/:filename', async (req, res) => { - if (authUser(req)) { - const { filename } = req.params; - res.download(path.join(__dirname, `../dumps/${sanitize(filename)}.sql`), `${sanitize(filename)}.sql`, (err) => { - if (err) { - // Handle errors, such as file not found - res.status(404).send('File not found'); - } - }); - } else { - res.status(403).send('Bad Request'); - } - }); - app.post('/upload-sql', async (req, res) => { - if (authUser(req)) { - if (!req.files || !req.files.sqlFile) { - return res.status(400).send('No file uploaded.'); - } - const { sqlFile } = req.files; - const uploadPath = path.join(__dirname, '../dumps/', sqlFile.name); // Adjust the destination folder as needed - // Move the uploaded .sql file to the specified location - sqlFile.mv(uploadPath, (err) => { - if (err) { - return res.status(500).send(`Error uploading file: ${err.message}`); - } - res.send('File uploaded successfully.'); - }); - } else { - res.status(403).send('Bad Request'); - } - }); - app.post('/generatebackup', async (req, res) => { - if (authUser(req)) { - res.send(await BackLog.dumpBackup()); - res.end(); - } else { - res.status(403).send('Bad Request'); - } - }); - app.post('/deletebackup', async (req, res) => { - if (authUser(req)) { - const { body } = req; - if (body) { - const { filename } = body; - res.send(await BackLog.deleteBackupFile(sanitize(filename))); - res.end(); - } - } else { - res.status(403).send('Bad Request'); - } - }); - app.post('/executebackup', async (req, res) => { - if (authUser(req)) { - const { body } = req; - if (body) { - const { filename } = body; - // create a snapshot - await BackLog.dumpBackup(); - // removing old db + resetting secuence numbers: - await Operator.rollBack(0); - await timer.setTimeout(2000); - const importer = new SqlImporter({ - callback: Operator.sendWriteQuery, - serverSocket: Operator.serverSocket, - }); - importer.onProgress((progress) => { - const percent = Math.floor((progress.bytes_processed / progress.total_bytes) * 10000) / 100; - log.info(`${percent}% Completed`, 'cyan'); - }); - importer.setEncoding('utf8'); - await importer.import(`./dumps/${sanitize(filename)}.sql`).then(async () => { - const filesImported = importer.getImported(); - log.info(`${filesImported.length} SQL file(s) imported.`); - res.send('OK'); - }).catch((err) => { - res.status(500).send(JSON.stringify(err)); - log.error(err); - }); - res.end(); - } - } else { - res.status(403).send('Bad Request'); - } - }); - app.get('/isloggedin/', (req, res) => { - if (authUser(req)) { - res.cookie('loginphrase', req.headers.loginphrase); - res.send('OK'); - } else { - res.status(403).send('Bad Request'); - } - }); - app.post('/verifylogin/', (req, res) => { - let { body } = req; - if (body) { - const { signature } = body; - const { message } = body; - if (IdService.verifyLogin(message, signature)) { - let remoteIp = utill.convertIP(req.ip); - if (!remoteIp) remoteIp = req.socket.address().address; - IdService.addNewSession(message, remoteIp); - Operator.emitUserSession('add', message, remoteIp); - res.cookie('loginphrase', message); - res.send('OK'); - } else { - res.send('SIGNATURE NOT VALID'); - } - } - req.on('data', (data) => { - body += data; - }); - req.on('end', async () => { - try { - const processedBody = ensureObject(body); - let { signature } = processedBody; - if (!signature) signature = req.query.signature; - const message = processedBody.loginPhrase || processedBody.message || req.query.message; - if (IdService.verifyLogin(message, signature)) { - let remoteIp = utill.convertIP(req.ip); - if (!remoteIp) remoteIp = req.socket.address().address; - IdService.addNewSession(message, remoteIp); - Operator.emitUserSession('add', message, remoteIp); - res.cookie('loginphrase', message); - res.send('OK'); - } else { - res.send('SIGNATURE NOT VALID'); - } - } catch (error) { - log.error(error); - } - res.send('Error'); - }); - }); - - app.get('/loginphrase/', (req, res) => { - res.send(IdService.generateLoginPhrase()); - }); - - app.get('/logout/', (req, res) => { - if (authUser(req)) { - IdService.removeSession(req.cookies.loginphrase); - Operator.emitUserSession('remove', req.cookies.loginphrase, ''); - res.send('OK'); - } else { - res.status(403).send('Bad Request'); - } - }); - - if (config.ssl) { - const keys = Security.generateRSAKey(); - const httpsOptions = { - key: keys.pemPrivateKey, - cert: keys.pemCertificate, - }; - https.createServer(httpsOptions, app).listen(config.debugUIPort, () => { - log.info(`starting SSL interface on port ${config.debugUIPort}`); - }); - } else { - app.listen(config.debugUIPort, () => { - log.info(`starting interface on port ${config.debugUIPort}`); - }); - } -} -/** -* [validate] -* @param {string} ip [description] -*/ -async function validate(ip) { - if (Operator.AppNodes.includes(ip)) return true; - return false; - // const validateApp = await fluxAPI.validateApp(config.DBAppName, ip); - // if (validateApp) return true; - // return false; -} -/** -* [initServer] -*/ -async function initServer() { - Security.init(); - startUI(); - await Operator.init(); - const io = new Server(config.apiPort, { transports: ['websocket', 'polling'], maxHttpBufferSize: 4 * 1024 * 1024 }); - // const app = new App(); - // io.attachApp(app); - Operator.setServerSocket(io); - - io.on('connection', async (socket) => { - const ip = utill.convertIP(socket.handshake.address); - log.debug(`connection from ${ip}`, 'red'); - if (auth(ip)) { - // log.info(`validating ${ip}: ${await auth(ip)}`); - socket.on('disconnect', (reason) => { - log.info(`disconnected from ${ip}`, 'red'); - }); - socket.on('getStatus', async (callback) => { - // log.info(`getStatus from ${ip}`); - callback({ - status: Operator.status, - sequenceNumber: BackLog.sequenceNumber, - remoteIP: utill.convertIP(socket.handshake.address), - masterIP: Operator.getMaster(), - }); - }); - socket.on('getMaster', async (callback) => { - // log.info(`getMaster from ${ip}`); - callback({ status: 'success', message: Operator.getMaster() }); - }); - socket.on('getMyIp', async (callback) => { - // log.info(`getMyIp from ${ip}`); - callback({ status: 'success', message: utill.convertIP(socket.handshake.address) }); - }); - socket.on('getBackLog', async (start, callback) => { - const records = await BackLog.getLogs(start, 200); - callback({ status: Operator.status, sequenceNumber: BackLog.sequenceNumber, records }); - }); - socket.on('writeQuery', async (query, connId, callback) => { - log.info(`writeQuery from ${utill.convertIP(socket.handshake.address)}:${connId}`); - /* - if (BackLog.writeLock) { - const myTicket = Operator.getTicket(); - log.info(`put into queue: ${myTicket}, in queue: ${Operator.masterQueue.length}`, 'cyan'); - Operator.masterQueue.push(myTicket); - while (BackLog.writeLock || Operator.masterQueue[0] !== myTicket) { - await timer.setTimeout(5); - } - BackLog.writeLock = true; - Operator.masterQueue.shift(); - log.info(`out of queue: ${myTicket}, in queue: ${Operator.masterQueue.length}`, 'cyan'); - } - */ - const result = await BackLog.pushQuery(query); - // log.info(`forwarding query to slaves: ${JSON.stringify(result)}`); - socket.broadcast.emit('query', query, result[1], result[2], false); - socket.emit('query', query, result[1], result[2], connId); - // cache write queries for 20 seconds - queryCache.put(result[1], { - query, seq: result[1], timestamp: result[2], connId, ip, - }, 1000 * 60); - callback({ status: Operator.status, result: result[0] }); - }); - socket.on('askQuery', async (index, callback) => { - log.info(`${ip} asking for seqNo: ${index}`, 'magenta'); - const record = queryCache.get(index); - let connId = false; - if (record) { - if (record.ip === ip && record.connId) connId = record.connId; - log.info(`sending query: ${index}`, 'magenta'); - socket.emit('query', record.query, record.seq, record.timestamp, connId); - } else { - log.warn(`query ${index} not in query cache`, 'red'); - let BLRecord = BackLog.BLqueryCache.get(index); - log.info(JSON.stringify(BLRecord), 'red'); - if (!BLRecord) { - BLRecord = await BackLog.getLog(index); - log.info(`from DB : ${JSON.stringify(BLRecord)}`, 'red'); - try { - socket.emit('query', BLRecord[0].query, BLRecord[0].seq, BLRecord[0].timestamp, connId); - } catch (err) { - log.error(JSON.stringify(err)); - } - } - } - // if (record) { - - // log.info(`record type: ${Array.isArray(record)}`, 'magenta'); - // if (Array.isArray(record)) { - // socket.emit('query', record[0].query, record[0].seq, record[0].timestamp, false); - // log.warn(`query ${index} not in query cache`, 'red'); - // } else { - - // } - // } - callback({ status: Operator.status }); - }); - socket.on('shareKeys', async (pubKey, callback) => { - const nodeip = utill.convertIP(socket.handshake.address); - // log.info(`shareKeys from ${nodeip}`); - let nodeKey = null; - if (!(`N${nodeip}` in Operator.keys)) { - Operator.keys = await BackLog.getAllKeys(); - if (`N${nodeip}` in Operator.keys) nodeKey = Operator.keys[`N${nodeip}`]; - if (nodeKey) nodeKey = Security.publicEncrypt(pubKey, Buffer.from(nodeKey, 'hex')); - } - callback({ - status: Operator.status, - commAESKey: Security.publicEncrypt(pubKey, Security.getCommAESKey()), - commAESIV: Security.publicEncrypt(pubKey, Security.getCommAESIv()), - key: nodeKey, - }); - }); - socket.on('updateKey', async (key, value, callback) => { - const decKey = Security.decryptComm(key); - log.info(`updateKey from ${decKey}`); - await BackLog.pushKey(decKey, value); - Operator.keys[decKey] = value; - socket.broadcast.emit('updateKey', key, value); - callback({ status: Operator.status }); - }); - socket.on('getKeys', async (callback) => { - const keysToSend = {}; - const nodeip = utill.convertIP(socket.handshake.address); - for (const key in Operator.keys) { - if ((key.startsWith('N') || key.startsWith('_')) && key !== `N${nodeip}`) { - keysToSend[key] = Operator.keys[key]; - } - } - keysToSend[`N${Operator.myIP}`] = Security.encryptComm(`${Security.getKey()}:${Security.getIV()}`); - callback({ status: Operator.status, keys: Security.encryptComm(JSON.stringify(keysToSend)) }); - }); - socket.on('resetMaster', async (callback) => { - if (Operator.IamMaster) { - Object.keys(io.sockets.sockets).forEach((s) => { - io.sockets.sockets[s].disconnect(true); - }); - Operator.findMaster(); - } - callback({ status: Operator.status }); - }); - socket.on('rollBack', async (seqNo, callback) => { - if (Operator.IamMaster) { - Operator.rollBack(seqNo); - } - callback({ status: Operator.status }); - }); - socket.on('userSession', async (op, key, value, callback) => { - if (op === 'add') { IdService.addNewSession(key, value); } else { IdService.removeSession(key); } - socket.broadcast.emit('userSession', op, key, value); - callback({ status: Operator.status }); - }); - } else { - log.warn(`rejected from ${ip}`); - socket.disconnect(); - } - if (await validate(ip)) { - // log.info(`auth: ${ip} is validated`); - } else { - log.warn(`validation failed for ${ip}`, 'red'); - socket.disconnect(); - } - }); - IdService.init(); - log.info(`Api Server started on port ${config.apiPort}`); - await Operator.findMaster(); - log.info(`find master finished, master is ${Operator.masterNode}`); - if (!Operator.IamMaster) { - Operator.initMasterConnection(); - } - setInterval(async () => { - Operator.doHealthCheck(); - }, 120000); -} - -initServer(); +/* eslint-disable consistent-return */ +/* eslint-disable no-await-in-loop */ +/* eslint-disable no-restricted-syntax */ +/* eslint-disable no-unused-vars */ +const { Server } = require('socket.io'); +const https = require('https'); +const timer = require('timers/promises'); +const express = require('express'); +const RateLimit = require('express-rate-limit'); +const fileUpload = require('express-fileupload'); +const bodyParser = require('body-parser'); +const cookieParser = require('cookie-parser'); +const cors = require('cors'); +const path = require('path'); +const fs = require('fs'); +const qs = require('qs'); +const sanitize = require('sanitize-filename'); +const queryCache = require('memory-cache'); +const Operator = require('./Operator'); +const BackLog = require('./Backlog'); +const IdService = require('./IdService'); +const log = require('../lib/log'); +const utill = require('../lib/utill'); +const config = require('./config'); +const Security = require('./Security'); +const SqlImporter = require('../modules/mysql-import'); + +/** +* [auth] +* @param {string} ip [description] +*/ +function auth(ip) { + const whiteList = config.whiteListedIps.split(','); + if (whiteList.length && whiteList.includes(ip)) return true; + // only operator nodes can connect + const idx = Operator.OpNodes.findIndex((item) => item.ip === ip); + if (idx === -1) return false; + return true; +} +/** +* [authUser] +*/ +function authUser(req) { + let remoteIp = utill.convertIP(req.ip); + if (!remoteIp) remoteIp = req.socket.address().address; + let loginphrase = false; + if (req.headers.loginphrase) { + loginphrase = req.headers.loginphrase; + } else { + loginphrase = req.cookies.loginphrase; + } + if (loginphrase && IdService.verifySession(loginphrase, remoteIp)) { + return true; + } + return false; +} +/** + * To check if a parameter is an object and if not, return an empty object. + * @param {*} parameter Parameter of any type. + * @returns {object} Returns the original parameter if it is an object or returns an empty object. + */ +function ensureObject(parameter) { + if (typeof parameter === 'object') { + return parameter; + } + if (!parameter) { + return {}; + } + let param; + try { + param = JSON.parse(parameter); + } catch (e) { + param = qs.parse(parameter); + } + if (typeof param !== 'object') { + return {}; + } + return param; +} +/** +* Starts UI service +*/ +function startUI() { + const app = express(); + app.use(cors()); + app.use(cookieParser()); + app.use(bodyParser.json()); + app.use(bodyParser.urlencoded({ extended: false })); + app.use(fileUpload()); + const limiter = RateLimit({ + windowMs: 15 * 60 * 1000, // 15 minutes + max: 100, // max 100 requests per windowMs + }); + fs.writeFileSync('errors.txt', `version: ${config.version}
`); + fs.writeFileSync('warnings.txt', `version: ${config.version}
`); + fs.writeFileSync('info.txt', `version: ${config.version}
`); + fs.writeFileSync('query.txt', `version: ${config.version}
`); + fs.appendFileSync('debug.txt', `------------------------------------------------------
version: ${config.version}
`); + + app.options('/*', (req, res, next) => { + res.header('Access-Control-Allow-Origin', '*'); + res.header('Access-Control-Allow-Methods', 'GET,PUT,POST,DELETE,OPTIONS'); + res.header('Access-Control-Allow-Headers', 'Content-Type, Authorization, Content-Length, X-Requested-With'); + res.send(200); + }); + app.get('/', (req, res) => { + const { host } = req.headers; + if (host) { + if (authUser(req)) { + res.sendFile(path.join(__dirname, '../ui/index.html')); + } else { + res.sendFile(path.join(__dirname, '../ui/login.html')); + } + } else if (Operator.IamMaster) { // request comming from fdm + res.send('OK'); + } else { + res.status(500).send('Bad Request'); + } + }); + + app.get('/assets/zelID.svg', (req, res) => { + res.sendFile(path.join(__dirname, '../ui/assets/zelID.svg')); + }); + app.get('/assets/Flux_white-blue.svg', (req, res) => { + res.sendFile(path.join(__dirname, '../ui/assets/Flux_white-blue.svg')); + }); + // apply rate limiter to all requests below + app.use(limiter); + app.get('/logs/:file?', (req, res) => { + const remoteIp = utill.convertIP(req.ip); + let { file } = req.params; + file = file || req.query.file; + const whiteList = config.whiteListedIps.split(','); + let logFile = 'errors.txt'; + switch (file) { + case 'info': + logFile = 'info.txt'; + break; + case 'warnings': + logFile = 'warnings.txt'; + break; + case 'debug': + logFile = 'debug.txt'; + break; + case 'query': + logFile = 'query.txt'; + break; + default: + logFile = 'errors.txt'; + break; + } + + if (whiteList.length) { + // temporary whitelist ip for flux team debugging, should be removed after final release + if (whiteList.includes(remoteIp) || remoteIp === '167.235.234.45' || remoteIp === '45.89.52.198') { + res.send(`FluxDB Debug Screen
${fs.readFileSync(logFile).toString()}`); + } + } + }); + + app.post('/rollback', async (req, res) => { + if (authUser(req)) { + let { seqNo } = req.body; + seqNo = seqNo || req.query.seqNo; + if (seqNo) { + await Operator.rollBack(seqNo); + res.send({ status: 'OK' }); + } + } + }); + + app.get('/nodelist', (req, res) => { + if (authUser(req)) { + res.send(Operator.OpNodes); + res.end(); + } else { + res.status(403).send('Bad Request'); + } + }); + app.get('/getstatus', async (req, res) => { + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + res.setHeader('Access-Control-Allow-Origin', '*'); + res.setHeader('X-Accel-Buffering', 'no'); + let count = 1; + while (true) { + res.write(`${JSON.stringify({ + type: 'stream', + chunk: count++, + })}\r\n\r\n`); + await timer.setTimeout(2000); + // console.log(count); + } + }); + + app.get('/status', (req, res) => { + res.header('Access-Control-Allow-Origin', '*'); + res.header('Access-Control-Allow-Headers', 'X-Requested-With'); + res.send({ + status: Operator.status, + sequenceNumber: BackLog.sequenceNumber, + masterIP: Operator.getMaster(), + }); + res.end(); + }); + + app.get('/getLogDateRange', async (req, res) => { + if (authUser(req)) { + res.send(await BackLog.getDateRange()); + res.end(); + } else { + res.status(403).send('Bad Request'); + } + }); + + app.get('/getLogsByTime', async (req, res) => { + if (authUser(req)) { + const { starttime } = req.query; + const { length } = req.query; + res.send(await BackLog.getLogsByTime(starttime, length)); + res.end(); + } else { + res.status(403).send('Bad Request'); + } + }); + + app.get('/disableWrites', (req, res) => { + const remoteIp = utill.convertIP(req.ip); + const whiteList = config.whiteListedIps.split(','); + if (whiteList.length) { + if (whiteList.includes(remoteIp)) { + log.info('Writes Disabled'); + Operator.status = 'DISABLED'; + } + } + }); + + app.get('/enableWrites', (req, res) => { + const remoteIp = utill.convertIP(req.ip); + const whiteList = config.whiteListedIps.split(','); + if (whiteList.length) { + if (whiteList.includes(remoteIp)) { + log.info('Writes Enabled'); + Operator.status = 'OK'; + } + } + }); + + app.get('/secret/:key', async (req, res) => { + const remoteIp = utill.convertIP(req.ip); + const whiteList = config.whiteListedIps.split(','); + if (whiteList.length) { + if (whiteList.includes(remoteIp)) { + const { key } = req.params; + const value = await BackLog.getKey(`_sk${key}`); + if (value) { + res.send(value); + } else { + res.status(404).send('Key not found'); + } + } + } + }); + + app.post('/secret/', async (req, res) => { + const remoteIp = utill.convertIP(req.ip); + const whiteList = config.whiteListedIps.split(','); + let secret = req.body; + BackLog.pushKey(`_sk${secret.key}`, secret.value, true); + // console.log(secret.key); + if (whiteList.length) { + if (whiteList.includes(remoteIp)) { + secret = req.body; + const value = await BackLog.pushKey(`_sk${secret.key}`, secret.value, true); + if (value) { + res.send('OK'); + } + } + } + res.status(404).send('Key not found'); + }); + + app.delete('/secret/:key', async (req, res) => { + const remoteIp = utill.convertIP(req.ip); + const whiteList = config.whiteListedIps.split(','); + if (whiteList.length) { + if (whiteList.includes(remoteIp)) { + const { key } = req.params; + if (await BackLog.removeKey(`_sk${key}`)) { + res.send('OK'); + } + } + } + res.status(404).send('Key not found'); + }); + app.get('/listbackups', async (req, res) => { + if (authUser(req)) { + res.send(await BackLog.listSqlFiles()); + res.end(); + } else { + res.status(403).send('Bad Request'); + } + }); + app.get('/getbackupfile/:filename', async (req, res) => { + if (authUser(req)) { + const { filename } = req.params; + res.download(path.join(__dirname, `../dumps/${sanitize(filename)}.sql`), `${sanitize(filename)}.sql`, (err) => { + if (err) { + // Handle errors, such as file not found + res.status(404).send('File not found'); + } + }); + } else { + res.status(403).send('Bad Request'); + } + }); + app.post('/upload-sql', async (req, res) => { + if (authUser(req)) { + if (!req.files || !req.files.sqlFile) { + return res.status(400).send('No file uploaded.'); + } + const { sqlFile } = req.files; + const uploadPath = path.join(__dirname, '../dumps/', sqlFile.name); // Adjust the destination folder as needed + // Move the uploaded .sql file to the specified location + sqlFile.mv(uploadPath, (err) => { + if (err) { + return res.status(500).send(`Error uploading file: ${err.message}`); + } + res.send('File uploaded successfully.'); + }); + } else { + res.status(403).send('Bad Request'); + } + }); + app.post('/generatebackup', async (req, res) => { + if (authUser(req)) { + res.send(await BackLog.dumpBackup()); + res.end(); + } else { + res.status(403).send('Bad Request'); + } + }); + app.post('/deletebackup', async (req, res) => { + if (authUser(req)) { + const { body } = req; + if (body) { + const { filename } = body; + res.send(await BackLog.deleteBackupFile(sanitize(filename))); + res.end(); + } + } else { + res.status(403).send('Bad Request'); + } + }); + app.post('/executebackup', async (req, res) => { + if (authUser(req)) { + const { body } = req; + if (body) { + const { filename } = body; + // create a snapshot + await BackLog.dumpBackup(); + // removing old db + resetting secuence numbers: + await Operator.rollBack(0); + await timer.setTimeout(2000); + const importer = new SqlImporter({ + callback: Operator.sendWriteQuery, + serverSocket: Operator.serverSocket, + }); + importer.onProgress((progress) => { + const percent = Math.floor((progress.bytes_processed / progress.total_bytes) * 10000) / 100; + log.info(`${percent}% Completed`, 'cyan'); + }); + importer.setEncoding('utf8'); + await importer.import(`./dumps/${sanitize(filename)}.sql`).then(async () => { + const filesImported = importer.getImported(); + log.info(`${filesImported.length} SQL file(s) imported.`); + res.send('OK'); + }).catch((err) => { + res.status(500).send(JSON.stringify(err)); + log.error(err); + }); + res.end(); + } + } else { + res.status(403).send('Bad Request'); + } + }); + app.get('/isloggedin/', (req, res) => { + if (authUser(req)) { + res.cookie('loginphrase', req.headers.loginphrase); + res.send('OK'); + } else { + res.status(403).send('Bad Request'); + } + }); + app.post('/verifylogin/', (req, res) => { + let { body } = req; + if (body) { + const { signature } = body; + const { message } = body; + if (IdService.verifyLogin(message, signature)) { + let remoteIp = utill.convertIP(req.ip); + if (!remoteIp) remoteIp = req.socket.address().address; + IdService.addNewSession(message, remoteIp); + Operator.emitUserSession('add', message, remoteIp); + res.cookie('loginphrase', message); + res.send('OK'); + } else { + res.send('SIGNATURE NOT VALID'); + } + } + req.on('data', (data) => { + body += data; + }); + req.on('end', async () => { + try { + const processedBody = ensureObject(body); + let { signature } = processedBody; + if (!signature) signature = req.query.signature; + const message = processedBody.loginPhrase || processedBody.message || req.query.message; + if (IdService.verifyLogin(message, signature)) { + let remoteIp = utill.convertIP(req.ip); + if (!remoteIp) remoteIp = req.socket.address().address; + IdService.addNewSession(message, remoteIp); + Operator.emitUserSession('add', message, remoteIp); + res.cookie('loginphrase', message); + res.send('OK'); + } else { + res.send('SIGNATURE NOT VALID'); + } + } catch (error) { + log.error(error); + } + res.send('Error'); + }); + }); + + app.get('/loginphrase/', (req, res) => { + res.send(IdService.generateLoginPhrase()); + }); + + app.get('/logout/', (req, res) => { + if (authUser(req)) { + IdService.removeSession(req.cookies.loginphrase); + Operator.emitUserSession('remove', req.cookies.loginphrase, ''); + res.send('OK'); + } else { + res.status(403).send('Bad Request'); + } + }); + + if (config.ssl) { + const keys = Security.generateRSAKey(); + const httpsOptions = { + key: keys.pemPrivateKey, + cert: keys.pemCertificate, + }; + https.createServer(httpsOptions, app).listen(config.debugUIPort, () => { + log.info(`starting SSL interface on port ${config.debugUIPort}`); + }); + } else { + app.listen(config.debugUIPort, () => { + log.info(`starting interface on port ${config.debugUIPort}`); + }); + } +} +/** +* [validate] +* @param {string} ip [description] +*/ +async function validate(ip) { + if (Operator.AppNodes.includes(ip)) return true; + return false; + // const validateApp = await fluxAPI.validateApp(config.DBAppName, ip); + // if (validateApp) return true; + // return false; +} +/** +* [initServer] +*/ +async function initServer() { + Security.init(); + startUI(); + await Operator.init(); + const io = new Server(config.apiPort, { transports: ['websocket', 'polling'], maxHttpBufferSize: 4 * 1024 * 1024 }); + // const app = new App(); + // io.attachApp(app); + Operator.setServerSocket(io); + + io.on('connection', async (socket) => { + const ip = utill.convertIP(socket.handshake.address); + log.debug(`connection from ${ip}`, 'red'); + if (auth(ip)) { + // log.info(`validating ${ip}: ${await auth(ip)}`); + socket.on('disconnect', (reason) => { + log.info(`disconnected from ${ip}`, 'red'); + }); + socket.on('getStatus', async (callback) => { + // log.info(`getStatus from ${ip}`); + callback({ + status: Operator.status, + sequenceNumber: BackLog.sequenceNumber, + remoteIP: utill.convertIP(socket.handshake.address), + masterIP: Operator.getMaster(), + }); + }); + socket.on('getMaster', async (callback) => { + // log.info(`getMaster from ${ip}`); + callback({ status: 'success', message: Operator.getMaster() }); + }); + socket.on('getMyIp', async (callback) => { + // log.info(`getMyIp from ${ip}`); + callback({ status: 'success', message: utill.convertIP(socket.handshake.address) }); + }); + socket.on('getBackLog', async (start, callback) => { + const records = await BackLog.getLogs(start, 200); + callback({ status: Operator.status, sequenceNumber: BackLog.sequenceNumber, records }); + }); + socket.on('writeQuery', async (query, connId, callback) => { + log.info(`writeQuery from ${utill.convertIP(socket.handshake.address)}:${connId}`); + /* + if (BackLog.writeLock) { + const myTicket = Operator.getTicket(); + log.info(`put into queue: ${myTicket}, in queue: ${Operator.masterQueue.length}`, 'cyan'); + Operator.masterQueue.push(myTicket); + while (BackLog.writeLock || Operator.masterQueue[0] !== myTicket) { + await timer.setTimeout(5); + } + BackLog.writeLock = true; + Operator.masterQueue.shift(); + log.info(`out of queue: ${myTicket}, in queue: ${Operator.masterQueue.length}`, 'cyan'); + } + */ + const result = await BackLog.pushQuery(query); + // log.info(`forwarding query to slaves: ${JSON.stringify(result)}`); + socket.broadcast.emit('query', query, result[1], result[2], false); + socket.emit('query', query, result[1], result[2], connId); + // cache write queries for 20 seconds + queryCache.put(result[1], { + query, seq: result[1], timestamp: result[2], connId, ip, + }, 1000 * 60); + callback({ status: Operator.status, result: result[0] }); + }); + socket.on('askQuery', async (index, callback) => { + log.info(`${ip} asking for seqNo: ${index}`, 'magenta'); + const record = queryCache.get(index); + let connId = false; + if (record) { + if (record.ip === ip && record.connId) connId = record.connId; + log.info(`sending query: ${index}`, 'magenta'); + socket.emit('query', record.query, record.seq, record.timestamp, connId); + } else { + log.warn(`query ${index} not in query cache`, 'red'); + let BLRecord = BackLog.BLqueryCache.get(index); + log.info(JSON.stringify(BLRecord), 'red'); + if (!BLRecord) { + BLRecord = await BackLog.getLog(index); + log.info(`from DB : ${JSON.stringify(BLRecord)}`, 'red'); + try { + socket.emit('query', BLRecord[0].query, BLRecord[0].seq, BLRecord[0].timestamp, connId); + } catch (err) { + log.error(JSON.stringify(err)); + } + } + } + // if (record) { + + // log.info(`record type: ${Array.isArray(record)}`, 'magenta'); + // if (Array.isArray(record)) { + // socket.emit('query', record[0].query, record[0].seq, record[0].timestamp, false); + // log.warn(`query ${index} not in query cache`, 'red'); + // } else { + + // } + // } + callback({ status: Operator.status }); + }); + socket.on('shareKeys', async (pubKey, callback) => { + const nodeip = utill.convertIP(socket.handshake.address); + // log.info(`shareKeys from ${nodeip}`); + let nodeKey = null; + if (!(`N${nodeip}` in Operator.keys)) { + Operator.keys = await BackLog.getAllKeys(); + if (`N${nodeip}` in Operator.keys) nodeKey = Operator.keys[`N${nodeip}`]; + if (nodeKey) nodeKey = Security.publicEncrypt(pubKey, Buffer.from(nodeKey, 'hex')); + } + callback({ + status: Operator.status, + commAESKey: Security.publicEncrypt(pubKey, Security.getCommAESKey()), + commAESIV: Security.publicEncrypt(pubKey, Security.getCommAESIv()), + key: nodeKey, + }); + }); + socket.on('updateKey', async (key, value, callback) => { + const decKey = Security.decryptComm(key); + log.info(`updateKey from ${decKey}`); + await BackLog.pushKey(decKey, value); + Operator.keys[decKey] = value; + socket.broadcast.emit('updateKey', key, value); + callback({ status: Operator.status }); + }); + socket.on('getKeys', async (callback) => { + const keysToSend = {}; + const nodeip = utill.convertIP(socket.handshake.address); + for (const key in Operator.keys) { + if ((key.startsWith('N') || key.startsWith('_')) && key !== `N${nodeip}`) { + keysToSend[key] = Operator.keys[key]; + } + } + keysToSend[`N${Operator.myIP}`] = Security.encryptComm(`${Security.getKey()}:${Security.getIV()}`); + callback({ status: Operator.status, keys: Security.encryptComm(JSON.stringify(keysToSend)) }); + }); + socket.on('resetMaster', async (callback) => { + if (Operator.IamMaster) { + Object.keys(io.sockets.sockets).forEach((s) => { + io.sockets.sockets[s].disconnect(true); + }); + Operator.findMaster(); + } + callback({ status: Operator.status }); + }); + socket.on('rollBack', async (seqNo, callback) => { + if (Operator.IamMaster) { + Operator.rollBack(seqNo); + } + callback({ status: Operator.status }); + }); + socket.on('userSession', async (op, key, value, callback) => { + if (op === 'add') { IdService.addNewSession(key, value); } else { IdService.removeSession(key); } + socket.broadcast.emit('userSession', op, key, value); + callback({ status: Operator.status }); + }); + } else { + log.warn(`rejected from ${ip}`); + socket.disconnect(); + } + if (await validate(ip)) { + // log.info(`auth: ${ip} is validated`); + } else { + log.warn(`validation failed for ${ip}`, 'red'); + socket.disconnect(); + } + }); + IdService.init(); + log.info(`Api Server started on port ${config.apiPort}`); + await Operator.findMaster(); + log.info(`find master finished, master is ${Operator.masterNode}`); + if (!Operator.IamMaster) { + Operator.initMasterConnection(); + } + setInterval(async () => { + Operator.doHealthCheck(); + }, 120000); + setInterval(async () => { + BackLog.purgeBinLogs(); + }, 48 * 60 * 60 * 1000); +} + +initServer(); diff --git a/lib/sqlAnalyzer.js b/lib/sqlAnalyzer.js index a380776..1cc54b4 100644 --- a/lib/sqlAnalyzer.js +++ b/lib/sqlAnalyzer.js @@ -37,10 +37,18 @@ function removeFirstLine(str) { * @param {string} sql sql query */ function cleanUP(sql) { + // log.query(sql); + // fixes mysql2 client utf8 support issue // eslint-disable-next-line no-param-reassign if (sql.toLowerCase().startsWith('/*!40101 set character_set_client = utf8 */')) sql = '/*!40101 SET character_set_client = utf8mb4 */'; // eslint-disable-next-line no-param-reassign if (sql.toLowerCase().startsWith('/*!40101 set names utf8 */')) sql = '/*!40101 SET NAMES utf8mb4 */'; + // fixes navicat client sending multiple queries after connection + if (sql.startsWith("SHOW VARIABLES LIKE 'lower_case_%'; SHOW VARIABLES LIKE 'sql_mode'; ")) { + // eslint-disable-next-line no-param-reassign + sql = sql.replace("SHOW VARIABLES LIKE 'lower_case_%'; SHOW VARIABLES LIKE 'sql_mode'; ", ''); + log.debug(sql); + } if (sql.startsWith('--')) { // eslint-disable-next-line no-param-reassign while (sql.startsWith('--') || sql.startsWith('\r\n') || sql.startsWith('\n')) sql = removeFirstLine(sql); diff --git a/ui/index.html b/ui/index.html index 36a8375..24fcf90 100644 --- a/ui/index.html +++ b/ui/index.html @@ -1,602 +1,602 @@ - - - - Flux Shared DB Dashboard - - - - - - - - - - - - - - - -
-
-
-

Point-in-time Recovery

-
-
-
-
-
-
-

1. Choose a Date

-
-
-
-

2. Choose Recovery Point

-
-
-
-
-

3. Rewind

- -

Caution! this action is irreversible and will permanently rewind your database to the selected date-time. All rewind points after the selected date will be removed.

-
-
- -
- Rewind in proggress, Please wait... -
-
- Rewind Finished -
-
-
-
-
-
-
-
- - -
-
-
-

Backup and Restore

-
-
-
-
-
Local Backup Files:
-
- - -
-
- - - - - - - - - - - - - -
#NameFile SizeDate & TimeActions
-
-
- Loading... -
-
-
-
-
-
- - -
-
-
-

Cluster Stats

- DB Engine: MYSQL 8 -
-
-
-
Active Nodes:
- - - - - - - - - - - - -
#IPSequence NumberMaster IPStatus
-
-
- Loading... -
-
-
-
-
-
- - - - - - - - + + + + Flux Shared DB Dashboard + + + + + + + + + + + + + + + +
+
+
+

Point-in-time Recovery

+
+
+
+
+
+
+

1. Choose a Date

+
+
+
+

2. Choose Recovery Point

+
+
+
+
+

3. Rewind

+ +

Caution! this action is irreversible and will permanently rewind your database to the selected date-time. All rewind points after the selected date will be removed.

+
+
+ +
+ Rewind in proggress, Please wait... +
+
+ Rewind Finished +
+
+
+
+
+
+
+
+ + +
+
+
+

Backup and Restore

+
+
+
+
+
Local Backup Files:
+
+ + +
+
+ + + + + + + + + + + + + +
#NameFile SizeDate & TimeActions
+
+
+ Loading... +
+
+
+
+
+
+ + +
+
+
+

Cluster Stats

+ DB Engine: MYSQL 8 +
+
+
+
Active Nodes:
+ + + + + + + + + + + + +
#IPSequence NumberMaster IPStatus
+
+
+ Loading... +
+
+
+
+
+
+ + + + + + + + diff --git a/ui/login.html b/ui/login.html index 61c5cc8..0f8d553 100644 --- a/ui/login.html +++ b/ui/login.html @@ -1,284 +1,284 @@ - - - - Flux Shared DB Login - - - - - - - -
- -
- -
-
- -
-

Flux Decentralized Database

-
-

Login using ZelID

-
-

Or sign the following message with your address

-
-
-
-
- -
- -
-
-
-
-
- -
- -
-
-
- -
-
-
-
-
- -
-
- - - - - + + + + Flux Shared DB Login + + + + + + + +
+ +
+ +
+
+ +
+

Flux Decentralized Database

+
+

Login using ZelID

+
+

Or sign the following message with your address

+
+
+
+
+ +
+ +
+
+
+
+
+ +
+ +
+
+
+ +
+
+
+
+
+ +
+
+ + + + +