diff --git a/app/api/externalIntegrations.v2/automaticTranslation/adapters/driving/ATServiceListener.ts b/app/api/externalIntegrations.v2/automaticTranslation/adapters/driving/ATServiceListener.ts index 20d2d33f4d..b08acc3720 100644 --- a/app/api/externalIntegrations.v2/automaticTranslation/adapters/driving/ATServiceListener.ts +++ b/app/api/externalIntegrations.v2/automaticTranslation/adapters/driving/ATServiceListener.ts @@ -1,6 +1,8 @@ import { tenants } from 'api/tenants'; import { TaskManager } from 'api/services/tasksmanager/TaskManager'; import { permissionsContext } from 'api/permissions/permissionsContext'; +import { Logger } from 'api/log.v2/contracts/Logger'; +import { DefaultLogger } from 'api/log.v2/infrastructure/StandardLogger'; import { InvalidATServerResponse } from '../../errors/generateATErrors'; import { AutomaticTranslationFactory } from '../../AutomaticTranslationFactory'; import { Validator } from '../../infrastructure/Validator'; @@ -11,23 +13,29 @@ export class ATServiceListener { private taskManager: TaskManager; - constructor(ATFactory: typeof AutomaticTranslationFactory = AutomaticTranslationFactory) { + constructor( + ATFactory: typeof AutomaticTranslationFactory = AutomaticTranslationFactory, + logger: Logger = DefaultLogger() + ) { const validator = new Validator(translationResultSchema); - this.taskManager = new TaskManager({ - serviceName: ATServiceListener.SERVICE_NAME, - processResults: async result => { - if (!validator.validate(result)) { - throw new InvalidATServerResponse(validator.getErrors()[0].message, { - cause: validator.getErrors()[0], - }); - } + this.taskManager = new TaskManager( + { + serviceName: ATServiceListener.SERVICE_NAME, + processResults: async result => { + if (!validator.validate(result)) { + throw new InvalidATServerResponse(validator.getErrors()[0].message, { + cause: validator.getErrors()[0], + }); + } - await tenants.run(async () => { - permissionsContext.setCommandContext(); - await ATFactory.defaultSaveEntityTranslations().execute(result); - }, result.key[0]); + await tenants.run(async () => { + permissionsContext.setCommandContext(); + await ATFactory.defaultSaveEntityTranslations().execute(result); + }, result.key[0]); + }, }, - }); + logger + ); } start(interval = 500) { diff --git a/app/api/externalIntegrations.v2/automaticTranslation/adapters/driving/specs/ATServiceListener.spec.ts b/app/api/externalIntegrations.v2/automaticTranslation/adapters/driving/specs/ATServiceListener.spec.ts index 0e9a3361b4..34acadc7cc 100644 --- a/app/api/externalIntegrations.v2/automaticTranslation/adapters/driving/specs/ATServiceListener.spec.ts +++ b/app/api/externalIntegrations.v2/automaticTranslation/adapters/driving/specs/ATServiceListener.spec.ts @@ -8,6 +8,7 @@ import RedisSMQ from 'rsmq'; import { UserSchema } from 'shared/types/userType'; import waitForExpect from 'wait-for-expect'; import { ATServiceListener } from '../ATServiceListener'; +import { createMockLogger } from 'api/log.v2/infrastructure/MockLogger'; const prepareATFactory = (executeSpy: jest.Mock) => { // @ts-ignore @@ -39,7 +40,7 @@ describe('ATServiceListener', () => { userInContext = permissionsContext.getUserInContext(); }); - listener = new ATServiceListener(prepareATFactory(executeSpy)); + listener = new ATServiceListener(prepareATFactory(executeSpy), createMockLogger()); redisClient = Redis.createClient(redisUrl); redisSMQ = new RedisSMQ({ client: redisClient }); diff --git a/app/api/files/ocrRoutes.ts b/app/api/files/ocrRoutes.ts index 45368b4b1b..d4182c7e84 100644 --- a/app/api/files/ocrRoutes.ts +++ b/app/api/files/ocrRoutes.ts @@ -1,7 +1,7 @@ import { Application, Request, Response, NextFunction } from 'express'; import { storage } from 'api/files'; import needsAuthorization from 'api/auth/authMiddleware'; -import { isOcrEnabled, ocrManager, getOcrStatus } from 'api/services/ocr/OcrManager'; +import { isOcrEnabled, getOcrStatus, OcrManager } from 'api/services/ocr/OcrManager'; import { files } from './files'; import { validation, createError } from '../utils'; @@ -57,9 +57,11 @@ const ocrRoutes = (app: Application) => { needsAuthorization(['admin', 'editor']), validation.validateRequest(ocrRequestDecriptor), async (req, res) => { + const ocrManager = new OcrManager(); const file = await fileFromRequest(req); await ocrManager.addToQueue(file); + await ocrManager.stop(); res.sendStatus(200); } diff --git a/app/api/files/specs/ocrRoutes.spec.ts b/app/api/files/specs/ocrRoutes.spec.ts index cd0757f77a..3bb968d849 100644 --- a/app/api/files/specs/ocrRoutes.spec.ts +++ b/app/api/files/specs/ocrRoutes.spec.ts @@ -6,7 +6,6 @@ import request from 'supertest'; import { storage } from 'api/files'; import relationships from 'api/relationships'; import { search } from 'api/search'; -import { ocrManager } from 'api/services/ocr/OcrManager'; import settings from 'api/settings/settings'; import { getFixturesFactory } from 'api/utils/fixturesFactory'; import db, { DBFixture } from 'api/utils/testing_db'; @@ -94,10 +93,6 @@ describe('OCR service', () => { jest.spyOn(setupSockets, 'emitToTenant').mockImplementation(() => {}); }); - beforeAll(() => { - ocrManager.start(); - }); - afterAll(async () => { jest.restoreAllMocks(); await testingEnvironment.tearDown(); diff --git a/app/api/services/convertToPDF/ConvertToPdfWorker.ts b/app/api/services/convertToPDF/ConvertToPdfWorker.ts index 529af18528..f3617377af 100644 --- a/app/api/services/convertToPDF/ConvertToPdfWorker.ts +++ b/app/api/services/convertToPDF/ConvertToPdfWorker.ts @@ -11,6 +11,8 @@ import path from 'path'; import { pipeline } from 'stream/promises'; import { TaskManager } from '../tasksmanager/TaskManager'; import { convertToPDFService } from './convertToPdfService'; +import { Logger } from 'api/log.v2/contracts/Logger'; +import { DefaultLogger } from 'api/log.v2/infrastructure/StandardLogger'; const ajv = new Ajv(); @@ -38,47 +40,53 @@ export class ConvertToPdfWorker { taskManager: TaskManager; - constructor() { - this.taskManager = new TaskManager({ - serviceName: this.SERVICE_NAME, - processResults: async result => { - if (result.success === false) { - throw new Error(result.error_message); - } - if (!validateResult(result)) { - throw new ValidationError(validateResult.errors || [{ message: 'validation failed' }]); - } - await tenants.run(async () => { - permissionsContext.setCommandContext(); - const [attachment] = await files.get({ filename: result.params.filename }); - if (!attachment.entity) { - throw new Error('attachment does not have an entity'); + constructor(logger: Logger = DefaultLogger()) { + this.taskManager = new TaskManager( + { + serviceName: this.SERVICE_NAME, + processResults: async result => { + if (result.success === false) { + throw new Error(result.error_message); } - await files.save({ ...attachment, status: 'ready' }); + if (!validateResult(result)) { + throw new ValidationError(validateResult.errors || [{ message: 'validation failed' }]); + } + await tenants.run(async () => { + permissionsContext.setCommandContext(); + const [attachment] = await files.get({ filename: result.params.filename }); + if (!attachment.entity) { + throw new Error('attachment does not have an entity'); + } + await files.save({ ...attachment, status: 'ready' }); - const filename = `${generateFileName({})}.pdf`; + const filename = `${generateFileName({})}.pdf`; - await storage.storeFile( - filename, - await convertToPDFService.download(new URL(result.file_url)), - 'document' - ); - await pipeline( - await storage.readableFile(filename, 'document'), - createWriteStream(path.join(os.tmpdir(), filename)) - ); + await storage.storeFile( + filename, + await convertToPDFService.download(new URL(result.file_url)), + 'document' + ); + await pipeline( + await storage.readableFile(filename, 'document'), + createWriteStream(path.join(os.tmpdir(), filename)) + ); - await processDocument(attachment.entity, { - filename, - destination: os.tmpdir(), - originalname: chageFileExtesion(attachment.originalname || generateFileName({}), 'pdf'), - mimetype: 'application/pdf', - }); + await processDocument(attachment.entity, { + filename, + destination: os.tmpdir(), + originalname: chageFileExtesion( + attachment.originalname || generateFileName({}), + 'pdf' + ), + mimetype: 'application/pdf', + }); - emitToTenant(result.params.namespace, 'documentProcessed', attachment.entity); - }, result.params.namespace); + emitToTenant(result.params.namespace, 'documentProcessed', attachment.entity); + }, result.params.namespace); + }, }, - }); + logger + ); } start(interval = 500) { diff --git a/app/api/services/convertToPDF/specs/convertToPdfWorker.spec.ts b/app/api/services/convertToPDF/specs/convertToPdfWorker.spec.ts index 19b1d2ef08..a8e2f938e6 100644 --- a/app/api/services/convertToPDF/specs/convertToPdfWorker.spec.ts +++ b/app/api/services/convertToPDF/specs/convertToPdfWorker.spec.ts @@ -10,12 +10,13 @@ import * as handleError from 'api/utils/handleError.js'; import { ObjectId } from 'mongodb'; import Redis from 'redis'; import RedisSMQ from 'rsmq'; +import { createMockLogger } from 'api/log.v2/infrastructure/MockLogger'; import waitForExpect from 'wait-for-expect'; import { convertToPDFService } from '../convertToPdfService'; import { ConvertToPdfWorker } from '../ConvertToPdfWorker'; describe('convertToPdfWorker', () => { - const worker = new ConvertToPdfWorker(); + const worker = new ConvertToPdfWorker(createMockLogger()); const redisUrl = `redis://${config.redis.host}:${config.redis.port}`; const redisClient = Redis.createClient(redisUrl); const redisSMQ = new RedisSMQ({ client: redisClient }); diff --git a/app/api/services/informationextraction/InformationExtraction.ts b/app/api/services/informationextraction/InformationExtraction.ts index 98677dc320..3d0f654b9e 100644 --- a/app/api/services/informationextraction/InformationExtraction.ts +++ b/app/api/services/informationextraction/InformationExtraction.ts @@ -143,6 +143,10 @@ class InformationExtraction { this.taskManager.subscribeToResults(); } + async stop() { + await this.taskManager.stop(); + } + requestResults = async (message: InternalIXResultsMessage) => { const response = await request.get(message.data_url); diff --git a/app/api/services/ocr/OcrManager.ts b/app/api/services/ocr/OcrManager.ts index 02eea9d0d5..e5d570ac7a 100644 --- a/app/api/services/ocr/OcrManager.ts +++ b/app/api/services/ocr/OcrManager.ts @@ -25,6 +25,8 @@ import { markError, markReady, } from './ocrRecords'; +import { DefaultLogger } from 'api/log.v2/infrastructure/StandardLogger'; +import { Logger } from 'api/log.v2/contracts/Logger'; interface OcrSettings { url: string; @@ -182,11 +184,14 @@ class OcrManager { ocrTaskManager: TaskManager; - constructor() { - this.ocrTaskManager = new TaskManager({ - serviceName: this.SERVICE_NAME, - processResults, - }); + constructor(logger: Logger = DefaultLogger()) { + this.ocrTaskManager = new TaskManager( + { + serviceName: this.SERVICE_NAME, + processResults, + }, + logger + ); } start() { @@ -227,5 +232,4 @@ class OcrManager { } } -const ocrManager = new OcrManager(); -export { ocrManager, OcrManager, isEnabled as isOcrEnabled, getStatus as getOcrStatus }; +export { OcrManager, isEnabled as isOcrEnabled, getStatus as getOcrStatus }; diff --git a/app/api/services/ocr/specs/OcrManager.spec.ts b/app/api/services/ocr/specs/OcrManager.spec.ts index e8a9b53239..32cf8a3994 100644 --- a/app/api/services/ocr/specs/OcrManager.spec.ts +++ b/app/api/services/ocr/specs/OcrManager.spec.ts @@ -17,6 +17,7 @@ import { ResultsMessage, TaskManager } from '../../tasksmanager/TaskManager'; import { mockTaskManagerImpl } from '../../tasksmanager/specs/TaskManagerImplementationMocker'; import { fixtures, fixturesFactory } from './fixtures/fixtures'; import { cleanupRecordsOfFiles } from '../ocrRecords'; +import { createMockLogger } from 'api/log.v2/infrastructure/MockLogger'; jest.mock('api/services/tasksmanager/TaskManager.ts'); @@ -99,11 +100,11 @@ describe('OcrManager', () => { success: true, }; - ocrManager = new OcrManager(); + ocrManager = new OcrManager(createMockLogger()); ocrManager.start(); }); - beforeEach(() => { + beforeEach(async () => { mocks.jestMocks['storage.fileContents'] = jest .spyOn(storage, 'fileContents') .mockResolvedValue(Buffer.from('file_content')); @@ -112,6 +113,7 @@ describe('OcrManager', () => { afterAll(async () => { mocks.release(); await testingEnvironment.tearDown(); + await ocrManager.stop(); }); describe('on success', () => { diff --git a/app/api/services/pdfsegmentation/PDFSegmentation.ts b/app/api/services/pdfsegmentation/PDFSegmentation.ts index b5c6636471..41fe7e26b0 100644 --- a/app/api/services/pdfsegmentation/PDFSegmentation.ts +++ b/app/api/services/pdfsegmentation/PDFSegmentation.ts @@ -35,6 +35,10 @@ class PDFSegmentation { this.segmentationTaskManager.subscribeToResults(); } + async stop() { + await this.segmentationTaskManager.stop(); + } + segmentOnePdf = async ( file: { filename: string; _id: ObjectIdSchema }, serviceUrl: string, diff --git a/app/api/services/tasksmanager/DistributedLoop.ts b/app/api/services/tasksmanager/DistributedLoop.ts index a6253ec451..1addc70cb2 100644 --- a/app/api/services/tasksmanager/DistributedLoop.ts +++ b/app/api/services/tasksmanager/DistributedLoop.ts @@ -1,6 +1,20 @@ import Redis from 'redis'; import Redlock from 'redlock'; import { handleError } from 'api/utils/handleError'; +import { Logger } from 'api/log.v2/contracts/Logger'; +import { DefaultLogger } from 'api/log.v2/infrastructure/StandardLogger'; +import { PromiseManager } from './PromiseManager'; + +const TEN_SECONDS_IN_MS = 10_000; + +export type OptionsProps = { + maxLockTime?: number; + delayTimeBetweenTasks?: number; + retryDelay?: number; + port?: number; + host?: string; + stopTimeout?: number; +}; export class DistributedLoop { private lockName: string; @@ -9,64 +23,56 @@ export class DistributedLoop { private redlock: Redlock; - private stopTask: Function | undefined; - private redisClient: Redis.RedisClient; private maxLockTime: number; - private delayTimeBetweenTasks: number; - private retryDelay: number; private port: number; private host: string; + taskDelayPromise: PromiseManager; + + stopPromise: PromiseManager; + constructor( lockName: string, task: () => Promise, - options: { - maxLockTime?: number; - delayTimeBetweenTasks?: number; - retryDelay?: number; - port?: number; - host?: string; - } + { + maxLockTime = 2000, + delayTimeBetweenTasks = 1000, + retryDelay = 200, + port = 6379, + host = 'localhost', + stopTimeout = TEN_SECONDS_IN_MS, + }: OptionsProps, + private logger: Logger = DefaultLogger() ) { - const _options = { - maxLockTime: 2000, - delayTimeBetweenTasks: 1000, - retryDelay: 200, - port: 6379, - host: 'localhost', - ...options, - }; - this.maxLockTime = _options.maxLockTime; - this.retryDelay = _options.retryDelay; - this.delayTimeBetweenTasks = _options.delayTimeBetweenTasks; + this.maxLockTime = maxLockTime; + this.retryDelay = retryDelay; this.lockName = `locks:${lockName}`; this.task = task; - this.port = _options.port; - this.host = _options.host; + this.port = port; + this.host = host; this.redisClient = Redis.createClient(`redis://${this.host}:${this.port}`); this.redlock = new Redlock([this.redisClient], { retryJitter: 0, retryDelay: this.retryDelay, }); + this.taskDelayPromise = new PromiseManager({ timeout: delayTimeBetweenTasks }); + this.stopPromise = new PromiseManager({ + timeout: stopTimeout, + onTimeout: () => this.logStopTimeoutMessage(), + }); } - async start() { + start() { // eslint-disable-next-line no-void void this.lockTask(); } - async waitBetweenTasks(delay = this.delayTimeBetweenTasks) { - await new Promise(resolve => { - setTimeout(resolve, delay); - }); - } - async runTask() { try { await this.task(); @@ -74,14 +80,18 @@ export class DistributedLoop { handleError(error, { useContext: false }); } - await this.waitBetweenTasks(); + await this.taskDelayPromise.init(); } - async stop() { - await new Promise(resolve => { - this.stopTask = resolve; - }); + private logStopTimeoutMessage() { + this.logger.info( + `The task ${this.lockName} tried to be stopped and reached stop timeout of ${this.stopPromise.timeout} milliseconds` + ); + } + async stop() { + this.taskDelayPromise.stop(); + await this.stopPromise.init(); await this.redlock.quit(); this.redisClient.end(true); } @@ -90,11 +100,11 @@ export class DistributedLoop { try { const lock = await this.redlock.lock( this.lockName, - this.maxLockTime + this.delayTimeBetweenTasks + this.maxLockTime + this.taskDelayPromise.timeout ); - if (this.stopTask) { - this.stopTask(); + if (this.stopPromise.isPending) { + this.stopPromise.stop(); return; } diff --git a/app/api/services/tasksmanager/PromiseManager.ts b/app/api/services/tasksmanager/PromiseManager.ts new file mode 100644 index 0000000000..19f612a738 --- /dev/null +++ b/app/api/services/tasksmanager/PromiseManager.ts @@ -0,0 +1,54 @@ +type Props = { + timeout: number; + onTimeout?: () => void; +}; + +export class PromiseManager { + private _resolve?: Function; + + private timeoutId?: NodeJS.Timeout; + + private onTimeout?: () => void; + + timeout: number; + + constructor({ timeout, onTimeout }: Props) { + this.timeout = timeout; + this.onTimeout = onTimeout; + this._resolve = undefined; + } + + private setResolve(resolve: Function | undefined) { + this._resolve = resolve; + } + + private resolve() { + if (this._resolve) this._resolve(); + this.setResolve(undefined); + } + + get isPending() { + return !!this._resolve; + } + + stop() { + if (typeof this.timeoutId !== 'undefined') { + clearTimeout(this.timeoutId); + } + + this.resolve(); + } + + async init() { + const promise = new Promise(resolve => { + this.setResolve(resolve); + + this.timeoutId = setTimeout(() => { + this.resolve(); + if (this.onTimeout) this.onTimeout(); + }, this.timeout); + }); + + return promise; + } +} diff --git a/app/api/services/tasksmanager/TaskManager.ts b/app/api/services/tasksmanager/TaskManager.ts index c426a71b9e..1776f04761 100644 --- a/app/api/services/tasksmanager/TaskManager.ts +++ b/app/api/services/tasksmanager/TaskManager.ts @@ -4,6 +4,8 @@ import Redis, { RedisClient } from 'redis'; import { Repeater } from 'api/utils/Repeater'; import { config } from 'api/config'; import { handleError } from 'api/utils'; +import { DefaultLogger } from 'api/log.v2/infrastructure/StandardLogger'; +import { Logger } from 'api/log.v2/contracts/Logger'; type DefaultTaskType = string; @@ -31,6 +33,7 @@ export interface Service { serviceName: string; processResults?: (results: R) => Promise; processResultsMessageHiddenTime?: number; + stopTimeout?: number; } export class TaskManager { @@ -46,7 +49,10 @@ export class TaskManager { redisClient: RedisClient; - constructor(service: Service) { + constructor( + service: Service, + private logger: Logger = DefaultLogger() + ) { this.service = service; this.taskQueue = `${config.ENVIRONMENT}_${service.serviceName}_tasks`; this.resultsQueue = `${config.ENVIRONMENT}_${service.serviceName}_results`; @@ -57,6 +63,12 @@ export class TaskManager { this.subscribeToEvents(); } + private logStopTimeoutMessage() { + this.logger.info( + `The task ${this.service.serviceName} tried to be stopped and reached stop timeout of ${this.repeater?.stopPromise.timeout} milliseconds` + ); + } + subscribeToEvents() { this.redisClient.on('error', (error: any | undefined) => { if (error && error.code !== 'ECONNREFUSED') { @@ -86,7 +98,10 @@ export class TaskManager { } subscribeToResults(interval = 500): void { - this.repeater = new Repeater(this.checkForResults.bind(this), interval); + this.repeater = new Repeater(this.checkForResults.bind(this), interval, { + onTimeout: () => this.logStopTimeoutMessage(), + timeout: this.service.stopTimeout, + }); // eslint-disable-next-line @typescript-eslint/no-floating-promises this.repeater.start(); } diff --git a/app/api/services/tasksmanager/specs/PromiseManager.spec.ts b/app/api/services/tasksmanager/specs/PromiseManager.spec.ts new file mode 100644 index 0000000000..293c11beb6 --- /dev/null +++ b/app/api/services/tasksmanager/specs/PromiseManager.spec.ts @@ -0,0 +1,30 @@ +import waitForExpect from 'wait-for-expect'; +import { PromiseManager } from '../PromiseManager'; + +describe('PromiseManager', () => { + it('should init a promise and resolve after timeout', async () => { + const onTimeout = jest.fn(); + const sut = new PromiseManager({ timeout: 1, onTimeout }); + + const promise = sut.init(); + + expect(sut.isPending).toBeTruthy(); + await expect(promise).resolves.toBeUndefined(); + expect(onTimeout).toHaveBeenCalled(); + expect(sut.isPending).toBeFalsy(); + }); + + it('should resolve the promise when sut.stop() is executed', async () => { + const onTimeout = jest.fn(); + const sut = new PromiseManager({ timeout: 1, onTimeout }); + + const promise = sut.init(); + expect(sut.isPending).toBeTruthy(); + + sut.stop(); + + await expect(promise).resolves.toBeUndefined(); + expect(sut.isPending).toBeFalsy(); + await waitForExpect(() => expect(onTimeout).not.toHaveBeenCalled()); + }); +}); diff --git a/app/api/services/tasksmanager/specs/distributedLoop.spec.js b/app/api/services/tasksmanager/specs/distributedLoop.spec.js index 7479b4c88f..9fbdd2eea5 100644 --- a/app/api/services/tasksmanager/specs/distributedLoop.spec.js +++ b/app/api/services/tasksmanager/specs/distributedLoop.spec.js @@ -1,16 +1,36 @@ import * as errorHelper from 'api/utils/handleError'; +import { createMockLogger } from 'api/log.v2/infrastructure/MockLogger'; import waitForExpect from 'wait-for-expect'; import { DistributedLoop } from '../DistributedLoop'; +let finishTask; +let task; +let rejectTask; +let pendingTasks; +let mockLogger; + +async function sleepTime(time) { + await new Promise(resolve => { + setTimeout(resolve, time); + }); +} + +const createSut = ({ lockName, options }) => + new DistributedLoop( + lockName, + task, + { + delayTimeBetweenTasks: 0, + ...options, + }, + mockLogger + ); + /* eslint-disable max-statements */ describe('DistributedLoopLock', () => { - let finishTask; - let task; - let rejectTask; - let pendingTasks; - - beforeEach(async () => { + beforeEach(() => { pendingTasks = []; + mockLogger = createMockLogger(); task = jest.fn().mockImplementation( () => new Promise((resolve, reject) => { @@ -25,19 +45,10 @@ describe('DistributedLoopLock', () => { await pendingTasks.map(pendingTask => pendingTask()); }); - async function sleepTime(time) { - await new Promise(resolve => { - setTimeout(resolve, time); - }); - } - it('should run one task at a time', async () => { - const nodeOne = new DistributedLoop('my_locked_task', task, { - delayTimeBetweenTasks: 0, - }); - const nodeTwo = new DistributedLoop('my_locked_task', task, { - delayTimeBetweenTasks: 0, - }); + const nodeOne = createSut({ lockName: 'my_locked_task' }); + const nodeTwo = createSut({ lockName: 'my_locked_task' }); + await nodeOne.start(); await nodeTwo.start(); await waitForExpect(async () => { @@ -58,13 +69,19 @@ describe('DistributedLoopLock', () => { }); it('should handle when a lock fails for too many retries', async () => { - const nodeOne = new DistributedLoop('my_long_locked_task', task, { - retryDelay: 20, - delayTimeBetweenTasks: 0, + const nodeOne = createSut({ + lockName: 'my_long_locked_task', + options: { + retryDelay: 20, + delayTimeBetweenTasks: 0, + }, }); - const nodeTwo = new DistributedLoop('my_long_locked_task', task, { - retryDelay: 20, - delayTimeBetweenTasks: 0, + const nodeTwo = createSut({ + lockName: 'my_long_locked_task', + options: { + retryDelay: 20, + delayTimeBetweenTasks: 0, + }, }); await nodeOne.start(); @@ -81,13 +98,19 @@ describe('DistributedLoopLock', () => { }); it('should handle when a node fails to unlock the lock', async () => { - const nodeOne = new DistributedLoop('my_locked_task', task, { - maxLockTime: 50, - delayTimeBetweenTasks: 0, + const nodeOne = createSut({ + lockName: 'my_locked_task', + options: { + maxLockTime: 50, + delayTimeBetweenTasks: 0, + }, }); - const nodeTwo = new DistributedLoop('my_locked_task', task, { - maxLockTime: 50, - delayTimeBetweenTasks: 0, + const nodeTwo = createSut({ + lockName: 'my_locked_task', + options: { + maxLockTime: 50, + delayTimeBetweenTasks: 0, + }, }); await nodeOne.start(); @@ -107,9 +130,12 @@ describe('DistributedLoopLock', () => { it('should continue executing the task if one task fails', async () => { jest.spyOn(errorHelper, 'handleError').mockImplementation(() => {}); - const nodeOne = new DistributedLoop('my_locked_task', task, { - maxLockTime: 500, - delayTimeBetweenTasks: 0, + const nodeOne = createSut({ + lockName: 'my_locked_task', + options: { + maxLockTime: 500, + delayTimeBetweenTasks: 0, + }, }); await nodeOne.start(); @@ -132,17 +158,22 @@ describe('DistributedLoopLock', () => { await nodeOne.stop(); }); - // eslint-disable-next-line max-statements it('should add a delay between task executions', async () => { - const nodeOne = new DistributedLoop('my_locked_task', task, { - maxLockTime: 50, - delayTimeBetweenTasks: 50, - retryDelay: 20, - }); - const nodeTwo = new DistributedLoop('my_locked_task', task, { - maxLockTime: 50, - delayTimeBetweenTasks: 50, - retryDelay: 20, + const nodeOne = createSut({ + lockName: 'my_locked_task', + options: { + maxLockTime: 50, + delayTimeBetweenTasks: 50, + retryDelay: 20, + }, + }); + const nodeTwo = createSut({ + lockName: 'my_locked_task', + options: { + maxLockTime: 50, + delayTimeBetweenTasks: 50, + retryDelay: 20, + }, }); await nodeOne.start(); @@ -162,4 +193,39 @@ describe('DistributedLoopLock', () => { finishTask(); await nodeTwo.stop(); }); + + it('should continue stop process if a task takes too long to stop', async () => { + const sut = createSut({ lockName: 'stop_mocked_test', options: { stopTimeout: 0 } }); + + sut.start(); + await waitForExpect(async () => { + expect(task).toHaveBeenCalled(); + }); + + await expect(sut.stop()).resolves.toBe(undefined); + expect(mockLogger.info).toHaveBeenCalled(); + }); + + it("should skip wait between tasks, if there's no pending tasks", async () => { + const sut = createSut({ + lockName: 'stop_mocked_test', + options: { stopTimeout: 0, delayTimeBetweenTasks: 10_000 }, + }); + + sut.start(); + await waitForExpect(async () => { + expect(task).toHaveBeenCalled(); + }); + finishTask(); + await waitForExpect(async () => { + expect(task.mock.results[0].value).resolves.toBeUndefined(); + }); + + expect(sut.taskDelayPromise.isPending).toBeTruthy(); + + const stopPromise = sut.stop(); + + expect(sut.taskDelayPromise.isPending).toBeFalsy(); + await expect(stopPromise).resolves.toBe(undefined); + }); }); diff --git a/app/api/services/tasksmanager/specs/taskManager.spec.ts b/app/api/services/tasksmanager/specs/taskManager.spec.ts index 5d12f73ff3..3c210d9278 100644 --- a/app/api/services/tasksmanager/specs/taskManager.spec.ts +++ b/app/api/services/tasksmanager/specs/taskManager.spec.ts @@ -4,6 +4,7 @@ import { TaskManager, Service } from 'api/services/tasksmanager/TaskManager'; import { config } from 'api/config'; import * as handleError from 'api/utils/handleError.js'; import { ExternalDummyService } from './ExternalDummyService'; +import { createMockLogger } from 'api/log.v2/infrastructure/MockLogger'; describe('taskManager', () => { let taskManager: TaskManager | undefined; @@ -22,7 +23,7 @@ describe('taskManager', () => { externalDummyService = new ExternalDummyService(1234, service.serviceName); await externalDummyService.start(redisUrl); - taskManager = new TaskManager(service); + taskManager = new TaskManager(service, createMockLogger()); taskManager.subscribeToResults(); await new Promise(resolve => { diff --git a/app/api/services/twitterintegration/TwitterIntegration.ts b/app/api/services/twitterintegration/TwitterIntegration.ts index 1c598f57f4..7888ae423f 100644 --- a/app/api/services/twitterintegration/TwitterIntegration.ts +++ b/app/api/services/twitterintegration/TwitterIntegration.ts @@ -49,6 +49,10 @@ class TwitterIntegration { this.twitterTaskManager.subscribeToResults(); } + async stop() { + await this.twitterTaskManager.stop(); + } + getTwitterIntegrationSettings = async (): Promise => { const settingsValues = await settings.get({}, 'features'); if (!settingsValues.features || !settingsValues.features.twitterIntegration) { diff --git a/app/api/utils/Repeater.js b/app/api/utils/Repeater.js index 6a3b21629b..cbe51e80cf 100644 --- a/app/api/utils/Repeater.js +++ b/app/api/utils/Repeater.js @@ -1,29 +1,28 @@ -const timeout = async interval => - new Promise(resolve => { - setTimeout(resolve, interval); - }); +import { PromiseManager } from 'api/services/tasksmanager/PromiseManager'; + +const TEN_SECONDS_IN_MS = 10_000; export class Repeater { - constructor(cb, interval) { + constructor(cb, interval, stopPromiseOptions) { this.cb = cb; this.interval = interval; - this.stopped = null; + this.stopPromise = new PromiseManager({ timeout: TEN_SECONDS_IN_MS, ...stopPromiseOptions }); + this.delayPromise = new PromiseManager({ timeout: interval }); } async start() { - while (!this.stopped) { + while (!this.stopPromise.isPending) { // eslint-disable-next-line no-await-in-loop await this.cb(); // eslint-disable-next-line no-await-in-loop - await timeout(this.interval); + await this.delayPromise.init(); } - this.stopped(); + this.stopPromise.stop(); } async stop() { - return new Promise(resolve => { - this.stopped = resolve; - }); + this.delayPromise.stop(); + await this.stopPromise.init(); } } diff --git a/app/api/utils/specs/Repeater.spec.js b/app/api/utils/specs/Repeater.spec.js index a92635f311..b245e33e70 100644 --- a/app/api/utils/specs/Repeater.spec.js +++ b/app/api/utils/specs/Repeater.spec.js @@ -1,5 +1,31 @@ +/* eslint-disable max-statements */ +import waitForExpect from 'wait-for-expect'; import { Repeater } from '../Repeater'; +const createTaskMock = () => { + const pending = []; + let resolveLast; + let rejectLast; + const resolveAll = () => pending.forEach(resolve => resolve()); + + const task = jest.fn().mockImplementation( + () => + new Promise((resolve, reject) => { + pending.push(resolve); + resolveLast = resolve; + rejectLast = reject; + }) + ); + + return { + task, + pending, + resolveLast, + rejectLast, + resolveAll, + }; +}; + describe('Repeater', () => { let callbackOne; let callbackTwo; @@ -27,7 +53,7 @@ describe('Repeater', () => { it('should be able to have two independant repeaters', async () => { repeaterOne = new Repeater(callbackOne, 1); - repeaterTwo = new Repeater(callbackTwo, 1); + repeaterTwo = new Repeater(callbackTwo, 2); repeaterTwo.start(); repeaterOne.start(); @@ -39,7 +65,7 @@ describe('Repeater', () => { await advanceTime(1); expect(callbackOne).toHaveBeenCalledTimes(1); - expect(callbackTwo).toHaveBeenCalledTimes(2); + expect(callbackTwo).toHaveBeenCalledTimes(1); }); it('should resolve stopped promise', async () => { @@ -50,4 +76,38 @@ describe('Repeater', () => { await expect(repeaterOne.stop()).resolves.toBeUndefined(); }); + + it('should skip delay between tasks if stop() is executed', async () => { + const { task, resolveAll } = createTaskMock(); + const sut = new Repeater(task, 10_000); + + sut.start(); + + await waitForExpect(() => expect(task).toHaveBeenCalled()); + resolveAll(); + + await waitForExpect(() => expect(sut.delayPromise.isPending).toBeTruthy()); + + sut.stop(); + expect(sut.delayPromise.isPending).toBeFalsy(); + expect(sut.stopPromise.isPending).toBeTruthy(); + await waitForExpect(async () => { + expect(task.mock.results[0].value).resolves.toBeUndefined(); + }); + }); + + it('should continue stop process if a task takes too long to stop', async () => { + jest.resetAllMocks(); + const { task } = createTaskMock(); + const sut = new Repeater(task, 0, { timeout: 1 }); + + sut.start(); + await waitForExpect(() => expect(task).toHaveBeenCalled()); + expect(sut.delayPromise.isPending).toBeFalsy(); + expect(sut.stopPromise.isPending).toBeFalsy(); + + sut.stop(); + expect(sut.delayPromise.isPending).toBeFalsy(); + expect(sut.stopPromise.isPending).toBeTruthy(); + }); }); diff --git a/app/worker.ts b/app/worker.ts index c21548f498..3157d647b9 100644 --- a/app/worker.ts +++ b/app/worker.ts @@ -1,8 +1,9 @@ +/* eslint-disable max-statements */ import { DB } from 'api/odm'; import { config } from 'api/config'; import { tenants } from 'api/tenants'; import { permissionsContext } from 'api/permissions/permissionsContext'; -import { ocrManager } from 'api/services/ocr/OcrManager'; +import { OcrManager } from 'api/services/ocr/OcrManager'; import { PDFSegmentation } from 'api/services/pdfsegmentation/PDFSegmentation'; import { DistributedLoop } from 'api/services/tasksmanager/DistributedLoop'; import { TwitterIntegration } from 'api/services/twitterintegration/TwitterIntegration'; @@ -12,8 +13,8 @@ import { syncWorker } from 'api/sync/syncWorker'; import { InformationExtraction } from 'api/services/informationextraction/InformationExtraction'; import { setupWorkerSockets } from 'api/socketio/setupSockets'; import { ConvertToPdfWorker } from 'api/services/convertToPDF/ConvertToPdfWorker'; -import { handleError } from './api/utils/handleError.js'; import { ATServiceListener } from 'api/externalIntegrations.v2/automaticTranslation/adapters/driving/ATServiceListener'; +import { handleError } from './api/utils/handleError.js'; let dbAuth = {}; @@ -39,57 +40,72 @@ DB.connect(config.DBHOST, dbAuth) setupWorkerSockets(); await tenants.run(async () => { - permissionsContext.setCommandContext(); - console.info('==> 📡 starting external services...'); - ocrManager.start(); - new ATServiceListener().start(); - new InformationExtraction().start(); - new ConvertToPdfWorker().start(); + permissionsContext.setCommandContext(); + + const servicesList = [ + new OcrManager(), + new ATServiceListener(), + new InformationExtraction(), + new ConvertToPdfWorker(), + ] as any[]; const segmentationConnector = new PDFSegmentation(); - segmentationConnector.start(); - const segmentationRepeater = new DistributedLoop( - 'segmentation_repeat', - segmentationConnector.segmentPdfs, - { port: config.redis.port, host: config.redis.host, delayTimeBetweenTasks: 5000 } - ); + servicesList.push(segmentationConnector); - // eslint-disable-next-line no-void - void segmentationRepeater.start(); + servicesList.push( + new DistributedLoop('segmentation_repeat', segmentationConnector.segmentPdfs, { + port: config.redis.port, + host: config.redis.host, + delayTimeBetweenTasks: 5000, + }) + ); const twitterIntegration = new TwitterIntegration(); - twitterIntegration.start(); - const twitterRepeater = new DistributedLoop( - 'twitter_repeat', - twitterIntegration.addTweetsRequestsToQueue, - { port: config.redis.port, host: config.redis.host, delayTimeBetweenTasks: 120000 } + servicesList.push(twitterIntegration); + + servicesList.push( + new DistributedLoop('twitter_repeat', twitterIntegration.addTweetsRequestsToQueue, { + port: config.redis.port, + host: config.redis.host, + delayTimeBetweenTasks: 120000, + }) ); - // eslint-disable-next-line no-void - void twitterRepeater.start(); - - // eslint-disable-next-line no-void - void new DistributedLoop('preserve_integration', async () => preserveSync.syncAllTenants(), { - port: config.redis.port, - host: config.redis.host, - delayTimeBetweenTasks: 30000, - }).start(); - - // eslint-disable-next-line no-void - void new DistributedLoop('toc_service', async () => tocService.processAllTenants(), { - port: config.redis.port, - host: config.redis.host, - delayTimeBetweenTasks: 30000, - }).start(); - - // eslint-disable-next-line no-void - void new DistributedLoop('sync_job', async () => syncWorker.runAllTenants(), { - port: config.redis.port, - host: config.redis.host, - delayTimeBetweenTasks: 1000, - }).start(); + servicesList.push( + new DistributedLoop('preserve_integration', async () => preserveSync.syncAllTenants(), { + port: config.redis.port, + host: config.redis.host, + delayTimeBetweenTasks: 30000, + }) + ); + + servicesList.push( + new DistributedLoop('toc_service', async () => tocService.processAllTenants(), { + port: config.redis.port, + host: config.redis.host, + delayTimeBetweenTasks: 30000, + }) + ); + + servicesList.push( + new DistributedLoop('sync_job', async () => syncWorker.runAllTenants(), { + port: config.redis.port, + host: config.redis.host, + delayTimeBetweenTasks: 1000, + }) + ); + + servicesList.forEach(service => service.start()); + + process.on('SIGINT', async () => { + console.log('Received SIGINT, waiting for graceful stop...'); + await Promise.all(servicesList.map(async service => service.stop())); + console.log('Graceful stop process has finished, now exiting...'); + + process.exit(0); + }); }); }) .catch(error => {