Skip to content

Instantly share code, notes, and snippets.

@christopherwxyz
Created December 4, 2024 18:36
Show Gist options
  • Save christopherwxyz/f6d865af67695cf55ff70990d2756f7f to your computer and use it in GitHub Desktop.
Save christopherwxyz/f6d865af67695cf55ff70990d2756f7f to your computer and use it in GitHub Desktop.
processBatch
// processBatch validates and handles one batch of messages, then updates the
// processed counters and logs a summary.
//
// Messages that are nil or carry no Data are skipped with a warning. When a
// handler is configured, messages rejected by validateMessage are skipped with
// a warning as well. Every remaining message is passed to handleMessage in
// batch order; a handler error is logged and counted in the error metric but
// does not stop the batch.
//
// If mp.ctx is cancelled part-way through, the function returns immediately
// without updating the processed counters or logging the summary. The batch
// processing time is still observed in that case (via the deferred func).
func (mp *MessageProcessor) processBatch(batch []*pb.Message) {
	if len(batch) == 0 {
		return
	}

	startTime := time.Now()
	defer func() {
		if mp.metrics != nil {
			mp.metrics.batchProcessingTime.Observe(time.Since(startTime).Seconds())
		}
	}()

	// skip[i] marks batch[i] as ineligible for handling. Indices are dense
	// (0..len(batch)-1), so a slice is sufficient; no map is needed.
	skip := make([]bool, len(batch))
	for i, msg := range batch {
		if msg == nil || msg.Data == nil {
			hash := "nil"
			if msg != nil && len(msg.Hash) > 0 {
				hash = common.BytesToHash(msg.Hash).Hex()
			}
			slog.Warn("Empty message in batch", "index", i, "hash", hash)
			skip[i] = true
			continue
		}
		// Validation only runs when a handler is configured.
		if mp.handler == nil {
			continue
		}
		if err := mp.validateMessage(msg); err != nil {
			// Record why the message is being dropped instead of discarding
			// the validation error silently.
			slog.Warn("Message failed validation", "index", i, "fid", msg.Data.Fid, "error", err)
			skip[i] = true
		}
	}

	// Handle every eligible message and, in the same pass, collect the values
	// reported in the summary log.
	validMessages := 0
	var firstFid, lastFid uint64
	for i, msg := range batch {
		if skip[i] {
			continue
		}

		// Non-blocking cancellation check before each message.
		select {
		case <-mp.ctx.Done():
			return
		default:
		}

		// Read the fid before handing the message over so the summary does
		// not depend on what the handler does with the message.
		fid := msg.Data.Fid
		if err := mp.handleMessage(mp.ctx, msg); err != nil {
			if mp.metrics != nil {
				mp.metrics.errors.Inc()
			}
			slog.Error("Error processing message", "index", i, "fid", fid, "error", err)
		}

		if validMessages == 0 {
			firstFid = fid
		}
		lastFid = fid
		validMessages++
	}

	// The running total feeds the summary log below, so it is updated even
	// when no metrics are configured.
	// NOTE(review): both counters add len(batch), i.e. they include skipped
	// messages and messages whose handler returned an error — confirm that
	// this is the intended meaning of "processed".
	mp.processed.Add(uint64(len(batch)))
	if mp.metrics != nil {
		mp.metrics.messagesProcessed.Add(float64(len(batch)))
	}

	if validMessages > 0 {
		slog.Info("Processed batch",
			"size", validMessages,
			"total_processed", mp.processed.Load(),
			"first_fid", firstFid,
			"last_fid", lastFid)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment