Skip to content

Commit f8acbbf

Browse files
committed
[Feature] Add queue and queue message classes
1 parent 90b7603 commit f8acbbf

12 files changed

+851
-79
lines changed

index.js

Lines changed: 50 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
const { AsyncLocalStorage } = require("node:async_hooks");
22
const knex = require("knex");
3-
const PgBoss = require("pg-boss");
43
const util = require("util");
54
const AppConfig = require("./src/app_config");
65
const ReportProcessingContext = require("./src/report_processing_context");
76
const Logger = require("./src/logger");
87
const Processor = require("./src/processor");
9-
const PgBossKnexAdapter = require("./src/pg_boss_knex_adapter");
8+
const Queue = require("./src/queue/queue");
9+
const ReportJobQueueMessage = require("./src/queue/report_job_queue_message");
1010

1111
/**
1212
* Gets an array of JSON report objects from the application confing, then runs
@@ -37,12 +37,16 @@ async function run(options = {}) {
3737
const appConfig = new AppConfig(options);
3838
const context = new ReportProcessingContext(new AsyncLocalStorage());
3939
const reportConfigs = appConfig.filteredReportConfigurations;
40+
const knexInstance = appConfig.shouldWriteToDatabase
41+
? await knex(appConfig.knexConfig)
42+
: undefined;
4043
const processor = Processor.buildAnalyticsProcessor(
4144
appConfig,
4245
Logger.initialize({
4346
agencyName: appConfig.agencyLogName,
4447
scriptName: appConfig.scriptName,
4548
}),
49+
knexInstance,
4650
);
4751

4852
for (const reportConfig of reportConfigs) {
@@ -124,7 +128,11 @@ async function runQueuePublish(options = {}) {
124128
scriptName: appConfig.scriptName,
125129
});
126130
const knexInstance = await knex(appConfig.knexConfig);
127-
const queueClient = await _initQueueClient(knexInstance, appLogger);
131+
const queueClient = await _initQueueClient(
132+
knexInstance,
133+
appConfig.messageQueueName,
134+
appLogger,
135+
);
128136

129137
for (const agency of agencies) {
130138
for (const reportConfig of reportConfigs) {
@@ -134,47 +142,35 @@ async function runQueuePublish(options = {}) {
134142
scriptName: appConfig.scriptName,
135143
reportName: reportConfig.name,
136144
});
145+
let messageId;
137146
try {
138-
let jobId = await queueClient.send(
139-
appConfig.messageQueueName,
140-
_createQueueMessage(
141-
options,
142-
agency,
147+
messageId = await queueClient.sendMessage(
148+
new ReportJobQueueMessage({
149+
agencyName: agency.agencyName,
150+
analyticsReportIds: agency.analyticsReportIds,
151+
awsBucketPath: agency.awsBucketPath,
152+
reportOptions: options,
143153
reportConfig,
144-
appConfig.scriptName,
145-
),
146-
{
147-
priority: _messagePriority(reportConfig),
148-
retryLimit: 2,
149-
retryDelay: 10,
150-
retryBackoff: true,
151-
singletonKey: `${appConfig.scriptName}-${agency.agencyName}-${reportConfig.name}`,
152-
},
154+
scriptName: appConfig.scriptName,
155+
}),
153156
);
154-
if (jobId) {
157+
if (messageId) {
155158
reportLogger.info(
156-
`Created job in queue: ${appConfig.messageQueueName} with job ID: ${jobId}`,
159+
`Created message in queue: ${queueClient.name} with message ID: ${messageId}`,
157160
);
158161
} else {
159162
reportLogger.info(
160-
`Found a duplicate job in queue: ${appConfig.messageQueueName}`,
163+
`Found a duplicate message in queue: ${queueClient.name}`,
161164
);
162165
}
163166
} catch (e) {
164-
reportLogger.error(
165-
`Error sending to queue: ${appConfig.messageQueueName}`,
166-
);
167-
reportLogger.error(util.inspect(e));
167+
// Do nothing so that the remaining messages still process.
168168
}
169169
}
170170
}
171171

172172
try {
173173
await queueClient.stop();
174-
appLogger.debug(`Stopping queue client`);
175-
} catch (e) {
176-
appLogger.error("Error stopping queue client");
177-
appLogger.error(util.inspect(e));
178174
} finally {
179175
appLogger.debug(`Destroying database connection pool`);
180176
knexInstance.destroy();
@@ -198,49 +194,29 @@ function _initAgencies(agencies_file) {
198194
return Array.isArray(agencies) ? agencies : legacyAgencies;
199195
}
200196

201-
async function _initQueueClient(knexInstance, logger) {
202-
let queueClient;
203-
try {
204-
queueClient = new PgBoss({ db: new PgBossKnexAdapter(knexInstance) });
205-
await queueClient.start();
206-
logger.debug("Starting queue client");
207-
} catch (e) {
208-
logger.error("Error starting queue client");
209-
logger.error(util.inspect(e));
210-
}
211-
197+
async function _initQueueClient(knexInstance, queueName, logger) {
198+
const queueClient = Queue.buildQueue({
199+
knexInstance,
200+
queueName,
201+
messageClass: ReportJobQueueMessage,
202+
logger,
203+
});
204+
await queueClient.start();
212205
return queueClient;
213206
}
214207

215-
function _createQueueMessage(options, agency, reportConfig, scriptName) {
216-
return {
217-
...agency,
218-
options,
219-
reportConfig,
220-
scriptName,
221-
};
222-
}
223-
224-
function _messagePriority(reportConfig) {
225-
if (!reportConfig.frequency) {
226-
return 0;
227-
} else if (reportConfig.frequency == "daily") {
228-
return 1;
229-
} else if (reportConfig.frequency == "hourly") {
230-
return 2;
231-
} else if (reportConfig.frequency == "realtime") {
232-
return 3;
233-
}
234-
}
235-
236208
/**
237209
* @returns {Promise} when the process ends
238210
*/
239211
async function runQueueConsume() {
240212
const appConfig = new AppConfig();
241213
const appLogger = Logger.initialize();
242214
const knexInstance = await knex(appConfig.knexConfig);
243-
const queueClient = await _initQueueClient(knexInstance, appLogger);
215+
const queueClient = await _initQueueClient(
216+
knexInstance,
217+
appConfig.messageQueueName,
218+
appLogger,
219+
);
244220

245221
try {
246222
const context = new ReportProcessingContext(new AsyncLocalStorage());
@@ -250,24 +226,19 @@ async function runQueueConsume() {
250226
knexInstance,
251227
);
252228

253-
await queueClient.work(
254-
appConfig.messageQueueName,
255-
{ newJobCheckIntervalSeconds: 1 },
256-
async (message) => {
257-
appLogger.info("Queue message received");
258-
process.env.AGENCY_NAME = message.data.agencyName;
259-
process.env.ANALYTICS_REPORT_IDS = message.data.analyticsReportIds;
260-
process.env.AWS_BUCKET_PATH = message.data.awsBucketPath;
261-
process.env.ANALYTICS_SCRIPT_NAME = message.data.scriptName;
262-
263-
await _processReport(
264-
new AppConfig(message.data.options),
265-
context,
266-
message.data.reportConfig,
267-
processor,
268-
);
269-
},
270-
);
229+
await queueClient.poll(async (message) => {
230+
process.env.AGENCY_NAME = message.agencyName;
231+
process.env.ANALYTICS_REPORT_IDS = message.analyticsReportIds;
232+
process.env.AWS_BUCKET_PATH = message.awsBucketPath;
233+
process.env.ANALYTICS_SCRIPT_NAME = message.scriptName;
234+
235+
await _processReport(
236+
new AppConfig(message.options),
237+
context,
238+
message.reportConfig,
239+
processor,
240+
);
241+
});
271242
} catch (e) {
272243
appLogger.error("Error polling queue for messages");
273244
appLogger.error(util.inspect(e));

package-lock.json

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
"@cucumber/cucumber": "^10.3.1",
8787
"@eslint/js": "^8.57.0",
8888
"chai": "^4.4.0",
89+
"chai-as-promised": "^8.0.1",
8990
"dotenv": "^16.4.5",
9091
"dotenv-cli": "^7.4.3",
9192
"eslint": "^8.56.0",
File renamed without changes.

src/queue/queue.js

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
const PgBoss = require("pg-boss");
2+
const PgBossKnexAdapter = require("./pg_boss_knex_adapter");
3+
const util = require("util");
4+
5+
/**
6+
* Implements a message queue using the PgBoss library.
7+
*/
8+
class Queue {
9+
#queueClient;
10+
#queueName;
11+
#messageClass;
12+
#logger;
13+
14+
/**
15+
* @param {object} params the parameter object
16+
* @param {import('pg-boss')} params.queueClient the queue client instance to
17+
* use for queue operations.
18+
* @param {string} params.queueName the identifier for the queue.
19+
* @param {*} params.messageClass a class which implements the fromMessage
20+
* static method to return an instance of the class from a PgBoss message
21+
* object. This can be omitted if the queue instance only sends messages.
22+
* @param {import('winston').Logger} params.logger an application logger instance.
23+
*/
24+
constructor({ queueClient, queueName, messageClass, logger }) {
25+
this.#queueClient = queueClient;
26+
this.#queueName = queueName;
27+
this.#messageClass = messageClass;
28+
this.#logger = logger;
29+
}
30+
31+
/**
32+
* @returns {string} the queue name
33+
*/
34+
get name() {
35+
return this.#queueName;
36+
}
37+
38+
/**
39+
* @returns {Promise} resolves when the PgBoss queue client has started
40+
*/
41+
async start() {
42+
try {
43+
await this.#queueClient.start();
44+
this.#logger.debug("Starting queue client");
45+
} catch (e) {
46+
this.#logger.error("Error starting queue client");
47+
this.#logger.error(util.inspect(e));
48+
throw e;
49+
}
50+
}
51+
52+
/**
53+
* @returns {Promise} resolves when the PgBoss queue client has stopped
54+
*/
55+
async stop() {
56+
try {
57+
await this.#queueClient.stop();
58+
this.#logger.debug(`Stopping queue client`);
59+
} catch (e) {
60+
this.#logger.error("Error stopping queue client");
61+
this.#logger.error(util.inspect(e));
62+
throw e;
63+
}
64+
}
65+
66+
/**
67+
* @param {import('./queue_message')} queueMessage a QueueMessage instance
68+
* @returns {string} a message ID or null if a duplicate message exists on the
69+
* queue.
70+
*/
71+
async sendMessage(queueMessage) {
72+
try {
73+
const messageId = await this.#queueClient.send(
74+
this.#queueName,
75+
queueMessage.toJSON(),
76+
queueMessage.sendOptions(),
77+
);
78+
return messageId;
79+
} catch (e) {
80+
this.#logger.error(`Error sending to queue: ${this.#queueName}`);
81+
this.#logger.error(util.inspect(e));
82+
throw e;
83+
}
84+
}
85+
86+
/**
87+
* @param {Function} callback the function to call for each message
88+
* @param {object} options the options to pass to the PgBoss work function
89+
* @returns {Promise} resolves when the queue poller process stops
90+
*/
91+
poll(callback, options = { newJobCheckIntervalSeconds: 1 }) {
92+
return this.#queueClient.work(this.#queueName, options, async (message) => {
93+
this.#logger.info("Queue message received");
94+
await callback(this.#messageClass.fromMessage(message).toJSON());
95+
});
96+
}
97+
98+
/**
99+
* @param {object} params the parameter object
100+
* @param {import('knex')} params.knexInstance an initialized instance of the knex
101+
* library which provides a database connection.
102+
* @param {string} params.queueName the name of the queue to use for the
103+
* client.
104+
* @param {*} params.messageClass a class which implements the fromMessage
105+
* static method to return an instance of the class from a PgBoss message
106+
* object. This can be omitted if the queue instance only sends messages.
107+
* @param {import('winston').Logger} params.logger an application logger instance.
108+
* @returns {Queue} the queue instance configured with the PgBoss queue
109+
* client.
110+
*/
111+
static buildQueue({ knexInstance, queueName, messageClass, logger }) {
112+
return new Queue({
113+
queueClient: new PgBoss({ db: new PgBossKnexAdapter(knexInstance) }),
114+
queueName,
115+
messageClass,
116+
logger,
117+
});
118+
}
119+
}
120+
121+
module.exports = Queue;

0 commit comments

Comments
 (0)