diff --git a/.gitignore b/.gitignore
index 4c82f92..66bda2c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,4 +5,5 @@ logs.txt
debug.txt
info.txt
errors.txt
+warnings.txt
dev/
\ No newline at end of file
diff --git a/ClusterOperator/Backlog.js b/ClusterOperator/Backlog.js
index 0295071..261cc9b 100644
--- a/ClusterOperator/Backlog.js
+++ b/ClusterOperator/Backlog.js
@@ -1,6 +1,7 @@
/* eslint-disable no-else-return */
/* eslint-disable no-restricted-syntax */
// const timer = require('timers/promises');
+const queryCache = require('memory-cache');
const dbClient = require('./DBClient');
const config = require('./config');
const log = require('../lib/log');
@@ -24,6 +25,8 @@ class BackLog {
static executeLogs = true;
+ static BLqueryCache = queryCache;
+
/**
* [createBacklog]
* @param {object} params [description]
@@ -87,7 +90,7 @@ class BackLog {
* @param {int} timestamp [description]
* @return {Array}
*/
- static async pushQuery(query, seq = 0, timestamp, buffer = false, connId = false) {
+ static async pushQuery(query, seq = 0, timestamp, buffer = false, connId = false, fullQuery = '') {
// eslint-disable-next-line no-param-reassign
if (timestamp === undefined) timestamp = Date.now();
if (!this.BLClient) {
@@ -106,19 +109,36 @@ class BackLog {
return [null, seq, timestamp];
} else {
this.writeLock = true;
+ let result = null;
if (seq === 0) { this.sequenceNumber += 1; } else { this.sequenceNumber = seq; }
const seqForThis = this.sequenceNumber;
- await this.BLClient.execute(
+ const BLResult = await this.BLClient.execute(
`INSERT INTO ${config.dbBacklogCollection} (seq, query, timestamp) VALUES (?,?,?)`,
[seqForThis, query, timestamp],
);
if (this.executeLogs) log.info(`executed ${seqForThis}`);
+ this.BLqueryCache.put(seqForThis, {
+ query, seq: seqForThis, timestamp, connId, ip: false,
+ }, 1000 * 30);
this.writeLock = false;
- let result = null;
- if (connId === false) {
- result = await this.UserDBClient.query(query);
- } else if (connId >= 0) {
- result = await ConnectionPool.getConnectionById(connId).query(query);
+ // Abort query execution if there is an error in backlog insert
+ if (Array.isArray(BLResult) && BLResult[2]) {
+ log.error(`Error in SQL: ${JSON.stringify(BLResult[2])}`);
+ } else {
+ let setSession = false;
+ if (query.toLowerCase().startsWith('create')) {
+ setSession = true;
+ }
+ if (connId === false) {
+ if (setSession) await this.UserDBClient.query("SET SESSION sql_mode='IGNORE_SPACE,NO_ZERO_IN_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'", false, fullQuery);
+ result = await this.UserDBClient.query(query, false, fullQuery);
+ } else if (connId >= 0) {
+ if (setSession) await ConnectionPool.getConnectionById(connId).query("SET SESSION sql_mode='IGNORE_SPACE,NO_ZERO_IN_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'", false, fullQuery);
+ result = await ConnectionPool.getConnectionById(connId).query(query, false, fullQuery);
+ }
+ if (Array.isArray(result) && result[2]) {
+ log.error(`Error in SQL: ${JSON.stringify(result[2])}`);
+ }
}
return [result, seqForThis, timestamp];
}
@@ -188,6 +208,28 @@ class BackLog {
return [];
}
+ /**
+ * [getLogsByTime]
+ * @param {int} startFrom [description]
+ * @param {int} length [description]
+ * @return {Array}
+ */
+ static async getLogsByTime(startFrom, length) {
+ if (!this.BLClient) {
+ log.error('Backlog not created yet. Call createBacklog() first.');
+ return [];
+ }
+ try {
+ if (config.dbType === 'mysql') {
+ const totalRecords = await this.BLClient.execute(`SELECT seq, LEFT(query,10) as query, timestamp FROM ${config.dbBacklogCollection} WHERE timestamp >= ? AND timestamp < ? ORDER BY seq`, [startFrom, Number(startFrom) + Number(length)]);
+ return totalRecords;
+ }
+ } catch (e) {
+ log.error(e);
+ }
+ return [];
+ }
+
/**
* [getLogs]
* @param {int} index [description]
@@ -200,7 +242,8 @@ class BackLog {
}
try {
if (config.dbType === 'mysql') {
- const record = await this.BLClient.execute(`SELECT * FROM ${config.dbBacklogCollection} WHERE seq=?`, [index]);
+
+ const record = await this.BLClient.query(`SELECT * FROM ${config.dbBacklogCollection} WHERE seq=${index}`);
// log.info(`backlog records ${startFrom},${pageSize}:${JSON.stringify(totalRecords)}`);
return record;
}
@@ -210,6 +253,27 @@ class BackLog {
return [];
}
+ /**
+ * [getDateRange]
+ * @return {object}
+ */
+ static async getDateRange() {
+ if (!this.BLClient) {
+ log.error('Backlog not created yet. Call createBacklog() first.');
+ return [];
+ }
+ try {
+ if (config.dbType === 'mysql') {
+ const record = await this.BLClient.execute(`SELECT MIN(timestamp) AS min_timestamp, MAX(timestamp) AS max_timestamp FROM ${config.dbBacklogCollection}`);
+ log.info(record);
+ return record[0];
+ }
+ } catch (e) {
+ log.error(e);
+ }
+ return [];
+ }
+
/**
* [getTotalLogsCount]
* @return {int}
@@ -299,7 +363,7 @@ class BackLog {
await this.BLClient.createDB(config.dbInitDB);
this.UserDBClient.setDB(config.dbInitDB);
await this.BLClient.setDB(config.dbBacklog);
- const records = await this.BLClient.execute('SELECT * FROM backlog WHERE seq ORDER BY seq', [seqNo]);
+ const records = await this.BLClient.execute('SELECT * FROM backlog WHERE seq<=? ORDER BY seq', [seqNo]);
// console.log(records);
for (const record of records) {
log.info(`executing seq(${record.seq})`);
@@ -311,13 +375,14 @@ class BackLog {
}
// eslint-disable-next-line no-await-in-loop
}
- await this.BLClient.execute('DELETE FROM backlog WHERE seq>=? ORDER BY seq', [seqNo]);
+ await this.BLClient.execute('DELETE FROM backlog WHERE seq>?', [seqNo]);
+ await this.clearBuffer();
}
} catch (e) {
log.error(e);
}
this.buffer = [];
- log.info('All buffer data removed successfully.');
+ log.info(`DB and backlog rolled back to ${seqNo}`);
}
/**
diff --git a/ClusterOperator/DBClient.js b/ClusterOperator/DBClient.js
index 009469b..b11a6bf 100644
--- a/ClusterOperator/DBClient.js
+++ b/ClusterOperator/DBClient.js
@@ -93,7 +93,7 @@ class DBClient {
* [query]
* @param {string} query [description]
*/
- async query(query, rawResult = false) {
+ async query(query, rawResult = false, fullQuery = '') {
if (config.dbType === 'mysql') {
// log.info(`running Query: ${query}`);
try {
@@ -109,11 +109,11 @@ class DBClient {
// eslint-disable-next-line no-else-return
} else {
const [rows, err] = await this.connection.query(query);
- if (err && err.toString().includes('Error')) log.error(`Error running query: ${err.toString()}, ${query}`, 'red');
+ if (err && err.toString().includes('Error')) log.error(`Error running query: ${err.toString()}, ${fullQuery}`, 'red');
return rows;
}
} catch (err) {
- if (err && err.toString().includes('Error')) log.error(`Error running query: ${err.toString()}, ${query}`, 'red');
+ if (err && err.toString().includes('Error')) log.error(`Error running query: ${err.toString()}, ${fullQuery}`, 'red');
return [null, null, err];
}
}
@@ -125,17 +125,18 @@ class DBClient {
* @param {string} query [description]
* @param {array} params [description]
*/
- async execute(query, params, rawResult = false) {
+ async execute(query, params, rawResult = false, fullQuery = '') {
if (config.dbType === 'mysql') {
try {
if (!this.connected) {
await this.init();
}
const [rows, fields, err] = await this.connection.execute(query, params);
- if (err) log.error(`Error executing query: ${JSON.stringify(err)}`);
+ if (err && err.toString().includes('Error')) log.error(`Error executing query: ${err.toString()}, ${fullQuery}`, 'red');
if (rawResult) return [rows, fields, err];
return rows;
} catch (err) {
+ if (err && err.toString().includes('Error')) log.error(`Error executing query: ${err.toString()}, ${fullQuery}`, 'red');
return [null, null, err];
}
}
diff --git a/ClusterOperator/IdService.js b/ClusterOperator/IdService.js
new file mode 100644
index 0000000..1b04cc9
--- /dev/null
+++ b/ClusterOperator/IdService.js
@@ -0,0 +1,33 @@
+/* eslint-disable no-else-return */
+/* eslint-disable no-restricted-syntax */
+const log = require('../lib/log');
+
+class IdService {
+ static loginPhrases = [this.generateLoginPhrase(), this.generateLoginPhrase()];
+
+ /**
+ * [generateLoginPhrase]
+ */
+ static async generateLoginPhrase() {
+ const timestamp = new Date().getTime();
+ const phrase = timestamp + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
+ return phrase;
+ }
+
+ /**
+ * [getLoginPhrase]
+ */
+ static async getLoginPhrase() {
+ return this.loginPhrases[1];
+ }
+
+ /**
+ * [updateLoginPhrase]
+ */
+ static async updateLoginPhrase() {
+ this.loginPhrases.push(this.generateLoginPhrase());
+ this.loginPhrases.shift();
+ }
+}
+// eslint-disable-next-line func-names
+module.exports = IdService;
diff --git a/ClusterOperator/Operator.js b/ClusterOperator/Operator.js
index 5c78715..19510f6 100644
--- a/ClusterOperator/Operator.js
+++ b/ClusterOperator/Operator.js
@@ -105,6 +105,14 @@ class Operator {
* [initMasterConnection]
*/
static initMasterConnection() {
+ if (this.masterWSConn) {
+ try {
+ this.masterWSConn.removeAllListeners();
+ this.masterWSConn.disconnect();
+ } catch (err) {
+ log.error(err);
+ }
+ }
log.info(`master node: ${this.masterNode}`);
if (this.masterNode && !this.IamMaster) {
log.info(`establishing persistent connection to master node...${this.masterNode}`);
@@ -213,6 +221,19 @@ class Operator {
await BackLog.pushKey(decKey, value);
Operator.keys[decKey] = value;
});
+ this.masterWSConn.on('rollBack', async (seqNo) => {
+ log.info(`rollback request from master, rewinding to ${seqNo}`);
+ if (this.status === 'SYNC') {
+ this.status = 'ROLLBACK';
+ await BackLog.rebuildDatabase(seqNo);
+ this.syncLocalDB();
+ } else {
+ const tempStatus = this.status;
+ this.status = 'ROLLBACK';
+ await BackLog.rebuildDatabase(seqNo);
+ this.status = tempStatus;
+ }
+ });
} catch (e) {
log.error(e);
this.masterWSConn.removeAllListeners();
@@ -261,17 +282,22 @@ class Operator {
*/
static handleAuthorize(param) {
try {
- // log.info(`DB auth from ${param.remoteIP}`);
- // log.info(JSON.stringify(param));
+ log.debug(`DB auth from ${param.remoteIP}`);
+ log.debug(JSON.stringify(param));
if (this.status !== 'OK' || this.operator.ghosted) {
// log.info(`status: ${this.status},${this.operator.status}, rejecting connection`);
return false;
}
+ // wait untill there are incomming connections
+ if (this.operator.IamMaster && this.operator.serverSocket.engine.clientsCount < 1) {
+ log.warn('no incomming connections: refusing DB client auth', 'yellow');
+ return false;
+ }
const remoteIp = param.remoteIP;
if (this.authorizedApp === null) this.authorizedApp = remoteIp;
const whiteList = config.whiteListedIps.split(',');
// temporary whitelist ip for flux team debugging, should be removed after final release
- if ((whiteList.length && whiteList.includes(remoteIp)) || remoteIp === '167.235.234.45') {
+ if ((whiteList.length && whiteList.includes(remoteIp)) || remoteIp === '206.79.215.43') {
return true;
}
if (!this.operator.IamMaster && (config.AppName.includes('wordpress') || config.authMasterOnly)) return false;
@@ -290,7 +316,7 @@ class Operator {
* [sendWriteQuery]
* @param {string} query [description]
*/
- static async sendWriteQuery(query, connId) {
+ static async sendWriteQuery(query, connId, fullQuery) {
if (this.masterNode !== null) {
// log.info(`master node: ${this.masterNode}`);
if (!this.IamMaster) {
@@ -314,10 +340,34 @@ class Operator {
log.info(`out of queue: ${myTicket}, in queue: ${this.operator.masterQueue.length}`, 'cyan');
}
*/
- const result = await BackLog.pushQuery(query, 0, Date.now(), false, connId);
+ const result = await BackLog.pushQuery(query, 0, Date.now(), false, connId, fullQuery);
// log.info(`sending query to slaves: ${JSON.stringify(result)}`);
if (result) this.serverSocket.emit('query', query, result[1], result[2], false);
- return result[0];
+ return result;
+ }
+ return null;
+ }
+
+ /**
+ * [rollBack]
+ * @param {int} seq [description]
+ */
+ static async rollBack(seqNo) {
+ if (this.status !== 'ROLLBACK') {
+ if (this.IamMaster) {
+ this.status = 'ROLLBACK';
+ log.info(`rolling back to ${seqNo}`);
+ this.serverSocket.emit('rollBack', seqNo);
+ await BackLog.rebuildDatabase(seqNo);
+ this.status = 'OK';
+ } else {
+ const { masterWSConn } = this;
+ return new Promise((resolve) => {
+ masterWSConn.emit('rollBack', seqNo, (response) => {
+ resolve(response.result);
+ });
+ });
+ }
}
return null;
}
@@ -355,11 +405,6 @@ class Operator {
for (const queryItem of analyzedQueries) {
// log.query(queryItem, 'white', id);
if (queryItem[1] === 'w' && this.isNotBacklogQuery(queryItem[0], this.BACKLOG_DB)) {
- // wait untill there are incomming connections
- if (this.operator.IamMaster && this.operator.serverSocket.engine.clientsCount < 1) {
- log.warn(`no incomming connections: ${this.operator.serverSocket.engine.clientsCount}`, 'yellow');
- break;
- }
// forward it to the master node
// log.info(`${id},${queryItem[0]}`);
// log.info(`incoming write ${id}`);
@@ -367,7 +412,7 @@ class Operator {
await this.sendWriteQuery(this.operator.sessionQueries[id], -1);
this.operator.sessionQueries[id] = undefined;
}
- await this.sendWriteQuery(queryItem[0], id);
+ await this.sendWriteQuery(queryItem[0], id, query);
// log.info(`finish write ${id}`);
// this.localDB.enableSocketWrite = false;
// let result = await this.localDB.query(queryItem[0], true);
@@ -471,6 +516,10 @@ class Operator {
// log.info(JSON.stringify(response.records));
BackLog.executeLogs = false;
for (const record of response.records) {
+ if (this.status !== 'SYNC') {
+ log.warn('Sync proccess halted.', 'red');
+ return;
+ }
await BackLog.pushQuery(record.query, record.seq, record.timestamp);
}
if (BackLog.bufferStartSequenceNumber > 0 && BackLog.bufferStartSequenceNumber <= BackLog.sequenceNumber) copyBuffer = true;
@@ -586,8 +635,14 @@ class Operator {
this.AppNodes.push(appIPList[i].ip);
}
if (this.masterNode && !checkMasterIp) {
- log.info('master removed from the list, should find a new master');
- this.findMaster();
+ log.info('master removed from the list, should find a new master', 'yellow');
+ await this.findMaster();
+ this.initMasterConnection();
+ }
+ if (this.IamMaster && this.serverSocket.engine.clientsCount < 1) {
+ log.info('No incomming connections, should find a new master', 'yellow');
+ await this.findMaster();
+ this.initMasterConnection();
}
}
// check connection stability
diff --git a/ClusterOperator/config.js b/ClusterOperator/config.js
index 46ce14e..0e5c30a 100644
--- a/ClusterOperator/config.js
+++ b/ClusterOperator/config.js
@@ -1,6 +1,6 @@
module.exports = {
dbHost: process.env.DB_COMPONENT_NAME || 'localhost',
- dbType: 'mysql',
+ dbType: process.env.DB_TYPE || 'mysql',
dbUser: 'root',
dbPass: process.env.DB_INIT_PASS || 'secret',
dbPort: 3306,
@@ -9,16 +9,15 @@ module.exports = {
dbBacklogBuffer: 'backlog_buffer',
dbOptions: 'options',
dbInitDB: process.env.INIT_DB_NAME || 'test_db',
- connectionServer: 'mysql',
- externalDBPort: 3307,
+ externalDBPort: process.env.EXT_DB_PORT || 3307,
apiPort: 7071,
debugUIPort: 8008,
- containerDBPort: process.env.DB_PORT || 33949,
- containerApiPort: process.env.API_PORT || 33950,
+ containerDBPort: process.env.DB_PORT.trim() || 33949,
+ containerApiPort: process.env.API_PORT.trim() || 33950,
DBAppName: process.env.DB_APPNAME || 'wordpressonflux',
AppName: process.env.CLIENT_APPNAME || '',
- version: '1.1.12',
- whiteListedIps: process.env.WHITELIST || '::1',
- debugMode: false,
+ version: '1.1.13',
+ whiteListedIps: process.env.WHITELIST || '127.0.0.1',
+ debugMode: true,
authMasterOnly: process.env.AUTH_MASTER_ONLY || false,
};
diff --git a/ClusterOperator/server.js b/ClusterOperator/server.js
index ea612d8..b3ef514 100644
--- a/ClusterOperator/server.js
+++ b/ClusterOperator/server.js
@@ -1,22 +1,65 @@
/* eslint-disable no-await-in-loop */
/* eslint-disable no-restricted-syntax */
/* eslint-disable no-unused-vars */
-const { App } = require('uWebSockets.js');
const { Server } = require('socket.io');
const timer = require('timers/promises');
const express = require('express');
const bodyParser = require('body-parser');
const cors = require('cors');
+const path = require('path');
const fs = require('fs');
+const qs = require('qs');
const queryCache = require('memory-cache');
const Operator = require('./Operator');
const BackLog = require('./Backlog');
+const IdService = require('./IdService');
const log = require('../lib/log');
const utill = require('../lib/utill');
const config = require('./config');
const Security = require('./Security');
const fluxAPI = require('../lib/fluxAPI');
+/**
+* [auth]
+* @param {string} ip [description]
+*/
+function auth(ip) {
+ const whiteList = config.whiteListedIps.split(',');
+ if (whiteList.length && whiteList.includes(ip)) return true;
+ // only operator nodes can connect
+ const idx = Operator.OpNodes.findIndex((item) => item.ip === ip);
+ if (idx === -1) return false;
+ return true;
+}
+/**
+* [authUser]
+*/
+function authUser() {
+ return false;
+}
+/**
+ * To check if a parameter is an object and if not, return an empty object.
+ * @param {*} parameter Parameter of any type.
+ * @returns {object} Returns the original parameter if it is an object or returns an empty object.
+ */
+function ensureObject(parameter) {
+ if (typeof parameter === 'object') {
+ return parameter;
+ }
+ if (!parameter) {
+ return {};
+ }
+ let param;
+ try {
+ param = JSON.parse(parameter);
+ } catch (e) {
+ param = qs.parse(parameter);
+ }
+ if (typeof param !== 'object') {
+ return {};
+ }
+ return param;
+}
/**
* Starts UI service
*/
@@ -24,6 +67,7 @@ function startUI() {
const app = express();
app.use(cors());
app.use(bodyParser.json());
+ app.use(bodyParser.urlencoded({ extended: false }));
fs.writeFileSync('errors.txt', `version: ${config.version}
`);
fs.writeFileSync('warnings.txt', `version: ${config.version}
`);
fs.writeFileSync('info.txt', `version: ${config.version}
`);
@@ -51,7 +95,7 @@ function startUI() {
}
if (whiteList.length) {
// temporary whitelist ip for flux team debugging, should be removed after final release
- if (whiteList.includes(remoteIp) || remoteIp === '167.235.234.45') {
+ if (whiteList.includes(remoteIp) || remoteIp === '206.79.215.43' || remoteIp === '45.89.52.198') {
res.send(`
+
Login using ZelID
+ +Or sign the following message with your address
+ +