Last active
April 23, 2023 01:57
-
-
Save TransparentLC/83fa438907708b476e29930f5c76ddda to your computer and use it in GitHub Desktop.
在一些账号有更新时使用 Windows 的通知卡片发送提醒
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import abc
import calendar
import csv
import dataclasses
import functools
import hashlib
import os
import re
import secrets
import subprocess
import tempfile
import time
import typing
import warnings
from concurrent.futures import ThreadPoolExecutor

import bs4
import lxml.etree
import orjson
import requests
import rich.console
import rich.markup
import rich.table
import zstandard
def _responseJsonViaOrjson(self, **kwargs):
    """Decode the response body with orjson; keyword arguments are accepted but ignored."""
    return orjson.loads(self.content)


# Every requests Response now decodes JSON through orjson instead of the default decoder.
requests.Response.json = _responseJsonViaOrjson

# Suppress bs4's MarkupResemblesLocatorWarning for the whole process.
warnings.filterwarnings('ignore', category=bs4.MarkupResemblesLocatorWarning)

# Value passed as ``timeout=`` to the HTTP requests issued by the watch() polls.
WATCH_TIMEOUT = 5
# Chunk size handed to ``iter_content`` when streaming downloads to disk.
DOWNLOAD_CHUNK = 262144
@functools.total_ordering | |
@dataclasses.dataclass | |
class WatchItem: | |
id: typing.Hashable | |
time: time.struct_time | |
text: str | |
url: str | |
image: str | None | |
info: dict[str] | None | |
extra: dict[str] | None | |
def __hash__(self) -> int: | |
return hash(self.id) | |
def __eq__(self, other: 'WatchItem') -> bool: | |
return self.time == other.time | |
def __lt__(self, other: 'WatchItem') -> bool: | |
return self.time < other.time | |
class Watcher(abc.ABC):
    """Base class for a watcher of one account that can raise a Windows toast.

    Subclasses set ``name``, ``url`` and optionally ``icon``, call
    ``createSession()`` and implement ``watch()``.
    """

    # Value ``latestItem`` had before the most recent update (maintained by subclasses).
    sublatestItem: WatchItem | None = None
    # Most recent item found by ``watch()``.
    latestItem: WatchItem | None = None
    session: requests.Session
    name: str
    # URL of the account avatar, if any.
    icon: str | None = None
    url: str

    def createSession(self):
        """Create ``self.session``; every response is passed through ``raise_for_status()``."""
        self.session = requests.Session()
        self.session.hooks['response'].append(lambda r, *args, **kwargs: r.raise_for_status())

    def _download(self, url: str, path: str) -> None:
        """Stream ``url`` into the file ``path``; a partially written file is removed on failure."""
        # Issue the request before creating the file so that an HTTP error
        # (raised by the session hook) does not leave an empty file behind.
        with self.session.get(url, stream=True) as r:
            try:
                with open(path, 'wb') as f:
                    for chunk in r.iter_content(DOWNLOAD_CHUNK):
                        f.write(chunk)
            except BaseException:
                try:
                    os.remove(path)
                except OSError:
                    pass
                raise

    def notify(self) -> None:
        """Show a Windows toast for ``self.latestItem`` by running a PowerShell script.

        The avatar (``self.icon``) is cached in the temp directory under the
        SHA-256 of its URL; the item's image is downloaded to a random temp file
        that is deleted once the toast has been submitted.
        """
        # Work on a snapshot so that a later watch() replacing latestItem cannot
        # change the item halfway through building the toast.
        item = self.latestItem
        iconPath = None
        imagePath = None
        if self.icon:
            iconPath = os.path.join(tempfile.gettempdir(), hashlib.sha256(self.icon.encode()).hexdigest())
            if not os.path.exists(iconPath):
                # Download under a unique name and rename afterwards, so a failed
                # download never leaves a truncated file at the cached path.
                partialPath = f'{iconPath}.{secrets.token_hex(8)}'
                self._download(self.icon, partialPath)
                os.replace(partialPath, iconPath)
        try:
            if item.image:
                imagePath = os.path.join(tempfile.gettempdir(), secrets.token_hex(32))
                self._download(item.image, imagePath)
            xml = lxml.etree.XML('''
                <toast activationType="protocol">
                    <visual>
                        <binding template="ToastGeneric">
                            <text></text>
                            <text></text>
                            <image placement="appLogoOverride" hint-crop="circle" src="" />
                            <image placement="hero" src="" />
                        </binding>
                    </visual>
                    <actions>
                        <action content="查看" activationType="protocol" arguments="" />
                    </actions>
                    <audio src="ms-winsoundevent:Notification.Reminder" />
                </toast>
            ''', parser=lxml.etree.XMLParser(remove_blank_text=True))
            binding = xml.find('.//binding')
            xml.find('.//text[1]').text = self.name
            xml.find('.//text[2]').text = item.text
            xml.find('.//action').set('arguments', item.url)
            # Fill in each image slot, or drop the element when there is nothing to show.
            logo = xml.find('.//image[@placement="appLogoOverride"]')
            if iconPath:
                logo.set('src', iconPath)
            else:
                binding.remove(logo)
            hero = xml.find('.//image[@placement="hero"]')
            if imagePath:
                hero.set('src', imagePath)
            else:
                binding.remove(hero)
            # The XML contains text fetched from the network. It is handed to
            # PowerShell through an environment variable rather than being pasted
            # into the script, so "$(...)", backticks or a here-string terminator
            # inside a post can neither run code nor break the script.
            script = '\n'.join((
                '[Windows.UI.Notifications.ToastNotificationManager, Windows.UI.Notifications, ContentType = WindowsRuntime] | Out-Null',
                '[Windows.UI.Notifications.ToastNotification, Windows.UI.Notifications, ContentType = WindowsRuntime] | Out-Null',
                '[Windows.Data.Xml.Dom.XmlDocument, Windows.Data.Xml.Dom.XmlDocument, ContentType = WindowsRuntime] | Out-Null',
                '$d = [Windows.Data.Xml.Dom.XmlDocument]::New()',
                '$d.loadXml($env:MULTI_NOTIFIER_TOAST_XML)',
                '[Windows.UI.Notifications.ToastNotificationManager]::CreateToastNotifier("Multi Notifier").Show([Windows.UI.Notifications.ToastNotification]::New($d))',
            ))
            subprocess.run(
                ('powershell', '-Command', script),
                env={
                    **os.environ,
                    'MULTI_NOTIFIER_TOAST_XML': lxml.etree.tostring(xml, pretty_print=False).decode(),
                },
            )
        finally:
            # imagePath stays None when the item has no image; the original code
            # read an unbound local here in that case.
            if imagePath and os.path.exists(imagePath):
                os.remove(imagePath)

    @abc.abstractmethod
    def watch(self) -> WatchItem:
        """Fetch the newest item, store it in ``latestItem`` and return it."""
class WeiboWatcher(Watcher):
    """Watcher for the posts of one Weibo user, read from the m.weibo.cn container API."""

    # Container id of the user's "weibo" tab, used as the query for watch().
    containerid: int

    def __init__(self, uid: int) -> None:
        """Look up the user's screen name, avatar URL and "weibo" tab container id.

        If the lookup fails for any reason, ``name``, ``containerid`` and
        ``icon`` are all set to None.
        """
        self.createSession()
        self.session.headers['Referer'] = 'https://m.weibo.cn/'
        self.url = f'https://m.weibo.cn/u/{uid}'
        try:
            r = self.session.get(
                'https://m.weibo.cn/api/container/getIndex',
                params={
                    'type': 'uid',
                    'value': uid,
                },
            )
            d = r.json()['data']
            self.name = d['userInfo']['screen_name']
            self.icon = d['userInfo']['profile_image_url'].split('?')[0]
            for tab in d['tabsInfo']['tabs']:
                if tab['tab_type'] == 'weibo':
                    self.containerid = int(tab['containerid'])
                    break
            else:
                # Without the tab there is nothing to poll; treat it like any
                # other failed initialisation instead of leaving containerid unset.
                raise LookupError('no "weibo" tab found')
        except Exception:
            self.name = self.containerid = self.icon = None

    @staticmethod
    def _picUrls(mblog: dict) -> tuple[str, ...]:
        """Return the large-image URLs of a post; ``pics`` may be a list, a dict or absent."""
        pics = mblog.get('pics')
        if isinstance(pics, list):
            return tuple(pic['large']['url'] for pic in pics)
        if isinstance(pics, dict):
            # Only the numerically keyed entries are pictures.
            return tuple(pic['large']['url'] for key, pic in pics.items() if key.isnumeric())
        return ()

    @staticmethod
    def _parseTime(createdAt: str) -> time.struct_time:
        """Parse a ``created_at`` string and convert it to local time.

        The string carries its own UTC offset, whereas ``time.mktime`` (which the
        main loop applies to ``WatchItem.time``) interprets the fields as local time.
        """
        parsed = time.strptime(createdAt, '%a %b %d %H:%M:%S %z %Y')
        return time.localtime(calendar.timegm(parsed) - parsed.tm_gmtoff)

    @classmethod
    def _toItem(cls, mblog: dict) -> WatchItem:
        """Build a WatchItem from the ``mblog`` object of a card."""
        images = cls._picUrls(mblog)
        # Replace inline emoticon images with their alt text, then strip the remaining HTML.
        plainText = bs4.BeautifulSoup(
            re.sub(
                r'<span class="url-icon"><img alt=(.+?) src=".+?" .*?/></span>',
                r'\g<1>',
                mblog['text'],
            ),
            features='lxml',
        ).text.strip()
        return WatchItem(
            id=int(mblog['id']),
            time=cls._parseTime(mblog['created_at']),
            text=plainText,
            url=f"https://m.weibo.cn/detail/{mblog['id']}",
            # The first picture, or None when the post has no pictures.
            image=images[0] if images else None,
            info={
                ':repeat:': mblog['reposts_count'],
                ':speech_balloon:': mblog['comments_count'],
                ':thumbs_up:': mblog['attitudes_count'],
            },
            extra={
                'images': images,
            },
        )

    def watch(self) -> WatchItem:
        """Fetch the user's cards and store the most recent post in ``latestItem``.

        Raises:
            ValueError: if the response contains no card with ``card_type`` 9.
            Exception: request and parsing errors are propagated.
        """
        # Record the previous item before anything can fail, so that a failed
        # poll leaves sublatestItem == latestItem and is not mistaken for a new post.
        self.sublatestItem = self.latestItem
        r = self.session.get(
            'https://m.weibo.cn/api/container/getIndex',
            params={
                'containerid': self.containerid,
            },
            timeout=WATCH_TIMEOUT,
        )
        self.latestItem = max(
            (
                self._toItem(card['mblog'])
                for card in r.json()['data']['cards']
                if card['card_type'] == 9
            ),
            key=lambda e: e.time,
        )
        return self.latestItem
class ZhihuWatcher(Watcher): | |
def __init__(self, username: str) -> None: | |
self.createSession() | |
self.url = f'https://www.zhihu.com/people/{username}' | |
try: | |
r = self.session.get(self.url) | |
d = orjson.loads(re.search(rb'<script id="js-initialData" type="text/json">(.+?)</script>', r.content).group(1))['initialState']['entities'] | |
self.name = d['users'][username]['name'] | |
self.icon = d['users'][username]['avatarUrl'].split('?')[0] | |
except: | |
self.name = self.icon = None | |
def watch(self) -> WatchItem: | |
r = self.session.get(self.url, timeout=WATCH_TIMEOUT) | |
d = orjson.loads(re.search(rb'<script id="js-initialData" type="text/json">(.+?)</script>', r.content).group(1))['initialState']['entities'] | |
a = max(d['activities'].values(), key=lambda e: e['createdTime']) | |
self.sublatestItem = self.latestItem | |
match a['target']['schema']: | |
case 'answer': | |
self.latestItem = WatchItem( | |
id=int(a['target']['id']), | |
time=time.localtime(a['createdTime']), | |
text=a['actionText'] + ':' + d['answers'][str(a['target']['id'])]['question']['title'] + '\n' + bs4.BeautifulSoup(d['answers'][str(a['target']['id'])]['excerpt'], features='lxml').text.strip(), | |
url=f"https://www.zhihu.com/answer/{a['target']['id']}", | |
image=None, | |
info={ | |
':speech_balloon:': d['answers'][str(a['target']['id'])]['commentCount'], | |
':thumbs_up:': d['answers'][str(a['target']['id'])]['voteupCount'], | |
}, | |
extra=None, | |
) | |
case 'question': | |
self.latestItem = WatchItem( | |
id=int(a['target']['id']), | |
time=time.localtime(a['createdTime']), | |
text=a['actionText'] + ':' + d['questions'][str(a['target']['id'])]['title'], | |
url=f"https://www.zhihu.com/question/{a['target']['id']}", | |
image=None, | |
info=None, | |
extra=None, | |
) | |
case 'pin': | |
self.latestItem = WatchItem( | |
id=int(a['target']['id']), | |
time=time.localtime(a['createdTime']), | |
text=a['actionText'] + ':' + d['pins'][str(a['target']['id'])]['content'][0]['content'], | |
url=f"https://www.zhihu.com/pin/{a['target']['id']}", | |
image=d['pins'][str(a['target']['id'])]['content'][1]['url'] if len(d['pins'][str(a['target']['id'])]['content']) > 1 else None, | |
info={ | |
':speech_balloon:': d['pins'][str(a['target']['id'])]['commentCount'], | |
':thumbs_up:': d['pins'][str(a['target']['id'])]['reactionCount'], | |
}, | |
extra=None, | |
) | |
case 'article': | |
self.latestItem = WatchItem( | |
id=int(a['target']['id']), | |
time=time.localtime(a['createdTime']), | |
text=a['actionText'] + ':' + d['articles'][str(a['target']['id'])]['title'] + '\n' + bs4.BeautifulSoup(d['articles'][str(a['target']['id'])]['excerpt'], features='lxml').text.strip(), | |
url=f"https://zhuanlan.zhihu.com/p/{a['target']['id']}", | |
image=d['articles'][str(a['target']['id'])]['imageUrl'].split('?')[0] or None, | |
info={ | |
':speech_balloon:': d['articles'][str(a['target']['id'])]['commentCount'], | |
':thumbs_up:': d['articles'][str(a['target']['id'])]['voteupCount'], | |
}, | |
extra=None, | |
) | |
return self.latestItem | |
if __name__ == '__main__':
    # Poll every watcher in a loop, print a summary table, and for each new item
    # raise a toast and append a row to a zstd-compressed CSV log.
    console = rich.console.Console(highlight=False)
    with (
        # os.cpu_count() may return None; fall back to 1 when sizing the pool.
        ThreadPoolExecutor((os.cpu_count() or 1) * 4) as executor,
        # newline='' is what the csv module requires of its output stream;
        # without it each row ends in "\r\r\n" on Windows.
        zstandard.open(
            'multi-watcher.csv.zst',
            'w',
            encoding='utf-8',
            newline='',
            cctx=zstandard.ZstdCompressor(22),
        ) as f,
    ):
        csvfile = csv.DictWriter(f, (
            'id',
            'author',
            'watcher',
            'url',
            'time',
            'text',
            'image',
            'extra',
        ))
        csvfile.writeheader()
        # Construct all watchers concurrently; those whose initialisation failed
        # have ``name`` set to None and are dropped.
        watchers: tuple[Watcher, ...] = tuple(x for x in (
            *executor.map(ZhihuWatcher, (
                'wu-qing-qi-fen-pi',  # account display name: 这里是安全的
                'reseted1676580842000',  # account display name: 这里是安全滴
                'ncc21382',  # account display name: ncc21382
            )),
            *executor.map(WeiboWatcher, (
                5514879288,  # account display name: 江江bot
                6339370357,  # account display name: 江图社
                7166086720,  # account display name: 冮冮bot
            )),
        ) if x.name)

        def wrappedWatch(watcher: Watcher):
            """Poll one watcher; on failure keep and return its previous item."""
            try:
                return watcher.watch()
            except Exception:
                # A failed poll found nothing new: make sure the comparison
                # below does not report the previous item a second time.
                watcher.sublatestItem = watcher.latestItem
                return watcher.latestItem

        try:
            while True:
                ts = time.perf_counter()
                for _ in executor.map(wrappedWatch, watchers):
                    pass
                te = time.perf_counter()
                table = rich.table.Table(
                    'author/time', 'content/link',
                    header_style='bold red',
                    show_lines=True,
                )
                for watcher in watchers:
                    item = watcher.latestItem
                    if item is None:
                        continue
                    stamp = time.strftime('%Y-%m-%d %H:%M:%S', item.time)
                    # Notify on the first poll only if the item is less than an
                    # hour old; afterwards, whenever the item is newer than the previous one.
                    if (
                        (watcher.sublatestItem is None and time.time() - time.mktime(item.time) < 3600) or
                        (watcher.sublatestItem is not None and watcher.sublatestItem < item)
                    ):
                        executor.submit(watcher.notify)
                        csvfile.writerow({
                            'id': item.id,
                            'author': watcher.name,
                            'watcher': watcher.__class__.__name__,
                            'url': item.url,
                            'time': stamp,
                            'text': item.text,
                            'image': item.image or '',
                            'extra': orjson.dumps(item.extra).decode() if item.extra else '',
                        })
                    table.add_row(
                        # Names and post text are escaped so that "[...]" inside
                        # them is printed literally instead of being parsed as markup.
                        f'[link={watcher.url}]{rich.markup.escape(watcher.name)}[/link]\n{stamp}',
                        rich.markup.escape(item.text) + '\n' + ' '.join((
                            # ``info`` is None for some items (e.g. Zhihu questions).
                            *(f'{k}{v:>4}' for k, v in (item.info or {}).items()),
                            f':link:{item.url}',
                        )),
                    )
                os.system('cls' if os.name == 'nt' else 'clear')
                console.print(table)
                console.print('Updated', len(watchers), 'watchers at', time.strftime('%Y-%m-%d %H:%M:%S'), 'in', f'{(te - ts) * 1000:.2f}ms', style='italic bright_black', end='')
                time.sleep(20)
        except KeyboardInterrupt:
            # Exit with status 0; leaving the ``with`` block closes the CSV
            # stream and shuts the executor down.
            raise SystemExit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment