Skip to content

Commit

Permalink
Merge branch 'release/sync'
Browse files Browse the repository at this point in the history
Signed-off-by: Jeremy Ho <jujaga@gmail.com>
  • Loading branch information
jujaga committed Sep 7, 2023
2 parents e6d661b + ed3494a commit b6ddb48
Show file tree
Hide file tree
Showing 48 changed files with 3,242 additions and 316 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#
# Build the application
#
FROM registry.access.redhat.com/ubi9/nodejs-18:1-59 as builder
FROM registry.access.redhat.com/ubi9/nodejs-18:1-62.1692771036 as builder

ENV NO_UPDATE_NOTIFIER=true

Expand All @@ -22,7 +22,7 @@ RUN npm ci --omit=dev
#
# Create the final container image
#
FROM registry.access.redhat.com/ubi9/nodejs-18-minimal:1-63
FROM registry.access.redhat.com/ubi9/nodejs-18-minimal:1-67

ENV APP_PORT=3000 \
NO_UPDATE_NOTIFIER=true
Expand Down
138 changes: 76 additions & 62 deletions app/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ const { ValidationError } = require('express-validation');
const { AuthMode, DEFAULTCORS } = require('./src/components/constants');
const log = require('./src/components/log')(module.filename);
const httpLogger = require('./src/components/log').httpLogger;
const QueueManager = require('./src/components/queueManager');
const { getAppAuthMode, getGitRevision } = require('./src/components/utils');
const DataConnection = require('./src/db/dataConnection');
const v1Router = require('./src/routes/v1');

const DataConnection = require('./src/db/dataConnection');
const dataConnection = new DataConnection();
const queueManager = new QueueManager();

const apiRouter = express.Router();
const state = {
Expand All @@ -23,7 +25,9 @@ const state = {
ready: false,
shutdown: false
};

let probeId;
let queueId;

const app = express();
const jsonParser = express.json({ limit: config.get('server.bodyLimit') });
Expand Down Expand Up @@ -138,63 +142,75 @@ app.use((req, res) => {
}).send(res);
});

// Prevent unhandled errors from crashing application
// Ensure unhandled errors gracefully shut down the application
process.on('unhandledRejection', err => {
if (err && err.stack) {
log.error(err);
}
log.error(`Unhandled Rejection: ${err.message ?? err}`, { function: 'onUnhandledRejection' });
fatalErrorHandler();
});
process.on('uncaughtException', err => {
log.error(`Unhandled Exception: ${err.message ?? err}`, { function: 'onUncaughtException' });
fatalErrorHandler();
});

// Graceful shutdown support
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
process.on('SIGUSR1', shutdown);
process.on('SIGUSR2', shutdown);
process.on('exit', () => {
log.info('Exiting...');
['SIGHUP', 'SIGINT', 'SIGTERM', 'SIGQUIT', 'SIGUSR1', 'SIGUSR2']
.forEach(signal => process.on(signal, shutdown));
process.on('exit', code => {
log.info(`Exiting with code ${code}`, { function: 'onExit' });
});

/**
* @function shutdown
* Shuts down this application after at least 3 seconds.
*/
function shutdown() {
log.info('Received kill signal. Shutting down...');
// Wait 3 seconds before starting cleanup
if (!state.shutdown) setTimeout(cleanup, 3000);
}

/**
* @function cleanup
* Cleans up connections in this application.
*/
function cleanup() {
log.info('Service no longer accepting traffic', { function: 'cleanup' });
state.shutdown = true;
log.info('Cleaning up', { function: 'cleanup' });
// Set 10 seconds max deadline before hard exiting
setTimeout(process.exit, 10000).unref(); // Prevents the timeout from registering on event loop

log.info('Cleaning up...', { function: 'cleanup' });
clearInterval(probeId);
clearInterval(queueId);
queueManager.close(dataConnection.close(process.exit));
}

dataConnection.close(() => process.exit());
/**
* @function checkConnections
* Checks Database connectivity
* This may force the application to exit if a connection fails
*/
function checkConnections() {
const wasReady = state.ready;
if (!state.shutdown) {
dataConnection.checkConnection().then(results => {
state.connections.data = results;
state.ready = Object.values(state.connections).every(x => x);
if (!wasReady && state.ready) log.info('Application ready to accept traffic', { function: 'checkConnections' });
if (wasReady && !state.ready) log.warn('Application not ready to accept traffic', { function: 'checkConnections' });
log.silly('App state', { function: 'checkConnections', state: state });
if (!state.ready) notReadyHandler();
});
}
}

// Wait 10 seconds max before hard exiting
setTimeout(() => process.exit(), 10000);
/**
* @function fatalErrorHandler
* Forces the application to shutdown
*/
function fatalErrorHandler() {
process.exitCode = 1;
shutdown();
}

/**
* @function initializeConnections
* Initializes the database connections
* This will force the application to exit if it fails
* This may force the application to exit if it fails
*/
function initializeConnections() {
// Initialize connections and exit if unsuccessful
const tasks = [
dataConnection.checkAll()
];

Promise.all(tasks)
dataConnection.checkAll()
.then(results => {
state.connections.data = results[0];
state.connections.data = results;

if (state.connections.data) {
log.info('DataConnection Reachable', { function: 'initializeConnections' });
Expand All @@ -203,43 +219,41 @@ function initializeConnections() {
.catch(error => {
log.error(`Initialization failed: Database OK = ${state.connections.data}`, { function: 'initializeConnections' });
log.error('Connection initialization failure', error.message, { function: 'initializeConnections' });
if (!state.ready) {
process.exitCode = 1;
shutdown();
}
if (!state.ready) notReadyHandler();
})
.finally(() => {
state.ready = Object.values(state.connections).every(x => x);
if (state.ready) {
log.info('Service ready to accept traffic', { function: 'initializeConnections' });
// Start periodic 10 second connection probe check
probeId = setInterval(checkConnections, 10000);
}
if (state.ready) log.info('Application ready to accept traffic', { function: 'initializeConnections' });

// Start periodic 10 second connection probes
probeId = setInterval(checkConnections, 10000);
queueId = setInterval(() => {
if (state.ready) queueManager.checkQueue();
}, 10000);
});
}

/**
* @function checkConnections
* Checks Database connectivity
* This will force the application to exit if a connection fails
* @function notReadyHandler
* Forces an application shutdown if `server.hardReset` is defined.
* Otherwise will flush and attempt to reset the connection pool.
*/
function checkConnections() {
const wasReady = state.ready;
if (!state.shutdown) {
const tasks = [
dataConnection.checkConnection()
];
function notReadyHandler() {
if (config.has('server.hardReset')) fatalErrorHandler();
else dataConnection.resetConnection();
}

Promise.all(tasks).then(results => {
state.connections.data = results[0];
state.ready = Object.values(state.connections).every(x => x);
if (!wasReady && state.ready) log.info('Service ready to accept traffic', { function: 'checkConnections' });
log.silly('App state', { function: 'checkConnections', state });
if (!state.ready) {
process.exitCode = 1;
shutdown();
}
});
/**
* @function shutdown
* Shuts down this application after at least 3 seconds.
*/
function shutdown() {
log.info('Shutting down', { function: 'shutdown' });
// Wait 3 seconds before starting cleanup
if (!state.shutdown) {
state.shutdown = true;
log.info('Application no longer accepting traffic', { function: 'shutdown' });
setTimeout(cleanup, 3000);
}
}

Expand Down
2 changes: 2 additions & 0 deletions app/config/custom-environment-variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
},
"server": {
"bodyLimit": "SERVER_BODYLIMIT",
"hardReset": "SERVER_HARDRESET",
"logFile": "SERVER_LOGFILE",
"logLevel": "SERVER_LOGLEVEL",
"maxRetries": "SERVER_MAXRETRIES",
"passphrase": "SERVER_PASSPHRASE",
"port": "SERVER_PORT",
"privacyMask": "SERVER_PRIVACY_MASK"
Expand Down
1 change: 1 addition & 0 deletions app/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"server": {
"bodyLimit": "30mb",
"logLevel": "http",
"maxRetries": "3",
"port": "3000"
}
}
1 change: 1 addition & 0 deletions app/knexfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ module.exports = {
// This shouldn't be here: https://github.com/knex/knex/issues/3455#issuecomment-535554401
// propagateCreateError: false
},
searchPath: ['public', 'queue'], // Define postgres schemas to match on
seeds: {
directory: __dirname + '/src/db/seeds'
}
Expand Down
117 changes: 117 additions & 0 deletions app/src/components/queueManager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
const config = require('config');

const log = require('./log')(module.filename);
const { objectQueueService, syncService } = require('../services');

/**
* @class QueueManager
* A singleton wrapper for managing the queue worker thread
*/
class QueueManager {
/**
* @constructor
*/
constructor() {
if (!QueueManager._instance) {
this._isBusy = false;
this._toClose = false;
QueueManager._instance = this;
}

return QueueManager._instance;
}

/**
* @function isBusy
* Gets the isBusy state
*/
get isBusy() {
return this._isBusy;
}

/**
* @function toClose
* Gets the toClose state
*/
get toClose() {
return this._toClose;
}

/**
* @function checkQueue
* Checks the queue for any unprocessed jobs
*/
checkQueue() {
if (!this.isBusy && !this.toClose) {
objectQueueService.queueSize().then(size => {
if (size > 0) this.processNextJob();
}).catch((err) => {
log.error(`Error encountered while checking queue: ${err.message}`, { function: 'checkQueue', error: err });
});
}
}

/**
* @function close
* Spinlock until any remaining jobs are completed
* @param {function} [cb] Optional callback
*/
close(cb = undefined) {
this._toClose = true;
if (this.isBusy) setTimeout(this.close(cb), 250);
else {
log.info('No longer processing jobs', { function: 'close' });
if (cb) cb();
}
}

/**
* @function processNextJob
* Attempts to process the next job if available
* @param {object} message A message object
*/
async processNextJob() {
let job;

try {
const response = await objectQueueService.dequeue();

if (response.length) {
job = response[0];
this._isBusy = true;

log.verbose(`Started processing job id ${job.id}`, { function: 'processNextJob', job: job });

const objectId = await syncService.syncJob(job.path, job.bucketId, job.full, job.createdBy);

this._isBusy = false;
log.verbose(`Finished processing job id ${job.id}`, { function: 'processNextJob', job: job, objectId: objectId });

// If job is completed, check if there are more jobs
if (!this.toClose) this.checkQueue();
}
} catch (err) {
log.error(`Error encountered on job id ${job.id}: ${err.message}`, { function: 'processNextJob', job: job, error: err });

const maxRetries = parseInt(config.get('server.maxRetries'));
if (job.retries + 1 > maxRetries) {
log.warn(`Job has exceeded the ${maxRetries} maximum retries permitted`, { function: 'processNextJob', job: job, maxRetries: maxRetries });
} else {
objectQueueService.enqueue({
jobs: [{ bucketId: job.bucketId, path: job.path }],
full: job.full,
retries: job.retries + 1,
createdBy: job.createdBy
}).then(() => {
log.verbose('Job has been reenqueued', { function: 'processNextJob', job: job });
}).catch((e) => {
log.error(`Failed to reenqueue job id ${job.id}: ${e.message}`, { function: 'processNextJob', job: job });
});
}

this._isBusy = false;
}
}
}

module.exports = QueueManager;
19 changes: 18 additions & 1 deletion app/src/components/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -297,13 +297,30 @@ const utils = {
}, []);
},

/**
* @function isAtPath
* Predicate function determining if the `path` is a member of the `prefix` path
* @param {string} prefix The base "folder"
* @param {string} path The "file" to check
* @returns {boolean} True if path is member of prefix. False in all other cases.
*/
isAtPath(prefix, path) {
if (typeof prefix !== 'string' || typeof path !== 'string') return false;
if (prefix === path) return true; // Matching strings are always at the at the path

const pathParts = path.split(DELIMITER).filter(part => part);
const prefixParts = prefix.split(DELIMITER).filter(part => part);
return prefixParts.every((part, i) => pathParts[i] === part)
&& pathParts.filter(part => !prefixParts.includes(part)).length === 1;
},

/**
* @function isTruthy
* Returns true if the element name in the object contains a truthy value
* @param {object} value The object to evaluate
* @returns {boolean} True if truthy, false if not, and undefined if undefined
*/
isTruthy: (value) => {
isTruthy(value) {
if (value === undefined) return value;

const isStr = typeof value === 'string' || value instanceof String;
Expand Down
Loading

0 comments on commit b6ddb48

Please sign in to comment.