Last active
March 7, 2021 02:54
-
-
Save PercyPham/07ee75a90c1fa84b4c27018e6ffee4b9 to your computer and use it in GitHub Desktop.
Implement mini PubSub
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pubsub | |
import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)
// Topic is the name under which events are published and subscribed to.
// It is a plain string type, so it is comparable and usable as a map key.
type Topic string
// Event interface | |
type Event interface { | |
ID() string // ID getter | |
Topic() Topic // Topic getter | |
Data() interface{} // Data getter | |
CreatedAt() time.Time // CreatedAt getter | |
String() string | |
} | |
type event struct { | |
id string | |
topic Topic | |
data interface{} | |
createdAt time.Time | |
} | |
func newEvent(topic Topic, data interface{}) *event { | |
now := time.Now().UTC() | |
return &event{ | |
id: fmt.Sprintf("%d", now.Nanosecond()), | |
topic: topic, | |
data: data, | |
createdAt: now, | |
} | |
} | |
func (e *event) ID() string { return e.id } | |
func (e *event) Topic() Topic { return e.topic } | |
func (e *event) Data() interface{} { return e.data } | |
func (e *event) CreatedAt() time.Time { return e.createdAt } | |
func (e *event) String() string { return fmt.Sprintf("Event %s", e.topic) } | |
// PubSub interface for publish/subscribe pattern | |
type PubSub interface { | |
// Publish an event to topic with specified data | |
Publish(ctx context.Context, topic Topic, data interface{}) error | |
// Subscribe returns a channel to receive events from specific topic and an unsubscribe method | |
Subscribe(ctx context.Context, topic Topic) (c <-chan Event, unsubscribe func()) | |
} | |
type pubsub struct { | |
eventQueue chan Event | |
topicChansMap map[Topic][]chan Event | |
mu sync.RWMutex | |
} | |
// New instance of PubSub implementation | |
func New() PubSub { | |
ps := &pubsub{ | |
eventQueue: make(chan Event, 10000), | |
topicChansMap: make(map[Topic][]chan Event), | |
} | |
ps.run() | |
return ps | |
} | |
func (ps *pubsub) run() { | |
go func() { | |
for { | |
e := <-ps.eventQueue | |
topic := e.Topic() | |
if chans, ok := ps.topicChansMap[topic]; ok { | |
for i := range chans { | |
go func(c chan Event) { | |
c <- e | |
}(chans[i]) | |
} | |
} | |
} | |
}() | |
} | |
func (ps *pubsub) Publish(ctx context.Context, topic Topic, data interface{}) error { | |
go func() { | |
ps.eventQueue <- newEvent(topic, data) | |
}() | |
return nil | |
} | |
func (ps *pubsub) Subscribe(ctx context.Context, topic Topic) (<-chan Event, func()) { | |
c := make(chan Event) | |
ps.mu.Lock() | |
if chans, ok := ps.topicChansMap[topic]; ok { | |
chans = append(ps.topicChansMap[topic], c) | |
ps.topicChansMap[topic] = chans | |
} else { | |
ps.topicChansMap[topic] = []chan Event{c} | |
} | |
ps.mu.Unlock() | |
unsubscribe := func() { | |
if chans, ok := ps.topicChansMap[topic]; ok { | |
for i := range chans { | |
if chans[i] == c { | |
chans = append(chans[:i], chans[i+1:]...) // remove chan at index i | |
ps.mu.Lock() | |
ps.topicChansMap[topic] = chans | |
ps.mu.Unlock() | |
break | |
} | |
} | |
} | |
} | |
return c, unsubscribe | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment