Skip to content

Instantly share code, notes, and snippets.

@pracps
Last active April 9, 2019 12:08
Show Gist options
  • Save pracps/585a6869e8356bdeb317191710e29dcc to your computer and use it in GitHub Desktop.
Save pracps/585a6869e8356bdeb317191710e29dcc to your computer and use it in GitHub Desktop.
NATS Async Subscriber
/**
* No queue group is used here, so every subscriber receives the message
*/
package main
import (
nats "github.com/nats-io/go-nats"
"fmt"
"sync"
)
// main connects to the default NATS server, registers three plain (non-queue)
// async subscribers on subject "foo", publishes a single message and blocks
// until every subscriber has handled it.
//
// Any connection, subscription, publish or flush error aborts the program
// with a panic.
func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	// One wg.Done() is expected per subscriber, because each of them gets
	// its own copy of the published message.
	const subscribers = 3

	var wg sync.WaitGroup
	wg.Add(subscribers)

	// Simple Async Subscribers
	for id := 1; id <= subscribers; id++ {
		id := id // per-iteration copy for the closure (required before Go 1.22)
		_, err := nc.Subscribe("foo", func(m *nats.Msg) {
			fmt.Printf("Received a message: %s %d\n", string(m.Data), id)
			wg.Done()
		})
		if err != nil {
			// Without this check a failed subscription would leave
			// wg.Wait() below blocked forever.
			panic(err)
		}
	}

	if err := nc.Publish("foo", []byte("All is Well")); err != nil {
		panic(err)
	}
	// Flush performs a round trip to the server so the buffered publish is
	// actually sent before we start waiting.
	if err := nc.Flush(); err != nil {
		panic(err)
	}

	wg.Wait()
	fmt.Println("last statement")
}
/**
* Instead of handling messages in a callback, we can have them delivered to a
* channel and then read the messages from that channel
*/
package main
import (
nats "github.com/nats-io/go-nats"
"fmt"
)
// main connects to the default NATS server, subscribes to subject "foo"
// through a Go channel, publishes two messages, reads both back from the
// channel and finally removes the subscription.
//
// Any connection, subscription, publish, flush or unsubscribe error aborts
// the program with a panic.
func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	// Incoming messages for the subscription are delivered on this channel.
	ch := make(chan *nats.Msg, 64)
	sub, err := nc.ChanSubscribe("foo", ch)
	if err != nil {
		panic(err)
	}

	for _, payload := range []string{"Msg 1", "Msg 2"} {
		if err := nc.Publish("foo", []byte(payload)); err != nil {
			panic(err)
		}
	}
	// Flush performs a round trip to the server so the buffered publishes
	// are actually sent before we block on the channel.
	if err := nc.Flush(); err != nil {
		panic(err)
	}

	// Exactly two messages were published, so read exactly two.
	for i := 0; i < 2; i++ {
		msg := <-ch
		fmt.Printf("Received a message: %s\n", string(msg.Data))
	}

	// Unsubscribe
	/**
	 * Don't close the channel until unsubscribe is called
	 */
	// NOTE: Drain is intentionally not called after Unsubscribe. By then the
	// subscription has already been removed, so the extra call could only
	// return an error (which the original code silently dropped).
	if err := sub.Unsubscribe(); err != nil {
		panic(err)
	}
	fmt.Println("last statement")
}
package main
/**
* Within a queue group, only one of its subscribers will get each message
* All subscribers without a queue name that listen only to the topic will receive every message
*/
import (
nats "github.com/nats-io/go-nats"
"fmt"
"sync"
)
// main demonstrates NATS queue groups next to plain subscriptions.
//
// It registers three subscribers in each of the queue groups "updone" and
// "updtwo" plus three plain subscribers, all on subject "foo", publishes one
// message and waits for the expected number of deliveries: one per queue
// group and one per plain subscriber.
//
// Any connection, subscription, publish or flush error aborts the program
// with a panic.
func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	const (
		perGroup         = 3 // subscribers registered in each queue group
		plainSubscribers = 3 // subscribers registered without a queue group
	)
	queueGroups := []string{"updone", "updtwo"}

	// Only one member of each queue group handles the message, while every
	// plain subscriber does: 2 + 3 = 5 deliveries in total.
	var wg sync.WaitGroup
	wg.Add(len(queueGroups) + plainSubscribers)

	// Simple Async Subscribers
	// only one subscriber of each queue group will receive the message
	for _, group := range queueGroups {
		for id := 1; id <= perGroup; id++ {
			group, id := group, id // per-iteration copies for the closure (required before Go 1.22)
			_, err := nc.QueueSubscribe("foo", group, func(m *nats.Msg) {
				fmt.Printf("Received a message[%s]: %s %d\n", group, string(m.Data), id)
				wg.Done()
			})
			if err != nil {
				// A failed subscription could leave wg.Wait() below
				// blocked forever, so fail loudly instead.
				panic(err)
			}
		}
	}

	// Without queue group all of following will receive message
	for i := 0; i < plainSubscribers; i++ {
		id := 100 + i // labels 100, 101, 102 as in the original example
		_, err := nc.Subscribe("foo", func(m *nats.Msg) {
			fmt.Printf("Received a message[no queue]: %s %d\n", string(m.Data), id)
			wg.Done()
		})
		if err != nil {
			panic(err)
		}
	}

	if err := nc.Publish("foo", []byte("All is Well")); err != nil {
		panic(err)
	}
	fmt.Println("Publish done")
	// Flush performs a round trip to the server so the buffered publish is
	// actually sent before we start waiting.
	if err := nc.Flush(); err != nil {
		panic(err)
	}

	wg.Wait()
	fmt.Println("last statement")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment