From a93f80d44752b4d7c35290517286b542da0ee967 Mon Sep 17 00:00:00 2001 From: simon3000 Date: Sun, 10 Nov 2019 18:17:44 +0100 Subject: [PATCH] DDDhttp https://github.com/dd-center/Cluster-center/issues/1 --- src/ws.js | 64 +++++++++++++++++++++++++++++++++---------------------- 1 file changed, 38 insertions(+), 26 deletions(-) diff --git a/src/ws.js b/src/ws.js index a88333a..d14729d 100644 --- a/src/ws.js +++ b/src/ws.js @@ -16,6 +16,9 @@ const base = process.env.development ? 'ws://0.0.0.0:9013' : 'wss://cluster.vtbs const wait = ms => new Promise(resolve => setTimeout(resolve, ms)) const parse = string => { + if (string === 'wait') { + return { empty: true } + } try { return JSON.parse(string) } catch (_) { @@ -69,20 +72,27 @@ module.exports = async ({ state, db }) => { const pending = [] ws.on('message', async message => { - const { key, data } = parse(message) - const { type } = data - if (type === 'http') { - const { url } = data + const { key, data, empty } = parse(message) + if (data) { + const { type } = data + if (type === 'http') { + const { url } = data + const resolve = pending.shift() + if (resolve) { + console.log('job received', url) + resolve({ key, url }) + } + } else if (type === 'query') { + if (queryTable.has(key)) { + const { result } = data + queryTable.get(key)(result) + queryTable.delete(key) + } + } + } else if (empty) { const resolve = pending.shift() if (resolve) { - console.log('job received', url) - resolve({ key, url }) - } - } else if (type === 'query') { - if (queryTable.has(key)) { - const { result } = data - queryTable.get(key)(result) - queryTable.delete(key) + resolve({ empty }) } } }) @@ -91,22 +101,24 @@ module.exports = async ({ state, db }) => { await wait(INTERVAL * PARALLEL * Math.random()) while (true) { const now = Date.now() - const { key, url } = await new Promise(resolve => { + const { key, url, empty } = await new Promise(resolve => { pending.push(resolve) - secureSend('DDhttp') + secureSend('DDDhttp') }) - const time = Date.now() - const { body } = await got(url).catch(e => ({ body: JSON.stringify({ code: e.statusCode }) })) - const result = secureSend(JSON.stringify({ - key, - data: body - })) - if (result) { - state.delay = Math.round(process.uptime() * 1000 / state.completeNumNow) - console.log(`job complete ${((Date.now() - time) / 1000).toFixed(2)}s`, state.delay, INTERVAL * PARALLEL - Date.now() + now) - state.log = url - state.completeNum++ - state.completeNumNow++ + if (!empty) { + const time = Date.now() + const { body } = await got(url).catch(e => ({ body: JSON.stringify({ code: e.statusCode }) })) + const result = secureSend(JSON.stringify({ + key, + data: body + })) + if (result) { + state.delay = Math.round(process.uptime() * 1000 / state.completeNumNow) + console.log(`job complete ${((Date.now() - time) / 1000).toFixed(2)}s`, state.delay, INTERVAL * PARALLEL - Date.now() + now) + state.log = url + state.completeNum++ + state.completeNumNow++ + } } await wait(INTERVAL * PARALLEL - Date.now() + now) }