Skip to content

Commit

Permalink
feat: support interrupt task
Browse files Browse the repository at this point in the history
  • Loading branch information
likun7981 committed Jun 28, 2022
1 parent 7c318d9 commit 8e88e02
Show file tree
Hide file tree
Showing 11 changed files with 2,018 additions and 7,730 deletions.
3 changes: 2 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
],
"prefer-const": 0,
"@typescript-eslint/no-namespace": "off",
"@typescript-eslint/ban-types": "off"
"@typescript-eslint/ban-types": "off",
"@typescript-eslint/no-non-null-assertion": "off"
}
}
41 changes: 35 additions & 6 deletions packages/app/client/components/RunDetail.tsx
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import { Button, message, Modal } from 'antd'
import confirm from 'antd/lib/modal/confirm'
import React, { useEffect, useRef, useState } from 'react'
import watchOutput, { statusCopywrite } from '../kit/runTask'
import runTask, {
getCancelText,
getModalType,
getOkText,
getStatusCopywrite,
} from '../kit/runTask'
// @ts-ignore
import ansiHtml from 'ansi-html'
import './RunDetail.css'
import { taskService } from '../service'
import fetch from '../kit/fetch'

ansiHtml.setColors({
red: 'ca372d',
Expand All @@ -30,8 +36,8 @@ function RunDetail(props: IProps) {
onSuccess() {
let modal: ReturnType<typeof confirm>
if (name) {
watchOutput(name, {
onMessage(data, status) {
runTask(name, {
onMessage(data, status, type) {
logRef.current = logRef.current.concat(data)
setTimeout(() => {
if (containerRef.current) {
Expand All @@ -41,11 +47,12 @@ function RunDetail(props: IProps) {
})
}
}, 10)

modal.update({
title: (
<>
任务 <span className="color-#08b">{name}</span>{' '}
{statusCopywrite[status]}
{getStatusCopywrite(status, type)}
</>
),
content: (
Expand All @@ -63,7 +70,11 @@ function RunDetail(props: IProps) {
okButtonProps: {
loading: status === 'ongoing',
},
okText: status === 'ongoing' ? '执行中' : '知道了',
okText: getOkText(status, type),
cancelButtonProps: {
disabled: status !== 'ongoing',
},
type: getModalType(status, type),
})
},
onError() {
Expand All @@ -72,7 +83,8 @@ function RunDetail(props: IProps) {
},
onOpen() {
task.check(undefined)
modal = Modal.info({
modal = Modal.confirm({
type: 'info',
title: (
<>
任务 <span className="color-#08b">{name}</span> 执行中
Expand All @@ -83,6 +95,23 @@ function RunDetail(props: IProps) {
logRef.current = []
onClose()
},
onCancel() {
return new Promise((resolve, reject) => {
fetch
.get<boolean>('/api/task/cancel', { name })
.then((result) => {
if (result) {
reject()
message.success('取消成功')
}
})
.catch((e) => {
reject()
message.error(e.message)
})
})
},
cancelText: '取消',
width: '80vw',
})
},
Expand Down
7 changes: 6 additions & 1 deletion packages/app/client/components/TaskList.css
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,10 @@
.hlink-hover .hlink-play {
display: block;
opacity: 1;
transition: opacity 0.3s;
transition: opacity 0.3s, transform 0.3s, color 0.3s;
}

.hlink-play:hover {
transform: scale(1.5);
color: white;
}
2 changes: 1 addition & 1 deletion packages/app/client/components/TaskList.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ function TaskList() {
setRunTaskName(item.name)
}}
className="hidden text-5xl absolute left-50% top-50% op-0 -ml-6 -mt-6 z-25 hlink-play"
color="white"
color="#ddd"
/>
</Card>
</Badge.Ribbon>
Expand Down
64 changes: 56 additions & 8 deletions packages/app/client/kit/runTask.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,79 @@
import { ModalFuncProps } from 'antd/lib/modal'
import { isFunction } from './index'

export type TStatusType = 'succeed' | 'failed' | 'ongoing'

export const statusCopywrite: Record<TStatusType, string> = {
succeed: '执行完成',
failed: '执行出错',
ongoing: '执行中...',
const statusCopywrite: Record<TStatusType, string> = {
succeed: '完成',
failed: '出错',
ongoing: '...',
}

const actions: Record<TCommand, string> = {
main: '执行',
prune: '分析',
}

export type TCommand = 'main' | 'prune'

type TOptions = {
onMessage?: (data: string, status: TStatusType) => void
onMessage?: (data: string, status: TStatusType, type: TCommand) => void
onError?: (e: any) => void
onOpen?: (e: any) => void
}

export function getStatusCopywrite(status: TStatusType, type: TCommand) {
const actionText = actions[type]
const statusText = statusCopywrite[status]
return actionText + statusText
}

export function getOkText(status: TStatusType, type: TCommand) {
if (status === 'ongoing') {
return getStatusCopywrite(status, type)
}
if (type === 'prune') {
return '确认'
}
return '知道了'
}

export function getCancelText(status: TStatusType, type: TCommand) {
if (status === 'ongoing') {
return undefined
}
if (type === 'prune') {
return '取消'
}
return undefined
}

export function getModalType(
status: TStatusType,
type: TCommand
): ModalFuncProps['type'] {
if (status === 'ongoing') {
return 'info'
}
if (type === 'prune') {
return 'confirm'
}
return 'info'
}

function runTask(name: string, options: TOptions) {
const { onError, onMessage, onOpen } = options
if (window.EventSource) {
const watched = new window.EventSource(`/api/task/run?name=${name}`)
watched.onmessage = (event) => {
const result = JSON.parse(event.data) as {
output: string
type: TStatusType
status: TStatusType
type: TCommand
}
if (onMessage) {
onMessage(result.output, result.type)
if (result.type !== 'ongoing') {
onMessage(result.output, result.status, result.type)
if (result.status !== 'ongoing') {
watched.close()
}
}
Expand Down
65 changes: 49 additions & 16 deletions packages/app/server/controller/task.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import Router from '@koa/router'
import task from '../kit/TaskSDK.js'
import koaBody from 'koa-body'
import { main, prune } from '@hlink/core'
import { PassThrough } from 'node:stream'
import EventEmitter from 'node:events'
import sse from '../middleware/sse.js'
import start from '../kit/exec.js'
import { chalk, getTag } from '@hlink/core'

const events = new EventEmitter()
events.setMaxListeners(0)

const ongoingTasks: Partial<Record<string, ReturnType<typeof start> | null>> =
{}
const router = new Router({
prefix: '/task',
})
Expand Down Expand Up @@ -53,29 +53,62 @@ router.get('/check_config', async (ctx) => {
ctx.body = true
})

router.get('/cancel', async (ctx) => {
const { name } = ctx.request.query as {
name: string
}
const ongoingTask = ongoingTasks[name]
if (ongoingTask) {
ongoingTask.kill()
ongoingTasks[name] = null
ctx.body = true
} else {
throw new Error('没有进行中的任务')
}
})

router.get('/run', sse(), async (ctx) => {
const { name } = ctx.request.query as {
name: string
}

const result = await task.getConfig(name)

try {
await start(result.command, result.config, (data) => {
ctx.json({
output: data,
type: 'ongoing',
let currentMonitor = ongoingTasks[name]
if (currentMonitor) {
ctx.send({
output: `${getTag('INFO')} 任务 ${chalk.cyan(name)} 正在执行中..`,
status: 'ongoing',
type: result.command,
})
} else {
currentMonitor = start(result.command, result.config)
}
ongoingTasks[name] = currentMonitor
currentMonitor.handleLog((data) => {
ctx.send({
output: data,
status: 'ongoing',
type: result.command,
})
})
currentMonitor.original
.then(() => {
ctx.send({
status: 'succeed',
type: result.command,
})
})
ctx.json({
type: 'succeed',
.catch(() => {
ctx.send({
status: 'failed',
type: result.command,
output: `${getTag('WARN')} 已取消`,
})
})
} catch (e) {
ctx.json({
type: 'failed',
.finally(() => {
ctx.sendEnd()
ongoingTasks[name] = null
})
}
ctx.jsonEnd()
})

export default router.routes()
5 changes: 4 additions & 1 deletion packages/app/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { Koa } from './kit/base.js'
import formatResponse from './middleware/formatResponse.js'
import time from './middleware/time.js'
import sse from './middleware/sse.js'
import router from './router.js'

const app = new Koa()

app.context.state = {
...app.context.state,
}

app
.use(time)
.use(formatResponse)
Expand Down
43 changes: 30 additions & 13 deletions packages/app/server/kit/exec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,44 @@ export type ReturnType<T extends 'main' | 'prune'> = {
config: OptionsType[T]
}

async function start<T extends 'main' | 'prune'>(
function start<T extends 'main' | 'prune'>(
command: T,
options: OptionsType[T],
log: (data: string, type: 'succeed' | 'failed') => void
log?: (data: string, type: 'succeed' | 'failed') => void
) {
const monitor = execa('node', [
path.join(__dirname, './nodeExec.js'),
command,
JSON.stringify(options),
])
monitor.stdout?.on('data', (e) => {
const str = e.toString()
if (str) {
log(str.split('\n').filter(Boolean).join('\n'), 'succeed')
}
})
monitor.stderr?.on('data', (e) => {
const str = e.toString()
log(str, 'failed')
})
return monitor
if (log) {
monitor.stdout?.on('data', (e) => {
const str = e.toString()
if (str) {
log(str.split('\n').filter(Boolean).join('\n'), 'succeed')
}
})
monitor.stderr?.on('data', (e) => {
const str = e.toString()
log(str, 'failed')
})
}
return {
kill: () => monitor.kill('SIGILL'),
handleLog: (log: (data: string, type: 'succeed' | 'failed') => void) => {
monitor.stdout?.on('data', (e) => {
const str = e.toString()
if (str) {
log(str.split('\n').filter(Boolean).join('\n'), 'succeed')
}
})
monitor.stderr?.on('data', (e) => {
const str = e.toString()
log(str, 'failed')
})
},
original: monitor,
}
}

export default start
Loading

0 comments on commit 8e88e02

Please sign in to comment.