Skip to content

Instantly share code, notes, and snippets.

@linnv
Last active July 17, 2023 13:25
Show Gist options
  • Save linnv/809213cb2115f0a9333c9e1b12bb9cf4 to your computer and use it in GitHub Desktop.
Save linnv/809213cb2115f0a9333c9e1b12bb9cf4 to your computer and use it in GitHub Desktop.
Demo of RabbitMQ direct, topic, and fanout exchanges in Go
package main
import (
"flag"
"fmt"
"log"
"strconv"
"time"
"github.com/streadway/amqp"
)
// main connects to the RabbitMQ broker, opens a channel, and runs the demo
// selected by the -type flag ("direct", "topic", or "fanout"; the default is
// "direct"). Any other value terminates the program with a fatal log entry.
// After starting the demo, main blocks forever so the demo's goroutines keep
// publishing and consuming.
func main() {
	exchangeType := flag.String("type", "direct", "Exchange type to test")
	flag.Parse()

	// For a broker on the local machine use "amqp://localhost:5672/" instead.
	conn, err := amqp.Dial("amqp://192.168.1.237:5672/")
	failOnErr(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnErr(err, "Failed to open a channel")
	defer ch.Close()

	// Each supported -type value maps to the function that runs its demo.
	demos := map[string]func(*amqp.Channel){
		"direct": runDirectExchange,
		"topic":  runTopicExchange,
		"fanout": runFanoutExchange,
	}
	run, known := demos[*exchangeType]
	if !known {
		log.Fatalf("Invalid exchange type %s", *exchangeType)
	}
	run(ch)

	log.Printf("[*] Waiting for messages. To exit, press CTRL+C")

	// Park the main goroutine indefinitely; the process ends only when it is
	// interrupted from outside.
	select {}
}
// MaxMsgCount is the number of messages each publisher loop sends.
const MaxMsgCount = 3
// runDirectExchange demonstrates a direct exchange on ch.
//
// It declares the "logs-direct" exchange, declares a server-named queue and
// binds it with the routing key "info". It then starts a publisher goroutine
// that sends two batches of MaxMsgCount messages, one per second, and a
// subscriber goroutine that logs every delivery. The function returns as soon
// as both goroutines are started; any AMQP error terminates the program via
// failOnErr.
func runDirectExchange(ch *amqp.Channel) {
	const (
		exchange   = "logs-direct"
		routingKey = "info"
	)

	failOnErr(
		ch.ExchangeDeclare(exchange, "direct", true, false, false, false, nil),
		"Failed to declare exchange",
	)

	// Declare and bind the queue before publishing: messages published before
	// the queue exists are never delivered to its consumers.
	queue, err := ch.QueueDeclare("", false, false, true, false, nil)
	failOnErr(err, "Failed to declare queue")
	failOnErr(
		ch.QueueBind(queue.Name, routingKey, exchange, false, nil),
		"Failed to bind queue",
	)

	// timestamp renders the current time as decimal nanoseconds since the
	// Unix epoch.
	timestamp := func() string {
		return strconv.FormatInt(time.Now().UnixNano(), 10)
	}

	// send publishes text as a plain-text message and then waits one second.
	send := func(text string) {
		failOnErr(
			ch.Publish(exchange, routingKey, false, false, amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte(text),
			}),
			"Failed to publish message",
		)
		time.Sleep(time.Second)
	}

	// Publisher: two consecutive batches that differ only in the message text.
	go func() {
		for n := 0; n < MaxMsgCount; n++ {
			send(fmt.Sprintf("%s Your message here-%d", timestamp(), n))
		}
		for n := 0; n < MaxMsgCount; n++ {
			send(fmt.Sprintf("%s Your message here-again-%d", timestamp(), n))
		}
	}()

	// Subscriber: log deliveries until the delivery channel is closed.
	deliveries, err := ch.Consume(queue.Name, "", true, false, false, false, nil)
	failOnErr(err, "Failed to consume messages")
	go func() {
		for delivery := range deliveries {
			log.Printf("Received: %s", delivery.Body)
		}
	}()
}
// runTopicExchange demonstrates a topic exchange on ch.
//
// It declares the "logs-topic" exchange and binds a server-named queue with
// the pattern "*.critical". A publisher goroutine sends MaxMsgCount messages
// with the routing key "kernel.critical", one per second, and a subscriber
// goroutine logs every delivery. The function returns as soon as both
// goroutines are started; any AMQP error terminates the program via
// failOnErr.
func runTopicExchange(ch *amqp.Channel) {
	const (
		exchange       = "logs-topic"
		bindingPattern = "*.critical"
		publishKey     = "kernel.critical"
	)

	failOnErr(
		ch.ExchangeDeclare(exchange, "topic", true, false, false, false, nil),
		"Failed to declare exchange",
	)

	queue, err := ch.QueueDeclare("", false, false, true, false, nil)
	failOnErr(err, "Failed to declare queue")
	failOnErr(
		ch.QueueBind(queue.Name, bindingPattern, exchange, false, nil),
		"Failed to bind queue",
	)

	// Publisher: one timestamped plain-text message per second.
	go func() {
		for n := 0; n < MaxMsgCount; n++ {
			stamp := strconv.FormatInt(time.Now().UnixNano(), 10)
			text := fmt.Sprintf("%s Your message here-%d", stamp, n)
			failOnErr(
				ch.Publish(exchange, publishKey, false, false, amqp.Publishing{
					ContentType: "text/plain",
					Body:        []byte(text),
				}),
				"Failed to publish message",
			)
			time.Sleep(time.Second)
		}
	}()

	// Subscriber: log deliveries until the delivery channel is closed.
	deliveries, err := ch.Consume(queue.Name, "", true, false, false, false, nil)
	failOnErr(err, "Failed to consume messages")
	go func() {
		for delivery := range deliveries {
			log.Printf("Received: %s", delivery.Body)
		}
	}()
}
// runFanoutExchange demonstrates a fanout exchange on ch.
//
// It declares the "logs-fanout" exchange and binds two server-named queues to
// it with an empty routing key. A publisher goroutine sends MaxMsgCount
// messages, one per second, and two subscriber goroutines log the deliveries
// of each queue with the prefixes "Q1" and "Q2". The function returns as soon
// as the goroutines are started; any AMQP error terminates the program via
// failOnErr.
func runFanoutExchange(ch *amqp.Channel) {
	const exchange = "logs-fanout"

	// declareQueue creates a server-named queue.
	declareQueue := func() amqp.Queue {
		queue, err := ch.QueueDeclare("", false, false, true, false, nil)
		failOnErr(err, "Failed to declare queue")
		return queue
	}
	// bind attaches the named queue to the fanout exchange.
	bind := func(queueName string) {
		failOnErr(
			ch.QueueBind(queueName, "", exchange, false, nil),
			"Failed to bind queue",
		)
	}
	// consume starts a consumer on the named queue.
	consume := func(queueName string) <-chan amqp.Delivery {
		deliveries, err := ch.Consume(queueName, "", true, false, false, false, nil)
		failOnErr(err, "Failed to consume messages")
		return deliveries
	}

	failOnErr(
		ch.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil),
		"Failed to declare exchange",
	)

	// Both queues are declared first and bound afterwards.
	first := declareQueue()
	second := declareQueue()
	bind(first.Name)
	bind(second.Name)

	// Publisher: one timestamped plain-text message per second.
	go func() {
		for n := 0; n < MaxMsgCount; n++ {
			stamp := strconv.FormatInt(time.Now().UnixNano(), 10)
			text := fmt.Sprintf("%s Your message here-%d", stamp, n)
			failOnErr(
				ch.Publish(exchange, "", false, false, amqp.Publishing{
					ContentType: "text/plain",
					Body:        []byte(text),
				}),
				"Failed to publish message",
			)
			time.Sleep(time.Second)
		}
	}()

	// Subscribers: one goroutine per queue, each logging until its delivery
	// channel is closed.
	fromFirst := consume(first.Name)
	fromSecond := consume(second.Name)
	go func() {
		for delivery := range fromFirst {
			log.Printf("Q1: %s", delivery.Body)
		}
	}()
	go func() {
		for delivery := range fromSecond {
			log.Printf("Q2: %s", delivery.Body)
		}
	}()
}
// failOnErr terminates the program with a fatal log entry of the form
// "<msg>: <err>" when err is non-nil. It does nothing when err is nil.
func failOnErr(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", msg, err)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment