Skip to content

Instantly share code, notes, and snippets.

@duyquang6
Forked from kzhui125/multilock.go
Created September 10, 2020 16:58
Show Gist options
  • Save duyquang6/d7b1e6cdbfdb64a99ef5b76108a02fb5 to your computer and use it in GitHub Desktop.
Save duyquang6/d7b1e6cdbfdb64a99ef5b76108a02fb5 to your computer and use it in GitHub Desktop.
golang key baed locking
package xsync
import (
"sync"
)
type mutexItem struct {
mu sync.Mutex
counter int64
}
// mutexPool is not concurrent safe
type mutexPool struct {
pool []*mutexItem
}
func (p *mutexPool) Put(mi *mutexItem) {
p.pool = append(p.pool, mi)
}
func (p *mutexPool) Get() *mutexItem {
if len(p.pool) == 0 {
return &mutexItem{}
}
i := len(p.pool) - 1
mi := p.pool[i]
p.pool = p.pool[:i]
return mi
}
type messageGet struct {
key string
c chan *sync.Mutex
}
type messagePut string
// Multilocker is key based multiple locker
type Multilocker struct {
gets chan messageGet
puts chan messagePut
inUse map[string]*mutexItem
done chan struct{}
mp mutexPool
cp sync.Pool
}
// Get get key related sync.Mutex
func (l *Multilocker) Get(key string) *sync.Mutex {
msg := messageGet{
key: key,
// c: l.cp.Get().(chan *sync.Mutex),
c: make(chan *sync.Mutex, 1),
}
l.gets <- msg
mu := <-msg.c
// l.cp.Put(msg.c)
return mu
}
// Put release key related mutexItem
func (l *Multilocker) Put(key string) {
l.puts <- messagePut(key)
}
// Close will stop schedule
func (l *Multilocker) Close() {
close(l.done)
}
func (l *Multilocker) handleGet(msg *messageGet) {
key := msg.key
c := msg.c
mi, ok := l.inUse[key]
if !ok {
mi = l.mp.Get()
l.inUse[key] = mi
}
mi.counter++
c <- &mi.mu
}
func (l *Multilocker) handlePut(msg *messagePut) {
key := string(*msg)
mi, ok := l.inUse[key]
if !ok {
panic("should call lock first")
}
mi.counter--
if mi.counter == 0 {
l.mp.Put(mi)
delete(l.inUse, key)
}
}
func (l *Multilocker) schedule() {
loop:
for {
select {
case msg := <-l.gets:
l.handleGet(&msg)
case msg := <-l.puts:
l.handlePut(&msg)
case <-l.done:
break loop
}
}
}
// NewMultilocker return a new Multilocker
func NewMultilocker() *Multilocker {
l := &Multilocker{
gets: make(chan messageGet, 1000),
puts: make(chan messagePut, 1000),
inUse: make(map[string]*mutexItem),
cp: sync.Pool{
New: func() interface{} { return make(chan *sync.Mutex, 1) },
},
}
go l.schedule()
return l
}
package xsync
import (
"log"
"strconv"
"sync"
"testing"
"time"
)
func TestSingleLock(t *testing.T) {
l := NewMultilocker()
mu := l.Get("3")
mu.Lock()
mu.Unlock()
l.Put("3")
}
func TestMultiple(t *testing.T) {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
log.Println("start")
defer log.Println("end")
l := NewMultilocker()
var n sync.WaitGroup
for i := 0; i < 2000; i++ {
n.Add(1)
i := i
key := strconv.Itoa(i % 2)
// key := strconv.Itoa(i)
go func() {
defer n.Done()
mu := l.Get(key)
mu.Lock()
// log.Printf("%d - %s start", i, key)
time.Sleep(time.Millisecond * 10)
// log.Printf("%d - %s end", i, key)
mu.Unlock()
l.Put(key)
}()
}
n.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment