Created
March 9, 2023 10:58
-
-
Save markheath/220119511fc392b45078230018e57cc8 to your computer and use it in GitHub Desktop.
LINQPad demo of the producer-consumer pattern with TPL Dataflow, including back-pressure
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<Query Kind="Statements"> | |
<Namespace>System.Collections.Concurrent</Namespace> | |
<Namespace>System.Threading.Tasks</Namespace> | |
<Namespace>System.Threading.Tasks.Dataflow</Namespace> | |
</Query> | |
// Demo settings, read by the producer/consumer functions in this script.

// Delay the producer waits while "creating" each item.
TimeSpan produceSpeed = TimeSpan.FromMilliseconds(500);
// Number of items the producer emits before completing its target.
int produceCount = 20;
// Delay a consumer waits while "handling" each item.
TimeSpan consumeSpeed = TimeSpan.FromSeconds(2);
// Used as the bounded buffer capacity in the back-pressure scenario.
int maxParallelConsume = 4;
// Produces `produceCount` items, one every `produceSpeed`, and offers each one
// to `target`. When production ends the target is completed; if production
// throws, the target is faulted instead so that downstream consumers are not
// left waiting forever, and the exception is rethrown to the caller.
async Task ProduceAsync(ITargetBlock<string> target)
{
    try
    {
        for (int i = 0; i < produceCount; i++)
        {
            var item = $"Item {i + 1}";
            Console.WriteLine($"Producing {item}");
            await Task.Delay(produceSpeed); // simulate the work of creating an item
            Console.WriteLine($"Produced {item}");

            // SendAsync never blocks a thread: when the target is at capacity the
            // returned task simply stays incomplete until room frees up (this is
            // the back-pressure). It yields false when the target permanently
            // declines the message, e.g. because it has completed or faulted.
            var accepted = await target.SendAsync(item);
            if (!accepted)
            {
                // Continuing would silently drop every remaining item.
                Console.WriteLine($"Target declined {item}; stopping production");
                break;
            }
        }

        target.Complete();
    }
    catch (Exception ex)
    {
        // Propagate the failure downstream rather than leaving the target open.
        target.Fault(ex);
        throw;
    }
}
// Simulates handling a single message: logs the start, waits for
// `consumeSpeed`, then logs the end.
async Task ConsumeOneAsync(string message)
{
    Console.WriteLine($"Consuming {message}");

    var simulatedWork = Task.Delay(consumeSpeed);
    await simulatedWork;

    Console.WriteLine($"Consumed {message}");
}
// Pulls messages from `source` one at a time and handles each with
// ConsumeOneAsync until the source reports that no more output will arrive.
// Returns the number of messages handled.
async Task<int> ConsumeAllAsync(ISourceBlock<string> source)
{
    var handled = 0;
    while (true)
    {
        // Yields false once the source is completed and drained.
        var hasOutput = await source.OutputAvailableAsync();
        if (!hasOutput)
        {
            return handled;
        }

        var next = await source.ReceiveAsync();
        await ConsumeOneAsync(next);
        handled += 1;
    }
}
// Toggle between the two demo scenarios:
//   true  - bounded pipeline with parallel consumers and back-pressure
//   false - simple unbounded buffer with a single sequential consumer
var backPressure = true;
if (backPressure)
{
    // Bounded buffer between the producer and the consumers.
    var buffer = new BufferBlock<string>(
        new DataflowBlockOptions { BoundedCapacity = maxParallelConsume });

    // The consumer block must be bounded as well: an ActionBlock's input queue
    // is unbounded by default, so it would otherwise drain the buffer
    // immediately and the producer would never actually be throttled.
    var consumerBlock = new ActionBlock<string>(
        message => ConsumeOneAsync(message),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallelConsume,
            BoundedCapacity = maxParallelConsume,
        });

    // Completing (or faulting) the buffer flows through to the consumer block.
    buffer.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true });

    var producerTask = ProduceAsync(buffer);

    // Await the producer too, so an exception thrown while producing is
    // observed here instead of being lost on an un-awaited task.
    await Task.WhenAll(producerTask, consumerBlock.Completion);
}
else
{
    // SIMPLE IMPLEMENTATION: unbounded buffer, one consumer loop.
    var buffer = new BufferBlock<string>();
    var consumerTask = ConsumeAllAsync(buffer);
    var producerTask = ProduceAsync(buffer);
    await Task.WhenAll(consumerTask, producerTask);

    // The task has already completed; await it rather than reading .Result so
    // a failure would surface as the original exception, not AggregateException.
    var itemsProcessed = await consumerTask;
    itemsProcessed.Dump();
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment