@oNddleo
Created March 6, 2025 06:23
Producer consumer

Here are examples of a Kafka producer and consumer in Go (Golang) that connect to a Bitnami Kafka instance in Kubernetes via port forwarding:

Kafka Producer in Go

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Configure the writer
	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:      []string{"localhost:9092"},
		Topic:        "my-topic",
		BatchSize:    1,               // For simplicity, send each message immediately
		BatchTimeout: 10 * time.Millisecond,
		// Uncomment to use SASL/PLAIN authentication (also import
		// "github.com/segmentio/kafka-go/sasl/plain"):
		// Dialer: &kafka.Dialer{
		// 	SASLMechanism: plain.Mechanism{
		// 		Username: "user",
		// 		Password: "password",
		// 	},
		// },
	})

	// Ensure writer gets closed
	defer writer.Close()

	// Create a message
	message := kafka.Message{
		Key:   []byte("key1"),
		Value: []byte("Hello from Go to K8s Kafka!"),
		Time:  time.Now(),
	}

	// Send the message
	err := writer.WriteMessages(context.Background(), message)
	if err != nil {
		log.Fatalf("Failed to write message: %v", err)
	}

	fmt.Println("Message sent successfully!")
}

Kafka Consumer in Go

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Set up a context that can be cancelled
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Set up signal handling to gracefully exit
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	// Configure the reader
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:     []string{"localhost:9092"},
		Topic:       "my-topic",
		GroupID:     "my-go-consumer-group",
		MinBytes:    10e3,  // 10KB
		MaxBytes:    10e6,  // 10MB
		StartOffset: kafka.FirstOffset, // Start from the oldest message
		// Uncomment to use SASL/PLAIN authentication (also import
		// "github.com/segmentio/kafka-go/sasl/plain"):
		// Dialer: &kafka.Dialer{
		// 	SASLMechanism: plain.Mechanism{
		// 		Username: "user",
		// 		Password: "password",
		// 	},
		// },
	})

	// Ensure reader gets closed
	defer reader.Close()

	// Start consuming in a separate goroutine
	go func() {
		for {
			// Check if context was cancelled
			select {
			case <-ctx.Done():
				return
			default:
				// Continue processing
			}

			// Read message
			message, err := reader.ReadMessage(ctx)
			if err != nil {
				// Don't fatally exit if context was cancelled
				select {
				case <-ctx.Done():
					return
				default:
					log.Fatalf("Failed to read message: %v", err)
				}
			}

			fmt.Printf("Message received: partition=%d offset=%d key=%s value=%s\n",
				message.Partition, message.Offset, string(message.Key), string(message.Value))
		}
	}()

	// Wait for termination signal
	<-signals
	fmt.Println("\nShutting down consumer...")
}

Installation and Setup

  1. First, install the Kafka Go client library:
go mod init kafka-example
go get github.com/segmentio/kafka-go
  2. Ensure you have port forwarding set up:
kubectl port-forward svc/<release-name>-kafka -n <namespace> 9092:9092
  3. Make sure the topic exists or create it:
kubectl exec -it <kafka-pod-name> -n <namespace> -- kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
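
With the dependencies in place and port forwarding active, you can run the examples in separate terminals (assuming you saved them as producer.go and consumer.go):

go run consumer.go
go run producer.go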

Additional Tips for Go with Bitnami Kafka

  1. Error Handling: The examples above include basic error handling, but in production, you might want more robust error handling and retry mechanisms.
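
For instance, WriteMessages can be wrapped in a small retry helper. Here is a minimal sketch with exponential backoff; the attempt limit and base delay are illustrative assumptions:

func writeWithRetry(ctx context.Context, writer *kafka.Writer, msgs ...kafka.Message) error {
    const maxRetries = 5
    backoff := 100 * time.Millisecond

    var err error
    for attempt := 0; attempt < maxRetries; attempt++ {
        if err = writer.WriteMessages(ctx, msgs...); err == nil {
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(backoff):
            backoff *= 2 // Double the delay after each failed attempt
        }
    }
    return fmt.Errorf("giving up after %d attempts: %w", maxRetries, err)
}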

  2. TLS Support: If your Bitnami Kafka uses TLS, configure the dialer with TLS:

dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{},
}

reader := kafka.NewReader(kafka.ReaderConfig{
    Brokers:     []string{"localhost:9092"},
    Topic:       "my-topic",
    GroupID:     "my-go-consumer-group",
    Dialer:      dialer,
})
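
The writer accepts the same TLS-enabled dialer through its config:

writer := kafka.NewWriter(kafka.WriterConfig{
    Brokers: []string{"localhost:9092"},
    Topic:   "my-topic",
    Dialer:  dialer, // Reuse the TLS dialer from above
})
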
  3. Batch Processing: For higher throughput in the producer, adjust the BatchSize and BatchTimeout settings.
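
For example, a throughput-oriented writer might batch more aggressively (the values below are illustrative, not tuned recommendations):

writer := kafka.NewWriter(kafka.WriterConfig{
    Brokers:      []string{"localhost:9092"},
    Topic:        "my-topic",
    BatchSize:    100,                   // Accumulate up to 100 messages per batch
    BatchTimeout: 50 * time.Millisecond, // ...or flush after 50ms, whichever comes first
})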

  4. Connection Pooling: The segmentio/kafka-go library handles connection pooling for you, but you can tune retry behavior with the MaxAttempts setting on the reader and writer configurations, and the read backoff with ReadBackoffMin/ReadBackoffMax on the reader.
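
For example, on the consumer side:

reader := kafka.NewReader(kafka.ReaderConfig{
    Brokers:     []string{"localhost:9092"},
    Topic:       "my-topic",
    GroupID:     "my-go-consumer-group",
    MaxAttempts: 5, // Retry failed attempts up to 5 times (the default is 3)
})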

These Go examples should work well with a Bitnami Kafka instance accessed through Kubernetes port forwarding.

@oNddleo commented Mar 6, 2025

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

const (
	topic         = "benchmark-topic"
	brokerAddress = "localhost:9092" // Update with your broker address
	numMessages   = 100000           // Total messages to send
	messageSize   = 1024             // Bytes per message
	workers       = 1000             // Number of concurrent workers
)

func main() {
	// mechanism := plain.Mechanism{
	// 	Username: "user1",
	// 	Password: "8t7tYsMBYQ",
	// }

	dialer := &kafka.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
		// SASLMechanism: mechanism,
	}

	// Configure the writer. BatchSize: 1 forces one round trip per
	// message, so this measures unbatched throughput; raise BatchSize
	// to benchmark batched writes.
	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:      []string{brokerAddress},
		Topic:        topic,
		BatchSize:    1,
		BatchTimeout: 10 * time.Millisecond,
		Dialer:       dialer,
	})
	// Ensure writer gets closed
	defer writer.Close()

	fmt.Printf("Starting benchmark - %d messages, %d workers\n", numMessages, workers)

	start := time.Now()
	sem := make(chan struct{}, workers) // Worker semaphore
	errCh := make(chan error, 1)        // Keeps only the first error

	for i := 0; i < numMessages; i++ {
		sem <- struct{}{} // Acquire a worker slot
		go func(seq int) {
			defer func() { <-sem }() // Release the slot

			// Build the message locally: mutating one shared message
			// from many goroutines would be a data race.
			value := make([]byte, messageSize)
			copy(value, fmt.Sprintf("Message %d - %s", seq, time.Now().Format(time.RFC3339)))
			message := kafka.Message{
				Key:   []byte("benchmark-key"),
				Value: value,
			}

			if err := writer.WriteMessages(context.Background(), message); err != nil {
				select {
				case errCh <- fmt.Errorf("error writing message %d: %v", seq, err):
				default:
				}
			}

			if seq%1000 == 0 {
				fmt.Printf("Sent %d messages\n", seq)
			}
		}(i)
	}

	// Wait for all workers to finish by filling every semaphore slot
	for i := 0; i < cap(sem); i++ {
		sem <- struct{}{}
	}

	close(errCh)
	if err := <-errCh; err != nil {
		log.Fatalf("Encountered error: %v", err)
	}

	duration := time.Since(start)
	fmt.Printf("\nBenchmark completed!\n")
	fmt.Printf("Total messages: %d\n", numMessages)
	fmt.Printf("Total time: %s\n", duration.Round(time.Millisecond))
	fmt.Printf("Throughput: %.2f msg/sec\n", float64(numMessages)/duration.Seconds())
}


@oNddleo commented Mar 6, 2025

version: '3.8'

services:
  kafka:
    image: bitnami/kafka:3.1.0
    container_name: kafka
    command:
      - 'sh'
      - '-c'
      - '/opt/bitnami/scripts/kafka/setup.sh && kafka-storage.sh format --config "$${KAFKA_CONF_FILE}" --cluster-id "lkorDA4qT6W1K_dk0LHvtg" --ignore-formatted && /opt/bitnami/scripts/kafka/run.sh' # KRaft-specific initialisation
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      # Start KRaft setup (Kafka as controller - no Zookeeper)
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LOG_DIRS=/tmp/logs
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,INTERNAL://:9094
      # End KRaft-specific setup
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,INTERNAL://kafka:9094
    ports:
      - "0.0.0.0:9092:9092"

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8080:8080"
    restart: "always"
    environment:
      KAFKA_CLUSTERS_0_NAME: "lkorDA4qT6W1K_dk0LHvtg"
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9094
    depends_on:
      - kafka
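
To start the stack (assuming the file is saved as docker-compose.yml):

docker compose up -d

The broker is then reachable from the host at localhost:9092 and the Kafka UI at http://localhost:8080, so no kubectl port forwarding is needed for this local setup.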
