Last active
February 1, 2018 19:27
-
-
Save txbm/288d71cdcaea696fb27ef42fc405420c to your computer and use it in GitHub Desktop.
node-rdkafka state test harness
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict';

// Kafka client library used by every consumer scenario in this harness.
const kafka = require('node-rdkafka');

// Error codes compared against `err.code` inside the rebalance callback to
// tell a partition assignment apart from a partition revocation.
const { CODES: { ERRORS: {
  ERR__ASSIGN_PARTITIONS,
  ERR__REVOKE_PARTITIONS
}}} = kafka;
// Identifiers shared by every scenario in this harness.
const clientId = 'pelias-test-client';
const groupId = 'pelias-test-cg';
const testTopic = 'pelias-test-topic';

// Selects which entry of `config` is used; falls back to 'local' when the
// ENV environment variable is unset or empty.
const ENV = process.env.ENV || 'local';

// How long a consumer stays up after 'ready' before shutdown begins.
const DURATION_MS = 5000;

// Producer settings. Not referenced by the consumer scenarios below.
const producerOpts = {
  'socket.keepalive.enable': true,
  'dr_cb': true
};

// Consumer settings merged on top of the per-environment config.
const consumerOpts = {
  'group.id': groupId,
  'enable.auto.commit': false
};

// Per-environment connection settings, keyed by the value of ENV.
const config = {
  local: {
    'metadata.broker.list': '0.0.0.0:9092',
    'client.id': clientId
  },
  nonprod: {
    'metadata.broker.list': 'internal-core-kafka-bootstrap-stg-787583943.us-east-1.elb.amazonaws.com',
    'client.id': clientId
  }
};
/**
 * Pretty-print a value as JSON with two-space indentation, for log output.
 *
 * @param {*} o - Value to serialize.
 * @returns {string|undefined} The JSON text, or undefined when
 *   JSON.stringify has no representation for the value (e.g. undefined).
 */
const j = (o) => JSON.stringify(o, null, 2);
// Print what the loaded node-rdkafka module reports about itself, so each
// test run's output records which build produced it.
console.log(kafka.features);
console.log(kafka.librdkafkaVersion);
/**
 * Run one consumer scenario: build a KafkaConsumer from the selected
 * environment config, start consuming from the test topic, begin shutdown
 * DURATION_MS after the consumer reports ready, and log every step.
 *
 * @param {boolean} [sub=false] - When true the consumer subscribes to
 *   `testTopic`; when false it is assigned partitions 0-2 of `testTopic`
 *   at offset 4 directly.
 * @param {boolean} [rb_cb=false] - When true a custom `rebalance_cb` is
 *   installed that calls assign()/unassign() itself.
 * @returns {Promise<void>} Resolves when the consumer disconnects, or early
 *   when the rebalance callback hits an error. It is never rejected
 *   explicitly; only a synchronous throw while constructing or connecting
 *   the consumer rejects it.
 */
function runConsumer(sub = false, rb_cb = false) {
  console.log(`subscribe: ${sub}`);
  console.log(`rebalance_cb: ${rb_cb}`);

  return new Promise((resolve) => {
    // Later sources win: consumerOpts override the per-environment settings.
    const cfg = Object.assign(
      {},
      config[ENV],
      consumerOpts
    );

    // Declared before construction because the rebalance callback below
    // closes over it and is part of the config the constructor receives.
    let consumer;

    cfg.event_cb = (evt) => {
      console.log(`librdkafka event: ${evt}`);
    };

    if (rb_cb) {
      cfg.rebalance_cb = (err, ass) => {
        console.log(`rb_cb: ${j(err)}`);

        if (err.code === ERR__ASSIGN_PARTITIONS) {
          try {
            consumer.assign(ass);
          } catch (e) {
            console.log(`rb_cb ass err: ${e.message}`);
            resolve();
          }
          console.log(`rb_cb assign called`);
        } else if (err.code === ERR__REVOKE_PARTITIONS) {
          try {
            consumer.unassign();
          } catch (e) {
            console.log(`rb_cb unass err: ${e.message}`);
            // Fallback attempt when unassign() itself throws.
            try {
              consumer.assign();
            } catch (e2) {
              console.log(`rb_cb assnull err: ${e2.message}`);
            }
            resolve();
          }
          console.log(`rb_cb unassign called`);
        } else {
          console.error(`rb_cb error: ${j(err)}`);
          resolve();
        }
      };
    }

    consumer = new kafka.KafkaConsumer(cfg);

    consumer.on('ready', () => {
      console.log(`rdy_cb: ready`);

      if (sub) {
        consumer.subscribe([ testTopic ]);
        console.log(`rdy_cb: sub called`);
      } else {
        consumer.assign([
          { topic: testTopic, partition: 0, offset: 4 },
          { topic: testTopic, partition: 1, offset: 4 },
          { topic: testTopic, partition: 2, offset: 4 }
        ]);
        console.log(`rdy_cb: ass called`);
      }

      consumer.consume();
      console.log(`rdy_cb: consume called`);

      // Undo whichever of subscribe/assign was used above, then disconnect.
      setTimeout(() => {
        console.log('shutdown: starting');

        if (sub) {
          consumer.unsubscribe();
          console.log('shutdown: unsub called');
        } else {
          consumer.unassign();
          console.log('shutdown: unass called');
        }

        consumer.disconnect((err) => {
          console.error(`disconn_cb: ${j(err)}`);
          // May run in addition to the 'disconnected' listener; resolving an
          // already-settled promise is a no-op.
          resolve();
        });
        console.log('shutdown: disconnect called');
      }, DURATION_MS);
    });

    consumer.on('data', (data) => {
      console.log(`message: ${data.partition}-${data.offset}`);
      // Guard against messages without a payload so logging cannot throw.
      const payload = data.value == null ? data.value : data.value.toString();
      console.log(`data: ${payload}`);
    });

    consumer.on('rebalance', (data) => {
      console.log(`rb_evt: ${j(data)}`);
    });

    consumer.on('event', (evt) => {
      // Log the listener's own argument; the previous code referenced an
      // identifier that does not exist in this scope and threw.
      console.log(`librd_evt: ${j(evt)}`);
    });

    consumer.on('disconnected', (err) => {
      console.log(`dc_evt: ${j(err)}`);
      resolve();
    });

    consumer.on('unsubscribed', (evt) => {
      console.log(`unsub_evt: ${j(evt)}`);
    });

    consumer.connect();
  });
}
/**
 * Entry point: runs the selected consumer scenario(s) one after another.
 * Swap which call is commented out to exercise a different combination of
 * subscribe vs. assign and custom vs. default rebalance handling.
 *
 * @returns {Promise<void>} Settles when the selected scenario has finished.
 */
async function run() {
  // await runConsumer(true, true);
  // await runConsumer(true, false);
  await runConsumer(false, false);
}

// Do not leave the promise floating: report a failed scenario and make the
// process exit with a non-zero status.
run().catch((err) => {
  console.error(`run failed: ${(err && err.stack) || err}`);
  process.exitCode = 1;
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment