Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
alihm committed Aug 11, 2023
2 parents 999de4f + 9354948 commit f365127
Show file tree
Hide file tree
Showing 14 changed files with 806 additions and 57 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ logs.txt
debug.txt
info.txt
errors.txt
warnings.txt
dev/
87 changes: 76 additions & 11 deletions ClusterOperator/Backlog.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* eslint-disable no-else-return */
/* eslint-disable no-restricted-syntax */
// const timer = require('timers/promises');
const queryCache = require('memory-cache');
const dbClient = require('./DBClient');
const config = require('./config');
const log = require('../lib/log');
Expand All @@ -24,6 +25,8 @@ class BackLog {

static executeLogs = true;

static BLqueryCache = queryCache;

/**
* [createBacklog]
* @param {object} params [description]
Expand Down Expand Up @@ -87,7 +90,7 @@ class BackLog {
* @param {int} timestamp [description]
* @return {Array}
*/
static async pushQuery(query, seq = 0, timestamp, buffer = false, connId = false) {
static async pushQuery(query, seq = 0, timestamp, buffer = false, connId = false, fullQuery = '') {
// eslint-disable-next-line no-param-reassign
if (timestamp === undefined) timestamp = Date.now();
if (!this.BLClient) {
Expand All @@ -106,19 +109,36 @@ class BackLog {
return [null, seq, timestamp];
} else {
this.writeLock = true;
let result = null;
if (seq === 0) { this.sequenceNumber += 1; } else { this.sequenceNumber = seq; }
const seqForThis = this.sequenceNumber;
await this.BLClient.execute(
const BLResult = await this.BLClient.execute(
`INSERT INTO ${config.dbBacklogCollection} (seq, query, timestamp) VALUES (?,?,?)`,
[seqForThis, query, timestamp],
);
if (this.executeLogs) log.info(`executed ${seqForThis}`);
this.BLqueryCache.put(seqForThis, {
query, seq: seqForThis, timestamp, connId, ip: false,
}, 1000 * 30);
this.writeLock = false;
let result = null;
if (connId === false) {
result = await this.UserDBClient.query(query);
} else if (connId >= 0) {
result = await ConnectionPool.getConnectionById(connId).query(query);
// Abort query execution if there is an error in backlog insert
if (Array.isArray(BLResult) && BLResult[2]) {
log.error(`Error in SQL: ${JSON.stringify(BLResult[2])}`);
} else {
let setSession = false;
if (query.toLowerCase().startsWith('create')) {
setSession = true;
}
if (connId === false) {
if (setSession) await this.UserDBClient.query("SET SESSION sql_mode='IGNORE_SPACE,NO_ZERO_IN_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'", false, fullQuery);
result = await this.UserDBClient.query(query, false, fullQuery);
} else if (connId >= 0) {
if (setSession) await ConnectionPool.getConnectionById(connId).query("SET SESSION sql_mode='IGNORE_SPACE,NO_ZERO_IN_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'", false, fullQuery);
result = await ConnectionPool.getConnectionById(connId).query(query, false, fullQuery);
}
if (Array.isArray(result) && result[2]) {
log.error(`Error in SQL: ${JSON.stringify(result[2])}`);
}
}
return [result, seqForThis, timestamp];
}
Expand Down Expand Up @@ -188,6 +208,28 @@ class BackLog {
return [];
}

/**
* [getLogsByTime]
* @param {int} startFrom [description]
* @param {int} length [description]
* @return {Array}
*/
static async getLogsByTime(startFrom, length) {
if (!this.BLClient) {
log.error('Backlog not created yet. Call createBacklog() first.');
return [];
}
try {
if (config.dbType === 'mysql') {
const totalRecords = await this.BLClient.execute(`SELECT seq, LEFT(query,10) as query, timestamp FROM ${config.dbBacklogCollection} WHERE timestamp >= ? AND timestamp < ? ORDER BY seq`, [startFrom, Number(startFrom) + Number(length)]);
return totalRecords;
}
} catch (e) {
log.error(e);
}
return [];
}

/**
* [getLogs]
* @param {int} index [description]
Expand All @@ -200,7 +242,8 @@ class BackLog {
}
try {
if (config.dbType === 'mysql') {
const record = await this.BLClient.execute(`SELECT * FROM ${config.dbBacklogCollection} WHERE seq=?`, [index]);

const record = await this.BLClient.query(`SELECT * FROM ${config.dbBacklogCollection} WHERE seq=${index}`);
// log.info(`backlog records ${startFrom},${pageSize}:${JSON.stringify(totalRecords)}`);
return record;
}
Expand All @@ -210,6 +253,27 @@ class BackLog {
return [];
}

/**
* [getDateRange]
* @return {object}
*/
static async getDateRange() {
if (!this.BLClient) {
log.error('Backlog not created yet. Call createBacklog() first.');
return [];
}
try {
if (config.dbType === 'mysql') {
const record = await this.BLClient.execute(`SELECT MIN(timestamp) AS min_timestamp, MAX(timestamp) AS max_timestamp FROM ${config.dbBacklogCollection}`);
log.info(record);
return record[0];
}
} catch (e) {
log.error(e);
}
return [];
}

/**
* [getTotalLogsCount]
* @return {int}
Expand Down Expand Up @@ -299,7 +363,7 @@ class BackLog {
await this.BLClient.createDB(config.dbInitDB);
this.UserDBClient.setDB(config.dbInitDB);
await this.BLClient.setDB(config.dbBacklog);
const records = await this.BLClient.execute('SELECT * FROM backlog WHERE seq<? ORDER BY seq', [seqNo]);
const records = await this.BLClient.execute('SELECT * FROM backlog WHERE seq<=? ORDER BY seq', [seqNo]);
// console.log(records);
for (const record of records) {
log.info(`executing seq(${record.seq})`);
Expand All @@ -311,13 +375,14 @@ class BackLog {
}
// eslint-disable-next-line no-await-in-loop
}
await this.BLClient.execute('DELETE FROM backlog WHERE seq>=? ORDER BY seq', [seqNo]);
await this.BLClient.execute('DELETE FROM backlog WHERE seq>?', [seqNo]);
await this.clearBuffer();
}
} catch (e) {
log.error(e);
}
this.buffer = [];
log.info('All buffer data removed successfully.');
log.info(`DB and backlog rolled back to ${seqNo}`);
}

/**
Expand Down
11 changes: 6 additions & 5 deletions ClusterOperator/DBClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class DBClient {
* [query]
* @param {string} query [description]
*/
async query(query, rawResult = false) {
async query(query, rawResult = false, fullQuery = '') {
if (config.dbType === 'mysql') {
// log.info(`running Query: ${query}`);
try {
Expand All @@ -109,11 +109,11 @@ class DBClient {
// eslint-disable-next-line no-else-return
} else {
const [rows, err] = await this.connection.query(query);
if (err && err.toString().includes('Error')) log.error(`Error running query: ${err.toString()}, ${query}`, 'red');
if (err && err.toString().includes('Error')) log.error(`Error running query: ${err.toString()}, ${fullQuery}`, 'red');
return rows;
}
} catch (err) {
if (err && err.toString().includes('Error')) log.error(`Error running query: ${err.toString()}, ${query}`, 'red');
if (err && err.toString().includes('Error')) log.error(`Error running query: ${err.toString()}, ${fullQuery}`, 'red');
return [null, null, err];
}
}
Expand All @@ -125,17 +125,18 @@ class DBClient {
* @param {string} query [description]
* @param {array} params [description]
*/
async execute(query, params, rawResult = false) {
async execute(query, params, rawResult = false, fullQuery = '') {
if (config.dbType === 'mysql') {
try {
if (!this.connected) {
await this.init();
}
const [rows, fields, err] = await this.connection.execute(query, params);
if (err) log.error(`Error executing query: ${JSON.stringify(err)}`);
if (err && err.toString().includes('Error')) log.error(`Error executing query: ${err.toString()}, ${fullQuery}`, 'red');
if (rawResult) return [rows, fields, err];
return rows;
} catch (err) {
if (err && err.toString().includes('Error')) log.error(`Error executing query: ${err.toString()}, ${fullQuery}`, 'red');
return [null, null, err];
}
}
Expand Down
33 changes: 33 additions & 0 deletions ClusterOperator/IdService.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/* eslint-disable no-else-return */
/* eslint-disable no-restricted-syntax */
const log = require('../lib/log');

class IdService {
static loginPhrases = [this.generateLoginPhrase(), this.generateLoginPhrase()];

/**
* [generateLoginPhrase]
*/
static async generateLoginPhrase() {
const timestamp = new Date().getTime();
const phrase = timestamp + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
return phrase;
}

/**
* [getLoginPhrase]
*/
static async getLoginPhrase() {
return this.loginPhrases[1];
}

/**
* [updateLoginPhrase]
*/
static async updateLoginPhrase() {
this.loginPhrases.push(this.generateLoginPhrase());
this.loginPhrases.shift();
}
}
// eslint-disable-next-line func-names
module.exports = IdService;
Loading

0 comments on commit f365127

Please sign in to comment.