Skip to content

Instantly share code, notes, and snippets.

@hanishi
Last active May 14, 2026 05:23
Show Gist options
  • Select an option

  • Save hanishi/940c51f5ea51085987c80008fc65fca9 to your computer and use it in GitHub Desktop.

Select an option

Save hanishi/940c51f5ea51085987c80008fc65fca9 to your computer and use it in GitHub Desktop.
package promovolve
import org.apache.pekko.actor.typed.{ActorRef, ActorSystem}
import scala.concurrent.Future
/** Thin wrapper around [[TokenBucketLimiter]] supplying Gemini-specific
  * defaults: persistenceId/singletonName, plus env-driven cap/rate
  * pulled from `GEMINI_BURST` and `GEMINI_TOKENS_PER_MINUTE`.
  *
  * Defaults sized for Gemini AI Studio free-tier `gemini-2.5-flash`
  * (10 RPM upstream limit). Bucket stays strictly below — small burst
  * (5) prevents dumping a minute's worth in 100ms, which still 429s
  * even when the per-minute average is fine. Override via env for
  * paid/Vertex deploys:
  *   GEMINI_TOKENS_PER_MINUTE=1000 GEMINI_BURST=50
  *
  * An override is honored only when it parses AND is strictly positive.
  * Anything else (unset, non-numeric, zero, negative, NaN) falls back to
  * the default, so a bad env value can never produce settings that the
  * limiter's own `require(... > 0)` checks would reject at actor start.
  *
  * `Command` and `acquire` are aliased through so existing call sites
  * (`ActorRef[GeminiRateLimiter.Command]`, `GeminiRateLimiter.acquire`)
  * continue to work unchanged.
  */
object GeminiRateLimiter {
  type Command = TokenBucketLimiter.Command
  type Permit = TokenBucketLimiter.Permit
  val Stop: Command = TokenBucketLimiter.Stop

  /** Burst cap used when `GEMINI_BURST` is absent or unusable. */
  private val DefaultBurst: Int = 5

  /** Sustained rate used when `GEMINI_TOKENS_PER_MINUTE` is absent or
    * unusable: 8 RPM — 20% below the 10 RPM ceiling. */
  private val DefaultTokensPerMinute: Double = 8.0

  /** Reads an env var, trimming surrounding whitespace so values such
    * as `"50 "` still parse (`toIntOption` does not trim by itself). */
  private def envValue(name: String): Option[String] =
    sys.env.get(name).map(_.trim)

  /** Limiter settings with the Gemini ids and the env-driven cap/rate.
    * Re-evaluated on every call; the process environment is fixed, so
    * every call yields an equal value. */
  def settings: TokenBucketLimiter.Settings = TokenBucketLimiter.Settings(
    persistenceId = "gemini-rate-limiter",
    singletonName = "gemini-rate-limiter",
    maxTokens = envValue("GEMINI_BURST")
      .flatMap(_.toIntOption)
      .filter(_ > 0) // zero/negative would fail the limiter's maxTokens check
      .getOrElse(DefaultBurst),
    tokensPerSecond = envValue("GEMINI_TOKENS_PER_MINUTE")
      .flatMap(_.toDoubleOption)
      .filter(_ > 0.0) // also rejects NaN, since NaN > 0.0 is false
      .getOrElse(DefaultTokensPerMinute) / 60.0,
  )

  /** Starts (or connects to) the cluster singleton hosting the Gemini
    * limiter and returns the ref every node should send commands to. */
  def singletonInit(system: ActorSystem[?]): ActorRef[Command] =
    TokenBucketLimiter.singletonInit(system, settings)

  /** Acquire a token. The returned Future succeeds once a token is
    * granted and fails with a [[TokenBucketLimiter.LimiterDenied]]
    * subtype when the limiter declines (queue full, deadline expired,
    * shutting down), or with the ask-timeout failure when no reply
    * arrives in time. The ask deadline is sourced from
    * `settings.askTimeout` so the API surface and the configured
    * limiter agree on a single value (no independent default to drift). */
  def acquire(limiter: ActorRef[Command])(using system: ActorSystem[?]): Future[Unit] =
    TokenBucketLimiter.acquireOrFail(limiter, settings)

  /** Lower-level: returns the raw [[Permit]] so callers can branch on
    * the granted/denied variants explicitly. */
  def acquirePermit(limiter: ActorRef[Command])(using system: ActorSystem[?]): Future[Permit] =
    TokenBucketLimiter.acquire(limiter, settings)
}
package promovolve
import org.apache.pekko.actor.typed.{ActorRef, ActorSystem, Behavior, SupervisorStrategy}
import org.apache.pekko.actor.typed.scaladsl.{AskPattern, Behaviors}
import org.apache.pekko.cluster.typed.{ClusterSingleton, ClusterSingletonSettings, SingletonActor}
import org.apache.pekko.persistence.typed.PersistenceId
import org.apache.pekko.persistence.typed.state.RecoveryCompleted
import org.apache.pekko.persistence.typed.state.scaladsl.{DurableStateBehavior, Effect}
import org.apache.pekko.util.Timeout
import org.slf4j.LoggerFactory
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration.*
/** Cluster-wide token-bucket rate limiter built as a Pekko cluster
* singleton. One actor owns the bucket; every node sends `Acquire`
* to the same logical address; Pekko routes it to whichever node
* holds the singleton at the moment. The actor's single-threaded
* message loop means token math has no races.
*
* See `TokenBucketLimiter.md` for sequence diagrams covering the
* fast path, the queue+Drain path, and the mixed-queue case the
* `drainGranting` helper was written to handle.
*
* Persistence model: durable state is `(tokens, updatedAtMillis)`.
* Refill is computed lazily from elapsed time on every command, so
* the actor doesn't have to wake up on a fixed schedule just to
* advance the bucket. An idle bucket therefore never writes to the
* journal — the first command after a quiet period reconstructs the
* correct count from the persisted timestamp.
*
* The waiter queue is in-memory by design. On failover any pending
* ask future times out and the caller's normal retry path takes
* over. Persisting waiter refs would buy nothing because reply refs
* only complete the Future on the actor system that issued the ask.
*
* All externally observable side effects (replies, queue mutation,
* timer scheduling, logs about what was done) happen inside
* `.thenRun`, so a persist failure cannot leak a reply that the
* recovered state then contradicts.
*
* For Gemini-specific defaults see [[GeminiRateLimiter]] which is a
* thin wrapper supplying the env-var-driven Settings + persistenceId.
*/
object TokenBucketLimiter {
// -- Protocol --
/** Root of every message the limiter actor accepts. Marked
* CborSerializable so commands can cross node boundaries on their way
* to the singleton. */
sealed trait Command extends CborSerializable
// Request for one token. `replyTo` receives exactly one [[Permit]];
// `expiresAtMillis` is the wall-clock deadline after which the request
// must be answered with Permit.Expired instead of being granted.
//
// Package-private so tests can construct Acquire directly with a
// chosen expiresAtMillis to exercise the deadline-expired branch.
// Application code should use the [[acquire]] / [[acquireOrFail]]
// API rather than building this directly.
private[promovolve] final case class Acquire(
replyTo: ActorRef[Permit],
expiresAtMillis: Long,
) extends Command
// Internal timer: drain as many live waiters as currently-available
// tokens permit. Self-scheduled while the queue is non-empty.
// Package-private so tests can trigger a deterministic Drain pass
// rather than racing the auto-scheduled timer. Application code
// should not send this — the actor schedules it for itself.
private[promovolve] case object Drain extends Command
// One-shot self-message after recovery, only when the loaded count
// exceeds the current `Settings.maxTokens` (cap shrank since last
// save). Persists the clamped value so the new cap is honored
// immediately rather than after the bucket drains.
private final case class ClampToCap(cap: Int) extends Command
// Graceful shutdown: the actor answers every queued waiter with
// Permit.Stopping and then stops. Also registered as the cluster
// singleton's stop message in `singletonInit`.
case object Stop extends Command
/** Reply for [[acquire]]. Exactly one of these is sent per Acquire. */
sealed trait Permit extends CborSerializable
object Permit {
// A token was booked for the caller; it may proceed.
case object Granted extends Permit
// The waiter queue already held `Settings.maxQueueSize` entries.
case object QueueFull extends Permit
// The request's `expiresAtMillis` had passed before a token was available.
case object Expired extends Permit
// The limiter received Stop while the request was still queued.
case object Stopping extends Permit
}
// -- Persistent State --
//
// tokens advances lazily: `refill(state, now)` recomputes the count
// from elapsed time on demand. Only token-changing transitions
// (grant, drain, clamp) actually persist.
//
// `tokens` is fractional so partial refill between commands is not
// lost; `updatedAtMillis` is the wall-clock instant `tokens` was last
// computed for and is the baseline for the next refill.
final case class State(
tokens: Double,
updatedAtMillis: Long,
) extends CborSerializable
// In-memory queue entry for an Acquire that could not be served
// immediately. Mirrors the Acquire fields; it is not CborSerializable
// because the queue is never persisted.
private final case class Waiter(
replyTo: ActorRef[Permit],
expiresAtMillis: Long,
)
// -- Settings --
/** Configuration for one logical limiter. The constraints noted below
* are enforced with `require` when the limiter behavior starts.
*
* @param persistenceId Durable-state journal key. Pick something
* stable per logical limiter so the token count survives restart.
* Must be non-empty.
* @param singletonName Cluster-singleton name. Usually matches
* persistenceId. Must be non-empty.
* @param maxTokens Burst cap — the most tokens the bucket can hold at
* once. Must be > 0.
* @param tokensPerSecond Drip rate. The average sustained throughput.
* Must be > 0.
* @param maxQueueSize Backpressure cap on the in-memory waiter queue.
* New acquires past this get [[Permit.QueueFull]]. Must be >= 0;
* defaults to 10,000.
* @param askTimeout Default ask timeout for [[acquire]] /
* [[acquireOrFail]] when the caller does not supply one. Must be
* > 0; defaults to 30 seconds.
* @param singletonRole Cluster role that hosts the singleton. `None`
* allows any node; defaults to `Some("singleton")`.
*/
final case class Settings(
persistenceId: String,
singletonName: String,
maxTokens: Int,
tokensPerSecond: Double,
maxQueueSize: Int = 10_000,
askTimeout: FiniteDuration = 30.seconds,
singletonRole: Option[String] = Some("singleton"),
)
// -- Behavior --
/** Builds the limiter behavior for `settings`.
*
* The returned behavior validates `settings`, keeps an in-memory FIFO
* queue of waiters, and wraps a DurableStateBehavior whose state is
* the (tokens, updatedAtMillis) pair. The `require` checks run inside
* `Behaviors.setup`, so invalid settings surface as an
* IllegalArgumentException when the actor starts, not when this
* method is called.
*
* @param settings limiter configuration; see [[Settings]] for the
* constraints each field must satisfy
* @return the behavior to spawn (directly, or via [[singletonInit]])
*/
def apply(settings: Settings): Behavior[Command] =
Behaviors.setup { ctx =>
require(settings.persistenceId.nonEmpty, "persistenceId must be non-empty")
require(settings.singletonName.nonEmpty, "singletonName must be non-empty")
require(settings.maxTokens > 0, "maxTokens must be > 0")
require(settings.tokensPerSecond > 0.0, "tokensPerSecond must be > 0")
require(settings.maxQueueSize >= 0, "maxQueueSize must be >= 0")
require(settings.askTimeout > Duration.Zero, "askTimeout must be > 0")
// Logger name carries the persistenceId so several limiters in one
// JVM can be told apart in the logs.
val log =
LoggerFactory.getLogger(s"promovolve.TokenBucketLimiter.${settings.persistenceId}")
// Ephemeral waiter queue — never persisted. See class doc.
val waiting: mutable.Queue[Waiter] = mutable.Queue.empty
// Wall-clock time; every deadline and refill computation uses this.
def nowMillis(): Long = System.currentTimeMillis()
// Pure lazy refill: returns `state` advanced to `now`, adding
// elapsed-time * rate tokens and capping at maxTokens. When `now` is
// not after the stored timestamp (same millisecond, or the clock
// stepped backwards) the state is returned untouched.
def refill(state: State, now: Long): State =
if (now <= state.updatedAtMillis) state
else {
val elapsedSeconds = (now - state.updatedAtMillis).toDouble / 1000.0
val refilled = state.tokens + elapsedSeconds * settings.tokensPerSecond
state.copy(
tokens = math.min(refilled, settings.maxTokens.toDouble),
updatedAtMillis = now,
)
}
// Milliseconds from `now` until the bucket holds at least one whole
// token: 0 when one is already available, otherwise the rounded-up
// wait, never less than 1ms.
def millisUntilNextToken(state: State, now: Long): Long = {
val refilled = refill(state, now)
if (refilled.tokens >= 1.0) 0L
else {
val missing = 1.0 - refilled.tokens
val millis = math.ceil((missing / settings.tokensPerSecond) * 1000.0).toLong
math.max(1L, millis)
}
}
Behaviors.withTimers { timers =>
// Arms the Drain timer for the moment the next token becomes
// available. No-op when nobody is waiting.
def scheduleNextDrain(state: State): Unit =
if (waiting.nonEmpty) {
val now = nowMillis()
val delay = millisUntilNextToken(state, now).millis
// startSingleTimer replaces any prior Drain — only one
// outstanding wake-up is needed at a time. Re-arming with
// the same key is intentional.
timers.startSingleTimer(Drain, delay)
}
// Walk the entire queue, replying to each waiter in FIFO order.
// Expired waiters (deadline ≤ now) get Permit.Expired regardless
// of position. Up to `grantCount` *live* waiters get
// Permit.Granted in queue order; any live waiters past that cap
// are re-enqueued behind the surviving live ones. Returns
// (granted, expired) so the caller can log both — the granted
// count must match its `grantCount` budget (asserted at the
// call site) and the expired count documents how many stale
// entries were swept in this pass.
//
// Called only inside `thenRun` so any persisted token decrement
// is already durable when these replies go out — a caller never
// sees Granted for tokens the limiter hasn't booked.
//
// Earlier versions counted only the expired *prefix* of the
// queue and then granted from the rest of the queue by raw
// size. That treated a live→expired→live queue as having three
// grantable waiters and could send Granted to the middle
// expired one. The full-queue walk below preserves FIFO order
// among live waiters and never grants an expired one.
def drainGranting(now: Long, grantCount: Int): (Int, Int) = {
var granted = 0
var expired = 0
val survivors = mutable.Queue.empty[Waiter]
while (waiting.nonEmpty) {
val w = waiting.dequeue()
if (w.expiresAtMillis <= now) {
w.replyTo ! Permit.Expired
expired += 1
} else if (granted < grantCount) {
w.replyTo ! Permit.Granted
granted += 1
} else {
survivors.enqueue(w)
}
}
// `waiting` is empty at this point, so this restores the
// ungranted live waiters in their original relative order.
waiting.enqueueAll(survivors)
(granted, expired)
}
// Maps each command to a durable-state effect. Replies and queue
// mutations are deferred to `thenRun` callbacks.
def commandHandler(state: State, cmd: Command): Effect[State] = cmd match {
case Acquire(replyTo, expiresAtMillis) =>
val now = nowMillis()
val refilled = refill(state, now)
if (expiresAtMillis <= now) {
// Caller's deadline already passed. No durable change.
Effect.none.thenRun(_ => replyTo ! Permit.Expired)
} else if (waiting.isEmpty && refilled.tokens >= 1.0) {
// Fast path: nobody is waiting and tokens are available.
val next = refilled.copy(tokens = refilled.tokens - 1.0)
Effect.persist(next).thenRun(_ => replyTo ! Permit.Granted)
} else if (waiting.size >= settings.maxQueueSize) {
// Backpressure: refuse rather than grow the queue past its cap.
Effect.none.thenRun { _ =>
log.warn(
"Token bucket queue full: size={}, maxQueueSize={}",
waiting.size,
settings.maxQueueSize,
)
replyTo ! Permit.QueueFull
}
} else {
// Fairness: when a queue already exists, every new
// arrival joins the tail rather than stealing a token
// an earlier waiter is entitled to. Drain handles FIFO
// release; if tokens are available now, the scheduled
// delay collapses to 0 and Drain fires immediately.
Effect.none.thenRun { _ =>
waiting.enqueue(Waiter(replyTo, expiresAtMillis))
log.debug(
"Queued request: waiting={}, tokens={}",
waiting.size,
f"${refilled.tokens}%.3f",
)
scheduleNextDrain(refilled)
}
}
case Drain =>
val now = nowMillis()
val refilled = refill(state, now)
// Count *live* waiters across the whole queue, not just the
// prefix — interleaved expired waiters mustn't inflate this.
val liveCount = waiting.count(_.expiresAtMillis > now)
// Only whole tokens can be granted; the fractional remainder
// stays in the bucket.
val available = math.floor(refilled.tokens).toInt
val toGrant = math.min(available, liveCount)
if (toGrant > 0) {
val next = refilled.copy(tokens = refilled.tokens - toGrant.toDouble)
Effect.persist(next).thenRun { _ =>
val (granted, expired) = drainGranting(now, toGrant)
// Cheap invariant check: drainGranting computed grants
// against the same `waiting` snapshot we just sized
// `toGrant` from, so the count must match. If it
// doesn't, the queue was mutated between sizing and
// draining, and the persisted decrement is now wrong.
assert(
granted == toGrant,
s"drainGranting granted $granted of intended $toGrant",
)
if (granted > 0 || expired > 0 || waiting.nonEmpty)
log.debug(
"Drained: granted={}, expired={}, still queued={}",
granted,
expired,
waiting.size,
)
scheduleNextDrain(next)
}
} else if (waiting.nonEmpty) {
// Queue has waiters but no token to give. Sweep expired
// ones (no durable change for that — they didn't consume
// tokens) and re-arm Drain only if any live ones remain.
Effect.none.thenRun { _ =>
val (_, expired) = drainGranting(now, 0)
if (expired > 0 || waiting.nonEmpty)
log.debug(
"Drain sweep: expired={}, still queued={}",
expired,
waiting.size,
)
scheduleNextDrain(refilled)
}
} else {
// Nothing queued: a stale timer tick, nothing to do.
Effect.none
}
case ClampToCap(cap) =>
val now = nowMillis()
val refilled = refill(state, now)
val clampedTokens = math.min(refilled.tokens, cap.toDouble)
if (clampedTokens == state.tokens) {
// No real change — bumping updatedAtMillis alone isn't
// worth a journal write; lazy refill handles it.
Effect.none
} else {
val clamped = refilled.copy(tokens = clampedTokens)
Effect.persist(clamped).thenRun { _ =>
log.info(
"Clamped tokens {} -> {} after cap shrink",
f"${state.tokens}%.3f",
f"$clampedTokens%.3f",
)
}
}
case Stop =>
// No durable change: tell every queued waiter the limiter is
// going away, empty the queue, then stop the actor.
Effect
.none[State]
.thenRun { _ =>
log.info("Stopping, failing {} queued requests", waiting.size)
waiting.foreach(_.replyTo ! Permit.Stopping)
waiting.clear()
}
.thenStop()
}
// Captured once at setup; used as the timestamp of the empty
// (never-persisted) state below.
val initialNow = nowMillis()
log.info(
"Starting: maxTokens={}, rate={} tokens/s ({} per minute), maxQueueSize={}, askTimeout={}",
settings.maxTokens,
f"${settings.tokensPerSecond}%.3f",
f"${settings.tokensPerSecond * 60}%.0f",
settings.maxQueueSize,
settings.askTimeout,
)
DurableStateBehavior[Command, State](
persistenceId = PersistenceId.ofUniqueId(settings.persistenceId),
// Start full on first ever boot. Subsequent boots load
// whatever was persisted; lazy refill catches up from
// updatedAtMillis on the first command.
emptyState = State(
tokens = settings.maxTokens.toDouble,
updatedAtMillis = initialNow,
),
commandHandler = commandHandler,
).receiveSignal {
case (state, RecoveryCompleted) =>
val now = nowMillis()
// `recovered` is only used for the log line below; the
// recovered durable state itself is not modified here.
val recovered = refill(state, now)
log.info(
"Recovered: {}/{} tokens, updatedAtMillis={}, rate={} tokens/s",
f"${recovered.tokens}%.3f",
settings.maxTokens,
recovered.updatedAtMillis,
f"${settings.tokensPerSecond}%.3f",
)
// Compare against the *raw* persisted count, not the
// post-refill view: refill caps to maxTokens on read, so
// checking `recovered.tokens` would almost never fire.
// We want the journal itself to converge to the new cap
// after a shrink, not just have it look right at runtime.
if (state.tokens > settings.maxTokens.toDouble)
ctx.self ! ClampToCap(settings.maxTokens)
}
}
}
// -- Singleton Init --
/** Registers the limiter as a cluster singleton and returns the ref
  * every node should send commands to.
  *
  * The limiter is supervised with an exponential back-off restart
  * (1s up to 30s, 20% jitter) rather than an immediate restart. It is
  * a durable-state actor, so the failures that reach the supervisor
  * are typically state-store problems or settings rejected at
  * startup. Restarting instantly and without limit in those cases
  * spins in a hot loop, hammering the store and flooding the logs.
  * Callers whose asks go unanswered during a back-off window time out
  * and follow their normal retry path, as they do on failover.
  *
  * @param system   actor system that joins the cluster
  * @param settings limiter configuration; `singletonName` names the
  *                 singleton and `singletonRole` (when defined) pins
  *                 it to nodes carrying that cluster role
  * @return proxy ref routing to wherever the singleton currently runs
  */
def singletonInit(
  system: ActorSystem[?],
  settings: Settings,
): ActorRef[Command] = {
  val baseSettings = ClusterSingletonSettings(system)
  val singletonSettings = settings.singletonRole match {
    case Some(role) => baseSettings.withRole(role)
    case None       => baseSettings
  }
  val supervised = Behaviors
    .supervise(TokenBucketLimiter(settings))
    .onFailure[Exception](
      SupervisorStrategy.restartWithBackoff(
        minBackoff = 1.second,
        maxBackoff = 30.seconds,
        randomFactor = 0.2,
      )
    )
  ClusterSingleton(system).init(
    SingletonActor(supervised, settings.singletonName)
      // Stop lets the actor answer queued waiters with Permit.Stopping
      // before the singleton is handed over or shut down.
      .withStopMessage(Stop)
      .withSettings(singletonSettings)
  )
}
// -- Client API --
/** Ask the limiter for one token.
  *
  * The Future completes with [[Permit.Granted]] when the caller may
  * proceed, or with a denial ([[Permit.QueueFull]],
  * [[Permit.Expired]], [[Permit.Stopping]]) when the limiter refuses.
  * It fails with `AskTimeoutException` if no reply arrives within
  * `timeout`, e.g. because the singleton is unreachable.
  *
  * `timeout` is used twice: as the ask deadline and as the expiry
  * stamped on the request. A waiter whose Future has already timed
  * out on the client is therefore treated as expired by the actor and
  * is not handed a token.
  *
  * @param limiter ref of the limiter (usually the singleton proxy)
  * @param timeout how long the caller is willing to wait for a token
  */
def acquire(
  limiter: ActorRef[Command],
  timeout: FiniteDuration,
)(using system: ActorSystem[?]): Future[Permit] = {
  import AskPattern.*
  given askDeadline: Timeout = Timeout(timeout)
  // Computed once, before the ask, so the stamped deadline does not
  // depend on when the request message is actually built.
  val deadlineMillis = System.currentTimeMillis() + timeout.toMillis
  limiter.ask[Permit](Acquire(_, deadlineMillis))
}

/** Same as the overload above, taking the wait budget from
  * `settings.askTimeout`. Preferred in production: the acquire
  * deadline then has a single source of truth (the Settings the
  * limiter was configured with) instead of a separate default in the
  * API surface that can drift.
  */
def acquire(
  limiter: ActorRef[Command],
  settings: Settings,
)(using system: ActorSystem[?]): Future[Permit] = {
  val configuredTimeout = settings.askTimeout
  acquire(limiter, configuredTimeout)
}
/** Acquire a token, turning every denial into a failed Future.
  *
  * Succeeds with `()` on [[Permit.Granted]]; fails with the matching
  * [[LimiterDenied]] subtype otherwise. Intended for callers that do
  * not care why they were refused: all four unhappy outcomes (ask
  * timeout, queue full, expired, stopping) arrive through the
  * Future's failure channel.
  */
def acquireOrFail(
  limiter: ActorRef[Command],
  timeout: FiniteDuration,
)(using system: ActorSystem[?]): Future[Unit] = {
  given ec: scala.concurrent.ExecutionContext = system.executionContext

  // None means "granted"; Some carries the failure to surface.
  def denialOf(permit: Permit): Option[LimiterDenied] = permit match {
    case Permit.Granted   => None
    case Permit.QueueFull => Some(LimiterQueueFull)
    case Permit.Expired   => Some(LimiterAcquireExpired)
    case Permit.Stopping  => Some(LimiterStopping)
  }

  acquire(limiter, timeout).flatMap { permit =>
    denialOf(permit) match {
      case None         => Future.successful(())
      case Some(denial) => Future.failed(denial)
    }
  }
}

/** [[acquireOrFail]] with the wait budget taken from the limiter's
  * configured `askTimeout`. */
def acquireOrFail(
  limiter: ActorRef[Command],
  settings: Settings,
)(using system: ActorSystem[?]): Future[Unit] = {
  val configuredTimeout = settings.askTimeout
  acquireOrFail(limiter, configuredTimeout)
}
/** Failure used by [[acquireOrFail]] when the limiter declines a
* request. One case object per denied [[Permit]] variant. Mixes in
* NoStackTrace, so instances carry only a message. */
sealed abstract class LimiterDenied(msg: String)
extends RuntimeException(msg)
with scala.util.control.NoStackTrace
// Raised for Permit.QueueFull.
case object LimiterQueueFull extends LimiterDenied("rate limiter queue full")
// Raised for Permit.Expired.
case object LimiterAcquireExpired extends LimiterDenied("rate limiter acquire expired")
// Raised for Permit.Stopping.
case object LimiterStopping extends LimiterDenied("rate limiter stopping")
}
package promovolve
import com.typesafe.config.ConfigFactory
import org.apache.pekko.actor.testkit.typed.scaladsl.ActorTestKit
import org.apache.pekko.actor.typed.ActorSystem
import org.apache.pekko.persistence.state.DurableStateStoreRegistry
import org.apache.pekko.persistence.testkit.PersistenceTestKitDurableStateStorePlugin
import org.apache.pekko.persistence.testkit.state.scaladsl.PersistenceTestKitDurableStateStore
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{Millis, Span}
import org.scalatest.wordspec.AnyWordSpec
import java.util.UUID
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.Future
import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*
class TokenBucketLimiterSpec
extends AnyWordSpec
with Matchers
with BeforeAndAfterAll
with ScalaFutures
with Eventually {
// Test actor-system configuration: the in-memory durable-state store
// plugin, with the serialization settings below as fallback.
//
// serialize-messages = on forces every message through serialization
// even in local-only mode, so the test exercises the CBOR codec the
// singleton uses cross-node. allow-java-serialization = off makes a
// Java fallback fail loudly rather than silently mask a missing
// CborSerializable mixin on Command or Permit.
private val testConfig =
PersistenceTestKitDurableStateStorePlugin.config.withFallback(
ConfigFactory.parseString(
"""
|pekko {
| loglevel = "WARNING"
| actor {
| provider = "local"
| allow-java-serialization = off
| warn-about-java-serializer-usage = on
| serialize-messages = on
| serializers {
| jackson-cbor = "org.apache.pekko.serialization.jackson.JacksonCborSerializer"
| }
| serialization-bindings {
| "promovolve.CborSerializable" = jackson-cbor
| }
| }
|}
|""".stripMargin
)
)
// One test kit (and therefore one actor system) shared by every test
// in this spec; shut down in afterAll.
private val testKit: ActorTestKit =
ActorTestKit("TokenBucketLimiterSpec", testConfig)
// Givens required by TokenBucketLimiter.acquire and by Future combinators.
given ActorSystem[?] = testKit.system
given scala.concurrent.ExecutionContext = testKit.system.executionContext
override def afterAll(): Unit = testKit.shutdownTestKit()
// Patience for futureValue / eventually: wait up to 8s, polling every 20ms.
override implicit def patienceConfig: PatienceConfig =
PatienceConfig(timeout = Span(8000, Millis), interval = Span(20, Millis))
import TokenBucketLimiter.{Acquire, Drain, Permit, Stop}
// Settings builder for tests: the same id doubles as persistenceId and
// singletonName; all other fields keep the production defaults.
private def settings(
id: String,
maxTokens: Int,
tokensPerSecond: Double,
maxQueueSize: Int = 10_000,
): TokenBucketLimiter.Settings =
TokenBucketLimiter.Settings(
persistenceId = id,
singletonName = id,
maxTokens = maxTokens,
tokensPerSecond = tokensPerSecond,
maxQueueSize = maxQueueSize,
)
// Random suffix keeps each test's durable state separate inside the
// shared in-memory store.
private def uniqueId(prefix: String): String =
s"$prefix-${UUID.randomUUID().toString.take(8)}"
"TokenBucketLimiter" should {
"grant immediately when tokens are available" in {
  // A brand-new bucket starts full, so the first acquire is served
  // straight away without queueing.
  val bucket = settings(uniqueId("grant"), maxTokens = 3, tokensPerSecond = 1.0)
  val limiter = testKit.spawn(TokenBucketLimiter(bucket))
  val permit = TokenBucketLimiter.acquire(limiter, 2.seconds).futureValue
  permit shouldBe Permit.Granted
}
"serve queued waiters in FIFO order when the bucket starves" in {
  // Capacity 1 at 10 tokens/s: after the single starting token is
  // spent, each further acquire has to queue and is released roughly
  // 100ms after the previous one.
  val bucket = settings(uniqueId("fifo"), maxTokens = 1, tokensPerSecond = 10.0)
  val limiter = testKit.spawn(TokenBucketLimiter(bucket))

  // Spend the starting token so the three tagged acquires all queue.
  TokenBucketLimiter.acquire(limiter, 2.seconds).futureValue shouldBe Permit.Granted

  // Each acquire records its tag on completion; the recorded sequence
  // is the order in which the limiter released the waiters.
  val finished = new ConcurrentLinkedQueue[Int]()
  val pending = (1 to 3).map { tag =>
    TokenBucketLimiter.acquire(limiter, 5.seconds).map { permit =>
      finished.add(tag)
      permit
    }
  }

  val permits = Future.sequence(pending).futureValue
  permits.foreach(_ shouldBe Permit.Granted)
  finished.asScala.toSeq shouldBe Seq(1, 2, 3)
}
"reject acquires past maxQueueSize with QueueFull" in {
  val id = uniqueId("full")
  // Very slow refill (one token per 100s) so the queue does not drain
  // during the test.
  val limiter = testKit.spawn(
    TokenBucketLimiter(
      settings(id, maxTokens = 1, tokensPerSecond = 0.01, maxQueueSize = 2)
    )
  )
  // Spend the starting token; everything after this has to queue.
  TokenBucketLimiter.acquire(limiter, 30.seconds).futureValue shouldBe Permit.Granted

  val w1 = TokenBucketLimiter.acquire(limiter, 30.seconds)
  val w2 = TokenBucketLimiter.acquire(limiter, 30.seconds)

  // No sleep or polling is needed before the third request: all three
  // asks are sent from this thread to the same actor, and the mailbox
  // preserves that order, so the actor has already queued w1 and w2
  // (filling the queue) when it handles this one.
  TokenBucketLimiter.acquire(limiter, 30.seconds).futureValue shouldBe Permit.QueueFull

  // The actor had processed w1 and w2 before it answered QueueFull, so
  // these checks are meaningful: both are parked in the queue, neither
  // was granted nor rejected.
  w1.isCompleted shouldBe false
  w2.isCompleted shouldBe false
}
"reply Expired when the request's deadline has already passed" in {
  val bucket = settings(uniqueId("expired"), maxTokens = 3, tokensPerSecond = 1.0)
  val limiter = testKit.spawn(TokenBucketLimiter(bucket))

  // A request whose deadline is already one second in the past must be
  // answered Expired, and must not cost a token.
  val staleProbe = testKit.createTestProbe[Permit]()
  val alreadyPast = System.currentTimeMillis() - 1000
  limiter ! Acquire(staleProbe.ref, expiresAtMillis = alreadyPast)
  staleProbe.expectMessage(Permit.Expired)

  // Proof that no token was spent: all three starting tokens can still
  // be acquired back-to-back.
  val liveProbe = testKit.createTestProbe[Permit]()
  val deadline = System.currentTimeMillis() + 10_000
  (1 to 3).foreach { _ =>
    limiter ! Acquire(liveProbe.ref, expiresAtMillis = deadline)
    liveProbe.expectMessage(Permit.Granted)
  }
}
"reply Stopping to queued waiters when the actor stops" in {
  val bucket = settings(uniqueId("stopping"), maxTokens = 1, tokensPerSecond = 0.01)
  val limiter = testKit.spawn(TokenBucketLimiter(bucket))

  // Use up the only token so the next request has to wait.
  val first = TokenBucketLimiter.acquire(limiter, 30.seconds)
  first.futureValue shouldBe Permit.Granted

  val parked = TokenBucketLimiter.acquire(limiter, 30.seconds)
  // Give the actor time to process the request, then confirm it was
  // queued rather than answered.
  Thread.sleep(100)
  parked.isCompleted shouldBe false

  limiter ! Stop
  parked.futureValue shouldBe Permit.Stopping
}
"Drain sweeps an expired waiter in the middle of the queue without granting it" in {
  // Regression test for the prefix-only expiry scan. That version
  // looked for expired waiters only at the head of the queue, so for a
  // [live, expired, live] queue it counted three grantable waiters and
  // could send Permit.Granted to the expired one in the middle.
  //
  // The test drives Drain itself (`limiter ! Drain`) instead of relying
  // on the auto-scheduled timer. At 0.001 tokens/s the bucket cannot
  // refill in any meaningful way during the test, so the starting token
  // (spent by the warmup) is the only one ever in play and every later
  // Acquire queues.
  val bucket = settings(uniqueId("middle-expired-sweep"), maxTokens = 1, tokensPerSecond = 0.001)
  val limiter = testKit.spawn(TokenBucketLimiter(bucket))

  val warmupProbe = testKit.createTestProbe[Permit]("warmup")
  val warmupDeadline = System.currentTimeMillis() + 60_000
  limiter ! Acquire(warmupProbe.ref, expiresAtMillis = warmupDeadline)
  warmupProbe.expectMessage(Permit.Granted)

  val enqueuedAt = System.currentTimeMillis()
  val headProbe = testKit.createTestProbe[Permit]("live1")
  val middleProbe = testKit.createTestProbe[Permit]("expiring")
  val tailProbe = testKit.createTestProbe[Permit]("live3")

  // Live waiters get a generous 30s; the middle one gets 300ms — short,
  // but comfortably above scheduler jitter on CI.
  val longDeadline = enqueuedAt + 30_000
  val shortDeadline = enqueuedAt + 300
  limiter ! Acquire(headProbe.ref, expiresAtMillis = longDeadline)
  limiter ! Acquire(middleProbe.ref, expiresAtMillis = shortDeadline)
  limiter ! Acquire(tailProbe.ref, expiresAtMillis = longDeadline)

  // 700ms is more than twice the short deadline, so even a slow box
  // sees the middle waiter as expired when Drain runs.
  Thread.sleep(700)
  limiter ! Drain

  // With effectively zero tokens, Drain takes its sweep-only branch:
  // the middle waiter is told Expired and both live waiters stay
  // queued, unanswered.
  middleProbe.expectMessage(2.seconds, Permit.Expired)
  headProbe.expectNoMessage(200.millis)
  tailProbe.expectNoMessage(100.millis)
}
"Drain grants live waiters and skips an interleaved expired one even with tokens available" in {
  // Companion to the sweep-only test: here the bucket does refill
  // (1 token/s), so the live waiters are eventually granted.
  //
  // Queue formation: the three warmup acquires empty the bucket
  // (maxTokens = 3), and the refill rate is far too slow to produce a
  // token before the next three Acquires arrive. They therefore bypass
  // the fast path and queue as [live1, expiring, live3].
  //
  // Timing: expiring's deadline (+300ms) is well below the 1s per-token
  // interval, so it is already dead when the first token-granting Drain
  // runs. The explicit Drain sent after 600ms is only a nudge: with
  // less than one token available it sweeps the expired waiter, and the
  // auto-scheduled Drains then grant live1 and live3 as tokens arrive.
  // Nothing depends on an exact refill latency.
  //
  // Invariant under test: expiring only ever receives Permit.Expired,
  // never Permit.Granted. The old prefix-only scan over-counted live
  // waiters and would have granted it.
  val bucket = settings(uniqueId("middle-expired-grants"), maxTokens = 3, tokensPerSecond = 1.0)
  val limiter = testKit.spawn(TokenBucketLimiter(bucket))

  // Empty the bucket: three sends, then three Granted replies.
  val warmupProbe = testKit.createTestProbe[Permit]("warmup")
  val warmupDeadline = System.currentTimeMillis() + 60_000
  for (_ <- 1 to 3) limiter ! Acquire(warmupProbe.ref, expiresAtMillis = warmupDeadline)
  for (_ <- 1 to 3) warmupProbe.expectMessage(Permit.Granted)

  val enqueuedAt = System.currentTimeMillis()
  val headProbe = testKit.createTestProbe[Permit]("live1")
  val middleProbe = testKit.createTestProbe[Permit]("expiring")
  val tailProbe = testKit.createTestProbe[Permit]("live3")

  // The live deadlines (+30s) outlast the whole test.
  val longDeadline = enqueuedAt + 30_000
  val shortDeadline = enqueuedAt + 300
  limiter ! Acquire(headProbe.ref, expiresAtMillis = longDeadline)
  limiter ! Acquire(middleProbe.ref, expiresAtMillis = shortDeadline)
  limiter ! Acquire(tailProbe.ref, expiresAtMillis = longDeadline)

  // 600ms is twice the short deadline plus headroom for CI jitter, and
  // still ahead of the first auto-scheduled Drain (~1000ms at 1/s).
  Thread.sleep(600)
  limiter ! Drain

  // Each 3-second window is well above the 1s per-token interval, so
  // ordinary jitter is tolerated without pinning a specific cycle.
  middleProbe.expectMessage(3.seconds, Permit.Expired)
  headProbe.expectMessage(3.seconds, Permit.Granted)
  tailProbe.expectMessage(3.seconds, Permit.Granted)

  // Belt-and-suspenders: no later Drain pass grants the expired waiter.
  middleProbe.expectNoMessage(200.millis)
}
"when Granted is delivered, the persisted token state already reflects the decrement" in {
  // Pekko's DurableState test store doesn't expose a failNext-style
  // hook in 1.4.0, so we can't directly assert "persist failed →
  // no Granted." The next-best test for the same invariant is the
  // positive direction: by the time the caller observes Granted,
  // the durable state has already been written. If TokenBucketLimiter
  // ever regressed to sending Granted before Effect.persist, this
  // would race — sometimes the store would still be empty when read.
  // The current code sends Granted from inside `.thenRun`, so the
  // assertion is deterministic.
  val id = uniqueId("grant-after-persist")
  val limiter = testKit.spawn(
    TokenBucketLimiter(settings(id, maxTokens = 5, tokensPerSecond = 1.0))
  )
  val store = DurableStateStoreRegistry(testKit.system)
    .durableStateStoreFor[PersistenceTestKitDurableStateStore[TokenBucketLimiter.State]](
      PersistenceTestKitDurableStateStore.Identifier
    )

  // Synchronous: acquire and block until the reply lands.
  TokenBucketLimiter.acquire(limiter, 5.seconds).futureValue shouldBe Permit.Granted

  // Right after Granted, the store must hold the decremented count.
  // The value is exact: the bucket starts full at the cap (5), refill
  // cannot push it above the cap, and the stored record is what was
  // written at grant time (lazy refill never rewrites it). So the
  // persisted count is precisely 5 - 1 = 4. Comparing through the
  // Option also covers the "nothing persisted yet" case without `.get`.
  val persisted = store.getObject(id).futureValue.value
  persisted.map(_.tokens) shouldBe Some(4.0)
}
"clamp tokens on recovery when maxTokens has shrunk across restart" in {
  val id = uniqueId("clamp")

  // First incarnation runs with a cap of 10. A single acquire forces a
  // write to the store, since Permit.Granted is only delivered once
  // the persist has completed.
  val wideCap = settings(id, maxTokens = 10, tokensPerSecond = 1.0)
  val firstLimiter = testKit.spawn(TokenBucketLimiter(wideCap), s"$id-1")
  TokenBucketLimiter.acquire(firstLimiter, 5.seconds).futureValue shouldBe Permit.Granted
  testKit.stop(firstLimiter)
  Thread.sleep(100)

  // Second incarnation reuses the persistenceId with a cap of 5.
  // Recovery loads tokens=9, and the post-recovery ClampToCap is
  // expected to pull the stored count down to 5 before any acquire
  // is served.
  val narrowCap = settings(id, maxTokens = 5, tokensPerSecond = 1.0)
  val secondLimiter = testKit.spawn(TokenBucketLimiter(narrowCap), s"$id-2")

  // Let the self-sent ClampToCap land before any Acquire is issued.
  // Otherwise a stashed Acquire could be unstashed ahead of it and be
  // granted out of the pre-clamp pool.
  Thread.sleep(200)

  val replies = testKit.createTestProbe[Permit]()
  def distantDeadline() = System.currentTimeMillis() + 10_000

  // The clamped pool should serve exactly five grants.
  for (_ <- 1 to 5) {
    secondLimiter ! Acquire(replies.ref, expiresAtMillis = distantDeadline())
    replies.expectMessage(Permit.Granted)
  }

  // A sixth request must not be granted before its deadline. Had the
  // cap not been applied, roughly four tokens of the persisted nine
  // would remain and this request would be granted as well.
  val imminentDeadline = System.currentTimeMillis() + 100
  secondLimiter ! Acquire(replies.ref, expiresAtMillis = imminentDeadline)

  // With no tokens left the request is queued; its deadline passes at
  // about +100ms, and the Drain at about +1s (rate = 1/s) answers the
  // head of the queue with Expired.
  replies.expectMessage(3.seconds, Permit.Expired)
}
}
}
@hanishi
Copy link
Copy Markdown
Author

hanishi commented May 14, 2026

TokenBucketLimiter

Visual reference for the actor's request/response flow, the lazy-refill +
FIFO queue + auto-Drain interplay, and why drainGranting walks the
whole queue rather than only a prefix of it.

1. Fast path — tokens available, queue empty

sequenceDiagram
    autonumber
    participant C as Caller
    participant L as TokenBucketLimiter
    participant S as DurableStateStore

    C->>+L: Acquire(replyTo, expiresAtMillis)
    Note over L: refill(state, now)<br/>tokens ≥ 1 ∧ queue empty
    L->>S: Effect.persist(State(tokens - 1, now))
    S-->>L: persisted
    L-->>-C: Permit.Granted
    Note over L,C: Granted is sent from inside<br/>.thenRun(...) — the caller never sees<br/>a permit the limiter hasn't durably booked.
Loading

2. Queue path — bucket starves, caller waits for a Drain cycle

sequenceDiagram
    autonumber
    participant C as Caller
    participant L as TokenBucketLimiter
    participant T as Pekko Timer
    participant S as DurableStateStore

    C->>L: Acquire(replyTo, expiresAtMillis)
    Note over L: refill(state, now)<br/>tokens < 1 ∨ queue non-empty<br/>→ enqueue (FIFO)
    L->>T: startSingleTimer(Drain, millisUntilNextToken)
    Note over T: ... delay elapses ...
    T-->>L: Drain

    Note over L: refill(state, now)<br/>liveCount = waiting.count(_.deadline > now)<br/>available = floor(tokens)<br/>toGrant = min(available, liveCount)

    alt toGrant > 0
        L->>S: Effect.persist(State(tokens - toGrant, now))
        S-->>L: persisted
        Note over L: drainGranting(now, toGrant)<br/>walks queue, replies to each waiter
        L-->>C: Permit.Granted
        L->>T: scheduleNextDrain (if live waiters remain)
    else queue non-empty but no tokens
        Note over L: drainGranting(now, 0)<br/>sweeps expired waiters, no persist
        L-->>C: Permit.Expired (only if this caller expired)
        L->>T: scheduleNextDrain
    end
Loading

3. Mixed queue — expired waiter sitting in the middle

This is the case the original code got wrong. Earlier queueSnapshot
counted only the expired prefix; with [live1, expired2, live3]
that returned (0 expired, 3 live), and the prefix-drain dequeued
all three in order — granting expired2 even though its deadline had
passed. drainGranting walks every entry and classifies each waiter
by its own deadline.

sequenceDiagram
    autonumber
    participant L1 as live1 (caller A)
    participant X as expired2 (caller B, short deadline)
    participant L3 as live3 (caller C)
    participant L as TokenBucketLimiter
    participant T as Pekko Timer
    participant S as DurableStateStore

    L1->>L: Acquire(deadline = now + 30s)
    Note over L: enqueue → queue = [L1]
    X->>L: Acquire(deadline = now + 300ms)
    Note over L: enqueue → queue = [L1, X]
    L3->>L: Acquire(deadline = now + 30s)
    Note over L: enqueue → queue = [L1, X, L3]
    L->>T: startSingleTimer(Drain, ...)

    Note over X: ... 300ms elapses,<br/>X's deadline passes ...
    T-->>L: Drain (or explicit limiter ! Drain)

    Note over L: refill, liveCount = 2 (L1, L3)<br/>available ≥ 2 → toGrant = 2
    L->>S: Effect.persist(State(tokens - 2, now))
    S-->>L: persisted

    Note over L: drainGranting walks the queue:
    L-->>L1: Permit.Granted
    Note over L: dequeue X — deadline ≤ now
    L-->>X: Permit.Expired
    L-->>L3: Permit.Granted
    Note over L: queue empty → no reschedule
Loading

The same scenario, drawn pre-fix, would have ended with Permit.Granted
being sent to X and the bucket persisting tokens - 3 even though
only two live waiters were honored — the third grant went to an
already-expired caller whose ask had already timed out client-side.

4. Drain decision flow

flowchart TD
    A([Drain fires]) --> B[refill state, now]
    B --> C{queue empty?}
    C -->|yes| Z([Effect.none])
    C -->|no| D[liveCount = waiting.count<br/>_.deadline &gt; now]
    D --> E[available = floor tokens]
    E --> F[toGrant = min available, liveCount]
    F --> G{toGrant &gt; 0?}
    G -->|yes| H[Effect.persist tokens -= toGrant]
    H --> I[drainGranting now, toGrant<br/>→ granted, expired]
    I --> J[log & scheduleNextDrain]
    G -->|no| M{queue non-empty?}
    M -->|yes| N[drainGranting now, 0<br/>→ _, expired]
    N --> O[log & scheduleNextDrain]
    M -->|no| Z
Loading

Invariants

  1. No Granted before persistence. Every Granted reply is sent
    inside .thenRun(...), after Effect.persist(next) resolves. A
    persist failure can never leak a permit the journal hasn't booked.
  2. FIFO among live waiters. drainGranting walks the queue in
    insertion order; the first N live waiters get the N available
    tokens, regardless of intervening expired entries.
  3. Expired never Granted. A waiter whose expiresAtMillis ≤ now
    always gets Permit.Expired, never Permit.Granted, regardless
    of position in the queue.
  4. Lazy refill. Tokens are recomputed from elapsed time on every
    command; the actor doesn't tick on a fixed schedule just to advance
    the bucket. Only token-changing transitions persist.
  5. Ephemeral waiter queue. The queue lives in actor memory only.
    On singleton failover any pending ask future times out and the
    caller's normal retry path takes over — persisting the queue would
    buy nothing because reply refs only complete on the actor system
    that issued the ask.
  6. startSingleTimer replaces. Re-arming Drain with the same key
    is intentional — only one outstanding wake-up is needed at a time.
  7. ClampToCap is defensive. Recovery already clamps tokens to
    the current maxTokens on read via refill; the self-message
    exists to make the journaled count converge after a cap shrink,
    not just the runtime view.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment