Skip to content

Instantly share code, notes, and snippets.

@rohan335
Created February 28, 2026 08:07
Show Gist options
  • Select an option

  • Save rohan335/8aa8a6e53dd168e4703506856f3c19b7 to your computer and use it in GitHub Desktop.

Select an option

Save rohan335/8aa8a6e53dd168e4703506856f3c19b7 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Minimal downloader for PostHog session replay snapshots.
- Reads POSTHOG_HOST, POSTHOG_PERSONAL_API_KEY, POSTHOG_PROJECT_ID from .env
- Writes one JSON file per event in a per-session folder
- Only supports blob_v2 source currently
Example:
# Download latest 3 sessions from last 7 days
python download_replay_snapshots.py --since -7d --limit 3
"""
import argparse
import json
import os
import urllib.request
import urllib.error
import urllib.parse
from typing import Any, Dict, Optional, Tuple, List
import gzip
def _load_dotenv(path: str = ".env") -> None:
try:
if not os.path.exists(path):
return
with open(path, "r", encoding="utf-8") as f:
for raw in f:
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, v = line.split("=", 1)
k, v = k.strip(), v.strip()
if (v.startswith('"') and v.endswith('"')) or (
v.startswith("'") and v.endswith("'")
):
v = v[1:-1]
if k and k not in os.environ:
os.environ[k] = v
except Exception:
pass
def _normalize_host(host: str) -> str:
host = (host or "").strip()
if not host:
return "https://us.posthog.com"
if not host.startswith("http://") and not host.startswith("https://"):
host = "https://" + host
return host.rstrip("/")
def _http_request(
    method: str,
    url: str,
    headers: Optional[Dict[str, str]] = None,
    timeout: int = 60,
) -> Tuple[int, Dict[str, Any]]:
    """Issue an HTTP request and return (status_code, decoded body).

    The body is parsed as JSON when possible; otherwise it is wrapped as
    {"_raw": <text>}. HTTP error statuses are returned as a normal
    (status, payload) pair instead of raising, so callers can inspect the
    error body.
    """

    def _decode_body(raw: bytes) -> Dict[str, Any]:
        # Shared decoding for both the success and the HTTPError paths
        # (previously duplicated inline).
        try:
            return json.loads(raw.decode("utf-8")) if raw else {}
        except Exception:
            return {"_raw": raw.decode("utf-8", errors="replace")}

    req = urllib.request.Request(url, headers=headers or {}, method=method.upper())
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return resp.getcode(), _decode_body(resp.read())
    except urllib.error.HTTPError as e:
        return e.code, _decode_body(e.read())
def _list_recordings(
    host: str, key: str, project_id: str, date_from: str, limit: int
) -> List[Dict[str, Any]]:
    """Return up to `limit` session recordings listed since `date_from`.

    Raises RuntimeError on any non-200 response.
    """
    query = urllib.parse.quote(date_from)
    url = (
        f"{host}/api/projects/{project_id}/session_recordings"
        f"?date_from={query}&limit={limit}"
    )
    status, body = _http_request("GET", url, headers={"Authorization": f"Bearer {key}"})
    if status != 200:
        raise RuntimeError(f"Failed to list recordings: HTTP {status}: {body}")
    if isinstance(body, dict):
        return body.get("results", [])
    return []
def _snapshots_sources(
    host: str, key: str, project_id: str, session_id: str
) -> Dict[str, Any]:
    """Fetch the snapshots manifest for one recording.

    Omitting the 'source' query parameter makes the endpoint return a
    manifest listing the available sources. Raises RuntimeError on any
    non-200 response.
    """
    url = f"{host}/api/projects/{project_id}/session_recordings/{session_id}/snapshots"
    status, body = _http_request("GET", url, headers={"Authorization": f"Bearer {key}"})
    if status != 200:
        raise RuntimeError(f"Failed to fetch snapshots manifest: HTTP {status}: {body}")
    return body
def _fetch_blob_v2_range(
    host: str, key: str, project_id: str, session_id: str, start_k: int, end_k: int
) -> str:
    """Download an inclusive blob_v2 key range and return the raw text.

    Non-JSON responses (NDJSON blobs) come back via the "_raw" wrapper from
    _http_request; JSON responses are re-serialized so the caller always
    receives a string. Raises RuntimeError on any non-200 response.
    """
    endpoint = (
        f"{host}/api/projects/{project_id}/session_recordings/{session_id}/snapshots"
    )
    url = f"{endpoint}?source=blob_v2&start_blob_key={start_k}&end_blob_key={end_k}"
    status, body = _http_request("GET", url, headers={"Authorization": f"Bearer {key}"})
    if status != 200:
        raise RuntimeError(
            f"Failed to fetch blob_v2 range {start_k}-{end_k}: HTTP {status}: {body}"
        )
    if isinstance(body, dict) and "_raw" in body:
        return body["_raw"]
    return json.dumps(body)
def _save_text(path: str, text: str) -> None:
with open(path, "w", encoding="utf-8") as f:
f.write(text)
def _decode_ndjson_to_events(text: str) -> List[Dict[str, Any]]:
events: List[Dict[str, Any]] = []
for line in text.splitlines():
line = line.strip()
if not line:
continue
try:
item = json.loads(line)
except Exception:
continue
if isinstance(item, list) and len(item) >= 2 and isinstance(item[1], dict):
obj = item[1]
if isinstance(obj.get("type"), int) and "timestamp" in obj:
events.append(obj)
continue
data = obj.get("data")
if isinstance(obj.get("cv"), str) and isinstance(data, str):
try:
buf = data.encode("latin-1", errors="ignore")
dec = gzip.decompress(buf)
payload = dec.decode("utf-8", errors="replace")
# Try as JSON array/object, else line-by-line
try:
parsed = json.loads(payload)
if isinstance(parsed, list):
events.extend([e for e in parsed if isinstance(e, dict)])
elif isinstance(parsed, dict) and isinstance(
parsed.get("events"), list
):
events.extend(
[
e
for e in parsed.get("events", [])
if isinstance(e, dict)
]
)
else:
for l2 in payload.splitlines():
try:
ev = json.loads(l2)
if isinstance(ev, dict):
events.append(ev)
except Exception:
continue
except Exception:
for l2 in payload.splitlines():
try:
ev = json.loads(l2)
if isinstance(ev, dict):
events.append(ev)
except Exception:
continue
except Exception:
continue
return events
def _write_events_folder(
base_dir: str, session_id: str, events: List[Dict[str, Any]]
) -> str:
session_dir = os.path.join(base_dir, f"events_{session_id}")
os.makedirs(session_dir, exist_ok=True)
width = max(4, len(str(len(events))))
for idx, ev in enumerate(events, start=1):
fname = f"event_{idx:0{width}d}.json"
path = os.path.join(session_dir, fname)
with open(path, "w", encoding="utf-8") as f:
json.dump(ev, f)
with open(os.path.join(session_dir, "manifest.json"), "w", encoding="utf-8") as mf:
json.dump({"session_id": session_id, "event_count": len(events)}, mf)
return session_dir
def main() -> int:
    """CLI entry point: download replay events into per-session folders.

    Reads POSTHOG_HOST, POSTHOG_PERSONAL_API_KEY and POSTHOG_PROJECT_ID
    from the environment (after a best-effort .env load). Returns 2 when a
    required credential is missing, otherwise 0 (per-session failures are
    printed but do not change the exit code).
    """
    _load_dotenv()
    p = argparse.ArgumentParser(
        description="Download session replay events to per-session folders (one JSON per event)"
    )
    p.add_argument(
        "--since", default="-7d", help="Relative time window for listing, e.g. -7d"
    )
    p.add_argument(
        "--limit",
        type=int,
        default=1,
        help="How many recent recordings to download (ignored if --session-id set)",
    )
    p.add_argument(
        "--session-id",
        help="Specific session id to download; bypass listing when provided",
    )
    p.add_argument(
        "--out-dir",
        default=".",
        help="Output directory to place session folders (default current directory)",
    )
    args = p.parse_args()
    host = _normalize_host(os.getenv("POSTHOG_HOST", "https://us.posthog.com"))
    key = os.getenv("POSTHOG_PERSONAL_API_KEY")
    if not key:
        print("Missing POSTHOG_PERSONAL_API_KEY in env/.env")
        return 2
    project_id = os.getenv("POSTHOG_PROJECT_ID")
    if not project_id:
        print("Missing POSTHOG_PROJECT_ID in env/.env")
        return 2
    sessions: List[Dict[str, Any]] = []
    if args.session_id:
        # Explicit session id: skip the listing API entirely.
        sessions = [{"id": args.session_id}]
    else:
        sessions = (
            _list_recordings(host, key, str(project_id), args.since, args.limit) or []
        )
    if not sessions:
        print("No recordings found to download.")
        return 0
    for rec in sessions:
        sid = rec.get("id")
        if not sid:
            continue
        # Fetch manifest to obtain blob_v2 keys
        manifest = _snapshots_sources(host, key, str(project_id), sid)
        sources = manifest.get("sources") if isinstance(manifest, dict) else None
        # Output folder base
        base_dir = args.out_dir or "."
        # Hardcoded blob_v2 mode only, no fallback (see module docstring);
        # keep only sources whose blob_key is a non-negative integer string.
        keys = [
            int(s.get("blob_key"))
            for s in (sources or [])
            if str(s.get("blob_key")).isdigit()
        ]
        if not keys:
            print(
                "no-blob-keys: manifest missing blob_v2 keys; probably wrong source, try blob_v2_ts or none"
            )
            continue
        # Request the whole inclusive key range in a single call.
        start_k, end_k = min(keys), max(keys)
        try:
            text = _fetch_blob_v2_range(host, key, str(project_id), sid, start_k, end_k)
            events = _decode_ndjson_to_events(text)
            session_dir = _write_events_folder(base_dir, sid, events)
            print(session_dir)
        except Exception as e:
            # Per-session failures are reported but do not abort the run.
            msg = str(e)
            if "HTTP 400" in msg or "invalid_input" in msg:
                print("error-400: probably wrong source, try blob_v2_ts or none")
            else:
                print(f"error: {msg}")
    return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except KeyboardInterrupt:
print("Interrupted.")
raise SystemExit(130)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment