Skip to content

Instantly share code, notes, and snippets.

@Aladex
Created March 17, 2026 11:10
Show Gist options
  • Select an option

  • Save Aladex/d1dc636bdb53827d910e7571de8bd23c to your computer and use it in GitHub Desktop.

Select an option

Save Aladex/d1dc636bdb53827d910e7571de8bd23c to your computer and use it in GitHub Desktop.
Yandex Disk photounlim bulk delete script — deletes photos invisible to public REST API via internal models-v2 endpoint
#!/usr/bin/env python3
"""
Yandex Disk photounlim cleanup script.
Deletes photos from the "photounlim" section of Yandex.Disk via internal
models-v2 API. Photounlim files are invisible to the public REST API and
can only be managed through the web interface's internal endpoints.
Requirements:
pip install curl_cffi
Setup:
1. Log into disk.yandex.ru in Firefox
2. Set FIREFOX_PROFILE below to your Firefox profile path
(find it at about:profiles in Firefox)
3. Get your PHOTOSLICE_ID:
- Open disk.yandex.ru/client/photo in Firefox
- Open DevTools console (F12)
- Run: document.getElementById('app')[
Object.keys(document.getElementById('app'))
.find(k => k.startsWith('__reactContainer'))
].child.memoizedProps.store.getState().photoslice.photosliceId
- Paste the result below
4. Get your UID:
- Same console: ...getState().user.uid
- Paste below
Usage:
python3 yadisk_cleanup.py --space # check disk usage
python3 yadisk_cleanup.py --collect # collect alive file paths (cache)
python3 yadisk_cleanup.py --test # test delete 10 files
python3 yadisk_cleanup.py # full run (uses cache)
python3 yadisk_cleanup.py --fresh # full run (re-collect paths)
How it works:
1. Reads session cookies from your Firefox profile (cookies.sqlite)
2. Extracts CSRF token (SK) from disk.yandex.ru HTML
3. Fetches photo clusters via intapi/photo-get-clusters
4. Checks which files are alive via intapi/photo-get-clusters-with-resources
(dead files show up in "missing", alive in "fetched")
5. Deletes alive files via mpfs/bulk-async-delete
6. Empties trash via mpfs/async-trash-drop-all
Notes:
- Yandex checks TLS fingerprint, so we use curl_cffi to impersonate Firefox
- Too many requests trigger CAPTCHA — script uses random delays
- If a CAPTCHA is triggered, solve it in the browser and rerun the script
- Photounlim paths differ between APIs:
Photoslice returns: photounlim:/filename.JPG
Delete expects: /photounlim/filename.JPG
"""
import json
import os
import random
import re
import shutil
import sqlite3
import tempfile
import time
import sys
from curl_cffi import requests
# ============================================================
# USER SETTINGS — fill these in before running the script
# ============================================================
# Firefox profile directory that holds cookies.sqlite
# (the path is listed at about:profiles in Firefox).
FIREFOX_PROFILE = os.path.expanduser("~/.mozilla/firefox/XXXXXXXX.default-release")

# Photoslice ID of the account (see Setup instructions above).
PHOTOSLICE_ID = ""

# Yandex UID of the account (see Setup instructions above).
UID = ""

# ============================================================
# TUNING — widen the delays / shrink the chunks if you get CAPTCHAs
# ============================================================
BASE_URL = "https://disk.yandex.ru/models-v2"

# Bounds, in seconds, of the random pause between delete batches.
DELAY_MIN = 8
DELAY_MAX = 15

# Bounds of the random number of files sent in one delete request.
CHUNK_MIN = 35
CHUNK_MAX = 50
def load_cookies_from_firefox(profile_dir=None):
    """Build a ``Cookie`` header value from the Yandex cookies in a Firefox profile.

    The profile's ``cookies.sqlite`` is copied into a private temporary
    directory and the copy is queried, so the live database file is never
    opened by this script. The copy and the SQLite connection are always
    released, even when the copy or the query fails.

    When several hosts define a cookie with the same name, the value from
    ``disk.yandex.ru`` wins over ``.yandex.ru``, which wins over any other
    matching host; ties go to the most recently accessed row.

    Args:
        profile_dir: Firefox profile directory. Defaults to the module-level
            ``FIREFOX_PROFILE`` setting.

    Returns:
        A string of the form ``"name1=value1; name2=value2"``.

    Raises:
        FileNotFoundError: ``cookies.sqlite`` does not exist in the profile.
        RuntimeError: the database contains no cookie whose host matches
            ``%yandex%``.
    """
    if profile_dir is None:
        profile_dir = FIREFOX_PROFILE
    src = os.path.join(profile_dir, "cookies.sqlite")
    if not os.path.exists(src):
        raise FileNotFoundError(
            f"Firefox cookies not found at {src}\n"
            f"Check FIREFOX_PROFILE setting. Find yours at about:profiles"
        )

    # TemporaryDirectory replaces the deprecated, race-prone tempfile.mktemp
    # and guarantees the snapshot is removed on every exit path.
    with tempfile.TemporaryDirectory() as tmp_dir:
        snapshot = os.path.join(tmp_dir, "cookies.sqlite")
        shutil.copy2(src, snapshot)
        conn = sqlite3.connect(snapshot)
        try:
            rows = conn.execute(
                "SELECT name, value, host FROM moz_cookies "
                "WHERE host LIKE '%yandex%' ORDER BY lastAccessed DESC"
            ).fetchall()
        finally:
            conn.close()

    if not rows:
        raise RuntimeError(
            "No Yandex cookies found. Log into disk.yandex.ru in Firefox first."
        )

    # Prefer cookies from disk.yandex.ru, then .yandex.ru, then anything else.
    domain_priority = {
        "disk.yandex.ru": 0, ".disk.yandex.ru": 0,
        ".yandex.ru": 1, "yandex.ru": 1,
    }
    by_name = {}
    for name, value, host in rows:
        prio = domain_priority.get(host, 2)
        # Strict "<" keeps the first (most recently accessed) row on ties.
        if name not in by_name or prio < by_name[name][1]:
            by_name[name] = (value, prio)
    return "; ".join(f"{name}={value}" for name, (value, _) in by_name.items())
def get_sk_from_page(session, cookies):
    """Fetch the Disk photo page and return the SK token embedded in its HTML.

    Args:
        session: object whose ``get(url, headers=..., impersonate=...)``
            returns a response exposing ``.text``.
        cookies: value sent as the ``Cookie`` request header.

    Returns:
        The first value matched by the ``"sk":"..."`` pattern in the page.

    Raises:
        RuntimeError: no SK token is present; the message distinguishes a
            page mentioning a captcha from any other page.
    """
    page = session.get(
        "https://disk.yandex.ru/client/photo",
        headers={"Cookie": cookies},
        impersonate="firefox",
    )
    html = page.text

    found = re.search(r'"sk":"([^"]+)"', html)
    if found is not None:
        return found.group(1)

    # No token on the page: report a captcha if the page mentions one,
    # otherwise assume the session is not logged in.
    if "captcha" in html.lower():
        message = (
            "CAPTCHA on page load. Open disk.yandex.ru in browser, "
            "pass captcha, then retry."
        )
    else:
        message = "Could not extract SK from page. Are you logged in?"
    raise RuntimeError(message)
class YaDiskCleaner:
    """Bulk-deletes photounlim files through Yandex.Disk's models-v2 endpoint.

    The instance holds the cookie header, the HTTP session, the current SK
    (CSRF) token and a connection id; every API call goes through ``api``.
    """

    # Name of the alive-path cache file stored next to this script.
    CACHE_FILENAME = ".yadisk_paths_cache.json"
    # Number of clusters checked per photo-get-clusters-with-resources call.
    CLUSTER_BATCH = 10

    def __init__(self):
        """Load cookies, open a session and fetch the initial SK token.

        Raises:
            RuntimeError: ``PHOTOSLICE_ID`` or ``UID`` is empty, or the SK
                token could not be obtained.
        """
        if not PHOTOSLICE_ID:
            raise RuntimeError("PHOTOSLICE_ID is not set. See setup instructions.")
        if not UID:
            raise RuntimeError("UID is not set. See setup instructions.")
        self.cookies = load_cookies_from_firefox()
        self.session = requests.Session(impersonate="firefox")
        self.headers = {
            "Content-Type": "application/json",
            "X-Requested-With": "XMLHttpRequest",
            "X-Retpath-Y": "https://disk.yandex.ru/client/photo",
            "Origin": "https://disk.yandex.ru",
            "Referer": "https://disk.yandex.ru/client/photo",
            "Cookie": self.cookies,
        }
        print("[init] Loading SK from page...")
        self.sk = get_sk_from_page(self.session, self.cookies)
        print(f"[init] SK: {self.sk[:30]}...")
        # Connection id: the UID followed by the current time in milliseconds.
        self.id_client = f"{UID}{int(time.time() * 1000)}"
        print("[init] Ready\n")

    @classmethod
    def _cache_path(cls):
        """Return the path of the alive-path cache next to this script."""
        return os.path.join(os.path.dirname(__file__) or ".", cls.CACHE_FILENAME)

    @staticmethod
    def _to_delete_path(path):
        """Convert a photoslice path to the form the delete call expects.

        ``photounlim:/name.JPG`` becomes ``/photounlim/name.JPG``; any other
        path (including one already in delete form) is returned unchanged.
        """
        prefix = "photounlim:/"
        if path.startswith(prefix):
            return "/photounlim/" + path[len(prefix):]
        return path

    @staticmethod
    def _decode_json(resp):
        """Return the JSON payload of ``resp``.

        Raises:
            RuntimeError: the body is not JSON. The message contains
                ``CAPTCHA`` when the body mentions a captcha, so callers
                that look for that marker still recognise it.
        """
        try:
            return resp.json()
        except ValueError as err:
            if "captcha" in resp.text.lower():
                raise RuntimeError("CAPTCHA! Wait and retry later.") from err
            raise RuntimeError(
                f"API error {resp.status_code}: response is not JSON"
            ) from err

    def _post(self, method, body):
        """POST ``body`` as JSON to the models-v2 endpoint for ``method``."""
        return self.session.post(
            f"{BASE_URL}?m={method}",
            json=body,
            headers=self.headers,
        )

    def api(self, method, params):
        """Call a models-v2 API method, refreshing the SK token once if asked.

        A 403 response whose error code is ``CKEY`` carries a replacement
        token; it is stored in ``self.sk`` and the request is retried once.

        Args:
            method: API method name, e.g. ``"mpfs/space"``.
            params: value sent as ``requestParams`` (may be ``None``).

        Returns:
            The decoded JSON response.

        Raises:
            RuntimeError: the response is a captcha challenge, is not JSON,
                or has a status other than 200.
        """
        body = {
            "sk": self.sk,
            "connection_id": self.id_client,
            "apiMethod": method,
            "requestParams": params,
        }
        resp = self._post(method, body)
        data = self._decode_json(resp)

        if resp.status_code == 403:
            error = data.get("error") if isinstance(data, dict) else None
            if isinstance(error, dict) and error.get("code") == "CKEY":
                new_sk = error["ckey"]
                print(f"  [sk rotated] {new_sk[:30]}...")
                self.sk = new_sk
                body["sk"] = new_sk
                resp = self._post(method, body)
                data = self._decode_json(resp)

        # Check for a captcha before the status so it is always reported
        # with the CAPTCHA marker, whatever status accompanies it.
        if isinstance(data, dict) and data.get("type") == "captcha":
            raise RuntimeError("CAPTCHA! Wait and retry later.")
        if resp.status_code != 200:
            raise RuntimeError(f"API error {resp.status_code}: {data}")
        return data

    def get_space(self):
        """Return disk usage as ``{"used_gb", "trash_gb", "files"}``.

        Sizes are converted from bytes to GiB and rounded to one decimal.
        """
        data = self.api("mpfs/space", None)
        gib = 1024**3
        return {
            "used_gb": round(data["used"] / gib, 1),
            "trash_gb": round(data.get("trash", 0) / gib, 1),
            "files": data["files_count"],
        }

    def collect_paths(self):
        """Collect paths of alive files and cache them to disk.

        Fetches the cluster list, then queries the clusters in batches via
        photo-get-clusters-with-resources; resources reported under
        ``fetched`` are kept as alive, a non-empty ``missing`` marks the
        batch as containing a dead cluster. Collection stops early when a
        CAPTCHA is reported, because further requests would fail as well.

        Returns:
            The list of collected paths (also written to the cache file).
        """
        print("[collect] Fetching cluster list...")
        result = self.api("intapi/photo-get-clusters", {
            "id": PHOTOSLICE_ID,
            "offset": 0,
            "amount": 5000,
        })
        items = result.get("items", [])
        print(f"[collect] Got {len(items)} clusters from API")

        # De-duplicate while keeping the order returned by the API.
        cluster_ids = list(dict.fromkeys(c["cluster_id"] for c in items))
        print(f"[collect] {len(cluster_ids)} unique clusters")
        print("[collect] Checking which clusters are alive...")

        alive_paths = []
        dead_clusters = 0
        alive_clusters = 0
        for i in range(0, len(cluster_ids), self.CLUSTER_BATCH):
            batch = cluster_ids[i:i + self.CLUSTER_BATCH]
            query = {cid: {"range": [0, 10000]} for cid in batch}
            try:
                r = self.api("intapi/photo-get-clusters-with-resources", {
                    "photosliceId": PHOTOSLICE_ID,
                    "clusters": query,
                    "hideScreenshots": True,
                })
            except Exception as e:
                print(f"  [!] Error checking batch {i}: {e}")
                if "CAPTCHA" in str(e):
                    print("[collect] CAPTCHA hit - stopping early; "
                          "the collected list is incomplete")
                    break
                continue

            resources = r.get("resources", {})
            fetched = resources.get("fetched", [])
            missing = resources.get("missing", [])
            for res in fetched:
                path = res.get("path", res.get("id", ""))
                if path:
                    alive_paths.append(path)

            # Per-batch counters are approximate progress statistics only.
            if fetched:
                alive_clusters += len(batch) - (1 if missing else 0)
            if missing:
                dead_clusters += 1
            print(f"  [{i+len(batch)}/{len(cluster_ids)}] "
                  f"alive_files={len(alive_paths)} "
                  f"dead_clusters={dead_clusters}")
            time.sleep(random.uniform(2, 5))

        print(f"\n[collect] Done: {len(alive_paths)} alive files, "
              f"{dead_clusters} dead clusters, "
              f"{alive_clusters} alive clusters")

        cache = self._cache_path()
        with open(cache, "w", encoding="utf-8") as f:
            json.dump(alive_paths, f)
        print(f"[collect] Cached to {cache}")
        return alive_paths

    def load_or_collect_paths(self, force=False):
        """Return alive paths from the cache file, or collect them afresh.

        Args:
            force: when true, ignore any cache and re-collect.

        An unreadable (non-JSON) cache is reported and replaced by a fresh
        collection instead of aborting the run.
        """
        cache = self._cache_path()
        if not force and os.path.exists(cache):
            age_min = (time.time() - os.path.getmtime(cache)) / 60
            try:
                with open(cache, encoding="utf-8") as f:
                    paths = json.load(f)
            except ValueError as e:
                print(f"[cache] Ignoring unreadable cache: {e}")
            else:
                print(f"[cache] Loaded {len(paths)} paths "
                      f"(cached {age_min:.0f} min ago)")
                return paths
        return self.collect_paths()

    def run(self, max_files=None, force_collect=False):
        """Delete the collected files in random-sized batches, then empty trash.

        Args:
            max_files: upper bound on the number of files to delete;
                ``None`` means no limit and ``0`` deletes nothing.
            force_collect: re-collect paths instead of using the cache.

        Stops immediately (returning ``None``) when a delete call reports a
        CAPTCHA. Trash is emptied only if at least one file was deleted.
        """
        space = self.get_space()
        print(f"[space] {space['used_gb']} GB used, "
              f"{space['files']} files, {space['trash_gb']} GB trash\n")

        all_paths = self.load_or_collect_paths(force=force_collect)
        # "is not None" so that an explicit limit of 0 is honoured rather
        # than being mistaken for "no limit".
        if max_files is not None:
            all_paths = all_paths[:max(max_files, 0)]
            print(f"[limit] Capped to {max_files} files\n")

        deleted = 0
        skipped = 0
        errors = 0
        total = len(all_paths)
        i = 0
        while i < total:
            chunk_size = random.randint(CHUNK_MIN, CHUNK_MAX)
            # Photoslice paths must be rewritten into the delete call's form.
            chunk = [self._to_delete_path(p) for p in all_paths[i:i + chunk_size]]
            i += len(chunk)

            for p in chunk:
                print(f"  -> {p}")

            try:
                result = self.api("mpfs/bulk-async-delete", {
                    "operations": [{"src": p} for p in chunk],
                })
            except RuntimeError as e:
                if "CAPTCHA" in str(e):
                    print(f"\n[CAPTCHA] Stopping! deleted={deleted} "
                          f"skipped={skipped} errors={errors}")
                    return
                errors += len(chunk)
                print(f"  ✗ {e}")
            else:
                if isinstance(result, list):
                    for r in result:
                        if isinstance(r, dict) and r.get("oid"):
                            deleted += 1
                            print(f"    ✓ deleted (oid={r['oid'][:16]}...)")
                        elif isinstance(r, dict) and r.get("error"):
                            err = r["error"]
                            code = err.get("code", "?") if isinstance(err, dict) else "?"
                            if code == 71:  # resource not found
                                skipped += 1
                                print("    - already gone")
                            else:
                                errors += 1
                                print(f"    ✗ error: {err}")
                        else:
                            # Neither an operation id nor an error: count it
                            # so the totals still add up to the batch size.
                            errors += 1
                            print(f"    ✗ unexpected result: {r}")
                else:
                    deleted += len(chunk)
                    print("  ✓ batch ok")

            print(f"  [{i}/{total}] del={deleted} skip={skipped} err={errors}")

            # Pause between batches only; nothing follows the last one.
            if i < total:
                delay = random.uniform(DELAY_MIN, DELAY_MAX)
                print(f"  ... waiting {delay:.0f}s\n")
                time.sleep(delay)

        print(f"\n[done] deleted={deleted} skipped={skipped} errors={errors}")
        if deleted > 0:
            print("[trash] Emptying trash...")
            try:
                self.api("mpfs/async-trash-drop-all", {})
            except Exception as e:
                print(f"[trash] error: {e}")
            time.sleep(5)

        space = self.get_space()
        print(f"[space] {space['used_gb']} GB, {space['files']} files, "
              f"{space['trash_gb']} GB trash")
# Every flag the script understands; anything else is rejected up front.
_KNOWN_FLAGS = ("--space", "--collect", "--test", "--fresh")


def main(argv=None):
    """Run the command-line entry point and return a process exit code.

    Args:
        argv: argument list without the program name; defaults to
            ``sys.argv[1:]``.

    Returns:
        0 on success, 1 when setup or an API call raised a reported error,
        2 when an unrecognised argument was given.

    Unknown arguments are refused before anything else happens: without
    this check a mistyped flag would fall through to the full delete run.
    When several mode flags are given, ``--space`` takes precedence over
    ``--collect``, which takes precedence over ``--test``.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    unknown = [a for a in args if a not in _KNOWN_FLAGS]
    if unknown:
        print(f"Unknown argument(s): {' '.join(unknown)}", file=sys.stderr)
        print(
            "Usage: python3 yadisk_cleanup.py "
            "[--space | --collect | --test | --fresh]",
            file=sys.stderr,
        )
        return 2

    try:
        cleaner = YaDiskCleaner()
        if "--space" in args:
            print(cleaner.get_space())
        elif "--collect" in args:
            # Force re-collect alive paths
            cleaner.collect_paths()
        elif "--test" in args:
            # Test: 10 files (uses cache if available)
            cleaner.run(max_files=10)
        else:
            # Full run. Add --fresh to force re-collect
            cleaner.run(force_collect="--fresh" in args)
    except (RuntimeError, FileNotFoundError) as err:
        # These are raised deliberately with user-facing messages.
        print(f"[error] {err}", file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment