-
-
Save AlexisLeon/1a2e9ece26fca36d70a9d203cf604bec to your computer and use it in GitHub Desktop.
Example Golang worker with RabbitMQ graceful shutdown
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"flag" | |
"log" | |
"os" | |
"os/signal" | |
"syscall" | |
"time" | |
"github.com/streadway/amqp" | |
) | |
// newConsumer is a function create a rabbitMQ consumer | |
func newConsumer(connectionString, queueName, consumerName string, fetchCount int) (connection *amqp.Connection, channel *amqp.Channel, msgs <-chan amqp.Delivery) { | |
connection, _ = amqp.Dial(connectionString) | |
channel, _ = connection.Channel() | |
channel.Qos(fetchCount, 0, false) | |
msgs, _ = channel.Consume( | |
queueName, // queue | |
consumerName, // consumer | |
false, // auto-ack | |
false, // exclusive | |
false, // no-local | |
false, // no-wait | |
nil, // args | |
) | |
return | |
} | |
func worker(msgs <-chan amqp.Delivery, done chan bool) { | |
for m := range msgs { | |
body := string(m.Body) | |
log.Printf("Processing data %+v\n", body) | |
time.Sleep(5 * time.Second) | |
log.Printf("Processing data %+v done\n", body) | |
m.Ack(false) | |
log.Printf("Data %+v acked\n", body) | |
} | |
done <- true | |
} | |
func main() { | |
defer log.Println("Program stopped successful") | |
url := flag.String("url", "", "eg. amqp://root:toor@localhost:5672/") | |
queue := flag.String("queue", "", "eg. test-queue") | |
name := flag.String("name", "", "eg. consumer-1") | |
fetchSize := flag.Int("size", 1, "eg. 20") | |
flag.Parse() | |
log.Printf("Connecting to %s queue %s fetch-size %d\n", *url, *queue, *fetchSize) | |
connection, channel, msgs := newConsumer(*url, *queue, *name, *fetchSize) | |
log.Printf("Consumer %s is subscribing queue %s\n", *name, *queue) | |
defer connection.Close() | |
defer channel.Close() | |
defer log.Println("Closing qeueu channel and connection") | |
done := make(chan bool) | |
go worker(msgs, done) | |
exit := make(chan os.Signal, 1) | |
signal.Notify(exit, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) | |
// Wait for OS exit signal | |
<-exit | |
log.Println("Got exit signal") | |
// Stop recieving message from queue | |
channel.Cancel(*name, false) | |
log.Println("Stopped receiving message from queue") | |
// Wait for worker procrss recieved message | |
log.Println("Wait for worker procrss recieved message") | |
<-done | |
log.Println("Woker done") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment