diff --git a/next.config.js b/next.config.js index c6441d18..93a742ad 100644 --- a/next.config.js +++ b/next.config.js @@ -1,7 +1,8 @@ /** @type {import('next').NextConfig} */ +const { queuePlatform } = require('./src/services/queue/utils'); const { i18n } = require('./next-i18next.config'); const { cronJob } = require('./src/services/crons'); -const { queueReceiver } = require('./src/services/queue'); +const { queueReceiver } = require('./src/services/queue/azure'); const { sendMessageToRabbitQueue, consumerMessagesRabbit } = require('./src/services/queue/rabbit'); const { PHASE_DEVELOPMENT_SERVER, PHASE_PRODUCTION_SERVER } = require('next/constants'); const isProduction = process.env.NEXT_PUBLIC_BUILD_ENV === 'production'; @@ -18,9 +19,18 @@ module.exports = async (phase, { defaultConfig }) => { // if (!isProduction) shouldRunQueue = false; if (process.env.NEXT_PUBLIC_SHORT_DOMAIN === 'true') shouldRunQueue = false; if (shouldRunQueue) { - // queueReceiver(); - sendMessageToRabbitQueue({ subject: 'health', body: 'Queue is starting...' }); - consumerMessagesRabbit(); + console.log('queuePlatform', queuePlatform); + switch (queuePlatform) { + case 'AZURE': + queueReceiver() + break; + case 'RABBIT': + default: + sendMessageToRabbitQueue({ subject: 'health', body: 'Queue is starting...' }); + consumerMessagesRabbit(); + break; + } + } // cronJob(); return nextConfig; diff --git a/src/controllers/forward.ts b/src/controllers/forward.ts index 1ab89561..5fe3d437 100644 --- a/src/controllers/forward.ts +++ b/src/controllers/forward.ts @@ -2,7 +2,7 @@ import { clone, isEmpty } from 'ramda'; import { redis } from '../redis'; import { shortenCacheService } from '../services/cache'; import { forwardCacheService } from '../services/cache/forward.service'; -import { sendMessageToRabbitQueue } from '../services/queue/rabbit'; +import { sendMessageToQueue } from '../services/queue'; import { shortenService } from '../services/shorten'; import { REDIS_KEY, getRedisKey } from '../types/constants'; import { Forward, ForwardMeta } from '../types/forward'; @@ -54,8 +54,7 @@ export const handler = api( // cache hit valid = shortenService.verifyToken(shortenedUrlCache, token); if (!valid) return res.send({ errorCode: HttpStatusCode.UNAUTHORIZED, errorMessage: 'UNAUTHORIZED' }); - sendMessageToRabbitQueue({ subject: 'forward', body: data }); - // sendMessageToQueue([{ subject: 'forward', body: data }]); + sendMessageToQueue({ subject: 'forward', body: data }); return successHandler(res, { history: shortenedUrlCache, token: encryptS(shortenedUrlCache.id.toString()) }); } // cache missed, fetch and write back to cache @@ -67,7 +66,7 @@ export const handler = api( valid = shortenService.verifyToken(history, token); if (!valid) return res.send({ errorCode: HttpStatusCode.UNAUTHORIZED, errorMessage: 'UNAUTHORIZED' }); - sendMessageToRabbitQueue({ subject: 'forward', body: data }); + sendMessageToQueue({ subject: 'forward', body: data }); shortenCacheService.postShortenHash(clone(history)); if (history?.email) history.email = ''; diff --git a/src/services/queue/azure/index.js b/src/services/queue/azure/index.js new file mode 100644 index 00000000..ff2ad6be --- /dev/null +++ b/src/services/queue/azure/index.js @@ -0,0 +1,69 @@ +const { postProcessForward } = require('../postProcessForward'); +const { ServiceBusClient } = require('@azure/service-bus'); +const { connectionString, queueName, logger } = require('../utils'); + +/** + * An array of objects representing message types. + * @typedef {Object} MessageType + * @property {string} subject - The subject of the message. + * @property {*} body - The body of the message, which can be of any type. + */ + +/** + * Process a single message. + * + * @param {MessageType} message - The message to be processed. + * @returns {Promise} A Promise that resolves when the processing is complete. + * + * @throws {Error} Throws an error if the message processing fails. + */ +async function myMessageHandler(message) { + try { + // console.log(`Processing message ${message.subject} with content: ${JSON.stringify(message.body)}`); + switch (message.subject) { + case 'forward': + await postProcessForward(message.body); + break; + default: + break; + } + } catch (error) { + logger.error(error); + throw error; + } +} + +let sbClient; +let receiver; + +async function main() { + console.log('Starting Queue Receiver'); + // create a Service Bus client using the connection string to the Service Bus namespace + sbClient = new ServiceBusClient(connectionString); + + // createReceiver() can also be used to create a receiver for a subscription. + receiver = sbClient.createReceiver(queueName); + + // function to handle any errors + const myErrorHandler = async (error) => { + console.log('Error Queue Handler', error); + }; + + // subscribe and specify the message and error handlers + receiver.subscribe({ + processMessage: myMessageHandler, + processError: myErrorHandler, + }); +} + +const queueReceiver = () => { + main().catch((err) => { + console.log('Error Queue Receiver: ', err); + }); + // .finally(async () => { + // await receiver.close(); + // await sbClient.close(); + // }); +}; + +module.exports = { queueReceiver }; diff --git a/src/services/queue/sendMessage.js b/src/services/queue/azure/sendMessageToAzureQueue.js similarity index 87% rename from src/services/queue/sendMessage.js rename to src/services/queue/azure/sendMessageToAzureQueue.js index 84b8c418..13e15dca 100644 --- a/src/services/queue/sendMessage.js +++ b/src/services/queue/azure/sendMessageToAzureQueue.js @@ -1,6 +1,5 @@ const { ServiceBusClient } = require('@azure/service-bus'); -const { connectionString, queueName, isTest, logger } = require('./utils'); -const { sendMessageToRabbitQueue } = require('./rabbit') +const { connectionString, queueName, isTest, isLocal, logger } = require('../utils'); /** * An array of objects representing message types. @@ -19,7 +18,7 @@ const { sendMessageToRabbitQueue } = require('./rabbit') * @param {MessageTypesArray} messages - An array of message types. * @returns {Promise} A Promise that resolves when the processing is complete. */ -async function sendMessageToQueue(messages) { +async function sendMessageToAzureQueue(messages) { if (isTest) return; try { // create a Service Bus client using the connection string to the Service Bus namespace @@ -55,7 +54,7 @@ async function sendMessageToQueue(messages) { } // Send the last created batch of messages to the queue - // console.log(`Sending a batch of messages to the queue: ${queueName}`); + if (isLocal) console.log(`Sending a batch of messages ${JSON.stringify(messages)} to the queue: ${queueName}`); await sender.sendMessages(batch); // Close the sender @@ -67,4 +66,4 @@ async function sendMessageToQueue(messages) { } } -module.exports = { sendMessageToQueue }; +module.exports = { sendMessageToAzureQueue }; diff --git a/src/services/queue/index.js b/src/services/queue/index.js index 0511e3d2..bdd9f4c9 100644 --- a/src/services/queue/index.js +++ b/src/services/queue/index.js @@ -1,69 +1,17 @@ -const { postProcessForward } = require('./postProcessForward'); -const { ServiceBusClient } = require('@azure/service-bus'); -const { connectionString, queueName, logger } = require('./utils'); - -/** - * An array of objects representing message types. - * @typedef {Object} MessageType - * @property {string} subject - The subject of the message. - * @property {*} body - The body of the message, which can be of any type. - */ - -/** - * Process a single message. - * - * @param {MessageType} message - The message to be processed. - * @returns {Promise} A Promise that resolves when the processing is complete. - * - * @throws {Error} Throws an error if the message processing fails. - */ -async function myMessageHandler(message) { - try { - // console.log(`Processing message ${message.subject} with content: ${JSON.stringify(message.body)}`); - switch (message.subject) { - case 'forward': - await postProcessForward(message.body); - break; - default: - break; +const { sendMessageToAzureQueue } = require("./azure/sendMessageToAzureQueue"); +const { sendMessageToRabbitQueue } = require("./rabbit"); +const { queuePlatform } = require("./utils"); + +const sendMessageToQueue = async (message) => { + switch (queuePlatform) { + case 'AZURE': + await sendMessageToAzureQueue([message]) + break; + case 'RABBIT': + default: + await sendMessageToRabbitQueue(message); + break; } - } catch (error) { - logger.error(error); - throw error; - } } -let sbClient; -let receiver; - -async function main() { - console.log('Starting Queue Receiver'); - // create a Service Bus client using the connection string to the Service Bus namespace - sbClient = new ServiceBusClient(connectionString); - - // createReceiver() can also be used to create a receiver for a subscription. - receiver = sbClient.createReceiver(queueName); - - // function to handle any errors - const myErrorHandler = async (error) => { - console.log('Error Queue Handler', error); - }; - - // subscribe and specify the message and error handlers - receiver.subscribe({ - processMessage: myMessageHandler, - processError: myErrorHandler, - }); -} - -const queueReceiver = () => { - main().catch((err) => { - console.log('Error Queue Receiver: ', err); - }); - // .finally(async () => { - // await receiver.close(); - // await sbClient.close(); - // }); -}; - -module.exports = { queueReceiver }; +module.exports = { sendMessageToQueue }; diff --git a/src/services/queue/utils.js b/src/services/queue/utils.js index 6e9ae0ff..ffef9ce8 100644 --- a/src/services/queue/utils.js +++ b/src/services/queue/utils.js @@ -9,6 +9,7 @@ const queueName = process.env.AZURE_BUS_QUEUE_NAME || ''; const isTest = process.env.NODE_ENV === 'test'; const isLocal = process.env.NEXT_PUBLIC_BUILD_ENV === 'local'; +const queuePlatform = process.env.QUEUE_PLATFORM; const logger = pino( { @@ -30,5 +31,6 @@ module.exports = { queueName, isTest, isLocal, + queuePlatform, logger, };