Skip to content

Instantly share code, notes, and snippets.

@dharmit
Created March 19, 2023 08:44
Show Gist options
  • Save dharmit/86002f8820efdeb770f25ab77595bb72 to your computer and use it in GitHub Desktop.
Save dharmit/86002f8820efdeb770f25ab77595bb72 to your computer and use it in GitHub Desktop.
At Operator
// AtReconciler reconciles a At object.
type AtReconciler struct {
	// Client is embedded, so its methods are callable directly on the
	// reconciler; Reconcile uses Get, Create and Status().Update through it.
	client.Client
	// Scheme is handed to ctrl.SetControllerReference when Reconcile marks
	// the At resource as the controller of the Pod it creates.
	Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=at.example.com,resources=ats,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=at.example.com,resources=ats/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=at.example.com,resources=ats/finalizers,verbs=update
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
//
// An At resource is driven through the phases stored in its Status.Phase:
//
//   - PENDING (also assumed when the phase is empty): parse Spec.Schedule and,
//     while the scheduled time is still in the future, ask to be requeued after
//     the remaining duration. Once the time has been reached, move to RUNNING.
//   - RUNNING: create the Pod built by newPodForCR if no Pod with the
//     resource's name/namespace exists yet. When that Pod has reached the
//     Succeeded or Failed phase, move to DONE.
//   - DONE and any unknown phase: nothing to do.
//
// Phase transitions are persisted through the status subresource at the end
// of the call. A missing At resource is not treated as an error; every other
// failure is returned, wrapped with the step that failed.
func (r *AtReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("==== Reconciling at ====")

	instance := &atv1alpha1.At{}
	if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
		if apierrors.IsNotFound(err) {
			// resource wasn't found, maybe it was deleted
			logger.Info("at resource not found")
			return ctrl.Result{}, nil
		}
		// Any other failure must stop here: continuing would operate on a
		// zero-valued instance and report a misleading schedule error.
		return ctrl.Result{}, fmt.Errorf("getting at %s: %w", req.NamespacedName, err)
	}

	if instance.Status.Phase == "" {
		instance.Status.Phase = atv1alpha1.PhasePending
	}

	switch instance.Status.Phase {
	case atv1alpha1.PhasePending:
		logger.Info("Phase: PENDING")
		diff, err := timeUntilSchedule(instance.Spec.Schedule)
		if err != nil {
			return ctrl.Result{}, fmt.Errorf("parsing schedule %q: %w", instance.Spec.Schedule, err)
		}
		logger.Info("Schedule parsing done", "Result", diff.String())
		if diff > 0 {
			// wait until scheduled time
			return ctrl.Result{RequeueAfter: diff}, nil
		}
		logger.Info("Time to execute", "Ready to execute", instance.Spec.Command)
		instance.Status.Phase = atv1alpha1.PhaseRunning

	case atv1alpha1.PhaseRunning:
		logger.Info("Phase: RUNNING")
		pod := newPodForCR(instance)
		if err := ctrl.SetControllerReference(instance, pod, r.Scheme); err != nil {
			return ctrl.Result{}, fmt.Errorf("setting owner of pod %s: %w", req.NamespacedName, err)
		}

		// The Pod shares the At resource's name and namespace, so the request
		// key doubles as the Pod key.
		found := &corev1.Pod{}
		err := r.Get(ctx, req.NamespacedName, found)
		switch {
		case apierrors.IsNotFound(err):
			// does not exist, create a Pod
			if err := r.Create(ctx, pod); err != nil {
				return ctrl.Result{}, fmt.Errorf("creating pod %s: %w", req.NamespacedName, err)
			}
			logger.Info("Pod created successfully", "name", pod.Name)
			return ctrl.Result{}, nil
		case err != nil:
			return ctrl.Result{}, fmt.Errorf("getting pod %s: %w", req.NamespacedName, err)
		case found.Status.Phase == corev1.PodFailed || found.Status.Phase == corev1.PodSucceeded:
			// pod finished or error'd out
			logger.Info("Container terminated", "reason", found.Status.Reason,
				"message", found.Status.Message)
			instance.Status.Phase = atv1alpha1.PhaseDone
		default:
			// don't requeue, it will happen automatically when
			// pod status changes
			return ctrl.Result{}, nil
		}

	case atv1alpha1.PhaseDone:
		logger.Info("Phase: DONE")
		// reconcile without requeueing
		return ctrl.Result{}, nil

	default:
		logger.Info("NOP")
		return ctrl.Result{}, nil
	}

	// Only reached after a phase transition: persist the new phase.
	if err := r.Status().Update(ctx, instance); err != nil {
		return ctrl.Result{}, fmt.Errorf("updating status of at %s: %w", req.NamespacedName, err)
	}
	return ctrl.Result{}, nil
}
// timeUntilSchedule reports how long remains until the instant described by
// schedule, which must be formatted as "2006-01-02T15:04:05Z" (UTC with a
// literal trailing Z). The result is zero or negative once that instant has
// been reached. A schedule that does not match the layout yields a zero
// duration together with the parse error.
func timeUntilSchedule(schedule string) (time.Duration, error) {
	const scheduleLayout = "2006-01-02T15:04:05Z"

	when, err := time.Parse(scheduleLayout, schedule)
	if err != nil {
		return 0, err
	}
	return time.Until(when), nil
}
// newPodForCR builds (but does not create) the Pod that executes the command
// of the given At resource. The Pod takes the resource's name and namespace,
// is labelled app=<name>, and runs a single busybox container that restarts
// on failure.
//
// Spec.Command is split on runs of whitespace, so repeated, leading or
// trailing spaces do not produce empty arguments. No shell-style quoting is
// interpreted. An empty or blank command results in no Command being set on
// the container.
func newPodForCR(cr *atv1alpha1.At) *corev1.Pod {
	labels := map[string]string{"app": cr.Name}
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      cr.Name,
			Namespace: cr.Namespace,
			Labels:    labels,
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{
					Name:  "busybox",
					Image: "quay.io/quay/busybox",
					// Fields, unlike Split(s, " "), never yields "" entries.
					Command: strings.Fields(cr.Spec.Command),
				},
			},
			RestartPolicy: corev1.RestartPolicyOnFailure,
		},
	}
}
// SetupWithManager registers this reconciler with mgr as the controller for
// At resources and for the Pods they own.
func (r *AtReconciler) SetupWithManager(mgr ctrl.Manager) error {
	b := ctrl.NewControllerManagedBy(mgr)
	b = b.For(&atv1alpha1.At{}) // primary resource
	b = b.Owns(&corev1.Pod{})   // Pods owned by an At
	return b.Complete(r)
}
// Values stored in AtStatus.Phase; the controller's Reconcile switches on them.
const (
	// PhasePending is the phase Reconcile assumes when Status.Phase is empty;
	// the resource stays in it until its scheduled time has been reached.
	PhasePending = "PENDING"
	// PhaseRunning is set once the scheduled time has been reached; in this
	// phase Reconcile creates the Pod and waits for it to succeed or fail.
	PhaseRunning = "RUNNING"
	// PhaseDone is set after the Pod has succeeded or failed; Reconcile does
	// nothing further for a resource in this phase.
	PhaseDone = "DONE"
)
// AtSpec defines the desired state of At
type AtSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	// Schedule is the time at which Command should run. The controller parses
	// it with the layout "2006-01-02T15:04:05Z", e.g. "2023-03-19T08:44:00Z".
	Schedule string `json:"schedule,omitempty"`
	// Command is the command line to execute; the controller splits it into
	// words and uses them as the command of the Pod's container.
	Command string `json:"command,omitempty"`
}
// AtStatus defines the observed state of At
type AtStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	// Phase records how far the controller has progressed with this resource:
	// one of PhasePending, PhaseRunning or PhaseDone. An empty value is
	// treated as PhasePending by the controller.
	Phase string `json:"phase,omitempty"`
}
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

// At is the Schema for the ats API. It pairs the user-supplied AtSpec
// (schedule and command) with the AtStatus maintained by the controller.
type At struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	// Spec holds the requested schedule and command.
	Spec AtSpec `json:"spec,omitempty"`
	// Status holds the phase written by the controller through the status
	// subresource.
	Status AtStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true

// AtList contains a list of At
type AtList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	// Items holds the At resources contained in the list.
	Items []At `json:"items"`
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment