Skip to content

Instantly share code, notes, and snippets.

@sttts
Created February 11, 2025 21:19
Show Gist options
  • Save sttts/b91bbb0483e0e03c4ddcf5ab43f0e9fe to your computer and use it in GitHub Desktop.
Save sttts/b91bbb0483e0e03c4ddcf5ab43f0e9fe to your computer and use it in GitHub Desktop.
commit a755b09e49a3fd44a5a6fdbbd441eb9366cdf475
Author: Dr. Stefan Schimanski <[email protected]>
Date: Tue Feb 11 22:15:16 2025 +0100
Add fair queue
Signed-off-by: Dr. Stefan Schimanski <[email protected]>
diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go
index 47b519e..1902a20 100644
--- a/pkg/builder/controller.go
+++ b/pkg/builder/controller.go
@@ -24,10 +24,9 @@ import (
"github.com/go-logr/logr"
- mchandler "github.com/multicluster-runtime/multicluster-runtime/pkg/handler"
-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
@@ -35,15 +34,18 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/controller"
+ "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
mccontroller "github.com/multicluster-runtime/multicluster-runtime/pkg/controller"
+ mchandler "github.com/multicluster-runtime/multicluster-runtime/pkg/handler"
mcmanager "github.com/multicluster-runtime/multicluster-runtime/pkg/manager"
mcreconcile "github.com/multicluster-runtime/multicluster-runtime/pkg/reconcile"
mcsource "github.com/multicluster-runtime/multicluster-runtime/pkg/source"
+ mcworkqueue "github.com/multicluster-runtime/multicluster-runtime/pkg/workqueue"
)
// project represents other forms that we can use to
@@ -465,6 +467,23 @@ func (blder *TypedBuilder[request]) doController(r reconcile.TypedReconciler[req
if ctrlOptions.Reconciler == nil {
ctrlOptions.Reconciler = r
}
+ if ctrlOptions.NewQueue == nil {
+ ctrlOptions.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
+ if ptr.Deref(blder.mgr.GetControllerOptions().UsePriorityQueue, false) {
+ return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) {
+ o.Log = blder.mgr.GetLogger().WithValues("controller", controllerName)
+ o.RateLimiter = rateLimiter
+ })
+ }
+ return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{
+ Name: controllerName,
+ DelayingQueue: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[request]{
+ Name: controllerName,
+ Queue: mcworkqueue.NewClusterFair[request](),
+ }),
+ })
+ }
+ }
// Retrieve the GVK from the object we're reconciling
// to pre-populate logger information, and to optionally generate a default name.
diff --git a/pkg/workqueue/cluster.go b/pkg/workqueue/cluster.go
new file mode 100644
index 0000000..353e32f
--- /dev/null
+++ b/pkg/workqueue/cluster.go
@@ -0,0 +1,32 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package workqueue
+
+import (
+ mcreconcile "github.com/multicluster-runtime/multicluster-runtime/pkg/reconcile"
+)
+
+// ClusterFair is a queue that ensures items are dequeued fairly across different
+// clusters of cluster-aware requests.
+//
+// It is declared as a distinct type whose underlying type is TypedFair[request].
+// As a defined (non-alias) type it does not carry the methods declared on
+// *TypedFair; NewClusterFair therefore returns a *TypedFair[request] rather
+// than a *ClusterFair.
+type ClusterFair[request mcreconcile.ClusterAware[request]] TypedFair[request]
+
+// NewClusterFair builds a fair queue whose fairness key is the string returned
+// by each request's Cluster method, so that requests are dequeued round-robin
+// across those keys.
+func NewClusterFair[request mcreconcile.ClusterAware[request]]() *TypedFair[request] {
+	clusterOf := func(req request) string { return req.Cluster() }
+	return NewTypedFair[request](clusterOf)
+}
diff --git a/pkg/workqueue/fair.go b/pkg/workqueue/fair.go
new file mode 100644
index 0000000..3e8848e
--- /dev/null
+++ b/pkg/workqueue/fair.go
@@ -0,0 +1,185 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package workqueue
+
+import (
+ "container/list"
+ "sync"
+
+ "k8s.io/client-go/util/workqueue"
+)
+
+// Compile-time check that *TypedFair satisfies workqueue.TypedInterface.
+var _ workqueue.TypedInterface[any] = (*TypedFair[any])(nil)
+
+// TypedFair is a queue that ensures items are dequeued fairly across different
+// fairness keys while maintaining FIFO order within each key.
+//
+// Every fairness key owns its own sub-queue. Keys are kept in activeList and
+// Get walks that list from the front, moving a key to the back after serving
+// it, which yields round-robin order across keys.
+type TypedFair[T comparable] struct {
+ // mu is held by every method while it reads or modifies entries and activeList.
+ mu sync.Mutex
+ // entries maps a fairness key to the state of its sub-queue.
+ entries map[string]*queueEntry[T]
+ // activeList holds fairness keys (stored as string values) in the order Get serves them.
+ activeList *list.List
+ // keyFunc computes the fairness key of an item; it is called by Add and Done.
+ keyFunc FairnessKeyFunc[T]
+}
+
+// Fair is the untyped variant of TypedFair: a queue that ensures items are
+// dequeued fairly across different fairness keys while maintaining FIFO order
+// within each key.
+//
+// It is a type alias rather than a defined type so that *Fair keeps the method
+// set of *TypedFair[any] (Add, Get, Done, Len, ...). A defined type would have
+// no methods at all and could not be used as a queue.
+type Fair = TypedFair[any]
+
+// FairnessKeyFunc is a function that returns a string key for a given item.
+// Items with different keys are dequeued fairly; items that share a key are
+// placed in the same sub-queue.
+//
+// TypedFair calls the function in both Add and Done to locate an item's
+// sub-queue, so it has to return the same key for the same item every time.
+type FairnessKeyFunc[T comparable] func(T) string
+
+// NewFair returns an empty untyped fair queue that groups items by the key
+// computed by keyFunc.
+func NewFair(keyFunc FairnessKeyFunc[any]) *Fair {
+	typed := NewTypedFair[any](keyFunc)
+	return (*Fair)(typed)
+}
+
+// NewTypedFair returns an empty TypedFair that uses keyFunc to assign every
+// item to a fairness key. The key map and the round-robin list start out empty.
+func NewTypedFair[T comparable](keyFunc FairnessKeyFunc[T]) *TypedFair[T] {
+	q := new(TypedFair[T])
+	q.keyFunc = keyFunc
+	q.entries = map[string]*queueEntry[T]{}
+	q.activeList = list.New()
+	return q
+}
+
+// queueEntry is the per-fairness-key state kept in TypedFair.entries.
+type queueEntry[T comparable] struct {
+ // queue holds the items that share this entry's fairness key.
+ queue workqueue.TypedInterface[T]
+ activeElem *list.Element // Reference to the element in activeList; nil while the key is not listed
+}
+
+// Add inserts an item into the queue under its fairness key.
+//
+// The sub-queue for the key is created on first use. After the insert, the key
+// is appended to the back of the round-robin list whenever its sub-queue has
+// items waiting and the key is not listed yet.
+func (q *TypedFair[T]) Add(item T) {
+	key := q.keyFunc(item)
+
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	entry, exists := q.entries[key]
+	if !exists {
+		entry = &queueEntry[T]{
+			queue: workqueue.NewTyped[T](),
+		}
+		q.entries[key] = entry
+	}
+
+	entry.queue.Add(item)
+
+	// List the key for any non-zero length, not only for a length of exactly
+	// one. The sub-queue can already hold waiting items while the key is
+	// unlisted (client-go's queue puts an item back in Done when it was added
+	// again while being processed), and a strict "== 1" check would then never
+	// list the key again, leaving those items unreachable for Get.
+	if entry.activeElem == nil && entry.queue.Len() > 0 {
+		entry.activeElem = q.activeList.PushBack(key)
+	}
+}
+
+// Get retrieves the next item from the queue, ensuring fairness across keys.
+//
+// It walks the round-robin list from the front and returns the first item it
+// can take from a listed key. A key that still has items afterwards is moved to
+// the back of the list; a key whose sub-queue is empty is unlisted.
+//
+// NOTE(review): when no listed key has an item, Get returns the zero value and
+// shutdown=true immediately instead of blocking until an item arrives. Making
+// it block needs a condition variable on TypedFair that Add and Done signal,
+// which is outside the scope of this method.
+func (q *TypedFair[T]) Get() (item T, shutdown bool) {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	var next *list.Element
+	for elem := q.activeList.Front(); elem != nil; elem = next {
+		// Remember the successor before touching the list: Remove clears the
+		// element's links, so elem.Next() after a removal returns nil and would
+		// end the scan even though later keys still have items.
+		next = elem.Next()
+
+		key := elem.Value.(string)
+		entry := q.entries[key]
+
+		if entry.queue.Len() == 0 {
+			q.activeList.Remove(elem)
+			entry.activeElem = nil
+			continue
+		}
+
+		item, shutdown = entry.queue.Get()
+		if shutdown {
+			continue
+		}
+
+		// Check if the queue is now empty and update activeList
+		if entry.queue.Len() == 0 {
+			q.activeList.Remove(elem)
+			entry.activeElem = nil
+		} else {
+			// Move to back to maintain round-robin order
+			q.activeList.MoveToBack(elem)
+		}
+
+		return item, false
+	}
+
+	var zero T
+	return zero, true
+}
+
+// Done marks the processing of an item as complete.
+//
+// The call is forwarded to the sub-queue of the item's fairness key; items
+// whose key has no sub-queue are ignored.
+func (q *TypedFair[T]) Done(item T) {
+	key := q.keyFunc(item)
+
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	entry, exists := q.entries[key]
+	if !exists {
+		return
+	}
+	entry.queue.Done(item)
+
+	// client-go's queue puts an item back into the queue in Done when it was
+	// added again while being processed. Get only looks at listed keys, so the
+	// key has to be listed again here or the re-queued item is never handed out.
+	if entry.activeElem == nil && entry.queue.Len() > 0 {
+		entry.activeElem = q.activeList.PushBack(key)
+	}
+}
+
+// Len reports how many items are waiting in total, i.e. the sum of the lengths
+// of all per-key sub-queues.
+func (q *TypedFair[T]) Len() int {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	var n int
+	for _, sub := range q.entries {
+		n += sub.queue.Len()
+	}
+	return n
+}
+
+// ShutDown shuts down every sub-queue that exists at the time of the call.
+//
+// NOTE(review): no shutdown state is recorded on TypedFair itself, so a
+// sub-queue created by a later Add for a new key starts out not shut down.
+func (q *TypedFair[T]) ShutDown() {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	for key := range q.entries {
+		q.entries[key].queue.ShutDown()
+	}
+}
+
+// ShuttingDown reports whether the queue is shutting down, which is the case
+// when at least one sub-queue exists and every sub-queue is shutting down.
+//
+// A queue without any sub-queue reports false. Looping over an empty map would
+// otherwise yield a vacuous true, making a freshly created queue look shut down
+// to wrappers that consult ShuttingDown before accepting or delivering items.
+//
+// NOTE(review): as a consequence, a ShutDown issued before the first Add cannot
+// be observed here; tracking that needs a flag on TypedFair.
+func (q *TypedFair[T]) ShuttingDown() bool {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	if len(q.entries) == 0 {
+		return false
+	}
+	for _, entry := range q.entries {
+		if !entry.queue.ShuttingDown() {
+			return false
+		}
+	}
+	return true
+}
+
+// ShutDownWithDrain terminates the queue and all sub-queues, draining all.
+//
+// It shuts down every sub-queue that exists at the time of the call with
+// ShutDownWithDrain, one goroutine per sub-queue, and returns once all of
+// those calls have returned.
+func (q *TypedFair[T]) ShutDownWithDrain() {
+	// Only take a snapshot of the sub-queues under the lock. Draining waits for
+	// in-flight items to be marked Done, and Done needs q.mu; holding the lock
+	// while waiting would therefore deadlock as soon as one item is in flight.
+	q.mu.Lock()
+	queues := make([]workqueue.TypedInterface[T], 0, len(q.entries))
+	for _, entry := range q.entries {
+		queues = append(queues, entry.queue)
+	}
+	q.mu.Unlock()
+
+	var wg sync.WaitGroup
+	for _, sub := range queues {
+		wg.Add(1)
+		go func(sub workqueue.TypedInterface[T]) {
+			defer wg.Done()
+			sub.ShutDownWithDrain()
+		}(sub)
+	}
+	wg.Wait()
+}
diff --git a/providers/kind/provider.go b/providers/kind/provider.go
index 5ef34df..6ccc365 100644
--- a/providers/kind/provider.go
+++ b/providers/kind/provider.go
@@ -24,6 +24,7 @@ import (
"time"
"github.com/go-logr/logr"
+
mcmanager "github.com/multicluster-runtime/multicluster-runtime/pkg/manager"
"github.com/multicluster-runtime/multicluster-runtime/pkg/multicluster"
"k8s.io/apimachinery/pkg/util/sets"
diff --git a/providers/namespace/provider.go b/providers/namespace/provider.go
index b3ef306..dde94a3 100644
--- a/providers/namespace/provider.go
+++ b/providers/namespace/provider.go
@@ -22,6 +22,7 @@ import (
"sync"
"github.com/go-logr/logr"
+
mcmanager "github.com/multicluster-runtime/multicluster-runtime/pkg/manager"
corev1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment