Skip to content

Instantly share code, notes, and snippets.

@rueian
Last active August 21, 2024 03:44
Show Gist options
  • Save rueian/fca6c2597ff2ba1dfc2ab5eee148bee7 to your computer and use it in GitHub Desktop.
Save rueian/fca6c2597ff2ba1dfc2ab5eee148bee7 to your computer and use it in GitHub Desktop.

How does EventRecorder work?

Let's start from our controller code:

r.Recorder.Eventf(&instance, corev1.EventTypeNormal, "Created", "Created head pod %s", pod.Name)

When we invoke the Eventf to record a new event, the recorder will pass the event to the *Broadcaster.ActionOrDrop(). The *Broadcaster.ActionOrDrop() looks like this:

// https://github.com/kubernetes/apimachinery/blob/a8a2284d318b9213fca20b0058e969941ce1c90b/pkg/watch/mux.go#L242-L247
select {
case m.incoming <- Event{action, obj}:
	return true, nil
default:
	return false, nil
}

where the buffer size of the m.incoming channel is 1000 by default. This is the first place an event can be dropped.

The Broadcaster has one goroutine that keeps consuming the m.incoming and pushing the event to all watcher channels:

// https://github.com/kubernetes/apimachinery/blob/a8a2284d318b9213fca20b0058e969941ce1c90b/pkg/watch/mux.go#L265-L288
for event := range m.incoming {
	for _, w := range m.watchers { // DropIfChannelFull
		select {
		case w.result <- event:
		case <-w.stopped:
		default: // Don't block if the event can't be queued.
		}
	}
}

where the buffer size of the w.result channel is also 1000 by default. This is the second place an event can be dropped.

Ref: https://github.com/kubernetes/client-go/blob/46965213e4561ad1b9c585d1c3551a0cc8d3fcd6/tools/record/event.go#L194

The above is what happened on the controller side where we populate events. Now, how does the Broadcaster send those events to the kube API from the watchers and where do those watchers come from?

Sending updates to the kube API

The Broadcaster will register the event handler "recordToSink" and an "eventCorrelator" by invoking the StartRecordingToSink when it starts running. The handler is the watcher.

// https://github.com/kubernetes/client-go/blob/46965213e4561ad1b9c585d1c3551a0cc8d3fcd6/tools/record/event.go#L275-L281
func (e *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
	eventCorrelator := NewEventCorrelatorWithOptions(e.options)
	return e.StartEventWatcher(
		func(event *v1.Event) {
			e.recordToSink(sink, event, eventCorrelator)
		})
}

// https://github.com/kubernetes/client-go/blob/46965213e4561ad1b9c585d1c3551a0cc8d3fcd6/tools/record/event.go#L411-L418
func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator) {
	// Make a copy before modification, because there could be multiple listeners.
	// Events are safe to copy like this.
	eventCopy := *event
	event = &eventCopy
	result, err := eventCorrelator.EventCorrelate(event)
	if err != nil {
		utilruntime.HandleError(err)
	}
	if result.Skip {
		return
	}
	...

The recordToSink handler will first pass the event into the eventCorrelator.EventCorrelate(), it will

  1. First, aggregates the similar events by the associated custom resource, the event type, and the event reason. The aggregation will always result in a new event patch.
  2. Second, the aggregation will then be passed to a SpamFilter, which will filter out bursts by each custom resource solely to avoid flooding the kube API.
// https://github.com/kubernetes/client-go/blob/46965213e4561ad1b9c585d1c3551a0cc8d3fcd6/tools/record/events_cache.go#L514-L518
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
	if newEvent == nil {
		return nil, fmt.Errorf("event is nil")
	}
	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
	if c.filterFunc(observedEvent) {
		return &EventCorrelateResult{Skip: true}, nil
	}
	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}

The SpamFilter enforces the following token bucket rate limit for each custom resource:

  1. At most 25 tokens for each custom resource.
  2. Tokens are refilled at the rate of 1 token per 5 minutes for each custom resource.

Ref: https://github.com/kubernetes/client-go/blob/46965213e4561ad1b9c585d1c3551a0cc8d3fcd6/tools/record/events_cache.go#L424-L438

Those bursts that exceed the rate limit will be marked with result.Skip=true and be ignored by the recordToSink handler. This is the third place an event can be dropped.

Ref: https://github.com/kubernetes/client-go/blob/46965213e4561ad1b9c585d1c3551a0cc8d3fcd6/tools/record/event.go#L293-L298

Other allowed event patches will be applied to the kube-api with at most 12 tries. This is the forth place an event can be dropped.

Conclusion

No matter how hard we use the recorder, there will be at most 25 event updates burst to the kube API per custom resource and after that 1 event update per 5 miniutes per custom resource.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment