Skip to content

Instantly share code, notes, and snippets.

@stepchowfun
Created March 26, 2019 16:54

Revisions

  1. stepchowfun created this gist Mar 26, 2019.
    44 changes: 44 additions & 0 deletions when.rs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,44 @@
    fn when<
    I: 'static + Send,
    F: 'static + Send + Future<Item = I, Error = ()>,
    R: 'static + Send,
    K: 'static + Send + Fn(&[I]) -> Option<R>,
    >(
    futures: Vec<F>,
    k: K,
    ) -> Box<dyn Future<Item = R, Error = ()> + Send> {
    fn when_rec<
    I: 'static + Send,
    R: 'static + Send,
    K: 'static + Send + Fn(&[I]) -> Option<R>,
    >(
    stream: Box<dyn Stream<Item = I, Error = ()> + Send>,
    mut acc: Vec<I>,
    k: K,
    ) -> Box<dyn Future<Item = R, Error = ()> + Send> {
    if let Some(result) = k(&acc) {
    Box::new(ok(result))
    } else {
    Box::new(stream.into_future().then(|result| match result {
    Ok((x, s)) => {
    if let Some(r) = x {
    acc.push(r);
    when_rec(s, acc, k)
    } else {
    Box::new(err(()))
    }
    }
    Err((_, s)) => when_rec(s, acc, k),
    }))
    }
    }

    when_rec(
    futures
    .into_iter()
    .map(|future| future.into_stream())
    .fold(Box::new(stream::empty()), |acc, x| Box::new(acc.select(x))),
    Vec::new(),
    k,
    )
    }