Created
May 29, 2025 00:00
-
-
Save ihciah/57edbae2eeeda9443d6542952e4b4807 to your computer and use it in GitHub Desktop.
FuturesUnorderedRetry
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::cell::RefCell; | |
use std::pin::Pin; | |
use std::task::Context; | |
use std::task::Poll; | |
use std::task::ready; | |
use futures::Stream; | |
use futures::TryFuture; | |
use futures::stream::FuturesUnordered; | |
use futures::stream::StreamExt; | |
#[pin_project::pin_project] | |
pub struct FutureRetry<F, Fut> | |
where | |
F: Fn() -> Fut, | |
{ | |
#[pin] | |
fut: Fut, | |
f: F, | |
max_retries: usize, | |
} | |
impl<F: Fn() -> Fut, Fut> FutureRetry<F, Fut> { | |
pub fn new(f: F, max_retries: usize) -> Self { | |
Self { | |
fut: f(), | |
f, | |
max_retries, | |
} | |
} | |
} | |
impl<F, Fut> Future for FutureRetry<F, Fut> | |
where | |
F: Fn() -> Fut, | |
Fut: TryFuture, | |
{ | |
type Output = Result<Fut::Ok, Fut::Error>; | |
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
let mut this = self.project(); | |
loop { | |
match ready!(this.fut.as_mut().try_poll(cx)) { | |
Ok(item) => return Poll::Ready(Ok(item)), | |
Err(e) => { | |
if *this.max_retries > 0 { | |
*this.max_retries -= 1; | |
this.fut.set((this.f)()); | |
continue; | |
} else { | |
return Poll::Ready(Err(e)); | |
} | |
} | |
} | |
} | |
} | |
} | |
#[pin_project::pin_project] | |
pub struct FuturesUnorderedRetry<const RETRY: usize, Fut> { | |
#[pin] | |
inner: FuturesUnordered<Fut>, | |
} | |
impl<const RETRY: usize, F: Fn() -> Fut, Fut> FromIterator<F> | |
for FuturesUnorderedRetry<RETRY, FutureRetry<F, Fut>> | |
{ | |
fn from_iter<I>(iter: I) -> Self | |
where | |
I: IntoIterator<Item = F>, | |
{ | |
let inner = FuturesUnordered::new(); | |
Self { | |
inner: iter.into_iter().fold(inner, |inner, f| { | |
inner.push(FutureRetry::new(f, RETRY)); | |
inner | |
}), | |
} | |
} | |
} | |
impl<const RETRY: usize, Fut: Future> Stream for FuturesUnorderedRetry<RETRY, Fut> { | |
type Item = Fut::Output; | |
#[inline] | |
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
let this = self.project(); | |
this.inner.poll_next(cx) | |
} | |
} | |
// === Example ===
/// Increments the counter stored in `input` and reports whether it has reached 10.
///
/// Returns `Ok("success")` when the incremented value is at least 10 and `Err(())`
/// otherwise. The increment saturates at `usize::MAX`, so a counter already at the
/// maximum still reports success instead of overflowing.
///
/// # Panics
///
/// Panics if `input` is already borrowed when the future is polled, because the counter
/// is accessed through `RefCell::borrow_mut`.
async fn try_add_1(input: &RefCell<usize>) -> Result<String, ()> {
    let mut counter = input.borrow_mut();
    // Saturate rather than overflow: a plain `+= 1` panics in debug builds and wraps to
    // 0 in release builds, which would wrongly turn the largest counter into a failure.
    let bumped = counter.saturating_add(1);
    *counter = bumped;

    if bumped >= 10 {
        Ok("success".to_string())
    } else {
        Err(())
    }
}
#[tokio::test] | |
async fn test() { | |
let first = RefCell::new(5); | |
let second = RefCell::new(6); | |
let mut results = [&first, &second] | |
.into_iter() | |
.map(|input| move || try_add_1(input)) | |
.collect::<FuturesUnorderedRetry<3_usize, _>>(); | |
while let Some(result) = results.next().await { | |
println!("result: {:?}", result); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment