Skip to content

Commit

Permalink
DDDhttp
Browse files Browse the repository at this point in the history
  • Loading branch information
simon300000 committed Nov 10, 2019
1 parent c063b39 commit a93f80d
Showing 1 changed file with 38 additions and 26 deletions.
64 changes: 38 additions & 26 deletions src/ws.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ const base = process.env.development ? 'ws://0.0.0.0:9013' : 'wss://cluster.vtbs
const wait = ms => new Promise(resolve => setTimeout(resolve, ms))

const parse = string => {
if (string === 'wait') {
return { empty: true }
}
try {
return JSON.parse(string)
} catch (_) {
Expand Down Expand Up @@ -69,20 +72,27 @@ module.exports = async ({ state, db }) => {
const pending = []

ws.on('message', async message => {
const { key, data } = parse(message)
const { type } = data
if (type === 'http') {
const { url } = data
const { key, data, empty } = parse(message)
if (data) {
const { type } = data
if (type === 'http') {
const { url } = data
const resolve = pending.shift()
if (resolve) {
console.log('job received', url)
resolve({ key, url })
}
} else if (type === 'query') {
if (queryTable.has(key)) {
const { result } = data
queryTable.get(key)(result)
queryTable.delete(key)
}
}
} else if (empty) {
const resolve = pending.shift()
if (resolve) {
console.log('job received', url)
resolve({ key, url })
}
} else if (type === 'query') {
if (queryTable.has(key)) {
const { result } = data
queryTable.get(key)(result)
queryTable.delete(key)
resolve({ empty })
}
}
})
Expand All @@ -91,22 +101,24 @@ module.exports = async ({ state, db }) => {
await wait(INTERVAL * PARALLEL * Math.random())
while (true) {
const now = Date.now()
const { key, url } = await new Promise(resolve => {
const { key, url, empty } = await new Promise(resolve => {
pending.push(resolve)
secureSend('DDhttp')
secureSend('DDDhttp')
})
const time = Date.now()
const { body } = await got(url).catch(e => ({ body: JSON.stringify({ code: e.statusCode }) }))
const result = secureSend(JSON.stringify({
key,
data: body
}))
if (result) {
state.delay = Math.round(process.uptime() * 1000 / state.completeNumNow)
console.log(`job complete ${((Date.now() - time) / 1000).toFixed(2)}s`, state.delay, INTERVAL * PARALLEL - Date.now() + now)
state.log = url
state.completeNum++
state.completeNumNow++
if (!empty) {
const time = Date.now()
const { body } = await got(url).catch(e => ({ body: JSON.stringify({ code: e.statusCode }) }))
const result = secureSend(JSON.stringify({
key,
data: body
}))
if (result) {
state.delay = Math.round(process.uptime() * 1000 / state.completeNumNow)
console.log(`job complete ${((Date.now() - time) / 1000).toFixed(2)}s`, state.delay, INTERVAL * PARALLEL - Date.now() + now)
state.log = url
state.completeNum++
state.completeNumNow++
}
}
await wait(INTERVAL * PARALLEL - Date.now() + now)
}
Expand Down

0 comments on commit a93f80d

Please sign in to comment.