Last active
April 16, 2022 18:56
Parallel Programming in C# : Patterns
// Problem:
// We have lots of data that we want to process that can be easily parallelised.
// We want to process all our data and combine the results.

// "Map is an idiom in parallel computing where a simple operation is applied to all elements of a
// sequence, potentially in parallel.[1] It is used to solve embarrassingly parallel problems: those
// problems that can be decomposed into independent subtasks, requiring no
// communication/synchronization between the subtasks except a join or barrier at the end."
// - https://en.wikipedia.org/wiki/Map_(parallel_pattern)
void Main()
{
	var sw = System.Diagnostics.Stopwatch.StartNew();
	var dataToProcess = Enumerable.Range(0, 1000000);
	var result = new List<int>();
	Parallel.ForEach(dataToProcess,
		() => new List<int>(), // thread-local list for storing results locally
		(toProcess, loopControl, localList) => // map
		{
			localList.Add(toProcess + 10);
			return localList;
		},
		(localList) => // finaliser: reduces each thread-local list into the shared result list
		{
			lock (result)
			{
				result.AddRange(localList);
			}
		});
	sw.Stop();
	// .Dump() is LINQPad's output helper
	sw.Elapsed.Dump();
	result.Min().Dump();
	result.Count.Dump();
}
// Problem:
// Want to execute a lot of operations in parallel.
// No ordering in execution is required.
// Each operation can take a few minutes (3-5+).
void Main()
{
	var N = 100; // Number of tasks

	// SOLUTION 1 - Use the Parallel library
	Parallel.For(0, N, (i) => {
		// parallel work here
	});

	// SOLUTION 2 - Improve on solution 1 by capping MaxDegreeOfParallelism
	var pOptions = new ParallelOptions()
	{
		MaxDegreeOfParallelism = System.Environment.ProcessorCount
	};
	Parallel.For(0, N, pOptions, (i) => {
		// parallel work here
	});

	// SOLUTION 3 - manually keep one running task per core
	// Longer solution; shown mainly for comparison
	var cores = System.Environment.ProcessorCount;
	var tasks = new List<Task>();
	var remaining = N;
	// seed one task per core
	for (var i = 0; i < cores && remaining > 0; i++)
	{
		tasks.Add(Task.Run(() => {
			// parallel work here
		}));
		remaining--;
	}
	// whenever a task finishes, replace it with a new one until all N have been dispatched
	while (remaining > 0)
	{
		var finished = Task.WaitAny(tasks.ToArray());
		tasks.RemoveAt(finished);
		tasks.Add(Task.Run(() => {
			// parallel work here
		}));
		remaining--;
	}
	Task.WaitAll(tasks.ToArray());
}
// PLINQ: very useful for quickly assessing whether parallelising work can give benefits,
// though the results tend to be slower than custom parallel code using an appropriate pattern
void Main()
{
	var sw = System.Diagnostics.Stopwatch.StartNew();
	var dataToProcess = Enumerable.Range(0, 1000000);
	var result = dataToProcess.AsParallel().Select(x => x + 10).ToList();
	sw.Stop();
	sw.Elapsed.Dump();
	result.Min().Dump();
	result.Count.Dump();
}
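One caveat worth noting when comparing PLINQ against the hand-rolled versions: `AsParallel()` does not preserve source order by default, so the output list may be shuffled relative to the input. A minimal sketch showing the `AsOrdered()` opt-in (variable names here are illustrative, not from the snippets above):

```csharp
using System;
using System.Linq;

class Program
{
	static void Main()
	{
		var data = Enumerable.Range(0, 1000);

		// Unordered: results may come back in any order across partitions
		var unordered = data.AsParallel().Select(x => x + 10).ToList();

		// AsOrdered preserves the original sequence order, at some throughput cost
		var ordered = data.AsParallel().AsOrdered().Select(x => x + 10).ToList();

		Console.WriteLine(ordered.SequenceEqual(data.Select(x => x + 10))); // True
		Console.WriteLine(unordered.Count); // 1000
	}
}
```

Since the examples here only compute `Min()` and `Count`, ordering doesn't matter, but it would if the results were zipped back against the input.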
// Problem:
// Want to execute a lot of tasks, but each task does not do much in terms of CPU cycles.
// The main issue with this pattern is contention - the consumers are most probably draining the queue
// faster than the producer is adding work to it.
void Main()
{
	// Producer/Consumer Pattern
	// The producer-consumer pattern splits work into smaller chunks and feeds them into a thread-safe/concurrent queue.
	// The difference between this and spawning a thread per task is that with this pattern, threads
	// are kept alive as long-running threads until all the work in the queue is done, so there is no
	// expensive constant spawning and killing of threads.
	// [producers]->[queue]->[consumers]
	const int MAX_CAPACITY = 10000;
	var sw = System.Diagnostics.Stopwatch.StartNew();
	var dataToProcess = Enumerable.Range(0, 1000000);
	var result = new List<int>();
	var workQueue = new BlockingCollection<int>(MAX_CAPACITY);
	var numOfCores = System.Environment.ProcessorCount;
	var longRunningTaskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

	// You're also allowed multiple producers if required
	var producer = longRunningTaskFactory.StartNew(() => {
		foreach (var toProcess in dataToProcess)
		{
			workQueue.Add(toProcess);
		}
		// essential: lets our consumers know that we're done, so they can stop trying to consume more work
		workQueue.CompleteAdding();
	});

	// we will store each consumer's results as a separate set of results
	var consumerResults = new Task<List<int>>[numOfCores];
	// spawn as many consumer threads as we have cores
	for (var i = 0; i < numOfCores; i++)
	{
		consumerResults[i] = longRunningTaskFactory.StartNew(() => {
			var localResult = new List<int>();
			try
			{
				while (!workQueue.IsCompleted)
				{
					// this is where we do our work
					// Take() throws InvalidOperationException if the queue completes
					// between the IsCompleted check and this call
					var toProcess = workQueue.Take();
					localResult.Add(toProcess + 10);
				}
			}
			catch (InvalidOperationException)
			{
				// workQueue is completed
			}
			return localResult;
		});
	}

	// WARNING: merging with Task.WaitAny in a loop would produce an incorrect result,
	// because Task.WaitAny can return an already-completed task again, potentially
	// merging the same consumer's result set twice:
	// var merged = 0;
	// while (merged < numOfCores)
	// {
	//     var taskId = Task.WaitAny(consumerResults);
	//     var consumerResult = consumerResults[taskId].Result;
	//     result.AddRange(consumerResult);
	//     merged++;
	// }
	Task.WaitAll(consumerResults);
	foreach (var consumerResult in consumerResults)
	{
		result.AddRange(consumerResult.Result);
	}

	sw.Stop(); // all done
	result.Min().Dump();
	result.Count.Dump();
	sw.Elapsed.Dump();
}
Author
andy-williams
commented
Apr 27, 2021
via email
No license on them 🙂 but you have my permission to do anything you wish to do, without limitations.
…On Sun, 25 Apr 2021, 20:58 suprafun wrote:
Hello, under what license are these snippets released? I would like to study them to learn from them. Thank you.
Thank you so very much.
Love the MapReduce implementation, didn't notice that possibility.
@arsu-leo Keep in mind that this is quite old; you can achieve this in an async manner using Channels if you're using .NET Core. Reading material here: https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/. The idea is that you can create multiple writers and readers for the same channel, achieving the same pattern but with async support, and it's easier to design a more flexible system as there's better separation.
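For anyone reading along, here is a minimal sketch of what the producer-consumer snippet above might look like with `System.Threading.Channels` (this is my own translation, not code from the gist): a bounded channel replaces the `BlockingCollection`, and `await foreach` over the reader replaces the `IsCompleted`/`Take()` loop.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
	static async Task Main()
	{
		// Bounded channel plays the role of BlockingCollection:
		// writers wait (asynchronously) when the buffer is full
		var channel = Channel.CreateBounded<int>(10_000);

		// single producer; multiple writers to the same channel are also allowed
		var producer = Task.Run(async () =>
		{
			foreach (var item in Enumerable.Range(0, 1_000_000))
				await channel.Writer.WriteAsync(item);
			channel.Writer.Complete(); // signal consumers that no more data is coming
		});

		// one consumer per core, each accumulating a thread-local result list
		var consumers = Enumerable.Range(0, Environment.ProcessorCount)
			.Select(_ => Task.Run(async () =>
			{
				var local = new List<int>();
				// loop ends cleanly once the writer completes and the channel drains
				await foreach (var item in channel.Reader.ReadAllAsync())
					local.Add(item + 10);
				return local;
			}))
			.ToArray();

		await producer;
		var results = (await Task.WhenAll(consumers)).SelectMany(x => x).ToList();
		Console.WriteLine(results.Count); // 1000000
		Console.WriteLine(results.Min()); // 10
	}
}
```

Note there is no manual `CompleteAdding`/`InvalidOperationException` dance: `ReadAllAsync` simply stops yielding once the channel is completed and empty.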