Created January 3, 2018 03:31
Python version of the CLI web spider from https://askubuntu.com/questions/991447/how-do-i-create-a-cli-web-spider-that-uses-keywords-and-filters-content
#!/usr/bin/python3
"""Spider the e-bane.net story archive and dump every article whose text
starts with a given string (e.g. a user name) to a JSON file."""
from urllib.parse import urljoin
import asyncio
import json

import aiohttp
import async_timeout
import bs4
import click

BASE_URL = 'http://e-bane.net'
async def fetch(session, url):
    """Fetch a page, retrying as long as the request keeps timing out."""
    while True:
        try:
            async with async_timeout.timeout(20):
                async with session.get(url) as response:
                    return await response.text()
        except asyncio.TimeoutError as e:
            print('[{}]{}'.format(e, url))
async def get_result(user):
    """Crawl the archive index, then each monthly page, then each article
    thread, collecting the articles whose text starts with `user`."""
    target_url = 'http://e-bane.net/modules.php?name=Stories_Archive'
    res = []
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, target_url)
        html_soup = bs4.BeautifulSoup(html, 'html.parser')
        date_module_links = parse_date_module_links(html_soup)
        for dm_link in date_module_links:
            html = await fetch(session, dm_link)
            html_soup = bs4.BeautifulSoup(html, 'html.parser')
            thread_links = parse_thread_links(html_soup)
            print('[{}]{}'.format(len(thread_links), dm_link))
            for t_link in thread_links:
                thread_html = await fetch(session, t_link)
                t_html_soup = bs4.BeautifulSoup(thread_html, 'html.parser')
                if is_article_match(t_html_soup, user):
                    print('[v]{}'.format(t_link))
                    res.append(get_main_article(t_html_soup))
                else:
                    print('[x]{}'.format(t_link))
    return res
def parse_date_module_links(page):
    """Return absolute URLs of the monthly archive pages listed on the index."""
    a_tags = page.select('ul li a')
    hrefs = [x.get('href') for x in a_tags]
    return [urljoin(BASE_URL, x) for x in hrefs]
def parse_thread_links(page):
    """Return absolute URLs of the article threads on a monthly archive page."""
    a_tags = page.select('table table tr td > a')
    hrefs = [x.get('href') for x in a_tags]
    # keep only hrefs that point to an article
    valid_hrefs = [x for x in hrefs if 'file=article' in x]
    return [urljoin(BASE_URL, x) for x in valid_hrefs]
def is_article_match(page, user):
    """Check whether the main article's text starts with the given string."""
    main_article = get_main_article(page)
    return main_article.text.startswith(user)
def get_main_article(page):
    """Return the <td class="row1"> cell that holds the article body."""
    td_tags = page.select('table table td.row1')
    # on this site's thread pages the article body is the fifth td.row1 cell
    td_tag = td_tags[4]
    return td_tag
@click.command()
@click.argument('user')
@click.option('--output-filename', default='out.json', help='Output filename.')
def main(user, output_filename):
    loop = asyncio.get_event_loop()
    res = loop.run_until_complete(get_result(user))
    # convert html soup into text
    text_res = [x.text for x in res]
    with open(output_filename, 'w') as f:
        json.dump(text_res, f)


if __name__ == '__main__':
    main()
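The script depends on aiohttp, async_timeout, beautifulsoup4, and click (the modules imported at the top). Assuming it is saved as spider.py (the gist does not fix a filename), a typical run looks like this:

    pip3 install aiohttp async_timeout beautifulsoup4 click
    python3 spider.py USER --output-filename out.json

USER is the string the spider matches against the start of each article's text; the matching articles are written to the output file (out.json by default) as a JSON list of plain-text strings.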