Created
April 13, 2026 21:58
-
-
Save nascheme/6d09885d3696055b9665d3b88c7aacaa to your computer and use it in GitHub Desktop.
Python cyclic GC benchmark script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import gc | |
| import sys | |
| import time | |
| def get_memory_usage(): | |
| """Memory usage of the current process in KB.""" | |
| result = {'peak': 0, 'rss': 0} | |
| with open('/proc/self/status') as status: | |
| for line in status: | |
| parts = line.split() | |
| key = parts[0][2:-1].lower() | |
| if key in result: | |
| result[key] = int(parts[1]) | |
| return result | |
| def get_rss_kb(): | |
| return get_memory_usage().get('rss', 0) | |
| class Node: | |
| __slots__ = ('next', 'payload') | |
| class Stats: | |
| num_alive = 0 | |
| num_trash = 0 | |
| max_trash = 0 | |
| peak_rss = 0 | |
| def make_cycle(cycle_size, extra_bytes): | |
| nodes = [Node() for _ in range(cycle_size)] | |
| for i, n in enumerate(nodes): | |
| n.next = nodes[(i + 1) % cycle_size] | |
| n.payload = None | |
| if extra_bytes: | |
| nodes[0].payload = bytes(extra_bytes) | |
| return nodes[0] | |
| def gc_callback(phase, info): | |
| if phase == 'stop': | |
| collected = info.get('collected') or 0 | |
| Stats.num_trash -= collected | |
| Stats.num_alive -= collected | |
| if Stats.num_trash < 0: | |
| Stats.num_trash = 0 | |
| def parse_args(): | |
| p = argparse.ArgumentParser(description='Cyclic GC benchmark') | |
| p.add_argument( | |
| '--cycle-size', | |
| type=int, | |
| default=4, | |
| help='objects per cycle (ring length)', | |
| ) | |
| p.add_argument( | |
| '--extra-bytes', | |
| type=int, | |
| default=0, | |
| help='extra bytes payload attached to each cycle', | |
| ) | |
| p.add_argument( | |
| '--live-objects', | |
| type=int, | |
| default=10_000, | |
| help='approximate live object count before releasing', | |
| ) | |
| p.add_argument( | |
| '--total-objects', | |
| type=int, | |
| default=1_000_000, | |
| help='exit after creating this many objects', | |
| ) | |
| p.add_argument( | |
| '--report-interval', | |
| type=float, | |
| default=0.25, | |
| help='seconds between status lines', | |
| ) | |
| return p.parse_args() | |
| def main(): | |
| args = parse_args() | |
| gc.callbacks.append(gc_callback) | |
| holder = [] | |
| created = 0 | |
| cycle_size = args.cycle_size | |
| extra_bytes = args.extra_bytes | |
| live_target = args.live_objects | |
| total = args.total_objects | |
| interval = args.report_interval | |
| start = time.perf_counter() | |
| next_report = start | |
| print('time_ms,alive,trash,rss_kb') | |
| def report(now): | |
| rss = get_rss_kb() | |
| if rss > Stats.peak_rss: | |
| Stats.peak_rss = rss | |
| elapsed_ms = (now - start) * 1000.0 | |
| print( | |
| f'{elapsed_ms:.1f},{Stats.num_alive},' f'{Stats.num_trash},{rss}' | |
| ) | |
| while created < total: | |
| holder.append(make_cycle(cycle_size, extra_bytes)) | |
| Stats.num_alive += cycle_size | |
| Stats.num_trash += cycle_size | |
| if Stats.num_trash > Stats.max_trash: | |
| Stats.max_trash = Stats.num_trash | |
| created += cycle_size | |
| if len(holder) * cycle_size >= live_target: | |
| holder.clear() | |
| now = time.perf_counter() | |
| if now >= next_report: | |
| report(now) | |
| next_report = now + interval | |
| report(time.perf_counter()) | |
| end = time.perf_counter() | |
| print(f'Total time: {end - start:.3f}s', file=sys.stderr) | |
| print(f'Peak RSS: {Stats.peak_rss} KB', file=sys.stderr) | |
| print(f'Max trash: {Stats.max_trash}', file=sys.stderr) | |
| print(f'Created objects: {created}', file=sys.stderr) | |
| if __name__ == '__main__': | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment