Skip to content

Instantly share code, notes, and snippets.

@george124816
Created February 11, 2025 01:05
Show Gist options
  • Save george124816/b9ff755ba41e0fec4297eed0517fe394 to your computer and use it in GitHub Desktop.
Save george124816/b9ff755ba41e0fec4297eed0517fe394 to your computer and use it in GitHub Desktop.
Simple HTTP endpoint forwarding to Kafka
package main
import (
"fmt"
"io"
"log"
"net/http"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// main registers the /publish_to_kafka handler on the default serve mux and
// serves HTTP on port 8080.
func main() {
	http.HandleFunc("/publish_to_kafka", publishToKafka)
	log.Println("listening on port :8080")
	// ListenAndServe only returns on failure (for example when the port is
	// already in use); report that error and exit non-zero instead of
	// falling off the end of main silently.
	log.Fatal(http.ListenAndServe(":8080", nil))
}
// publishToKafka is the HTTP handler for /publish_to_kafka. It reads the whole
// request body, echoes it to stdout, hands it to publishMessage, and then
// writes a confirmation line to the response.
//
// If the body cannot be read, the handler logs the error and answers with
// 400 Bad Request without publishing anything.
func publishToKafka(response http.ResponseWriter, request *http.Request) {
	body, err := io.ReadAll(request.Body)
	if err != nil {
		// A failed read concerns this request only; log.Fatal here would
		// terminate the whole server because of one bad client.
		log.Printf("reading request body: %v", err)
		http.Error(response, "could not read request body", http.StatusBadRequest)
		return
	}

	// Convert once and reuse for both the stdout echo and the publish call.
	payload := string(body)
	fmt.Println(payload)
	publishMessage(payload)
	fmt.Fprintln(response, "published to kafka")
}
// publishMessage sends message as the value of a single Kafka record to the
// "firstTopic" topic, using a producer connected to the "localhost" bootstrap
// server.
//
// A new producer is created for every call and closed before returning.
// Delivery reports are printed to stdout by a background goroutine. The
// function waits up to 15 seconds for outstanding deliveries before returning.
//
// It panics if the producer cannot be created. Enqueue failures and deliveries
// still pending after the flush timeout are logged, because the signature has
// no way to return an error.
func publishMessage(message string) {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// Print the outcome of each delivery as it is reported.
	// NOTE(review): this goroutine is expected to end when p.Close closes the
	// events channel — confirm against the client version in use.
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	topic := "firstTopic"
	err = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte(message),
	}, nil)
	if err != nil {
		// Nothing was enqueued, so there is nothing to flush.
		log.Printf("enqueueing message for topic %q: %v", topic, err)
		return
	}

	// Flush reports how many events are still outstanding after the timeout
	// (in milliseconds); a non-zero count means the message may not have
	// been delivered.
	if remaining := p.Flush(15 * 1000); remaining > 0 {
		log.Printf("flush timed out with %d event(s) still pending for topic %q", remaining, topic)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment