Created November 30, 2020 15:04
hornoxe picdump crawler
import os
import time
import argparse
import requests
import threading
import queue
class ImgDownloader(threading.Thread):
    """Thread that downloads every `interval`-th image of a dump, starting at image `start`, until the first 404."""

    def __init__(self, folder, img_url_fmt, start, interval, print_queue):
        super().__init__()
        self.folder = folder
        self.img_url_fmt = img_url_fmt
        self.img_start = start
        self.img_interval = interval
        self.print_queue = print_queue
        self.error = False
    def run(self):
        img_id = self.img_start
        while True:
            try:
                self.download_img(img_id)
            except StopIteration:
                break
            except Exception:
                self.error = True
                break
            img_id += self.img_interval
    def download_img(self, img_index):
        url = self.img_url_fmt.format(img_index)
        filename = os.path.join(self.folder, "{}.jpg".format(img_index))
        res = requests.get(url)
        if res.status_code == 404:
            raise StopIteration  # regular end of dump
        elif res.status_code != 200:
            self.print_queue.put("Error: {}".format(res.content))
            raise Exception
        else:
            self.print_queue.put("Downloaded: {}".format(url))
            with open(filename, "wb") as f:
                f.write(res.content)
class HxCrawler:
    def __init__(self, num_threads):
        self.num_threads = num_threads
        self.img_url_fmt = None
        self.folder = None
    def crawl(self, folder, img_url_fmt):
        self.folder = folder
        self.img_url_fmt = img_url_fmt
        os.makedirs(self.folder, exist_ok=True)

        # create threads and print queue
        print_queue = queue.Queue()
        self.threads = [ImgDownloader(self.folder, self.img_url_fmt, thread + 1, self.num_threads, print_queue)
                        for thread in range(self.num_threads)]

        t0 = time.time()
        for thread in self.threads:
            thread.start()

        # wait until all threads are done, printing downloaded urls in the meantime
        while any(thread.is_alive() for thread in self.threads):
            try:
                msg = print_queue.get_nowait()
                print(msg)
            except queue.Empty:
                time.sleep(0.01)

        if any(thread.error for thread in self.threads):
            raise RuntimeError("Stopping after error in thread.")
        elif len(os.listdir(folder)) < 5:
            raise FileNotFoundError("Folder is empty. Something went wrong.")

        t1 = time.time()
        print("Done after {:.3f} seconds.".format(t1 - t0))
    def crawl_picdump(self, index, folder=None):
        folder = folder or os.path.join(__file__, "../data")
        folder = os.path.abspath(os.path.join(folder, "picdump/{}".format(index)))

        if index == 521:
            print("Dump not supported: {}".format(index))
            raise NotImplementedError  # random image names / non-continuous numbers
        elif index == 679:
            img_url_fmt = "pidcump{0:02d}/picdump{0:02d}_{{:03d}}.jpg".format(index)  # typo: pidcump
        elif index == 672:
            img_url_fmt = "picdump672/picdump671_{{:03d}}.jpg".format(index)  # wrong number
        elif index in [487, 490]:
            img_url_fmt = "picdump{0:02d}/picudmp{0:02d}_{{:02d}}.jpg".format(index)  # typo: picudmp
        elif index == 392:
            img_url_fmt = "picdump{0:02d}/picdump{0:02d}-{{:03d}}.jpg".format(index)  # "-" instead of "_"
        elif index == 336:
            img_url_fmt = "picdump336/picdump335_{{:02d}}.jpg".format(index)  # wrong number
        elif index == 237:
            img_url_fmt = "picdump{0:02d}/hornoxe.com_pcidump{0:02d}_{{:03d}}.jpg".format(index)  # typo: pcidump
        elif index == 27:
            img_url_fmt = "picdump{0:02d}/hornoxe.com_picdump{1:02d}_{{:03d}}.jpg".format(index, index - 1)
        elif index < 100:
            # additional "hornoxe.com" prefix in filename, 2-digit image numbers
            img_url_fmt = "picdump{0:02d}/hornoxe.com_picdump{0:02d}_{{:02d}}.jpg".format(index)
        elif index <= 326:
            # additional "hornoxe.com" prefix in filename
            img_url_fmt = "picdump{0:02d}/hornoxe.com_picdump{0:02d}_{{:03d}}.jpg".format(index)
        else:
            img_url_fmt = "picdump{0:02d}/picdump{0:02d}_{{:03d}}.jpg".format(index)

        img_url_fmt = "https://www.hornoxe.com/wp-content/picdumps/" + img_url_fmt
        print("Starting download of picdump #{}.".format(index))
        try:
            self.crawl(folder, img_url_fmt)
        except FileNotFoundError:
            # retry with 2-digit image format, used whenever the dump has fewer than 100 pics
            print("No images. Trying different img url format...")
            self.crawl(folder, img_url_fmt.replace(":03d", ":02d"))
    def crawl_babes(self, index, folder=None):
        folder = folder or os.path.join(__file__, "../data")
        folder = os.path.abspath(os.path.join(folder, "babes/{}".format(index)))
        red = (index != 10 and index % 10 == 0) or (index in [4, 11])  # red edition

        if index in [187]:
            print("Dump not supported: {}".format(index))
            raise NotImplementedError  # random image names / non-continuous numbers

        if red:
            img_url_fmt = "horni_babes{0:02d}_red_edition/horni_babes{0:02d}_red_{{:02d}}.jpg".format(index)
        else:
            if index == 1:
                img_url_fmt = "horni_babes1/horni_babes1_{{:03d}}.jpg".format(index)
            elif index == 8:
                img_url_fmt = "horni_babes08/horni_babes07_{{:02d}}.jpg".format(index)  # wrong number on the server
            else:
                img_url_fmt = "horni_babes{0:02d}/horni_babes{0:02d}_{{:02d}}.jpg".format(index)

        img_url_fmt = "https://www.hornoxe.com/wp-content/picdumps/" + img_url_fmt
        print("Starting download of babes #{}.".format(index))
        self.crawl(folder, img_url_fmt)
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("target", choices=["picdump", "babes"])
    parser.add_argument("index", type=int)
    parser.add_argument("--folder", default=None, help="Download target folder.")
    parser.add_argument("--threads", type=int, default=20)
    parser.add_argument("--all", "-a", action="store_true",
                        help="Iteratively download all dumps, going backwards from this one.")
    args = parser.parse_args()

    crawler = HxCrawler(num_threads=args.threads)
    if args.all:
        ids = range(args.index, 0, -1)
    else:
        ids = [args.index]

    for index in ids:
        try:
            if args.target == "picdump":
                crawler.crawl_picdump(index, args.folder)
            elif args.target == "babes":
                crawler.crawl_babes(index, args.folder)
        except NotImplementedError:
            pass
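A minimal usage sketch, assuming the gist is saved as hx_crawler.py (the filename and the index values below are assumptions for illustration, not part of the gist):

# Assumed filename: hx_crawler.py.
#
# Command-line usage, per the argparse setup above:
#   python hx_crawler.py picdump 700                 # downloads into <script dir>/data/picdump/700
#   python hx_crawler.py babes 50 --all              # downloads babes dumps #50 down to #1
#   python hx_crawler.py picdump 700 --folder /tmp/hx --threads 10
#
# The crawler can also be driven directly from Python:
from hx_crawler import HxCrawler  # hypothetical module name

crawler = HxCrawler(num_threads=20)
crawler.crawl_picdump(700, folder="/tmp/hx")  # images land in /tmp/hx/picdump/700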