Skip to content

Instantly share code, notes, and snippets.

@vishalg0wda
Created December 9, 2024 21:00
Show Gist options
  • Save vishalg0wda/e2d2b6cc641d0f2533bca58d4516f1f7 to your computer and use it in GitHub Desktop.
Save vishalg0wda/e2d2b6cc641d0f2533bca58d4516f1f7 to your computer and use it in GitHub Desktop.
Illustrating SPMC (single-producer, multi-consumer) with randomized consumer selection
use rand::prelude::SliceRandom;
use rand::random;
use std::fmt::Debug;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use tokio::sync::oneshot;
use tracing::{info, instrument, span, trace, Level};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{fmt, EnvFilter};
/// A unit of work handed from the producer, through the router, to a worker.
///
/// No trait bound is placed on `W` here: the functions that move jobs across
/// threads declare the bounds they need, so the type itself stays usable with
/// any payload.
#[derive(Debug)]
struct Job<W> {
    /// Identifier assigned by the producer; recorded on the worker's tracing span.
    id: usize,
    /// Payload carried by the job.
    work: W,
    /// One-shot channel the worker signals on once it has handled the job.
    ack: oneshot::Sender<()>,
}
fn multicast<W: Send + Debug + 'static>(upstream: mpsc::Receiver<Job<W>>, num_workers: u8) {
let mut rng = rand::thread_rng();
let workers: Vec<_> = (0..num_workers)
.map(|id| {
let (otx, orx) = mpsc::channel();
let handle = thread::spawn(move || worker(id, orx));
(otx, handle)
})
.collect();
while let Ok(job) = upstream.recv() {
if let Some((sender, _)) = workers.choose(&mut rng) {
info!("delegating job to worker");
sender.send(job).unwrap();
}
}
// no more messages. join all worker handles
for (s, h) in workers {
drop(s);
h.join().unwrap();
}
}
fn worker<W: Send + Debug>(id: u8, rx: mpsc::Receiver<Job<W>>) {
while let Ok(job) = rx.recv() {
let span = span!(Level::TRACE, "task", worker_id = id, job.id);
span.in_scope(|| {
trace!("received a job");
thread::sleep(Duration::from_millis(100));
job.ack.send(()).unwrap();
trace!("completed the job");
});
}
}
/// Demo driver: starts the router with four workers, submits 100 jobs, waits
/// for every acknowledgement, then shuts the pipeline down.
fn main() {
    // Install a global tracing subscriber: formatted output, filtered by the
    // default environment configuration.
    tracing_subscriber::registry()
        .with(fmt::layer())
        .with(EnvFilter::from_default_env())
        .init();

    let (job_tx, job_rx) = mpsc::channel();
    let router = thread::spawn(move || multicast(job_rx, 4));

    // Submit every job up front, keeping the receiving half of each ack channel.
    let pending: Vec<_> = (0..100)
        .map(|id| {
            let (ack_tx, ack_rx) = oneshot::channel();
            let job = Job {
                id,
                work: "some work".to_string(),
                ack: ack_tx,
            };
            job_tx.send(job).unwrap();
            ack_rx
        })
        .collect();

    // Block until each job has been acknowledged, in submission order.
    for ack_rx in pending {
        ack_rx.blocking_recv().unwrap();
    }

    // Dropping the only job sender lets the router finish its receive loop.
    drop(job_tx);
    router.join().unwrap();
    println!("received ack");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment