Skip to content

Instantly share code, notes, and snippets.

@IharKrasnik
Created January 17, 2017 09:21
Show Gist options
  • Save IharKrasnik/cbc18e33f17ed01be52c5bdc5d4beae3 to your computer and use it in GitHub Desktop.
Save IharKrasnik/cbc18e33f17ed01be52c5bdc5d4beae3 to your computer and use it in GitHub Desktop.
'use strict';
var kafka = require('golance-kafka-node');
var HighLevelConsumer = kafka.HighLevelConsumer;
var async = require('async');
var _ = require('lodash');
var events = require('events');
var util = require('util');
var Q = require('q');
var _prevTopicPayloads = null;
var Consumer = function (config, logger) {
var clientId = config.clientId + '_' + new Date().getTime();
var consumerGroupId = config.consumerGroupId;
var topic = config.topic.topicName;
logger.info('Creating kafka consumer %s, groupId: %s,' +
' topic: %s, zookeeper: %s', clientId, consumerGroupId, topic, config.zookeeper);
var topics = [];
if (_.isArray(topic)) {
for (var i = 0; i < topic.length; i++) {
topics.push({ topic: topic[i] })
}
} else {
topics.push({ topic: topic });
}
var self = this;
var client = new kafka.Client(config.zookeeper, clientId);
this.client = client;
this.topic = topic;
this.consumerGroupId = consumerGroupId;
this.consumer = new HighLevelConsumer(client,
topics,
{
groupId: consumerGroupId,
autoCommit: false,
paused: true,
fetchMaxBytes: 5 * 1024 * 1024
}
);
this.consumer.on('registered', function () {
logger.info('Kafka consumer [%s], group [%s] is registered. Listening topics: ', clientId, consumerGroupId, topic);
});
this.consumer.on('error', function (err) {
if (err instanceof Error) {
self.emit('error', err);
} else {
self.emit('error', new Error(err));
}
logger.error('Kafka consumer [%s] has failed to connect', clientId, err);
});
this.consumer.on('rebalanced', function () {
logger.info('Rebalanced consumer %s. Assigned partitions: ', consumerGroupId, formatTopicPayloads(self.consumer.topicPayloads));
});
function formatTopicPayloads (payloads) {
var res = {};
for (var i = 0; i < payloads.length; i++) {
res[payloads[i].topic] = res[payloads[i].topic] || [];
res[payloads[i].topic].push(payloads[i].partition);
}
return res;
}
this.consumer.on('offsetOutOfRange', function (err) {
logger.error('Kafka consumer [%s] offset is out of range', clientId, err);
});
this.consumer.once('rebalanced', function () {
_prevTopicPayloads = _.cloneDeep(self.consumer.topicPayloads);
});
};
util.inherits(Consumer, events.EventEmitter);
var callbacksChain = Q();
Consumer.prototype.connect = function () {
return this.consumer.connect();
};
Consumer.prototype.on = function (event, cb) {
if (event === 'message') {
this.consumer.on('message', function (msg) {
//enqueue to callbacks chain to guarantee callbacks order
callbacksChain = callbacksChain.then(function () {
return cb(msg);
});
callbacksChain.done();
});
} else {
this.consumer.on(event, cb);
}
};
Consumer.prototype.getConsumerGroupId = function () {
return this.consumerGroupId;
};
Consumer.prototype.pause = function () {
this.consumer.pause();
};
Consumer.prototype.isRebalancing = function () {
return this.consumer.rebalancing;
};
Consumer.prototype.isPaused = function () {
return this.consumer.paused;
};
Consumer.prototype.fetch = function () {
this.consumer.fetch();
};
Consumer.prototype.resume = function () {
this.consumer.resume();
};
Consumer.prototype.commitOffsets = function (topicPayloads, cb) {
var self = this;
self.consumer.pause();
var isChanged = !_.isEqual(_prevTopicPayloads, topicPayloads);
if (isChanged) {
self.setOffset(topicPayloads);
self.consumer.commit({
force: true,
topicPayloads: topicPayloads
}, function (err, data) {
if (!err) {
if (self.isPaused()) {
self.consumer.resume();
}
}
_prevTopicPayloads = _.cloneDeep(topicPayloads);
cb(err, data);
});
} else {
cb(null, {
notCommitedDueToNoChanges: true
});
}
};
Consumer.prototype.getTopicPayloads = function () {
return this.consumer.topicPayloads;
};
Consumer.prototype.setOffset = function (topicPayloads) {
var self = this;
_.each(topicPayloads, function (topicPayload) {
if (topicPayload.offset > 0) {
var consumerPayloadToUpdate = _.find(self.consumer.topicPayloads, function (consumerTopicPayload) {
return (consumerTopicPayload.topic == topicPayload.topic) && (consumerTopicPayload.partition == topicPayload.partition);
});
if (consumerPayloadToUpdate && consumerPayloadToUpdate.offset !== topicPayload.offset) {
consumerPayloadToUpdate.offset = topicPayload.offset;
}
}
});
};
Consumer.prototype.close = function (cb) {
this.client.close(cb);
};
Consumer.prototype.getId = function () {
return this.consumer.id;
};
module.exports = Consumer;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment