Last active
May 12, 2023 03:51
-
-
Save Jules-Bertholet/7bf734b3593e8f9831ef279246358b12 to your computer and use it in GitHub Desktop.
`spacedustrs` rate limit
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{ | |
cmp, | |
str::FromStr, | |
sync::{ | |
atomic::{self, AtomicU16}, | |
Arc, | |
}, | |
time::Duration, | |
}; | |
use chrono::{DateTime, Utc}; | |
use log::{error, warn}; | |
use once_cell::sync::Lazy; | |
use reqwest::{Request, Response, StatusCode}; | |
use reqwest_middleware::{ClientWithMiddleware, Middleware}; | |
use spacedust::apis::configuration::Configuration; | |
use task_local_extensions::Extensions; | |
use tokio::{ | |
sync::{Mutex, Semaphore, SemaphorePermit}, | |
time::sleep, | |
}; | |
/// [`Configuration`] object for use in all API calls. | |
/// Sets API key and manages rate limit. | |
pub static CONFIGURATION: Lazy<Configuration> = Lazy::new(|| { | |
let mut configuration = Configuration::new(); | |
configuration.bearer_access_token = Some(include_str!("../TOKEN").to_owned()); | |
let middleware: Box<[Arc<dyn Middleware>]> = Box::new([Arc::new(RateLimitMiddleware)]); | |
configuration.client = ClientWithMiddleware::new(reqwest::Client::new(), middleware); | |
configuration | |
}); | |
/// The current documented burst limit. | |
const INITIAL_BURST_LIMIT: u16 = 10; | |
/// The current documented per-second limit. | |
const INITIAL_RATE_LIMIT: u16 = 2; | |
/// The burst limit according to the API headers. | |
static BURST_LIMIT: AtomicU16 = AtomicU16::new(INITIAL_BURST_LIMIT); | |
/// The per-second rate limit according to the API headers. | |
static RATE_LIMIT: AtomicU16 = AtomicU16::new(INITIAL_RATE_LIMIT); | |
// Exponential backoff constants. | |
/// For exponential backoff retry on server errors. | |
const BACKOFF_CONSTANT_SECONDS: f64 = 10.0; | |
/// For exponential backoff retry on server errors. | |
const BACKOFF_BASE: f64 = 1.5; | |
/// A permit must be acquired from this pool before any API request. | |
static REQUEST_SEMAPHORE: Semaphore = Semaphore::const_new(INITIAL_BURST_LIMIT as usize); | |
struct ReturnPermit; | |
impl ReturnPermit { | |
/// Wait for the appropriate amount of time (calculated from the rate limit), then returns a permit to the pool. | |
async fn return_permit(&mut self, permit: SemaphorePermit<'_>) { | |
sleep(Duration::from_millis( | |
1000_u16 | |
.saturating_div(RATE_LIMIT.load(atomic::Ordering::Acquire)) | |
.into(), | |
)) | |
.await; | |
drop(permit); | |
} | |
} | |
/// This mutex must be locked before a permit is returned to the pool. | |
/// Locking ensures that waiting periods are sequential. | |
static RETURN: Mutex<ReturnPermit> = Mutex::const_new(ReturnPermit); | |
/// Middleware to enforce rate-limiting for the SpaceTraders API. | |
#[derive(Default)] | |
struct RateLimitMiddleware; | |
#[async_trait::async_trait] | |
impl reqwest_middleware::Middleware for RateLimitMiddleware { | |
async fn handle( | |
&self, | |
mut req: Request, | |
extensions: &mut Extensions, | |
mut next: reqwest_middleware::Next<'_>, | |
) -> reqwest_middleware::Result<Response> { | |
// Perform the request, retrying on failure. | |
let mut retry_info; | |
let mut server_error_retry_count: i32 = 0; | |
let result = loop { | |
retry_info = req | |
.try_clone() | |
.map(|cloned_req: Request| (cloned_req, next.clone())); | |
// Acquire a permit, yield if burst limit attained. | |
let permit = REQUEST_SEMAPHORE.acquire().await.unwrap(); | |
let result = next.run(req, extensions).await; | |
// Return permit to the pool after an appropriate timeout. | |
tokio::spawn(async move { | |
let mut return_permit = RETURN.lock().await; | |
return_permit.return_permit(permit).await; | |
drop(return_permit); | |
}); | |
match (retry_info, result) { | |
// If this request isn't retryable, return the response no matter what it is. | |
(None, result) => break result, | |
// If this request was successful, return the response. | |
(_, Ok(response)) if response.status().is_success() => break Ok(response), | |
// On server error, log and retry with exponential backoff. | |
(Some((cloned_req, cloned_next)), Ok(resp)) if resp.status().is_server_error() => { | |
let status = resp.status(); | |
let text = resp.text().await.ok(); | |
error!( | |
"Server error: {}. This request was previously tried {} times. Response body: {}", | |
status, | |
server_error_retry_count, | |
text.as_deref().unwrap_or("<bytes>") | |
); | |
sleep(Duration::from_secs_f64( | |
BACKOFF_CONSTANT_SECONDS * BACKOFF_BASE.powi(server_error_retry_count), | |
)) | |
.await; | |
server_error_retry_count += 1; | |
req = cloned_req; | |
next = cloned_next; | |
} | |
// If, despite our efforts, we've hit a rate limit, wait for the limits to reset and then retry. | |
(Some((cloned_req, cloned_next)), Ok(response)) | |
if response.status() == StatusCode::TOO_MANY_REQUESTS => | |
{ | |
if let Some(reset) = response | |
.headers() | |
.get("x-ratelimit-reset") | |
.and_then(|h| h.to_str().ok()) | |
.and_then(|s| DateTime::parse_from_rfc3339(s).ok()) | |
{ | |
warn!("Rate limit hit! Waiting and retrying."); | |
let delay = reset | |
.signed_duration_since(Utc::now()) | |
.to_std() | |
.unwrap_or(Duration::ZERO); | |
sleep(delay).await; | |
req = cloned_req; | |
next = cloned_next; | |
} else { | |
break Ok(response); | |
} | |
} | |
// Otherwise, pass on the error. | |
(_, response) => break response, | |
} | |
}; | |
// Adjust rate limits if they have changed. | |
if let Ok(res) = &result { | |
if let Some(limit) = res | |
.headers() | |
.get("x-ratelimit-limit-per-second") | |
.and_then(|h| h.to_str().ok()) | |
.and_then(|h| u16::from_str(h).ok()) | |
{ | |
RATE_LIMIT.store(limit, atomic::Ordering::Release); | |
} | |
if let Some(burst_limit) = res | |
.headers() | |
.get("x-ratelimit-limit-burst") | |
.and_then(|h| h.to_str().ok()) | |
.and_then(|h| u16::from_str(h).ok()) | |
{ | |
let old = BURST_LIMIT.swap(burst_limit, atomic::Ordering::AcqRel); | |
match old.cmp(&burst_limit) { | |
cmp::Ordering::Less => { | |
REQUEST_SEMAPHORE.add_permits((burst_limit - old).into()); | |
} | |
cmp::Ordering::Greater => { | |
tokio::spawn(async move { | |
for _ in 0..(old - burst_limit) { | |
REQUEST_SEMAPHORE.acquire().await.unwrap().forget(); | |
} | |
}); | |
} | |
cmp::Ordering::Equal => (), | |
} | |
} | |
} | |
result | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment