Skip to content

Instantly share code, notes, and snippets.

@qezz
Created February 3, 2026 10:10
Show Gist options
  • Select an option

  • Save qezz/d34c8c6162e2fffd04a2ea61413a6a71 to your computer and use it in GitHub Desktop.

Select an option

Save qezz/d34c8c6162e2fffd04a2ea61413a6a71 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Aggregate speedscope profile data by frame file path regex pattern.
Supports speedscope's native JSON format with both sampled and evented profiles.
"""
import argparse
import json
import re
import sys
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
@dataclass
class FrameStats:
    """Accumulated timing statistics for a single named profile frame."""

    name: str
    file: str = ""
    total_time: float = 0.0
    self_time: float = 0.0
    count: int = 0

    def merge(self, other: "FrameStats"):
        """Fold another frame's accumulated times and call count into this one."""
        for attr in ("total_time", "self_time", "count"):
            setattr(self, attr, getattr(self, attr) + getattr(other, attr))
@dataclass
class AggregatedStats:
    """Per-frame statistics aggregated over all frames whose file matched *pattern*."""

    pattern: re.Pattern
    frames: dict[str, FrameStats] = field(default_factory=dict)
    total_profile_time: float = 0.0
    unit: str = "unknown"

    def add_frame(
        self, name: str, file: str, total_time: float, self_time: float, count: int = 1
    ):
        """Accumulate one observation of frame *name* into its bucket, creating it if new."""
        entry = self.frames.get(name)
        if entry is None:
            entry = self.frames[name] = FrameStats(name, file)
        entry.total_time += total_time
        entry.self_time += self_time
        entry.count += count

    @property
    def total_time(self) -> float:
        # Sums self_time (not total_time) across frames.
        return sum(map(lambda f: f.self_time, self.frames.values()))

    @property
    def total_count(self) -> int:
        # Combined call/sample count across all matched frames.
        return sum(map(lambda f: f.count, self.frames.values()))
def load_speedscope(path: Path) -> dict:
    """Read and parse a speedscope JSON file from *path*."""
    return json.loads(path.read_text())
def process_sampled_profile(
    profile: dict, frames: list[dict], pattern: re.Pattern, stats: AggregatedStats
):
    """Process a sampled profile (has samples + weights arrays).

    Each sample is a list of frame indices (the stack, leaf assumed last)
    paired with a weight. For every stack frame whose "file" matches
    *pattern*:
      - total time gets the sample weight, counted at most once per frame
        name per sample so recursive frames are not double-counted;
      - self time gets the weight only when the frame is the last element
        of the stack.
    Mutates *stats* in place. If the samples and weights arrays disagree
    in length, a warning is printed to stderr and the profile is skipped.
    """
    samples = profile.get("samples", [])
    weights = profile.get("weights", [])
    if len(samples) != len(weights):
        print(
            f"Warning: samples ({len(samples)}) and weights ({len(weights)}) length mismatch",
            file=sys.stderr,
        )
        return
    # Whole-profile time is the sum of all sample weights.
    stats.total_profile_time += sum(weights)
    for stack, weight in zip(samples, weights):
        if not stack:
            continue
        # Track which matching frames are in this stack
        matching_in_stack = set()
        for i, frame_idx in enumerate(stack):
            # Skip out-of-range frame indices rather than raising on malformed input.
            if frame_idx >= len(frames):
                continue
            frame = frames[frame_idx]
            name = frame.get("name", f"frame_{frame_idx}")
            file = frame.get("file", "")
            if not pattern.search(file):
                continue
            # Self time only for the top of stack (last element)
            is_top = i == len(stack) - 1
            self_time = weight if is_top else 0.0
            # Only count total time once per frame per sample (handles recursion)
            if name not in matching_in_stack:
                stats.add_frame(
                    name, file, total_time=weight, self_time=self_time, count=1
                )
                matching_in_stack.add(name)
            elif is_top:
                # Still add self time if this frame is at the top
                stats.frames[name].self_time += self_time
def process_evented_profile(
    profile: dict, frames: list[dict], pattern: re.Pattern, stats: "AggregatedStats"
):
    """Process an evented profile (has events array with O/C events).

    Pairs each close ("C") event with the most recent open ("O") event for
    the same frame index. A frame's total time is close - open; its self
    time is total minus the time spent in frames opened and closed while it
    was on the stack (its children). Mutates *stats* in place; unmatched
    close events are ignored.

    Fix vs. the original: self time used to be recorded equal to total
    time, double-counting nested calls; child durations are now subtracted.
    """
    events = profile.get("events", [])
    # Stack of currently-open frames: [frame_idx, open_time, child_time].
    # child_time accumulates durations of nested calls that completed while
    # this frame was open, so its self time can exclude them.
    open_stack: list[list] = []
    # frame_idx -> [(total, self), ...] per completed open/close pair.
    frame_times: dict[int, list[tuple[float, float]]] = defaultdict(list)
    for event in events:
        event_type = event.get("type")
        frame_idx = event.get("frame")
        at = event.get("at", 0)
        if event_type == "O":
            open_stack.append([frame_idx, at, 0.0])
        elif event_type == "C":
            # Find matching open event (should be the most recent one for this frame)
            for i in range(len(open_stack) - 1, -1, -1):
                if open_stack[i][0] == frame_idx:
                    _, open_time, child_time = open_stack.pop(i)
                    duration = at - open_time
                    # Self time excludes time spent in nested (child) calls;
                    # clamp at 0 in case of malformed/overlapping events.
                    self_time = max(duration - child_time, 0.0)
                    frame_times[frame_idx].append((duration, self_time))
                    # Credit this frame's full duration to the frame below it
                    # on the stack (its parent), so the parent's self time
                    # can subtract it.
                    if i > 0:
                        open_stack[i - 1][2] += duration
                    break
    # Calculate total profile time from the event timestamp span.
    if events:
        times = [e.get("at", 0) for e in events]
        stats.total_profile_time += max(times) - min(times)
    # Aggregate the per-pair times into stats, keeping only matching files.
    for frame_idx, times in frame_times.items():
        if frame_idx >= len(frames):
            continue
        frame = frames[frame_idx]
        name = frame.get("name", f"frame_{frame_idx}")
        file = frame.get("file", "")
        if not pattern.search(file):
            continue
        total = sum(t[0] for t in times)
        self_t = sum(t[1] for t in times)
        count = len(times)
        stats.add_frame(name, file, total_time=total, self_time=self_t, count=count)
def process_speedscope(data: dict, pattern: re.Pattern) -> AggregatedStats:
    """Aggregate all profiles in a speedscope file into stats for frames matching *pattern*."""
    # Frame table shared by all profiles (speedscope "shared.frames").
    shared_frames = data.get("shared", {}).get("frames", [])
    profiles = data.get("profiles", [])
    # Unit is taken from the first profile, defaulting to "samples".
    unit = profiles[0].get("unit", "samples") if profiles else "samples"
    stats = AggregatedStats(pattern=pattern, unit=unit)
    for profile in profiles:
        ptype = profile.get("type", "")
        # A profile may carry its own frame table; fall back to the shared one.
        profile_frames = profile.get("frames", shared_frames) or shared_frames
        if ptype == "sampled" or "samples" in profile:
            process_sampled_profile(profile, profile_frames, pattern, stats)
        elif ptype == "evented" or "events" in profile:
            process_evented_profile(profile, profile_frames, pattern, stats)
        else:
            print(f"Warning: unknown profile type '{ptype}'", file=sys.stderr)
    return stats
def format_time(value: float, unit: str) -> str:
    """Format a time value with appropriate unit scaling.

    Scales *value* up to the largest unit it exceeds (ns -> µs -> ms -> s)
    and renders with three decimals (raw nanoseconds render with none).
    Unknown units fall back to "<value> <unit>" with two decimals.
    """
    # (threshold, suffix) pairs tried largest-first, plus an unscaled fallback.
    if unit in ("nanoseconds", "ns"):
        steps = ((1e9, "s"), (1e6, "ms"), (1e3, "µs"))
        fallback = f"{value:.0f}ns"
    elif unit in ("microseconds", "µs", "us"):
        steps = ((1e6, "s"), (1e3, "ms"))
        fallback = f"{value:.3f}µs"
    elif unit in ("milliseconds", "ms"):
        steps = ((1e3, "s"),)
        fallback = f"{value:.3f}ms"
    elif unit in ("seconds", "s"):
        steps = ()
        fallback = f"{value:.3f}s"
    else:
        return f"{value:.2f} {unit}"
    for threshold, suffix in steps:
        if value >= threshold:
            return f"{value / threshold:.3f}{suffix}"
    return fallback
def print_stats(
    stats: AggregatedStats, sort_by: str = "self", top_n: int | None = None
):
    """Print a summary block and a per-frame table for *stats*.

    sort_by is one of "self", "total", "count", "name" (anything else
    leaves insertion order). top_n, if given, limits the table to the
    first N rows after sorting.
    """
    if not stats.frames:
        print(f"No frames found matching pattern '{stats.pattern.pattern}'")
        return
    frames = list(stats.frames.values())
    # Metric keys sort descending; names sort ascending.
    orderings = {
        "self": (lambda f: f.self_time, True),
        "total": (lambda f: f.total_time, True),
        "count": (lambda f: f.count, True),
        "name": (lambda f: f.name, False),
    }
    if sort_by in orderings:
        key_fn, descending = orderings[sort_by]
        frames.sort(key=key_fn, reverse=descending)
    if top_n:
        frames = frames[:top_n]
    # Summary block.
    print(f"\n{'='*70}")
    print(f"Pattern: {stats.pattern.pattern}")
    print(f"Matching frames: {len(stats.frames)}")
    print(f"Total self time: {format_time(stats.total_time, stats.unit)}")
    print(f"Total calls: {stats.total_count}")
    if stats.total_profile_time > 0:
        pct = (stats.total_time / stats.total_profile_time) * 100
        print(
            f"Profile time: {format_time(stats.total_profile_time, stats.unit)} ({pct:.1f}% in matched)"
        )
    print(f"{'='*70}\n")
    # Columns are sized to the widest visible value, capped at 40 characters.
    name_width = min(40, max(len(f.name) for f in frames))
    file_width = min(40, max(len(f.file) for f in frames))
    header = f"{'Name':<{name_width}} {'File':<{file_width}} {'Self Time':>12} {'Total Time':>12} {'Count':>8} {'Self %':>7}"
    print(header)
    print("-" * len(header))
    for frame in frames:
        # Names truncate at the end; paths truncate at the start (the tail
        # of a file path is the informative part).
        shown_name = (
            frame.name
            if len(frame.name) <= name_width
            else frame.name[: name_width - 3] + "..."
        )
        shown_file = (
            frame.file
            if len(frame.file) <= file_width
            else "..." + frame.file[-(file_width - 3) :]
        )
        self_pct = (
            frame.self_time / stats.total_profile_time * 100
            if stats.total_profile_time > 0
            else 0
        )
        print(
            f"{shown_name:<{name_width}} "
            f"{shown_file:<{file_width}} "
            f"{format_time(frame.self_time, stats.unit):>12} "
            f"{format_time(frame.total_time, stats.unit):>12} "
            f"{frame.count:>8} "
            f"{self_pct:>6.1f}%"
        )
def main():
    """CLI entry point: parse arguments, aggregate the profile, report results."""
    arg_parser = argparse.ArgumentParser(
        description="Aggregate speedscope profile data by frame file path pattern"
    )
    arg_parser.add_argument("file", type=Path, help="Speedscope JSON file")
    arg_parser.add_argument("pattern", help="Regex pattern to match frame file paths")
    arg_parser.add_argument(
        "-i",
        "--ignore-case",
        action="store_true",
        help="Case-insensitive pattern matching",
    )
    arg_parser.add_argument(
        "--sort",
        "-s",
        choices=["self", "total", "count", "name"],
        default="self",
        help="Sort by: self time, total time, count, or name (default: self)",
    )
    arg_parser.add_argument(
        "--top", "-n", type=int, default=None, help="Show only top N frames"
    )
    arg_parser.add_argument("--json", "-j", action="store_true", help="Output as JSON")
    opts = arg_parser.parse_args()

    # Validate inputs up front, exiting with status 1 on any problem.
    if not opts.file.exists():
        print(f"Error: file not found: {opts.file}", file=sys.stderr)
        sys.exit(1)
    try:
        pattern = re.compile(opts.pattern, re.IGNORECASE if opts.ignore_case else 0)
    except re.error as e:
        print(f"Error: invalid regex pattern: {e}", file=sys.stderr)
        sys.exit(1)
    try:
        data = load_speedscope(opts.file)
    except json.JSONDecodeError as e:
        print(f"Error: invalid JSON: {e}", file=sys.stderr)
        sys.exit(1)

    stats = process_speedscope(data, pattern)
    if opts.json:
        frame_records = [
            {
                "name": f.name,
                "file": f.file,
                "self_time": f.self_time,
                "total_time": f.total_time,
                "count": f.count,
            }
            for f in stats.frames.values()
        ]
        payload = {
            "pattern": pattern.pattern,
            "unit": stats.unit,
            "total_self_time": stats.total_time,
            "total_count": stats.total_count,
            "profile_time": stats.total_profile_time,
            "frames": frame_records,
        }
        print(json.dumps(payload, indent=2))
    else:
        print_stats(stats, sort_by=opts.sort, top_n=opts.top)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment