Skip to content

Instantly share code, notes, and snippets.

@samix73
Created September 8, 2018 08:57
Show Gist options
  • Save samix73/9a6c794e40a3219c4efe292cb2527f32 to your computer and use it in GitHub Desktop.
Save samix73/9a6c794e40a3219c4efe292cb2527f32 to your computer and use it in GitHub Desktop.
Broadcasting messages using channels
package broadcast
import (
"sync"
)
// brokers is the package-level registry of brokers, keyed by the id handed to
// NewBroker. No package-level lock guards it: NewBroker, GetBroker and Stop
// all touch it directly, so callers must serialize those calls themselves.
var brokers = map[string]*Broker{}
// Broker fans published messages out to a set of subscriber channels.
//
// All requests (subscribe, unsubscribe, publish, stop) reach the event loop in
// Start through the channels below.
type Broker struct {
	// m is a per-broker mutex.
	m sync.Mutex

	// subs is the set of currently registered subscriber channels.
	subs map[chan Message]struct{}

	// publishCh carries messages waiting to be fanned out.
	publishCh chan Message

	// subCh carries channels waiting to be added to subs.
	subCh chan chan Message

	// unsubCh carries channels waiting to be removed from subs.
	unsubCh chan chan Message

	// stopCh is closed to make the event loop return.
	stopCh chan struct{}
}
// Message is the unit passed to Publish and delivered on subscriber channels.
// The broker forwards it as-is and never inspects either field.
type Message struct {
// SenderID is set by the publisher; presumably it identifies who sent the
// message — nothing in this file reads it, so confirm against callers.
SenderID string
// Message is the payload; it may hold a value of any type.
Message interface{}
}
// NewBroker builds a Broker, records it in the package registry under id and
// returns it. A broker already registered under the same id is replaced in the
// registry. The event loop is not started here; the caller runs Start.
func NewBroker(id string) *Broker {
	broker := new(Broker)

	// Control channels.
	broker.stopCh = make(chan struct{})
	broker.subCh = make(chan chan Message, 1)
	broker.unsubCh = make(chan chan Message, 1)

	// Up to 100 published messages may queue before Publish blocks.
	broker.publishCh = make(chan Message, 100)
	broker.subs = make(map[chan Message]struct{})

	brokers[id] = broker
	return broker
}
// GetBroker looks up the broker registered under id. It returns nil when no
// broker is registered under that id.
func GetBroker(id string) *Broker {
	// Indexing a map with a missing key yields the zero value, which for a
	// pointer element type is nil.
	return brokers[id]
}
// Start runs the broker's event loop and blocks until the stop channel is
// closed, so it is normally run in its own goroutine. The loop is the only
// code that touches the subscriber set, which is why that set needs no lock.
//
// On each iteration it handles one of:
//   - stop: close every subscriber channel and return;
//   - subscribe: add the channel to the subscriber set;
//   - unsubscribe: remove the channel from the set and close it;
//   - publish: offer the message to every subscriber without blocking; a
//     subscriber whose buffer is full misses that message.
func (b *Broker) Start() {
	for {
		select {
		case <-b.stopCh:
			// The broker is the only sender on subscriber channels, so it is
			// the one to close them; otherwise consumers ranging over their
			// channel would block forever once the loop is gone.
			b.registerPending()
			for msgCh := range b.subs {
				delete(b.subs, msgCh)
				close(msgCh)
			}
			return
		case msgCh := <-b.subCh:
			b.subs[msgCh] = struct{}{}
		case msgCh := <-b.unsubCh:
			// select picks among ready cases at random, so the subscribe
			// request for this very channel may still be queued. Register
			// queued subscriptions first so the removal below is not lost.
			b.registerPending()
			if _, ok := b.subs[msgCh]; !ok {
				// Unknown or already removed: closing it again would panic.
				continue
			}
			delete(b.subs, msgCh)
			close(msgCh)
		case msg := <-b.publishCh:
			for msgCh := range b.subs {
				select {
				case msgCh <- msg:
				default:
					// Subscriber buffer is full; drop rather than stall
					// delivery to everyone else.
				}
			}
		}
	}
}

// registerPending adds every subscription currently queued on subCh to the
// subscriber set without blocking.
func (b *Broker) registerPending() {
	for {
		select {
		case msgCh := <-b.subCh:
			b.subs[msgCh] = struct{}{}
		default:
			return
		}
	}
}
// Stop removes the broker from the package registry and signals the event
// loop in Start to return by closing the stop channel.
//
// id is the key the broker was registered under. The registry entry is only
// removed when it still refers to this broker, so stopping a broker that was
// replaced by a later NewBroker call with the same id does not evict its
// replacement. Stop may be called more than once; later calls are no-ops
// instead of panicking on a double close.
func (b *Broker) Stop(id string) {
	if brokers[id] == b {
		delete(brokers, id)
	}

	// The mutex makes the check-then-close below atomic across concurrent
	// Stop calls on the same broker.
	b.m.Lock()
	defer b.m.Unlock()
	select {
	case <-b.stopCh:
		// Already closed by an earlier Stop.
	default:
		close(b.stopCh)
	}
}
// Subscribe creates a subscriber channel with room for 100 messages, hands it
// to the event loop for registration and returns it.
//
// If the broker has already been stopped, nothing is left to service the
// request, so the returned channel is closed right away: a receive on it
// yields the zero Message with ok == false instead of blocking forever.
func (b *Broker) Subscribe() chan Message {
	msgCh := make(chan Message, 100)

	// Check for a stopped broker first: with free space in subCh the select
	// below could otherwise still pick the send.
	select {
	case <-b.stopCh:
		close(msgCh)
		return msgCh
	default:
	}

	select {
	case b.subCh <- msgCh:
	case <-b.stopCh:
		// Stopped while waiting for the loop; the channel was never handed
		// over, so closing it here cannot race with the loop.
		close(msgCh)
	}
	return msgCh
}
// Unsubscribe asks the event loop to remove msgCh from the subscriber set.
// The loop, not this method, closes the channel, so callers should keep
// receiving until they see the close.
//
// If the broker has been stopped the request is dropped instead of blocking
// forever on a queue nobody reads.
func (b *Broker) Unsubscribe(msgCh chan Message) {
	select {
	case b.unsubCh <- msgCh:
	case <-b.stopCh:
	}
}
// Publish queues msg for delivery to all current subscribers. It blocks while
// the publish queue is full and the broker is still running.
//
// If the broker has been stopped the message is dropped instead of blocking
// forever on a queue nobody reads.
func (b *Broker) Publish(msg Message) {
	select {
	case b.publishCh <- msg:
	case <-b.stopCh:
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment