Created
October 26, 2018 01:45
-
-
Save rust-play/9c22d7ad5bbc92071d079858f3c1f0da to your computer and use it in GitHub Desktop.
Code shared from the Rust Playground
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::cell::RefCell; | |
use std::rc::Rc; | |
trait Observer<T> { | |
fn on_next(&mut self, item: T); | |
} | |
impl<T, F> Observer<T> for F where F: FnMut(T) -> () { | |
fn on_next(&mut self, item: T) { | |
self(item) | |
} | |
} | |
trait Stream<'a>: Sized { | |
type Item: Clone + 'a; // TODO: ref instead of clone | |
fn subscribe<F>(self, f: F) where F: Observer<Self::Item> + 'a; | |
fn share(self) -> Multicast<'a, Self::Item> { | |
Multicast::new(self) | |
} | |
} | |
type ObserverBundle<'a, T> = Rc<RefCell<Vec<Box<Observer<T> + 'a>>>>; | |
struct Multicast<'a, T> { | |
observers: ObserverBundle<'a, T>, | |
} | |
impl<'a, T: Clone + 'a> Multicast<'a, T> { | |
fn new<S: Stream<'a, Item=T>>(stream: S) -> Self { | |
let observers: ObserverBundle<T> = Rc::new(RefCell::new(Vec::new())); | |
let dup = observers.clone(); | |
stream.subscribe(move |x: T| { | |
for observer in dup.borrow_mut().iter_mut() { | |
observer.on_next(x.clone()); // TODO: ref instead of clone | |
} | |
}); | |
Multicast { observers } | |
} | |
fn fork(&self) -> Subscription<'a, T> { | |
Subscription::new(self.observers.clone()) | |
} | |
} | |
struct Input<'a, T> { | |
observers: ObserverBundle<'a, T>, | |
} | |
impl<'a, T: Clone + 'a> Input<'a, T> { | |
fn new() -> Self { | |
Input { observers: Rc::new(RefCell::new(Vec::new())) } | |
} | |
fn feed(&self, value: T) { | |
for observer in self.observers.borrow_mut().iter_mut() { | |
observer.on_next(value.clone()); // TODO: ref instead of clone | |
} | |
} | |
fn fork(&self) -> Subscription<'a, T> { | |
Subscription::new(self.observers.clone()) | |
} | |
} | |
struct Subscription<'a, T> { | |
observers: ObserverBundle<'a, T>, | |
} | |
impl<'a, T> Subscription<'a, T> { | |
fn new(observers: ObserverBundle<'a, T>) -> Self { | |
Subscription { observers } | |
} | |
} | |
impl<'a, T: 'a + Clone> Stream<'a> for Subscription<'a, T> { | |
type Item = T; | |
fn subscribe<F>(self, f: F) where F: Observer<Self::Item> + 'a { | |
self.observers.borrow_mut().push(Box::new(f)); | |
} | |
} | |
struct Map<S, M> { | |
stream: S, | |
func: M, | |
} | |
impl<'a, S: Stream<'a>, M, T: 'a + Clone> Stream<'a> for Map<S, M> where M: 'a + FnMut(S::Item) -> T { | |
type Item = T; | |
fn subscribe<F>(self, mut f: F) where F: Observer<Self::Item> + 'a { | |
let mut func = self.func; | |
self.stream.subscribe(move |x| f.on_next(func(x))) | |
} | |
} | |
trait StreamExt<'a>: Stream<'a> { | |
fn map<M, T>(self, func: M) -> Map<Self, M> where M: FnMut(Self::Item) -> T + 'a{ | |
return Map { stream: self, func } | |
} | |
} | |
impl<'a, S: Stream<'a>> StreamExt<'a> for S where S: Stream<'a> {} | |
fn main() { | |
use std::cell::Cell; | |
let z = Cell::new(0.); | |
let input = Input::<i64>::new(); | |
let s1 = input.fork(); | |
s1.subscribe(|x| { | |
println!("s1: {}", x) | |
}); | |
let s2 = input.fork(); | |
s2.map(|x| x * 3).subscribe(|x| { | |
println!("s2: {}", x) | |
}); | |
let s3 = input.fork().map(|x| { | |
let s3 = (x as f64) + 100.5; | |
println!("s3: {}", s3); | |
s3 | |
}).share(); | |
s3.fork().map(|x| x + 1.).subscribe(|x| println!("s4: {}", x)); | |
s3.fork().map(|x| x + 2.).subscribe(|x| { z.replace(x); println!("s5: {}", x) }); | |
println!("z = {}", z.get()); | |
println!("---"); | |
input.feed(1); | |
println!("---"); | |
input.feed(2); | |
println!("---"); | |
println!("z = {}", z.get()); | |
} | |
/* | |
s1: 1 | |
s2: 3 | |
s3: 101.5 | |
s4: 102.5 | |
s5: 103.5 | |
--- | |
s1: 2 | |
s2: 6 | |
s3: 102.5 | |
s4: 103.5 | |
s5: 104.5 | |
--- | |
s1: 3 | |
s2: 9 | |
s3: 103.5 | |
s4: 104.5 | |
s5: 105.5 | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment