Last active
September 10, 2020 16:58
-
-
Save kzhui125/d60a52968376c6febf8e08f9c3d78563 to your computer and use it in GitHub Desktop.
golang key based locking
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package xsync | |
import ( | |
"sync" | |
) | |
// mutexItem couples a mutex with a counter. Multilocker increments the
// counter in handleGet and decrements it in handlePut.
type mutexItem struct {
	mu      sync.Mutex
	counter int64
}

// mutexPool is a last-in, first-out free list of *mutexItem values.
// It is not safe for concurrent use.
type mutexPool struct {
	pool []*mutexItem
}

// Put pushes mi onto the free list so that a later Get can hand it out again.
func (p *mutexPool) Put(mi *mutexItem) {
	p.pool = append(p.pool, mi)
}

// Get pops the most recently stored item, or allocates a zero-valued
// mutexItem when the free list is empty.
func (p *mutexPool) Get() *mutexItem {
	last := len(p.pool) - 1
	if last < 0 {
		return new(mutexItem)
	}
	item := p.pool[last]
	p.pool = p.pool[:last]
	return item
}
type (
	// messageGet is the request sent on Multilocker.gets: it names the key
	// whose mutex is wanted and carries the channel on which handleGet
	// sends the mutex back.
	messageGet struct {
		key string
		c   chan *sync.Mutex
	}

	// messagePut is the notification sent on Multilocker.puts; its string
	// value is the key being released.
	messagePut string
)
// Multilocker is key based multiple locker | |
type Multilocker struct { | |
gets chan messageGet | |
puts chan messagePut | |
inUse map[string]*mutexItem | |
done chan struct{} | |
mp mutexPool | |
cp sync.Pool | |
} | |
// Get get key related sync.Mutex | |
func (l *Multilocker) Get(key string) *sync.Mutex { | |
msg := messageGet{ | |
key: key, | |
// c: l.cp.Get().(chan *sync.Mutex), | |
c: make(chan *sync.Mutex, 1), | |
} | |
l.gets <- msg | |
mu := <-msg.c | |
// l.cp.Put(msg.c) | |
return mu | |
} | |
// Put release key related mutexItem | |
func (l *Multilocker) Put(key string) { | |
l.puts <- messagePut(key) | |
} | |
// Close will stop schedule | |
func (l *Multilocker) Close() { | |
close(l.done) | |
} | |
func (l *Multilocker) handleGet(msg *messageGet) { | |
key := msg.key | |
c := msg.c | |
mi, ok := l.inUse[key] | |
if !ok { | |
mi = l.mp.Get() | |
l.inUse[key] = mi | |
} | |
mi.counter++ | |
c <- &mi.mu | |
} | |
func (l *Multilocker) handlePut(msg *messagePut) { | |
key := string(*msg) | |
mi, ok := l.inUse[key] | |
if !ok { | |
panic("should call lock first") | |
} | |
mi.counter-- | |
if mi.counter == 0 { | |
l.mp.Put(mi) | |
delete(l.inUse, key) | |
} | |
} | |
func (l *Multilocker) schedule() { | |
loop: | |
for { | |
select { | |
case msg := <-l.gets: | |
l.handleGet(&msg) | |
case msg := <-l.puts: | |
l.handlePut(&msg) | |
case <-l.done: | |
break loop | |
} | |
} | |
} | |
// NewMultilocker return a new Multilocker | |
func NewMultilocker() *Multilocker { | |
l := &Multilocker{ | |
gets: make(chan messageGet, 1000), | |
puts: make(chan messagePut, 1000), | |
inUse: make(map[string]*mutexItem), | |
cp: sync.Pool{ | |
New: func() interface{} { return make(chan *sync.Mutex, 1) }, | |
}, | |
} | |
go l.schedule() | |
return l | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package xsync | |
import ( | |
"log" | |
"strconv" | |
"sync" | |
"testing" | |
"time" | |
) | |
func TestSingleLock(t *testing.T) { | |
l := NewMultilocker() | |
mu := l.Get("3") | |
mu.Lock() | |
mu.Unlock() | |
l.Put("3") | |
} | |
func TestMultiple(t *testing.T) { | |
log.SetFlags(log.LstdFlags | log.Lmicroseconds) | |
log.Println("start") | |
defer log.Println("end") | |
l := NewMultilocker() | |
var n sync.WaitGroup | |
for i := 0; i < 2000; i++ { | |
n.Add(1) | |
i := i | |
key := strconv.Itoa(i % 2) | |
// key := strconv.Itoa(i) | |
go func() { | |
defer n.Done() | |
mu := l.Get(key) | |
mu.Lock() | |
// log.Printf("%d - %s start", i, key) | |
time.Sleep(time.Millisecond * 10) | |
// log.Printf("%d - %s end", i, key) | |
mu.Unlock() | |
l.Put(key) | |
}() | |
} | |
n.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment