Created
July 20, 2017 15:09
-
-
Save varunnayal/ce8b5be8a5f1f45ca841814c8e74990d to your computer and use it in GitHub Desktop.
RabbitMQ: Process a message maximum of "N" times using dead letter exchange
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var amqp = require('amqplib'); | |
var url = 'amqp://localhost'; | |
var WORK_QUEUE_NAME = 'work.queue'; | |
var WORK_EXCHANGE_NAME = 'work.exchange'; | |
var DEAD_QUEUE_NAME = 'dead.work.queue'; | |
var DEAD_EXCHANGE_NAME = 'dead.work.exchange'; | |
var MAX_RETRY_COUNT = 3; | |
// Working | |
// | |
// +-------------+ +-----------------+ | |
// | |----- nack'ed message -----> | | | |
// | work.queue | | dead.work.queue | | |
// | |<---- expired message ------ | | | |
// +-------------+ +-----------------+ | |
// | | |
// +-----> message moved out after N unsuccessful processing | |
// (using 'x-death' header) | |
function messageHandler(channel, msg) { | |
console.log('Message: ' + msg.content.toString()); | |
// Prepare seenCount from 'x-death' header | |
var seenCount = 1; | |
if (Array.isArray(msg.properties.headers['x-death'])) { | |
seenCount = msg.properties.headers['x-death'].reduce((cnt, deathObj) => { | |
if (deathObj.queue === WORK_QUEUE_NAME) { | |
return cnt + deathObj.count; | |
} | |
return cnt; | |
}, 1); | |
} | |
// Tries exhausted, acknowledge, or take some other action | |
if (seenCount > MAX_RETRY_COUNT) { | |
console.log('Maximum retries reached...!!!') | |
channel.ack(msg); | |
} else { | |
channel.nack(msg, false, false); // Last argument should be false!!! | |
} | |
} | |
// Setup dead.work.queue in dead.work.exchange that will receive message from work.queue (after being nack'ed) | |
// Message will sit here for 'x-message-ttl' milliseconds before being routes to work.queue via work.exchange | |
function setupDeadLetter() { | |
return amqp.connect(url).then(function (conn) { | |
return conn.createChannel(); | |
}).then(function (ch) { | |
return ch.assertExchange(DEAD_EXCHANGE_NAME, 'topic', { durable: true, autoDelete: false }).then(function () { | |
return ch.assertQueue(DEAD_QUEUE_NAME, { | |
autoDelete: false, | |
durable: true, | |
arguments: { | |
'x-dead-letter-exchange': WORK_EXCHANGE_NAME, | |
'x-dead-letter-routing-key': WORK_QUEUE_NAME, | |
'x-message-ttl': 5000, | |
} | |
}); | |
}).then(function () { | |
// This Queue is interested in listening messages from WORK QUEUE only | |
// This DEAD Queue will receive message whose routing key is marked as DEAD_QUEUE_NAME, | |
// hence, in working queue we set routing key to DEAD_QUEUE_NAME | |
return ch.bindQueue(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME); | |
}); | |
}); | |
}; | |
// Setup work.queue in work.exchange. nack'ed messages will be dead-letter'ed | |
// to dead.work.queue via dead.work.exchange | |
var setupWorker = function () { | |
return amqp.connect(url).then(function (conn) { | |
return conn.createChannel(); | |
}).then(function (ch) { | |
return ch.assertExchange(WORK_EXCHANGE_NAME, 'direct', { durable: true, autoDelete: false }).then(function () { | |
return ch.assertQueue(WORK_QUEUE_NAME, { | |
autoDelete: false, | |
durable: true, | |
arguments: { | |
'x-dead-letter-exchange': DEAD_EXCHANGE_NAME, | |
'x-dead-letter-routing-key': DEAD_QUEUE_NAME, | |
} | |
}); | |
}).then(function () { | |
return ch.bindQueue(WORK_QUEUE_NAME, WORK_EXCHANGE_NAME, WORK_QUEUE_NAME); | |
}).then(function () { | |
return ch.consume(WORK_QUEUE_NAME, messageHandler.bind(null, ch), { noAck: false }); | |
}).then(function () { | |
// Returning to publish message | |
return ch; | |
}); | |
}); | |
}; | |
console.log('- Setting up Dead Letter Exchange -'); | |
setupDeadLetter().then(function () { | |
console.log('- Setting up Worker Exchage -'); | |
return setupWorker(); | |
}).then(function (ch) { | |
// Publish message | |
console.log('- Sending Message -'); | |
// return ch.publish(WORK_EXCHANGE_NAME, WORK_QUEUE_NAME, new Buffer('Hitman!!!')); | |
return ch.sendToQueue(WORK_QUEUE_NAME, new Buffer('Time is ' + (new Date()).toISOString())); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment