Skip to content

Instantly share code, notes, and snippets.

@tkssharma
Created March 16, 2022 12:42
Show Gist options
  • Save tkssharma/94e345b4fad77ae88cb14b4ab14c69b9 to your computer and use it in GitHub Desktop.
Save tkssharma/94e345b4fad77ae88cb14b4ab14c69b9 to your computer and use it in GitHub Desktop.
const { Kafka, logLevel } = require('kafkajs');

// Creates a client instance configured to connect to the Kafka broker given by the
// environment variable KAFKA_BOOTSTRAP_SERVER, authenticating over SASL/PLAIN with
// KAFKA_USERNAME / KAFKA_PASSWORD. When a variable is unset, the placeholder value
// below is used instead; replace the placeholders or export the variables before
// running. Real credentials should never be committed to source control.
const kafka = new Kafka({
  clientId: 'qa-topic',
  brokers: [process.env.KAFKA_BOOTSTRAP_SERVER || 'xxxxxxxxx.confluent.cloud:9092'],
  ssl: true,
  // logLevel.WARN is the named form of the numeric level 2 used previously.
  logLevel: logLevel.WARN,
  sasl: {
    mechanism: 'plain',
    username: process.env.KAFKA_USERNAME || 'xxxxxxxxxxx',
    password: process.env.KAFKA_PASSWORD || 'xxxxxxxxxx',
  },
});
// Producer used to publish events, plus instrumentation listeners that log its
// connection lifecycle and any request timeouts.
const producer = kafka.producer();

producer.on(producer.events.CONNECT, () => {
  console.log('KafkaProvider: connected');
});

// Fired when the producer disconnects (not when a connection attempt fails), so the
// message reports a disconnect.
producer.on(producer.events.DISCONNECT, () => {
  console.log('KafkaProvider: disconnected');
});

// Instrumentation listeners receive an event object of the form
// { id, type, timestamp, payload }; the client id lives on the nested payload.
producer.on(producer.events.REQUEST_TIMEOUT, ({ payload }) => {
  console.log(`KafkaProvider: request timeout ${payload.clientId}`);
});
// Topic the QA rating event is published to.
const RATINGS_TOPIC = 'supplier-ratings';

// Topic the consumer subscribes to.
// NOTE(review): this differs from RATINGS_TOPIC, so the consumer will not receive the
// event produced by run() — confirm which topic is intended.
const CONSUMED_TOPIC = 'test-topic';

// Consumer group id. KafkaJS requires one to create a consumer.
// NOTE(review): placeholder value — confirm the group id to use.
const CONSUMER_GROUP_ID = 'test-group';

// User id that the event metadata has always carried; also the default external id.
// NOTE(review): assumes external_id refers to this same user — confirm.
const QA_USER_UUID = '5a12cba8-f4b5-495b-80ea-d0dd5d4ee17e';

/**
 * Builds the "QA" supplier-rating event body.
 *
 * @param {string} userUuid - Value stored in `external_id`.
 * @param {string|number} supplierId - Value stored in `payload.supplier_id`.
 * @returns {object} Plain object ready to be JSON-serialised.
 */
const buildRatingEvent = (userUuid, supplierId) => ({
  event_name: 'QA',
  external_id: userUuid,
  payload: {
    supplier_id: supplierId,
    assessment: {
      performance: 7,
      quality: 7,
      communication: 7,
      flexibility: 7,
      cost: 7,
      delivery: 6,
    },
  },
  metadata: {
    user_uuid: QA_USER_UUID,
  },
});

/**
 * Publishes one QA supplier-rating event, then starts a consumer that logs the
 * partition, offset and value of every message it receives.
 *
 * @param {object} [options]
 * @param {string} [options.userUuid] - External id for the event; defaults to QA_USER_UUID.
 * @param {string|number} [options.supplierId] - Supplier being rated (required).
 * @returns {Promise<void>} Resolves once the consumer has been started.
 * @throws {Error} When `supplierId` is missing, or when a Kafka operation fails.
 */
const run = async ({ userUuid = QA_USER_UUID, supplierId } = {}) => {
  // Fail with a clear message instead of publishing an event with no supplier.
  if (supplierId === undefined || supplierId === null || supplierId === '') {
    throw new Error('run: supplierId is required');
  }

  // Producing
  await producer.connect();
  try {
    await producer.send({
      topic: RATINGS_TOPIC,
      messages: [
        {
          value: Buffer.from(JSON.stringify(buildRatingEvent(userUuid, supplierId))),
        },
      ],
    });
  } finally {
    // Release the producer connection whether or not the send succeeded.
    await producer.disconnect();
  }

  // Consuming
  const consumer = kafka.consumer({ groupId: CONSUMER_GROUP_ID });
  await consumer.connect();
  await consumer.subscribe({ topic: CONSUMED_TOPIC, fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        // A message value can be null (e.g. a tombstone record); avoid calling
        // toString() on it.
        value: message.value === null ? null : message.value.toString(),
      });
    },
  });
};

// The ids are taken from the environment; an unset USER_UUID falls back to the
// default inside run(), while an unset SUPPLIER_ID is reported as an error.
run({
  userUuid: process.env.USER_UUID,
  supplierId: process.env.SUPPLIER_ID,
}).catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment