Skip to content

Instantly share code, notes, and snippets.

@mayankshah1607
Created August 9, 2022 09:20
Show Gist options
  • Save mayankshah1607/56cddcc48df9796c9cc0906cd2c5c5bb to your computer and use it in GitHub Desktop.
Save mayankshah1607/56cddcc48df9796c9cc0906cd2c5c5bb to your computer and use it in GitHub Desktop.
package main
import (
"time"
)
// Batch is a group of events that are handed off and processed together.
type Batch struct {
// events holds the event values collected into this batch, in append order.
events []int
}
// writeInfiniteEvents starts a background producer and returns the
// receive-only side of the channel it feeds.
//
// The producer sends the value 1 in an endless loop on an unbuffered channel,
// so every send blocks until a receiver takes the value. The goroutine is
// never stopped and the channel is never closed.
func writeInfiniteEvents() <-chan int {
	out := make(chan int)

	// The producer only needs the send side of the channel.
	go func(sink chan<- int) {
		for {
			sink <- 1
		}
	}(out)

	return out
}
// processBatch handles one completed batch.
//
// It is currently a stub: the body is empty, so b is accepted but not used.
func processBatch(b *Batch) {
// do something
}
func main() {
// dummy event writer
events := writeInfiniteEvents()
batchSize := 1000
// this channel will have a batch
batchOut := make(chan *Batch)
go func() {
// initialize empty batch
eventCount := 0
batch := &Batch{}
for {
select {
// read new event
case event := <-events:
// append it to current batch
batch.events = append(batch.events, event)
eventCount++
// check if we have reached the limit
if len(batch.events) == batchSize {
// pass it to batchOut
batchOut <- batch
// reset
batch = &Batch{}
eventCount = 0
}
// check if we reached 5 seconds
case <-time.After(5 * time.Second):
// pass it to batch out
batchOut <- batch
// reset
batch = &Batch{}
eventCount = 0
}
}
}()
go func() {
// Spin up new routines as we get a new batch
for b := range batchOut {
go processBatch(b)
}
}()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment