Created
February 23, 2022 02:12
-
-
Save jkomyno/484e7252c1272910bf45661f2e2d542f to your computer and use it in GitHub Desktop.
fastify-kafka.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { FastifyPluginCallback } from 'fastify';
import fp from 'fastify-plugin';
import kafka, { ProducerGlobalConfig, Producer } from 'node-rdkafka';
/**
 * Thin wrapper around a node-rdkafka `Producer` that exposes a single,
 * options-object based `send` operation.
 */
export class KafkaProducer {
  /**
   * @param producer - The underlying producer that `send` forwards to. This
   *   class never connects or disconnects it.
   */
  constructor(private readonly producer: Producer) {}

  /**
   * Enqueues one message on the underlying producer.
   *
   * Defined as an arrow-function property so it stays bound to this instance
   * even when detached (e.g. `const { send } = request.kafka`).
   *
   * @param message.topic - Destination topic.
   * @param message.key - Message key.
   * @param message.payload - Message body. Strings are converted with
   *   `Buffer.from` (UTF-8 by default); a `Buffer` is forwarded as-is.
   */
  send = <T extends string | Buffer>({
    topic,
    key,
    payload,
  }: {
    topic: string;
    key: string;
    payload: T;
  }): void => {
    // Widen to the plain union so the `typeof` check narrows cleanly.
    const data: string | Buffer = payload;
    const message = typeof data === 'string' ? Buffer.from(data) : data;

    this.producer.produce(
      topic,
      null, // no explicit partition
      message,
      key,
      Date.now(), // message timestamp: time of the send call
    );
  };
}
// Module augmentation: adds the `kafka` property to Fastify's request type.
declare module 'fastify' {
  interface FastifyRequest {
    /** Kafka producer wrapper, available on every request as `request.kafka`. */
    kafka: KafkaProducer;
  }
}
/**
 * Fastify plugin that owns a single node-rdkafka producer for the lifetime of
 * the server and exposes it on every request as `request.kafka`.
 *
 * - `onReady`: connects the producer; the hook rejects if an `event.error` is
 *   emitted before `ready`.
 * - `onClose`: disconnects the producer (skipped if it never connected).
 * - `onRequest`: assigns the shared `KafkaProducer` wrapper to the request.
 *
 * @param fastify - Fastify instance the plugin is registered on.
 * @param options - node-rdkafka producer configuration, passed straight to
 *   `new kafka.Producer(...)`.
 * @param done - Callback signalling that plugin registration is complete.
 */
const fastifyKafkaPlugin: FastifyPluginCallback<ProducerGlobalConfig> = (fastify, options, done) => {
  const producer = new kafka.Producer(options);

  // Hook triggered before the server starts listening for requests.
  fastify.addHook('onReady', async () => {
    await new Promise<void>((resolve, reject) => {
      const onReady = (): void => {
        // The connection attempt is over: drop the connect-time error handler
        // so later runtime errors are not reported as connection errors.
        producer.removeListener('event.error', onConnectError);
        producer.on('event.error', (kafkaError) => {
          fastify.log.error({ kafkaError }, 'kafka:error');
        });
        fastify.log.info('kafka:ready');
        resolve();
      };

      const onConnectError = (kafkaError: unknown): void => {
        producer.removeListener('ready', onReady);
        fastify.log.error({ kafkaError }, 'kafka:connection-error');
        reject(kafkaError);
      };

      // Attach listeners before connecting so no event can be missed.
      producer.once('ready', onReady);
      producer.once('event.error', onConnectError);
      producer.connect();
    });
  });

  fastify.addHook('onClose', async () => {
    // Nothing to tear down if the connection was never established.
    if (!producer.isConnected()) {
      return;
    }

    await new Promise<void>((resolve, reject) => {
      producer.disconnect((err) => {
        if (err) {
          fastify.log.error({ err }, 'kafka:disconnect-error');
          reject(err);
          return;
        }
        // A clean disconnect is informational, not an error.
        fastify.log.info('kafka:disconnect');
        resolve();
      });
    });
  });

  const kafkaProducer = new KafkaProducer(producer);

  // Declare the property up front with a `null` placeholder, then assign the
  // shared wrapper instance to each incoming request.
  fastify.decorateRequest('kafka', null);
  fastify.addHook('onRequest', async (request) => {
    request.kafka = kafkaProducer;
  });

  done();
};

/** The Kafka plugin wrapped with `fastify-plugin`, ready to pass to `fastify.register`. */
export const fastifyKafka = fp(fastifyKafkaPlugin);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment