Gist created by αlpha 0x00 on January 8, 2019.
#!/usr/bin/env python3
"""Crawl a website and collect links to dictionary archives (.tar/.dat + .bz2/.gz/.xz)."""
import re
import sqlite3  # used by the (unfinished) page cache below
import time
from operator import add
from functools import reduce, wraps
from urllib.parse import urlparse, urljoin

import requests
from bs4 import BeautifulSoup

DEBUG = True
KINDNESS = False   # when True, sleep one second between requests to be polite
sleep_times = 0    # count of politeness sleeps, subtracted from timings in timefunc


class PageFetcher:
    """Fetch pages; auto-caching is planned but not implemented yet."""

    def __init__(self, site):
        self.__site = site
        self.__db = site + '.db3'
        self.__table = site.replace('.', '_')
        # db = sqlite3.connect(self.__db)
        # cur = db.cursor()
        # cur.execute(f"""create table if not exists `{self.__table}`(
        #     `url`
        # """)

    def get(self, url):
        # TODO auto cache file
        return requests.get(url).content


def _D(func, *args, **kwargs):
    """Debug helper: call `func`, or print it if it is not callable, only when DEBUG."""
    if not DEBUG:
        return
    if callable(func):
        return func(*args, **kwargs)
    return print(func, *args, **kwargs)


def timefunc(func):
    """Time a function, reporting total time and time excluding politeness sleeps."""
    if not DEBUG:
        return func

    @wraps(func)
    def wrap(*args, **kwargs):
        s = time.time()
        result = func(*args, **kwargs)
        e = time.time()
        print(f"total time {e - s:0.2f}s, "
              f"execution time excluding sleeps {e - s - sleep_times:0.2f}s")
        return result
    return wrap


DICT = re.compile(r"\.(tar|dat)\.(bz2|gz|xz)$", re.IGNORECASE)


def is_dictionary(url):
    """True if the URL path looks like a dictionary archive."""
    return bool(re.search(DICT, urlparse(url).path))


def be_kind():
    """Sleep one second between requests when KINDNESS is enabled."""
    if not KINDNESS:
        return
    global sleep_times
    sleep_times += 1
    time.sleep(1)


def walk(index):
    """Walk all links in this website from `index`, returning dictionary links."""
    unique_urls = set()
    site = urlparse(index).hostname
    fetcher = PageFetcher(site)

    def is_in_this_site(url):
        # relative links have hostname None and therefore stay in this site
        return urlparse(url).hostname in (None, site)

    def _walk(index):
        if index in unique_urls:
            _D(f"link({index}) has already been visited.")
            return []
        unique_urls.add(index)

        if is_dictionary(index):
            print(f"found a dictionary link({index})")
            return [index]

        def build_url(href):
            return urljoin(index, href)

        page = fetcher.get(index)
        be_kind()
        soup = BeautifulSoup(page, 'html.parser')
        links = [build_url(a['href'])
                 for a in soup.find_all('a', href=True)
                 if is_in_this_site(a['href'])]
        # no links: return []; otherwise flatten the recursive results
        return links if not links else reduce(add, [_walk(link) for link in links])

    return _walk(index)


def download(url, save_to):
    # TODO check failure
    r = requests.get(url, stream=True)
    with open(save_to, "wb") as f:
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:
                f.write(chunk)
                f.flush()
    return True


def download_dicts(dicts):
    # TODO finish download_dicts
    pass


@timefunc
def main():
    HOME = "http://download.huzheng.org/"
    dicts = walk(HOME)
    download_dicts(dicts)


if __name__ == '__main__':
    main()
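
The commented-out sqlite code in PageFetcher.__init__ and the "TODO auto cache file" in get() hint at a page cache that was never finished. The sketch below is one guess at how it could be realized; the table layout (a url primary key plus a content blob) and the class name CachingPageFetcher are assumptions, not the gist's code.

# One possible realization of the sqlite page cache -- a sketch, not the original.
import sqlite3
import requests


class CachingPageFetcher:
    """Like PageFetcher, but stores fetched pages in an sqlite table keyed by URL."""

    def __init__(self, site):
        self.__table = site.replace('.', '_')
        self.__db = sqlite3.connect(site + '.db3')
        # Assumed schema: one row per URL, with the raw page bytes as a blob.
        self.__db.execute(
            f"create table if not exists `{self.__table}` "
            "(`url` text primary key, `content` blob)")

    def get(self, url):
        row = self.__db.execute(
            f"select `content` from `{self.__table}` where `url` = ?",
            (url,)).fetchone()
        if row is not None:        # cache hit: return the stored bytes
            return row[0]
        content = requests.get(url).content
        self.__db.execute(
            f"insert into `{self.__table}` (`url`, `content`) values (?, ?)",
            (url, content))
        self.__db.commit()
        return content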
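
download() leaves failure checking as a TODO. A hedged variant that adds basic error handling with requests' raise_for_status() and a request timeout could look like this; the name download_checked and the 30-second timeout are illustrative choices, not the author's.

# Variant of download() with basic failure handling -- a sketch, not the original.
import requests


def download_checked(url, save_to, timeout=30):
    r = requests.get(url, stream=True, timeout=timeout)
    r.raise_for_status()           # raise on HTTP 4xx/5xx instead of saving an error page
    with open(save_to, "wb") as f:
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:              # skip keep-alive chunks
                f.write(chunk)
    return True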
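
download_dicts() is an empty stub. One plausible completion, wiring walk()'s results into the existing download() helper, might be the following; deriving the local filename from the URL path with os.path.basename and the save_dir parameter are assumptions for illustration.

# A possible completion of the download_dicts stub -- a sketch, not the gist's code.
import os.path as path
from urllib.parse import urlparse


def download_dicts(dicts, save_dir="."):
    for url in dicts:
        # Assumes the archive URL's path basename is a usable local filename.
        name = path.basename(urlparse(url).path)   # e.g. some-dict.tar.bz2
        save_to = path.join(save_dir, name)
        print(f"downloading {url} -> {save_to}")
        download(url, save_to)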