Skip to content

Instantly share code, notes, and snippets.

@davyzhang
Last active November 28, 2024 02:07
Show Gist options
  • Save davyzhang/966d7e4ba62ea2b23e636c09ce19ce06 to your computer and use it in GitHub Desktop.
Save davyzhang/966d7e4ba62ea2b23e636c09ce19ce06 to your computer and use it in GitHub Desktop.
simple golang rate limiter
import (
"sync"
"time"
)
// Job is a unit of work executed by the RateLimiter. It takes no
// arguments and returns nothing; results must be communicated through
// captured variables or channels.
type Job func()
// RateLimiter allows at most `limit` job executions within any sliding
// time window of length `window`. Jobs submitted while the limiter is
// saturated are queued and run later. A RateLimiter must not be copied
// after first use, since it contains mutexes.
type RateLimiter struct {
	mu           sync.Mutex    // guards limit, window, and executions
	limit        int           // Maximum number of executions allowed per window
	window       time.Duration // Time window for rate limiting
	executions   []time.Time   // Timestamps of executions, oldest first
	waitingQueue []Job         // Queue of jobs waiting to be executed
	queueMu      sync.Mutex    // Mutex for queue operations; acquired after mu when both are held
}
// NewRateLimiter returns a RateLimiter that permits at most limit
// executions within any sliding window of the given duration.
func NewRateLimiter(limit int, window time.Duration) *RateLimiter {
	rl := &RateLimiter{limit: limit, window: window}
	rl.executions = make([]time.Time, 0)
	rl.waitingQueue = make([]Job, 0)
	return rl
}
// Submit runs job immediately (on its own goroutine) if the current
// window still has capacity; otherwise it appends the job to the
// waiting queue and spawns a goroutine to drain the queue.
func (rl *RateLimiter) Submit(job Job) {
	rl.mu.Lock()
	now := time.Now()
	cutoff := now.Add(-rl.window)
	// Drop timestamps that have aged out of the window, reusing the
	// slice's backing array.
	kept := rl.executions[:0]
	for _, ts := range rl.executions {
		if ts.After(cutoff) {
			kept = append(kept, ts)
		}
	}
	rl.executions = kept
	if len(rl.executions) < rl.limit {
		// Capacity available: record this execution and run the job now.
		rl.executions = append(rl.executions, now)
		rl.mu.Unlock()
		go job()
		return
	}
	rl.mu.Unlock()
	// Saturated: park the job and let processQueue pick it up.
	rl.queueMu.Lock()
	rl.waitingQueue = append(rl.waitingQueue, job)
	rl.queueMu.Unlock()
	go rl.processQueue()
}
// processQueue executes one queued job as soon as the rate limit
// permits.
//
// BUG FIX: the previous implementation returned immediately when the
// limiter was saturated, which stranded the queued job — it would only
// run if some future Submit call happened to spawn another
// processQueue goroutine after capacity freed up. With no further
// Submits, queued jobs never executed at all. We now retry in a loop,
// sleeping until the oldest recorded execution ages out of the window.
func (rl *RateLimiter) processQueue() {
	for {
		rl.mu.Lock()
		now := time.Now()
		cutoff := now.Add(-rl.window)
		// Drop executions that have aged out of the window.
		recent := make([]time.Time, 0, len(rl.executions))
		for _, ts := range rl.executions {
			if ts.After(cutoff) {
				recent = append(recent, ts)
			}
		}
		rl.executions = recent
		if len(rl.executions) < rl.limit {
			// Capacity available: take the next queued job, if any.
			rl.queueMu.Lock()
			if len(rl.waitingQueue) == 0 {
				rl.queueMu.Unlock()
				rl.mu.Unlock()
				return
			}
			job := rl.waitingQueue[0]
			rl.waitingQueue = rl.waitingQueue[1:]
			rl.queueMu.Unlock()
			rl.executions = append(rl.executions, now)
			rl.mu.Unlock()
			go job()
			return
		}
		// Saturated: executions is appended in time order and pruned
		// order-preservingly, so index 0 is the oldest entry. Wait until
		// it falls outside the window, then re-check.
		wait := rl.executions[0].Add(rl.window).Sub(now)
		rl.mu.Unlock()
		if wait <= 0 {
			// Guard against a non-positive sleep from clock granularity.
			wait = time.Millisecond
		}
		time.Sleep(wait)
	}
}
// GetCurrentCount reports how many executions fall inside the current
// time window, pruning stale timestamps as a side effect.
func (rl *RateLimiter) GetCurrentCount() int {
	rl.mu.Lock()
	defer rl.mu.Unlock()
	cutoff := time.Now().Add(-rl.window)
	// Filter in place, keeping only timestamps still inside the window.
	live := rl.executions[:0]
	for _, ts := range rl.executions {
		if ts.After(cutoff) {
			live = append(live, ts)
		}
	}
	rl.executions = live
	return len(live)
}
// GetQueueLength reports the number of jobs currently waiting for a
// free slot in the rate limit window.
func (rl *RateLimiter) GetQueueLength() int {
	rl.queueMu.Lock()
	n := len(rl.waitingQueue)
	rl.queueMu.Unlock()
	return n
}
@davyzhang
Copy link
Author

With rate-limited services such as AI APIs, it's pretty annoying to implement this every time.

usage

// Create a rate limiter that allows 5 executions per minute
limiter := util.NewRateLimiter(5,time.Minute)

// Submit jobs
limiter.Submit(func() {
    // Your job code here
})

// Check current execution count
count := limiter.GetCurrentCount()

// Check queue length
queueLen := limiter.GetQueueLength()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment