Skip to content

Instantly share code, notes, and snippets.

@ssk090
Last active November 14, 2024 06:50
Show Gist options
  • Save ssk090/852139926f7157ff61baccd40e900d0c to your computer and use it in GitHub Desktop.
Save ssk090/852139926f7157ff61baccd40e900d0c to your computer and use it in GitHub Desktop.
Kafka Implementation using TypeScript
import { kafka } from "./client";
/**
 * Creates the `rider-updates` topic (2 partitions) through the Kafka admin
 * client and then closes the admin connection.
 *
 * The disconnect runs in a `finally` block so a failing `createTopics` call
 * does not leave the admin connection open.
 *
 * @returns A promise that settles once the admin client has disconnected.
 */
async function init(): Promise<void> {
  const admin = kafka.admin();
  console.log("Admin connecting...");
  await admin.connect();
  console.log("Admin connected");
  try {
    console.log("Creating topics...");
    // createTopics resolves to false when the topic already exists, so the
    // result is checked instead of unconditionally reporting success.
    const created = await admin.createTopics({
      topics: [
        {
          topic: "rider-updates",
          numPartitions: 2,
        },
      ],
    });
    console.log(
      created
        ? "Topics created successfully [rider-updates]"
        : "Topics already exist, nothing created [rider-updates]"
    );
  } finally {
    console.log("Closing admin...");
    await admin.disconnect();
  }
}
init().catch(console.error);
import { Kafka } from "kafkajs";
// Identifier this application passes to Kafka as its client id.
const clientId = "my-app";
// Broker addresses handed to the client; a single local broker.
const brokers = ["localhost:9092"];

/** KafkaJS client instance exported for use by other modules. */
export const kafka = new Kafka({ clientId, brokers });
import { kafka } from "./client";
// Consumer group id: first CLI argument, falling back to "user-1" when it is
// missing or empty.
const group = process.argv[2] || "user-1";

/**
 * Connects a consumer in the configured group, subscribes to `rider-updates`
 * from the beginning, and logs every message that carries a value.
 *
 * If subscribing or starting the run loop fails after the connection was
 * established, the consumer is disconnected before the error is rethrown so
 * the connection is not leaked.
 *
 * @returns A promise that resolves once the consumer run loop has started.
 */
async function init(): Promise<void> {
  const consumer = kafka.consumer({ groupId: group });
  console.log("Consumer connecting...");
  await consumer.connect();
  console.log("Consumer connected");
  try {
    await consumer.subscribe({ topics: ["rider-updates"], fromBeginning: true });
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        // Messages without a value have nothing to print.
        if (!message.value) return;
        // Decode the payload explicitly rather than relying on implicit
        // string coercion inside a template literal.
        const report = [
          `GROUP:${group}`,
          `TOPIC:[${topic}]`,
          `PARTITION:${partition}`,
          `MESSAGE:${message.value.toString()}`,
        ].join("\n");
        console.log(report);
      },
    });
  } catch (err: unknown) {
    // Release the already-open connection before surfacing the failure.
    await consumer.disconnect();
    throw err;
  }
}
init().catch(console.error);
import { kafka } from "./client";
import * as readline from "readline";
// Line-oriented interface over stdin/stdout used to read rider updates.
const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

/**
 * Connects a producer and publishes one message to `rider-updates` for every
 * `<name> <location>` line typed on stdin.
 *
 * Messages whose location is "north" (case-insensitive) go to partition 0,
 * everything else to partition 1. Lines that do not contain both a name and a
 * location are rejected with a usage hint instead of throwing. Send and
 * disconnect failures are logged rather than surfacing as unhandled promise
 * rejections from the event listeners.
 *
 * @returns A promise that resolves once the input listeners are registered.
 */
async function init(): Promise<void> {
  const producer = kafka.producer();
  console.log("Producer connecting...");
  try {
    await producer.connect();
  } catch (err: unknown) {
    // Close the readline interface so the open stdin handle does not keep the
    // process alive after a failed connect.
    rl.close();
    throw err;
  }
  console.log("Producer connected");

  // Tracks whether the interface was closed so we never prompt on a closed one.
  let closed = false;
  const promptAgain = (): void => {
    if (!closed) rl.prompt();
  };

  rl.setPrompt("> ");
  rl.prompt();
  rl.on("line", async (line: string) => {
    const [name, location] = line.trim().split(/\s+/);
    if (!name || !location) {
      // Previously a missing location crashed on `location.toLowerCase()`.
      console.error("Invalid input. Expected: <name> <location>");
      promptAgain();
      return;
    }
    try {
      await producer.send({
        topic: "rider-updates",
        messages: [
          {
            partition: location.toLowerCase() === "north" ? 0 : 1,
            key: "location-updates",
            value: JSON.stringify({ name, location }),
          },
        ],
      });
    } catch (err: unknown) {
      console.error("Failed to send message:", err);
    }
    promptAgain();
  }).on("close", async () => {
    closed = true;
    console.log("Closing producer...");
    try {
      await producer.disconnect();
    } catch (err: unknown) {
      console.error("Failed to disconnect producer:", err);
    }
  });
}
init().catch(console.error);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment