|
package main |
|
|
|
import ( |
|
"errors" |
|
"fmt" |
|
"math/rand" |
|
"sync" |
|
"time" |
|
) |
|
|
|
// Define the Custom Resource (CR) structure |
|
type CustomResource struct { |
|
ID string `json:"id"` |
|
Spec CRSpec `json:"spec"` |
|
Status CRStatus `json:"status"` |
|
} |
|
|
|
type CRSpec struct { |
|
RequestType string `json:"requestType"` // create, update, delete |
|
TargetObject string `json:"targetObject"` |
|
} |
|
|
|
type CRStatus struct { |
|
State string `json:"state"` // pending, in-progress, succeeded, failed |
|
Message string `json:"message"` // Description of the state |
|
} |
|
|
|
type EventSource interface { |
|
// Abstract interface for event sources like DB, Message Queue, etc. |
|
GetNextEvent() (*CustomResource, error) |
|
} |
|
|
|
type Controller struct { |
|
eventSource EventSource |
|
maxConcurrency int |
|
wg sync.WaitGroup |
|
crStore map[string]*CustomResource // In-memory store to simulate CR management |
|
mu sync.Mutex // Mutex to handle concurrent access to CR store |
|
retryQueue chan *CustomResource // Queue for failed CRs to reattempt |
|
} |
|
|
|
type FakeEventSource struct { |
|
events []CustomResource |
|
index int |
|
} |
|
|
|
func (f *FakeEventSource) GetNextEvent() (*CustomResource, error) { |
|
if f.index < len(f.events) { |
|
event := &f.events[f.index] |
|
f.index++ |
|
return event, nil |
|
} |
|
return nil, fmt.Errorf("no more events") |
|
} |
|
|
|
// NewController initializes the controller with the event source |
|
func NewController(eventSource EventSource, maxConcurrency int) *Controller { |
|
return &Controller{ |
|
eventSource: eventSource, |
|
maxConcurrency: maxConcurrency, |
|
wg: sync.WaitGroup{}, |
|
crStore: make(map[string]*CustomResource), // CR store initialized |
|
retryQueue: make(chan *CustomResource, 100), // Channel for retrying failed CRs |
|
} |
|
} |
|
|
|
func (c *Controller) Start() { |
|
fmt.Println("Starting the controller...") |
|
semaphore := make(chan struct{}, c.maxConcurrency) |
|
|
|
// Start a goroutine to handle failed CRs |
|
go c.requeueFailedCRs() |
|
|
|
// Main loop to process events |
|
for { |
|
// Poll the event source |
|
event, err := c.eventSource.GetNextEvent() |
|
if err != nil { |
|
fmt.Println("Error retrieving event:", err) |
|
time.Sleep(1 * time.Second) |
|
continue |
|
} |
|
|
|
c.wg.Add(1) |
|
semaphore <- struct{}{} // Acquire a slot in the semaphore |
|
|
|
// Handle reconciliation concurrently |
|
go func(cr *CustomResource) { |
|
defer c.wg.Done() |
|
defer func() { <-semaphore }() // Release the semaphore slot |
|
|
|
c.Reconcile(cr) |
|
}(event) |
|
} |
|
|
|
// Wait for all goroutines to finish before exiting |
|
c.wg.Wait() |
|
} |
|
|
|
func (c *Controller) Reconcile(cr *CustomResource) { |
|
// Reconciliation loop for CRs based on the request type |
|
fmt.Printf("Reconciling CR: %s\n", cr.ID) |
|
|
|
// Set status to in-progress initially |
|
c.UpdateStatus(cr, "in-progress", "Request is being processed") |
|
|
|
// Handle based on request type |
|
var err error |
|
switch cr.Spec.RequestType { |
|
case "create": |
|
err = c.HandleCreate(cr) |
|
case "update": |
|
err = c.HandleUpdate(cr) |
|
case "delete": |
|
err = c.HandleDelete(cr) |
|
default: |
|
err = fmt.Errorf("unknown request type") |
|
} |
|
|
|
// If operation failed, requeue for retry |
|
if err != nil { |
|
c.UpdateStatus(cr, "failed", err.Error()) |
|
c.requeueFailedCR(cr) |
|
return |
|
} |
|
|
|
// If operation succeeded, update status |
|
c.UpdateStatus(cr, "succeeded", "Operation completed successfully") |
|
} |
|
|
|
func (c *Controller) HandleCreate(cr *CustomResource) error { |
|
// Simulate the creation of a resource with a possible failure |
|
if rand.Float32() < 0.3 { |
|
return errors.New("failed to create resource") |
|
} |
|
|
|
// Create CR in store |
|
c.mu.Lock() |
|
defer c.mu.Unlock() |
|
if _, exists := c.crStore[cr.ID]; exists { |
|
return fmt.Errorf("CR with ID %s already exists", cr.ID) |
|
} |
|
c.crStore[cr.ID] = cr |
|
return nil |
|
} |
|
|
|
func (c *Controller) HandleUpdate(cr *CustomResource) error { |
|
// Simulate updating a resource with a possible failure |
|
if rand.Float32() < 0.3 { |
|
return errors.New("failed to update resource") |
|
} |
|
|
|
// Update CR in store |
|
c.mu.Lock() |
|
defer c.mu.Unlock() |
|
if _, exists := c.crStore[cr.ID]; !exists { |
|
return fmt.Errorf("CR with ID %s does not exist", cr.ID) |
|
} |
|
c.crStore[cr.ID] = cr // Update the CR with new spec |
|
return nil |
|
} |
|
|
|
func (c *Controller) HandleDelete(cr *CustomResource) error { |
|
// Simulate deleting a resource with a possible failure |
|
if rand.Float32() < 0.3 { |
|
return errors.New("failed to delete resource") |
|
} |
|
|
|
// Delete CR from store |
|
c.mu.Lock() |
|
defer c.mu.Unlock() |
|
if _, exists := c.crStore[cr.ID]; !exists { |
|
return fmt.Errorf("CR with ID %s does not exist", cr.ID) |
|
} |
|
delete(c.crStore, cr.ID) |
|
return nil |
|
} |
|
|
|
func (c *Controller) UpdateStatus(cr *CustomResource, state, message string) { |
|
// Update the status of the CR |
|
cr.Status = CRStatus{ |
|
State: state, |
|
Message: message, |
|
} |
|
// Simulate saving the updated CR (e.g., to a database) |
|
fmt.Printf("CR %s status updated: %s - %s\n", cr.ID, state, message) |
|
} |
|
|
|
func (c *Controller) requeueFailedCR(cr *CustomResource) { |
|
// Add the failed CR to the retry queue |
|
c.retryQueue <- cr |
|
} |
|
|
|
func (c *Controller) requeueFailedCRs() { |
|
// Goroutine to process failed CRs |
|
for cr := range c.retryQueue { |
|
fmt.Printf("Requeuing failed CR %s for retry...\n", cr.ID) |
|
|
|
// Retry logic with exponential backoff |
|
backoffDuration := time.Second |
|
for { |
|
time.Sleep(backoffDuration) |
|
|
|
// Retry reconciliation |
|
c.Reconcile(cr) |
|
|
|
// If the reconciliation was successful, break the loop |
|
if cr.Status.State == "succeeded" { |
|
break |
|
} |
|
|
|
// Increase backoff duration (exponential backoff) |
|
backoffDuration *= 2 |
|
} |
|
} |
|
} |
|
|
|
// View CR Spec and Status |
|
func (c *Controller) ViewCR(id string) (*CustomResource, error) { |
|
c.mu.Lock() |
|
defer c.mu.Unlock() |
|
|
|
cr, exists := c.crStore[id] |
|
if !exists { |
|
return nil, fmt.Errorf("CR with ID %s not found", id) |
|
} |
|
|
|
return cr, nil |
|
} |
|
|
|
// Watch for changes in the status of CRs |
|
func (c *Controller) WatchCRStatus(id string, interval time.Duration) { |
|
ticker := time.NewTicker(interval) |
|
defer ticker.Stop() |
|
|
|
for range ticker.C { |
|
cr, err := c.ViewCR(id) |
|
if err != nil { |
|
fmt.Println("Error:", err) |
|
return |
|
} |
|
fmt.Printf("CR ID: %s, Status: %s, Message: %s\n", cr.ID, cr.Status.State, cr.Status.Message) |
|
} |
|
} |
|
|
|
// Main function to start the controller |
|
func main() { |
|
// Simulating event source (in-memory events) |
|
events := []CustomResource{ |
|
{ID: "1", Spec: CRSpec{RequestType: "create", TargetObject: "object1"}}, |
|
{ID: "2", Spec: CRSpec{RequestType: "update", TargetObject: "object2"}}, |
|
{ID: "3", Spec: CRSpec{RequestType: "delete", TargetObject: "object3"}}, |
|
} |
|
|
|
// Creating a fake event source |
|
eventSource := &FakeEventSource{events: events} |
|
|
|
// Instantiate the controller with a max concurrency of 3 |
|
controller := NewController(eventSource, 3) |
|
|
|
// Start the controller in a goroutine |
|
go controller.Start() |
|
|
|
// Simulate a delay to allow some operations to be processed |
|
time.Sleep(3 * time.Second) |
|
|
|
// Watch for the status of CR "1" |
|
controller.WatchCRStatus("1", 2*time.Second) |
|
} |