// Package gochannels is an example of how to close a channel written by several goroutines.
package gochannels

import (
	"math/big"
	"sync"
)

// Publisher writes sequences of big.Int into a channel.
type Publisher struct {
	ch        chan big.Int
	closingCh chan interface{}

	writersWG      sync.WaitGroup
	writersWGMutex sync.Mutex
}

// New creates a Publisher.
func New() *Publisher {
	return &Publisher{
		ch:        make(chan big.Int),
		closingCh: make(chan interface{}),
	}
}

// Run writes the sequence 0..n-1 into the channel. Each write happens in its
// own goroutine, so delivery order is not guaranteed.
func (p *Publisher) Run(n int) {
	for i := 0; i < n; i++ {
		p.write(*big.NewInt(int64(i)))
	}
}

// Read returns the channel to read from.
func (p *Publisher) Read() <-chan big.Int {
	return p.ch
}

// write sends into the channel from a new goroutine.
func (p *Publisher) write(data big.Int) {
	go func(data big.Int) {
		// The mutex ensures no Add runs concurrently with the Wait in
		// Close, as the sync.WaitGroup contract requires.
		p.writersWGMutex.Lock()
		p.writersWG.Add(1)
		p.writersWGMutex.Unlock()
		defer p.writersWG.Done()

		// Fast path: give up immediately if the publisher is already closing.
		select {
		case <-p.closingCh:
			return
		default:
		}

		// Block on the send, but abort if the publisher starts closing.
		select {
		case <-p.closingCh:
		case p.ch <- data:
		}
	}(data)
}

// Close closes the channel, draining any blocked writes.
func (p *Publisher) Close() {
	close(p.closingCh)
	// Drain so that writers still blocked on the send can exit.
	go func() {
		for range p.ch {
		}
	}()
	p.writersWGMutex.Lock()
	p.writersWG.Wait()
	p.writersWGMutex.Unlock()
	close(p.ch)
}

// CloseWithoutDraining closes the channel without draining pending writes; it
// blocks until all writes have been unblocked by reads.
func (p *Publisher) CloseWithoutDraining() {
	close(p.closingCh)
	p.writersWGMutex.Lock()
	p.writersWG.Wait()
	p.writersWGMutex.Unlock()
	close(p.ch)
}
package gochannels

// In order to detect race conditions, run the tests with:
//   go test -cpu=1,9,55,99 -race -count=100 -failfast

import (
	"math/big"
	"sync"
	"testing"
)

func TestSimple(t *testing.T) {
	consumer := func(pub *Publisher, n int, wg *sync.WaitGroup, result chan *big.Int) {
		ch := pub.Read()
		acc := big.NewInt(0)
		for i := 0; i < n; i++ {
			val := <-ch
			t.Log(&val)
			acc.Add(acc, &val)
		}
		wg.Done()
		result <- acc
	}

	producer := func(pub *Publisher, n int, wg *sync.WaitGroup) {
		pub.Run(n)
		wg.Done()
	}

	precalc := func(n int) *big.Int {
		acc := big.NewInt(0)
		for i := 0; i < n; i++ {
			acc.Add(acc, big.NewInt(int64(i)))
		}
		return acc
	}

	p := New()
	var wg sync.WaitGroup
	resultCh := make(chan *big.Int)

	wg.Add(2)
	go consumer(p, 100, &wg, resultCh)
	go producer(p, 100, &wg)
	wg.Wait()

	p.CloseWithoutDraining()

	result := <-resultCh
	t.Log(result)
	t.Log(precalc(100))
	if result.Cmp(precalc(100)) != 0 {
		t.Error("wrong result")
	}
}

func TestIntermediate(t *testing.T) {
	consumer := func(pub *Publisher, n int, wg *sync.WaitGroup, result chan *big.Int) {
		ch := pub.Read()
		acc := big.NewInt(0)
		for i := 0; i < n; i++ {
			val := <-ch
			t.Log(&val)
			acc.Add(acc, &val)
		}
		wg.Done()
		result <- acc
	}

	producer := func(pub *Publisher, n int, wg *sync.WaitGroup) {
		pub.Run(n)
		wg.Done()
	}

	p := New()
	var wg sync.WaitGroup
	resultCh := make(chan *big.Int)

	wg.Add(3)
	go consumer(p, 100, &wg, resultCh)
	go producer(p, 100, &wg)
	go producer(p, 100, &wg)

	<-resultCh
	p.Close()
	wg.Wait()
}
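For quick reference, here is a minimal usage sketch of the Publisher above: a single consumer reads a known number of values and then closes the publisher. The import path is illustrative; the gist does not define a module.

package main

import (
	"fmt"
	"math/big"

	"example.com/gochannels" // illustrative import path, not part of the gist
)

func main() {
	p := gochannels.New()
	p.Run(5) // spawns 5 writer goroutines sending 0..4 in arbitrary order

	sum := big.NewInt(0)
	for i := 0; i < 5; i++ {
		v := <-p.Read()
		sum.Add(sum, &v)
	}
	p.Close() // safe: every write has been consumed, nothing left to drain

	fmt.Println(sum) // 10, i.e. 0+1+2+3+4
}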
I wrote this a long time ago, so I do not remember all the details now.
I think close is non-blocking, so I think it would not make a difference, but as I said, it was a long time ago.
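For reference, close on a channel always returns immediately in Go; it never waits for readers. A minimal sketch of that property:

package main

import "fmt"

func main() {
	ch := make(chan int) // unbuffered, no readers anywhere

	close(ch) // returns immediately; close never blocks

	// Receives on a closed channel do not block either; they yield the zero value.
	v, ok := <-ch
	fmt.Println(v, ok) // 0 false
}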
Hi @leolara,

I also came here from https://www.leolara.me/blog/closing_a_go_channel_written_by_several_goroutines/. It is a really great article.

And since you mentioned the usage of a WaitGroup, I came up with an idea to avoid using the mutex: we could probably use an atomic writing counter to do similar work:
package gochannels

import (
	"math/big"
	"runtime"
	"sync/atomic"
)

// Publisher replaces the mutex with an atomic state machine:
// state 0 = open, 1 = closing (drainer running), 2 = closed.
type Publisher struct {
	ch      chan big.Int
	state   int32
	writing int32 // number of writes currently in flight
}

func (p *Publisher) write(data big.Int) {
	atomic.AddInt32(&p.writing, 1)
	if atomic.LoadInt32(&p.state) == 0 {
		p.ch <- data
	}
	atomic.AddInt32(&p.writing, -1)
}

func (p *Publisher) Close() {
	if atomic.CompareAndSwapInt32(&p.state, 0, 1) {
		// Drain the channel so writers blocked on the send can finish.
		go func() {
			for range p.ch {
			}
		}()
	}
	// Busy-wait until no write is in flight.
	for atomic.LoadInt32(&p.writing) != 0 {
		runtime.Gosched()
	}
	if atomic.CompareAndSwapInt32(&p.state, 1, 2) {
		close(p.ch)
	}
}

Hi @rueian,
I guess you do atomic.CompareAndSwapInt32(&p.state, 1, 2) in case Close is called more than once?
Yes, you are right. Otherwise it may panic.
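For illustration, the failure mode that atomic.CompareAndSwapInt32(&p.state, 1, 2) guards against is a double close:

package main

func main() {
	ch := make(chan int)
	close(ch)
	close(ch) // panic: close of closed channel
}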
Hi @leolara,

Thanks for your excellent post. Would you be so kind as to add a license to this?
@leolara First of all, what an excellent post: https://www.leolara.me/blog/closing_a_go_channel_written_by_several_goroutines/

I have one question though: in the func (p *Publisher) Close() method, what if we put the channel close inside the critical section instead? Like:
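The snippet ends here; presumably the intended variant looks something like this sketch, with close(p.ch) moved inside the lock:

// Assumed reconstruction of the suggested variant (the original snippet was truncated).
func (p *Publisher) Close() {
	close(p.closingCh)
	go func() {
		for range p.ch {
		}
	}()
	p.writersWGMutex.Lock()
	p.writersWG.Wait()
	close(p.ch) // moved inside the critical section
	p.writersWGMutex.Unlock()
}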