forked from greenkeeperio/greenkeeper
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
121 lines (100 loc) · 3.84 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// const cluster = require('cluster')
global.Promise = require('bluebird')
Promise.config({
longStackTraces: true
})
const _ = require('lodash')
const Queue = require('promise-queue')
const env = require('./lib/env')
const dbs = require('./lib/dbs')
const statsd = require('./lib/statsd')
require('./lib/rollbar')
// if (cluster.isMaster && env.NODE_ENV !== 'development') {
// for (let i = 0; i++ < env.WORKER_SIZE;) cluster.fork()
// cluster.on('exit', (worker, code, signal) => {
// console.log('worker %d died (%s). restarting...', worker.process.pid, signal || code)
// cluster.fork()
// })
// } else {
;(async () => {
const amqp = require('amqplib')
const conn = await amqp.connect(env.AMQP_URL)
const channel = await conn.createChannel()
// only allow 128 items to pile up
channel.prefetch(128)
// 5 different prios because order matters
// e.g. always sync before everything else
// or always uninstall integrations before installing
await channel.assertQueue(env.EVENTS_QUEUE_NAME, {
maxPriority: 5
})
await channel.assertExchange(`${env.JOBS_QUEUE_NAME}-exchange`, 'x-delayed-message', {
arguments: {
'x-delayed-type': 'direct'
}
})
// one prio for free, support and paid plans each
const jobsQueue = await channel.assertQueue(env.JOBS_QUEUE_NAME, {
maxPriority: 3
})
await channel.bindQueue(jobsQueue.queue, `${env.JOBS_QUEUE_NAME}-exchange`, env.JOBS_QUEUE_NAME)
const scheduleJob = channel.publish.bind(channel, `${env.JOBS_QUEUE_NAME}-exchange`, env.JOBS_QUEUE_NAME)
const worker = require('./lib/worker').bind(null, scheduleJob, channel)
// if you need a customized queue configuration, you can add it here
// e.g. const queues = {'stripe-event': new Queue(1, 10)}
const queues = {}
function queueJob (queueId, job) {
const q = queues[queueId] = queues[queueId] || new Queue(1, Infinity)
return q.add(() => worker(job))
}
channel.consume(env.EVENTS_QUEUE_NAME, consume)
channel.consume(env.JOBS_QUEUE_NAME, consume)
if (env.NODE_ENV !== 'testing') {
setInterval(function collectAccountQueueStats () {
statsd.gauge('queues.account-jobs', Object.keys(queues).length)
}, 5000)
}
const scheduleRemindersJobData = Buffer.from(JSON.stringify({name: 'schedule-stale-initial-pr-reminders'}))
async function scheduleReminders () {
try {
await scheduleJob(scheduleRemindersJobData, {priority: 1})
} catch (e) {
console.log(e)
}
}
setTimeout(scheduleReminders, 5000)
setInterval(scheduleReminders, 24 * 60 * 60 * 1000)
async function consume (job) {
const data = JSON.parse(job.content.toString())
const jobsWithoutOwners = ['registry-change', 'stripe-event', 'schedule-stale-initial-pr-reminders', 'reset', 'cancel-stripe-subscription', 'update-nodejs-version', 'deprecate-nodejs-version']
if (jobsWithoutOwners.includes(data.name) || data.type === 'marketplace_purchase') {
return queueJob(data.name, job)
}
let queueId = Number(data.accountId) ||
_.get(data, 'repository.owner.id') ||
_.get(data, 'installation.account.id') ||
_.get(data, 'organization.id')
if (!queueId) {
const login = _.get(data, 'repository.owner.name')
try {
if (!login) throw new Error(`can not identify job owner of ${data.name}`)
const {installations} = await dbs()
queueId = _.get(await installations.query('by_login', {
key: login
}), 'rows[0].id')
if (!queueId) throw new Error('totally can not identify job owner')
} catch (e) {
channel.nack(job, false, false)
throw e
}
}
const spamQueueIds = ['23046691', '1623538', '133018953', 'dalavanmanphonsy', 'CNXTEoEorg', 'tectronics']
if (spamQueueIds.includes(String(queueId))) {
// spam
channel.ack(job)
return
}
queueJob(queueId, job)
}
})()
// }