Last active
September 20, 2018 16:57
-
-
Save pshrosbree/6344173ed4f073158527a1ea7c4eb70f to your computer and use it in GitHub Desktop.
Fair request queueing with Akka.NET
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace FairQueueing | |
{ | |
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading.Tasks; | |
using Akka.Actor; | |
using Akka.Routing; | |
public class ApplicationActor : ReceiveActor, IWithUnboundedStash | |
{ | |
private const int MaxPending = 20; | |
public ApplicationActor() | |
{ | |
Output = ActorRefs.Nobody; | |
Become(Sending); | |
} | |
protected override void PreStart() | |
{ | |
Output = Context.ActorOf(Props.Create(() => new OutputActor()), "output"); | |
} | |
private void Sending() | |
{ | |
Receive<Message>(message => | |
{ | |
Output.Tell(message); | |
PendingMessages.Add(message); | |
if (PendingMessages.Count >= MaxPending) | |
Become(Waiting); | |
}); | |
Receive<OutputResponse>(response => PendingMessages.Remove(response.Message), response => response.Sent); | |
Receive<OutputResponse>(response => Output.Tell(response.Message), response => !response.Sent); | |
} | |
private void Waiting() | |
{ | |
Receive<Message>(message => Stash.Stash()); | |
Receive<OutputResponse>(response => | |
{ | |
PendingMessages.Remove(response.Message); | |
if (PendingMessages.Count < MaxPending) | |
{ | |
Stash.UnstashAll(); | |
Become(Sending); | |
} | |
}, response => response.Sent); | |
Receive<OutputResponse>(response => Output.Tell(response.Message), response => !response.Sent); | |
} | |
private IList<Message> PendingMessages { get; } = new List<Message>(); | |
private IActorRef Output { get; set; } | |
public IStash Stash { get; set; } | |
} | |
class Program | |
{ | |
static void Main(string[] args) | |
{ | |
// make an actor system | |
var actorSystem = ActorSystem.Create("MyActorSystem"); | |
// make our first actors! | |
var displayActor = actorSystem.ActorOf(Props.Create(() => new DisplayActor()), "display"); | |
Console.WriteLine($"Display actor: {displayActor.Path}"); | |
var applicationActor = actorSystem.ActorOf(Props.Create(() => new ApplicationActor()).WithRouter(new ConsistentHashingPool(10)), "application"); | |
var endpointActor = actorSystem.ActorOf(Props.Create(() => new EndpointActor(applicationActor)), "endpoint"); | |
endpointActor.Tell(Command.Start); | |
actorSystem.WhenTerminated.Wait(); | |
} | |
} | |
public class OutputActor : ReceiveActor | |
{ | |
private Random Random { get; } = new Random((int)(DateTime.Now.Ticks % int.MaxValue)); | |
private ActorSelection DisplayActor { get; } | |
public OutputActor() | |
{ | |
DisplayActor = Context.ActorSelection("/user/display"); | |
Receive<OutputResponse>(response => | |
{ | |
DisplayActor.Tell(response.Message); | |
Sender.Tell(response); | |
}); | |
Receive<Message>(message => SendAsync(message).PipeTo(Self, Sender), _ => !_.Terminate); | |
Receive<Message>( | |
message => Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromMilliseconds(100), Self, PoisonPill.Instance, | |
Sender), _ => _.Terminate); | |
} | |
/// <summary> | |
/// Simulation of an asynchronous send | |
/// </summary> | |
private async Task<OutputResponse> SendAsync(Message message) | |
{ | |
var delay = TimeSpan.FromMilliseconds(Random.Next(0, 5)); | |
await Task.Delay(delay).ConfigureAwait(false); | |
return new OutputResponse(message, true); | |
} | |
} | |
public enum Command | |
{ | |
Start, | |
Stop, | |
Terminate | |
} | |
public readonly struct OutputResponse | |
{ | |
public OutputResponse(Message message, bool sent) | |
{ | |
Message = message; | |
Sent = sent; | |
} | |
public Message Message { get; } | |
public bool Sent { get; } | |
} | |
public readonly struct Message : IConsistentHashable, IEquatable<Message>, IEqualityComparer<Message> | |
{ | |
public Message(int appId, bool terminate) | |
{ | |
AppId = appId; | |
Terminate = terminate; | |
} | |
public int AppId { get; } | |
public bool Terminate { get; } | |
public override bool Equals(object obj) => !ReferenceEquals(null, obj) && obj is Message other && Equals(other); | |
public override int GetHashCode() => AppId.GetHashCode(); | |
public object ConsistentHashKey => AppId; | |
public bool Equals(Message other) => AppId.Equals(other.AppId); | |
public bool Equals(Message x, Message y) => x.AppId == y.AppId; | |
public int GetHashCode(Message obj) => AppId.GetHashCode(); | |
} | |
/// <summary> | |
/// This stub generates a nonuniform and unbalanced message load for applications | |
/// </summary> | |
public class EndpointActor : ReceiveActor | |
{ | |
private Random Random { get; } = new Random((int)(DateTime.Now.Ticks % int.MaxValue)); | |
private int[] Counts { get; } = { 10, 10, 100 }; | |
public EndpointActor(IActorRef applicationRouter) | |
{ | |
ApplicationRouter = applicationRouter; | |
Receive<Command>(_ => | |
{ | |
for (var i = 0; i < 5; i++) | |
{ | |
var messages = new List<Message>(); | |
for (var j = 0; j < Counts.Length; j++) | |
for (var k = 0; k < Counts[j]; k++) | |
messages.Add(new Message(j + 1, false)); | |
foreach (var message in messages.OrderBy(__ => Random.NextDouble())) | |
ApplicationRouter.Tell(message); | |
} | |
Console.WriteLine("Scheduling stop"); | |
Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(2), Self, Command.Stop, Self); | |
}, | |
_ => _ == Command.Start); | |
Receive<Command>(_ => | |
{ | |
for (var i = 0; i < Counts.Length; i++) | |
ApplicationRouter.Tell(new Message(i + 1, true)); | |
Console.WriteLine("Scheduling termination"); | |
Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(3), Self, Command.Terminate, Self); | |
}, | |
_ => _ == Command.Stop); | |
Receive<Command>(_ => | |
{ | |
Console.WriteLine("Terminating"); | |
Context.System.Terminate(); | |
}, | |
_ => _ == Command.Terminate); | |
} | |
private IActorRef ApplicationRouter { get; } | |
} | |
/// <summary> | |
/// Displays the messages but is not involved in the algorithm | |
/// </summary> | |
public class DisplayActor : ReceiveActor | |
{ | |
private int AppId { get; set; } | |
private int AppCount { get; set; } | |
private int Total { get; set; } | |
public DisplayActor() | |
{ | |
// Same appId case | |
Receive<Message>(message => | |
{ | |
AppId = message.AppId; | |
AppCount = AppCount + 1; | |
Total = Total + 1; | |
}, | |
_ => AppId == 0 || _.AppId == AppId); | |
// New appId case | |
Receive<Message>(message => | |
{ | |
Console.WriteLine($"[{Total:D4}]: AppId {AppId} = {AppCount}"); | |
AppId = message.AppId; | |
AppCount = 1; | |
Total = Total + 1; | |
}); | |
// If no messages for a while, emit the current count and reset app id and count | |
Receive<ReceiveTimeout>(_ => | |
{ | |
Console.WriteLine($"[{Total:D4}]: AppId {AppId} = {AppCount}"); | |
AppId = 0; | |
AppCount = 0; | |
}, _ => AppId != 0); | |
} | |
protected override void PreStart() => Context.SetReceiveTimeout(TimeSpan.FromMilliseconds(75)); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<Project Sdk="Microsoft.NET.Sdk"> | |
<PropertyGroup> | |
<OutputType>Exe</OutputType> | |
<TargetFramework>netcoreapp2.1</TargetFramework> | |
<LangVersion>latest</LangVersion> | |
</PropertyGroup> | |
<ItemGroup> | |
<PackageReference Include="Akka" Version="1.3.9" /> | |
</ItemGroup> | |
</Project> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment