diff --git a/package.json b/package.json index 60db101..d752c7e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mira-video-manager", - "version": "1.5.3", + "version": "1.5.4", "description": "Video Process for mira project", "main": "index.js", "scripts": { diff --git a/src/JobScheduler.ts b/src/JobScheduler.ts index 56be835..591c9a8 100644 --- a/src/JobScheduler.ts +++ b/src/JobScheduler.ts @@ -32,17 +32,16 @@ import { DOWNLOAD_MESSAGE_QUEUE, DownloadMQMessage, JOB_EXCHANGE, - JOB_QUEUE, + JOB_QUEUE, MQMessage, RabbitMQService, Sentry, TYPES, VIDEO_MANAGER_COMMAND, - VIDEO_MANAGER_EXCHANGE, - VIDEO_MANAGER_GENERAL + VIDEO_MANAGER_EXCHANGE } from '@irohalab/mira-shared'; import { randomUUID } from 'crypto'; import { getStdLogger } from './utils/Logger'; -import { META_JOB_KEY, NORMAL_JOB_KEY } from './TYPES'; +import { META_JOB_KEY, META_JOB_QUEUE, NORMAL_JOB_KEY } from './TYPES'; import { JobType } from './domains/JobType'; import { ValidateAction } from './domains/ValidateAction'; @@ -56,6 +55,7 @@ export class JobScheduler implements JobApplication { private _downloadMessageConsumeTag: string; private _commandMessageConsumeTag: string; private _jobMessageConsumeTag: string; + private _metaJobMessageConsumeTag: string; private _jobStatusCheckerTimerId: NodeJS.Timeout; constructor(@inject(TYPES.ConfigManager) private _configManager: ConfigManager, @@ -92,20 +92,13 @@ export class JobScheduler implements JobApplication { } }); this._jobMessageConsumeTag = await this._rabbitmqService.consume(JOB_QUEUE, async (msg) => { - try { - const jobMessage = msg as JobMessage; - const job = await this._databaseService.getJobRepository().findOne({ id: jobMessage.jobId }); - if (job.status === JobStatus.Canceled) { - logger.info('remove canceled job (' + job.id +') from message queue'); - // remove from Message Queue - return true; - } - } catch (ex) { - logger.error(ex); - this._sentry.capture(ex); - } - return false; + return await this.removeJobMessageFromQueue(msg); + }); + + this._metaJobMessageConsumeTag = await this._rabbitmqService.consume(META_JOB_QUEUE, async (msg) => { + return await this.removeJobMessageFromQueue(msg); }); + this.checkJobStatus(); } @@ -113,6 +106,22 @@ export class JobScheduler implements JobApplication { clearTimeout(this._jobStatusCheckerTimerId); } + private async removeJobMessageFromQueue(msg: MQMessage): Promise { + try { + const jobMessage = msg as JobMessage; + const job = await this._databaseService.getJobRepository().findOne({ id: jobMessage.jobId }); + if (job.status === JobStatus.Canceled) { + logger.info('remove canceled job (' + job.id +') from message queue'); + // remove from Message Queue + return true; + } + } catch (ex) { + logger.error(ex); + this._sentry.capture(ex); + } + return false; + } + private async onDownloadMessage(msg: DownloadMQMessage): Promise { let appliedRule: VideoProcessRule; const rules = await this._databaseService.getVideoProcessRuleRepository().findByBangumiId(msg.bangumiId); diff --git a/src/api-service/controller/JobController.ts b/src/api-service/controller/JobController.ts index d47e6bb..f4ad7ca 100644 --- a/src/api-service/controller/JobController.ts +++ b/src/api-service/controller/JobController.ts @@ -59,6 +59,8 @@ export class JobController extends BaseHttpController implements interfaces.Cont try { if (status === 'all') { jobs = await this._databaseService.getJobRepository(true).getRecentJobs(); + } else if (status === 'Running') { + jobs = await this._databaseService.getJobRepository(true).getRunningJobs(); } else { jobs = await this._databaseService.getJobRepository(true).getJobsByStatus(status); } diff --git a/src/repository/JobRepository.ts b/src/repository/JobRepository.ts index 07f927d..8c22287 100644 --- a/src/repository/JobRepository.ts +++ b/src/repository/JobRepository.ts @@ -69,6 +69,14 @@ export class JobRepository extends BaseEntityRepository { }) } + public async getRunningJobs(): Promise { + return await this.find({ $and: [{status: JobStatus.Running}, {status: JobStatus.MetaData}]}, { + orderBy: { + createTime: 'DESC' + } + }) + } + public async getRecentJobs(): Promise { return await this.find({}, { orderBy: {