2026-03-24T13:35:44Z by Showboat dev
The DistSender circuit breaker prevents CockroachDB's DistSender from getting
stuck on non-functional replicas. The DistSender normally relies on receiving a
NotLeaseHolderError (NLHE) from a replica to redirect to other replicas. If
the replica is stalled (disk stall, mutex deadlock) or partitioned away from the
leader (clients time out before the NLHE arrives), the DistSender keeps
retrying the same broken replica indefinitely.
The circuit breaker detects these situations by tracking per-replica error streaks and stalls, probes the replica with a cheap LeaseInfo request, and trips the breaker if the probe fails. Once tripped, new requests are immediately rejected (skipping to the next replica) and in-flight requests can be cancelled.
The breaker has three modes, controlled by `kv.dist_sender.circuit_breakers.mode`:

| Mode | Value | Description |
|---|---|---|
| no ranges | 0 | Circuit breakers never trip |
| liveness range only | 1 | Default. Only trip breakers for replicas in the node liveness range |
| all ranges | 2 | Trip breakers for any failing/stalled replica |
The liveness-only default exists because every node must write to the liveness range to heartbeat leases. A stalled liveness replica blocks the entire cluster. Enabling for all ranges risks goroutine spikes from probe goroutines when many replicas trip simultaneously.
The mode enum and setting are defined as follows:
`sed -n "30,61p" pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go`:

```go
// DistSenderCircuitBreakersMode controls if and to what level we trip circuit
// breakers for replicas in the DistSender when they are failed or stalled.
type DistSenderCircuitBreakersMode int64

const (
	// DistSenderCircuitBreakersNoRanges indicates we should never trip circuit
	// breakers.
	DistSenderCircuitBreakersNoRanges DistSenderCircuitBreakersMode = iota

	// DistSenderCircuitBreakersLivenessRangeOnly indicates we should only trip
	// circuit breakers if a replica belonging to node liveness experiences a
	// failure or stall.
	//
	// Typically, a replica belonging to node liveness has a high potential to
	// disrupt a cluster. All nodes need to write to the liveness range -- either
	// to heartbeat their liveness record to keep their leases, or to increment
	// another node's epoch before acquiring theirs. As such, a stalled or failed
	// replica belonging to the liveness range that these requests get stuck on is
	// no good -- nothing in the cluster will make progress.
	DistSenderCircuitBreakersLivenessRangeOnly

	// DistSenderCircuitBreakersAllRanges indicates that we should trip circuit
	// breakers for any replica that experiences a failure or a stall, regardless
	// of the range it belongs to.
	//
	// Tripping a circuit breaker involves launching a probe to eventually un-trip
	// the thing. This per-replica probe launches a goroutine. As such, a node
	// level issue that causes a large number of circuit breakers to be tripped on
	// a client has the potential of causing a big goroutine spike on the client.
	// We currently don't have good scalability numbers to measure the impact of
	// this -- once we do, we can reevaluate the default mode we run in if the
	// numbers look good. Else we may want to make these things more scalable.
	DistSenderCircuitBreakersAllRanges
)
```
There is one `DistSenderCircuitBreakers` struct per DistSender. It manages all per-replica breakers and runs the background loops for stall detection and garbage collection.
`sed -n "232,239p" pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go`:

```go
type DistSenderCircuitBreakers struct {
	ambientCtx       log.AmbientContext
	stopper          *stop.Stopper
	settings         *cluster.Settings
	transportFactory TransportFactory
	metrics          DistSenderMetrics
	replicas         syncutil.Map[cbKey, ReplicaCircuitBreaker]
}
```
The replicas field is a concurrent map keyed by (rangeID, replicaID).
Breakers are created on-demand and garbage collected when idle.
Each replica gets its own breaker tracking in-flight requests, error streaks, and stall timestamps. The key fields:
`sed -n "426,468p" pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go`:

```go
type ReplicaCircuitBreaker struct {
	d        *DistSenderCircuitBreakers
	rangeID  roachpb.RangeID
	startKey roachpb.Key
	desc     roachpb.ReplicaDescriptor
	breaker  *circuit.Breaker

	// inflightReqs tracks the number of in-flight requests.
	inflightReqs atomic.Int32

	// lastRequest contains the last request timestamp, for garbage collection.
	lastRequest crtime.AtomicMono

	// errorSince is the timestamp when the current streak of errors began. Set on
	// an initial error, and cleared on successful responses.
	errorSince crtime.AtomicMono

	// stallSince is the timestamp when the current potential stall began. It is
	// set on every first in-flight request (inflightReqs==1) and moved forward on
	// every response from the replica (even errors).
	//
	// It is not reset to zero when inflightReqs==0, to avoid synchronization with
	// inflightReqs. To determine whether a replica is stalled, it is therefore
	// also necessary to check inflightReqs>0.
	stallSince crtime.AtomicMono

	// closedC is closed when the circuit breaker has been GCed. This will shut
	// down a running probe, and prevent new probes from launching.
	closedC chan struct{}

	mu struct {
		syncutil.Mutex

		// cancelFns contains context cancellation functions for all in-flight
		// requests, segmented by request type. Reads can be retried by the
		// DistSender, so we cancel these immediately when the breaker trips.
		// Writes can't automatically retry, and will return ambiguous result errors
		// to clients, so we only cancel them after a grace period.
		//
		// Only tracked if cancellation is enabled.
		cancelFns [cbNumRequestKinds]map[*kvpb.BatchRequest]context.CancelCauseFunc
	}
}
```
Key observations:

- `inflightReqs`, `errorSince`, `stallSince`, and `lastRequest` are all atomics for lock-free fast-path access.
- `cancelFns` is segmented into two buckets: reads (cancel immediately) and writes (cancel after a grace period). Protected by a mutex.
- `closedC` signals GC to running probes.
The circuit breaker hooks into the DistSender's per-replica send loop. Here is
the integration point in sendToReplicas:
`sed -n "2837,2868p" pkg/kv/kvclient/kvcoord/dist_sender.go`:

```go
tBegin := crtime.NowMono() // for slow log message
sendCtx, cbToken, cbErr := ds.circuitBreakers.ForReplica(desc, &curReplica).Track(
	ctx, ba, nonIdempotent, tBegin,
)
if cbErr != nil {
	// Circuit breaker is tripped. err will be handled below.
	err = cbErr
	transport.SkipReplica()
} else {
	br, err = transport.SendNext(sendCtx, requestToSend)
	tEnd := crtime.NowMono()
	if cancelErr := cbToken.Done(br, err, tEnd); cancelErr != nil {
		// The request was cancelled by the circuit breaker tripping. If this is
		// detected by request evaluation (as opposed to the transport send), it
		// will return the context error in br.Error instead of err, which won't
		// be retried below. Instead, record it as a send error in err and retry
		// when possible. This commonly happens when the replica is local.
		br, err = nil, cancelErr
	}

	if dur := tEnd.Sub(tBegin); dur > slowDistSenderReplicaThreshold {
		var s redact.StringBuilder
		slowReplicaRPCWarningStr(&s, ba, dur, attempts, err, br)
		if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri {
			// Note that these RPC may or may not have succeeded. Errors are counted separately below.
			ds.metrics.SlowReplicaRPCs.Inc(1)
			log.KvExec.Warningf(ctx, "slow replica RPC: %v", &s)
		} else {
			log.Eventf(ctx, "slow replica RPC: %v", &s)
		}
	}
}
```
The flow is:

1. `ForReplica()`: looks up or creates the per-replica breaker. Returns `nil` if breakers are disabled for this range.
2. `Track()`: checks if the breaker is tripped (reject immediately); otherwise registers the in-flight request and optionally wraps the context for cancellation.
3. `SendNext()`: the actual RPC to the replica, using the potentially wrapped context.
4. `Done()`: untracks the request, classifies the response, and may trigger a probe. Returns a cancellation error if the breaker tripped mid-flight.
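The Track/Done pairing can be sketched with a toy breaker. All names here (`fakeBreaker`, `token`, `send`) are illustrative stand-ins, not the real API; the point is the shape of the control flow: reject up front when tripped, otherwise record the outcome after the send:

```go
package main

import (
	"errors"
	"fmt"
)

// fakeBreaker is a toy stand-in for a per-replica circuit breaker.
type fakeBreaker struct{ tripped bool }

type token struct{ b *fakeBreaker }

// track rejects immediately when the breaker is tripped, mirroring Track().
func (b *fakeBreaker) track() (token, error) {
	if b.tripped {
		return token{}, errors.New("replica unavailable (circuit breaker tripped)")
	}
	return token{b: b}, nil
}

// done classifies the result, mirroring Done(). The real code also untracks
// the request and may trigger a probe; here we only note the outcome.
func (t token) done(sendErr error) {
	if sendErr != nil {
		fmt.Println("recorded error")
	} else {
		fmt.Println("recorded success")
	}
}

func send(b *fakeBreaker) error {
	tok, err := b.track()
	if err != nil {
		return err // skip to the next replica
	}
	sendErr := error(nil) // stand-in for transport.SendNext
	tok.done(sendErr)
	return sendErr
}

func main() {
	fmt.Println(send(&fakeBreaker{}))                // <nil>
	fmt.Println(send(&fakeBreaker{tripped: true}) != nil) // true: rejected without sending
}
```

Note that a tripped breaker short-circuits before the transport is touched at all, which is exactly what lets the DistSender skip to the next replica immediately.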
`sed -n "385,419p" pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go`:

```go
func (d *DistSenderCircuitBreakers) ForReplica(
	rangeDesc *roachpb.RangeDescriptor, replDesc *roachpb.ReplicaDescriptor,
) *ReplicaCircuitBreaker {
	// If circuit breakers are disabled, return a nil breaker.
	if d.Mode() == DistSenderCircuitBreakersNoRanges {
		return nil
	}

	// If circuit breakers are only enabled for the liveness range, don't check
	// circuit breakers. If the mode changes for a tripped circuit breaker, the
	// range will eventually idle and be GC'ed since we will stop tracking new
	// requests to the range.
	// NB: We may allow the liveness range to split in the future so the overlap
	// check is safer.
	if d.Mode() == DistSenderCircuitBreakersLivenessRangeOnly {
		if !keys.NodeLivenessSpan.Overlaps(rangeDesc.KeySpan().AsRawSpanWithNoLocals()) {
			return nil
		}
	}

	key := cbKey{rangeID: rangeDesc.RangeID, replicaID: replDesc.ReplicaID}

	// Fast path: use existing circuit breaker.
	if cb, ok := d.replicas.Load(key); ok {
		return cb
	}

	// Slow path: construct a new replica circuit breaker and insert it. If we
	// race with a concurrent insert, return it instead.
	cb, loaded := d.replicas.LoadOrStore(key, newReplicaCircuitBreaker(d, rangeDesc, replDesc))
	if !loaded {
		d.metrics.CircuitBreaker.Replicas.Inc(1)
	}
	return cb
}
```
Returns nil (no-op for all subsequent calls) when disabled or when in
liveness-only mode for a non-liveness range. The fast path is a simple
syncutil.Map.Load; the slow path atomically inserts a new breaker.
`sed -n "599,661p" pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go`:

```go
func (r *ReplicaCircuitBreaker) Track(
	ctx context.Context, ba *kvpb.BatchRequest, withCommit bool, now crtime.Mono,
) (context.Context, replicaCircuitBreakerToken, error) {
	if r == nil {
		return ctx, replicaCircuitBreakerToken{}, nil // circuit breakers disabled
	}

	// Record the request timestamp.
	r.lastRequest.Store(now)

	// Check if the breaker is tripped. If it is, this will also launch a probe if
	// one isn't already running.
	if err := r.Err(); err != nil {
		log.VErrEventf(ctx, 2, "request rejected by tripped circuit breaker for %s: %s", r.id(), err)
		r.d.metrics.CircuitBreaker.ReplicasRequestsRejected.Inc(1)
		return nil, replicaCircuitBreakerToken{}, errors.Wrapf(err,
			"%s is unavailable (circuit breaker tripped)", r.id())
	}

	// Set up the request token.
	token := replicaCircuitBreakerToken{
		r:          r,
		ctx:        ctx,
		ba:         ba,
		withCommit: withCommit,
	}

	// Record in-flight requests. If this is the only request, tentatively start
	// tracking a stall.
	if inflightReqs := r.inflightReqs.Add(1); inflightReqs == 1 {
		r.stallSince.Store(now)
	} else if inflightReqs < 0 {
		log.Dev.Fatalf(ctx, "inflightReqs %d < 0", inflightReqs) // overflow
	}

	// If enabled, create a send context that can be used to cancel in-flight
	// requests if the breaker trips.
	//
	// TODO(erikgrinaker): we should try to make the map lock-free. WithCancel()
	// also allocates. Ideally, it should be possible to propagate cancellation of
	// a single replica-scoped context onto all request contexts, but this
	// requires messing with Go internals.
	if CircuitBreakerCancellation.Get(&r.d.settings.SV) {
		// If the request already has a timeout that is below the probe threshold
		// and probe timeout, there is no point in us cancelling it (only relevant
		// with replica stalls). This is the common case when using statement
		// timeouts, and avoids the overhead.
		if deadline, hasTimeout := ctx.Deadline(); !hasTimeout ||
			crtime.MonoFromTime(deadline).Sub(now) >
				CircuitBreakerProbeThreshold.Get(&r.d.settings.SV)+CircuitBreakerProbeTimeout.Get(&r.d.settings.SV) {
			sendCtx, cancel := context.WithCancelCause(ctx)
			token.cancelCtx = sendCtx
			reqKind := cbRequestCancellationPolicyFromBatch(ba, withCommit)
			r.mu.Lock()
			r.mu.cancelFns[reqKind][ba] = cancel
			r.mu.Unlock()
			return sendCtx, token, nil
		}
	}

	return ctx, token, nil
}
```
Key details in `Track()`:

- **Tripped check**: if the breaker is tripped, the request is rejected immediately, which also launches a probe (via `Err()` → `Signal().Err()` → `maybeTriggerProbe`).
- **Stall tracking**: when `inflightReqs` goes from 0→1, records `stallSince`.
- **Cancellation optimization**: skips context wrapping if the request already has a shorter timeout than the probe threshold + probe timeout. This is the common case with statement timeouts.
- **Cancel function storage**: stores cancel functions keyed by `*BatchRequest` pointer, segmented by cancellation policy (immediate for reads, grace period for writes).
After the RPC completes, Done() classifies the response and potentially
triggers a probe.
`sed -n "668,782p" pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go`:

```go
func (r *ReplicaCircuitBreaker) done(
	ctx context.Context,
	cancelCtx context.Context,
	ba *kvpb.BatchRequest,
	withCommit bool,
	br *kvpb.BatchResponse,
	sendErr error,
	now crtime.Mono,
) error {
	if r == nil {
		return nil // circuit breakers disabled when we began tracking the request
	}

	// Untrack the request.
	if inflightReqs := r.inflightReqs.Add(-1); inflightReqs < 0 {
		log.Dev.Fatalf(ctx, "inflightReqs %d < 0", inflightReqs)
	}

	// Detect if the circuit breaker cancelled the request, and prepare a
	// cancellation error to return to the caller.
	var cancelErr error
	if cancelCtx != nil {
		if sendErr != nil || br.Error != nil {
			if cancelErr = cancelCtx.Err(); cancelErr != nil && ctx.Err() == nil { // check ctx last
				log.VErrEventf(ctx, 2,
					"request cancelled by tripped circuit breaker for %s: %s", r.id(), cancelErr)
				cancelErr = errors.Wrapf(cancelErr, "%s is unavailable (circuit breaker tripped)", r.id())
			}
		}

		// Clean up the cancel function.
		reqKind := cbRequestCancellationPolicyFromBatch(ba, withCommit)
		r.mu.Lock()
		cancel := r.mu.cancelFns[reqKind][ba]
		delete(r.mu.cancelFns[reqKind], ba) // nolint:deferunlockcheck
		r.mu.Unlock()
		if cancel != nil {
			cancel(nil)
		}
	}

	// If this was a local send error, i.e. sendErr != nil, we rely on RPC circuit
	// breakers to fail fast. There is no need for us to launch a probe as well.
	// This includes the case where either the remote or local node has been
	// decommissioned.
	//
	// However, if the sender's context is cancelled, pessimistically assume this
	// is a timeout and fall through to the error handling below to potentially
	// launch a probe. Even though this may simply be the client going away, we
	// can't know if this was because of a client timeout or not, so we assume
	// there may be a problem with the replica. We will typically see recent
	// successful responses too if that isn't the case.
	if sendErr != nil && ctx.Err() == nil {
		return cancelErr
	}

	// If we got a response from the replica (even a br.Error), it isn't stalled.
	// Bump the stall timestamp to the current response timestamp, in case a
	// concurrent request has stalled.
	//
	// NB: we don't reset this to 0 when inflightReqs==0 to avoid unnecessary
	// synchronization.
	if sendErr == nil {
		r.stallSince.Store(now)
	}

	// Record error responses, by setting err non-nil. Otherwise, the response is
	// recorded as a success.
	err := sendErr
	if sendErr == nil && br.Error != nil {
		switch tErr := br.Error.GetDetail().(type) {
		case *kvpb.NotLeaseHolderError:
			// Consider NLHE a success if it contains a lease record, as the replica
			// appears functional. If there is no lease record, the replica was unable
			// to acquire a lease and has no idea who the leaseholder might be, likely
			// because it is disconnected from the leader or there is no quorum.
			if tErr.Lease == nil || *tErr.Lease == (roachpb.Lease{}) {
				err = tErr
			}

		case *kvpb.RangeNotFoundError, *kvpb.RangeKeyMismatchError, *kvpb.StoreNotFoundError:
			// If the replica no longer exists, we don't need to probe. The DistSender
			// will stop using the replica soon enough.

		case *kvpb.ReplicaUnavailableError:
			// If the replica's circuit breaker is tripped, defer to it. No need for
			// us to also probe.

		default:
			// Record all other errors.
			//
			// NB: this pessimistically assumes that any other error response may
			// indicate a replica problem. That's generally not true for most errors.
			// However, we will generally also see successful responses. If we only
			// see errors, it seems worthwhile to probe the replica and check, rather
			// than explicitly listing error types and possibly missing some. In the
			// worst case, this means launching a goroutine and sending a cheap probe
			// every few seconds for each failing replica (which could be bad enough
			// across a large number of replicas).
			err = br.Error.GoError()
		}
	}

	if err == nil {
		// On success, reset the error tracking.
		r.errorSince.Store(0)
	} else if errorDuration := r.errorDuration(now); errorDuration == 0 {
		// If this is the first error we've seen, record it. We'll launch a probe on
		// a later error if necessary.
		r.errorSince.Store(now)
	} else if errorDuration >= CircuitBreakerProbeThreshold.Get(&r.d.settings.SV) {
		// The replica has been failing for the past probe threshold, probe it.
		r.breaker.Probe()
	}

	// Return the client cancellation error (if any).
	return cancelErr
}
```
| Response Type | Treated As | Reasoning |
|---|---|---|
| NLHE with lease | Success | Replica is functional, knows the leaseholder |
| NLHE without lease | Error | Replica can't find quorum/lease |
| `RangeNotFoundError` | Success | Replica doesn't exist; DistSender will handle |
| `RangeKeyMismatchError` | Success | Replica obsolete; DistSender will handle |
| `StoreNotFoundError` | Success | Replica removed; DistSender will handle |
| `ReplicaUnavailableError` | Success | Defer to replica's own server-side breaker |
| Context timeout/cancel | Error | Pessimistic: assume the replica stalled |
| Local send errors (no ctx.Err) | Ignored | Handled by RPC circuit breakers |
| All other errors | Error | Pessimistic: worth probing to check |
The error-driven probe trigger uses a duration threshold, not an error count:

- First error: record `errorSince = now`.
- Subsequent errors: if `now - errorSince >= probeThreshold` (3s default), call `breaker.Probe()`.
- Any success: reset `errorSince = 0`.
This means the breaker never trips from user requests alone — only from a failed probe.
The stall detection loop runs independently, scanning all breakers at the probe interval to detect replicas that haven't responded.
`sed -n "275,313p" pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go`:

```go
func (d *DistSenderCircuitBreakers) probeStallLoop(ctx context.Context) {
	// We use the probe interval as the scan interval, since we can sort of
	// consider this to be probing the replicas for a stall.
	var timer timeutil.Timer
	defer timer.Stop()
	timer.Reset(CircuitBreakerProbeInterval.Get(&d.settings.SV))

	for {
		select {
		case <-timer.C:
			// Eagerly reset the timer, to avoid skewing the interval.
			timer.Reset(CircuitBreakerProbeInterval.Get(&d.settings.SV))
		case <-d.stopper.ShouldQuiesce():
			return
		case <-ctx.Done():
			return
		}

		// Don't do anything if circuit breakers have been disabled.
		if d.Mode() == DistSenderCircuitBreakersNoRanges {
			continue
		}

		// Probe replicas for a stall if we haven't seen a response from them in the
		// past probe threshold.
		now := crtime.NowMono()
		probeThreshold := CircuitBreakerProbeThreshold.Get(&d.settings.SV)

		d.replicas.Range(func(_ cbKey, cb *ReplicaCircuitBreaker) bool {
			// Don't probe if the breaker is already tripped. It will be probed in
			// response to user traffic, to reduce the number of concurrent probes.
			if cb.stallDuration(now) >= probeThreshold && !cb.isTripped() {
				cb.breaker.Probe()
			}
			return true
		})
	}
}
```
The stall check uses `stallDuration()`: `stallSince` is bumped on every response, so if there are in-flight requests (`inflightReqs > 0`) and no response has come back for ≥ probeThreshold, the replica is considered stalled. Already-tripped breakers are skipped to avoid redundant probes.
So there are two paths to probe triggering:

- Error streak (in `done()`): continuous errors for ≥ 3s.
- Stall (in `probeStallLoop`): no responses with in-flight requests for ≥ 3s.
The probe sends a LeaseInfoRequest — a cheap health check that reads only
in-memory state, takes no latches, but requires a lease.
`sed -n "950,1029p" pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go`:

```go
// sendProbe probes the replica by sending a LeaseInfo request. It returns an
// error if the circuit breaker should trip, or nil if it should untrip and
// stop probing.
//
// Note that this may return nil even though the request itself fails. The
// typical example is a NLHE, which indicates that the replica is functional but
// not the leaseholder, but there are other cases too. See below.
//
// We use a LeaseInfo request as a makeshift health check because:
//
// - It is cheap (only reads in-memory state).
// - It does not take out any latches.
// - It requires a lease, so it will either attempt to acquire a lease or
//   return NLHE if it knows about a potential leaseholder elsewhere. This is
//   important, because if the replica is not connected to a quorum it will wait
//   for lease acquisition, and clients with low timeouts may cancel their
//   requests before a NLHE is returned, causing the DistSender to get stuck on
//   these replicas.
func (r *ReplicaCircuitBreaker) sendProbe(ctx context.Context, transport Transport) error {
	// We don't use timeutil.RunWithTimeout() because we need to be able to
	// differentiate whether the context timed out.
	timeout := CircuitBreakerProbeTimeout.Get(&r.d.settings.SV)
	ctx, cancel := context.WithTimeout(ctx, timeout) // nolint:context
	defer cancel()

	transport.Reset()
	ba := &kvpb.BatchRequest{}
	ba.RangeID = r.rangeID
	ba.Replica = transport.NextReplica()
	ba.Add(&kvpb.LeaseInfoRequest{
		RequestHeader: kvpb.RequestHeader{
			Key: r.startKey,
		},
	})

	log.VEventf(ctx, 2, "sending probe to %s: %s", r.id(), ba)
	br, err := transport.SendNext(ctx, ba)
	log.VEventf(ctx, 2, "probe result from %s: br=%v err=%v", r.id(), br, err)

	// Handle local send errors.
	if err != nil {
		// If the context timed out, fail. The caller will handle the case where
		// we're shutting down.
		if err := ctx.Err(); err != nil {
			return errors.Wrapf(err, "probe timed out")
		}

		// Any other local error is likely a networking/gRPC issue. This includes if
		// either the remote node or the local node has been decommissioned. We
		// rely on RPC circuit breakers to fail fast for these, so there's no point
		// in us probing individual replicas. Stop probing.
		return nil // nolint:returnerrcheck
	}

	// Handle error responses.
	if br.Error != nil {
		switch tErr := br.Error.GetDetail().(type) {
		case *kvpb.NotLeaseHolderError:
			// If we get a NLHE back with a lease record, the replica is healthy
			// enough to know who the leaseholder is. Otherwise, we have to trip the
			// breaker such that the DistSender will try other replicas and discover
			// the leaseholder -- this may otherwise never happen if clients time out
			// before the replica returns the NLHE.
			if tErr.Lease == nil || *tErr.Lease == (roachpb.Lease{}) {
				err = br.Error.GoError()
			}

		case *kvpb.RangeNotFoundError, *kvpb.RangeKeyMismatchError, *kvpb.StoreNotFoundError:
			// If the replica no longer exists, stop probing.

		case *kvpb.ReplicaUnavailableError:
			// If the replica's circuit breaker is tripped, defer to it. No need for
			// us to also probe.

		default:
			// On any other error, trip the breaker.
			err = br.Error.GoError()
		}
	}

	return errors.Wrapf(err, "probe failed")
}
```
- Cheap: reads only in-memory state.
- No latches: won't block or be blocked by other requests.
- Requires a lease: if the replica is partitioned from the leader, it will wait for lease acquisition and timeout — exactly the failure mode we're detecting.
| Probe Result | Action | Reasoning |
|---|---|---|
| Success | Untrip, stop probing | Replica is healthy |
| NLHE with lease | Untrip, stop probing | Replica knows a leaseholder |
| NLHE without lease | Trip breaker | Replica disconnected from quorum |
| Context timeout | Trip breaker | Replica unresponsive |
| RangeNotFound/KeyMismatch/StoreNotFound | Untrip, stop probing | Replica gone, not a problem |
| ReplicaUnavailableError | Untrip, stop probing | Defer to replica's own breaker |
| Other errors | Trip breaker | Something is wrong |
| gRPC/network error (no timeout) | Untrip, stop probing | Handled by RPC breakers |
The probe goroutine runs continuously while the breaker is tripped and there is recent traffic, re-probing at the probe interval.
`sed -n "799,948p" pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go`:

```go
func (r *ReplicaCircuitBreaker) launchProbe(report func(error), done func()) {
	// If this circuit breaker has been closed, don't launch further probes. This
	// acts as a synchronization point with circuit breaker GC.
	if r.isClosed() {
		done()
		return
	}

	ctx := r.d.ambientCtx.AnnotateCtx(context.Background())

	name := fmt.Sprintf("distsender-replica-probe-%s", r.id())
	err := r.d.stopper.RunAsyncTask(ctx, name, func(ctx context.Context) {
		defer done()

		ctx, cancel := r.d.stopper.WithCancelOnQuiesce(ctx)
		defer cancel()

		// Prepare the probe transport, using SystemClass to avoid RPC latency.
		//
		// We construct a bare replica slice without any locality information, since
		// we're only going to contact this replica.
		replicas := ReplicaSlice{{ReplicaDescriptor: r.desc}}
		opts := SendOptions{
			class:                  rpcbase.SystemClass,
			metrics:                &r.d.metrics,
			dontConsiderConnHealth: true,
		}
		transport := r.d.transportFactory(opts, replicas)
		defer transport.Release()

		// Start the write grace timer. Unlike reads, writes can't automatically be
		// retried by the DistSender, so we don't cancel them immediately when the
		// breaker trips but only after it has remained tripped for a grace period.
		// This should be long enough to wait out a Raft election timeout and lease
		// interval and then repropose the write, in case the range is temporarily
		// unavailable (e.g. following leaseholder loss).
		//
		// If the breaker is already tripped, the previous probe already waited out
		// the grace period, so we don't have to. The grace timer channel is set to
		// nil when there is no timer running.
		//
		// NB: lease requests aren't subject to the write grace period, despite
		// being write requests, since they are submitted directly to the local
		// replica instead of via the DistSender.
		var writeGraceTimer timeutil.Timer
		defer writeGraceTimer.Stop()
		if period := CircuitBreakerCancellationWriteGracePeriod.Get(&r.d.settings.SV); period > 0 {
			if !r.isTripped() {
				writeGraceTimer.Reset(period)
			}
		}

		// Continually probe the replica until it succeeds or the replica stops
		// seeing traffic. We probe immediately since we only trip the breaker on
		// probe failure.
		var timer timeutil.Timer
		defer timer.Stop()

		for {
			// Untrip the breaker and stop probing if circuit breakers are disabled.
			if r.d.Mode() == DistSenderCircuitBreakersNoRanges {
				report(nil)
				return
			}

			// Start the interval before sending the probe, to avoid skewing the
			// interval, instead preferring frequent probes.
			timer.Reset(CircuitBreakerProbeInterval.Get(&r.d.settings.SV))

			// Probe the replica.
			err := r.sendProbe(ctx, transport)

			// If the context (with no timeout) failed, we're shutting down. Just exit
			// the probe without reporting the result (which could trip the breaker).
			if ctx.Err() != nil {
				return
			}

			// Report the probe result.
			report(err)
			if err == nil {
				// On a successful probe, record the success and stop probing.
				r.stallSince.Store(crtime.NowMono())
				r.errorSince.Store(0)
				return
			}

			// Cancel in-flight read requests on failure, and write requests if the
			// grace timer has expired. We do this on every failure, and also remove
			// the cancel functions from the map (even though done() will also clean
			// them up), in case another request makes it in after the breaker trips.
			// There should typically never be any contention here.
			cancelRequests := func(reqKind cbRequestCancellationPolicy) {
				r.mu.Lock()
				defer r.mu.Unlock()

				if l := len(r.mu.cancelFns[reqKind]); l > 0 {
					log.VEventf(ctx, 2, "cancelling %d requests %s for %s", l, reqKind, r.id())
				}
				for ba, cancel := range r.mu.cancelFns[reqKind] {
					delete(r.mu.cancelFns[reqKind], ba)
					cancel(errors.Wrapf(err, "%s is unavailable (circuit breaker tripped)", r.id()))
					r.d.metrics.CircuitBreaker.ReplicasRequestsCancelled.Inc(1)
				}
			}
			cancelRequests(cbCancelImmediately)
			if writeGraceTimer.C == nil {
				cancelRequests(cbCancelAfterGracePeriod)
			}

			for done := false; !done; { // select until probe interval timer fires
				select {
				case <-timer.C:
					done = true
				case <-writeGraceTimer.C:
					cancelRequests(cbCancelAfterGracePeriod)
					writeGraceTimer.Stop() // sets C = nil
				case <-r.closedC:
					// The circuit breaker has been GCed, exit. We could cancel the context
					// instead to also abort an in-flight probe, but that requires extra
					// synchronization with circuit breaker GC (a probe may be launching but
					// haven't yet installed its cancel function). This is simpler.
					return
				case <-r.d.stopper.ShouldQuiesce():
					return
				case <-ctx.Done():
					return
				}
			}

			// If there haven't been any recent requests, stop probing but keep the
			// breaker tripped. A new probe will be launched on the next request.
			//
			// NB: we check this after waiting out the probe interval above, to avoid
			// frequently spawning new probe goroutines, instead waiting to see if any
			// requests come in.
			if r.lastRequestDuration(crtime.NowMono()) >= cbProbeIdleTimeout {
				// Keep probing if the write grace timer hasn't expired yet, since we
				// need to cancel pending writes first.
				if writeGraceTimer.C == nil {
					return
				}
			}
		}
	})
	if err != nil {
		done()
	}
}
```
- Spawned as a stopper task named `distsender-replica-probe-r{rangeID}/{replicaID}`.
- Uses `SystemClass` RPC priority to avoid interference with user traffic.
- First probe fires immediately (the breaker only trips on probe failure, not user errors alone).
- On success: reports nil → breaker resets, probe exits.
- On failure: reports the error → breaker trips, cancels reads immediately, starts/checks the write grace timer, waits for the next probe interval.
- Idle exit: if no requests for ≥10s (`cbProbeIdleTimeout`), exits but leaves the breaker tripped. The next request re-launches the probe.
- GC exit: if `closedC` is closed, exits immediately.
- Write grace timer: 10s delay before cancelling writes (allows time for Raft election + lease acquisition + reproposal).
Two cancellation policies, determined by cbRequestCancellationPolicyFromBatch:
`sed -n "155,168p" pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go`:

```go
func cbRequestCancellationPolicyFromBatch(
	ba *kvpb.BatchRequest, withCommit bool,
) cbRequestCancellationPolicy {
	// If the batch request is writing or is part of a transaction commit, we
	// can't automatically retry it without risking an ambiguous error, so we
	// cancel it after a grace period. Otherwise, we cancel it immediately and
	// allow DistSender to retry.
	// TODO(nvanbenschoten): a batch request that is writing and is not part of a
	// transaction commit can be retried. Do we need the IsWrite condition here?
	if ba.IsWrite() || withCommit {
		return cbCancelAfterGracePeriod
	}
	return cbCancelImmediately
}
```
| Policy | Request Type | Behavior |
|---|---|---|
| `cbCancelImmediately` | Reads | Cancelled on first probe failure. Safe because the DistSender retries internally. |
| `cbCancelAfterGracePeriod` | Writes / commits | Cancelled after a 10s grace period. Can't auto-retry (ambiguous results). The grace period allows Raft election + lease recovery. |
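The classification itself is a two-way branch on the batch. A self-contained sketch (the enum and `policyFor` helper are illustrative stand-ins for `cbRequestCancellationPolicy` and `cbRequestCancellationPolicyFromBatch`, with booleans replacing `ba.IsWrite()` and `withCommit`):

```go
package main

import "fmt"

type cancelPolicy int

const (
	cancelImmediately cancelPolicy = iota
	cancelAfterGracePeriod
)

func (p cancelPolicy) String() string {
	if p == cancelImmediately {
		return "immediately"
	}
	return "after grace period"
}

// policyFor mirrors the branch in cbRequestCancellationPolicyFromBatch:
// writes and commits risk ambiguous results on retry, so they are only
// cancelled after a grace period; reads are retried safely by the
// DistSender and can be cancelled at once.
func policyFor(isWrite, withCommit bool) cancelPolicy {
	if isWrite || withCommit {
		return cancelAfterGracePeriod
	}
	return cancelImmediately
}

func main() {
	fmt.Println(policyFor(false, false)) // read: immediately
	fmt.Println(policyFor(true, false))  // write: after grace period
	fmt.Println(policyFor(false, true))  // commit: after grace period
}
```

Note that a read issued as part of a commit batch still gets the grace-period policy, since cancelling it could leave the commit's outcome ambiguous.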
Cancellation is controlled by kv.dist_sender.circuit_breaker.cancellation.enabled
(default: true).
Idle breakers are cleaned up by a background GC loop.
`sed -n "325,382p" pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go`:

```go
func (d *DistSenderCircuitBreakers) gcLoop(ctx context.Context) {
	ticker := time.NewTicker(cbGCInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
		case <-d.stopper.ShouldQuiesce():
			return
		case <-ctx.Done():
			return
		}

		now := crtime.NowMono()

		var cbs, gced int
		d.replicas.Range(func(key cbKey, cb *ReplicaCircuitBreaker) bool {
			cbs++
			if idleDuration := cb.lastRequestDuration(now); idleDuration >= cbGCThreshold {
				// Check if we raced with a concurrent delete or replace. We don't
				// expect to, since only this loop removes circuit breakers.
				if cb2, ok := d.replicas.LoadAndDelete(key); ok {
					cb = cb2
					d.metrics.CircuitBreaker.Replicas.Dec(1)
					gced++

					// We don't expect a probe to run, since the replica is idle, but we
					// may race with a probe launch or there may be a long-running one (if
					// e.g. the probe timeout or interval has increased).
					//
					// Close closedC to stop any running probes and prevent new probes
					// from launching. Only we close it, due to the atomic map delete.
					close(cb.closedC)

					// The circuit breaker may be tripped, and reported as such in
					// metrics. A concurrent probe may also be about to trip/untrip it.
					// We let the probe's OnProbeDone() be responsible for managing the
					// ReplicasTripped gauge to avoid metrics leaks, by untripping the
					// breaker when closedC has been closed. To synchronize with a
					// concurrent probe, we attempt to launch a new one. Either:
					//
					// a) no probe is running: we launch a noop probe which will
					//    immediately call OnProbeDone() and clean up. All future
					//    probes are noops.
					//
					// b) a concurrent probe is running: the Probe() call is a noop, but
					//    when the running probe shuts down in response to closedC,
					//    OnProbeDone() will clean up.
					cb.breaker.Probe()
				}
			}
			return true
		})

		log.VEventf(ctx, 2, "garbage collected %d/%d DistSender replica circuit breakers", gced, cbs)
	}
}
```
GC runs every 1 minute. Breakers idle for ≥20 minutes are removed. The tricky
part is metric cleanup: closing closedC signals any running probe to exit,
and then OnProbeDone() decrements the tripped gauge if needed. If no probe
is running, a noop probe is launched that immediately calls OnProbeDone().
ReplicaCircuitBreaker implements circuit.EventHandler to manage metrics
and logging at each state transition.
sed -n "1032,1128p" pkg/kv/kvclient/kvcoord/dist_sender_circuit_breaker.go

func (r *ReplicaCircuitBreaker) OnTrip(b *circuit.Breaker, prev, cur error) {
if cur == nil {
return
}
// OnTrip() is called every time the probe reports an error, regardless of
// whether the breaker was already tripped. Record each probe failure, but
// only record tripped breakers when it wasn't already tripped.
r.d.metrics.CircuitBreaker.ReplicasProbesFailure.Inc(1)
if prev == nil {
// TODO(erikgrinaker): consider rate limiting these with log.Every, but for
// now we want to know which ones trip for debugging.
ctx := r.d.ambientCtx.AnnotateCtx(context.Background())
now := crtime.NowMono()
stallSince := r.stallDuration(now).Truncate(time.Millisecond)
errorSince := r.errorDuration(now).Truncate(time.Millisecond)
log.Dev.Errorf(ctx, "%s circuit breaker tripped: %s (stalled for %s, erroring for %s)",
r.id(), cur, stallSince, errorSince)
r.d.metrics.CircuitBreaker.ReplicasTripped.Inc(1)
r.d.metrics.CircuitBreaker.ReplicasTrippedEvents.Inc(1)
}
}
// OnReset implements circuit.EventHandler.
func (r *ReplicaCircuitBreaker) OnReset(b *circuit.Breaker, prev error) {
// If the circuit breaker has been GCed, we don't need to log or record the
// probe success. We do need to decrement ReplicasTripped if we're actually
// tripped though, to avoid metrics leaks. This may happen either in
// response to an actual probe success, or a noop probe during GC.
if r.isClosed() {
if prev != nil {
r.d.metrics.CircuitBreaker.ReplicasTripped.Dec(1)
}
return
}
// OnReset() is called every time the probe reports a success, regardless
// of whether the breaker was already tripped. Record each probe success,
// but only record untripped breakers when it was already tripped.
r.d.metrics.CircuitBreaker.ReplicasProbesSuccess.Inc(1)
if prev != nil {
// TODO(erikgrinaker): consider rate limiting these with log.Every, but for
// now we want to know which ones reset for debugging.
ctx := r.d.ambientCtx.AnnotateCtx(context.Background())
log.Dev.Infof(ctx, "%s circuit breaker reset", r.id())
r.d.metrics.CircuitBreaker.ReplicasTripped.Dec(1)
}
}
// OnProbeLaunched implements circuit.EventHandler.
func (r *ReplicaCircuitBreaker) OnProbeLaunched(b *circuit.Breaker) {
r.d.metrics.CircuitBreaker.ReplicasProbesRunning.Inc(1)
// If the circuit breaker has been GCed, don't log the probe launch since we
// don't actually spawn a goroutine. We still increment ProbesRunning above to
// avoid metrics leaks when decrementing in OnProbeDone().
if r.isClosed() {
return
}
ctx := r.d.ambientCtx.AnnotateCtx(context.Background())
now := crtime.NowMono()
stallSince := r.stallDuration(now).Truncate(time.Millisecond)
errorSince := r.errorDuration(now).Truncate(time.Millisecond)
tripped := r.breaker.Signal().IsTripped()
log.VEventf(ctx, 2, "launching circuit breaker probe for %s (tripped=%t stall=%s error=%s)",
r.id(), tripped, stallSince, errorSince)
}
// OnProbeDone implements circuit.EventHandler.
func (r *ReplicaCircuitBreaker) OnProbeDone(b *circuit.Breaker) {
r.d.metrics.CircuitBreaker.ReplicasProbesRunning.Dec(1)
// If the circuit breaker has been GCed, don't log the probe stopping. We
// still decrement ProbesRunning above to avoid metrics leaks (we don't know
// if the circuit breaker was GCed when OnProbeLaunched was called).
//
// We must also reset the breaker if it's tripped, to avoid ReplicasTripped
// metric gauge leaks. This can either be in response to an already-running
// probe shutting down, or a noop probe launched by GC -- it doesn't matter.
// A concurrent request may then use the untripped breaker, but that's ok
// since it would also use an untripped breaker if it arrived after GC.
if r.isClosed() {
if r.isTripped() {
r.breaker.Reset()
}
return
}
ctx := r.d.ambientCtx.AnnotateCtx(context.Background())
now := crtime.NowMono()
tripped := r.breaker.Signal().IsTripped()
lastRequest := r.lastRequestDuration(now).Truncate(time.Millisecond)
log.VEventf(ctx, 2, "stopping circuit breaker probe for %s (tripped=%t lastRequest=%s)",
r.id(), tripped, lastRequest)
}
Key nuance: OnTrip is called on every probe failure (not just the
first), since the underlying circuit.Breaker calls Report() on each
failure. The event handler distinguishes prev == nil (first trip) from
prev != nil (already tripped, just updating the error).
The generic circuit.Breaker in pkg/util/circuit provides the state machine:
- Closed (healthy): Signal().Err() returns nil.
- Open (tripped): Signal().Err() returns the error and triggers maybeTriggerProbe().
- Probing: an async probe goroutine calls report(err) to trip / report(nil) to reset.
Signal() is allocation-free and suitable for hot paths. The error channel
(errAndCh) provides a stable view: once you get a signal, you'll get the
error that closed that specific channel, even if the breaker has since reset.
All settings are defined in dist_sender_circuit_breaker.go.
| Setting | Default | Description |
|---|---|---|
| kv.dist_sender.circuit_breakers.mode | liveness range only | Which ranges get circuit breakers |
| kv.dist_sender.circuit_breaker.probe.threshold | 3s | Error/stall duration before probing |
| kv.dist_sender.circuit_breaker.probe.interval | 3s | Interval between consecutive probes |
| kv.dist_sender.circuit_breaker.probe.timeout | 3s | Timeout for each probe RPC |
| kv.dist_sender.circuit_breaker.cancellation.enabled | true | Cancel in-flight requests on trip |
| kv.dist_sender.circuit_breaker.cancellation.write_grace_period | 10s | Delay before cancelling writes (max 1m) |
Hardcoded constants:
| Constant | Value | Description |
|---|---|---|
| cbGCThreshold | 20 min | Idle duration before breaker is GC'd |
| cbGCInterval | 1 min | GC scan frequency |
| cbProbeIdleTimeout | 10s | Probe exits if no traffic for this long |
All metrics are under distsender.circuit_breaker.replicas.*:
| Metric | Type | Description |
|---|---|---|
| replicas.count | Gauge | Number of actively tracked replicas |
| replicas.tripped | Gauge | Currently tripped breakers |
| replicas.tripped_events | Counter | Cumulative trip events |
| replicas.probes.running | Gauge | Active probe goroutines |
| replicas.probes.success | Counter | Successful probes |
| replicas.probes.failure | Counter | Failed probes |
| replicas.requests.cancelled | Counter | In-flight requests cancelled by breaker |
| replicas.requests.rejected | Counter | Requests rejected by tripped breaker |
┌──────────────────────────────────────────────────────┐
│ │
▼ │
┌──────────────────┐ │
│ CLOSED │ │
│ (healthy) │ │
│ │ │
│ Track() → ok │ │
│ Done() records │ │
│ error/success │ │
└────────┬─────────┘ │
│ │
errors for ≥ 3s (done) or │
stall for ≥ 3s (probeStallLoop) │
│ │
▼ │
┌──────────────────┐ │
│ PROBING │ │
│ (first probe) │──── probe success ──────────────────────────┘
│ │
│ sendProbe() │
│ LeaseInfo RPC │
└────────┬─────────┘
│
probe fails
│
▼
┌──────────────────┐
│ OPEN │
│ (tripped) │
│ │
│ Track() → reject │
│ Cancel reads │──── probe success ──► CLOSED
│ Grace → cancel │
│ writes │
│ │
│ Re-probe every │
│ 3s while │──── idle ≥10s ──► stop probing
│ traffic exists │ (stays tripped,
└──────────────────┘ relaunched on
next request)
A typical partial-partition scenario:
t=0s Request sent to replica R1 (follower, partitioned from leader)
Track(): inflightReqs=1, stallSince=0s
R1 tries to acquire lease, waits for leader...
t=3s probeStallLoop: stallDuration=3s ≥ threshold → Probe()
launchProbe(): spawns goroutine
sendProbe(): LeaseInfo to R1 with 3s timeout
t=6s Probe times out → report(err) → breaker.Report() → OnTrip
BREAKER TRIPPED
Cancel read requests immediately
Start 10s write grace timer
t=6s New read request arrives → Track() returns error → skip R1
DistSender tries R2 → succeeds
t=9s+  Probes continue every 3s while traffic exists
t=16s  Write grace timer expires → cancel pending writes
Network heals → probe gets NLHE with lease → report(nil)
BREAKER RESET → R1 usable again
- #93501 (Closed) — kvclient: DistSender circuit breakers for unreachable leaseholders. The original tracking issue. Proposed per-range circuit breakers to avoid clients getting stuck during partial network partitions. Implemented in PR #118943.
- #33007 (Closed) — kvserver: circuit-break requests to unavailable ranges. The earlier server-side replica circuit breaker tracking issue.
- #166391 (Open) — kvserver: circuit breaker stuck tripped after network partition recovery. During chaos testing, 3 of ~36 breakers remained permanently tripped for 3+ hours after a partition healed.
- #163566 (Open) — kvserver: LEARNER replicas should return NotLeaseHolderError instead of ReplicaUnavailableError. LEARNERs return ReplicaUnavailableError (from the server-side breaker/leaderless watcher) instead of NLHE, causing DistSender to exhaust all replicas inefficiently.
- #121210 (Open) — kvcoord: DistSender circuit breaker cancellation won't work on local requests. Context cancellation is ineffective for local requests because it isn't respected by disk I/O or syscalls.
- #104567 (Open) — kvserver: false positive "circuit breaker tripped" errors/metrics. False positives observed during range merges.
- #101999 (Open) — problemranges: not showing tripped replicas. The problem ranges UI page doesn't show tripped replicas despite metrics indicating hundreds.
- #121206 (Open) — kvcoord: DistSender circuit breaker scalability. Concerns about memory overhead (one breaker per active replica) and goroutine overhead from probes at scale.
- #119919 (Open) — kvcoord: DistSender circuit breaker benchmarking and optimization. Need large-scale performance testing; considering sync.Map or IntMap variants.
- #119923 (Open) — kvcoord: handle follower reads with DistSender circuit breakers. Follower reads should be handled better for replicas partitioned from the leader.
- #120195 (Open) — kvcoord: separate read/write tracking for DistSender circuit breakers. Recovery time (~20s with epoch leases) is higher than desired.
- #121209 (Open) — kvcoord: improve DistSender circuit breaker recovery time. General umbrella for recovery time improvements.
- #123117 (Open) — kvcoord: introduce DistSender circuit breakers for just the liveness range. Due to scalability concerns, allow enabling for only the liveness range. (Now implemented as the liveness range only mode.)
- #120073 (Open) — kvserver: during a partial partition, non-leaders wait 4s for a lease acquisition timeout.
- #121941 (Open) — kv: add randomized testing of dist sender. Need a DistSender nemesis framework for comprehensive failure-mode testing.
- #119918 (Closed) — kvcoord: DistSender circuit breaker testing. Comprehensive test suite added.
- #119916 (Closed) — kvcoord: DistSender circuit breaker observability. Metrics, logging, and trace events added (was a GA blocker for 24.1).
- #119917 (Closed) — kvcoord: improve DistSender circuit breaker scalability. Fixed scalability issues with probe goroutines.
- #119921 (Closed) — kvcoord: ignore streaming requests for DistSender circuit breaker stall detection. Rangefeeds no longer trigger false stall detection (was a GA blocker for 24.1).
- #93503 (Closed) — kvclient: route requests via followers when leaseholder is unreachable. Follower routing integrated with circuit breakers.
| File | Purpose |
|---|---|
| dist_sender_circuit_breaker.go | DistSender circuit breaker implementation |
| dist_sender_circuit_breaker_test.go | DistSender circuit breaker tests |
| dist_sender.go | Integration point (line 2838) |
| circuitbreaker.go | Generic circuit breaker utility |
| replica_circuit_breaker.go | Server-side replica circuit breaker (separate system) |