Last active
November 24, 2020 05:45
-
-
Save Horusiath/6051e87fb9f3e81dc2dfed93519fb753 to your computer and use it in GitHub Desktop.
An interfaced generic-aware Akka.NET actor implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading.Tasks; | |
using Akka.Actor; | |
using Akka.Configuration; | |
namespace AkkaDemos | |
{ | |
/// <summary>
/// Contract of the demo greeter actor. Every operation returns a task so that
/// it can be answered asynchronously by the actor behind a typed proxy.
/// </summary>
public interface IGreeter
{
    /// <summary>Produces a greeting text addressed to <paramref name="input"/>.</summary>
    /// <param name="input">The greeter that is being greeted.</param>
    /// <returns>A task yielding the greeting text.</returns>
    Task<string> Greetings(IGreeter input);

    /// <summary>Greets <paramref name="who"/> and reports the outcome as text.</summary>
    /// <param name="who">The greeter to talk to.</param>
    /// <returns>A task yielding a textual description of the exchange.</returns>
    Task<string> Greet(IGreeter who);

    /// <summary>Compares two values of the same comparable type.</summary>
    /// <typeparam name="T">Type of the compared values.</typeparam>
    /// <param name="a">Left-hand value.</param>
    /// <param name="b">Right-hand value.</param>
    /// <returns>A task yielding the comparison result as an integer.</returns>
    Task<int> Compare<T>(T a, T b) where T : IComparable<T>;
}
/// <summary>
/// Demo actor that implements <see cref="IGreeter"/> on top of
/// <see cref="TypedActor{TActor}"/>.
/// </summary>
public class Greeter : TypedActor<IGreeter>, IGreeter
{
    /// <summary>Builds the greeting text for <paramref name="input"/>.</summary>
    /// <param name="input">The greeter being greeted; formatted via its string representation.</param>
    /// <returns>A task yielding <c>Hello, {input}!</c>.</returns>
    public async Task<string> Greetings(IGreeter input) => $"Hello, {input}!";

    /// <summary>
    /// Sends a typed proxy of this actor to <paramref name="who"/> through
    /// <see cref="IGreeter.Greetings"/> and wraps the answer into a report line.
    /// </summary>
    /// <param name="who">The greeter to call.</param>
    /// <returns>A task yielding <c>{who} replied with: {reply}</c>.</returns>
    public async Task<string> Greet(IGreeter who)
    {
        // Hand out a proxy of ourselves, so the callee gets an IGreeter it can call back.
        IGreeter me = Self.Typed<IGreeter>();
        string answer = await who.Greetings(me);
        return $"{who} replied with: {answer}";
    }

    /// <summary>Compares <paramref name="a"/> with <paramref name="b"/>.</summary>
    /// <typeparam name="T">Type of the compared values.</typeparam>
    /// <param name="a">Value whose <see cref="IComparable{T}.CompareTo"/> is invoked.</param>
    /// <param name="b">Value passed to the comparison.</param>
    /// <returns>A task yielding the result of <c>a.CompareTo(b)</c>.</returns>
    public async Task<int> Compare<T>(T a, T b) where T : IComparable<T> => a.CompareTo(b);
}
/// <summary>
/// Demo entry point: starts two remoting-enabled actor systems in one process
/// and lets an actor in the second system greet an actor in the first one.
/// </summary>
class Program
{
    /// <summary>
    /// Builds the configuration of an actor system that uses the remote actor
    /// provider and listens on 127.0.0.1 at the given port.
    /// </summary>
    /// <param name="port">TCP port for the dot-netty transport.</param>
    /// <returns>The parsed configuration.</returns>
    static Config Config(int port)
    {
        var hocon = $@"
            akka.actor.provider = remote
            akka.remote.dot-netty.tcp {{
                hostname = 127.0.0.1
                port = {port}
            }}";
        return ConfigurationFactory.ParseString(hocon);
    }

    /// <summary>
    /// Creates both systems, spawns one <see cref="Greeter"/> in each, runs a
    /// single <see cref="IGreeter.Greet"/> round trip and prints the reply.
    /// </summary>
    /// <param name="args">Command line arguments (unused).</param>
    static async Task Main(string[] args)
    {
        using (var greeterSystem = ActorSystem.Create("system1", Config(4001)))
        using (var initiatorSystem = ActorSystem.Create("system2", Config(4002)))
        {
            IActorRef greeterRef = greeterSystem.ActorOf(Props.Create<Greeter>(), "greeter");
            IGreeter greeter = greeterRef.Typed<IGreeter>();

            IActorRef initiatorRef = initiatorSystem.ActorOf(Props.Create<Greeter>(), "initiator");
            IGreeter initiator = initiatorRef.Typed<IGreeter>();

            string outcome = await initiator.Greet(greeter);
            Console.WriteLine(outcome);

            // Generic methods can be called through the proxy as well, e.g.:
            // Console.WriteLine($"1 compared to 2 => {await greeter.Compare(1, 2)}");
        }
    }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Immutable; | |
using System.Reflection; | |
using System.Runtime.ExceptionServices; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Akka.Actor; | |
using Akka.Dispatch; | |
using Akka.Util; | |
namespace AkkaDemos | |
{ | |
/// <summary>
/// Base class for actors whose behavior is defined by the methods of the
/// <typeparamref name="TActor"/> interface. Incoming <see cref="CallMethod"/>
/// messages are dispatched by method name to the matching interface method,
/// which is invoked on this instance through reflection.
/// </summary>
/// <typeparam name="TActor">The interface describing the actor's operations.</typeparam>
public abstract class TypedActor<TActor> : ActorBase
{
    // Public instance methods of TActor keyed by name. Overloads share one key,
    // so only the last one enumerated stays reachable.
    private static readonly ImmutableDictionary<string, MethodInfo> methods;
    private static readonly Type TaskType = typeof(Task);

    static TypedActor()
    {
        var builder = ImmutableDictionary.CreateBuilder<string, MethodInfo>();
        foreach (var method in typeof(TActor).GetMethods(BindingFlags.Instance | BindingFlags.Public))
        {
            builder[method.Name] = method;
        }
        methods = builder.ToImmutable();
    }

    /// <summary>
    /// Handles <see cref="CallMethod"/> messages that name a known method.
    /// Task-returning methods are run through <see cref="ActorTaskScheduler.RunTask(Func{Task})"/>
    /// and answered with a <see cref="Status"/>; all other methods are invoked
    /// synchronously without a reply.
    /// </summary>
    /// <param name="message">The received message.</param>
    /// <returns><c>true</c> when the message was a call to a known method; otherwise <c>false</c>.</returns>
    protected sealed override bool Receive(object message)
    {
        if (!(message is CallMethod call) || !methods.TryGetValue(call.MethodName, out var method))
        {
            return false;
        }

        if (TaskType.IsAssignableFrom(method.ReturnType))
        {
            // Capture the sender now, so the reply target does not depend on
            // when the scheduler actually executes the delegate.
            var sender = Sender;
            ActorTaskScheduler.RunTask(() => CallMethodAsync(method, call.Args, call.TypeArgs, sender));
        }
        else
        {
            InvokeTarget(method, call.Args, call.TypeArgs);
        }

        return true;
    }

    /// <summary>
    /// Invokes a task-returning method, awaits it and reports the outcome to
    /// <paramref name="sender"/> as <see cref="Status.Success"/> (carrying the
    /// task result for generic task types, <c>null</c> otherwise) or
    /// <see cref="Status.Failure"/>. Failures are rethrown after being reported.
    /// </summary>
    /// <param name="method">Interface method to invoke; may be an open generic definition.</param>
    /// <param name="args">Arguments of the call.</param>
    /// <param name="typeArgs">Generic type arguments, used only when <paramref name="method"/> is generic.</param>
    /// <param name="sender">Recipient of the resulting status.</param>
    private async Task CallMethodAsync(MethodInfo method, object[] args, Type[] typeArgs, IActorRef sender)
    {
        try
        {
            var task = InvokeTarget(method, args, typeArgs) as Task;
            if (task is null)
            {
                // Awaiting null would fail with an opaque NullReferenceException.
                throw new InvalidOperationException(
                    $"Method '{method.Name}' returned null instead of a Task.");
            }

            await task;

            if (method.ReturnType.IsGenericType)
            {
                ReturnResultDynamic(task, sender);
            }
            else
            {
                sender.Tell(new Status.Success(null));
            }
        }
        catch (Exception e)
        {
            sender.Tell(new Status.Failure(e));
            throw;
        }
    }

    /// <summary>
    /// Invokes <paramref name="method"/> on this instance, closing it over
    /// <paramref name="typeArgs"/> first when it is generic. An exception thrown
    /// by the target is rethrown as-is instead of being wrapped in
    /// <see cref="TargetInvocationException"/>.
    /// </summary>
    /// <returns>The value returned by the invoked method.</returns>
    private object InvokeTarget(MethodInfo method, object[] args, Type[] typeArgs)
    {
        if (method.IsGenericMethod)
        {
            method = method.MakeGenericMethod(typeArgs);
        }

        try
        {
            return method.Invoke(this, args);
        }
        catch (TargetInvocationException e) when (e.InnerException != null)
        {
            // Surface the real cause while keeping its original stack trace.
            ExceptionDispatchInfo.Capture(e.InnerException).Throw();
            throw; // unreachable, satisfies definite-return analysis
        }
    }

    /// <summary>
    /// Reads <c>Result</c> of a completed generic task via dynamic binding and
    /// sends it to <paramref name="sender"/> wrapped in <see cref="Status.Success"/>.
    /// </summary>
    private void ReturnResultDynamic(dynamic task, IActorRef sender)
    {
        sender.Tell(new Status.Success(task.Result));
    }
}
/// <summary>
/// Message describing a single method call: the method's name, its arguments
/// and, optionally, its generic type arguments.
/// </summary>
internal sealed class CallMethod
{
    /// <summary>Creates a call description.</summary>
    /// <param name="methodName">Name of the method to call.</param>
    /// <param name="args">Arguments of the call.</param>
    /// <param name="typeArgs">Generic type arguments; defaults to <c>null</c>.</param>
    public CallMethod(string methodName, object[] args, Type[] typeArgs = null)
    {
        (MethodName, Args, TypeArgs) = (methodName, args, typeArgs);
    }

    /// <summary>Name of the method to call.</summary>
    public string MethodName { get; }

    /// <summary>Arguments of the call, exactly as passed to the constructor.</summary>
    public object[] Args { get; }

    /// <summary>Generic type arguments of the call, or <c>null</c> when none were given.</summary>
    public Type[] TypeArgs { get; }
}
/// <summary>
/// <see cref="DispatchProxy"/> base that turns calls on an actor interface into
/// <see cref="CallMethod"/> messages for the actor referenced by <see cref="Self"/>.
/// Methods returning <c>void</c> are sent with <c>Tell</c>; methods returning a
/// task are sent with <c>Ask</c> and completed from the <see cref="Status"/> reply.
/// </summary>
public class TypedActorProxy : DispatchProxy, ISurrogated
{
    /// <summary>
    /// Surrogate form of a proxy: the target actor reference together with the
    /// proxied interface type, from which an equivalent proxy can be rebuilt.
    /// </summary>
    public sealed class Surrogate : ISurrogate
    {
        /// <summary>Reference of the actor the proxy talks to.</summary>
        public IActorRef Self { get; }

        /// <summary>Interface type the proxy implements.</summary>
        public Type ActorType { get; }

        public Surrogate(IActorRef self, Type actorType)
        {
            Self = self;
            ActorType = actorType;
        }

        /// <summary>Recreates a typed proxy of <see cref="ActorType"/> for <see cref="Self"/>.</summary>
        public ISurrogated FromSurrogate(ActorSystem system)
        {
            return (ISurrogated)Self.Typed(ActorType);
        }
    }

    private static readonly Type VoidType = typeof(void);
    private static readonly Type TaskType = typeof(Task);

    // MapResult<T> closed over each task result type seen so far.
    private static readonly ConcurrentDictionary<Type, MethodInfo> Mappers = new ConcurrentDictionary<Type, MethodInfo>();
    private static readonly MethodInfo GenericMapper;

    static TypedActorProxy()
    {
        GenericMapper = typeof(TypedActorProxy).GetMethod(nameof(MapResult), BindingFlags.Static | BindingFlags.NonPublic);
    }

    /// <summary>Reference of the actor that receives the calls made on this proxy.</summary>
    public IActorRef Self { get; private set; }

    /// <summary>
    /// Translates an interface call into a <see cref="CallMethod"/> message.
    /// </summary>
    /// <param name="method">The interface method that was called.</param>
    /// <param name="args">Arguments of the call.</param>
    /// <returns>
    /// <c>null</c> for <c>void</c> methods; otherwise a task of the method's
    /// declared return type that completes from the actor's reply.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// The method returns neither <c>void</c> nor a task.
    /// </exception>
    protected override object Invoke(MethodInfo method, object[] args)
    {
        var returnType = method.ReturnType;
        var isTask = TaskType.IsAssignableFrom(returnType);

        if (returnType != VoidType && !isTask)
        {
            throw new InvalidOperationException(
                $"TypedActor proxy can be used only for methods, which return either void or Task.");
        }

        var typeArgs = method.IsGenericMethod ? method.GetGenericArguments() : null;
        var call = new CallMethod(method.Name, args, typeArgs);

        if (!isTask)
        {
            Self.Tell(call);
            return null;
        }

        var reply = Self.Ask<Status>(call, FindCancellationToken(args));
        if (returnType == TaskType)
        {
            return HandleResult(reply);
        }

        var resultType = returnType.GetGenericArguments()[0];
        var mapper = Mappers.GetOrAdd(resultType, type => GenericMapper.MakeGenericMethod(type));
        return mapper.Invoke(null, new object[] { reply });
    }

    /// <summary>Returns the string representation of <see cref="Self"/>.</summary>
    public override string ToString() => Self.ToString();

    /// <summary>
    /// Creates the surrogate of this proxy from <see cref="Self"/> and the
    /// proxied actor interface.
    /// </summary>
    public ISurrogate ToSurrogate(ActorSystem system)
    {
        return new Surrogate(Self, ResolveActorInterface());
    }

    /// <summary>
    /// Returns the last <see cref="CancellationToken"/> found in
    /// <paramref name="args"/>, or the default token when there is none.
    /// </summary>
    private static CancellationToken FindCancellationToken(object[] args)
    {
        if (!(args is null))
        {
            for (int i = args.Length - 1; i >= 0; i--)
            {
                if (args[i] is CancellationToken token)
                {
                    //TODO: replace CancellationToken with a surrogate version, that can be serialized
                    return token;
                }
            }
        }

        return default(CancellationToken);
    }

    /// <summary>
    /// Finds the actor interface implemented by the generated proxy type.
    /// <see cref="Type.GetInterfaces"/> guarantees no ordering and also lists
    /// the interfaces of this base class, so those are skipped and the most
    /// derived remaining interface is chosen.
    /// </summary>
    /// <exception cref="InvalidOperationException">No actor interface was found.</exception>
    private Type ResolveActorInterface()
    {
        Type best = null;
        foreach (var candidate in GetType().GetInterfaces())
        {
            // Interfaces implemented by the proxy base itself (e.g. ISurrogated).
            if (candidate.IsAssignableFrom(typeof(TypedActorProxy)))
            {
                continue;
            }

            // Prefer an interface that extends the current pick.
            if (best is null || best.IsAssignableFrom(candidate))
            {
                best = candidate;
            }
        }

        return best ?? throw new InvalidOperationException(
            $"Proxy type '{GetType()}' does not implement any actor interface.");
    }

    /// <summary>
    /// Completes when the reply is <see cref="Status.Success"/>; rethrows the
    /// cause of a <see cref="Status.Failure"/>. Cancellation or failure of the
    /// ask itself propagates unchanged through the await.
    /// </summary>
    /// <exception cref="NotSupportedException">The reply is neither success nor failure.</exception>
    private static async Task HandleResult(Task<Status> task)
    {
        var status = await task.ConfigureAwait(false);
        switch (status)
        {
            case Status.Success _:
                return;
            case Status.Failure f:
                ExceptionDispatchInfo.Capture(f.Cause).Throw();
                return;
            default:
                throw new NotSupportedException($"Return value '{status}' is not supported.");
        }
    }

    /// <summary>
    /// Yields the payload of a <see cref="Status.Success"/> reply cast to
    /// <typeparamref name="T"/>; rethrows the cause of a <see cref="Status.Failure"/>.
    /// Cancellation or failure of the ask itself propagates unchanged through the await.
    /// Looked up by name through reflection, so the name and static/non-public
    /// shape must stay as they are.
    /// </summary>
    /// <exception cref="NotSupportedException">The reply is neither success nor failure.</exception>
    private static async Task<T> MapResult<T>(Task<Status> task)
    {
        var status = await task.ConfigureAwait(false);
        switch (status)
        {
            case Status.Success s:
                return (T)s.Status;
            case Status.Failure f:
                ExceptionDispatchInfo.Capture(f.Cause).Throw();
                return default;
            default:
                throw new NotSupportedException($"Return value '{status}' is not supported.");
        }
    }
}
/// <summary>
/// Extension methods for creating actors and wrapping actor references in
/// typed proxies based on <see cref="TypedActorProxy"/>.
/// </summary>
public static class TypedActors
{
    private static readonly PropertyInfo SelfProperty;

    // Proxy factories per actor interface type, for the non-generic Typed overload.
    private static readonly ConcurrentDictionary<Type, Func<object>> DispatchProxies = new ConcurrentDictionary<Type, Func<object>>();

    // Open generic DispatchProxy.Create<T, TProxy>() definition.
    private static readonly MethodInfo ProxyFactory = FindGenericCreate();

    private static readonly Func<Type, Func<object>> DispatchProxyFactory = type =>
    {
        var factory = ProxyFactory.MakeGenericMethod(type, typeof(TypedActorProxy));
        return () => factory.Invoke(null, null);
    };

    static TypedActors()
    {
        SelfProperty = typeof(TypedActorProxy).GetProperty(nameof(TypedActorProxy.Self));
    }

    /// <summary>Creates a named actor from <paramref name="props"/> and returns a typed proxy for it.</summary>
    /// <typeparam name="TActor">Actor interface the proxy implements.</typeparam>
    public static TActor TypedActorOf<TActor>(this IActorRefFactory factory, Props props, string name)
    {
        var actorRef = factory.ActorOf(props, name);
        return actorRef.Typed<TActor>();
    }

    /// <summary>Creates an actor from <paramref name="props"/> and returns a typed proxy for it.</summary>
    /// <typeparam name="TActor">Actor interface the proxy implements.</typeparam>
    public static TActor TypedActorOf<TActor>(this IActorRefFactory factory, Props props)
    {
        var actorRef = factory.ActorOf(props);
        return actorRef.Typed<TActor>();
    }

    /// <summary>Wraps <paramref name="actorRef"/> in a proxy implementing <typeparamref name="TActor"/>.</summary>
    /// <typeparam name="TActor">Actor interface the proxy implements.</typeparam>
    /// <param name="actorRef">Actor that will receive the calls made on the proxy.</param>
    /// <exception cref="ArgumentNullException"><paramref name="actorRef"/> is <c>null</c>.</exception>
    public static TActor Typed<TActor>(this IActorRef actorRef)
    {
        if (actorRef is null)
        {
            throw new ArgumentNullException(nameof(actorRef));
        }

        var proxy = DispatchProxy.Create<TActor, TypedActorProxy>();
        SelfProperty.SetValue(proxy, actorRef);
        return proxy;
    }

    /// <summary>
    /// Wraps <paramref name="actorRef"/> in a proxy implementing
    /// <paramref name="actorType"/>, for callers that know the interface only at run time.
    /// </summary>
    /// <param name="actorRef">Actor that will receive the calls made on the proxy.</param>
    /// <param name="actorType">Actor interface the proxy implements.</param>
    /// <exception cref="ArgumentNullException">An argument is <c>null</c>.</exception>
    internal static object Typed(this IActorRef actorRef, Type actorType)
    {
        if (actorRef is null)
        {
            throw new ArgumentNullException(nameof(actorRef));
        }

        if (actorType is null)
        {
            throw new ArgumentNullException(nameof(actorType));
        }

        var factory = DispatchProxies.GetOrAdd(actorType, DispatchProxyFactory);
        var proxy = factory();
        SelfProperty.SetValue(proxy, actorRef);
        return proxy;
    }

    /// <summary>
    /// Locates the parameterless, two-type-parameter <c>DispatchProxy.Create</c>
    /// definition. A lookup by name alone becomes ambiguous on runtimes that
    /// also expose a non-generic <c>Create(Type, Type)</c> overload.
    /// </summary>
    /// <exception cref="MissingMethodException">No such method exists on <see cref="DispatchProxy"/>.</exception>
    private static MethodInfo FindGenericCreate()
    {
        foreach (var candidate in typeof(DispatchProxy).GetMethods(BindingFlags.Public | BindingFlags.Static))
        {
            if (candidate.Name == "Create"
                && candidate.IsGenericMethodDefinition
                && candidate.GetGenericArguments().Length == 2
                && candidate.GetParameters().Length == 0)
            {
                return candidate;
            }
        }

        throw new MissingMethodException(nameof(DispatchProxy), "Create");
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment