Created
October 29, 2018 00:57
-
-
Save rust-play/53c6255e6c7b33969fd89ba792d1b877 to your computer and use it in GitHub Desktop.
Code shared from the Rust Playground
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![allow(dead_code)] | |
use std::cell::RefCell; | |
use std::rc::Rc; | |
trait Stream<'a> { | |
type Item: 'a + ?Sized; | |
fn subscribe<F>(self, f: F) where F: FnMut(&Self::Item) + 'a; | |
fn share(self) -> Multicast<'a, Self::Item> where Self: Sized { | |
Multicast::new(self) | |
} | |
} | |
/// Shared, interior-mutable list of boxed observer callbacks.
///
/// Each callback receives items of type `T` by reference and may borrow data
/// for at most `'a`. The `Rc<RefCell<..>>` wrapper lets several handles push
/// into and iterate over the same list on a single thread.
type ObserverBundle<'a, T> = Rc<RefCell<Vec<Box<dyn FnMut(&T) + 'a>>>>;
struct Multicast<'a, T: ?Sized> { | |
observers: ObserverBundle<'a, T>, | |
} | |
impl<'a, T> Multicast<'a, T> where T: 'a + ?Sized { | |
fn new<S: Stream<'a, Item=T>>(stream: S) -> Self { | |
let observers: ObserverBundle<T> = Rc::new(RefCell::new(Vec::new())); | |
let dup = observers.clone(); | |
stream.subscribe(move |x: &T| { | |
for observer in dup.borrow_mut().iter_mut() { | |
observer(x); | |
} | |
}); | |
Multicast { observers } | |
} | |
fn fork(&self) -> Subscription<'a, T> { | |
Subscription::new(self.observers.clone()) | |
} | |
} | |
struct Input<'a, T> { | |
observers: ObserverBundle<'a, T>, | |
} | |
impl<'a, T: Clone + 'a> Input<'a, T> { | |
fn new() -> Self { | |
Input { observers: Rc::new(RefCell::new(Vec::new())) } | |
} | |
fn feed(&self, value: T) { | |
for observer in self.observers.borrow_mut().iter_mut() { | |
observer(&value); | |
} | |
} | |
fn fork(&self) -> Subscription<'a, T> { | |
Subscription::new(self.observers.clone()) | |
} | |
} | |
struct Subscription<'a, T: ?Sized> { | |
observers: ObserverBundle<'a, T>, | |
} | |
impl<'a, T: ?Sized> Subscription<'a, T> { | |
fn new(observers: ObserverBundle<'a, T>) -> Self { | |
Subscription { observers } | |
} | |
} | |
impl<'a, T> Stream<'a> for Subscription<'a, T> where T: 'a { | |
type Item = T; | |
fn subscribe<F>(self, f: F) where F: FnMut(&Self::Item) + 'a { | |
self.observers.borrow_mut().push(Box::new(f)); | |
} | |
} | |
/// Stream adapter that pairs an upstream stream with a mapping function.
struct Map<S, M> {
    /// The upstream stream whose items are transformed.
    stream: S,
    /// The function applied to each upstream item.
    func: M,
}
impl<'a, S, M, T> Stream<'a> for Map<S, M> | |
where | |
S: Stream<'a>, | |
T: 'a, | |
M: 'a + FnMut(&S::Item) -> T | |
{ | |
type Item = T; | |
fn subscribe<F>(self, mut f: F) where F: FnMut(&Self::Item) + 'a { | |
let mut func = self.func; | |
self.stream.subscribe(move |x| f(&func(x))) | |
} | |
} | |
/// Stream adapter that pairs an upstream stream with a shared buffer of the
/// items collected so far.
///
/// `T` is implicitly `Sized`, so no explicit bound is needed on the struct.
struct Collect<S, T> {
    /// The upstream stream whose items are accumulated.
    stream: S,
    /// Buffer holding every item collected so far.
    data: Rc<RefCell<Vec<T>>>,
}
impl<'a, S, T> Stream<'a> for Collect<S, T> | |
where S: Stream<'a, Item=T>, T: 'a + Clone + Sized | |
{ | |
type Item = [T]; | |
fn subscribe<F>(self, mut f: F) where F: FnMut(&Self::Item) + 'a { | |
let data = self.data.clone(); | |
self.stream.subscribe(move |x| { | |
data.borrow_mut().push(x.clone()); | |
f(&*data.borrow()); | |
}) | |
} | |
} | |
trait StreamExt<'a>: Stream<'a> { | |
fn map<M, T>(self, func: M) -> Map<Self, M> | |
where M: FnMut(&Self::Item) -> T + 'a, Self: Sized | |
{ | |
Map { stream: self, func } | |
} | |
fn collect(self) -> Collect<Self, Self::Item> | |
where Self: Sized, Self::Item: Sized | |
{ | |
Collect { stream: self, data: Rc::new(RefCell::new(Vec::new())) } | |
} | |
} | |
impl<'a, S: Stream<'a>> StreamExt<'a> for S where S: Stream<'a> {} | |
fn main() { | |
let input = Input::<i64>::new(); | |
let s1 = input.fork(); | |
s1.subscribe(|x| { | |
println!("s1: {}", x) | |
}); | |
let s2 = input.fork(); | |
s2.map(|x| x * 2).collect().subscribe(|x| { | |
println!("s2: {:?}", x); | |
}); | |
input.feed(1); | |
input.feed(2); | |
input.feed(3) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment