Created May 20, 2020 19:39
#46793 - attempt to reproduce
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in licenses/BSD-golang.txt.

// This code originated in Go's sync package.

package race

import (
    "sync"
    "sync/atomic"
    "unsafe"
)

// IntMap is a concurrent map with amortized-constant-time loads, stores, and
// deletes. It is safe for multiple goroutines to call a Map's methods
// concurrently.
//
// It is optimized for use in concurrent loops with keys that are
// stable over time, and either few steady-state stores, or stores
// localized to one goroutine per key.
//
// For use cases that do not share these attributes, it will likely have
// comparable or worse performance and worse type safety than an ordinary
// map paired with a read-write mutex.
//
// Nil values are not supported; to use an IntMap as a set, store a
// dummy non-nil pointer instead of nil.
//
// The zero Map is valid and empty.
//
// A Map must not be copied after first use.
type IntMap struct {
    mu sync.Mutex

    // read contains the portion of the map's contents that are safe for
    // concurrent access (with or without mu held).
    //
    // The read field itself is always safe to load, but must only be stored with
    // mu held.
    //
    // Entries stored in read may be updated concurrently without mu, but updating
    // a previously-expunged entry requires that the entry be copied to the dirty
    // map and unexpunged with mu held.
    read unsafe.Pointer // *readOnly

    // dirty contains the portion of the map's contents that require mu to be
    // held. To ensure that the dirty map can be promoted to the read map quickly,
    // it also includes all of the non-expunged entries in the read map.
    //
    // Expunged entries are not stored in the dirty map. An expunged entry in the
    // clean map must be unexpunged and added to the dirty map before a new value
    // can be stored to it.
    //
    // If the dirty map is nil, the next write to the map will initialize it by
    // making a shallow copy of the clean map, omitting stale entries.
    dirty map[int64]*entry

    // misses counts the number of loads since the read map was last updated that
    // needed to lock mu to determine whether the key was present.
    //
    // Once enough misses have occurred to cover the cost of copying the dirty
    // map, the dirty map will be promoted to the read map (in the unamended
    // state) and the next store to the map will make a new dirty copy.
    misses int
}

// readOnly is an immutable struct stored atomically in the Map.read field.
type readOnly struct {
    m       map[int64]*entry
    amended bool // true if the dirty map contains some key not in m.
}

// expunged is an arbitrary pointer that marks entries which have been deleted
// from the dirty map.
var expunged = unsafe.Pointer(new(int))

// An entry is a slot in the map corresponding to a particular key.
type entry struct {
    // p points to the value stored for the entry.
    //
    // If p == nil, the entry has been deleted and m.dirty == nil.
    //
    // If p == expunged, the entry has been deleted, m.dirty != nil, and the entry
    // is missing from m.dirty.
    //
    // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty
    // != nil, in m.dirty[key].
    //
    // An entry can be deleted by atomic replacement with nil: when m.dirty is
    // next created, it will atomically replace nil with expunged and leave
    // m.dirty[key] unset.
    //
    // An entry's associated value can be updated by atomic replacement, provided
    // p != expunged. If p == expunged, an entry's associated value can be updated
    // only after first setting m.dirty[key] = e so that lookups using the dirty
    // map find the entry.
    p unsafe.Pointer
}

func newEntry(r unsafe.Pointer) *entry {
    return &entry{p: r}
}

// Load returns the value stored in the map for a key, or nil if no
// value is present.
// The ok result indicates whether value was found in the map.
func (m *IntMap) Load(key int64) (value unsafe.Pointer, ok bool) {
    read := m.getRead()
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        // Avoid reporting a spurious miss if m.dirty got promoted while we were
        // blocked on m.mu. (If further loads of the same key will not miss, it's
        // not worth copying the dirty map for this key.)
        read = m.getRead()
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            // Regardless of whether the entry was present, record a miss: this key
            // will take the slow path until the dirty map is promoted to the read
            // map.
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    return e.load()
}

func (e *entry) load() (value unsafe.Pointer, ok bool) {
    p := atomic.LoadPointer(&e.p)
    if p == nil || p == expunged {
        return nil, false
    }
    return p, true
}

// Store sets the value for a key.
func (m *IntMap) Store(key int64, value unsafe.Pointer) {
    read := m.getRead()
    if e, ok := read.m[key]; ok && e.tryStore(value) {
        return
    }

    m.mu.Lock()
    read = m.getRead()
    if e, ok := read.m[key]; ok {
        if e.unexpungeLocked() {
            // The entry was previously expunged, which implies that there is a
            // non-nil dirty map and this entry is not in it.
            m.dirty[key] = e
        }
        e.storeLocked(value)
    } else if e, ok := m.dirty[key]; ok {
        e.storeLocked(value)
    } else {
        if !read.amended {
            // We're adding the first new key to the dirty map.
            // Make sure it is allocated and mark the read-only map as incomplete.
            m.dirtyLocked()
            atomic.StorePointer(&m.read, unsafe.Pointer(&readOnly{m: read.m, amended: true}))
        }
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
}

// tryStore stores a value if the entry has not been expunged.
//
// If the entry is expunged, tryStore returns false and leaves the entry
// unchanged.
func (e *entry) tryStore(r unsafe.Pointer) bool {
    p := atomic.LoadPointer(&e.p)
    if p == expunged {
        return false
    }
    for {
        if atomic.CompareAndSwapPointer(&e.p, p, r) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
    }
}

// unexpungeLocked ensures that the entry is not marked as expunged.
//
// If the entry was previously expunged, it must be added to the dirty map
// before m.mu is unlocked.
func (e *entry) unexpungeLocked() (wasExpunged bool) {
    return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

// storeLocked unconditionally stores a value to the entry.
//
// The entry must be known not to be expunged.
func (e *entry) storeLocked(r unsafe.Pointer) {
    atomic.StorePointer(&e.p, r)
}

// LoadOrStore returns the existing value for the key if present.
// Otherwise, it stores and returns the given value.
// The loaded result is true if the value was loaded, false if stored.
func (m *IntMap) LoadOrStore(key int64, value unsafe.Pointer) (actual unsafe.Pointer, loaded bool) {
    // Avoid locking if it's a clean hit.
    read := m.getRead()
    if e, ok := read.m[key]; ok {
        actual, loaded, ok = e.tryLoadOrStore(value)
        if ok {
            return actual, loaded
        }
    }

    m.mu.Lock()
    read = m.getRead()
    if e, ok := read.m[key]; ok {
        if e.unexpungeLocked() {
            m.dirty[key] = e
        }
        actual, loaded, _ = e.tryLoadOrStore(value)
    } else if e, ok := m.dirty[key]; ok {
        actual, loaded, _ = e.tryLoadOrStore(value)
        m.missLocked()
    } else {
        if !read.amended {
            // We're adding the first new key to the dirty map.
            // Make sure it is allocated and mark the read-only map as incomplete.
            m.dirtyLocked()
            atomic.StorePointer(&m.read, unsafe.Pointer(&readOnly{m: read.m, amended: true}))
        }
        m.dirty[key] = newEntry(value)
        actual, loaded = value, false
    }
    m.mu.Unlock()

    return actual, loaded
}

// tryLoadOrStore atomically loads or stores a value if the entry is not
// expunged.
//
// If the entry is expunged, tryLoadOrStore leaves the entry unchanged and
// returns with ok==false.
func (e *entry) tryLoadOrStore(r unsafe.Pointer) (actual unsafe.Pointer, loaded, ok bool) {
    p := atomic.LoadPointer(&e.p)
    if p == expunged {
        return nil, false, false
    }
    if p != nil {
        return p, true, true
    }
    for {
        if atomic.CompareAndSwapPointer(&e.p, nil, r) {
            return r, false, true
        }
        p = atomic.LoadPointer(&e.p)
        if p == expunged {
            return nil, false, false
        }
        if p != nil {
            return p, true, true
        }
    }
}

// Delete deletes the value for a key.
func (m *IntMap) Delete(key int64) {
    read := m.getRead()
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        read = m.getRead()
        e, ok = read.m[key]
        if !ok && read.amended {
            delete(m.dirty, key)
        }
        m.mu.Unlock()
    }
    if ok {
        e.delete()
    }
}

func (e *entry) delete() (hadValue bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return true
        }
    }
}

// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
//
// Range does not necessarily correspond to any consistent snapshot of the Map's
// contents: no key will be visited more than once, but if the value for any key
// is stored or deleted concurrently, Range may reflect any mapping for that key
// from any point during the Range call.
//
// Range may be O(N) with the number of elements in the map even if f returns
// false after a constant number of calls.
func (m *IntMap) Range(f func(key int64, value unsafe.Pointer) bool) {
    // We need to be able to iterate over all of the keys that were already
    // present at the start of the call to Range.
    // If read.amended is false, then read.m satisfies that property without
    // requiring us to hold m.mu for a long time.
    read := m.getRead()
    if read.amended {
        // m.dirty contains keys not in read.m. Fortunately, Range is already O(N)
        // (assuming the caller does not break out early), so a call to Range
        // amortizes an entire copy of the map: we can promote the dirty copy
        // immediately!
        m.mu.Lock()
        read = m.getRead()
        if read.amended {
            // Don't let read escape directly, otherwise it will allocate even
            // when read.amended is false. Instead, constrain the allocation to
            // just this branch.
            newRead := &readOnly{m: m.dirty}
            atomic.StorePointer(&m.read, unsafe.Pointer(newRead))
            read = *newRead
            m.dirty = nil
            m.misses = 0
        }
        m.mu.Unlock()
    }

    for k, e := range read.m {
        v, ok := e.load()
        if !ok {
            continue
        }
        if !f(k, v) {
            break
        }
    }
}

func (m *IntMap) missLocked() {
    m.misses++
    if m.misses < len(m.dirty) {
        return
    }
    atomic.StorePointer(&m.read, unsafe.Pointer(&readOnly{m: m.dirty}))
    m.dirty = nil
    m.misses = 0
}

func (m *IntMap) dirtyLocked() {
    if m.dirty != nil {
        return
    }
    read := m.getRead()
    m.dirty = make(map[int64]*entry, len(read.m))
    for k, e := range read.m {
        if !e.tryExpungeLocked() {
            m.dirty[k] = e
        }
    }
}

func (m *IntMap) getRead() readOnly {
    read := (*readOnly)(atomic.LoadPointer(&m.read))
    if read == nil {
        return readOnly{}
    }
    return *read
}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
    p := atomic.LoadPointer(&e.p)
    for p == nil {
        if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
    }
    return p == expunged
}
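For reference, a minimal usage sketch of the IntMap above (illustrative only; exampleIntMapUsage is not part of the gist). Values are raw unsafe.Pointers, so callers round-trip their own pointer type through the map; getQueue in the test file further down follows the same pattern with a *chan *RaftMessageRequest.

// exampleIntMapUsage is a hypothetical caller of IntMap, not part of the repro.
func exampleIntMapUsage() {
    var m IntMap // the zero value is ready to use

    ch := make(chan int, 1)
    m.Store(1, unsafe.Pointer(&ch)) // keys are int64, values are unsafe.Pointer

    if p, ok := m.Load(1); ok {
        got := *(*chan int)(p) // convert back to the concrete pointer type
        got <- 42
    }

    // LoadOrStore returns the existing value if the key is already present.
    other := make(chan int, 1)
    p, loaded := m.LoadOrStore(1, unsafe.Pointer(&other))
    _ = p      // still the pointer to ch
    _ = loaded // true: the key was already in the map
}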
package race

type EntryType int32
type MessageType int32

type Entry struct {
    Term uint64
    Index uint64
    Type EntryType
    Data []byte
    XXX_unrecognized []byte
}

type ConfState struct {
    Voters []uint64
    Learners []uint64
    VotersOutgoing []uint64
    LearnersNext []uint64
    AutoLeave bool
    XXX_unrecognized []byte
}

type SnapshotMetadata struct {
    ConfState ConfState
    Index uint64
    Term uint64
    XXX_unrecognized []byte
}

type Snapshot struct {
    Data []byte
    Metadata SnapshotMetadata
    XXX_unrecognized []byte
}

type Message struct {
    Type MessageType
    To uint64
    From uint64
    Term uint64
    LogTerm uint64
    Index uint64
    Entries []Entry
    Commit uint64
    Snapshot Snapshot
    Reject bool
    RejectHint uint64
    Context []byte
    XXX_unrecognized []byte
}

type NodeID int32
type StoreID int32
type RangeID int64
type ReplicaID int32
type ReplicaType int32

type Key []byte
type RKey Key

type ReplicaDescriptor struct {
    NodeID NodeID
    StoreID StoreID
    ReplicaID ReplicaID
    Type *ReplicaType
}

type RaftHeartbeat struct {
    RangeID RangeID
    FromReplicaID ReplicaID
    ToReplicaID ReplicaID
    Term uint64
    Commit uint64
    Quiesce bool
    ToIsLearner bool
}

type RaftMessageRequest struct {
    RangeID RangeID
    RangeStartKey RKey
    FromReplica ReplicaDescriptor
    ToReplica ReplicaDescriptor
    Message Message
    Quiesce bool
    Heartbeats []RaftHeartbeat
    HeartbeatResps []RaftHeartbeat
}

type RaftMessageRequestBatch struct {
    Requests []RaftMessageRequest
}
package race

import (
    "sync"
    "sync/atomic"
    "testing"
    "unsafe"
)

func TestRace(t *testing.T) {
    rt := RaftTransport{}
    var wg sync.WaitGroup
    for i := 0; i < 32; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            sender(&rt)
        }()
    }
    wg.Wait()
}

func sender(rt *RaftTransport) {
    msg := Message{}
    for i := 0; i < 10000000; i++ {
        nodeID := NodeID(i % 4)
        class := ConnectionClass(i % 2)
        req := newRaftMessageRequest()
        if i%99999 == 0 {
            req = new(RaftMessageRequest)
        }
        *req = RaftMessageRequest{
            RangeID: 3,
            ToReplica: ReplicaDescriptor{
                NodeID: nodeID,
            },
            FromReplica: ReplicaDescriptor{
                NodeID: 2,
            },
            Message: msg,
            RangeStartKey: nil, // usually nil
        }
        if !rt.SendAsync(req, class) {
            req.release()
        }
    }
}

const raftSendBufferSize = 10000

type ConnectionClass int8

type RaftTransport struct {
    queues [2]IntMap // map[NodeID]*chan *RaftMessageRequest
    totalLen int32
    didNotSent int32
}

func (t *RaftTransport) getQueue(
    nodeID NodeID, class ConnectionClass,
) (chan *RaftMessageRequest, bool) {
    queuesMap := &t.queues[class]
    value, ok := queuesMap.Load(int64(nodeID))
    if !ok {
        ch := make(chan *RaftMessageRequest, raftSendBufferSize)
        value, ok = queuesMap.LoadOrStore(int64(nodeID), unsafe.Pointer(&ch))
    }
    return *(*chan *RaftMessageRequest)(value), ok
}

func (t *RaftTransport) SendAsync(req *RaftMessageRequest, class ConnectionClass) (sent bool) {
    toNodeID := req.ToReplica.NodeID
    defer func() {
        if !sent {
            atomic.AddInt32(&t.didNotSent, 1)
        }
    }()
    if req.RangeID == 0 && len(req.Heartbeats) == 0 && len(req.HeartbeatResps) == 0 {
        // Coalesced heartbeats are addressed to range 0; everything else
        // needs an explicit range ID.
        panic("only messages with coalesced heartbeats or heartbeat responses may be sent to range ID 0")
    }
    if req.Message.Type == 3 {
        panic("snapshots must be sent using SendSnapshot")
    }
    ch, existingQueue := t.getQueue(toNodeID, class)
    if !existingQueue {
        if !t.startProcessNewQueue(toNodeID, class) {
            return false
        }
    }
    select {
    case ch <- req:
        l := int32(len(ch))
        atomic.StoreInt32(&t.totalLen, l)
        return true
    default:
        return false
    }
}

func (t *RaftTransport) startProcessNewQueue(
    toNodeID NodeID, class ConnectionClass,
) (started bool) {
    worker := func() {
        ch, existingQueue := t.getQueue(toNodeID, class)
        if !existingQueue {
            panic("bad")
        }
        defer t.queues[class].Delete(int64(toNodeID))
        if err := t.processQueue(toNodeID, ch, class); err != nil {
            panic(err)
        }
    }
    go worker()
    return true
}

func (t *RaftTransport) processQueue(
    nodeID NodeID, ch chan *RaftMessageRequest, class ConnectionClass,
) error {
    batch := &RaftMessageRequestBatch{}
    for {
        select {
        case req := <-ch:
            batch.Requests = append(batch.Requests, *req)
            req.release()
            // Pull off as many queued requests as possible.
            //
            // TODO(peter): Think about limiting the size of the batch we send.
            for done := false; !done; {
                select {
                case req = <-ch:
                    batch.Requests = append(batch.Requests, *req)
                    req.release()
                    if len(batch.Requests) >= 1024 {
                        done = true
                    }
                default:
                    done = true
                }
            }
            _ = batch
            batch.Requests = batch.Requests[:0]
        }
    }
}

var raftMessageRequestPool = sync.Pool{
    New: func() interface{} {
        return &RaftMessageRequest{}
    },
}

func newRaftMessageRequest() *RaftMessageRequest {
    return raftMessageRequestPool.Get().(*RaftMessageRequest)
}

func (m *RaftMessageRequest) release() {
    *m = RaftMessageRequest{}
    raftMessageRequestPool.Put(m)
}
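Presumably the repro is meant to be run under Go's race detector, e.g. `go test -race -run TestRace` in this package; without `-race`, a data race between the pooled RaftMessageRequest values and the per-node queues would go unreported.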