Skip to content

Commit

Permalink
修改并发逻辑,系统设置增加定时任务并发设置
Browse files Browse the repository at this point in the history
  • Loading branch information
whyour committed Jun 30, 2023
1 parent db227e5 commit d27d432
Show file tree
Hide file tree
Showing 14 changed files with 153 additions and 73 deletions.
13 changes: 7 additions & 6 deletions back/api/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ export default (app: Router) => {
});

route.get(
'/log/remove',
'/config',
async (req: Request, res: Response, next: NextFunction) => {
const logger: Logger = Container.get('logger');
try {
const systemService = Container.get(SystemService);
const data = await systemService.getLogRemoveFrequency();
const data = await systemService.getSystemConfig();
res.send({ code: 200, data });
} catch (e) {
return next(e);
Expand All @@ -84,18 +84,19 @@ export default (app: Router) => {
);

route.put(
'/log/remove',
'/config',
celebrate({
body: Joi.object({
frequency: Joi.number().required(),
logRemoveFrequency: Joi.number().optional().allow(null),
cronConcurrency: Joi.number().optional().allow(null),
}),
}),
async (req: Request, res: Response, next: NextFunction) => {
const logger: Logger = Container.get('logger');
try {
const systemService = Container.get(SystemService);
const result = await systemService.updateLogRemoveFrequency(
req.body.frequency,
const result = await systemService.updateSystemConfig(
req.body,
);
res.send(result);
} catch (e) {
Expand Down
21 changes: 19 additions & 2 deletions back/data/auth.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { sequelize } from '.';
import { DataTypes, Model, ModelDefined } from 'sequelize';
import { NotificationInfo } from './notify';

export class AuthInfo {
ip?: string;
type: AuthDataType;
info?: any;
info?: AuthModelInfo;
id?: number;

constructor(options: AuthInfo) {
Expand All @@ -25,9 +26,25 @@ export enum AuthDataType {
'authToken' = 'authToken',
'notification' = 'notification',
'removeLogFrequency' = 'removeLogFrequency',
'systemConfig' = 'systemConfig',
}

interface AuthInstance extends Model<AuthInfo, AuthInfo>, AuthInfo {}
export interface SystemConfigInfo {
logRemoveFrequency?: number;
cronConcurrency?: number;
}

export interface LoginLogInfo {
timestamp?: number;
address?: string;
ip?: string;
platform?: string;
status?: LoginStatus,
}

export interface AuthModelInfo extends SystemConfigInfo, NotificationInfo, LoginLogInfo { }

interface AuthInstance extends Model<AuthInfo, AuthInfo>, AuthInfo { }
export const AuthModel = sequelize.define<AuthInstance>('Auth', {
ip: DataTypes.STRING,
type: DataTypes.STRING,
Expand Down
13 changes: 13 additions & 0 deletions back/loaders/initFile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ const confFile = path.join(configPath, 'config.sh');
const authConfigFile = path.join(configPath, 'auth.json');
const sampleConfigFile = path.join(samplePath, 'config.sample.sh');
const sampleAuthFile = path.join(samplePath, 'auth.sample.json');
const sampleTaskShellFile = path.join(samplePath, 'task.sample.sh');
const sampleNotifyJsFile = path.join(samplePath, 'notify.js');
const sampleNotifyPyFile = path.join(samplePath, 'notify.py');
const scriptNotifyJsFile = path.join(scriptPath, 'sendNotify.js');
const scriptNotifyPyFile = path.join(scriptPath, 'notify.py');
const TaskBeforeFile = path.join(configPath, 'task_before.sh');
const TaskAfterFile = path.join(configPath, 'task_after.sh');
const homedir = os.homedir();
const sshPath = path.resolve(homedir, '.ssh');
const sshdPath = path.join(dataPath, 'ssh.d');
Expand All @@ -39,6 +42,8 @@ export default async () => {
const tmpDirExist = await fileExist(tmpPath);
const scriptNotifyJsFileExist = await fileExist(scriptNotifyJsFile);
const scriptNotifyPyFileExist = await fileExist(scriptNotifyPyFile);
const TaskBeforeFileExist = await fileExist(TaskBeforeFile);
const TaskAfterFileExist = await fileExist(TaskAfterFile);

if (!configDirExist) {
fs.mkdirSync(configPath);
Expand Down Expand Up @@ -89,6 +94,14 @@ export default async () => {
fs.writeFileSync(scriptNotifyPyFile, fs.readFileSync(sampleNotifyPyFile));
}

if (!TaskBeforeFileExist) {
fs.writeFileSync(TaskBeforeFile, fs.readFileSync(sampleTaskShellFile));
}

if (!TaskAfterFileExist) {
fs.writeFileSync(TaskAfterFile, fs.readFileSync(sampleTaskShellFile));
}

dotenv.config({ path: confFile });

Logger.info('✌️ Init file down');
Expand Down
2 changes: 1 addition & 1 deletion back/loaders/initTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export default async () => {
});

// 运行删除日志任务
const data = await systemService.getLogRemoveFrequency();
const data = await systemService.getSystemConfig();
if (data && data.info && data.info.frequency) {
const rmlogCron = {
id: data.id,
Expand Down
4 changes: 2 additions & 2 deletions back/services/cron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { Op, where, col as colFn, FindOptions } from 'sequelize';
import path from 'path';
import { TASK_PREFIX, QL_PREFIX } from '../config/const';
import cronClient from '../schedule/client';
import { runWithCpuLimit } from '../shared/pLimit';
import taskLimit from '../shared/pLimit';
import { spawn } from 'cross-spawn';

@Service()
Expand Down Expand Up @@ -387,7 +387,7 @@ export default class CronService {
}

private async runSingle(cronId: number): Promise<number> {
return runWithCpuLimit(() => {
return taskLimit.runWithCpuLimit(() => {
return new Promise(async (resolve: any) => {
const cron = await this.getDb({ id: cronId });
if (cron.status !== CrontabStatus.queued) {
Expand Down
4 changes: 2 additions & 2 deletions back/services/dependence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import SockService from './sock';
import { FindOptions, Op } from 'sequelize';
import { concurrentRun } from '../config/util';
import dayjs from 'dayjs';
import { runOneByOne, runWithCpuLimit } from '../shared/pLimit';
import taskLimit from '../shared/pLimit';

@Service()
export default class DependenceService {
Expand Down Expand Up @@ -147,7 +147,7 @@ export default class DependenceService {
isInstall: boolean = true,
force: boolean = false,
) {
return runOneByOne(() => {
return taskLimit.runOneByOne(() => {
return new Promise(async (resolve) => {
const depIds = [dependency.id!];
const status = isInstall
Expand Down
4 changes: 2 additions & 2 deletions back/services/schedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
Task,
} from 'toad-scheduler';
import dayjs from 'dayjs';
import { runWithCpuLimit } from '../shared/pLimit';
import taskLimit from '../shared/pLimit';
import { spawn } from 'cross-spawn';

interface ScheduleTaskType {
Expand Down Expand Up @@ -49,7 +49,7 @@ export default class ScheduleService {
callbacks: TaskCallbacks = {},
completionTime: 'start' | 'end' = 'end',
) {
return runWithCpuLimit(() => {
return taskLimit.runWithCpuLimit(() => {
return new Promise(async (resolve, reject) => {
try {
const startTime = dayjs();
Expand Down
42 changes: 24 additions & 18 deletions back/services/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Service, Inject } from 'typedi';
import winston from 'winston';
import config from '../config';
import * as fs from 'fs';
import { AuthDataType, AuthInfo, AuthModel, LoginStatus } from '../data/auth';
import { AuthDataType, AuthInfo, AuthModel, AuthModelInfo } from '../data/auth';
import { NotificationInfo } from '../data/notify';
import NotificationService from './notify';
import ScheduleService, { TaskCallbacks } from './schedule';
Expand All @@ -16,6 +16,7 @@ import {
parseVersion,
} from '../config/util';
import { TASK_COMMAND } from '../config/const';
import taskLimit from '../shared/pLimit'

@Service()
export default class SystemService {
Expand All @@ -28,8 +29,8 @@ export default class SystemService {
private sockService: SockService,
) {}

public async getLogRemoveFrequency() {
const doc = await this.getDb({ type: AuthDataType.removeLogFrequency });
public async getSystemConfig() {
const doc = await this.getDb({ type: AuthDataType.systemConfig });
return doc || {};
}

Expand Down Expand Up @@ -62,25 +63,30 @@ export default class SystemService {
}
}

public async updateLogRemoveFrequency(frequency: number) {
const oDoc = await this.getLogRemoveFrequency();
public async updateSystemConfig(info: AuthModelInfo) {
const oDoc = await this.getSystemConfig();
const result = await this.updateAuthDb({
...oDoc,
type: AuthDataType.removeLogFrequency,
info: { frequency },
type: AuthDataType.systemConfig,
info,
});
const cron = {
id: result.id,
name: '删除日志',
command: `ql rmlog ${frequency}`,
};
await this.scheduleService.cancelIntervalTask(cron);
if (frequency > 0) {
this.scheduleService.createIntervalTask(cron, {
days: frequency,
});
if (info.logRemoveFrequency) {
const cron = {
id: result.id,
name: '删除日志',
command: `ql rmlog ${info.logRemoveFrequency}`,
};
await this.scheduleService.cancelIntervalTask(cron);
if (info.logRemoveFrequency > 0) {
this.scheduleService.createIntervalTask(cron, {
days: info.logRemoveFrequency,
});
}
}
if (info.cronConcurrency) {
await taskLimit.setCustomLimit(info.cronConcurrency);
}
return { code: 200, data: { ...cron } };
return { code: 200, data: info };
}

public async checkUpdate() {
Expand Down
45 changes: 34 additions & 11 deletions back/shared/pLimit.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,40 @@
import pLimit from "p-limit";
import os from 'os';
import { AuthDataType, AuthModel } from "../data/auth";

const cpuLimit = pLimit(os.cpus().length);
const oneLimit = pLimit(1);
class TaskLimit {
private customLimit: number = 0;
private oneLimit = pLimit(1);
private get cpuLimit() {
return pLimit(this.customLimit || Math.max(os.cpus().length, 4))
}

export function runWithCpuLimit<T>(fn: () => Promise<T>): Promise<T> {
return cpuLimit(() => {
return fn();
});
}
constructor() {
this.setCustomLimit();
}

public async setCustomLimit(limit?: number) {
if (limit) {
this.customLimit = limit;
return;
}
const doc = await AuthModel.findOne({ where: { type: AuthDataType.systemConfig } });
if (doc?.info?.cronConcurrency) {
this.customLimit = doc?.info?.cronConcurrency;
}
}

export function runOneByOne<T>(fn: () => Promise<T>): Promise<T> {
return oneLimit(() => {
return fn();
});
public runWithCpuLimit<T>(fn: () => Promise<T>): Promise<T> {
return this.cpuLimit(() => {
return fn();
});
}

public runOneByOne<T>(fn: () => Promise<T>): Promise<T> {
return this.oneLimit(() => {
return fn();
});
}
}

export default new TaskLimit();
4 changes: 2 additions & 2 deletions back/shared/runCron.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { spawn } from 'cross-spawn';
import { runWithCpuLimit } from "./pLimit";
import taskLimit from "./pLimit";
import Logger from '../loaders/logger';

export function runCron(cmd: string): Promise<number> {
return runWithCpuLimit(() => {
return taskLimit.runWithCpuLimit(() => {
return new Promise(async (resolve: any) => {
Logger.silly('运行命令: ' + cmd);

Expand Down
4 changes: 4 additions & 0 deletions sample/config.sample.sh
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ export TG_API_HOST=""
export DD_BOT_TOKEN=""
export DD_BOT_SECRET=""

## 企业微信反向代理地址
## (环境变量名 QYWX_ORIGIN)
export QYWX_ORIGIN=""

## 5. 企业微信机器人
## 官方说明文档:https://work.weixin.qq.com/api/doc/90000/90136/91770
## 下方填写密钥,企业微信推送 webhook 后面的 key
Expand Down
1 change: 0 additions & 1 deletion src/pages/error/index.less
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
.error-wrapper {
display: flex;
align-items: center;
justify-content: center;
height: 100vh;

Expand Down
2 changes: 1 addition & 1 deletion src/pages/error/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ const Error = () => {
</Typography.Paragraph>
</div>
) : (
<PageLoading style={{ paddingTop: 0 }} tip="启动中,请稍后..." />
<PageLoading tip="启动中,请稍后..." />
)}
</div>
);
Expand Down
Loading

0 comments on commit d27d432

Please sign in to comment.