Skip to content

Instantly share code, notes, and snippets.

@anaisbetts
Created December 9, 2014 06:37
Show Gist options
  • Save anaisbetts/04dd80a3a0c77feef143 to your computer and use it in GitHub Desktop.
Save anaisbetts/04dd80a3a0c77feef143 to your computer and use it in GitHub Desktop.
// Imperative sketch: run somethingAsync over every source value and stop after
// `count` results have been produced.
//
// NOTE: this is illustrative pseudo-code, not compilable C#. `async` cannot be
// applied to a method returning IObservable<int>, `yield` cannot appear inside a
// lambda, and `foreach` cannot iterate an IObservable<int>.
//
// `@enum` is the verbatim-identifier spelling of the parameter name `enum`,
// which is otherwise a reserved word.
public async IObservable<int> takeStream(IObservable<int> @enum, int count)
{
    return Observable.Create<int>(subj =>
    {
        // The counter lives inside the subscribe callback so that each
        // subscription counts from zero instead of sharing one counter.
        int i = 0;

        foreach (int x in @enum)
        {
            // Stop once exactly `count` results have been emitted. Checking
            // before the work also makes count == 0 emit nothing.
            if (i >= count)
            {
                yield break;
            }

            Task<int> task = somethingAsync(x);
            int y = await task;
            yield return y;
            i++;
        }
    });
}
// Could be written as:
/// <summary>
/// Maps every source value through somethingAsync and emits at most
/// <paramref name="count"/> of the results.
/// </summary>
/// <param name="enum">Source sequence of values to transform.</param>
/// <param name="count">Maximum number of results to emit.</param>
/// <returns>An observable of the transformed values, in source order.</returns>
public IObservable<int> takeStream(IObservable<int> @enum, int count)
{
    // `@enum` is the verbatim-identifier form; a bare `enum` is a reserved word.
    return @enum
        .Select(x => somethingAsync(x)).Concat() // Preserves original ordering of enum
        .Take(count);
}
//////////
// Imperative sketch: run somethingAsync over the source values and collect at
// most `count` results into a buffer.
//
// NOTE: this is illustrative pseudo-code, not compilable C#; `foreach` cannot
// iterate an IObservable<int>.
//
// `@enum` is the verbatim-identifier spelling of the parameter name `enum`,
// which is otherwise a reserved word.
public async Task<int> takeBuffered(IObservable<int> @enum, int count)
{
    // A growable list is used because the source may produce fewer than
    // `count` values; a fixed-size array would be left partly unfilled.
    List<int> ary = new List<int>();

    foreach (int x in @enum)
    {
        // Stop once exactly `count` results are buffered. Checking before the
        // work also makes count == 0 return an empty buffer.
        if (ary.Count >= count)
        {
            break;
        }

        Task<int> task = somethingAsync(x);
        int y = await task;
        ary.Add(y);
    }

    // NOTE(review): the declared return type is Task<int>, but a collection is
    // returned here. The signature is left untouched to avoid changing the
    // interface; it presumably should be Task<IList<int>> — confirm.
    return ary;
}
// Could be written as
/// <summary>
/// Maps every source value through somethingAsync, keeps at most
/// <paramref name="count"/> results, and returns them as a single list.
/// </summary>
/// <param name="enum">Source sequence of values to transform.</param>
/// <param name="count">Maximum number of results to collect.</param>
/// <returns>A task that yields the collected results.</returns>
public Task<IList<int>> takeBuffered(IObservable<int> @enum, int count)
{
    // `@enum` is the verbatim-identifier form; a bare `enum` is a reserved word.
    // The pipeline must be returned, otherwise the method produces no value.
    return @enum
        .Select(x => somethingAsync(x)).Concat() // "Select" => map
        .Take(count)
        .ToList()
        .ToTask();
}
// ToList is a convenience operator; it behaves roughly like this Aggregate-based version:
/// <summary>
/// Maps every source value through somethingAsync, keeps at most
/// <paramref name="count"/> results, and folds them into a single list
/// using Aggregate.
/// </summary>
/// <param name="enum">Source sequence of values to transform.</param>
/// <param name="count">Maximum number of results to collect.</param>
/// <returns>A task that yields the collected results.</returns>
public Task<IList<int>> takeBuffered(IObservable<int> @enum, int count)
{
    // `@enum` is the verbatim-identifier form; a bare `enum` is a reserved word.
    // The seed is typed as IList<int> so the pipeline yields Task<IList<int>>;
    // a List<int> seed would give Task<List<int>>, which does not convert
    // because Task<T> is not covariant.
    IList<int> seed = new List<int>();

    return @enum
        .Select(x => somethingAsync(x)).Concat()
        .Take(count)
        .Aggregate(seed, (acc, x) => { acc.Add(x); return acc; }) // "Aggregate" => reduce
        .ToTask();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment