Skip to content

Commit 217c601

Browse files
committed
add pgboss client class
1 parent 84f2a99 commit 217c601

File tree

2 files changed

+38
-8
lines changed

2 files changed

+38
-8
lines changed

index.js

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const AppConfig = require("./src/app_config");
66
const ReportProcessingContext = require("./src/report_processing_context");
77
const Logger = require("./src/logger");
88
const Processor = require("./src/processor");
9+
const PgBossClient = require("./src/pg_boss_client");
910

1011
/**
1112
* Gets an array of JSON report objects from the application confing, then runs
@@ -127,7 +128,6 @@ async function runQueuePublish(options = {}) {
127128
connection: appConfig.postgres,
128129
});
129130
const queueClient = await _initQueueClient(knexInstance, appLogger);
130-
const queue = "analytics-reporter-job-queue";
131131

132132
for (const agency of agencies) {
133133
for (const reportConfig of reportConfigs) {
@@ -139,7 +139,7 @@ async function runQueuePublish(options = {}) {
139139
});
140140
try {
141141
let jobId = await queueClient.send(
142-
queue,
142+
appConfig.messageQueueName,
143143
_createQueueMessage(
144144
options,
145145
agency,
@@ -156,13 +156,17 @@ async function runQueuePublish(options = {}) {
156156
);
157157
if (jobId) {
158158
reportLogger.info(
159-
`Created job in queue: ${queue} with job ID: ${jobId}`,
159+
`Created job in queue: ${appConfig.messageQueueName} with job ID: ${jobId}`,
160160
);
161161
} else {
162-
reportLogger.info(`Found a duplicate job in queue: ${queue}`);
162+
reportLogger.info(
163+
`Found a duplicate job in queue: ${appConfig.messageQueueName}`,
164+
);
163165
}
164166
} catch (e) {
165-
reportLogger.error(`Error sending to queue: ${queue}`);
167+
reportLogger.error(
168+
`Error sending to queue: ${appConfig.messageQueueName}`,
169+
);
166170
reportLogger.error(util.inspect(e));
167171
}
168172
}
@@ -199,7 +203,7 @@ function _initAgencies(agencies_file) {
199203
async function _initQueueClient(knexInstance, logger) {
200204
let queueClient;
201205
try {
202-
queueClient = new PgBoss({ db: knexInstance.client.acquireConnection() });
206+
queueClient = new PgBoss({ db: new PgBossClient(knexInstance) });
203207
await queueClient.start();
204208
logger.debug("Starting queue client");
205209
} catch (e) {
@@ -242,7 +246,6 @@ async function runQueueConsume() {
242246
connection: appConfig.postgres,
243247
});
244248
const queueClient = await _initQueueClient(knexInstance, appLogger);
245-
const queue = "analytics-reporter-job-queue";
246249

247250
try {
248251
const context = new ReportProcessingContext(new AsyncLocalStorage());
@@ -253,7 +256,7 @@ async function runQueueConsume() {
253256
);
254257

255258
await queueClient.work(
256-
queue,
259+
appConfig.messageQueueName,
257260
{ newJobCheckIntervalSeconds: 1 },
258261
async (message) => {
259262
appLogger.info("Queue message received");

src/pg_boss_client.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/**
2+
* Handles providing a database client for the Pg-Boss library
3+
*/
4+
class PgBossClient {
5+
#knex;
6+
7+
/**
8+
* @param {import('knex')} knexInstance an initialized instance of the knex
9+
* library which provides a database connection.
10+
*/
11+
constructor(knexInstance) {
12+
this.#knex = knexInstance;
13+
}
14+
15+
/**
16+
* Execute PgBoss SQL using the knex library interface
17+
*
18+
* @param {string} text the SQL string to execute.
19+
* @param {string[]} values the values to insert into the SQL string.
20+
* @returns {Promise} which resolves with the result of the SQL query.
21+
*/
22+
async executeSql(text, values) {
23+
return this.#knex.raw(text, values);
24+
}
25+
}
26+
27+
module.exports = PgBossClient;

0 commit comments

Comments
 (0)