diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e647089b..410498a1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -81,7 +81,6 @@ jobs: CF_ORGANIZATION_NAME: ${{ vars.CF_ORGANIZATION_NAME }} CF_SPACE_NAME: ${{ vars.CF_SPACE_NAME_DEV }} DB_SERVICE_NAME: ${{ vars.DB_SERVICE_NAME_DEV }} - MESSAGE_QUEUE_DATABASE_NAME: ${{ vars.MESSAGE_QUEUE_DATABASE_NAME }} MESSAGE_QUEUE_NAME: ${{ vars.MESSAGE_QUEUE_NAME }} NEW_RELIC_APP_NAME: ${{ vars.NEW_RELIC_APP_NAME_DEV }} PROXY_FQDN: ${{ vars.PROXY_FQDN_DEV }} @@ -111,7 +110,6 @@ jobs: CF_ORGANIZATION_NAME: ${{ vars.CF_ORGANIZATION_NAME }} CF_SPACE_NAME: ${{ vars.CF_SPACE_NAME_STG }} DB_SERVICE_NAME: ${{ vars.DB_SERVICE_NAME_STG }} - MESSAGE_QUEUE_DATABASE_NAME: ${{ vars.MESSAGE_QUEUE_DATABASE_NAME }} MESSAGE_QUEUE_NAME: ${{ vars.MESSAGE_QUEUE_NAME }} NEW_RELIC_APP_NAME: ${{ vars.NEW_RELIC_APP_NAME_STG }} PROXY_FQDN: ${{ vars.PROXY_FQDN_STG }} @@ -141,7 +139,6 @@ jobs: CF_ORGANIZATION_NAME: ${{ vars.CF_ORGANIZATION_NAME }} CF_SPACE_NAME: ${{ vars.CF_SPACE_NAME_PRD }} DB_SERVICE_NAME: ${{ vars.DB_SERVICE_NAME_PRD }} - MESSAGE_QUEUE_DATABASE_NAME: ${{ vars.MESSAGE_QUEUE_DATABASE_NAME }} MESSAGE_QUEUE_NAME: ${{ vars.MESSAGE_QUEUE_NAME }} NEW_RELIC_APP_NAME: ${{ vars.NEW_RELIC_APP_NAME_PRD }} PROXY_FQDN: ${{ vars.PROXY_FQDN_PRD }} diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index d9d5d0b3..fe03029c 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -27,9 +27,6 @@ on: DB_SERVICE_NAME: required: true type: string - MESSAGE_QUEUE_DATABASE_NAME: - required: true - type: string MESSAGE_QUEUE_NAME: required: true type: string @@ -72,7 +69,6 @@ env: CF_SPACE_NAME: ${{ inputs.CF_SPACE_NAME }} DB_SERVICE_NAME: ${{ inputs.DB_SERVICE_NAME }} GA4_CREDS: ${{ secrets.GA4_CREDS }} - MESSAGE_QUEUE_DATABASE_NAME: ${{ inputs.MESSAGE_QUEUE_DATABASE_NAME }} MESSAGE_QUEUE_NAME: ${{ inputs.MESSAGE_QUEUE_NAME }} NEW_RELIC_APP_NAME: ${{ inputs.NEW_RELIC_APP_NAME }} NEW_RELIC_LICENSE_KEY: ${{ secrets.NEW_RELIC_LICENSE_KEY }} diff --git a/.github/workflows/manual_deploy_to_dev.yml b/.github/workflows/manual_deploy_to_dev.yml index b86b7205..04647fc9 100644 --- a/.github/workflows/manual_deploy_to_dev.yml +++ b/.github/workflows/manual_deploy_to_dev.yml @@ -15,7 +15,6 @@ jobs: CF_ORGANIZATION_NAME: ${{ vars.CF_ORGANIZATION_NAME }} CF_SPACE_NAME: ${{ vars.CF_SPACE_NAME_DEV }} DB_SERVICE_NAME: ${{ vars.DB_SERVICE_NAME_DEV }} - MESSAGE_QUEUE_DATABASE_NAME: ${{ vars.MESSAGE_QUEUE_DATABASE_NAME }} MESSAGE_QUEUE_NAME: ${{ vars.MESSAGE_QUEUE_NAME }} NEW_RELIC_APP_NAME: ${{ vars.NEW_RELIC_APP_NAME_DEV }} PROXY_FQDN: ${{ vars.PROXY_FQDN_DEV }} diff --git a/deploy/agencies.json b/deploy/agencies.json index 3b32e764..2674c8a3 100644 --- a/deploy/agencies.json +++ b/deploy/agencies.json @@ -24,11 +24,21 @@ "agencyName": "agriculture", "awsBucketPath": "data/agriculture" }, + { + "analyticsReportIds": "470235781", + "agencyName": "air-force", + "awsBucketPath": "data/air-force" + }, { "analyticsReportIds": "395213963", "agencyName": "american-battle-monuments-commission", "awsBucketPath": "data/american-battle-monuments-commission" }, + { + "analyticsReportIds": "470089273", + "agencyName": "army", + "awsBucketPath": "data/army" + }, { "analyticsReportIds": "395253935", "agencyName": "commerce", @@ -264,6 +274,11 @@ "agencyName": "national-labor-relations-board", "awsBucketPath": "data/national-labor-relations-board" }, + { + "analyticsReportIds": "470944610", + "agencyName": "national-library-medicine", + "awsBucketPath": "data/national-library-medicine" + }, { "analyticsReportIds": "425930242", "agencyName": "national-mediation-board", @@ -294,6 +309,11 @@ "agencyName": "national-transportation-safety-board", "awsBucketPath": "data/national-transportation-safety-board" }, + { + "analyticsReportIds": "470101393", + "agencyName": "navy", + "awsBucketPath": "data/navy" + }, { "analyticsReportIds": "395460734", "agencyName": "nuclear-regulatory-commission", diff --git a/deploy/api.sh b/deploy/api.sh index 4d0ce8d0..b107cd4c 100755 --- a/deploy/api.sh +++ b/deploy/api.sh @@ -3,4 +3,4 @@ export ANALYTICS_REPORTS_PATH=reports/api.json export ANALYTICS_SCRIPT_NAME=api.sh -$ANALYTICS_ROOT_PATH/bin/analytics-publisher --debug --write-to-database --output /tmp --agenciesFile=$ANALYTICS_ROOT_PATH/deploy/agencies.json +$ANALYTICS_ROOT_PATH/bin/analytics-publisher --debug --write-to-database --agenciesFile=$ANALYTICS_ROOT_PATH/deploy/agencies.json diff --git a/deploy/cron.js b/deploy/cron.js index 6fcafc4c..36e5b758 100644 --- a/deploy/cron.js +++ b/deploy/cron.js @@ -61,18 +61,18 @@ const daily_run = () => { runScriptWithLogName(`${scriptRootPath}/daily.sh`, "daily.sh"); }; -const hourly_run = () => { +/*const hourly_run = () => { runScriptWithLogName(`${scriptRootPath}/hourly.sh`, "hourly.sh"); -}; +};*/ const realtime_run = () => { runScriptWithLogName(`${scriptRootPath}/realtime.sh`, "realtime.sh"); }; /** - Daily reports run every morning at 10 AM UTC. - This calculates the offset between now and then for the next scheduled run. -*/ + * Daily and API reports run every morning at 10 AM UTC. + * This calculates the offset between now and then for the next scheduled run. + */ const calculateNextDailyRunTimeOffset = () => { const currentTime = new Date(); const nextRunTime = new Date( @@ -85,26 +85,36 @@ const calculateNextDailyRunTimeOffset = () => { }; /** - * All scripts run immediately upon application start (with a 10 second delay + * All scripts run immediately upon application start (with a 60 second delay * between each so that they don't all run at once), then run again at intervals * going forward. */ setTimeout(realtime_run, 1000 * 10); -setTimeout(hourly_run, 1000 * 20); -setTimeout(daily_run, 1000 * 30); -setTimeout(api_run, 1000 * 40); +// setTimeout(hourly_run, 1000 * 70); No hourly reports exist at this time. +setTimeout(daily_run, 1000 * 70); +setTimeout(api_run, 1000 * 130); -// daily +// Daily and API recurring script run setup. // Runs at 10 AM UTC, then every 24 hours afterwards setTimeout(() => { - daily_run(); - setInterval(daily_run, 1000 * 60 * 60 * 24); - // API - api_run(); - setInterval(api_run, 1000 * 60 * 60 * 24); + // Offset the daily script run by 30 seconds so that it never runs in parallel + // with the realtime script in order to save memory/CPU. + setTimeout(() => { + daily_run(); + setInterval(daily_run, 1000 * 60 * 60 * 24); + }, 1000 * 30); + + // setTimeout(hourly_run, 1000 * 60); + + // Offset the API script run by 90 seconds so that it never runs in parallel + // with the daily or realtime scripts in order to save memory/CPU. + setTimeout(() => { + api_run(); + setInterval(api_run, 1000 * 60 * 60 * 24); + }, 1000 * 90); }, calculateNextDailyRunTimeOffset()); -// hourly -setInterval(hourly_run, 1000 * 60 * 60); -// realtime. Runs every 15 minutes. -// Google updates realtime reports every 30 minutes, so there is some overlap. +// hourly (no hourly reports exist at this time). +// setInterval(hourly_run, 1000 * 60 * 60); + +// Realtime recurring script run setup. Runs every 15 minutes. setInterval(realtime_run, 1000 * 60 * 15); diff --git a/index.js b/index.js index 81f95d14..9a68b7d4 100644 --- a/index.js +++ b/index.js @@ -1,10 +1,12 @@ const { AsyncLocalStorage } = require("node:async_hooks"); +const knex = require("knex"); const PgBoss = require("pg-boss"); const util = require("util"); const AppConfig = require("./src/app_config"); const ReportProcessingContext = require("./src/report_processing_context"); const Logger = require("./src/logger"); const Processor = require("./src/processor"); +const PgBossKnexAdapter = require("./src/pg_boss_knex_adapter"); /** * Gets an array of JSON report objects from the application confing, then runs @@ -80,7 +82,7 @@ async function _processReport(appConfig, context, reportConfig, processor) { await processor.processChain(context); logger.info("Processing complete"); } catch (e) { - logger.error("Encountered an error"); + logger.error("Encountered an error during report processing"); logger.error(util.inspect(e)); } }); @@ -121,8 +123,8 @@ async function runQueuePublish(options = {}) { agencyName: appConfig.agencyLogName, scriptName: appConfig.scriptName, }); - const queueClient = await _initQueueClient(appConfig, appLogger); - const queue = "analytics-reporter-job-queue"; + const knexInstance = await knex(appConfig.knexConfig); + const queueClient = await _initQueueClient(knexInstance, appLogger); for (const agency of agencies) { for (const reportConfig of reportConfigs) { @@ -134,7 +136,7 @@ async function runQueuePublish(options = {}) { }); try { let jobId = await queueClient.send( - queue, + appConfig.messageQueueName, _createQueueMessage( options, agency, @@ -151,13 +153,17 @@ async function runQueuePublish(options = {}) { ); if (jobId) { reportLogger.info( - `Created job in queue: ${queue} with job ID: ${jobId}`, + `Created job in queue: ${appConfig.messageQueueName} with job ID: ${jobId}`, ); } else { - reportLogger.info(`Found a duplicate job in queue: ${queue}`); + reportLogger.info( + `Found a duplicate job in queue: ${appConfig.messageQueueName}`, + ); } } catch (e) { - reportLogger.error(`Error sending to queue: ${queue}`); + reportLogger.error( + `Error sending to queue: ${appConfig.messageQueueName}`, + ); reportLogger.error(util.inspect(e)); } } @@ -169,6 +175,9 @@ async function runQueuePublish(options = {}) { } catch (e) { appLogger.error("Error stopping queue client"); appLogger.error(util.inspect(e)); + } finally { + appLogger.debug(`Destroying database connection pool`); + knexInstance.destroy(); } } @@ -189,10 +198,10 @@ function _initAgencies(agencies_file) { return Array.isArray(agencies) ? agencies : legacyAgencies; } -async function _initQueueClient(appConfig, logger) { +async function _initQueueClient(knexInstance, logger) { let queueClient; try { - queueClient = new PgBoss(appConfig.messageQueueDatabaseConnection); + queueClient = new PgBoss({ db: new PgBossKnexAdapter(knexInstance) }); await queueClient.start(); logger.debug("Starting queue client"); } catch (e) { @@ -230,15 +239,19 @@ function _messagePriority(reportConfig) { async function runQueueConsume() { const appConfig = new AppConfig(); const appLogger = Logger.initialize(); - const queueClient = await _initQueueClient(appConfig, appLogger); - const queue = "analytics-reporter-job-queue"; + const knexInstance = await knex(appConfig.knexConfig); + const queueClient = await _initQueueClient(knexInstance, appLogger); try { const context = new ReportProcessingContext(new AsyncLocalStorage()); - const processor = Processor.buildAnalyticsProcessor(appConfig, appLogger); + const processor = Processor.buildAnalyticsProcessor( + appConfig, + appLogger, + knexInstance, + ); await queueClient.work( - queue, + appConfig.messageQueueName, { newJobCheckIntervalSeconds: 1 }, async (message) => { appLogger.info("Queue message received"); diff --git a/knexfile.js b/knexfile.js index 7876ff44..fcb173e6 100644 --- a/knexfile.js +++ b/knexfile.js @@ -8,6 +8,10 @@ module.exports = { password: process.env.POSTGRES_PASSWORD || "123abc", port: 5432, }, + pool: { + min: 2, + max: 10, + }, }, test: { client: "postgresql", @@ -18,6 +22,10 @@ module.exports = { password: process.env.POSTGRES_PASSWORD || "123abc", port: 5432, }, + pool: { + min: 2, + max: 10, + }, migrations: { tableName: "knex_migrations", }, @@ -31,5 +39,9 @@ module.exports = { password: process.env.POSTGRES_PASSWORD, ssl: true, }, + pool: { + min: 2, + max: 10, + }, }, }; diff --git a/manifest.consumer.yml b/manifest.consumer.yml index 16ad4b8b..40900320 100644 --- a/manifest.consumer.yml +++ b/manifest.consumer.yml @@ -21,7 +21,6 @@ applications: ANALYTICS_REPORT_EMAIL: ${ANALYTICS_REPORT_EMAIL} AWS_CACHE_TIME: '0' GOOGLE_APPLICATION_CREDENTIALS: /home/vcap/app/${ANALYTICS_KEY_FILE_NAME} - MESSAGE_QUEUE_DATABASE_NAME: ${MESSAGE_QUEUE_DATABASE_NAME} MESSAGE_QUEUE_NAME: ${MESSAGE_QUEUE_NAME} NEW_RELIC_APP_NAME: ${NEW_RELIC_APP_NAME} NEW_RELIC_LICENSE_KEY: ${NEW_RELIC_LICENSE_KEY} diff --git a/manifest.publisher.yml b/manifest.publisher.yml index 12a2ed8d..f916a61a 100644 --- a/manifest.publisher.yml +++ b/manifest.publisher.yml @@ -21,7 +21,6 @@ applications: # The default path for reports (used for gov-wide reports) AWS_BUCKET_PATH: data/live AWS_CACHE_TIME: '0' - MESSAGE_QUEUE_DATABASE_NAME: ${MESSAGE_QUEUE_DATABASE_NAME} MESSAGE_QUEUE_NAME: ${MESSAGE_QUEUE_NAME} NEW_RELIC_APP_NAME: ${NEW_RELIC_APP_NAME} NEW_RELIC_LICENSE_KEY: ${NEW_RELIC_LICENSE_KEY} diff --git a/package-lock.json b/package-lock.json index 6ce0f55a..5131d18b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,7 +10,7 @@ "license": "CC0-1.0", "dependencies": { "@aws-sdk/client-s3": "^3.504.0", - "@google-analytics/data": "^4.7.0", + "@google-analytics/data": "^4.12.0", "@smithy/node-http-handler": "^3.0.0", "@snyk/protect": "^1.1269.0", "fast-csv": "^4.3.6", @@ -1642,9 +1642,10 @@ } }, "node_modules/@google-analytics/data": { - "version": "4.7.0", - "resolved": "https://registry.npmjs.org/@google-analytics/data/-/data-4.7.0.tgz", - "integrity": "sha512-ObTEfzGlih2+7UV282tjSp7UbXhG5viKpUmUvRRDtm5Be6xqBqYVR3rTjekmMHrXKxIDDjwI0WVOmvF+KlLE1g==", + "version": "4.12.0", + "resolved": "https://registry.npmjs.org/@google-analytics/data/-/data-4.12.0.tgz", + "integrity": "sha512-2s14EPCkmIANj5Jm7+VWrg4Nqkuq6kKo6UH+0s+UGd4yCvKr8BdLBW0llws04wWQOSqzLL1nv0tu179YxjaKFQ==", + "license": "Apache-2.0", "dependencies": { "google-gax": "^4.0.3" }, @@ -3840,10 +3841,11 @@ } }, "node_modules/cross-spawn": { - "version": "7.0.3", - "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz", - "integrity": "sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==", + "version": "7.0.6", + "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", + "integrity": "sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==", "devOptional": true, + "license": "MIT", "dependencies": { "path-key": "^3.1.0", "shebang-command": "^2.0.0", @@ -5086,9 +5088,10 @@ } }, "node_modules/google-gax": { - "version": "4.3.8", - "resolved": "https://registry.npmjs.org/google-gax/-/google-gax-4.3.8.tgz", - "integrity": "sha512-SKAQKtvdjtNW3PMOhmKEqpQP+2C5ZqNKfwWxy70efpSwxvRYuAcgMJs6aRHTBPJjz3SO6ZbiXwM6WIuGYFZ7LQ==", + "version": "4.4.1", + "resolved": "https://registry.npmjs.org/google-gax/-/google-gax-4.4.1.tgz", + "integrity": "sha512-Phyp9fMfA00J3sZbJxbbB4jC55b7DBjE3F6poyL3wKMEBVKA79q6BGuHcTiM28yOzVql0NDbRL8MLLh8Iwk9Dg==", + "license": "Apache-2.0", "dependencies": { "@grpc/grpc-js": "^1.10.9", "@grpc/proto-loader": "^0.7.13", @@ -5096,7 +5099,7 @@ "abort-controller": "^3.0.0", "duplexify": "^4.0.0", "google-auth-library": "^9.3.0", - "node-fetch": "^2.6.1", + "node-fetch": "^2.7.0", "object-hash": "^3.0.0", "proto3-json-serializer": "^2.0.2", "protobufjs": "^7.3.2", diff --git a/package.json b/package.json index aad761ff..270a8bcf 100644 --- a/package.json +++ b/package.json @@ -70,7 +70,7 @@ }, "dependencies": { "@aws-sdk/client-s3": "^3.504.0", - "@google-analytics/data": "^4.7.0", + "@google-analytics/data": "^4.12.0", "@smithy/node-http-handler": "^3.0.0", "@snyk/protect": "^1.1269.0", "fast-csv": "^4.3.6", diff --git a/src/actions/format_processed_analytics_data.js b/src/actions/format_processed_analytics_data.js index 933f9c5c..8df8f38a 100644 --- a/src/actions/format_processed_analytics_data.js +++ b/src/actions/format_processed_analytics_data.js @@ -17,7 +17,7 @@ class FormatProcessedAnalyticsData extends Action { */ async executeStrategy(context) { context.logger.debug("Formatting analytics data"); - const formattedAnalyticsData = {}; + let formattedAnalyticsData = {}; for (const format of context.appConfig.formats) { formattedAnalyticsData[format] = await ResultFormatter.formatResult( context.processedAnalyticsData, @@ -27,7 +27,9 @@ class FormatProcessedAnalyticsData extends Action { }, ); } + context.processedAnalyticsData = undefined; context.formattedAnalyticsData = formattedAnalyticsData; + formattedAnalyticsData = undefined; } } diff --git a/src/app_config.js b/src/app_config.js index baca770e..fdfb0ca6 100644 --- a/src/app_config.js +++ b/src/app_config.js @@ -35,14 +35,21 @@ class AppConfig { return this.#options.csv ? "csv" : "json"; } + /** + * Array order here is important because the CSV formatter maps headers in + * place on the analytics report object and we don't want that mapping done on + * the JSON version. + * + * @returns {string[]} the formats to use for report formatting. + */ get formats() { const formats = []; - if (this.#options.csv) { - formats.push("csv"); - } if (this.#options.json) { formats.push("json"); } + if (this.#options.csv) { + formats.push("csv"); + } return formats; } @@ -194,18 +201,12 @@ class AppConfig { }; } - get messageQueueDatabaseConnection() { - const connection = - knexfile[process.env.NODE_ENV || "development"].connection; - return `postgres://${connection.user}:${connection.password}@${connection.host}/${process.env.MESSAGE_QUEUE_DATABASE_NAME}${process.env.NODE_ENV == "production" ? "?ssl=true" : ""}`; - } - get messageQueueName() { - return process.env.MESSAGE_QUEUE_NAME; + return process.env.MESSAGE_QUEUE_NAME || "analytics_reporter_job_queue"; } - get postgres() { - return knexfile[process.env.NODE_ENV || "development"].connection; + get knexConfig() { + return knexfile[process.env.NODE_ENV || "development"]; } get static() { diff --git a/src/pg_boss_knex_adapter.js b/src/pg_boss_knex_adapter.js new file mode 100644 index 00000000..2c55130e --- /dev/null +++ b/src/pg_boss_knex_adapter.js @@ -0,0 +1,39 @@ +/** + * Handles providing a database client for the Pg-Boss library using knex. + */ +class PgBossKnexAdapter { + #knex; + + /** + * @param {import('knex')} knexInstance an initialized instance of the knex + * library which provides a database connection. + */ + constructor(knexInstance) { + this.#knex = knexInstance; + } + + /** + * Execute PgBoss SQL using the knex library interface + * + * @param {string} sql the SQL string to execute. + * @param {string[]} parameters the parameters to insert into the SQL string. + * @returns {Promise} which resolves with the result of the SQL query. + */ + executeSql(sql, parameters = []) { + // This is needed to replace pg-boss' $1, $2 arguments + // into knex's :val, :val2 style. + const replacedSql = sql.replace( + /\$(\d+)\b/g, + (_, number) => `:param_${number}`, + ); + + const parametersObject = {}; + parameters.forEach( + (value, index) => (parametersObject[`param_${index + 1}`] = value), + ); + + return this.#knex.raw(replacedSql, parametersObject); + } +} + +module.exports = PgBossKnexAdapter; diff --git a/src/process_results/result_formatter.js b/src/process_results/result_formatter.js index 2d57de72..782c6830 100644 --- a/src/process_results/result_formatter.js +++ b/src/process_results/result_formatter.js @@ -1,6 +1,15 @@ const csv = require("fast-csv"); /** + * Formats a processed Google Analytics report to a JSON string or CSV. + * + * NOTE: either option can modify the original object passed to this function. + * This is necessary because for large datasets, making a copy of the object + * can use an excessive amount of memory. + * + * JSON format with a slim option will delete the processed data's "data" field. + * CSV format will map headers to readable names for some columns. + * * @param {object} result an analytics object to be formatted. * @param {object} config optional configuration for the formatter. * @param {string} config.format the format to output can be "json" or "csv" @@ -33,12 +42,11 @@ const _formatJSON = (result, { slim }) => { }; const _formatCSV = (result) => { - const mappedData = _mapCSVHeaders(result.data); - return csv.writeToString(mappedData, { headers: true }); + return csv.writeToString(_mapCSVHeaders(result.data), { headers: true }); }; function _mapCSVHeaders(dataArray) { - return dataArray.map((dataItem) => { + dataArray.forEach((dataItem, index) => { const newDataItem = {}; Object.keys(dataItem).forEach((key) => { if (_keyMappings[key]) { @@ -48,8 +56,9 @@ function _mapCSVHeaders(dataArray) { } }); - return newDataItem; + dataArray[index] = newDataItem; }); + return dataArray; } const _keyMappings = { diff --git a/src/processor.js b/src/processor.js index c021535e..bd03bfab 100644 --- a/src/processor.js +++ b/src/processor.js @@ -45,12 +45,15 @@ class Processor { * referenced here are an implementation of the Chain of Responsibility design * pattern. * - * @param {import('../src/app_config')} appConfig an application config instance. + * @param {import('../src/app_config')} appConfig an application config + * instance. * @param {import('winston').Logger} logger an application logger instance. + * @param {import('knex')} knexInstance an initialized instance of the knex + * library for database operations. * @returns {Processor} an initialized processor instance with a chain of * analytics processing actions. */ - static buildAnalyticsProcessor(appConfig, logger) { + static buildAnalyticsProcessor(appConfig, logger, knexInstance) { return new Processor([ new QueryGoogleAnalytics( new GoogleAnalyticsService( @@ -60,8 +63,8 @@ class Processor { ), ), new ProcessGoogleAnalyticsResults(new AnalyticsDataProcessor()), + new WriteAnalyticsDataToDatabase(new PostgresPublisher(knexInstance)), new FormatProcessedAnalyticsData(), - new WriteAnalyticsDataToDatabase(new PostgresPublisher(appConfig)), new PublishAnalyticsDataToS3(new S3Service(appConfig)), new PublishAnalyticsDataToDisk(), new LogAnalyticsData(), diff --git a/src/publish/postgres.js b/src/publish/postgres.js index ebc18444..44e709e1 100644 --- a/src/publish/postgres.js +++ b/src/publish/postgres.js @@ -1,22 +1,20 @@ -const knex = require("knex"); - Promise.each = async function (arr, fn) { for (const item of arr) await fn(item); }; /** - * Handles connection to the Postgres database and read/write operations. + * Handles read/write operations to the Postgres database for analytics reports. */ class PostgresPublisher { static ANALYTICS_DATA_TABLE_NAME = "analytics_data_ga4"; - #connectionConfig; + #knex; /** - * @param {import('../app_config')} appConfig application config instance. Provides the - * configuration to create a database connection. + * @param {import('knex')} knexInstance an initialized instance of the knex + * library which provides a database connection. */ - constructor(appConfig) { - this.#connectionConfig = appConfig.postgres; + constructor(knexInstance) { + this.#knex = knexInstance; } /** @@ -28,47 +26,37 @@ class PostgresPublisher { */ async publish(results) { if (results.query.dimensions.some((obj) => obj.name === "date")) { - const db = await knex({ - client: "pg", - connection: this.#connectionConfig, - }); - await this.#writeRegularResults({ db, results }); - await db.destroy(); + await this.#writeRegularResults({ results }); } else { return; } } - #writeRegularResults({ db, results }) { + #writeRegularResults({ results }) { const rows = results.data.map((dataPoint) => { return this.#rowForDataPoint({ results, dataPoint }); }); const rowsToInsert = []; return Promise.each(rows, async (row) => { - const existingRow = await this.#queryForExistingRow({ db, row }); + const existingRow = await this.#queryForExistingRow({ row }); if (row.date === undefined) { return; } else if (existingRow.length === 0) { rowsToInsert.push(row); } else if (existingRow.length === 1) { await this.#handleExistingRow({ - db, existingRow: existingRow[0], newRow: row, }); } - }) - .then(() => { - if (rowsToInsert.length > 0) { - return db(this.constructor.ANALYTICS_DATA_TABLE_NAME).insert( - rowsToInsert, - ); - } - }) - .then(() => { - return db.destroy(); - }); + }).then(() => { + if (rowsToInsert.length > 0) { + return this.#knex(this.constructor.ANALYTICS_DATA_TABLE_NAME).insert( + rowsToInsert, + ); + } + }); } #rowForDataPoint({ results, dataPoint }) { @@ -110,8 +98,8 @@ class PostgresPublisher { } } - #queryForExistingRow({ db, row }) { - let query = db(this.constructor.ANALYTICS_DATA_TABLE_NAME); + #queryForExistingRow({ row }) { + let query = this.#knex(this.constructor.ANALYTICS_DATA_TABLE_NAME); Object.keys(row).forEach((key) => { if (row[key] === undefined) { @@ -140,13 +128,13 @@ class PostgresPublisher { return query.select(); } - #handleExistingRow({ db, existingRow, newRow }) { + #handleExistingRow({ existingRow, newRow }) { if ( existingRow.data.visits != newRow.data.visits || existingRow.data.users != newRow.data.users || existingRow.data.total_events != newRow.data.total_events ) { - return db(this.constructor.ANALYTICS_DATA_TABLE_NAME) + return this.#knex(this.constructor.ANALYTICS_DATA_TABLE_NAME) .where({ id: existingRow.id }) .update(newRow); } diff --git a/test/app_config.test.js b/test/app_config.test.js index f5e8a018..418b3a4c 100644 --- a/test/app_config.test.js +++ b/test/app_config.test.js @@ -595,7 +595,7 @@ describe("AppConfig", () => { }); }); - describe(".postgres", () => { + describe(".knexConfig", () => { describe("when NODE_ENV is set", () => { beforeEach(() => { process.env.NODE_ENV = "production"; @@ -606,7 +606,7 @@ describe("AppConfig", () => { }); it("returns the knexfile connection details for the node environment", () => { - expect(subject.postgres).to.equal(knexfile["production"].connection); + expect(subject.knexConfig).to.equal(knexfile["production"]); }); }); @@ -616,7 +616,7 @@ describe("AppConfig", () => { }); it("returns the knexfile connection details for the development environment", () => { - expect(subject.postgres).to.equal(knexfile["development"].connection); + expect(subject.knexConfig).to.equal(knexfile["development"]); }); }); }); diff --git a/test/index.test.js b/test/index.test.js index 59d388a6..e52ef163 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -134,7 +134,11 @@ describe("index", () => { }); it("logs that there was a processing error", () => { - expect(logger.error.calledWith("Encountered an error")).to.equal(true); + expect( + logger.error.calledWith( + "Encountered an error during report processing", + ), + ).to.equal(true); }); it("logs the error", () => { diff --git a/test/publish/postgres.test.js b/test/publish/postgres.test.js index 2556c34c..d776e425 100644 --- a/test/publish/postgres.test.js +++ b/test/publish/postgres.test.js @@ -2,17 +2,15 @@ const expect = require("chai").expect; const knex = require("knex"); const database = require("../support/database"); const resultsFixture = require("../support/fixtures/results"); -const AppConfig = require("../../src/app_config"); const PostgresPublisher = require("../../src/publish/postgres"); -const appConfig = new AppConfig(); describe("PostgresPublisher", () => { - let databaseClient, results, subject; + let knexInstance, results, subject; before(async () => { process.env.NODE_ENV = "test"; // Setup the database client - databaseClient = await knex({ + knexInstance = await knex({ client: "pg", connection: database.connection, }); @@ -20,13 +18,13 @@ describe("PostgresPublisher", () => { after(async () => { // Clean up the database client - await databaseClient.destroy(); + await knexInstance.destroy(); }); beforeEach(async () => { results = Object.assign({}, resultsFixture); - subject = new PostgresPublisher(appConfig); - await database.resetSchema(databaseClient); + subject = new PostgresPublisher(knexInstance); + await database.resetSchema(knexInstance); }); describe(".publish(results)", () => { @@ -46,7 +44,7 @@ describe("PostgresPublisher", () => { await subject .publish(results) .then(() => { - return databaseClient(PostgresPublisher.ANALYTICS_DATA_TABLE_NAME) + return knexInstance(PostgresPublisher.ANALYTICS_DATA_TABLE_NAME) .orderBy("date", "asc") .select(); }) @@ -75,7 +73,7 @@ describe("PostgresPublisher", () => { await subject .publish(results) .then(() => { - return databaseClient + return knexInstance .select() .table(PostgresPublisher.ANALYTICS_DATA_TABLE_NAME); }) @@ -96,7 +94,7 @@ describe("PostgresPublisher", () => { subject .publish(results) .then(() => { - return databaseClient + return knexInstance .select() .table(PostgresPublisher.ANALYTICS_DATA_TABLE_NAME); }) @@ -170,7 +168,7 @@ describe("PostgresPublisher", () => { return subject.publish(secondResults); }) .then(() => { - return databaseClient + return knexInstance .select() .table(PostgresPublisher.ANALYTICS_DATA_TABLE_NAME); }) @@ -214,7 +212,7 @@ describe("PostgresPublisher", () => { return subject.publish(secondResults); }) .then(() => { - return databaseClient + return knexInstance .select() .table(PostgresPublisher.ANALYTICS_DATA_TABLE_NAME); }) @@ -245,7 +243,7 @@ describe("PostgresPublisher", () => { await subject .publish(results) .then(() => { - return databaseClient + return knexInstance .select() .table(PostgresPublisher.ANALYTICS_DATA_TABLE_NAME); }) diff --git a/test/support/database.js b/test/support/database.js index ed1ebf08..36a1b371 100644 --- a/test/support/database.js +++ b/test/support/database.js @@ -6,4 +6,7 @@ const resetSchema = (db) => { return db(PostgresPublisher.ANALYTICS_DATA_TABLE_NAME).delete(); }; -module.exports = { connection: new AppConfig().postgres, resetSchema }; +module.exports = { + connection: new AppConfig().knexConfig.connection, + resetSchema, +};