run this with > time DEBUG=amqp:* COUNT=30000 PREFETCH=0 BUCKET_SIZE=20000 node index.js
before applying the fix:
real 1m22.216s
user 1m10.973s
sys 0m16.106s
after:
real 0m25.034s
user 0m14.082s
sys 0m15.153s
| var amqp = require('amqp'); | |
| var PREFETCH = parseInt(process.env.PREFETCH, 10) || 1; | |
| var COUNT = process.env.COUNT ? parseInt(process.env.COUNT, 10) : 30000; | |
| var exchangeName = 'helloo'; | |
| var exchangeOpts = { | |
| type: 'direct' | |
| , passive: false | |
| , durable: true | |
| , autoDelete: false | |
| , confirm: true | |
| } | |
| function consume () { | |
| var debug = require('debug')('amqp:consume'); | |
| debug('connecting to rabbit'); | |
| var conn = amqp.createConnection(); | |
| 'connect open error'.split(' ').forEach(function (evt) { | |
| conn.on(evt, function () { | |
| debug(evt, arguments); | |
| }) | |
| }); | |
| conn.once('ready', function () { | |
| debug('ready'); | |
| conn.exchange(exchangeName, exchangeOpts, function (ex) { | |
| conn.queue('hello-queue', function (q) { | |
| q.bind(ex, 'hola'); | |
| q.on('queueBindOk', function () { | |
| debug('queueBindOk'); | |
| var opts = {}; | |
| opts.ack = true; | |
| opts.prefetchCount = PREFETCH; | |
| var i = 0; | |
| q.subscribe(opts, function (msg, headers, info) { | |
| ++i; | |
| if (('testing '+COUNT) == msg.msg) { | |
| debug('closing connection'); | |
| conn.end(); | |
| return; | |
| } | |
| if (0 === i%1000) debug('got messages', i); | |
| q.shift(); | |
| }); | |
| }); | |
| debug('waiting for `queueBindOk`'); | |
| process.send({ ready: true }); | |
| }); | |
| }); | |
| }); | |
| } | |
| consume(); |
| var cp = require('child_process'); | |
| var DEBUG = process.env.DEBUG; | |
| var PREFETCH = parseInt(process.env.PREFETCH, 10) || 1; | |
| var COUNT = process.env.COUNT ? parseInt(process.env.COUNT, 10) : 30000; | |
| var consumer = cp.fork(__dirname + '/consumer.js'); | |
| consumer.on('message', function (msg) { | |
| if (true !== msg.ready) { | |
| console.log('unknown message', msg); | |
| consumer.kill(); | |
| return; | |
| } | |
| var producer = cp.fork(__dirname + '/producer.js'); | |
| }); | |
| console.log('waiting for consumer ready'); |
| { | |
| "name": "rabbit", | |
| "private": true, | |
| "version": "0.0.0", | |
| "description": "", | |
| "main": "index.js", | |
| "dependencies": { | |
| "amqp": "~0.1.8", | |
| "debug": "~0.7.4" | |
| }, | |
| "devDependencies": {}, | |
| "scripts": { | |
| "test": "echo \"Error: no test specified\" && exit 1" | |
| }, | |
| "author": "", | |
| "license": "BSD-2-Clause" | |
| } |
| var amqp = require('amqp'); | |
| var PREFETCH = parseInt(process.env.PREFETCH, 10) || 1; | |
| var COUNT = process.env.COUNT ? parseInt(process.env.COUNT, 10) : 30000; | |
| var BUCKET_SIZE = process.env.BUCKET_SIZE ? parseInt(process.env.BUCKET_SIZE, 10) : 30000; | |
| var TIMEOUT = 1; | |
| var exchangeOpts = { | |
| type: 'direct' | |
| , passive: false | |
| , durable: true | |
| , autoDelete: false | |
| , confirm: true // causes publish cbs to fire upon confirm | |
| } | |
| var exchangeName = 'helloo'; | |
| function produce () { | |
| var debug = require('debug')('amqp:produce'); | |
| var conn = amqp.createConnection(); | |
| 'connect open error'.split(' ').forEach(function (evt) { | |
| conn.on(evt, function () { | |
| debug(evt, arguments); | |
| }) | |
| }); | |
| conn.once('ready', function () { | |
| debug('producer ready'); | |
| conn.exchange(exchangeName, exchangeOpts, function (ex) { | |
| var iteration = 0; | |
| publish(iteration); | |
| function publish (iteration) { | |
| debug('iteration', iteration, COUNT+1, BUCKET_SIZE); | |
| var start = iteration * BUCKET_SIZE; | |
| if (start >= COUNT+1) { | |
| debug('finishing up'); | |
| setTimeout(function () { | |
| debug('closing connection'); | |
| conn.end(); | |
| },500) | |
| return; | |
| } | |
| var end = Math.min(COUNT+1, start+BUCKET_SIZE); | |
| for (var i = start; i < end; ++i) { | |
| var msg = {msg:'testing '+i}; | |
| if (0 === i%1000) debug('publishing', msg); | |
| ex.publish('hola', msg, {}, (function(i){ return function (failed) { | |
| if (0 === i%1000) { | |
| debug('exchange published sucessfully?', false === failed); | |
| } | |
| }})(i)); | |
| } | |
| iteration++; | |
| setTimeout(function(){ | |
| publish(iteration); | |
| }, TIMEOUT); | |
| } | |
| }); | |
| }); | |
| } | |
| produce(); |