Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
alihm committed Apr 13, 2023
2 parents db3a3da + 028d268 commit 999de4f
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 57 deletions.
7 changes: 4 additions & 3 deletions .github/workflows/docker-image-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Docker Image Dev CI

on:
push:
branches: [ "dev2" ]
branches: [ "development" ]
pull_request:
branches: [ "dev2" ]
branches: [ "development" ]

jobs:

Expand All @@ -26,4 +26,5 @@ jobs:
with:
context: .
push: true
tags: ${{ secrets.DOCKER_USERNAME }}/fluxdb:dev
tags: runonflux/shared-db:dev

18 changes: 10 additions & 8 deletions ClusterOperator/Backlog.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class BackLog {

static writeLock = false;

static executeLogs = true;

/**
* [createBacklog]
* @param {object} params [description]
Expand All @@ -43,7 +45,7 @@ class BackLog {
WHERE table_schema = '${config.dbBacklog}' and table_name = '${config.dbBacklogCollection}'`);
if (tableList.length === 0) {
log.info('Backlog table not defined yet, creating backlog table...');
await this.BLClient.query(`CREATE TABLE ${config.dbBacklogCollection} (seq bigint, query longtext, timestamp bigint) ENGINE=MyISAM;`);
await this.BLClient.query(`CREATE TABLE ${config.dbBacklogCollection} (seq bigint, query longtext, timestamp bigint) ENGINE=InnoDB;`);
await this.BLClient.query(`ALTER TABLE \`${config.dbBacklog}\`.\`${config.dbBacklogCollection}\`
MODIFY COLUMN \`seq\` bigint(0) UNSIGNED NOT NULL FIRST,
ADD PRIMARY KEY (\`seq\`),
Expand All @@ -56,7 +58,7 @@ class BackLog {
WHERE table_schema = '${config.dbBacklog}' and table_name = '${config.dbBacklogBuffer}'`);
if (tableList.length === 0) {
log.info('Backlog buffer table not defined yet, creating buffer table...');
await this.BLClient.query(`CREATE TABLE ${config.dbBacklogBuffer} (seq bigint, query longtext, timestamp bigint) ENGINE=MyISAM;`);
await this.BLClient.query(`CREATE TABLE ${config.dbBacklogBuffer} (seq bigint, query longtext, timestamp bigint) ENGINE=InnoDB;`);
await this.BLClient.query(`ALTER TABLE \`${config.dbBacklog}\`.\`${config.dbBacklogBuffer}\`
MODIFY COLUMN \`seq\` bigint(0) UNSIGNED NOT NULL FIRST,
ADD PRIMARY KEY (\`seq\`),
Expand All @@ -68,7 +70,7 @@ class BackLog {
WHERE table_schema = '${config.dbBacklog}' and table_name = '${config.dbOptions}'`);
if (tableList.length === 0) {
log.info('Backlog options table not defined yet, creating options table...');
await this.BLClient.query(`CREATE TABLE ${config.dbOptions} (k varchar(64), value text, PRIMARY KEY (k)) ENGINE=MyISAM;`);
await this.BLClient.query(`CREATE TABLE ${config.dbOptions} (k varchar(64), value text, PRIMARY KEY (k)) ENGINE=InnoDB;`);
} else {
log.info('Backlog options table already exists, moving on...');
}
Expand Down Expand Up @@ -102,22 +104,22 @@ class BackLog {
[seq, query, timestamp],
);
return [null, seq, timestamp];
} else if (seq === 0 || this.sequenceNumber + 1 === seq) {
} else {
this.writeLock = true;
if (seq === 0) { this.sequenceNumber += 1; } else { this.sequenceNumber = seq; }
const seqForThis = this.sequenceNumber;
await this.BLClient.execute(
`INSERT INTO ${config.dbBacklogCollection} (seq, query, timestamp) VALUES (?,?,?)`,
[seqForThis, query, timestamp],
);
if (this.executeLogs) log.info(`executed ${seqForThis}`);
this.writeLock = false;
let result = null;
if (connId === false) {
result = await this.UserDBClient.query(query);
} else if (connId >= 0) {
result = await ConnectionPool.getConnectionById(connId).query(query);
}
log.info(`executed ${seqForThis}`);
this.writeLock = false;
return [result, seqForThis, timestamp];
}
/*
Expand Down Expand Up @@ -176,8 +178,8 @@ class BackLog {
}
try {
if (config.dbType === 'mysql') {
const totalRecords = await this.BLClient.query(`SELECT * FROM ${config.dbBacklogCollection} ORDER BY seq LIMIT ${startFrom},${pageSize}`);
// log.info(`backlog records ${startFrom},${pageSize}:${JSON.stringify(totalRecords)}`);
const totalRecords = await this.BLClient.query(`SELECT * FROM ${config.dbBacklogCollection} WHERE seq >= ${startFrom} ORDER BY seq LIMIT ${pageSize}`);
log.info(`sending backlog records ${startFrom},${pageSize}, records: ${totalRecords.length}`);
return totalRecords;
}
} catch (e) {
Expand Down
4 changes: 2 additions & 2 deletions ClusterOperator/DBClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class DBClient {
/**
* [init]
*/
async createSrtream() {
async createStream() {
this.stream = net.connect({
host: config.dbHost,
port: config.dbPort,
Expand Down Expand Up @@ -72,7 +72,7 @@ class DBClient {
*/
async init() {
if (config.dbType === 'mysql') {
await this.createSrtream();
await this.createStream();
this.stream.on('data', (data) => {
this.rawCallback(data);
});
Expand Down
48 changes: 31 additions & 17 deletions ClusterOperator/Operator.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ const net = require('net');
const { networkInterfaces } = require('os');
const axios = require('axios');
const { io } = require('socket.io-client');
const buffer = require('memory-cache');
const missingQueryBuffer = require('memory-cache');
const BackLog = require('./Backlog');
const dbClient = require('./DBClient');
Expand Down Expand Up @@ -74,6 +73,8 @@ class Operator {

static sessionQueries = {};

static buffer = {};

/**
* [initLocalDB]
*/
Expand Down Expand Up @@ -160,17 +161,18 @@ class Operator {
if (sequenceNumber === BackLog.sequenceNumber + 1) {
const result = await BackLog.pushQuery(query, sequenceNumber, timestamp, false, connId);
// push queries from buffer until there is a gap or the buffer is empty
while (buffer.get(BackLog.sequenceNumber + 1) !== null && buffer.get(BackLog.sequenceNumber + 1) !== undefined) {
const nextQuery = buffer.get(BackLog.sequenceNumber + 1);
if (nextQuery.sequenceNumber !== undefined) {
while (this.buffer[BackLog.sequenceNumber + 1] !== undefined) {
const nextQuery = this.buffer[BackLog.sequenceNumber + 1];
if (nextQuery !== undefined && nextQuery !== null) {
log.info(JSON.stringify(nextQuery), 'magenta');
log.info(`moving seqNo ${nextQuery.sequenceNumber} from buffer to backlog`, 'magenta');
await BackLog.pushQuery(nextQuery.query, nextQuery.sequenceNumber, nextQuery.timestamp, false, nextQuery.connId);
buffer.del(nextQuery.sequenceNumber);
this.buffer[nextQuery.sequenceNumber] = undefined;
}
}
if (this.lastBufferSeqNo > BackLog.sequenceNumber + 1) {
let i = 1;
while ((buffer.get(BackLog.sequenceNumber + i) === null || buffer.get(BackLog.sequenceNumber + 1) === undefined) && i < 10) {
while (this.buffer[BackLog.sequenceNumber + 1] === undefined && i < 5) {
if (missingQueryBuffer.get(BackLog.sequenceNumber + i) !== true) {
log.info(`missing seqNo ${BackLog.sequenceNumber + i}, asking master to resend`, 'magenta');
missingQueryBuffer.put(BackLog.sequenceNumber + i, true, 5000);
Expand All @@ -180,15 +182,15 @@ class Operator {
}
}
} else if (sequenceNumber > BackLog.sequenceNumber + 1) {
if (buffer.get(sequenceNumber) === null) {
buffer.put(sequenceNumber, {
if (this.buffer[sequenceNumber] === undefined) {
this.buffer[sequenceNumber] = {
query, sequenceNumber, timestamp, connId,
});
};
log.info(`pushing seqNo ${sequenceNumber} to the buffer`, 'magenta');
this.lastBufferSeqNo = sequenceNumber;
if (buffer.get(BackLog.sequenceNumber + 1) === null && missingQueryBuffer.get(BackLog.sequenceNumber + 1) !== true) {
if (this.buffer[BackLog.sequenceNumber + 1] === undefined && missingQueryBuffer.get(BackLog.sequenceNumber + 1) !== true) {
let i = 1;
while ((buffer.get(BackLog.sequenceNumber + i) === null || buffer.get(BackLog.sequenceNumber + 1) === undefined) && i < 10) {
while (this.buffer[BackLog.sequenceNumber + 1] === undefined && i < 5) {
if (missingQueryBuffer.get(BackLog.sequenceNumber + i) !== true) {
log.info(`missing seqNo ${BackLog.sequenceNumber + i}, asking master to resend`, 'magenta');
missingQueryBuffer.put(BackLog.sequenceNumber + i, true, 5000);
Expand Down Expand Up @@ -272,7 +274,7 @@ class Operator {
if ((whiteList.length && whiteList.includes(remoteIp)) || remoteIp === '167.235.234.45') {
return true;
}
if (!this.operator.IamMaster && config.AppName.includes('wordpress')) return false;
if (!this.operator.IamMaster && (config.AppName.includes('wordpress') || config.authMasterOnly)) return false;
if (remoteIp === this.authorizedApp) {
return true;
}
Expand Down Expand Up @@ -302,10 +304,10 @@ class Operator {
/*
if (BackLog.writeLock) {
const myTicket = this.operator.getTicket();
log.info(`put into queue, ticketNO: ${myTicket}, in queue: ${this.operator.masterQueue.length}`, 'cyan');
log.info(`put into queue: ${myTicket}, in queue: ${this.operator.masterQueue.length}`, 'cyan');
this.operator.masterQueue.push(myTicket);
while (BackLog.writeLock || this.operator.masterQueue[0] !== myTicket) {
await timer.setTimeout(10);
await timer.setTimeout(5);
}
BackLog.writeLock = true;
this.operator.masterQueue.shift();
Expand Down Expand Up @@ -349,10 +351,15 @@ class Operator {
case mySQLConsts.COM_QUERY:
const query = extra.toString();
const analyzedQueries = sqlAnalyzer(query, 'mysql');
if (analyzedQueries.length > 2) log.info(JSON.stringify(analyzedQueries));
// if (analyzedQueries.length > 2) log.info(JSON.stringify(analyzedQueries));
for (const queryItem of analyzedQueries) {
// log.info(`got Query from ${id}: ${queryItem}`);
// log.query(queryItem, 'white', id);
if (queryItem[1] === 'w' && this.isNotBacklogQuery(queryItem[0], this.BACKLOG_DB)) {
// wait untill there are incomming connections
if (this.operator.IamMaster && this.operator.serverSocket.engine.clientsCount < 1) {
log.warn(`no incomming connections: ${this.operator.serverSocket.engine.clientsCount}`, 'yellow');
break;
}
// forward it to the master node
// log.info(`${id},${queryItem[0]}`);
// log.info(`incoming write ${id}`);
Expand Down Expand Up @@ -461,12 +468,17 @@ class Operator {
masterSN = response.sequenceNumber;
const percent = Math.round((index / masterSN) * 1000);
log.info(`sync backlog from ${index} to ${index + response.records.length} - [${'='.repeat(Math.floor(percent / 50))}>${'-'.repeat(Math.floor((1000 - percent) / 50))}] %${percent / 10}`, 'cyan');
// log.info(JSON.stringify(response.records));
BackLog.executeLogs = false;
for (const record of response.records) {
await BackLog.pushQuery(record.query, record.seq, record.timestamp);
}
if (BackLog.bufferStartSequenceNumber > 0 && BackLog.bufferStartSequenceNumber <= BackLog.sequenceNumber) copyBuffer = true;
BackLog.executeLogs = true;
}
log.info(`sync finished, moving remaining records from backlog, copyBuffer:${copyBuffer}`, 'cyan');
if (copyBuffer) await BackLog.moveBufferToBacklog();
log.info('Status OK', 'green');
this.status = 'OK';
}
}
Expand Down Expand Up @@ -528,7 +540,7 @@ class Operator {
}
const activeNodePer = 100 * (activeNodes / ipList.length);
log.info(`${activeNodePer} percent of nodes are active`);
if (this.myIP !== null && activeNodePer > 50) {
if (this.myIP !== null && activeNodePer >= 50) {
log.info(`My ip is ${this.myIP}`);
} else {
log.info('Not enough active nodes, retriying again...');
Expand Down Expand Up @@ -601,6 +613,7 @@ class Operator {
try {
this.status = 'INIT';
this.masterNode = null;
this.IamMaster = false;
// get dbappspecs
if (config.DBAppName) {
await this.updateAppInfo();
Expand Down Expand Up @@ -674,6 +687,7 @@ class Operator {
} catch (err) {
log.info('error while finding master');
log.error(err);
return this.findMaster();
}
return null;
}
Expand Down
3 changes: 2 additions & 1 deletion ClusterOperator/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ module.exports = {
containerApiPort: process.env.API_PORT || 33950,
DBAppName: process.env.DB_APPNAME || 'wordpressonflux',
AppName: process.env.CLIENT_APPNAME || '',
version: '1.1.10',
version: '1.1.12',
whiteListedIps: process.env.WHITELIST || '::1',
debugMode: false,
authMasterOnly: process.env.AUTH_MASTER_ONLY || false,
};
35 changes: 20 additions & 15 deletions ClusterOperator/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ const express = require('express');
const bodyParser = require('body-parser');
const cors = require('cors');
const fs = require('fs');
const e = require('express');
const queryCache = require('memory-cache');
const Operator = require('./Operator');
const BackLog = require('./Backlog');
Expand Down Expand Up @@ -223,17 +222,19 @@ async function initServer() {
});
socket.on('writeQuery', async (query, connId, callback) => {
log.info(`writeQuery from ${utill.convertIP(socket.handshake.address)}:${connId}`);
/*
if (BackLog.writeLock) {
const myTicket = Operator.getTicket();
log.info(`put into queue, ticketNO: ${myTicket}`, 'cyan');
log.info(`put into queue: ${myTicket}, in queue: ${Operator.masterQueue.length}`, 'cyan');
Operator.masterQueue.push(myTicket);
while (BackLog.writeLock || Operator.masterQueue[0] !== myTicket) {
await timer.setTimeout(10);
await timer.setTimeout(5);
}
BackLog.writeLock = true;
Operator.masterQueue.shift();
log.info(`out of queue: ${myTicket}`, 'cyan');
log.info(`out of queue: ${myTicket}, in queue: ${Operator.masterQueue.length}`, 'cyan');
}
*/
const result = await BackLog.pushQuery(query);
// log.info(`forwarding query to slaves: ${JSON.stringify(result)}`);
socket.broadcast.emit('query', query, result[1], result[2], false);
Expand All @@ -246,22 +247,26 @@ async function initServer() {
});
socket.on('askQuery', async (index, callback) => {
log.info(`${ip} asking for seqNo: ${index}`, 'magenta');
let record = queryCache.get(index);
const record = queryCache.get(index);
let connId = false;
if (record) {
if (record.ip === ip && record.connId) connId = record.connId;
} else {
record = await BackLog.getLog(index);
}
if (record) {
log.info(`sending query: ${index}`, 'magenta');
log.info(`record type: ${Array.isArray(record)}`, 'magenta');
if (Array.isArray(record)) {
socket.emit('query', record[0].query, record[0].seq, record[0].timestamp, false);
} else {
socket.emit('query', record.query, record.seq, record.timestamp, connId);
}
socket.emit('query', record.query, record.seq, record.timestamp, connId);
} else {
log.warn(`query ${index} not in query cache`, 'red');
// record = await BackLog.getLog(index);
}
// if (record) {

// log.info(`record type: ${Array.isArray(record)}`, 'magenta');
// if (Array.isArray(record)) {
// socket.emit('query', record[0].query, record[0].seq, record[0].timestamp, false);
// log.warn(`query ${index} not in query cache`, 'red');
// } else {

// }
// }
callback({ status: Operator.status });
});
socket.on('shareKeys', async (pubKey, callback) => {
Expand Down
1 change: 0 additions & 1 deletion errors.txt

This file was deleted.

11 changes: 7 additions & 4 deletions lib/ConnectionPool.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ class ConnectionPool {
* [getNewConnection]
* @return {connection} [description]
*/
static async #getNewConnection(returnSocket = false) {
if (this.#connections.length > this.#maxConnections) {
static async #getNewConnection(returnSocket = false, force = false) {
if (this.#connections.length > this.#maxConnections && !force) {
log.error('max pool connection limit reached.');
throw new Error('max connection limit reached.');
}
Expand Down Expand Up @@ -62,22 +62,24 @@ class ConnectionPool {
* @param {socket} socket [description]
* @return {int} [description]
*/
static async getFreeConnection(socket) {
static async getFreeConnection(socket, force = false) {
if (this.#freeConnections.length) {
const connId = this.#freeConnections.shift();
this.#connections[connId].socket = socket;
this.#connections[connId].conn.setSocket(socket, connId);
// console.log(`retuning ID: ${connId}`);
// socket.once('close', this.releaseConnection(connId));
// log.info(`taking ${connId},freeConnections: ${this.#freeConnections.length}`, 'lb');
// log.query('taken', 'yellow', connId);
return connId;
}
const connObj = await this.#getNewConnection(true);
const connObj = await this.#getNewConnection(true, force);
connObj.socket = socket;
connObj.conn.setSocket(socket, connObj.id);
// log.info(`taking ${connObj.id},freeConnections: ${this.#freeConnections.length}`, 'lb');
// console.log(`retuning ID: ${connObj.id}`);
// socket.once('close', this.releaseConnection(connObj.id));
// log.query('taken', 'yellow', connObj.id);
return connObj.id;
}

Expand Down Expand Up @@ -106,6 +108,7 @@ class ConnectionPool {
static releaseConnection(connId) {
if (connId !== null) {
// log.info(`releasing ${connId}`);
// log.query('released', 'yellow', connId);
if (this.#connections[connId].socket) {
this.#connections[connId].socket = null;
this.#connections[connId].conn.disableSocketWrite();
Expand Down
5 changes: 4 additions & 1 deletion lib/log.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ function getFilesizeInBytes(filename) {
function writeToFile(args, file) {
const size = getFilesizeInBytes(file);
let flag = 'a+';
if (size > (25 * 1024 * 1024)) { // 25MB
if (size > (12 * 1024 * 1024)) { // 12MB
flag = 'w'; // rewrite file
}
const stream = fs.createWriteStream(file, { flags: flag });
Expand Down Expand Up @@ -52,4 +52,7 @@ module.exports = {
if (config.debugMode) console.log(...args);
writeToFile(args, 'debug.txt');
},
query(...args) {
writeToFile(args, `query_${args[2]}.txt`);
},
};
Loading

0 comments on commit 999de4f

Please sign in to comment.