Created
February 18, 2025 18:21
-
-
Save texascloud/8d16f92c9cbae3a220340a7abc472ea3 to your computer and use it in GitHub Desktop.
Reproducing a tail latency issue in Moka from evictions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package] | |
name = "moka_test" | |
version = "0.1.0" | |
edition = "2021" | |
[dependencies] | |
tokio = { version = "1.41.0", features = ["full", "test-util", "tracing"] } | |
clap = { package = "clap", version = "4.5.20", features = ["derive", "env", "string", "unicode", "wrap_help"] } | |
moka = { version = "0.12.10", features = ["future", "sync"] } | |
parking_lot = { version = "0.12.1", features = ["send_guard"] } | |
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } | |
tracing = { version = "0.1.41", features = ["attributes", "valuable"], default-features = false } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ cargo run --release -- --entries 5000000 --time-till-inserts-done-ms 16000 --poll-till-empty-ms 1000 | |
Compiling moka_test v0.1.0 (/home/texascloud/pure_rust/moka_test) | |
Finished `release` profile [optimized] target(s) in 2.46s | |
Running `/home/texascloud/pure_rust/moka_test/target/release/moka_test --entries 5000000 --time-till-inserts-done-ms 16000 --poll-till-empty-ms 1000` | |
2025-02-18T18:02:55.458473Z INFO moka_test: [serial] inserting 5000000 items | |
2025-02-18T18:03:05.326882Z INFO moka_test: done after 9868ms - size: 4999936 | |
2025-02-18T18:03:05.326980Z INFO moka_test: [before sync] size: 5000000 | |
2025-02-18T18:03:12.444443Z INFO moka_test: [polling till empty] num_during_iter: 5000000 - removed: cycle/168998 total/168998 - elapsed_ms: 116 - items/ms: 1456 | |
2025-02-18T18:03:13.408055Z INFO moka_test: [polling till empty] num_during_iter: 4831002 - removed: cycle/104906 total/273904 - elapsed_ms: 79 - items/ms: 1327 | |
2025-02-18T18:03:14.371001Z INFO moka_test: [polling till empty] num_during_iter: 4726096 - removed: cycle/64094 total/337998 - elapsed_ms: 42 - items/ms: 1526 | |
2025-02-18T18:03:15.449097Z INFO moka_test: [polling till empty] num_during_iter: 4662002 - removed: cycle/169000 total/506998 - elapsed_ms: 120 - items/ms: 1408 | |
2025-02-18T18:03:17.445550Z INFO moka_test: [polling till empty] num_during_iter: 4493002 - removed: cycle/169000 total/675998 - elapsed_ms: 117 - items/ms: 1444 | |
2025-02-18T18:03:20.997634Z INFO moka_test: [polling till empty] num_during_iter: 4324002 - removed: cycle/169000 total/844998 - elapsed_ms: 1669 - items/ms: 101 | |
2025-02-18T18:03:22.010585Z INFO moka_test: [polling till empty] num_during_iter: 4155002 - removed: cycle/169000 total/1013998 - elapsed_ms: 1012 - items/ms: 166 | |
2025-02-18T18:03:24.673186Z INFO moka_test: [polling till empty] num_during_iter: 3986002 - removed: cycle/299455 total/1313453 - elapsed_ms: 344 - items/ms: 870 | |
2025-02-18T18:03:25.354424Z INFO moka_test: [polling till empty] num_during_iter: 3686547 - removed: cycle/38545 total/1351998 - elapsed_ms: 26 - items/ms: 1482 | |
2025-02-18T18:03:26.412961Z INFO moka_test: [polling till empty] num_during_iter: 3648002 - removed: cycle/115411 total/1467409 - elapsed_ms: 84 - items/ms: 1373 | |
2025-02-18T18:03:27.365416Z INFO moka_test: [polling till empty] num_during_iter: 3532591 - removed: cycle/53589 total/1520998 - elapsed_ms: 37 - items/ms: 1448 | |
2025-02-18T18:03:28.416915Z INFO moka_test: [polling till empty] num_during_iter: 3479002 - removed: cycle/110129 total/1631127 - elapsed_ms: 88 - items/ms: 1251 | |
2025-02-18T18:03:30.869365Z INFO moka_test: [polling till empty] num_during_iter: 3368873 - removed: cycle/58871 total/1689998 - elapsed_ms: 1541 - items/ms: 38 | |
2025-02-18T18:03:31.775952Z INFO moka_test: [polling till empty] num_during_iter: 3310002 - removed: cycle/169000 total/1858998 - elapsed_ms: 906 - items/ms: 186 | |
2025-02-18T18:03:31.959469Z INFO moka_test: [polling till empty] num_during_iter: 3141002 - removed: cycle/81917 total/1940915 - elapsed_ms: 183 - items/ms: 447 | |
2025-02-18T18:03:32.389852Z INFO moka_test: [polling till empty] num_during_iter: 3059085 - removed: cycle/87083 total/2027998 - elapsed_ms: 61 - items/ms: 1427 | |
2025-02-18T18:03:34.451597Z INFO moka_test: [polling till empty] num_during_iter: 2972002 - removed: cycle/169000 total/2196998 - elapsed_ms: 123 - items/ms: 1373 | |
2025-02-18T18:03:35.403198Z INFO moka_test: [polling till empty] num_during_iter: 2803002 - removed: cycle/92264 total/2289262 - elapsed_ms: 75 - items/ms: 1230 | |
2025-02-18T18:03:36.403315Z INFO moka_test: [polling till empty] num_during_iter: 2710738 - removed: cycle/76736 total/2365998 - elapsed_ms: 75 - items/ms: 1023 | |
2025-02-18T18:03:39.846457Z INFO moka_test: [polling till empty] num_during_iter: 2634002 - removed: cycle/169000 total/2534998 - elapsed_ms: 1518 - items/ms: 111 | |
2025-02-18T18:03:40.602053Z INFO moka_test: [polling till empty] num_during_iter: 2465002 - removed: cycle/169000 total/2703998 - elapsed_ms: 755 - items/ms: 223 | |
2025-02-18T18:03:40.725739Z INFO moka_test: [polling till empty] num_during_iter: 2296002 - removed: cycle/40705 total/2744703 - elapsed_ms: 123 - items/ms: 330 | |
2025-02-18T18:03:42.464186Z INFO moka_test: [polling till empty] num_during_iter: 2255297 - removed: cycle/128295 total/2872998 - elapsed_ms: 135 - items/ms: 950 | |
2025-02-18T18:03:44.177136Z INFO moka_test: [polling till empty] num_during_iter: 2127002 - removed: cycle/72772 total/2945770 - elapsed_ms: 849 - items/ms: 85 | |
2025-02-18T18:03:44.847416Z INFO moka_test: [polling till empty] num_during_iter: 2054230 - removed: cycle/96228 total/3041998 - elapsed_ms: 518 - items/ms: 185 | |
2025-02-18T18:03:45.416354Z INFO moka_test: [polling till empty] num_during_iter: 1958002 - removed: cycle/95704 total/3137702 - elapsed_ms: 88 - items/ms: 1087 | |
2025-02-18T18:03:46.389674Z INFO moka_test: [polling till empty] num_during_iter: 1862298 - removed: cycle/73296 total/3210998 - elapsed_ms: 61 - items/ms: 1201 | |
2025-02-18T18:03:48.303343Z INFO moka_test: [polling till empty] num_during_iter: 1789002 - removed: cycle/169000 total/3379998 - elapsed_ms: 974 - items/ms: 173 | |
2025-02-18T18:03:49.704546Z INFO moka_test: [polling till empty] num_during_iter: 1620002 - removed: cycle/169000 total/3548998 - elapsed_ms: 376 - items/ms: 449 | |
2025-02-18T18:03:51.468263Z INFO moka_test: [polling till empty] num_during_iter: 1451002 - removed: cycle/169000 total/3717998 - elapsed_ms: 139 - items/ms: 1215 | |
2025-02-18T18:03:54.428112Z INFO moka_test: [polling till empty] num_during_iter: 1282002 - removed: cycle/169000 total/3886998 - elapsed_ms: 1099 - items/ms: 153 | |
2025-02-18T18:03:55.120149Z INFO moka_test: [polling till empty] num_during_iter: 1113002 - removed: cycle/169000 total/4055998 - elapsed_ms: 691 - items/ms: 244 | |
2025-02-18T18:03:56.931843Z INFO moka_test: [polling till empty] num_during_iter: 944002 - removed: cycle/169000 total/4224998 - elapsed_ms: 603 - items/ms: 280 | |
2025-02-18T18:03:58.565995Z INFO moka_test: [polling till empty] num_during_iter: 775002 - removed: cycle/169000 total/4393998 - elapsed_ms: 237 - items/ms: 713 | |
2025-02-18T18:03:59.631635Z INFO moka_test: [polling till empty] num_during_iter: 606002 - removed: cycle/29691 total/4423689 - elapsed_ms: 303 - items/ms: 97 | |
2025-02-18T18:04:00.769541Z INFO moka_test: [polling till empty] num_during_iter: 576311 - removed: cycle/139309 total/4562998 - elapsed_ms: 441 - items/ms: 315 | |
2025-02-18T18:04:01.769480Z INFO moka_test: [polling till empty] num_during_iter: 437002 - removed: cycle/153675 total/4716673 - elapsed_ms: 441 - items/ms: 348 | |
2025-02-18T18:04:02.421136Z INFO moka_test: [polling till empty] num_during_iter: 283327 - removed: cycle/15325 total/4731998 - elapsed_ms: 92 - items/ms: 166 | |
2025-02-18T18:04:03.801819Z INFO moka_test: [polling till empty] num_during_iter: 268002 - removed: cycle/169000 total/4900998 - elapsed_ms: 473 - items/ms: 357 | |
2025-02-18T18:04:05.525789Z INFO moka_test: [polling till empty] num_during_iter: 99002 - removed: cycle/99002 total/5000000 - elapsed_ms: 198 - items/ms: 500 | |
2025-02-18T18:04:06.328445Z INFO moka_test: total time syncing cache: 16812ms | |
2025-02-18T18:04:06.328462Z INFO moka_test: longest sync time seen: 1669ms | |
2025-02-18T18:04:06.328469Z INFO moka_test: [after sync] size: 0 - elapsed_ms: 61001 | |
2025-02-18T18:04:06.328473Z INFO moka_test: total duration: 70872ms |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::time::Duration; | |
use std::time::Instant; | |
use clap::Parser; | |
use moka::Expiry; | |
use parking_lot::Mutex; | |
use parking_lot::RwLock; | |
use std::io::IsTerminal; | |
use tracing::Instrument; | |
use tracing_subscriber::Registry; | |
use tracing_subscriber::layer::SubscriberExt; | |
fn init_tracing() { | |
let stderr = tracing_subscriber::fmt::Layer::default() | |
.with_writer(std::io::stderr) | |
.with_ansi(std::io::stderr().is_terminal()); | |
let subscriber = Registry::default().with(stderr); | |
tracing::subscriber::set_global_default(subscriber).expect("to set global subscriber"); | |
} | |
struct ExpirationPolicy { | |
batch_ttl: RwLock<Duration>, | |
counter: Mutex<u64>, | |
batch_size: u64, | |
} | |
impl ExpirationPolicy { | |
fn new(batch_ttl: Duration, batch_size: u64) -> Self { | |
ExpirationPolicy { | |
batch_ttl: RwLock::new(batch_ttl), | |
counter: Mutex::new(1), | |
batch_size, | |
} | |
} | |
#[inline] | |
fn next_ttl(&self) -> Duration { | |
let should_bump_ttl = { | |
let mut c = self.counter.lock(); | |
if c.ge(&self.batch_size) { | |
*c = 0; | |
true | |
} else { | |
*c += 1; | |
false | |
} | |
}; | |
if should_bump_ttl { | |
let mut batch_ttl = self.batch_ttl.write(); | |
*batch_ttl = batch_ttl.saturating_add(Duration::from_millis(1500)); | |
} | |
*self.batch_ttl.read() | |
} | |
} | |
impl Expiry<String, u64> for ExpirationPolicy { | |
fn expire_after_create( | |
&self, | |
_key: &String, | |
_value: &u64, | |
_created_at: Instant, | |
) -> Option<Duration> { | |
Some(self.next_ttl()) | |
} | |
} | |
#[derive(Debug, clap::Parser)] | |
#[command(name = "moka-test")] | |
pub struct MokaTestCommand { | |
/// How many entries to insert into the cache | |
#[arg(long)] | |
entries: usize, | |
/// Expected time in milliseconds for all entries to be inserted into the cache. | |
// Adjust based on `entries`. | |
#[arg(long)] | |
time_till_inserts_done_ms: u64, | |
/// Used by ExpirationPolicy to control the max number of allowed entries to be processed by | |
/// Moka's `.run_pending_tasks()`. Is best effort since the duration we add after a batch is full | |
/// needs to be greater than the time it takes to process a batch so there is no overlap. | |
#[arg(long, default_value_t = 168_999)] | |
eviction_max_batch_size: u64, | |
/// How long to sleep between running cache sync to force eviction until empty. | |
#[arg(long)] | |
poll_till_empty_ms: Option<u64>, | |
/// Use an unbounded tokio::JoinSet to concurrently insert items into the cache. | |
#[arg(long)] | |
concurrent_insert: bool, | |
/// Insert a ton of things into the cache after the initial insert to force eviction due to | |
/// max capacity being exceeded. | |
#[arg(long)] | |
admission_eviction_storm: bool, | |
} | |
#[tokio::main] | |
async fn main() { | |
init_tracing(); | |
let main_start = std::time::Instant::now(); | |
let cmd = MokaTestCommand::parse(); | |
let cache = moka::future::Cache::<String, u64>::builder() | |
.weigher(|_key: &String, value: &u64| -> u32 { size_of_val(value) as u32 }) | |
.initial_capacity(131_072) | |
.max_capacity(10_000_000_000) | |
.eviction_policy(moka::policy::EvictionPolicy::lru()) | |
// The most important thing is to make this duration take longer than it takes to | |
// finish inserting all entries in the cache cuz otherwise some would start expiring | |
// and being removed from the Moka cache before we are done inserting all of the entries. | |
.expire_after(ExpirationPolicy::new( | |
Duration::from_millis(cmd.time_till_inserts_done_ms), | |
cmd.eviction_max_batch_size, | |
)) | |
.build(); | |
let start = std::time::Instant::now(); | |
if cmd.concurrent_insert { | |
tracing::info!("[concurrent] inserting {} items", cmd.entries); | |
let mut join_set = tokio::task::JoinSet::new(); | |
for i in 0..cmd.entries { | |
let cache = cache.clone(); | |
join_set.spawn(async move { | |
cache.insert(i.to_string(), i as u64).await; | |
}); | |
} | |
let _ = join_set.join_all().await; | |
} else { | |
tracing::info!("[serial] inserting {} items", cmd.entries); | |
for i in 0..cmd.entries { | |
cache.insert(i.to_string(), i as u64).await; | |
} | |
} | |
tracing::info!( | |
"done after {:.2}ms - size: {}", | |
start.elapsed().as_millis(), | |
cache.entry_count() | |
); | |
cache | |
.run_pending_tasks() | |
.instrument(tracing::info_span!("cache_sync")) | |
.await; | |
tracing::info!("[before sync] size: {}", cache.entry_count()); | |
let start = std::time::Instant::now(); | |
if let Some(poll_time) = cmd.poll_till_empty_ms { | |
let mut removed_sum = 0; | |
let mut max_sync_time = 0; | |
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(poll_time)); | |
let mut sync_sum_ms = 0; | |
while cache.entry_count() > 0 { | |
let prev_size = cache.entry_count(); | |
let poll_start = std::time::Instant::now(); | |
cache | |
.run_pending_tasks() | |
.instrument(tracing::info_span!("cache_sync")) | |
.await; | |
let done = poll_start.elapsed().as_millis(); | |
let removed_entries = prev_size - cache.entry_count(); | |
if done > 0 && removed_entries > 0 { | |
max_sync_time = max_sync_time.max(done); | |
removed_sum += removed_entries; | |
let per_sync_efficiency = removed_entries / done as u64; | |
tracing::info!( | |
"[polling till empty] num_during_iter: {} - removed: cycle/{:?} total/{:?} - elapsed_ms: {:.2} - items/ms: {}", | |
prev_size, | |
removed_entries, | |
removed_sum, | |
done, | |
per_sync_efficiency, | |
); | |
sync_sum_ms += done; | |
} | |
interval.tick().await; | |
} | |
tracing::info!("total time syncing cache: {:.2}ms", sync_sum_ms); | |
tracing::info!("longest sync time seen: {:.2}ms", max_sync_time); | |
} else { | |
cache | |
.run_pending_tasks() | |
.instrument(tracing::info_span!("cache_sync")) | |
.await; | |
} | |
tracing::info!( | |
"[after sync] size: {} - elapsed_ms: {:.2}", | |
cache.entry_count(), | |
start.elapsed().as_millis() | |
); | |
if cmd.admission_eviction_storm { | |
tracing::info!("[cause evictions] size: {}", cache.entry_count()); | |
let start = std::time::Instant::now(); | |
for i in cmd.entries..(cmd.entries * 4) { | |
cache.insert(i.to_string(), i as u64).await; | |
} | |
cache | |
.run_pending_tasks() | |
.instrument(tracing::info_span!("cache_sync")) | |
.await; | |
tracing::info!( | |
"[after evictions] size: {} - elapsed_ms: {:.2}", | |
cache.entry_count(), | |
start.elapsed().as_millis() | |
); | |
} | |
tracing::info!("total duration: {:.2}ms", main_start.elapsed().as_millis()); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment