-
-
Save kode54/034789459251e02698f3 to your computer and use it in GitHub Desktop.
FurAffinity -> Pushbullet Notifications
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"username": "ninji-vahran", | |
"database": "notifier.db", | |
"cookies": { | |
"__cfduid": "REDACTED", | |
"a": "REDACTED", | |
"b": "REDACTED", | |
"folder": "inbox" | |
}, | |
"pushbullet_key": "REDACTED", | |
"headers": { | |
"user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36" | |
}, | |
"log_errors": true | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import time | |
import random | |
import re | |
import sqlite3 | |
import json | |
import traceback | |
import sys | |
import calendar | |
import os | |
from bs4 import BeautifulSoup | |
# FurAffinity -> Pushbullet Notifications | |
# Script by Ninji Vahran | |
# https://twitter.com/_Ninji | |
# https://furaffinity.net/user/Ninji-Vahran | |
# Use at your own risk ;) | |
# Compatible with Python 2 or 3 | |
# Requires requests and BeautifulSoup4 | |
# Currently supported notification types: | |
# Watches, Journals, Notes, Shouts, Favourites, Comments | |
# To use, create a config.json file based on the template, and place it in the | |
# same directory as this script. | |
# Last updated: 2nd September 2015 | |
# Requires the 'beta' FA layout. | |
FA_BASE = 'https://furaffinity.net' | |
SUB_URL_REGEX = re.compile('^/view/') | |
USER_URL_REGEX = re.compile('^/user/') | |
JOURNAL_URL_REGEX = re.compile('^/journal/') | |
############################################################ | |
# Date Parsing | |
SHORT_MONTHS = ('Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec') | |
SHORT_REGEX = re.compile(r'^([a-z]{3}) (\d+)[a-z][a-z], (\d{4}) (\d\d):(\d\d) ([AP])M$', re.I) | |
LONG_MONTHS = ('January','February','March','April','May','June','July','August','September','October','November','December') | |
LONG_REGEX = re.compile(r'^on ([a-z]+) (\d+)[a-z][a-z], (\d{4}) (\d\d):(\d\d) ([AP])M$', re.I) | |
def parse_date(text, regex, months): | |
'''Extract a date from a text timestamp''' | |
month, day, year, hour, minute, meridian = regex.match(text).groups() | |
year = int(year) | |
month = months.index(month) + 1 | |
day = int(day) | |
hour = int(hour) % 12 | |
if meridian == 'P': hour += 12 | |
minute = int(minute) | |
return calendar.timegm((year, month, day, hour, minute, 0)) | |
def parse_short_date(text): | |
'''Parse one of FurAffinity's date formats''' | |
return parse_date(text, SHORT_REGEX, SHORT_MONTHS) | |
def parse_long_date(text): | |
'''Parse FurAffinity's other date format''' | |
return parse_date(text, LONG_REGEX, LONG_MONTHS) | |
############################################################ | |
# Notification Utility Functions | |
def pushbullet(key, **data): | |
'''Send a notification to Pushbullet''' | |
if key == 'test': | |
print('[[ TEST NOTIFICATION: %r ]]' % data) | |
else: | |
requests.post( | |
'https://api.pushbullet.com/v2/pushes', | |
auth=(key, ''), | |
data=json.dumps(data), | |
headers={'content-type': 'application/json'} | |
) | |
############################################################ | |
# FurAffinity Page Scraping | |
def scrape_messages_html(html): | |
'''Extract all messages present in a FA /msg/others/ page. | |
Returns a dict containing a list of messages and a note count.''' | |
soup = BeautifulSoup(html) | |
msgs = [] | |
# fetch watches | |
watch_set = soup.find('fieldset', id='messages-watches') | |
if watch_set: | |
for li in watch_set.find_all('li', class_=None): | |
link = li.find('td', class_='avatar').find('a') | |
if link is None: # removed by the user, probably | |
continue | |
watch = dict( | |
type='watch', | |
name=li.find('div', class_='info').find('span').text, | |
url=link.attrs['href'], | |
timestamp=parse_short_date(li.find('span', class_='popup_date').attrs['title']), | |
eid=li.find('input', type='checkbox').attrs['value'], | |
) | |
msgs.append(watch) | |
# fetch journals | |
journal_set = soup.find('fieldset', id='messages-journals') | |
if journal_set: | |
journal_url_re = re.compile('^/journal/') | |
user_url_re = re.compile('^/user/') | |
for li in journal_set.find_all('li', class_=None): | |
link = li.find('a', href=journal_url_re) | |
if link is None: | |
continue | |
journal = dict( | |
type='journal', | |
name=link.text, | |
url=link.attrs['href'], | |
author=li.find('a', href=user_url_re).text, | |
timestamp=parse_long_date(li.find('span', class_='popup_date').attrs['title']), | |
eid=li.find('input', type='checkbox').attrs['value'], | |
) | |
msgs.append(journal) | |
# fetch shouts | |
shout_set = soup.find('fieldset', id='messages-shouts') | |
if shout_set: | |
for li in shout_set.find_all('li', class_=None): | |
# can shouts be 'removed'? I don't think so... | |
shout = dict( | |
type='shout', | |
author=li.find('a').text, | |
timestamp=parse_long_date(li.find('span', class_='popup_date').attrs['title']), | |
eid=li.find('input', type='checkbox').attrs['value'], | |
) | |
msgs.append(shout) | |
# fetch favourites | |
fav_set = soup.find('fieldset', id='messages-favorites') | |
if fav_set: | |
for li in fav_set.find_all('li', class_=None): | |
link = li.find('a', href=SUB_URL_REGEX) | |
if link is None: # removed by the user, probably | |
continue | |
fav = dict( | |
type='fav', | |
sub_name=link.text, | |
sub_url=link.attrs['href'], | |
username=li.find('a', href=USER_URL_REGEX).text, | |
timestamp=parse_short_date(li.find('span', class_='popup_date').attrs['title']), | |
eid=li.find('input', type='checkbox').attrs['value'], | |
) | |
msgs.append(fav) | |
# fetch journal and submission comments | |
cmt_sets = ( | |
('s_comment', 'messages-comments-submission', SUB_URL_REGEX), | |
('j_comment', 'messages-comments-journal', JOURNAL_URL_REGEX), | |
) | |
for msg_type, set_id, url_regex in cmt_sets: | |
cmt_set = soup.find('fieldset', id=set_id) | |
if cmt_set: | |
for li in cmt_set.find_all('li', class_=None): | |
link = li.find('a', href=url_regex) | |
if link is None: # removed by the user, probably | |
continue | |
user_link = li.find('a', href=user_url_re) | |
if user_link is None: | |
uname = 'No User??' | |
else: | |
uname = user_link.text | |
user_link.clear() | |
popup_date = li.find('span', class_='popup_date') | |
fuzzy_date = popup_date.text | |
popup_date.clear() | |
print(repr(link)) | |
cmt = dict( | |
type=msg_type, | |
name=uname, | |
description=li.text.strip(), | |
url=link.attrs['href'], | |
timestamp=parse_long_date(popup_date.attrs['title']), | |
eid=li.find('input', type='checkbox').attrs['value'], | |
) | |
print(repr(cmt)) | |
msgs.append(cmt) | |
result = {} | |
result['messages'] = msgs | |
# extract note count | |
result['note_count'] = 0 | |
notes = soup.find('ul', id='nav').find('a', title='Note Notifications', href='/msg/pms/') | |
if notes and notes.text: | |
result['note_count'] = int(notes.text.replace('N', '')) | |
return result | |
class Notifier(object): | |
def __init__(self, config): | |
self.username = config['username'] | |
self.pushbullet_key = config['pushbullet_key'] | |
self.request_params = dict(cookies=config['cookies'], headers=config['headers']) | |
self.log_errors = config.get('log_errors', False) | |
self.seen_cache = set() | |
self.db = sqlite3.connect(config['database']) | |
self.setup_db() | |
def setup_db(self): | |
'''Initialise the SQLite database by creating tables that don't exist''' | |
c = self.db.cursor() | |
c.execute('CREATE TABLE IF NOT EXISTS seen_notifs (eid INTEGER, type STRING)') | |
c.close() | |
def db_has_seen_message(self, msg): | |
'''Check whether a particular message has already been seen''' | |
type = msg['type'] | |
eid = msg['eid'] | |
cache_key = (type,eid) | |
if cache_key in self.seen_cache: | |
return True | |
c = self.db.cursor() | |
c.execute('SELECT eid FROM seen_notifs WHERE type = ? AND eid = ?', (type, eid)) | |
result = (c.fetchone() != None) | |
c.close() | |
if result: | |
self.seen_cache.add(cache_key) | |
return result | |
def db_mark_message_as_seen(self, msg): | |
'''Mark a message as one we've already seen''' | |
type = msg['type'] | |
eid = msg['eid'] | |
cache_key = (type,eid) | |
c = self.db.cursor() | |
c.execute('INSERT INTO seen_notifs (type, eid) VALUES (?, ?)', (type, eid)) | |
c.close() | |
self.seen_cache.add(cache_key) | |
def pushbullet_message(self, msg): | |
'''Send a Pushbullet link containing the given FA message''' | |
if msg['type'] == 'watch': | |
pushbullet(self.pushbullet_key, | |
type='link', | |
title='FA [Watch] %s' % msg['name'], | |
body='New watch!', | |
url=FA_BASE+msg['url'], | |
) | |
elif msg['type'] == 'journal': | |
pushbullet(self.pushbullet_key, | |
type='link', | |
title='FA [Journal] %s' % msg['author'], | |
body=msg['name'], | |
url=FA_BASE+msg['url'], | |
) | |
elif msg['type'] == 'shout': | |
pushbullet(self.pushbullet_key, | |
type='link', | |
title='FA [Shout] %s' % msg['author'], | |
body='New shout!', | |
url='%s/user/%s/' % (FA_BASE, self.username), | |
) | |
elif msg['type'] == 'fav': | |
pushbullet(self.pushbullet_key, | |
type='link', | |
title='FA [Fav] %s' % msg['username'], | |
body=msg['sub_name'], | |
url=FA_BASE+msg['sub_url'], | |
) | |
elif msg['type'] == 's_comment' or msg['type'] == 'j_comment': | |
pushbullet(self.pushbullet_key, | |
type='link', | |
title='FA [Comment] %s' % msg['name'], | |
body=msg['description'], | |
url=FA_BASE+msg['url'], | |
) | |
def pushbullet_note(self, note_count): | |
plural = '' if note_count == 1 else 's' | |
pushbullet(self.pushbullet_key, | |
type='link', | |
title='FA: %d new note%s' % (note_count, plural), | |
body='Tap me!', | |
url=FA_BASE+'/msg/pms/', | |
) | |
def get_messages(self): | |
'''Get the current FA messages, write out an error if appropriate''' | |
html = 'None' | |
url = FA_BASE + '/msg/others/' | |
try: | |
html = requests.get(url, **self.request_params).text | |
return scrape_messages_html(html) | |
except Exception as e: | |
# Failed! | |
if self.log_errors: | |
info = sys.exc_info() | |
try: | |
os.mkdir('notifier_debug') | |
except: | |
pass | |
stamp = time.time() | |
with open('notifier_debug/%r.html' % stamp, 'w') as f: | |
f.write(html) | |
with open('notifier_debug/%r.exc' % stamp, 'w') as f: | |
traceback.print_exception(info[0], info[1], info[2], None, f) | |
return None | |
def execute(self): | |
iteration = 0 | |
last_note_count = None | |
while True: | |
iteration += 1 | |
print('[%d] Polling...' % iteration) | |
result = self.get_messages() | |
if result is None: | |
print('[%d] Failed, trying again soon.' % iteration) | |
time.sleep(60) | |
continue | |
print('[%d] %d message(s) returned, %d unread note(s)' % (iteration, len(result['messages']), result['note_count'])) | |
# check notes | |
if last_note_count is not None and result['note_count'] > last_note_count: | |
print('[%d] New notes!' % iteration) | |
self.pushbullet_note(result['note_count']) | |
last_note_count = result['note_count'] | |
# check messages | |
new_count = 0 | |
too_old_count = 0 | |
old_threshold = time.time() - (86400 * 2) | |
for msg in result['messages']: | |
if self.db_has_seen_message(msg): | |
continue | |
print('%s - %r' % (time.strftime('%c', time.gmtime(msg['timestamp'])), msg)) | |
if msg['timestamp'] > old_threshold: | |
self.pushbullet_message(msg) | |
new_count += 1 | |
else: | |
too_old_count += 1 | |
self.db_mark_message_as_seen(msg) | |
if new_count > 0 or too_old_count > 0: | |
self.db.commit() | |
print('[%d] %d new message(s) pushed, %d held back due to age' % (iteration, new_count, too_old_count)) | |
# delay until the next round! | |
delay = random.randint(240, 300) | |
print('[%d] Waiting for %d seconds' % (iteration, delay)) | |
time.sleep(delay) | |
def main(): | |
# Obtain and read the configuration file | |
if len(sys.argv) <= 1: | |
config_path = 'config.json' | |
print('Configuration file not specified, defaulting to ./config.json') | |
elif len(sys.argv) == 2: | |
config_path = sys.argv[1] | |
print('Reading configuration from %s' % config_path) | |
else: | |
print('Usage: python %s [config.json]' % sys.argv[0]) | |
return | |
with open(config_path, 'r') as f: | |
raw_config = f.read() | |
try: | |
config = json.loads(raw_config) | |
except ValueError: | |
print('JSON parsing error while reading configuration!') | |
raise | |
# Work on it! | |
n = Notifier(config) | |
n.execute() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment