Skip to content

Instantly share code, notes, and snippets.

@OnurGumus
Last active May 4, 2025 09:39
Show Gist options
  • Save OnurGumus/47c79943ed17db11ec01876a9a4fb936 to your computer and use it in GitHub Desktop.
Save OnurGumus/47c79943ed17db11ec01876a9a4fb936 to your computer and use it in GitHub Desktop.
Akka ObservingScheduler
module ObservingScheduler
open System
open System.Threading
open Akka.Actor
open Akka.Configuration
open Akka.Event
open Akka.TestKit
/// Proxy that carries a name through cancellation - uses a function to handle cancellation
type NamedCancelable(cancelFunc: unit -> unit, name: string option, delay: TimeSpan, cancelEvent: Event<string option * TimeSpan * (unit -> unit)>) =
let mutable isCancelled = false
let cancellationTokenSource = new CancellationTokenSource()
interface ICancelable with
member _.IsCancellationRequested = isCancelled
// Basic Cancel method
member _.Cancel() =
if not isCancelled then
isCancelled <- true
cancellationTokenSource.Cancel()
cancelFunc()
cancelEvent.Trigger(name, delay, fun () -> ())
// Implement the additional required members
member _.Token = cancellationTokenSource.Token
member this.CancelAfter(delay: TimeSpan) =
cancellationTokenSource.CancelAfter(delay)
member this.CancelAfter(millisecondsDelay: int) =
cancellationTokenSource.CancelAfter(millisecondsDelay)
member this.Cancel(throwOnFirstException: bool) =
(this :> ICancelable).Cancel() // Just call the simple Cancel method
/// Scheduler wrapper that exposes enqueue/cancel events with optional names
type ObservingScheduler(config: Config, log: ILoggingAdapter) =
// Underlying TestScheduler
let inner = new TestScheduler(config, log)
// Events carrying (name option, delay, callback)
let enqueuedEvent = Event<string option * TimeSpan * (unit -> unit)>()
let cancelledEvent = Event<string option * TimeSpan * (unit -> unit)>()
/// Fired whenever a ScheduledItem is enqueued
[<CLIEvent>]
member _.OnEnqueued : IEvent<string option * TimeSpan * (unit -> unit)> = enqueuedEvent.Publish
/// Fired whenever a ScheduledItem is cancelled
[<CLIEvent>]
member _.OnCancelled : IEvent<string option * TimeSpan * (unit -> unit)> = cancelledEvent.Publish
/// Schedule a one-time action with optional name
member this.ScheduleOnce(name: string option, delay: TimeSpan, action: unit -> unit) : ICancelable =
// Create a cancelable that we can pass to the scheduler
let cancelable = new System.Threading.CancellationTokenSource()
// Wrap action to check cancellation token
let wrappedAction = fun () ->
if not cancelable.IsCancellationRequested then action()
// Schedule with the inner scheduler
(inner :> IActionScheduler).ScheduleOnce(delay, Action(wrappedAction))
enqueuedEvent.Trigger(name, delay, action)
// Create a NamedCancelable that cancels our token source
new NamedCancelable((fun () -> cancelable.Cancel()), name, delay, cancelledEvent) :> ICancelable
/// Fallback ScheduleOnce without name
member this.ScheduleOnce(delay: TimeSpan, action: unit -> unit) : ICancelable =
this.ScheduleOnce(None, delay, action)
/// Schedule a one-time Tell with optional name
member this.ScheduleTellOnce(name: string option, delay: TimeSpan, receiver: ICanTell, message: obj, sender: IActorRef) : ICancelable =
let action = fun () -> receiver.Tell(message, sender)
// Create a cancelable
let cancelable = new System.Threading.CancellationTokenSource()
// Schedule with the inner scheduler
(inner :> ITellScheduler).ScheduleTellOnce(delay, receiver, message, sender)
enqueuedEvent.Trigger(name, delay, action)
// Create a NamedCancelable that cancels our token source
new NamedCancelable((fun () -> cancelable.Cancel()), name, delay, cancelledEvent) :> ICancelable
/// Fallback ScheduleTellOnce without name
member this.ScheduleTellOnce(delay: TimeSpan, receiver: ICanTell, message: obj, sender: IActorRef) : ICancelable =
this.ScheduleTellOnce(None, delay, receiver, message, sender)
/// Schedule a repeated action with optional name
member this.ScheduleRepeatedly(name: string option, initialDelay: TimeSpan, interval: TimeSpan, action: unit -> unit) : ICancelable =
// Create a cancelable
let cancelable = new System.Threading.CancellationTokenSource()
// Wrap action to check cancellation token
let wrappedAction = fun () ->
if not cancelable.IsCancellationRequested then action()
// Schedule with the inner scheduler
(inner :> IActionScheduler).ScheduleRepeatedly(initialDelay, interval, Action(wrappedAction))
enqueuedEvent.Trigger(name, initialDelay, action)
// Create a NamedCancelable that cancels our token source
new NamedCancelable((fun () -> cancelable.Cancel()), name, initialDelay, cancelledEvent) :> ICancelable
/// Fallback ScheduleRepeatedly without name
member this.ScheduleRepeatedly(initialDelay: TimeSpan, interval: TimeSpan, action: unit -> unit) : ICancelable =
this.ScheduleRepeatedly(None, initialDelay, interval, action)
/// Schedule repeated Tell with optional name
member this.ScheduleTellRepeatedly(name: string option, initialDelay: TimeSpan, interval: TimeSpan, receiver: ICanTell, message: obj, sender: IActorRef) : ICancelable =
let action = fun () -> receiver.Tell(message, sender)
// Create a cancelable
let cancelable = new System.Threading.CancellationTokenSource()
// Schedule with the inner scheduler
(inner :> ITellScheduler).ScheduleTellRepeatedly(initialDelay, interval, receiver, message, sender)
enqueuedEvent.Trigger(name, initialDelay, action)
// Create a NamedCancelable that cancels our token source
new NamedCancelable((fun () -> cancelable.Cancel()), name, initialDelay, cancelledEvent) :> ICancelable
/// Fallback ScheduleTellRepeatedly without name
member this.ScheduleTellRepeatedly(initialDelay: TimeSpan, interval: TimeSpan, receiver: ICanTell, message: obj, sender: IActorRef) : ICancelable =
this.ScheduleTellRepeatedly(None, initialDelay, interval, receiver, message, sender)
/// Cancel via wrapper with name
member this.Cancel(cancelable: ICancelable, name: string option) =
cancelable.Cancel() // Call Cancel directly on the passed ICancelable
cancelledEvent.Trigger(name, TimeSpan.Zero, fun () -> ())
/// Drive the virtual clock forward
member _.Advance(d: TimeSpan) = inner.Advance(d)
member _.AdvanceTo(deadline: DateTimeOffset) = inner.AdvanceTo(deadline)
// Essential interface implementations to replace TestScheduler
// ITimeProvider - provides clock access
interface ITimeProvider with
member _.Now = inner.Now
member _.MonotonicClock = inner.MonotonicClock
member _.HighResMonotonicClock = inner.HighResMonotonicClock
// IScheduler implementation
interface IScheduler with
// Property that returns the advanced scheduler interface
member this.Advanced = this :> IAdvancedScheduler
// IRunnableScheduler implementation (since we now know it's needed)
interface IRunnableScheduler with
// Standard overloads
member _.ScheduleOnce(delay: TimeSpan, action: Akka.Dispatch.IRunnable) =
inner.ScheduleOnce(delay, action)
member _.ScheduleOnce(delay: TimeSpan, action: Akka.Dispatch.IRunnable, cancelable: ICancelable) =
inner.ScheduleOnce(delay, action, cancelable)
member _.ScheduleRepeatedly(initialDelay: TimeSpan, interval: TimeSpan, action: Akka.Dispatch.IRunnable) =
inner.ScheduleRepeatedly(initialDelay, interval, action)
member _.ScheduleRepeatedly(initialDelay: TimeSpan, interval: TimeSpan, action: Akka.Dispatch.IRunnable, cancelable: ICancelable) =
inner.ScheduleRepeatedly(initialDelay, interval, action, cancelable)
// IActionScheduler - enables scheduling lambdas/actions
interface IActionScheduler with
// Standard overloads
member _.ScheduleOnce(delay: TimeSpan, action: Action) =
inner.ScheduleOnce(delay, action)
member _.ScheduleOnce(delay: TimeSpan, action: Action, cancelable: ICancelable) =
inner.ScheduleOnce(delay, action, cancelable)
member _.ScheduleRepeatedly(initialDelay: TimeSpan, interval: TimeSpan, action: Action) =
inner.ScheduleRepeatedly(initialDelay, interval, action)
member _.ScheduleRepeatedly(initialDelay: TimeSpan, interval: TimeSpan, action: Action, cancelable: ICancelable) =
inner.ScheduleRepeatedly(initialDelay, interval, action, cancelable)
// ITellScheduler - enables sending messages to actors on a schedule
interface ITellScheduler with
// Standard overloads
member _.ScheduleTellOnce(delay: TimeSpan, receiver: ICanTell, message: obj, sender: IActorRef) =
inner.ScheduleTellOnce(delay, receiver, message, sender)
member _.ScheduleTellOnce(delay: TimeSpan, receiver: ICanTell, message: obj, sender: IActorRef, cancelable: ICancelable) =
inner.ScheduleTellOnce(delay, receiver, message, sender, cancelable)
member _.ScheduleTellRepeatedly(initialDelay: TimeSpan, interval: TimeSpan, receiver: ICanTell, message: obj, sender: IActorRef) =
inner.ScheduleTellRepeatedly(initialDelay, interval, receiver, message, sender)
member _.ScheduleTellRepeatedly(initialDelay: TimeSpan, interval: TimeSpan, receiver: ICanTell, message: obj, sender: IActorRef, cancelable: ICancelable) =
inner.ScheduleTellRepeatedly(initialDelay, interval, receiver, message, sender, cancelable)
// IAdvancedScheduler implementation
interface IAdvancedScheduler
public class ObservingScheduler : IScheduler, /*…*/
{
private readonly TestScheduler _inner;
// Events now include an optional name
public event Action<string?, TimeSpan, Action> OnEnqueued;
public event Action<string?, TimeSpan, Action> OnCancelled;
public ObservingScheduler(Config config, ILoggingAdapter log)
{
_inner = new TestScheduler(config, log);
}
// Standard clock members...
public DateTimeOffset Now => _inner.Now;
public void Advance(TimeSpan d) => _inner.Advance(d);
// etc.
// New overload with name
public ICancelable ScheduleTellOnce(
string name,
TimeSpan delay,
ICanTell receiver,
object message,
IActorRef sender)
{
var cancelable = _inner.ScheduleTellOnce(delay, receiver, message, sender);
// raise with name
OnEnqueued?.Invoke(name, delay, () => receiver.Tell(message, sender));
// wrap the cancelable so we can signal name on Cancel()
return new NamedCancelable(cancelable, name, delay, OnCancelled);
}
// Fallback to existing API (no name)
public ICancelable ScheduleTellOnce(
TimeSpan delay,
ICanTell receiver,
object message,
IActorRef sender)
=> ScheduleTellOnce(null, delay, receiver, message, sender);
// If someone calls Cancel on the scheduler directly
public void Cancel(ICancelable cancelable, string? name = null)
{
_inner.Advanced.Cancel(cancelable);
OnCancelled?.Invoke(name, TimeSpan.Zero, () => { });
}
// …and similar for ScheduleOnce, ScheduleRepeatedly, etc…
}
// Proxy that carries the name through user‐called Cancel()
public class NamedCancelable : ICancelable
{
private readonly ICancelable _inner;
private readonly string? _name;
private readonly TimeSpan _delay;
private readonly Action<string?, TimeSpan, Action>? _cancelEvent;
public NamedCancelable(
ICancelable inner,
string? name,
TimeSpan delay,
Action<string?, TimeSpan, Action>? cancelEvent)
{
_inner = inner;
_name = name;
_delay = delay;
_cancelEvent = cancelEvent;
}
public bool IsCancellationRequested => _inner.IsCancellationRequested;
public void Cancel()
{
_inner.Cancel();
_cancelEvent?.Invoke(_name, _delay, () => { });
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment