Created
April 13, 2026 16:04
-
-
Save ovr/2e350a8c15e9120f2895abf0d4b347f9 to your computer and use it in GitHub Desktop.
Memory benchmark: OLD vs NEW InfoSchemaTableDef approach in cubestore (system_cache table)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Bench for system_cache: 10k rows, cache values varying in size. | |
| // User scenario: total cache 52MB–256MB → avg value size 5KB–26KB per row for 10k rows. | |
| // | |
| // Key differences vs info_schema_columns: | |
| // 1. OLD clones Option<String> via `get_prefix().clone()` → extra allocation per row. | |
| // 2. `value` is large (~KBs), so the "incremental drop of source during into_iter" matters: | |
| // OLD keeps full source alive across 4 passes; NEW drops each row after appending. | |
| use std::alloc::{GlobalAlloc, Layout, System}; | |
| use std::sync::atomic::{AtomicUsize, Ordering}; | |
| use std::sync::Arc; | |
| use std::time::Instant; | |
| use arrow::array::{ArrayRef, StringArray, StringBuilder, TimestampNanosecondArray, TimestampNanosecondBuilder}; | |
| use arrow::record_batch::RecordBatch; | |
| use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit}; | |
// ---- tracking allocator ----

/// Global allocator that forwards every request to [`System`] while recording
/// how many heap bytes are live (`CURR`), the highest live total seen in the
/// current measurement window (`PEAK`), and the number of allocation events
/// (`ALLOCS`).
struct Tracking;

/// Bytes currently allocated through [`Tracking`]. Never reset.
static CURR: AtomicUsize = AtomicUsize::new(0);
/// High-water mark of [`CURR`] since the last [`reset_stats`].
static PEAK: AtomicUsize = AtomicUsize::new(0);
/// Successful `alloc`/`alloc_zeroed` calls plus successful *growing* `realloc`s
/// since the last [`reset_stats`]. Shrinking reallocs and frees are not counted.
static ALLOCS: AtomicUsize = AtomicUsize::new(0);

// Relaxed ordering throughout: the counters are independent statistics and no
// other memory is published through them.

/// Records `delta` newly live bytes as one allocation event and raises `PEAK`
/// if the new live total exceeds it.
fn record_growth(delta: usize) {
    let live = CURR.fetch_add(delta, Ordering::Relaxed) + delta;
    ALLOCS.fetch_add(1, Ordering::Relaxed);
    // Atomic max: same effect as a compare-exchange retry loop, without the loop.
    PEAK.fetch_max(live, Ordering::Relaxed);
}

/// Records that `delta` bytes are no longer live.
fn record_release(delta: usize) {
    CURR.fetch_sub(delta, Ordering::Relaxed);
}

unsafe impl GlobalAlloc for Tracking {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: forwarded verbatim; the caller upholds `GlobalAlloc::alloc`'s contract.
        let ptr = unsafe { System.alloc(layout) };
        if !ptr.is_null() {
            record_growth(layout.size());
        }
        ptr
    }

    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // Overridden so zeroed requests reach `System.alloc_zeroed` (which may hand
        // out already-zeroed memory) instead of the trait default `alloc` + memset.
        // SAFETY: forwarded verbatim; the caller upholds `GlobalAlloc::alloc_zeroed`'s contract.
        let ptr = unsafe { System.alloc_zeroed(layout) };
        if !ptr.is_null() {
            record_growth(layout.size());
        }
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        record_release(layout.size());
        // SAFETY: `ptr` was produced by this allocator, i.e. by `System`, with `layout`.
        unsafe { System.dealloc(ptr, layout) };
    }

    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: `ptr`/`layout` were produced by this allocator (i.e. by `System`) and
        // the caller upholds `GlobalAlloc::realloc`'s contract for `new_size`.
        let new_ptr = unsafe { System.realloc(ptr, layout, new_size) };
        // On failure the original block is untouched and still live: nothing to record.
        if !new_ptr.is_null() {
            let old_size = layout.size();
            if new_size > old_size {
                record_growth(new_size - old_size);
            } else {
                record_release(old_size - new_size);
            }
        }
        new_ptr
    }
}

/// Installs [`Tracking`] as the process-wide allocator so every heap allocation is counted.
#[global_allocator]
static A: Tracking = Tracking;

/// Opens a new measurement window: rebases `PEAK` to the bytes live right now
/// and zeroes `ALLOCS`. `CURR` is left untouched.
fn reset_stats() {
    PEAK.store(CURR.load(Ordering::Relaxed), Ordering::Relaxed);
    ALLOCS.store(0, Ordering::Relaxed);
}

/// High-water mark of live heap bytes in the current window.
fn snapshot_peak() -> usize {
    PEAK.load(Ordering::Relaxed)
}

/// Heap bytes live right now.
fn snapshot_curr() -> usize {
    CURR.load(Ordering::Relaxed)
}

/// Allocation events recorded in the current window.
fn snapshot_allocs() -> usize {
    ALLOCS.load(Ordering::Relaxed)
}
// ---- domain: CacheItem, IdRow<CacheItem> ----

/// Minimal stand-in for a cubestore cache entry. It exposes the same
/// reference-returning getters that the table builders read from.
#[derive(Clone)]
struct CacheItem {
    prefix: Option<String>,
    key: String,
    value: String,
    /// Expiration timestamp in nanoseconds (a plain `i64` for simplicity).
    expire: Option<i64>,
}

impl CacheItem {
    /// Borrows the optional key prefix.
    fn get_prefix(&self) -> &Option<String> {
        let CacheItem { prefix, .. } = self;
        prefix
    }

    /// Borrows the cache key.
    fn get_key(&self) -> &String {
        let CacheItem { key, .. } = self;
        key
    }

    /// Borrows the cached value.
    fn get_value(&self) -> &String {
        let CacheItem { value, .. } = self;
        value
    }

    /// Borrows the optional expiration timestamp.
    fn get_expire(&self) -> &Option<i64> {
        let CacheItem { expire, .. } = self;
        expire
    }
}

/// A row paired with a numeric id; the benchmark never reads the id.
#[derive(Clone)]
struct IdRow<T> {
    _id: u64,
    row: T,
}

impl<T> IdRow<T> {
    /// Borrows the wrapped row.
    fn get_row(&self) -> &T {
        let IdRow { row, .. } = self;
        row
    }
}
| // ---- data generator ---- | |
| // Generate n rows with value size varying uniformly in [min_bytes, max_bytes]. | |
| fn gen_rows(n: usize, min_bytes: usize, max_bytes: usize) -> Vec<IdRow<CacheItem>> { | |
| let mut out = Vec::with_capacity(n); | |
| // Pseudo-random LCG to avoid bringing a rand crate and keep bench reproducible. | |
| let mut state: u64 = 0xdead_beef_cafe_1234; | |
| let range = max_bytes - min_bytes; | |
| for i in 0..n { | |
| state = state.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407); | |
| let vlen = if range == 0 { min_bytes } else { min_bytes + (state as usize) % range }; | |
| // Build a value string of exactly vlen bytes without intermediate growth: | |
| let mut value = String::with_capacity(vlen); | |
| let pattern = b"abcdefghijklmnopqrstuvwxyz0123456789"; | |
| for j in 0..vlen { | |
| value.push(pattern[j % pattern.len()] as char); | |
| } | |
| let prefix = if i % 3 == 0 { None } else { Some(format!("pre_{}", i % 50)) }; | |
| out.push(IdRow { | |
| _id: i as u64, | |
| row: CacheItem { | |
| prefix, | |
| key: format!("key_for_cache_entry_{:08}", i), | |
| value, | |
| expire: if i % 2 == 0 { Some(1_700_000_000_000_000_000 + i as i64) } else { None }, | |
| }, | |
| }); | |
| } | |
| out | |
| } | |
| // ---- OLD approach (pre-commit): closures + Arc<Vec<_>>, 4 passes, get_prefix().clone() ---- | |
| fn build_old(rows: Arc<Vec<IdRow<CacheItem>>>) -> RecordBatch { | |
| let schema = Arc::new(ArrowSchema::new(vec![ | |
| Field::new("id", DataType::Utf8, false), | |
| Field::new("prefix", DataType::Utf8, true), | |
| Field::new("expire", DataType::Timestamp(TimeUnit::Nanosecond, None), true), | |
| Field::new("value", DataType::Utf8, false), | |
| ])); | |
| let closures: Vec<Box<dyn Fn(Arc<Vec<IdRow<CacheItem>>>) -> ArrayRef>> = vec![ | |
| Box::new(|items| Arc::new(StringArray::from_iter( | |
| items.iter().map(|row| Some(row.get_row().get_key().clone())), | |
| ))), | |
| Box::new(|items| Arc::new(StringArray::from_iter( | |
| items.iter().map(|row| row.get_row().get_prefix().clone()), // <-- clones | |
| ))), | |
| Box::new(|items| Arc::new(TimestampNanosecondArray::from_iter( | |
| items.iter().map(|row| row.get_row().get_expire().clone()), | |
| ))), | |
| Box::new(|items| Arc::new(StringArray::from_iter( | |
| items.iter().map(|row| Some(row.get_row().get_value().clone())), // clones huge value | |
| ))), | |
| ]; | |
| let cols: Vec<ArrayRef> = closures.into_iter().map(|c| c(rows.clone())).collect(); | |
| RecordBatch::try_new(schema, cols).unwrap() | |
| } | |
| // ---- NEW approach (post-commit): single pass, into_iter, StringBuilder ---- | |
| fn build_new(rows: Vec<IdRow<CacheItem>>) -> RecordBatch { | |
| let schema = Arc::new(ArrowSchema::new(vec![ | |
| Field::new("id", DataType::Utf8, false), | |
| Field::new("prefix", DataType::Utf8, true), | |
| Field::new("expire", DataType::Timestamp(TimeUnit::Nanosecond, None), true), | |
| Field::new("value", DataType::Utf8, false), | |
| ])); | |
| let n = rows.len(); | |
| let mut key_b = StringBuilder::with_capacity(n, n * 64); | |
| let mut prefix_b = StringBuilder::with_capacity(n, n * 64); | |
| let mut expire_b = TimestampNanosecondBuilder::with_capacity(n); | |
| let mut value_b = StringBuilder::with_capacity(n, n * 64); | |
| for row in rows.into_iter() { | |
| let item = row.get_row(); | |
| key_b.append_value(item.get_key()); | |
| prefix_b.append_option(item.get_prefix().as_deref()); | |
| expire_b.append_option(item.get_expire().as_ref().copied()); | |
| value_b.append_value(item.get_value()); | |
| } | |
| RecordBatch::try_new(schema, vec![ | |
| Arc::new(key_b.finish()), | |
| Arc::new(prefix_b.finish()), | |
| Arc::new(expire_b.finish()), | |
| Arc::new(value_b.finish()), | |
| ]).unwrap() | |
| } | |
/// Formats a byte count as mebibytes: right-aligned to 9 columns, two decimals,
/// followed by an ` MB` suffix.
fn mb(b: usize) -> String {
    const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;
    let mebibytes = b as f64 / BYTES_PER_MIB;
    format!("{:>9.2} MB", mebibytes)
}
| fn run_case(name: &str, n: usize, min_b: usize, max_b: usize) { | |
| println!("═══ {} (n={}, value {}B..{}B) ═══", name, n, min_b, max_b); | |
| // --- OLD --- | |
| let rows = Arc::new(gen_rows(n, min_b, max_b)); | |
| let total_src_bytes: usize = rows.iter().map(|r| r.row.value.len() + r.row.key.len() + r.row.prefix.as_ref().map(|p| p.len()).unwrap_or(0)).sum(); | |
| let source_heap = snapshot_curr(); | |
| reset_stats(); | |
| let t0 = Instant::now(); | |
| let batch_o = build_old(rows.clone()); | |
| let t_o = t0.elapsed(); | |
| let peak_o = snapshot_peak().saturating_sub(source_heap); | |
| let allocs_o = snapshot_allocs(); | |
| drop(rows); | |
| drop(batch_o); | |
| // --- NEW --- | |
| let rows = gen_rows(n, min_b, max_b); | |
| let source_heap = snapshot_curr(); | |
| reset_stats(); | |
| let t0 = Instant::now(); | |
| let batch_n = build_new(rows); | |
| let t_n = t0.elapsed(); | |
| let peak_n = snapshot_peak().saturating_sub(source_heap); | |
| let allocs_n = snapshot_allocs(); | |
| let final_n = snapshot_curr(); | |
| drop(batch_n); | |
| println!(" source cache payload sum: {}", mb(total_src_bytes)); | |
| println!(" OLD peak Δ during build: {} allocs: {:>7} time: {:?}", | |
| mb(peak_o), allocs_o, t_o); | |
| println!(" NEW peak Δ during build: {} allocs: {:>7} time: {:?}", | |
| mb(peak_n), allocs_n, t_n); | |
| println!(" OLD - NEW peak Δ: {:>+9.2} MB ({:+.1}%)", | |
| (peak_o as i64 - peak_n as i64) as f64 / 1024.0 / 1024.0, | |
| 100.0 * (peak_o as f64 - peak_n as f64) / peak_o as f64); | |
| println!(" OLD - NEW allocs Δ: {:>+9}", | |
| allocs_o as i64 - allocs_n as i64); | |
| println!(" final RecordBatch live: {}", mb(final_n)); | |
| println!(); | |
| } | |
| fn main() { | |
| // value ~avg = (min + max) / 2 | |
| run_case("total ~10 MB (avg ~1 KB)", 10_000, 256, 2 * 1024); | |
| run_case("total ~55 MB (avg ~5 KB)", 10_000, 1_024, 10 * 1024); | |
| run_case("total ~128 MB (avg ~13 KB)", 10_000, 1_024, 25 * 1024); | |
| run_case("total ~257 MB (avg ~26 KB)", 10_000, 1_024, 52 * 1024); | |
| // NOTE: Arrow Utf8 StringArray uses i32 offsets → max 2 GiB of data per column. | |
| // 10K × 512KB = 5 GB would overflow ("byte array offset overflow" panic), so we | |
| // reduce N to keep total under the limit. This is a real limit of the current | |
| // schema — cubestore's system_cache would itself panic for such a cache size. | |
| run_case("value 512 KB fixed (N=3000, ~1.5 GB)", 3_000, 512 * 1024, 512 * 1024 + 1); | |
| run_case("value 1 MB fixed (N=1500, ~1.5 GB)", 1_500, 1024 * 1024, 1024 * 1024 + 1); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment