@SansPapyrus683
Last active February 8, 2025 15:50
download all your twitter anime girls!

import filecmp
import os
import sys
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Tweet:
    raw: str
    author: str
    id: int
    part: int


def extract_info(tweet: str) -> Tweet:
    raw = tweet
    tweet = os.path.splitext(tweet)[0]
    first = tweet.rfind("_")
    second = tweet[:first].rfind("_")
    return Tweet(
        raw, tweet[:second], int(tweet[second + 1 : first]), int(tweet[first + 1 :])
    )
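
# extract_info assumes filenames of the form "<handle>_<tweet id>_<part>.<ext>",
# e.g. "someartist_1234567890_0.jpg" (a made-up example); this matches the naming
# scheme the downloader script further below writes out.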


new = sys.argv[1]
old = sys.argv[2]
print(f"comparing the new folder {new} with {old}")

old_tweets = defaultdict(list)
for t in os.listdir(old):
    t = extract_info(t)
    old_tweets[t.id].append(t)
for t in old_tweets.values():
    t.sort(key=lambda i: i.part)

new_tweets = defaultdict(list)
for t in os.listdir(new):
    t = extract_info(t)
    new_tweets[t.id].append(t)
for t in new_tweets.values():
    t.sort(key=lambda i: i.part)

for id_, t in old_tweets.items():
    author = {i.author for i in t}
    if len(author) > 1:
        print(f"tweet {id_} has multiple authors: {author}, maybe check that out?")
    author = next(iter(author))

    if id_ not in new_tweets:
        print(f"tweet {id_} by {author} isn't there anymore")
        continue

    new_t = new_tweets[id_]
    if len(new_t) != len(t):
        print(f"wtf? {id_}")
    assert len(new_t) == len(t)

    new_author = {i.author for i in new_t}
    # freshly downloaded tweets shouldn't be problematic
    assert len(new_author) == 1
    new_author = next(iter(new_author))
    if new_author != author:
        print(f"{author} seems to now be {new_author}")

    for o, n in zip(t, new_t):
        old_path = os.path.join(old, o.raw)
        new_path = os.path.join(new, n.raw)
        if not filecmp.cmp(old_path, new_path, shallow=False):
            print(
                f"tweet {id_} by {author} seems to have changed- maybe the api is acting up"
            )
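
# usage for the script above (the filename is just a placeholder):
#   python compare_media.py <freshly_downloaded_folder> <previously_downloaded_folder>


# ---- apparently a second, separate file in this gist: walk a Twitter archive ----
# ---- export and re-download every retweeted image at original (":orig") quality -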
import json
import os
import re
import shutil
import sys
from datetime import datetime, timedelta
from email import utils

import requests


def extract_id(tweet: str) -> int:
    tweet = os.path.splitext(tweet)[0]
    first = tweet.rfind("_")
    second = tweet[:first].rfind("_")
    return int(tweet[second + 1 : first])


def load_twt_obj(file: str) -> list:
    raw = open(file, encoding="utf8").read()
    return json.loads(raw[raw.find("=") + 1 :])
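
# note: the archive's .js files aren't plain JSON - they start with a JS assignment
# along the lines of "window.YTD.tweets.part0 = [ ... ]", which is why load_twt_obj
# throws away everything up to the first "=" before parsing the rest as JSON.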


tweets = load_twt_obj("data/tweets.js") + load_twt_obj("data/deleted-tweets.js")

del_dir = "data/deleted_tweets_media"
gen_dir = "data/tweets_media"
for fn in os.listdir(del_dir):
    shutil.copy(os.path.join(del_dir, fn), gen_dir)

have_alr = set()
if len(sys.argv) > 1:
    for name in os.listdir(sys.argv[1]):
        have_alr.add(extract_id(name))

# after getting the actual images this isn't needed but just in case
all_raw_media = os.listdir(gen_dir)
all_media = {}
for i in all_raw_media:
    post_id = i[: i.find("-")]
    img_id = i[i.find("-") + 1 : i.rfind(".")]
    _, ext = os.path.splitext(i)
    if post_id not in all_media:
        all_media[post_id] = {}
    all_media[post_id][img_id] = ext
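
# archive media filenames look like "<tweet id>-<media id>.<ext>", so all_media ends
# up mapping tweet id -> {media id: file extension}.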

# sort them from oldest to newest
tweets.sort(key=lambda t: utils.parsedate_to_datetime(t["tweet"]["created_at"]))

handle_fmt = re.compile(r"RT @([^:]*):")
img_id_fmt = re.compile(r"http://pbs\.twimg\.com/media/([^\.*]*)\.")
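
# handle_fmt pulls the original author's handle out of retweet text such as
# "RT @someartist: ...", and img_id_fmt grabs the media id out of a URL such as
# http://pbs.twimg.com/media/AbCdEfGhIjK.jpg (both values here are made-up examples).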

os.makedirs("good_media", exist_ok=True)
all_paths = []
print(f"alright, a total of {len(tweets)} tweets to go through. let's go!")
for v, t in enumerate(tweets):
    if (v + 1) % 100 == 0:
        print(f"at tweet #{v + 1}")

    t = t["tweet"]
    match = handle_fmt.match(t["full_text"])
    if match is None:
        continue
    handle = match.group(1)
    og_id = t["id"]

    if "media" not in t["entities"]:
        continue
    media = t["extended_entities"]["media"]
    src_id = [m["source_status_id"] for m in media]
    assert len(set(src_id)) == 1  # just a sanity check
    src_id = int(src_id[0])
    if src_id in have_alr:
        continue

    curr_paths = []
    for img_at, m in enumerate(media):
        img_id = img_id_fmt.match(m["media_url"])
        # sometimes you have things like ext_tw_video_thumb or tweet_video_thumb
        if img_id is None:
            continue
        img_id = img_id.group(1)
        if img_id not in all_media.get(og_id, []):
            continue

        ext = all_media[og_id][img_id]
        stupid_path = os.path.join(gen_dir, f"{og_id}-{img_id}{ext}")
        sigma_path = f"good_media/{handle}_{src_id}_{img_at}{ext}"

        dl_url = f"http://pbs.twimg.com/media/{img_id}{ext}:orig"
        img_data = requests.get(dl_url).content
        with open(sigma_path, "wb") as written:
            written.write(img_data)
        curr_paths.append(sigma_path)
        # shutil.copy(stupid_path, sigma_path)
    all_paths.extend(reversed(curr_paths))

now = datetime.now()
epoch = datetime(1970, 1, 1)
for v, p in enumerate(reversed(all_paths)):
    delta = (now - timedelta(seconds=2 * v) - epoch).total_seconds()
    os.utime(p, times=(delta, delta))
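
# the loop above spaces the saved files' modification times 2 seconds apart, newest
# tweet last, presumably so that anything sorting by mtime shows the images in
# posting order.
#
# usage for this script (the filename is again just a placeholder):
#   python download_media.py [folder_of_media_you_already_have]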