Created
December 14, 2017 13:41
-
-
Save nonZero/63e8d8b39605edf0e93afdb95808d49a to your computer and use it in GitHub Desktop.
Requirements: Python 3.6 or newer, `requests`, `tqdm` and `pillow`. To install: `pip install requests tqdm pillow`
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
import itertools | |
import logging | |
from collections import namedtuple | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
from pathlib import Path | |
from urllib.request import urlretrieve, urlopen | |
import requests | |
import tqdm | |
from PIL import Image | |
# Timeout, in seconds, handed to urlopen() for every request made by download().
TIMEOUT = 20
def patch_dns():
    """Put a process-wide, in-memory cache in front of ``socket.getaddrinfo``.

    Results are stored in a dict keyed by the exact call arguments, so a
    repeated lookup with the same arguments is answered without calling the
    real resolver again. Only successful lookups are stored: a lookup that
    raises propagates the exception and is attempted again on the next call.
    Entries never expire.

    The function is idempotent. do_download() calls it once per document, and
    an already patched ``getaddrinfo`` is left in place rather than being
    wrapped a second time with a fresh, empty cache.
    """
    import socket

    # Marker attribute set on our own wrapper (see the end of this function).
    if getattr(socket.getaddrinfo, "_dns_cache_patched", False):
        return

    prv_getaddrinfo = socket.getaddrinfo
    dns_cache = {}

    def new_getaddrinfo(*args, **kwargs):
        # getaddrinfo() takes family/type/proto/flags as keywords too, so the
        # keywords are part of the cache key and are forwarded unchanged.
        key = (args, tuple(sorted(kwargs.items())))
        try:
            return dns_cache[key]
        except KeyError:
            res = prv_getaddrinfo(*args, **kwargs)
            dns_cache[key] = res
            return res

    new_getaddrinfo._dns_cache_patched = True
    socket.getaddrinfo = new_getaddrinfo
# Module logger; get_img_info() reports the manifest URL on it at DEBUG level.
logger = logging.getLogger(__name__)

# Docs: http://iiif.nli.org.il/imageapi.html
API_ENDPOINT = "http://iiif.nli.org.il/IIIFv21/"

# Manifest URL template; the single positional placeholder is the document ID.
MANIFEST_URL = API_ENDPOINT + "DOCID/{}/manifest"

# Image request template, filled in by get_img_url().
IMG_URL = API_ENDPOINT + "{id}/{region}/{size}/{rotation}/default.jpg"

# Default edge length, in pixels, of the square tiles requested by get_tile()
# and download_tiles().
# NOTE(review): presumably the largest tile the server will return - confirm
# against the API docs linked above.
MAX_TILE_SIZE = 576

# Image identifier plus the pixel dimensions of its canvas.
ImageData = namedtuple("ImageData", "id,width,height")
def get_img_info(doc_id):
    """Fetch the manifest of *doc_id* and describe the single image it holds.

    Args:
        doc_id: Document ID substituted into ``MANIFEST_URL``.

    Returns:
        ImageData: the ``@id`` of the image together with the ``width`` and
        ``height`` of its canvas.

    Raises:
        requests.RequestException: The request failed, timed out after
            ``TIMEOUT`` seconds, or returned an HTTP error status.
        ValueError: The manifest does not contain exactly one sequence, one
            canvas and one image, or the canvas ``@id`` does not start with
            ``API_ENDPOINT``.
    """
    url = MANIFEST_URL.format(doc_id)
    logger.debug("Getting %s", url)
    # requests waits indefinitely unless a timeout is given explicitly.
    r = requests.get(url, timeout=TIMEOUT)
    r.raise_for_status()
    doc = r.json()

    def only(items, what):
        # Explicit check instead of `assert`, which is stripped under -O and
        # would then let an unexpected manifest through silently.
        if len(items) != 1:
            raise ValueError(
                f"expected exactly one {what} in {url}, got {len(items)}")
        return items[0]

    seq = only(doc['sequences'], "sequence")
    c = only(seq['canvases'], "canvas")
    if not c['@id'].startswith(API_ENDPOINT):
        raise ValueError(
            f"canvas id {c['@id']!r} does not start with {API_ENDPOINT!r}")
    img = only(c['images'], "image")
    return ImageData(img['@id'], c['width'], c['height'])
def get_img_url(img_id, region='full', size='max', rotation='0'):
    """Build the image request URL for *img_id*.

    The arguments fill the ``{id}``, ``{region}``, ``{size}`` and
    ``{rotation}`` placeholders of ``IMG_URL``. With the defaults the URL asks
    for region ``full``, size ``max`` and rotation ``0``.
    """
    fields = {
        "id": img_id,
        "region": region,
        "size": size,
        "rotation": rotation,
    }
    return IMG_URL.format(**fields)
def download_preview(img_id, filename=None):
    """Save the image at get_img_url()'s default settings to a local file.

    Args:
        img_id: Image identifier passed to get_img_url().
        filename: Destination path; defaults to ``"<img_id>.preview.jpeg"``.

    Returns:
        The path of the file written, as reported by urlretrieve().
    """
    target = f"{img_id}.preview.jpeg" if filename is None else filename
    saved_path, _headers = urlretrieve(get_img_url(img_id), target)
    return saved_path
def download(url, retries=10):
    """Fetch *url* and return its body as an in-memory ``io.BytesIO``.

    Up to *retries* attempts are made, each with a timeout of ``TIMEOUT``
    seconds. An attempt counts as failed when the request raises or when the
    response status is not 200. Every failed attempt is reported through
    ``tqdm.tqdm.write``.

    Args:
        url: Address to fetch.
        retries: Maximum number of attempts.

    Returns:
        io.BytesIO: the complete response body of the first successful attempt.

    Raises:
        Exception: No attempt succeeded (always the case when *retries* is 0).
            The last underlying error, if any, is attached as ``__cause__``.
    """
    # Imported here because the module's import block does not provide it.
    from http.client import HTTPException

    cause = None
    for attempt in range(1, retries + 1):
        try:
            # The context manager closes the connection on every path; the
            # response used to be left open.
            with urlopen(url, timeout=TIMEOUT) as response:
                status = response.getcode()
                if status == 200:
                    return io.BytesIO(response.read())
            tqdm.tqdm.write(f"error #{status}; retry {attempt} {url}")
        except (OSError, HTTPException) as e:
            # HTTPException covers truncated bodies (IncompleteRead), which
            # are not OSError and would otherwise abort without a retry.
            cause = e
            tqdm.tqdm.write(f"error retry {attempt} {url} {e}")
    raise Exception(f"Failed after {retries} retries") from cause
def get_tile(img_id, x, y, size=MAX_TILE_SIZE):
    """Download the square tile of *img_id* whose top-left corner is (x, y).

    The request uses the region ``"x,y,size,size"`` and the size argument
    ``"size,"``. The return value is whatever download() returns for that URL.
    """
    # NOTE(review): for tiles on the right/bottom edge the region reaches past
    # the image. If the server crops the region and then scales it to width
    # `size`, those tiles would come back stretched - confirm against the
    # image API's size semantics.
    tile_region = f"{x},{y},{size},{size}"
    tile_url = get_img_url(img_id, region=tile_region, size=f"{size},")
    return download(tile_url)
def download_tiles(img_data: ImageData, filenames=None,
                   tile_size=MAX_TILE_SIZE, max_workers=100):
    """Download an image tile by tile, stitch the tiles and save the result.

    The image area is covered by a grid of ``tile_size`` squares. The tiles
    are fetched concurrently with get_tile() and each one is pasted at its
    grid position into one RGB image of ``img_data.width`` x
    ``img_data.height`` pixels.

    Args:
        img_data: Identifier and pixel dimensions of the image.
        filenames: Paths to save the stitched image to; defaults to
            ``[img_data.id + ".jpeg"]``.
        tile_size: Edge length of the square tiles, in pixels.
        max_workers: Size of the download thread pool.

    Returns:
        The list of file names the image was saved to.

    Raises:
        Exception: Re-raises the error of the first tile found to have
            failed. Tiles that have not started yet are cancelled first, so
            the error is not delayed by the rest of the queue.
    """
    xx = range(0, img_data.width, tile_size)
    yy = range(0, img_data.height, tile_size)
    total = len(xx) * len(yy)
    print("Tiles:", total)
    big = Image.new("RGB", (img_data.width, img_data.height))

    def fetch_and_paste(n, x, y):
        img = Image.open(get_tile(img_data.id, x, y, tile_size))
        # Explicit check instead of `assert`, which is stripped under -O.
        if img.size[0] <= 0:
            raise ValueError(f"tile #{n} at ({x}, {y}) is empty: {img.size}")
        big.paste(img, (x, y))
        return n

    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futs = {ex.submit(fetch_and_paste, i, x, y): (i, x, y)
                for i, (x, y) in enumerate(itertools.product(xx, yy))}
        t = tqdm.tqdm(as_completed(futs), total=total)
        for fut in t:
            try:
                t.set_description(f"done #{fut.result()}")
            except Exception:
                t.close()
                print("error", futs[fut])
                # Leaving the executor block waits for all submitted work.
                # Cancel what has not started yet; cancel() is a no-op for
                # running or finished futures.
                for pending in futs:
                    pending.cancel()
                raise

    if filenames is None:
        filenames = [img_data.id + ".jpeg"]
    for filename in filenames:
        print("Saving to", filename)
        big.save(filename)
    return filenames
def do_download(doc_id):
    """Download the preview and the stitched full image for *doc_id*.

    Writes ``<doc_id>.preview.jpeg`` and ``<doc_id>.jpeg`` and prints progress
    to stdout, including the size in bytes of each stitched file.
    """
    patch_dns()
    info = get_img_info(doc_id)
    img_id, width, height = info
    print("{}: {}x{}".format(img_id, width, height))

    preview_path = download_preview(info.id, f"{doc_id}.preview.jpeg")
    print("Preview:", preview_path)

    written = download_tiles(info, [f"{doc_id}.jpeg"])
    for fn in written:
        size_in_bytes = Path(fn).stat().st_size
        print(fn, size_in_bytes)
    print("Done.")
if __name__ == "__main__":
    # Command-line entry point: one or more document IDs, each downloaded in
    # the order given.
    import argparse

    parser = argparse.ArgumentParser(
        description='Download NLI Images from tiles',
    )
    parser.add_argument(
        'doc_id',
        nargs='+',
        help='document IDs (for example: NNL_MAPS_JER002367356)',
    )
    requested = parser.parse_args().doc_id
    for doc_id in requested:
        do_download(doc_id)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment