Last active
March 15, 2020 21:39
-
-
Save dignifiedquire/69608edd6199b1d2ea96c0785d7b0cd5 to your computer and use it in GitHub Desktop.
jobs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use anyhow::Result; | |
use async_std::prelude::*; | |
use async_std::sync::Arc; | |
use async_std::sync::{channel, Receiver, Sender}; | |
use async_std::task; | |
use log::info; | |
use std::time::Duration; | |
fn setup_logger() { | |
pretty_env_logger::init(); | |
} | |
/// Initial, unconfigured application state.
///
/// Carries no data. It is deliberately not `Clone`: `configure` takes it by
/// value and hands back a `ConfiguredContext` instead.
#[derive(Debug, Default)]
struct Context {}
impl Context { | |
async fn configure(self) -> ConfiguredContext { | |
ConfiguredContext::default() | |
} | |
async fn import(&mut self) { | |
info!("importing"); | |
} | |
} | |
/// Configured but not yet running application state.
///
/// Carries no data; `run` turns it into a `RunningContext`.
#[derive(Debug, Default, Clone)]
struct ConfiguredContext {}
#[derive(Debug, Clone)] | |
struct RunningContext { | |
inner: Arc<InnerContext>, | |
} | |
#[derive(Debug)] | |
struct InnerContext { | |
inbox: ImapConnection, | |
smtp: SmtpConnection, | |
} | |
#[derive(Debug)] | |
struct ImapConnection { | |
/// Channel to notify that the inbox task was finished. | |
shutdown_receiver: Receiver<()>, | |
stop_sender: Sender<()>, | |
idle_interrupt_sender: Sender<()>, | |
jobs_receiver: Receiver<ImapJob>, | |
jobs_sender: Sender<ImapJob>, | |
} | |
#[derive(Debug)] | |
struct SmtpConnection { | |
/// Channel to notify that the smtp task was finished. | |
shutdown_receiver: Receiver<()>, | |
stop_sender: Sender<()>, | |
jobs_receiver: Receiver<SmtpJob>, | |
jobs_sender: Sender<SmtpJob>, | |
} | |
impl SmtpConnection { | |
async fn new() -> (Self, SmtpConnectionHandlers) { | |
let (jobs_sender, jobs_receiver) = channel(50); | |
let (stop_sender, stop_receiver) = channel(1); | |
let (shutdown_sender, shutdown_receiver) = channel(1); | |
let handlers = SmtpConnectionHandlers { | |
connection: Smtp::new().await, | |
stop_receiver, | |
shutdown_sender, | |
}; | |
let conn = SmtpConnection { | |
shutdown_receiver, | |
stop_sender, | |
jobs_sender, | |
jobs_receiver, | |
}; | |
(conn, handlers) | |
} | |
async fn send_job(&self, job: SmtpJob) { | |
self.jobs_sender.send(job).await | |
} | |
async fn stop(&self) { | |
self.stop_sender.send(()).await; | |
self.shutdown_receiver.recv().await; | |
} | |
} | |
/// Work item for the inbox (IMAP) task.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ImapJob {
    /// Move from `from` to `to`; executed via `Imap::mov`.
    Move { from: String, to: String },
}
/// Work item for the smtp task.
#[derive(Debug, Clone, PartialEq, Eq)]
enum SmtpJob {
    /// Send `msg`; executed via `Smtp::send`.
    SendMessage { msg: String },
}
impl ConfiguredContext { | |
async fn run(self) -> RunningContext { | |
RunningContext::new().await | |
} | |
async fn export(&self) { | |
info!("exporting"); | |
} | |
} | |
#[derive(Debug)] | |
struct Imap { | |
idle_interrupt: Receiver<()>, | |
} | |
/// Stub SMTP connection; carries no state.
#[derive(Debug)]
struct Smtp {}
impl Smtp { | |
async fn new() -> Self { | |
Smtp {} | |
} | |
async fn send(&mut self, msg: &str) { | |
info!("_smtp_ sending message: {}", msg); | |
} | |
} | |
impl Imap { | |
async fn new(idle_interrupt: Receiver<()>) -> Self { | |
Imap { idle_interrupt } | |
} | |
async fn fetch(&mut self) { | |
info!("_imap_ fetching"); | |
} | |
async fn idle(&mut self) { | |
use futures::future::FutureExt; | |
info!("_imap_ idle start"); | |
enum Idle { | |
Interrupt, | |
Done, | |
} | |
// clear channel to make sure we don't reuse an old interrupt. | |
if !self.idle_interrupt.is_empty() { | |
self.idle_interrupt.recv().await; | |
} | |
let fut = task::sleep(Duration::from_secs(5)) | |
.map(|_| Idle::Done) | |
.race(self.idle_interrupt.recv().map(|_| Idle::Interrupt)); | |
match fut.await { | |
Idle::Interrupt => info!("_imap_ idle interrupted"), | |
Idle::Done => info!("_imap_ idle done"), | |
} | |
} | |
async fn mov(&mut self, from: &str, to: &str) { | |
info!("_imap_ move from: '{}' to: '{}'", from, to); | |
} | |
} | |
impl ImapConnection { | |
async fn new() -> (Self, ImapConnectionHandlers) { | |
let (jobs_sender, jobs_receiver) = channel(50); | |
let (stop_sender, stop_receiver) = channel(1); | |
let (idle_interrupt_sender, idle_interrupt_receiver) = channel(1); | |
let (shutdown_sender, shutdown_receiver) = channel(1); | |
let handlers = ImapConnectionHandlers { | |
connection: Imap::new(idle_interrupt_receiver).await, | |
stop_receiver, | |
shutdown_sender, | |
}; | |
let conn = ImapConnection { | |
shutdown_receiver, | |
stop_sender, | |
idle_interrupt_sender, | |
jobs_sender, | |
jobs_receiver, | |
}; | |
(conn, handlers) | |
} | |
async fn send_job(&self, job: ImapJob) { | |
self.jobs_sender | |
.send(job) | |
.join(self.idle_interrupt_sender.send(())) | |
.await; | |
} | |
async fn stop(&self) { | |
self.stop_sender.send(()).await; | |
self.shutdown_receiver.recv().await; | |
} | |
} | |
struct ImapConnectionHandlers { | |
connection: Imap, | |
stop_receiver: Receiver<()>, | |
shutdown_sender: Sender<()>, | |
} | |
struct SmtpConnectionHandlers { | |
connection: Smtp, | |
stop_receiver: Receiver<()>, | |
shutdown_sender: Sender<()>, | |
} | |
impl RunningContext { | |
async fn new() -> Self { | |
let (inbox, inbox_handlers) = ImapConnection::new().await; | |
let (smtp, smtp_handlers) = SmtpConnection::new().await; | |
let inner = InnerContext { inbox, smtp }; | |
let ctx = RunningContext { | |
inner: Arc::new(inner), | |
}; | |
let ctx1 = ctx.clone(); | |
task::spawn(async move { | |
let ImapConnectionHandlers { | |
mut connection, | |
stop_receiver, | |
shutdown_sender, | |
} = inbox_handlers; | |
let fut = async move { | |
loop { | |
info!(" loop-imap"); | |
match ctx1 | |
.inner | |
.inbox | |
.jobs_receiver | |
.recv() | |
.timeout(Duration::from_millis(200)) | |
.await | |
{ | |
Ok(Some(job)) => { | |
// execute job | |
info!(" executing job: {:?}", job); | |
match job { | |
ImapJob::Move { from, to } => connection.mov(&from, &to).await, | |
} | |
} | |
Ok(None) | Err(async_std::future::TimeoutError { .. }) => { | |
// fetch | |
connection.fetch().await; | |
// idle | |
connection.idle().await; | |
info!(" idle done"); | |
} | |
} | |
} | |
}; | |
info!(" awaiting inbox future"); | |
fut.race(stop_receiver.recv()).await; | |
info!(" inbox interrupted"); | |
shutdown_sender.send(()).await; | |
}); | |
let ctx1 = ctx.clone(); | |
task::spawn(async move { | |
let SmtpConnectionHandlers { | |
mut connection, | |
stop_receiver, | |
shutdown_sender, | |
} = smtp_handlers; | |
let fut = async move { | |
loop { | |
info!(" loop-smtp"); | |
match ctx1.inner.smtp.jobs_receiver.recv().await { | |
Some(job) => { | |
// execute job | |
info!(" executing job: {:?}", job); | |
match job { | |
SmtpJob::SendMessage { msg } => connection.send(&msg).await, | |
} | |
} | |
None => {} | |
} | |
} | |
}; | |
info!(" awaiting smtp future"); | |
fut.race(stop_receiver.recv()).await; | |
info!(" smtp interrupted"); | |
shutdown_sender.send(()).await; | |
}); | |
ctx | |
} | |
async fn send_inbox_job(&self, job: ImapJob) { | |
self.inner.inbox.send_job(job).await; | |
} | |
async fn send_smtp_job(&self, job: SmtpJob) { | |
self.inner.smtp.send_job(job).await; | |
} | |
async fn send_message(&self, msg: impl AsRef<str>) { | |
self.send_smtp_job(SmtpJob::SendMessage { | |
msg: msg.as_ref().to_string(), | |
}) | |
.await; | |
} | |
async fn mov(&self, from: impl AsRef<str>, to: impl AsRef<str>) { | |
self.send_inbox_job(ImapJob::Move { | |
from: from.as_ref().to_string(), | |
to: to.as_ref().to_string(), | |
}) | |
.await; | |
} | |
async fn stop(self) -> ConfiguredContext { | |
self.inner.inbox.stop().join(self.inner.smtp.stop()).await; | |
drop(self); | |
info!("dropped"); | |
ConfiguredContext::default() | |
} | |
} | |
#[async_std::main] | |
async fn main() -> Result<()> { | |
setup_logger(); | |
let ctx = Context::default(); | |
let ctx = ctx.configure().await; | |
info!("running"); | |
let ctx = ctx.run().await; | |
let ctx1 = ctx.clone(); | |
let mov_task = task::spawn(async move { | |
ctx1.mov("hello", "world").await; | |
}); | |
let ctx1 = ctx.clone(); | |
let send_msg_task = task::spawn(async move { | |
for i in 0..10 { | |
ctx1.send_message(format!("hello_{}", i)).await; | |
} | |
}); | |
info!("sleep start"); | |
task::sleep(Duration::from_secs(1)).await; | |
info!("sleep end"); | |
ctx.send_message("world").await; | |
ctx.mov("a", "b").await; | |
task::sleep(Duration::from_secs(4)).await; | |
send_msg_task.await; | |
mov_task.await; | |
info!("stopping"); | |
let _ctx = ctx.stop().await; | |
info!("stopped"); | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment