Skip to content

Instantly share code, notes, and snippets.

@boyswan
Created January 2, 2025 18:52
Show Gist options
  • Save boyswan/c370df0026bf4a243750fd0ac031ded5 to your computer and use it in GitHub Desktop.
Save boyswan/c370df0026bf4a243750fd0ac031ded5 to your computer and use it in GitHub Desktop.
pcmbuf
import (
crand "crypto/rand"
"github.com/smallnest/ringbuffer"
"log"
"math/rand"
"sync"
"time"
)
func generateRandomBytes() []byte {
length := rand.Intn(1024)
bytes := make([]byte, length)
_, err := crand.Read(bytes)
if err != nil {
return []byte{}
}
return bytes
}
type BufConfig struct {
BufferSize int
FrameSize int
Timeout time.Duration
}
type Buf struct {
frameSize int
timeout time.Duration
timer *time.Timer
condWrite *sync.Cond
rb *ringbuffer.RingBuffer
done chan struct{}
}
func NewBuf(cfg BufConfig) *Buf {
return &Buf{
frameSize: cfg.FrameSize,
timeout: cfg.Timeout,
timer: time.NewTimer(cfg.Timeout),
condWrite: sync.NewCond(&sync.Mutex{}),
rb: ringbuffer.New(cfg.BufferSize),
done: make(chan struct{}),
}
}
func (b *Buf) Write(v []byte) error {
b.condWrite.L.Lock()
_, err := b.rb.Write(v)
if err != nil {
return err
}
b.condWrite.Signal()
b.condWrite.L.Unlock()
b.timer.Reset(b.timeout)
return nil
}
func (b *Buf) cleanup(ch chan []byte) []byte {
b.condWrite.L.Lock()
defer b.condWrite.L.Unlock()
if l := b.rb.Length(); l > 0 {
buf := make([]byte, l)
_, err := b.rb.Read(buf)
if err != nil {
log.Printf("read err %s", err.Error())
return nil
}
log.Printf(">>> Cleaned %d!", l)
return buf
}
log.Printf(">>> skip clean")
return nil
}
func (b *Buf) readChunks(ch chan []byte) [][]byte {
b.condWrite.L.Lock()
defer b.condWrite.L.Unlock()
for b.rb.Length() < b.frameSize {
b.condWrite.Wait()
}
chunks := make([][]byte, 0, int(b.rb.Length()/b.frameSize))
for {
if b.rb.Length() < b.frameSize {
break
}
buf := make([]byte, b.frameSize)
_, err := b.rb.Read(buf)
if err != nil {
log.Printf("main read %s", err.Error())
continue
}
// log.Printf("R: %d <<<", len(buf))
chunks = append(chunks, buf)
}
return chunks
}
func (b *Buf) ReadTo(ch chan []byte) {
go func() {
for {
select {
case <-b.done:
return
case <-b.timer.C:
buf := b.cleanup(ch)
if buf != nil {
ch <- buf
}
default:
continue
}
}
}()
for {
select {
case <-b.done:
return
default:
chunks := b.readChunks(ch)
for _, chunk := range chunks {
ch <- chunk
}
}
}
}
func (b *Buf) Close() {
if b.done != nil {
b.done <- struct{}{}
close(b.done)
}
}
func main() {
b := NewBuf(BufConfig{
BufferSize: 1024 * 1024,
FrameSize: 256,
Timeout: time.Duration(time.Millisecond * 200),
})
ch := make(chan []byte)
go func() {
for {
v := generateRandomBytes()
b.Write(v)
if rand.Float64()*1 < 0.1 {
time.Sleep(time.Duration(1000 * time.Millisecond))
log.Printf(" ... 3 sec")
} else {
time.Sleep(time.Duration(50 * time.Millisecond))
log.Printf(" ... 50ms")
}
}
}()
go func() {
for range ch {
continue
}
}()
b.ReadTo(ch)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment