Created
November 6, 2017 11:55
-
-
Save ankon/9c2263bf1d04600129aeaa06e8c49179 to your computer and use it in GitHub Desktop.
Consume all events in a Kafka topic using rdkafka
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Consume the events of a Kafka topic from the earliest offset and stop once
 * the consumer's position on every reported partition equals that partition's
 * high watermark.
 *
 * A consumer read stream is created in batch mode and piped into an internal
 * writable. After each batch has been handed to `handleEvent`, the current
 * positions are compared against the broker's watermark offsets; when all of
 * them have caught up, the read stream is destroyed, and the returned promise
 * resolves once the stream emits `close`.
 *
 * @param {string} kafkaBrokers - Value used for `metadata.broker.list`.
 * @param {string} groupId - Value used for `group.id`.
 * @param {Object} globalOptions - Additional global consumer options; these
 *   take precedence over the defaults set in this function.
 * @param {string} topic - Topic to read. It is passed both as the stream's
 *   `topics` option and as the topic name for the watermark query, so a single
 *   topic name is assumed here.
 * @param {function(*): void} handleEvent - Called synchronously, once per
 *   consumed message, in the order the messages arrive.
 * @returns {Promise<void>} Resolves when the read stream has closed. Rejects
 *   with the first error reported by the stream, the consumer (`event.error`),
 *   the watermark/position lookup, or thrown by `handleEvent`.
 */
function consumeAllEvents(kafkaBrokers, groupId, globalOptions, topic, handleEvent) {
  const options = Object.assign({
    'enable.auto.commit': false,
    'group.id': groupId,
    'metadata.broker.list': kafkaBrokers,
    'socket.keepalive.enable': true,
  }, globalOptions);
  const topicOptions = {
    'auto.offset.reset': 'earliest',
  };
  const streamOptions = {
    objectMode: true,
    streamAsBatch: true,
    topics: topic,
    waitInterval: 0,
  };

  return new Promise((resolve, reject) => {
    // The outer promise must be settled exactly once, no matter how many of
    // the event sources below fire.
    let settled = false;
    // Set once we decided to stop reading; batches that are still buffered in
    // the writable are handled, but no further watermark checks are made.
    let closing = false;

    const stream = kafka.KafkaConsumer.createReadStream(options, topicOptions, streamOptions);

    const succeed = () => {
      if (!settled) {
        settled = true;
        resolve();
      }
    };
    const fail = err => {
      if (!settled) {
        settled = true;
        reject(err);
      }
    };

    stream.on('error', err => {
      logger.error(`Error reading from kafka: ${err.message}`);
      fail(err);
    });
    stream.on('close', () => {
      logger.info('Reached end of stream');
      succeed();
    });
    stream.consumer.on('event.error', err => {
      logger.error(err);
      // XXX: Different type of error?
      fail(err);
    });

    // Resolves to true when the given position sits exactly at the high
    // watermark of its partition; rejects when the watermark query fails.
    const hasReachedEnd = position => new Promise((resolveEnd, rejectEnd) => {
      stream.consumer.queryWatermarkOffsets(topic, position.partition, (err, watermarks) => {
        if (err) {
          // Without this guard the code below would dereference an undefined
          // `watermarks` and throw from inside the client callback.
          return rejectEnd(err);
        }
        console.log(`queryWatermarkOffsets: ${topic}/${position.partition}: ${watermarks.lowOffset} < ${position.offset} < ${watermarks.highOffset}`);
        // We're at the end right now, and can stop. Note that there
        // may still be events in-flight, so the main promise is not yet "resolved".
        return resolveEnd(position.offset === watermarks.highOffset);
      });
    });

    const collectData = new Writable({
      objectMode: true,
      write(chunks, encoding, callback) {
        try {
          for (const chunk of chunks) {
            handleEvent(chunk);
          }
        } catch (err) {
          // Report handler failures through the stream instead of letting
          // them escape from write().
          return callback(err);
        }
        if (closing) {
          // The reader is already closed, so we're just consuming the pending events now.
          return callback();
        }
        // Check all positions now: if all of them have reached the end, then we can stop reading events.
        return Promise.resolve()
          .then(() => Promise.all(stream.consumer.position().map(hasReachedEnd)))
          .then(reachedEnds => {
            if (reachedEnds.every(Boolean)) {
              console.log('All partitions reached, stopping');
              closing = true;
              stream.destroy();
            }
          })
          // Always hand control back to the writable, with the error if any,
          // so that a failed check cannot stall the pipeline silently.
          .then(() => callback(), callback);
      },
    });

    // Errors passed to the write callback surface here; turn them into a
    // rejection and shut the reader down so the consumer does not linger.
    collectData.on('error', err => {
      logger.error(`Error handling kafka events: ${err.message}`);
      fail(err);
      closing = true;
      stream.destroy();
    });

    stream.pipe(collectData);
  });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment