Created
February 23, 2019 12:52
-
-
Save cuihaoleo/ccb5c61cb1267257a5e3e3732503b2f3 to your computer and use it in GitHub Desktop.
Export is-programmer.com data to JSON
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import datetime | |
import logging | |
import psycopg2 | |
from requestsutils import RequestsBase | |
from htmlutils import parse_document_from_requests | |
from pgutils import savepoint | |
from getpass import getpass | |
logger = logging.getLogger(__name__) | |
class Chito(RequestsBase):
    """Scraper client for an is-programmer.com blog's admin interface.

    Fetches posts, comments and guestbook messages by parsing the HTML
    of the admin pages.  Requires a valid login session (see may_login).
    """
    # TODO: change this to your own blog URL
    baseurl = 'http://cuihao.is-programmer.com/'
    auto_referer = True

    def may_login(self):
        """Log in only if the saved session is no longer authenticated."""
        r = self.request('/admin', allow_redirects = False)
        # /admin answers 302 (redirect to the login page) when logged out
        if r.status_code == 302:
            self.login()

    def login(self):
        """Log in as $USER, prompting for the password on the terminal.

        Raises:
            Exception: if the server does not redirect to /admin afterwards.
        """
        p = getpass('登录密码: ')
        r = self.request('/login')
        doc = parse_document_from_requests(r)
        # CSRF token embedded in the login form
        token = doc.xpath('//input[@name="authenticity_token"]')[0].get('value')
        form = {
            'authenticity_token': token,
            'commit': '登录',
            'name': os.environ['USER'],
            'password': p,
            'persist': '1',
            'utf8': '✓',
        }
        r = self.request('/login', method='POST',
                         data = form,
                         allow_redirects = False)
        if not r.headers['Location'].endswith('/admin'):
            # BUG FIX: the original passed the %r placeholder as a second
            # argument (logging style), so it was never formatted.
            raise Exception('failed to login: %r' % r)

    @staticmethod
    def _parse_row_common(tr):
        """Parse the table columns shared by comment and message rows."""
        d = {}
        d['id'] = int(tr[0][0].get('value'))
        d['name'] = tr[1].text
        # without timezone; PostgreSQL will give it a default
        d['time'] = datetime.datetime.strptime(tr[2].text, '%Y-%m-%d %H:%M')
        try:
            d['content'] = tr[3].text.replace('\r', '')
        except AttributeError:
            # empty cell: .text is None
            d['content'] = ''
        d['ip'] = tr[4].text
        if len(tr[5]) == 0:
            d['email'] = None
        else:
            # href looks like "mailto:addr"; keep only the address part
            d['email'] = tr[5][0].get('href').split(':', 1)[-1]
        return d

    @staticmethod
    def _parse_edit_form(main):
        """Extract content, tags and linktext from a post's edit form."""
        d = {}
        d['content'] = main.xpath('//textarea')[0] \
            .text_content().replace('\r', '')
        d['tags'] = [x.strip() for x in
                     main.xpath('//input[@id="article_tag_list"]')[0]
                     .get('value').split(',') if x.strip()]
        d['linktext'] = main.xpath('//input[@id="article_permalink"]')[0] \
            .get('value')
        return d

    def get_comments(self, page=1):
        """Yield comment dicts from one page of /admin/comments."""
        r = self.request('/admin/comments', params = {'page': str(page)})
        doc = parse_document_from_requests(r)
        rows = doc.xpath('//table[@id="comments_table"]//tr')[1:]
        for tr in rows:
            d = self._parse_row_common(tr)
            # post id comes from the link target ".../<id>.html"
            d['post_id'] = int(tr[6][0].get('href').split('/')[-1].split('.')[0])
            yield d

    def get_messages(self, page=1):
        """Yield guestbook-message dicts from one page of /admin/messages."""
        r = self.request('/admin/messages', params = {'page': str(page)})
        doc = parse_document_from_requests(r)
        rows = doc.xpath('//table[@id="messages_table"]//tr')[1:]
        for tr in rows:
            d = self._parse_row_common(tr)
            # guestbook messages are not attached to any post
            d['post_id'] = None
            yield d

    def get_post(self, pid):
        """Fetch the full data of post ``pid`` from its admin edit page."""
        d = {'id': pid}
        r = self.request('/admin/posts/%d/edit' % d['id'])
        article = parse_document_from_requests(r)
        main = article.xpath('//div[@id="form_main"]')[0]
        d['title'] = main.xpath('//input[@id="article_title"]')[0].get('value')
        d['category'] = main.xpath(
            '//div[@id="category_list_remote"]//input[@checked]')[0].tail.strip()
        d.update(self._parse_edit_form(main))
        d['state'] = 'post'
        return d

    def get_posts(self, page=1):
        """Yield full post dicts from one page of the /admin/posts listing.

        Title, time and category come from the listing table; one extra
        request per post fetches content, tags and linktext.
        """
        r = self.request('/admin/posts', params = {'page': str(page)})
        doc = parse_document_from_requests(r)
        rows = doc.xpath('//table[@id="article_table"]//tr')[1:]
        for tr in rows:
            d = {}
            d['id'] = int(tr[0][0].get('value'))
            d['title'] = tr[1][0].text
            # without timezone; PostgreSQL will give it a default
            d['time'] = datetime.datetime.strptime(tr[2].text, '%Y-%m-%d %H:%M')
            d['category'] = tr[4].text
            r = self.request('/admin/posts/%d/edit' % d['id'])
            article = parse_document_from_requests(r)
            main = article.xpath('//div[@id="form_main"]')[0]
            d.update(self._parse_edit_form(main))
            d['state'] = 'post'
            yield d

    def get_files(self, type):
        """File/attachment export — not implemented."""
        raise NotImplementedError
def do_one_type(conn, method, sql):
    """Page through ``method`` and insert every row using ``sql``.

    Stops at the first duplicate key (unique_violation means everything
    from here on was already imported) or when a page comes back empty.
    Foreign-key violations (comments pointing at deleted posts) are
    skipped.  Runs inside one transaction on ``conn``.
    """
    with conn:
        cursor = conn.cursor()
        page = 1
        done = False
        while not done:
            data = method(page=page)
            has_data = False
            for entry in data:
                has_data = True
                try:
                    # savepoint keeps the outer transaction usable after
                    # an individual INSERT fails
                    with savepoint(cursor, 'inserting'):
                        cursor.execute(sql, entry)
                except psycopg2.IntegrityError as e:
                    if e.pgcode == '23505':  # unique_violation
                        done = True
                        break
                    elif e.pgcode == '23503':  # foreign_key_violation
                        # skip stale comments
                        pass
                    else:
                        # BUG FIX: unknown integrity errors used to be
                        # swallowed silently; surface them instead.
                        raise
            if not has_data:
                done = True
            page += 1
def do_work(conn, chito):
    """Import all posts, guestbook messages and comments into the database.

    FIX: the original wrapped the posts import in a single-iteration
    ``for state in ['posts']`` loop whose variable was never used, and
    duplicated the comments INSERT statement verbatim.
    """
    do_one_type(conn, chito.get_posts, '''
        insert into posts
        (id, title, time, category, content, tags, linktext, state)
        values
        (%(id)s, %(title)s, %(time)s, %(category)s,
         %(content)s, %(tags)s, %(linktext)s, %(state)s)
    ''')
    # messages and comments land in the same table with the same columns
    comment_sql = '''
        insert into comments
        (id, name, time, content, ip, email, post_id)
        values
        (%(id)s, %(name)s, %(time)s, %(content)s,
         %(ip)s, %(email)s, %(post_id)s)
    '''
    do_one_type(conn, chito.get_messages, comment_sql)
    do_one_type(conn, chito.get_comments, comment_sql)
def update_posts(conn, chito, ids):
    """Re-fetch the given post ids from the site and update them in the DB."""
    if not ids:
        return
    fetched = [chito.get_post(pid) for pid in ids]
    sql = '''\
update posts
set title = %(title)s,
category = %(category)s,
content = %(content)s,
tags = %(tags)s,
linktext = %(linktext)s
where id = %(id)s
'''
    with conn:
        cur = conn.cursor()
        for record in fetched:
            cur.execute(sql, record)
if __name__ == '__main__':
    from nicelogger import enable_pretty_logging
    enable_pretty_logging('DEBUG')
    # empty DSN: libpq takes connection parameters from the PG* environment
    conn = psycopg2.connect('')
    chito = Chito(cookiefile='.cookie')
    chito.may_login()
    do_work(conn, chito)
    # list post ids here to force re-syncing individual posts
    update_posts(conn, chito, [])
    # NOTE(review): explicit del presumably lets RequestsBase persist the
    # cookie file on destruction — confirm against requestsutils
    del chito
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from chito import Chito | |
import json | |
from bson import json_util | |
import itertools | |
def _iter_all_pages(method):
    """Yield every entry produced by ``method(page)``, page 1 onwards,
    stopping after the first page that yields nothing."""
    for page in itertools.count(1):
        empty = True
        for entry in method(page):
            empty = False
            yield entry
        if empty:
            break


def main():
    """Dump all posts (with full content) and all comments/messages of the
    blog into output.json.

    FIX: the "fetch pages until one is empty" loop was written out three
    times with a hand-rolled has_data flag; it is now a single generator.
    """
    blog = Chito(cookiefile=".cookie")
    blog.may_login()
    dump = {
        "posts": [],
        "comments": [],
    }
    for post in _iter_all_pages(blog.get_posts):
        # merge in the edit-page details (content, tags, linktext)
        detail = blog.get_post(post["id"])
        post.update(detail)
        dump["posts"].append(post)
    # guestbook messages and post comments both go into "comments"
    for method in (blog.get_messages, blog.get_comments):
        dump["comments"].extend(_iter_all_pages(method))
    # default=str stringifies the datetime objects coming from Chito
    with open("output.json", "w") as fout:
        json.dump(dump, fout, default=str)


if __name__ == '__main__':
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from xml.sax.saxutils import escape | |
import json | |
from html.parser import HTMLParser | |
import base64 | |
from bs4 import BeautifulSoup | |
import urllib.request | |
import os | |
import datetime | |
import pytz | |
import sys | |
import hashlib | |
# vimwiki template used by the companion wiki exporter (unused here)
VIMWIKI_TEMPLATE="blog-isp"
# URL prefix under which exported images are served on the new site
HTML_IMG_BASE="/img/isp/"
# the original blog being migrated
BLOG_URL = "http://cuihao.is-programmer.com"
def main(): | |
with open("output.json") as fin: | |
dump = json.load(fin) | |
print("""<?xml version="1.0" encoding="UTF-8"?> | |
<rss version="2.0" | |
xmlns:content="http://purl.org/rss/1.0/modules/content/" | |
xmlns:dsq="http://www.disqus.com/" | |
xmlns:dc="http://purl.org/dc/elements/1.1/" | |
xmlns:wp="http://wordpress.org/export/1.0/"> | |
<channel>""") | |
posts = {} | |
for post in dump["posts"]: | |
posts[post["id"]] = post | |
for comm in dump["comments"]: | |
if comm["post_id"] is None: | |
comm["post_id"] = 214156 | |
if comm["email"] is not None: | |
avatar="https://www.gravatar.com/avatar/" + hashlib.md5(comm["email"].lower().encode()).hexdigest() | |
else: | |
avatar="" | |
post = posts[comm["post_id"]] | |
mt = datetime.datetime.strptime(comm["time"], "%Y-%m-%d %H:%M:%S") | |
utctime = mt.astimezone(pytz.utc).strftime("%Y-%m-%d %H:%M:%S") | |
print("""<item> | |
<title>{post_title}</title> | |
<link>{url}</link> | |
<dsq:thread_identifier>{page_id}</dsq:thread_identifier> | |
<wp:comment_status>open</wp:comment_status> | |
<wp:comment> | |
<dsq:remote> | |
<dsq:avatar>{avatar}</dsq:avatar> | |
</dsq:remote> | |
<wp:comment_id>{cid}</wp:comment_id> | |
<wp:comment_author>{username}</wp:comment_author> | |
<wp:comment_author_email>{email}</wp:comment_author_email> | |
<wp:comment_content><![CDATA[{content}]]></wp:comment_content> | |
<wp:comment_author_IP>{ip}</wp:comment_author_IP> | |
<wp:comment_date_gmt>{time}</wp:comment_date_gmt> | |
<wp:comment_approved>1</wp:comment_approved> | |
</wp:comment> | |
</item>""".format( | |
cid=comm["id"], | |
post_title=escape(post["title"]), | |
url="https://blog.i-yu.me/html/zh/isp/isp_%d.html" % post["id"], | |
page_id="zh/isp/isp_%d.html" % post["id"], | |
time=utctime, | |
avatar=avatar, | |
ip=comm["ip"], | |
email=comm["email"], | |
username=escape(comm["name"]), | |
content=comm["content"])) | |
print(""" | |
</channel> | |
</rss>""") | |
if __name__ == "__main__": | |
main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import json | |
from html.parser import HTMLParser | |
import base64 | |
from bs4 import BeautifulSoup | |
import urllib.request | |
import urllib.parse | |
import os | |
# vimwiki template name (only referenced by a commented-out line below)
VIMWIKI_TEMPLATE="blog-isp"
# URL prefix under which the exported images are served on the new site
HTML_IMG_BASE="/img/isp/"
# the original blog; used to build absolute image URLs
BLOG_URL = "http://cuihao.is-programmer.com"
def main():
    """Convert the posts in output.json into vimwiki pages, each with a
    raw-HTML companion file, under dump/wiki/."""
    with open("output.json") as fin:
        dump = json.load(fin)

    os.makedirs("dump/wiki", exist_ok=True)
    os.makedirs("dump/img", exist_ok=True)

    for post in dump["posts"]:
        stem = "isp_%d" % post["id"]
        with open(os.path.join("dump/wiki", stem + ".wiki"), "w") as fout:
            # category first, then tags, as vimwiki ":a:b:c:" labels
            labels = [post["category"]] + list(post["tags"])
            print(":" + ":".join(labels) + ":", file=fout)
            print("%date", post["time"].split()[0], file=fout)
            print("%title", post["title"], file=fout)
            # print("%template", VIMWIKI_TEMPLATE, file=fout)
            soup = BeautifulSoup(post["content"], features="html.parser")
            # rewrite blog-local image paths to the new site's prefix
            for img in soup.findAll('img'):
                if not img['src'].startswith("/"):
                    continue
                bname = img["src"].split("/")[-1]
                url = BLOG_URL + urllib.parse.quote(img["src"])
                #urllib.request.urlretrieve(url, "dump/img/" + bname)
                img['src'] = os.path.join(HTML_IMG_BASE, bname)
            print("{{{", file=fout)
            print(soup.get_text(), file=fout)
            print("}}}", file=fout)
            print('[[local:%s.html|__RAW_HTML__]]' % stem, file=fout)
        with open(os.path.join("dump/wiki", stem + ".html"), "w") as fout:
            fout.write(str(soup))
        # index line for a manually maintained table of contents
        print("- [[%s|%s]]" % (stem, post["title"]))


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment