Skip to content

Instantly share code, notes, and snippets.

@techzilla
Created January 30, 2024 19:50

Revisions

  1. techzilla created this gist Jan 30, 2024.
    74 changes: 74 additions & 0 deletions KafkaGolang2
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,74 @@
    package main

    import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
    )

    const (
    kafkaBrokers = "localhost:9092"
    kafkaTopic = "syslog_topic"
    logMessage = "Example log message"
    logFacility = "local0"
    logSeverity = "info"
    logTimestamp = "2006-01-02T15:04:05.999Z07:00"
    )

    // SyslogMessage represents the structure of the syslog-like JSON message
    type SyslogMessage struct {
    Message string `json:"message"`
    Facility string `json:"facility"`
    Severity string `json:"severity"`
    Timestamp string `json:"timestamp"`
    }

    func main() {
    // Initialize Kafka writer configuration
    config := kafka.WriterConfig{
    Brokers: []string{kafkaBrokers},
    Topic: kafkaTopic,
    Balancer: &kafka.LeastBytes{},
    }

    // Create Kafka writer
    writer := kafka.NewWriter(config)

    // Construct syslog-like JSON message
    syslogMessage := SyslogMessage{
    Message: logMessage,
    Facility: logFacility,
    Severity: logSeverity,
    Timestamp: logTimestamp,
    }

    // Convert syslogMessage to JSON string
    syslogJSON, err := json.Marshal(syslogMessage)
    if err != nil {
    log.Fatalf("Error marshaling syslog message: %v", err)
    }

    // Create Kafka message
    message := kafka.Message{
    Key: nil,
    Value: syslogJSON,
    }

    // Send the Kafka message
    err = writer.WriteMessages(context.Background(), message)
    if err != nil {
    log.Fatalf("Failed to send message to Kafka: %v", err)
    }

    fmt.Println("Message sent to Kafka")

    // Close Kafka writer
    err = writer.Close()
    if err != nil {
    log.Fatalf("Error closing Kafka writer: %v", err)
    }
    }