Created
March 11, 2022 21:15
-
-
Save Segerberg/9c098b199bfc612441155022d66a16bb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from warcio.capture_http import capture_http | |
import requests | |
from bs4 import BeautifulSoup | |
from urllib.parse import urlparse | |
import os | |
import sqlite3 | |
import datetime | |
import sys | |
# Fixed query parameters appended to every dictionary-result request
# (web-IRBIS CGI interface).
result_page_params = dict(
    S21CNR='20',
    Z21ID='',
    C21COM='T',
    S21FMT='fullwebr',
    T21CNR=100,
    T21PRF='T=',
    CODE='dic_web.mnu',
)
class Dedup:
    """Tiny SQLite-backed set of already-fetched URL keys.

    Used to avoid downloading the same resource twice across a crawl.
    A fresh connection is opened and closed per operation, so the object
    is safe to use across long runs without holding a file handle open.
    """

    def __init__(self):
        # Database file lives in the current working directory.
        # (Original used os.path.join("dedup.db"), a no-op with one argument.)
        self.file = "dedup.db"

    def start(self):
        """Create the dedup table if it does not exist yet."""
        conn = sqlite3.connect(self.file)
        try:
            conn.execute(
                "create table if not exists dedup ("
                " key varchar(300) primary key);"
            )
            conn.commit()
        finally:
            # Close even when the DDL raises, so we never leak a connection.
            conn.close()

    def save(self, key):
        """Record *key* as seen (idempotent via INSERT OR REPLACE)."""
        conn = sqlite3.connect(self.file)
        try:
            conn.execute(
                "insert or replace into dedup (key) values (?)", (key,)
            )
            conn.commit()
        finally:
            conn.close()

    def lookup(self, key):
        """Return True if *key* has been recorded, else False."""
        conn = sqlite3.connect(self.file)
        try:
            cursor = conn.execute("select key from dedup where key = ?", (key,))
            return cursor.fetchone() is not None
        finally:
            conn.close()
def get_page_requisits(elements, key):
    """Fetch page requisites (scripts, stylesheets, images) into the WARC.

    elements: iterable of BeautifulSoup tags.
    key: attribute name holding the URL path ('src' or 'href').

    Relies on module globals `warc_filename`, `dedup` and `parsed_url`
    set in the __main__ block.  The responses themselves are discarded;
    capture_http records them into the WARC as a side effect.
    """
    with capture_http(warc_filename):
        for item in elements:
            if dedup.lookup(item[key]):
                print("DUP")
                continue
            dedup.save(item[key])
            try:
                requests.get(f"{parsed_url.scheme}://{parsed_url.hostname}{item[key]}")
            except requests.exceptions.RequestException:
                # Skip unreachable requisites instead of aborting the crawl.
                # The original bare `except:` also swallowed SystemExit and
                # KeyboardInterrupt, making the crawler hard to interrupt.
                continue
def get_records(url):
    """Download a result-list page and every record it links to, into the WARC.

    url: path component of the result-list page on the target host.
    Relies on module globals `warc_filename`, `dedup` and `parsed_url`.
    """
    with capture_http(warc_filename):
        records_page = requests.get(f"{parsed_url.scheme}://{parsed_url.hostname}{url}")
        record_soup = BeautifulSoup(records_page.content, "html.parser")
        try:
            # `find` returns None when the cell is missing; the subsequent
            # attribute access then raises AttributeError, handled below.
            content = record_soup.find('td', {"class":"main_content"})
            links = content.find_all('a', href=True)
            for link in links:
                if dedup.lookup(link['href']):
                    continue
                dedup.save(link['href'])
                try:
                    # Bump the page size so one request captures up to 2000 rows.
                    requests.get(f"{parsed_url.scheme}://{parsed_url.hostname}{link['href'].replace('&S21CNR=20', '&S21CNR=2000')}")
                except requests.exceptions.RequestException:
                    # One dead record link should not abort the whole page;
                    # matches the error handling used in get_page_requisits.
                    continue
                print(link['href'])
        except AttributeError:
            # Page without a main_content cell: nothing to harvest.
            pass
def get_next_dict_page(url, params, next=None, parent=None):
    """Walk a dictionary index recursively, archiving each page into the WARC.

    url: full CGI URL of the dictionary endpoint.
    params: query parameters; mutated in place (T21TRM is set per recursion).
    next: last dictionary term seen (NOTE(review): shadows the builtin
        `next`; left unchanged to keep the signature intact).
    parent: the term that triggered this call; recursion stops when the
        page yields no new term or repeats the parent's term.

    Returns the last term found, or None.
    Relies on module globals `warc_filename` and `capture_http`/`requests`
    imports; record links are followed via get_records.
    """
    with capture_http(warc_filename):
        try:
            dictionary_page = requests.get(url,params=params)
            print(dictionary_page.url)
            dictionary_soup = BeautifulSoup(dictionary_page.content, "html.parser")
            title_links = dictionary_soup.find_all('a', href=True) # Find all title links
            imgs = dictionary_soup.find_all('img',src=True)
            get_page_requisits(imgs, 'src')
            for title_link in title_links:
                if "S21STR" in title_link['href']: # Filter out links
                    next = title_link.text
                    get_records(title_link['href'])
                    print(next)
            # Recurse to the next index page, seeded with the last term found.
            # `parent != next` is the termination guard against looping on
            # the same term forever.
            if next and parent != next:
                params['T21TRM'] = next
                get_next_dict_page(url, params, parent=next)
            return next
        except requests.exceptions.ConnectionError:
            # Best-effort crawl: log and give up on this branch of the index.
            print("CONNECTION ERROR")
            pass
def main(url):
    """Archive a web-IRBIS catalogue site into a WARC file.

    Captures the start page plus its scripts, stylesheets and images, then
    crawls every database ("I21DBN") link through its dictionary indexes.
    Relies on module globals `warc_filename`, `parsed_url` and `dedup`
    defined in the __main__ block.
    """
    with capture_http(warc_filename):
        r = requests.get(url)
        soup = BeautifulSoup(r.content, "html.parser")
        script_srcs = soup.find_all('script', src=True)
        get_page_requisits(script_srcs,'src')
        stylesheets = soup.find_all('link', href=True)
        get_page_requisits(stylesheets, 'href')
        imgs = soup.find_all('img', src=True)
        get_page_requisits(imgs, 'src')
        # Find all links to DBS
        dbs = soup.find_all('a', href=True)
        for db in dbs:
            # Filter out all non DB links
            if not "C21COM=T" in db['href'] and 'I21DBN' in db['href'] and 'C21COM=S' not in db['href'] and 'javascript' not in db['href']:
                # Extract DB name and update params
                org_param_list = db['href'].split("&")[1:]
                temp_params = {}
                params = {}
                # Parse "key=value" pairs from the link's query string.
                for value in org_param_list:
                    value = (value.split("="))
                    temp_params[value[0]] = value[1]
                # NOTE(review): "_EX" suffix is presumably the extended DB
                # view; the crawl always requests that variant — confirm.
                if "_EX" in temp_params['I21DBN']:
                    params['I21DBN'] = f"{temp_params['I21DBN']}"
                else:
                    params['I21DBN'] = f"{temp_params['I21DBN']}_EX"
                params['P21DBN'] = temp_params['P21DBN']
                params.update(result_page_params)
                cgi_bin = (db['href'].split("&")[:1][0].split('?')[0]) # creates cgi part of url
                # Crawl each dictionary frame in turn; T/G/K/A presumably
                # select Title/Geo/Keyword/Author indexes — confirm.
                frames = ["T=", "G=", "K=", "A="]
                for f in frames:
                    params['T21PRF'] = f
                    page = get_next_dict_page(f"{parsed_url.scheme}://{parsed_url.hostname}{cgi_bin}", params=params)
if __name__ == '__main__':
    # CLI usage: python <script>.py <start-url>
    url = sys.argv[1]
    parsed_url = urlparse(url)
    # Module-level globals read by all crawl helpers above.
    dedup = Dedup()
    dedup.start()
    # One timestamped WARC per run, named after the target host.
    warc_filename = f"{parsed_url.hostname}_{datetime.datetime.now().strftime('%Y%m%d-%H_%M_%S')}.warc.gz"
    main(url)
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.