Last active
May 28, 2018 09:38
-
-
Save lolgesten/dcdbdc9eb1eb38d6f99ff989479bb82a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate futures; | |
extern crate tokio; | |
use futures::*; | |
use std::sync::Arc; | |
use std::sync::Mutex; | |
use std::thread; | |
use std::time::Duration; | |
// Test stream struct. | |
struct TestStream { | |
// the actual state is protected by a mutex since we are going to | |
// manipulate in a separate thread to the one driving the stream. | |
state: Arc<Mutex<State>>, | |
} | |
// the inner protected state | |
struct State { | |
// last value we emitted | |
last: u64, | |
// the next value to emit, or None if it is emitted. | |
next: Option<u64>, | |
} | |
impl TestStream { | |
// schedule a thread producing the next value. | |
// the task handle is the key to waking up the | |
// stream polling to get the next value. | |
fn schedule_next(&self, task: task::Task) { | |
// local ref to not need "self" inside the thread. | |
let state = Arc::clone(&self.state); | |
thread::spawn(move || { | |
// sleep for a second | |
thread::sleep(Duration::from_millis(1000)); | |
// obtain the lock | |
let mut lock = state.lock().unwrap(); | |
// manipulate the state | |
lock.last += 1; | |
lock.next = Some(lock.last); | |
// and ***this is the key***, notify we got a new value to poll() | |
task.notify(); | |
}); | |
} | |
} | |
impl Stream for TestStream { | |
type Item = u64; | |
type Error = (); | |
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> { | |
// obtain lock | |
let mut lock = self.state.lock().unwrap(); | |
// take the optional value, leaving none in its place (if there is one) | |
match lock.next.take() { | |
Some(v) => { | |
if v > 44 { | |
// end here. Sending "None" indicates end of stream. | |
Ok(Async::Ready(None)) | |
} else { | |
// emit the value | |
Ok(Async::Ready(Some(v))) | |
} | |
} | |
// no value? schedule another value. | |
None => { | |
// this is the handle that will be used to "tell" the stream polling | |
// there's another value to emit. | |
let task = task::current(); | |
// schedule a value | |
self.schedule_next(task); | |
// tell poller we don't have a value yet. | |
Ok(Async::NotReady) | |
} | |
} | |
} | |
} | |
fn main() { | |
// this is a simple runtime that drives futures on a single thread. | |
// the alternative is to use tokio::run(), which creates a thread | |
// pool to drive any futures spawned onto it. | |
use tokio::runtime::current_thread::Runtime; | |
let mut rt = Runtime::new().expect("Failed to create tokio runtime"); | |
rt.spawn(future::ok(()).map(|_| { | |
// the stream | |
let test = TestStream { | |
state: Arc::new(Mutex::new(State { | |
last: 41, | |
next: None, | |
})), | |
}; | |
// drive the stream as a future, "consuming" each value as they drop in. | |
// the .for_each produces a future that is complete once the stream ends. | |
tokio::spawn(test.for_each(|v| { | |
println!("{}", v); | |
Ok(()) | |
})); | |
})); | |
// Wait until the runtime becomes idle and shut it down. | |
rt.run().unwrap(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment