-
Notifications
You must be signed in to change notification settings - Fork 1
/
deadletter.js
71 lines (62 loc) · 1.88 KB
/
deadletter.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
//
// Example:
// Retry failed task in 3 seconds
//
const amqp = require('amqplib');
const debug = require('debug');
const host = 'amqp://localhost';
const debugPro = debug('producer');
const debugCon = debug('consumer');
// CONSUMER
(async () => {
try {
debugCon('Started');
const connection = await amqp.connect(host);
const channel = await connection.createChannel();
await channel.deleteQueue('WorkQueue');
await channel.assertExchange('WorkExchange', 'direct');
await channel.assertQueue('WorkQueue', {
autoDelete: false,
durable: true,
arguments: {
'x-dead-letter-exchange': 'DeadExchange', // <-- failed messages will be put there
'x-dead-letter-routing-key': 'rk2', // <--
'x-message-ttl': 6000,
'x-expires': 10000
}
});
await channel.bindQueue('WorkQueue', 'WorkExchange', 'rk1');
await channel.consume('WorkQueue', async(msg) => {
debugCon('Received message.');
debugCon(msg.content.toString());
channel.nack(msg, false, false);
// channel.ack(msg);
// process.exit();
});
} catch (error) {
debugCon(error);
}
})();
// PRODUCER
(async () => {
try {
debugPro('Started');
const connection = await amqp.connect(host);
const channel = await connection.createChannel();
await channel.deleteQueue('DEQ');
await channel.assertExchange('DeadExchange', 'direct');
await channel.assertQueue('DEQ', {
arguments: {
'x-dead-letter-exchange': 'WorkExchange', // <--
'x-dead-letter-routing-key': 'rk1', // <--
'x-message-ttl': 3000,
}
});
await channel.bindQueue('DEQ', 'DeadExchange', 'rk2');
debugPro('Sending message');
await channel.publish('WorkExchange', 'rk1', new Buffer.from("Over the Hills and Far Away!"));
debugPro('Sent');
} catch (error) {
console.log(error);
}
})();