diff --git a/src/JobManager/JobManager.ts b/src/JobManager/JobManager.ts index b279be6..61a2223 100644 --- a/src/JobManager/JobManager.ts +++ b/src/JobManager/JobManager.ts @@ -90,20 +90,8 @@ export class JobManager { } }); - this._vm.events.on(EVENT_VERTEX_FAIL, async (error) => { - try { - this._jobLogger.error(error); - this._job.status = JobStatus.UnrecoverableError; - this._job = await jobRepo.save(this._job) as Job; - this.events.emit(JobManager.EVENT_JOB_FAILED, this._job.id); - this._jobLogger.error('Job failed with vertex failure'); - this._jobLogger.info(LOG_END_FLAG); - } catch (err) { - this._jobLogger.error(err); - this._jobLogger.info(LOG_END_FLAG); - this._sentry.capture(err); - this.events.emit(JobManager.EVENT_JOB_FAILED, this._job.id); - } + this._vm.events.on(EVENT_VERTEX_FAIL, (error) => { + this.onVxFailed(error); }); this._vm.events.on(TERMINAL_VERTEX_FINISHED, async () => { @@ -125,10 +113,9 @@ export class JobManager { this._jobLogger.info(LOG_END_FLAG); } } catch (error) { - this._jobLogger.error(error); - this._jobLogger.info(LOG_END_FLAG); this._sentry.capture(error); - this.events.emit(JobManager.EVENT_JOB_FAILED, this._job.id); + await this.onVxFailed(error); + this._jobLogger.error(error); } }); @@ -139,6 +126,23 @@ export class JobManager { } } + private async onVxFailed(error:any): Promise { + const jobRepo = this._databaseService.getJobRepository(); + try { + this._jobLogger.error(error); + this._job.status = JobStatus.UnrecoverableError; + this._job = await jobRepo.save(this._job) as Job; + this.events.emit(JobManager.EVENT_JOB_FAILED, this._job.id); + this._jobLogger.error('Job failed with vertex failure'); + this._jobLogger.info(LOG_END_FLAG); + } catch (err) { + this._jobLogger.error(err); + this._jobLogger.info(LOG_END_FLAG); + this._sentry.capture(err); + this.events.emit(JobManager.EVENT_JOB_FAILED, this._job.id); + } + } + /** * Cancel current running job if possible */ diff --git a/src/JobManager/JobMetadataHelperImpl.ts b/src/JobManager/JobMetadataHelperImpl.ts index f1384af..230e6c3 100644 --- a/src/JobManager/JobMetadataHelperImpl.ts +++ b/src/JobManager/JobMetadataHelperImpl.ts @@ -31,7 +31,7 @@ import { JobMetadataHelper } from './JobMetadataHelper'; import { readdir } from 'fs/promises'; import { StringDecoder } from 'string_decoder'; -const COMMAND_TIMEOUT = 5000; +const COMMAND_TIMEOUT = 30 * 60 * 1000; const TILE_SIZE = 10; // fixed tile size to avoid large image const SCALE_HEIGHT = 120; @@ -60,30 +60,22 @@ export class JobMetadataHelperImpl implements JobMetadataHelper { throw new Error('No video output found!'); } const metadata = new VideoOutputMetadata(); - try { - const outputPath = videoVertex.outputPath; - const trackInfos = await getStreamsInfo(outputPath); - const container = new MediaContainer(trackInfos); - const videoStream = new VideoStream(container.getDefaultVideoStreamInfo()); - const thumbnailPath = join(dirname(outputPath), `thumb-${basename(outputPath)}.png`); - jobLogger.info(`Generating thumbnail of the video at 00:00:01.000, output is ${thumbnailPath}`); - await this.runCommand('ffmpeg', ['-y','-ss', '00:00:01.000', '-i', outputPath, '-vframes','1', thumbnailPath], jobLogger); - jobLogger.info(`Thumbnail generated, getting dominant color of the thumbnail`); - const dominantColor = await getAverageColor(thumbnailPath, {algorithm: 'dominant'}); - metadata.width = videoStream.getWidth(); - metadata.height = videoStream.getHeight(); - metadata.duration = container.getDuration() * 1000; - metadata.dominantColorOfThumbnail = dominantColor.hex; - metadata.thumbnailPath = thumbnailPath; - jobLogger.info('Generating keyframes preview tile'); - await this.generatePreviewImage(outputPath, metadata, jobLogger); - } catch (ex) { - jobLogger.error(ex); - this._sentry.capture(ex); - } - if (!metadata) { - throw new Error('no metadata for this job, check the job logs for more information'); - } + const outputPath = videoVertex.outputPath; + const trackInfos = await getStreamsInfo(outputPath); + const container = new MediaContainer(trackInfos); + const videoStream = new VideoStream(container.getDefaultVideoStreamInfo()); + const thumbnailPath = join(dirname(outputPath), `thumb-${basename(outputPath)}.png`); + jobLogger.info(`Generating thumbnail of the video at 00:00:01.000, output is ${thumbnailPath}`); + await this.runCommand('ffmpeg', ['-y','-ss', '00:00:01.000', '-i', outputPath, '-vframes','1', thumbnailPath], jobLogger); + jobLogger.info(`Thumbnail generated, getting dominant color of the thumbnail`); + const dominantColor = await getAverageColor(thumbnailPath, {algorithm: 'dominant'}); + metadata.width = videoStream.getWidth(); + metadata.height = videoStream.getHeight(); + metadata.duration = container.getDuration() * 1000; + metadata.dominantColorOfThumbnail = dominantColor.hex; + metadata.thumbnailPath = thumbnailPath; + jobLogger.info('Generating keyframes preview tile'); + await this.generatePreviewImage(outputPath, metadata, jobLogger); return metadata; } @@ -106,11 +98,13 @@ export class JobMetadataHelperImpl implements JobMetadataHelper { const keyframeImagePath = join(imageDirPath, `${imageFilenameBase}-%3d.jpg`); // generate tiles for key frames every 1 second await this.runCommand('ffmpeg', ['-y', '-i', videoPath, + '-an', + '-vsync', '0', '-vf', `select=isnan(prev_selected_t)+gte(t-prev_selected_t\\,2),scale=${metaData.frameWidth}:${metaData.frameHeight},tile=${metaData.tileSize}x${metaData.tileSize}`, - '-an', '-vsync', '0', keyframeImagePath + keyframeImagePath ], jobLogger); - + jobLogger.info('keyframes generated!'); const filenameList = await readdir(imageDirPath); metaData.keyframeImagePathList = filenameList.filter(f => f.endsWith('.jpg') && f.startsWith(imageFilenameBase)).map(f => join(imageDirPath, f)); } @@ -128,11 +122,12 @@ export class JobMetadataHelperImpl implements JobMetadataHelper { logger.info(decoder.end(data)); }); child.stderr.on('data', (data) => { - logger.error(decoder.end(data)); + logger.info(decoder.end(data)); }); child.on('close', (code) => { + logger.info('command finished, exit code = ' + code); if (code !== 0) { - reject(); + reject('ffmpeg command failed with non-0 exit code'); return; } resolve(undefined); diff --git a/src/JobManager/VertexManagerImpl.ts b/src/JobManager/VertexManagerImpl.ts index 2b48aac..841b7fe 100644 --- a/src/JobManager/VertexManagerImpl.ts +++ b/src/JobManager/VertexManagerImpl.ts @@ -184,17 +184,17 @@ export class VertexManagerImpl implements VertexManager { const vertex = vertexMap[vertexId]; const vertexLogger = this._vertexLoggerDict[vertexId]; if (vertex.status === VertexStatus.Running && this._runningVertexDict[vertexId]) { - vertexLogger.warn('trying to cancel vertex'); + vertexLogger.info('trying to cancel vertex'); vertex.status = VertexStatus.Canceled; allPromise.push(vertexRepo.save(vertex) .then(() => { return this._runningVertexDict[vertexId].videoProcessor.cancel() }) .then(() => { - vertexLogger.warn('vertex canceled'); + vertexLogger.info('vertex canceled'); }) .catch((error) => { - vertexLogger.error(error); + vertexLogger.info(error); })); } }); @@ -258,7 +258,7 @@ export class VertexManagerImpl implements VertexManager { vertex.videoProcessor = this._processorFactory(vertex.actionType); vertex.videoProcessor.registerLogHandler((logChunk, ch) => { if (ch === 'stderr') { - vertexLogger.error(logChunk); + vertexLogger.info(logChunk); } else { vertexLogger.info(logChunk); } @@ -284,7 +284,7 @@ export class VertexManagerImpl implements VertexManager { await vertex.videoProcessor.dispose(); this.onVertexFinished(vertex.id); } catch (error) { - vertexLogger.error(error); + vertexLogger.info(error); this.onVertexError(vertex.id, error); await vertex.videoProcessor.dispose(); } finally { @@ -342,7 +342,7 @@ export class VertexManagerImpl implements VertexManager { .catch((err) => { // save error this._sentry.capture(err); - this._vertexLoggerDict[vertexId].error(err); + this._vertexLoggerDict[vertexId].info(err); this._vertexLoggerDict[vertexId].info(LOG_END_FLAG); }); } diff --git a/src/api-service/log-streaming-helper.ts b/src/api-service/log-streaming-helper.ts index 5d86613..bf85e05 100644 --- a/src/api-service/log-streaming-helper.ts +++ b/src/api-service/log-streaming-helper.ts @@ -1,5 +1,5 @@ /* - * Copyright 2022 IROHA LAB + * Copyright 2023 IROHA LAB * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,44 +20,52 @@ import { getStdLogger, LOG_END_FLAG } from '../utils/Logger'; import { createInterface } from 'readline'; import { createReadStream } from 'fs'; import { stat } from 'fs/promises'; +import { promisify } from 'util'; +const sleep = promisify(setTimeout); const logger = getStdLogger(); export function tailing(logPath: string, socket: Socket, breakAtEndFlag: boolean): void { - const tail = new Tail(logPath, { - fromBeginning: true, - flushAtEOF: true - }); - tail.on('line', (line) => { - if (line) { - try { - const lineDict = JSON.parse(line); - if (lineDict.msg === LOG_END_FLAG) { - socket.emit('log:line_end', 'end of the log'); - if (breakAtEndFlag) { - tail.unwatch(); + waitForFileCreation(logPath) + .then(() => { + const tail = new Tail(logPath, { + fromBeginning: true, + flushAtEOF: true + }); + tail.on('line', (line) => { + if (line) { + try { + const lineDict = JSON.parse(line); + if (lineDict.msg === LOG_END_FLAG) { + socket.emit('log:line_end', 'end of the log'); + if (breakAtEndFlag) { + tail.unwatch(); + } + // socket.disconnect(); + return; + } + } catch (err) { + logger.error(err); } - // socket.disconnect(); - return; } - } catch (err) { - logger.error(err); - } - } - socket.emit('log:line', line); - }); + socket.emit('log:line', line); + }); - tail.on('error', (error) => { - logger.error(error); - socket.emit('error', error); - socket.disconnect(); - }); + tail.on('error', (error) => { + logger.error(error); + socket.emit('error', error); + socket.disconnect(); + }); - socket.on('disconnect', (reason) => { - if (tail) { - tail.unwatch(); - } - }); + socket.on('disconnect', (reason) => { + if (tail) { + tail.unwatch(); + } + }); + }) + .catch((e) => { + logger.error(e); + }); } export function readToEnd(logPath: string, socket: Socket): void { @@ -95,4 +103,22 @@ export async function isFileExists(logPath: string): Promise { } catch (e) { return false; } +} + +async function waitForFileCreation(filePath: string, waitCount = 0): Promise { + try { + const fileStats = await stat(filePath); + } catch (err: any) { + if (err.code === 'ENOENT') { + if (waitCount < 10) { + await sleep(5000); + return await waitForFileCreation(filePath, waitCount + 1); + } else { + throw new Error('max wait time reached, but file still not exists'); + } + } else { + throw err; + } + } + } \ No newline at end of file