Skip to content

Commit

Permalink
gracefull shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
Joao-vi committed Oct 25, 2024
1 parent a29fe77 commit 52e2c13
Show file tree
Hide file tree
Showing 20 changed files with 493 additions and 209 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { tenants } from 'api/tenants';
import { TaskManager } from 'api/services/tasksmanager/TaskManager';
import { permissionsContext } from 'api/permissions/permissionsContext';
import { Logger } from 'api/log.v2/contracts/Logger';
import { DefaultLogger } from 'api/log.v2/infrastructure/StandardLogger';
import { InvalidATServerResponse } from '../../errors/generateATErrors';
import { AutomaticTranslationFactory } from '../../AutomaticTranslationFactory';
import { Validator } from '../../infrastructure/Validator';
Expand All @@ -11,23 +13,29 @@ export class ATServiceListener {

private taskManager: TaskManager;

constructor(ATFactory: typeof AutomaticTranslationFactory = AutomaticTranslationFactory) {
constructor(
ATFactory: typeof AutomaticTranslationFactory = AutomaticTranslationFactory,
logger: Logger = DefaultLogger()
) {
const validator = new Validator<TranslationResult>(translationResultSchema);
this.taskManager = new TaskManager({
serviceName: ATServiceListener.SERVICE_NAME,
processResults: async result => {
if (!validator.validate(result)) {
throw new InvalidATServerResponse(validator.getErrors()[0].message, {
cause: validator.getErrors()[0],
});
}
this.taskManager = new TaskManager(
{
serviceName: ATServiceListener.SERVICE_NAME,
processResults: async result => {
if (!validator.validate(result)) {
throw new InvalidATServerResponse(validator.getErrors()[0].message, {
cause: validator.getErrors()[0],
});
}

await tenants.run(async () => {
permissionsContext.setCommandContext();
await ATFactory.defaultSaveEntityTranslations().execute(result);
}, result.key[0]);
await tenants.run(async () => {
permissionsContext.setCommandContext();
await ATFactory.defaultSaveEntityTranslations().execute(result);
}, result.key[0]);
},
},
});
logger
);
}

start(interval = 500) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import RedisSMQ from 'rsmq';
import { UserSchema } from 'shared/types/userType';
import waitForExpect from 'wait-for-expect';
import { ATServiceListener } from '../ATServiceListener';
import { createMockLogger } from 'api/log.v2/infrastructure/MockLogger';

const prepareATFactory = (executeSpy: jest.Mock<any, any, any>) => {
// @ts-ignore
Expand Down Expand Up @@ -39,7 +40,7 @@ describe('ATServiceListener', () => {
userInContext = permissionsContext.getUserInContext();
});

listener = new ATServiceListener(prepareATFactory(executeSpy));
listener = new ATServiceListener(prepareATFactory(executeSpy), createMockLogger());
redisClient = Redis.createClient(redisUrl);
redisSMQ = new RedisSMQ({ client: redisClient });

Expand Down
4 changes: 3 additions & 1 deletion app/api/files/ocrRoutes.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Application, Request, Response, NextFunction } from 'express';
import { storage } from 'api/files';
import needsAuthorization from 'api/auth/authMiddleware';
import { isOcrEnabled, ocrManager, getOcrStatus } from 'api/services/ocr/OcrManager';
import { isOcrEnabled, getOcrStatus, OcrManager } from 'api/services/ocr/OcrManager';
import { files } from './files';
import { validation, createError } from '../utils';

Expand Down Expand Up @@ -57,9 +57,11 @@ const ocrRoutes = (app: Application) => {
needsAuthorization(['admin', 'editor']),
validation.validateRequest(ocrRequestDecriptor),
async (req, res) => {
const ocrManager = new OcrManager();
const file = await fileFromRequest(req);

await ocrManager.addToQueue(file);
await ocrManager.stop();

res.sendStatus(200);
}
Expand Down
5 changes: 0 additions & 5 deletions app/api/files/specs/ocrRoutes.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import request from 'supertest';
import { storage } from 'api/files';
import relationships from 'api/relationships';
import { search } from 'api/search';
import { ocrManager } from 'api/services/ocr/OcrManager';
import settings from 'api/settings/settings';
import { getFixturesFactory } from 'api/utils/fixturesFactory';
import db, { DBFixture } from 'api/utils/testing_db';
Expand Down Expand Up @@ -94,10 +93,6 @@ describe('OCR service', () => {
jest.spyOn(setupSockets, 'emitToTenant').mockImplementation(() => {});
});

beforeAll(() => {
ocrManager.start();
});

afterAll(async () => {
jest.restoreAllMocks();
await testingEnvironment.tearDown();
Expand Down
78 changes: 43 additions & 35 deletions app/api/services/convertToPDF/ConvertToPdfWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import path from 'path';
import { pipeline } from 'stream/promises';
import { TaskManager } from '../tasksmanager/TaskManager';
import { convertToPDFService } from './convertToPdfService';
import { Logger } from 'api/log.v2/contracts/Logger';
import { DefaultLogger } from 'api/log.v2/infrastructure/StandardLogger';

const ajv = new Ajv();

Expand Down Expand Up @@ -38,47 +40,53 @@ export class ConvertToPdfWorker {

taskManager: TaskManager;

constructor() {
this.taskManager = new TaskManager({
serviceName: this.SERVICE_NAME,
processResults: async result => {
if (result.success === false) {
throw new Error(result.error_message);
}
if (!validateResult(result)) {
throw new ValidationError(validateResult.errors || [{ message: 'validation failed' }]);
}
await tenants.run(async () => {
permissionsContext.setCommandContext();
const [attachment] = await files.get({ filename: result.params.filename });
if (!attachment.entity) {
throw new Error('attachment does not have an entity');
constructor(logger: Logger = DefaultLogger()) {
this.taskManager = new TaskManager(
{
serviceName: this.SERVICE_NAME,
processResults: async result => {
if (result.success === false) {
throw new Error(result.error_message);
}
await files.save({ ...attachment, status: 'ready' });
if (!validateResult(result)) {
throw new ValidationError(validateResult.errors || [{ message: 'validation failed' }]);
}
await tenants.run(async () => {
permissionsContext.setCommandContext();
const [attachment] = await files.get({ filename: result.params.filename });
if (!attachment.entity) {
throw new Error('attachment does not have an entity');
}
await files.save({ ...attachment, status: 'ready' });

const filename = `${generateFileName({})}.pdf`;
const filename = `${generateFileName({})}.pdf`;

await storage.storeFile(
filename,
await convertToPDFService.download(new URL(result.file_url)),
'document'
);
await pipeline(
await storage.readableFile(filename, 'document'),
createWriteStream(path.join(os.tmpdir(), filename))
);
await storage.storeFile(
filename,
await convertToPDFService.download(new URL(result.file_url)),
'document'
);
await pipeline(
await storage.readableFile(filename, 'document'),
createWriteStream(path.join(os.tmpdir(), filename))
);

await processDocument(attachment.entity, {
filename,
destination: os.tmpdir(),
originalname: chageFileExtesion(attachment.originalname || generateFileName({}), 'pdf'),
mimetype: 'application/pdf',
});
await processDocument(attachment.entity, {
filename,
destination: os.tmpdir(),
originalname: chageFileExtesion(
attachment.originalname || generateFileName({}),
'pdf'
),
mimetype: 'application/pdf',
});

emitToTenant(result.params.namespace, 'documentProcessed', attachment.entity);
}, result.params.namespace);
emitToTenant(result.params.namespace, 'documentProcessed', attachment.entity);
}, result.params.namespace);
},
},
});
logger
);
}

start(interval = 500) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import * as handleError from 'api/utils/handleError.js';
import { ObjectId } from 'mongodb';
import Redis from 'redis';
import RedisSMQ from 'rsmq';
import { createMockLogger } from 'api/log.v2/infrastructure/MockLogger';
import waitForExpect from 'wait-for-expect';
import { convertToPDFService } from '../convertToPdfService';
import { ConvertToPdfWorker } from '../ConvertToPdfWorker';

describe('convertToPdfWorker', () => {
const worker = new ConvertToPdfWorker();
const worker = new ConvertToPdfWorker(createMockLogger());
const redisUrl = `redis://${config.redis.host}:${config.redis.port}`;
const redisClient = Redis.createClient(redisUrl);
const redisSMQ = new RedisSMQ({ client: redisClient });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ class InformationExtraction {
this.taskManager.subscribeToResults();
}

async stop() {
await this.taskManager.stop();
}

requestResults = async (message: InternalIXResultsMessage) => {
const response = await request.get(message.data_url);

Expand Down
18 changes: 11 additions & 7 deletions app/api/services/ocr/OcrManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import {
markError,
markReady,
} from './ocrRecords';
import { DefaultLogger } from 'api/log.v2/infrastructure/StandardLogger';
import { Logger } from 'api/log.v2/contracts/Logger';

interface OcrSettings {
url: string;
Expand Down Expand Up @@ -182,11 +184,14 @@ class OcrManager {

ocrTaskManager: TaskManager;

constructor() {
this.ocrTaskManager = new TaskManager({
serviceName: this.SERVICE_NAME,
processResults,
});
constructor(logger: Logger = DefaultLogger()) {
this.ocrTaskManager = new TaskManager(
{
serviceName: this.SERVICE_NAME,
processResults,
},
logger
);
}

start() {
Expand Down Expand Up @@ -227,5 +232,4 @@ class OcrManager {
}
}

const ocrManager = new OcrManager();
export { ocrManager, OcrManager, isEnabled as isOcrEnabled, getStatus as getOcrStatus };
export { OcrManager, isEnabled as isOcrEnabled, getStatus as getOcrStatus };
6 changes: 4 additions & 2 deletions app/api/services/ocr/specs/OcrManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { ResultsMessage, TaskManager } from '../../tasksmanager/TaskManager';
import { mockTaskManagerImpl } from '../../tasksmanager/specs/TaskManagerImplementationMocker';
import { fixtures, fixturesFactory } from './fixtures/fixtures';
import { cleanupRecordsOfFiles } from '../ocrRecords';
import { createMockLogger } from 'api/log.v2/infrastructure/MockLogger';

jest.mock('api/services/tasksmanager/TaskManager.ts');

Expand Down Expand Up @@ -99,11 +100,11 @@ describe('OcrManager', () => {
success: true,
};

ocrManager = new OcrManager();
ocrManager = new OcrManager(createMockLogger());
ocrManager.start();
});

beforeEach(() => {
beforeEach(async () => {
mocks.jestMocks['storage.fileContents'] = jest
.spyOn(storage, 'fileContents')
.mockResolvedValue(Buffer.from('file_content'));
Expand All @@ -112,6 +113,7 @@ describe('OcrManager', () => {
afterAll(async () => {
mocks.release();
await testingEnvironment.tearDown();
await ocrManager.stop();
});

describe('on success', () => {
Expand Down
4 changes: 4 additions & 0 deletions app/api/services/pdfsegmentation/PDFSegmentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ class PDFSegmentation {
this.segmentationTaskManager.subscribeToResults();
}

async stop() {
await this.segmentationTaskManager.stop();
}

segmentOnePdf = async (
file: { filename: string; _id: ObjectIdSchema },
serviceUrl: string,
Expand Down
Loading

0 comments on commit 52e2c13

Please sign in to comment.