Created
December 30, 2019 18:11
-
-
Save AlgorithmsAreCool/492564e6baec44eea0c95f3e8b4d0941 to your computer and use it in GitHub Desktop.
Hastily written and incomplete dataflow clone using tasks and channels
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Threading; | |
using System.Threading.Channels; | |
using System.Threading.Tasks; | |
namespace HomebrewDataflow | |
{ | |
/// <summary>
/// Base class for a pipeline stage. Each stage owns one bounded channel:
/// producers write to <see cref="Input"/>, and the stage's
/// <see cref="WorkPump"/> drains <see cref="Inflow"/>.
/// </summary>
/// <typeparam name="TIn">Type of the items the stage accepts.</typeparam>
public abstract class WorkflowStageBase<TIn>
{
    /// <summary>
    /// Creates the bounded input channel and exposes its two ends.
    /// </summary>
    /// <param name="channelOptions">Options passed to <c>Channel.CreateBounded</c>.</param>
    // Protected rather than public: an abstract type can only ever be
    // constructed through a derived class, so a public constructor is misleading.
    protected WorkflowStageBase(BoundedChannelOptions channelOptions)
    {
        var channel = Channel.CreateBounded<TIn>(channelOptions);
        Inflow = channel.Reader;
        Input = channel.Writer;
    }

    /// <summary>Read end of the input channel, for use by derived stages.</summary>
    protected ChannelReader<TIn> Inflow { get; }

    /// <summary>Write end of the input channel; producers post items here.</summary>
    public ChannelWriter<TIn> Input { get; }

    /// <summary>Processing loop supplied by the derived stage.</summary>
    /// <returns>A task that completes when the loop ends.</returns>
    protected abstract Task WorkPump();

    /// <summary>
    /// Schedules <see cref="WorkPump"/> with <c>Task.Run</c>. Every call
    /// schedules another pump; nothing here prevents calling it more than once.
    /// </summary>
    /// <returns>The task returned by <c>Task.Run</c> for the pump.</returns>
    public Task Run() => Task.Run(WorkPump);
}
/// <summary>
/// Selects how <c>TransformStage</c> hands each result to its subscribers.
/// </summary>
public enum BroadcastPolicy
{
    /// <summary>
    /// Write to every subscriber with <c>WriteAsync</c>, awaiting each write
    /// before moving on to the next subscriber.
    /// </summary>
    WaitForEach = 0,

    /// <summary>
    /// Offer the result to every subscriber with <c>TryWrite</c>; the outcome
    /// of the attempt is ignored.
    /// </summary>
    SkipBusy = 1,
}
/// <summary>
/// Stage that applies an asynchronous function to every input item and
/// broadcasts the result to all subscribed downstream stages.
/// </summary>
/// <typeparam name="TIn">Type of the items read from the input channel.</typeparam>
/// <typeparam name="TOut">Type of the results written to subscribers.</typeparam>
public class TransformStage<TIn, TOut> : WorkflowStageBase<TIn>
{
    /// <summary>Creates a transform stage.</summary>
    /// <param name="workFunction">Function invoked once per input item; receives the stage's token and the item.</param>
    /// <param name="policy">How results are delivered to subscribers.</param>
    /// <param name="options">Options for the stage's bounded input channel.</param>
    /// <param name="cancellationToken">Token passed to the channel read, the work function and awaited writes.</param>
    public TransformStage(Func<CancellationToken, TIn, ValueTask<TOut>> workFunction, BroadcastPolicy policy, BoundedChannelOptions options, CancellationToken cancellationToken)
        : base(options)
    {
        WorkFunction = workFunction;
        Policy = policy;
        Subscribers = new ConcurrentBag<ChannelWriter<TOut>>();
        Token = cancellationToken;
    }

    private Func<CancellationToken, TIn, ValueTask<TOut>> WorkFunction { get; }

    private BroadcastPolicy Policy { get; }

    private ConcurrentBag<ChannelWriter<TOut>> Subscribers { get; }

    private CancellationToken Token { get; }

    /// <summary>Registers a downstream stage; its input writer receives every result.</summary>
    /// <param name="stage">The stage to feed.</param>
    public void Subscribe(WorkflowStageBase<TOut> stage)
    {
        Subscribers.Add(stage.Input);
    }

    /// <summary>
    /// Reads items until the input channel completes, transforms each one and
    /// delivers the result according to the broadcast policy. When the loop
    /// ends, every subscriber is completed. If the loop ends with an exception
    /// (including cancellation), the exception is passed to the subscribers
    /// and to this stage's own input channel, and is not rethrown.
    /// </summary>
    protected override async Task WorkPump()
    {
        try
        {
            await foreach (var item in Inflow.ReadAllAsync(Token))
            {
                var result = await WorkFunction(Token, item);

                if (Policy == BroadcastPolicy.WaitForEach)
                {
                    foreach (var subscriber in Subscribers)
                    {
                        await subscriber.WriteAsync(result, Token);
                    }
                }
                else // BroadcastPolicy.SkipBusy
                {
                    foreach (var subscriber in Subscribers)
                    {
                        // Return value deliberately ignored: a subscriber that
                        // cannot accept the item right now simply misses it.
                        subscriber.TryWrite(result);
                    }
                }
            }
        }
        catch (Exception ex)
        {
            // TryComplete, not Complete: a subscriber may already be completed
            // (that is often why the write failed), and Complete would throw,
            // cutting this loop short so the remaining subscribers never see
            // the error.
            foreach (var subscriber in Subscribers)
            {
                subscriber.TryComplete(ex);
            }

            // Nothing will read this stage's input any more; close it so
            // producers fail fast instead of filling the buffer and waiting.
            Input.TryComplete(ex);
        }
        finally
        {
            // Normal end of input: tell subscribers no more items are coming.
            // A no-op for writers already completed in the catch block.
            foreach (var subscriber in Subscribers)
            {
                subscriber.TryComplete();
            }
        }
    }
}
/// <summary>
/// Terminal stage that runs an asynchronous action for every input item and
/// produces no output.
/// </summary>
/// <typeparam name="TIn">Type of the items read from the input channel.</typeparam>
public class ActionStage<TIn> : WorkflowStageBase<TIn>
{
    /// <summary>Creates an action stage.</summary>
    /// <param name="workFunction">Action invoked once per input item; receives the stage's token and the item.</param>
    /// <param name="options">Options for the stage's bounded input channel.</param>
    /// <param name="token">Token passed to the channel read and to the action.</param>
    public ActionStage(Func<CancellationToken, TIn, ValueTask> workFunction, BoundedChannelOptions options, CancellationToken token)
        : base(options)
    {
        WorkFunction = workFunction;
        Token = token;
    }

    private Func<CancellationToken, TIn, ValueTask> WorkFunction { get; }

    private CancellationToken Token { get; }

    /// <summary>
    /// Reads items until the input channel completes and runs the action on
    /// each. If the loop ends with an exception (including cancellation), the
    /// input channel is completed with that exception; it is not rethrown.
    /// </summary>
    protected override async Task WorkPump()
    {
        try
        {
            await foreach (var item in Inflow.ReadAllAsync(Token))
            {
                await WorkFunction(Token, item);
            }
        }
        catch (Exception ex)
        {
            // Previously the failure was swallowed and the input stayed open
            // with no reader. Completing the input records the error and makes
            // further writes fail instead of queueing for a dead stage.
            Input.TryComplete(ex);
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment