Last active
April 18, 2017 15:37
-
-
Save jasonphillips/81f2d73ba9d76b1de4cdcdf3528bbe04 to your computer and use it in GitHub Desktop.
RabbitMQ Router (Express-like) for Topic Exchanges
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const amqp = require('amqplib'); | |
class JackRabbit { | |
constructor(url, opts) { | |
this.channel = this.makeConnection(url, opts); | |
this.errorCB = (error) => console.log(error); | |
this.exchanges = {}; | |
this.queued = {}; | |
this.channel.catch((error) => this.errorCB(error)); | |
} | |
exchange(ex, opts) { | |
return new JackRabbitExchange(this, ex, opts); | |
} | |
makeConnection(url, opts) { | |
return new Promise((resolve, reject) => { | |
amqp.connect(url, opts).then((conn) => { | |
process.once('SIGINT', () => conn.close() ); | |
conn.createChannel().then((ch) => resolve(ch)); | |
}); | |
}); | |
} | |
createHandler(ex, route, opts, cb) { | |
this.channel.then((ch) => { | |
ch.assertQueue(route, opts) | |
.then(() => ch.bindQueue(route, ex, route)) | |
.then(() => ch.prefetch(1)) | |
.then(() => ch.consume(route, this.wrapHandler(ch, ex, cb))); | |
}); | |
} | |
wrapHandler(ch, ex, cb) { | |
return (msg) => { | |
ch.pub = (route, msg) => ch.publish( | |
ex, route, new Buffer(JSON.stringify(msg)) | |
); | |
msg.json = JSON.parse(msg.content); | |
const response = cb(msg, ch); | |
if (response===true) ch.ack(msg); | |
if (response===false) ch.nack(msg); | |
} | |
} | |
addExchange(exchange, name) { | |
this.exchanges[name] = exchange; | |
} | |
publish(ex, route, json) { | |
this.channel.then((ch) => { | |
this.exchanges[ex].then(() => { | |
ch.publish(ex, route, new Buffer(JSON.stringify(json))); | |
}); | |
}, (e) => this.errorCB(e)); | |
} | |
onError(cb) { | |
this.errorCB = cb; | |
} | |
} | |
class JackRabbitExchange { | |
constructor(rabbit, ex, opts) { | |
this.ex = ex; | |
this.rabbit = rabbit; | |
this.exchange = this.makeExchange(ex, opts); | |
rabbit.addExchange(this.exchange, ex); | |
} | |
handle(route, opts, cb) { | |
this.exchange.then(() => { | |
this.rabbit.createHandler(this.ex, route, opts, cb); | |
}); | |
return this; | |
} | |
makeExchange(ex, opts) { | |
return new Promise((resolve, reject) => { | |
this.rabbit.channel.then((ch) => | |
ch.assertExchange(ex, 'topic', opts).then((exchange) => resolve(exchange)) | |
); | |
}); | |
} | |
} | |
export default JackRabbit; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const JackRabbit = require('./JackRabbit'); | |
const app = new JackRabbit('amqp://localhost') | |
app.exchange('coolExchange') | |
.handle('cool.tasks.#', null, (msg, ch) => { | |
console.log('I got a task message', msg.json); | |
ch.pub('cool.events.forYou', {hey: 'over to you, other route'}); | |
return true; | |
}) | |
.handle('cool.events.#', {noack:true}, (msg, ch) => { | |
console.log('And I received an event message:', msg.json); | |
}) | |
const publishTask = (coolTask) => app.publish('coolExchange', 'cool.tasks.newTasks', coolTask); | |
publishTask({look: 'I sent this immediately and it waited for instantiation'}); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
I got a task message {"look":"I sent this immediately and it waited for instantiation"} | |
And I received an event message: {"hey":"over to you, other route"} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
A router-like interface for RabbitMQ, when using a 'topic' exchange. See
usage.js
above.Advantages:
msg.json
containing JSON-decoded message.contentsch
, with convenience methodch.pub
for publishing to same exchange as routeack(msg)
or nack for you if route simply returns true/false (you can still callch.ack
if desired)