Skip to content

Instantly share code, notes, and snippets.

@valm
Created November 14, 2013 14:04

Revisions

  1. valm renamed this gist Nov 14, 2013. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. valm created this gist Nov 14, 2013.
    263 changes: 263 additions & 0 deletions TPLDemo1
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,263 @@
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Globalization;
    using System.IO;
    using System.Linq;
    using System.Net.Http;
    using System.Reactive.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    using System.Xml.Linq;

    namespace TPLDataFlowTest
    {
    class Program
    {
    static void Main(string[] args)
    {
    //TPLDemo1();
    RxMix5();
    //RxMix4();

    Console.ReadKey();
    }

    private static void RxMix5()
    {
    var blockOptions = new ExecutionDataflowBlockOptions
    {
    MaxDegreeOfParallelism = 1,
    BoundedCapacity = 1
    };

    ActionBlock<int> warmupBlock = new ActionBlock<int>(async i =>
    {
    await Task.Delay(1000);
    Console.WriteLine(i);
    }, blockOptions);

    ActionBlock<int> postBlock = new ActionBlock<int>(async i =>
    {
    await Task.Delay(1000);
    Console.WriteLine(i);
    }, blockOptions);

    IObservable<int> warmUpSource = Observable.Range(1, 100).TakeUntil(DateTimeOffset.UtcNow.AddSeconds(5));
    warmUpSource.Subscribe(warmupBlock.AsObserver());

    IObservable<int> testSource = Observable.Range(1000, 1000).TakeUntil(DateTimeOffset.UtcNow.AddSeconds(10));
    testSource.Subscribe(postBlock.AsObserver());



    }

    private static void RxMix1()
    {
    IPropagatorBlock<int, string> source = new TransformBlock<int, string>(i => i.ToString());
    IObservable<int> observable = source.AsObservable().Select(Int32.Parse);
    IDisposable subscription = observable.Subscribe(i => Console.WriteLine(i));
    // send some data into TDF
    source.Post(138);
    }

    private static void RxMix2()
    {
    IPropagatorBlock<string, int> target = new TransformBlock<string, int>(s => Int32.Parse(s));
    IDisposable link = target.LinkTo(new ActionBlock<int>(i => Console.WriteLine(i)));
    IObserver<string> observer = target.AsObserver();

    IObservable<string> observable = Observable.Range(1, 10).Select(i => i.ToString());

    observable.Subscribe(observer);
    }

    private static void RxMix4()
    {
    var inputBlock = new BufferBlock<string>();
    var transformInputBlock = new TransformBlock<string, XDocument>(s => XDocument.Parse(s));
    var processBlock = new TransformBlock<XDocument, Tuple<string, int>>(
    x =>
    {
    var person = x.Element("person");
    return Tuple.Create((string)person.Element("name"), (int)person.Element("age"));
    });
    var transformOutputBlock =
    new TransformBlock<Tuple<string, int>, string>(
    t => string.Format(CultureInfo.CurrentCulture, "{0} is {1} years old", t.Item1, t.Item2));
    var outputBlock = new ActionBlock<string>(m => Console.Out.WriteLine(m));

    using (inputBlock.LinkTo(transformInputBlock))
    using (transformInputBlock.LinkTo(processBlock))
    using (processBlock.LinkTo(transformOutputBlock))
    using (transformOutputBlock.LinkTo(outputBlock))
    {
    inputBlock.Completion.ContinueWith(t => transformInputBlock.Complete());
    transformInputBlock.Completion.ContinueWith(t => processBlock.Complete());
    processBlock.Completion.ContinueWith(t => transformOutputBlock.Complete());
    transformOutputBlock.Completion.ContinueWith(t => outputBlock.Complete());

    var records = new[]
    {
    "<person><name>Michael Collins</name><age>38</age></person>",
    "<person><name>George Washington</name><age>281</age></person>",
    "<person><name>Abraham Lincoln</name><age>204</age></person>"
    };
    foreach (var record in records)
    {
    inputBlock.Post(record);
    }

    inputBlock.Complete();
    outputBlock.Completion.Wait();
    }
    }

    private static void RxMix3()
    {
    IObservable<int> originalInts = Observable.Range(1, 10);

    IPropagatorBlock<int, int[]> batch = new BatchBlock<int>(2);
    IObservable<int[]> batched = batch.AsObservable();
    originalInts.Subscribe(batch.AsObserver());

    IObservable<int> added = batched.Timeout(TimeSpan.FromMilliseconds(50)).Select(a => a.Sum());

    IPropagatorBlock<int, string> toString = new TransformBlock<int, string>(i => i.ToString());
    added.Subscribe(toString.AsObserver());

    JoinBlock<string, int> join = new JoinBlock<string, int>();
    toString.LinkTo(join.Target1);

    IObserver<int> joinIn2 = join.Target2.AsObserver();
    originalInts.Subscribe(joinIn2);

    IObservable<Tuple<string, int>> joined = join.AsObservable();

    joined.Subscribe(t => Console.WriteLine("{0};{1}", t.Item1, t.Item2));
    }

    private static void TPLDemo1()
    {
    // Create the cancellation source.
    var cancellationSource = new CancellationTokenSource();

    var inputWorkBufferBlock = new BufferBlock<Uri>();

    // Input - Uri - seed address
    // Output - Uri - key, content, content-type
    var downloaderBlock = new TransformBlock<Uri, string>(address =>
    {
    var httpClient = new HttpClient();
    // Downloads the requested resource as a string.
    Console.WriteLine("Downloading '{0}'... Thread id {1}", address.OriginalString, Thread.CurrentThread.ManagedThreadId);

    var contentType = string.Empty;
    var content = httpClient.GetAsync(address).ContinueWith(task =>
    {
    HttpResponseMessage response = task.Result;
    if (task.Result.IsSuccessStatusCode)
    {
    return task.Result.Content.ReadAsStringAsync();
    }

    return new Task<string>(() => null);
    }).Unwrap();

    return content.Result;
    }, new ExecutionDataflowBlockOptions
    {
    CancellationToken = cancellationSource.Token,
    MaxDegreeOfParallelism = 5
    });

    var outputBufferBlock = new BufferBlock<string>();
    var saverBlock = new ActionBlock<string>(content =>
    {
    if (content != null)
    {
    const string targetPath = "c:\\temp\\TPLtest";
    const string extension = ".html";
    var fileName = Path.ChangeExtension(Path.Combine(targetPath, Path.GetRandomFileName()), extension);
    Console.WriteLine("Saving {0} ...Thread: {1}", fileName, Thread.CurrentThread.ManagedThreadId);
    using (var stream = new StreamWriter(fileName))
    {
    stream.Write(content);
    }
    }
    }, new ExecutionDataflowBlockOptions
    {
    MaxDegreeOfParallelism = 1
    });

    // Blocks linking
    inputWorkBufferBlock.LinkTo(downloaderBlock);
    // Filtering, skips empty response
    downloaderBlock.LinkTo(outputBufferBlock, s => !string.IsNullOrWhiteSpace(s));
    outputBufferBlock.LinkTo(saverBlock);

    // Propagating completition
    inputWorkBufferBlock.Completion.ContinueWith(t =>
    {
    if (t.IsFaulted)
    {
    ((IDataflowBlock)downloaderBlock).Fault(t.Exception);
    }
    else
    {
    downloaderBlock.Complete();
    }
    });
    downloaderBlock.Completion.ContinueWith(t =>
    {
    if (t.IsFaulted)
    {
    ((IDataflowBlock)outputBufferBlock).Fault(t.Exception);
    }
    else
    {
    outputBufferBlock.Complete();
    }
    });
    outputBufferBlock.Completion.ContinueWith(t =>
    {
    if (t.IsFaulted)
    {
    ((IDataflowBlock)outputBufferBlock).Fault(t.Exception);
    }
    else
    {
    outputBufferBlock.Complete();
    }
    });
    outputBufferBlock.Completion.ContinueWith(t =>
    {
    if (t.IsFaulted)
    {
    ((IDataflowBlock)saverBlock).Fault(t.Exception);
    }
    else
    {
    saverBlock.Complete();
    }
    });

    // Message passing
    inputWorkBufferBlock.Post(new Uri("http://svnbook.red-bean.com/nightly/ru/svn-book.html"));
    inputWorkBufferBlock.Post(new Uri("http://bash.im"));
    inputWorkBufferBlock.Post(new Uri("http://habrahabr.ru"));
    inputWorkBufferBlock.Post(new Uri("http://lb.ua"));
    inputWorkBufferBlock.Post(new Uri("http://blogs.msdn.com/b/pfxteam/"));
    inputWorkBufferBlock.Post(new Uri("http://hgbook.red-bean.com/read/a-tour-of-mercurial-merging-work.html"));
    inputWorkBufferBlock.Complete();

    saverBlock.Completion.Wait();
    Console.WriteLine("Job is DONE...");
    Console.WriteLine("Hit ANY KEY to exit...");
    Console.ReadKey();
    }
    }
    }