Last active
January 25, 2020 13:24
-
-
Save 0xKira/d9a1c07b8a94de2a61f97c40f717f014 to your computer and use it in GitHub Desktop.
监控丁香医生的实时动态,如有变化自动发送邮件提醒
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import requests | |
import re | |
import json | |
import time | |
from smtplib import SMTP_SSL | |
from email.header import Header | |
from email.mime.text import MIMEText | |
last_msg_time = None | |
def get_content(): | |
url = 'https://3g.dxy.cn/newh5/view/pneumonia' | |
headers = { | |
'User-Agent': | |
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36 Edge/15.15063' | |
} | |
content = requests.get(url, headers=headers).content.decode('utf-8') | |
return content | |
def get_latest_msg(content): | |
patt = r'<script id="getTimelineService">.+?window.getTimelineService\s=\s(\[.+?)}catch\(e\){}</script>' | |
res = re.findall(patt, content, re.S) | |
# assert res | |
j = json.loads(res[0]) | |
latest = j[0] | |
print(time.strftime('%H:%M:%S', time.localtime(time.time())), latest['title']) | |
return latest | |
def parse_msg(msg): | |
to_send_msg = '' | |
# to_send_msg += msg['title'] + '\n' | |
to_send_msg += '{} {}\n'.format(time.strftime("%H:%M", time.localtime(msg['pubDate'] / 1000)), msg['pubDateStr']) | |
to_send_msg += msg['summary'] + '\n' | |
to_send_msg += '信息来源:{}\n'.format(msg['infoSource']) | |
to_send_msg += '链接:{}\n'.format(msg['sourceUrl']) | |
return to_send_msg | |
def parse_statistics(content): | |
patt = r'实时播报.+?class="mapTitle.+?"><span>(.+?)</span></p>.+?' # 截止xxx | |
patt += r'<span.+?>(.+?)' | |
patt += r'<span.+?>(.+?)</span>(.+?)' * 4 # 确诊 疑似 治愈 死亡 | |
patt += r'</span></p>' | |
res = re.findall(patt, content, re.S)[0] | |
ret = '' | |
ret += res[0] + '\n' | |
ret += ' '.join([x.strip() for x in res[1:]]) | |
# print(ret) | |
return ret | |
HOST_SERVER = 'smtp.qq.com' | |
PWD = '' # PWD为qq邮箱的授权码 | |
SENDER_QQ_MAIL = '[email protected]' | |
smtp = None | |
def login_email(): | |
global smtp | |
smtp = SMTP_SSL(HOST_SERVER) | |
# smtp.set_debuglevel(1) | |
# set_debuglevel()是用来调试的。参数值为1表示开启调试模式,参数值为0关闭调试模式 | |
smtp.ehlo(HOST_SERVER) | |
smtp.login(SENDER_QQ_MAIL, PWD) | |
def init(): | |
global last_msg_time, smtp | |
last_msg_time = get_latest_msg(get_content())['createTime'] | |
def send_mail(title, content, receiver): | |
global smtp | |
print(content) | |
msg = MIMEText(content, "plain", 'utf-8') | |
msg["Subject"] = Header(title, 'utf-8') | |
msg["From"] = SENDER_QQ_MAIL | |
msg["To"] = receiver | |
smtp.sendmail(SENDER_QQ_MAIL, receiver, msg.as_string()) | |
# smtp.quit() | |
def main_loop(): | |
global last_msg_time | |
content = get_content() | |
latest = get_latest_msg(content) | |
if last_msg_time != latest['createTime']: | |
last_msg_time = latest['createTime'] | |
to_send_msg = parse_msg(latest) | |
to_send_msg += '\n\n' | |
to_send_msg += parse_statistics(content) | |
login_email() | |
with open('./receivers.txt', 'r') as f: | |
for receiver in f.readlines(): | |
print('sending email to:', receiver) | |
send_mail(latest['title'], to_send_msg, receiver) | |
if __name__ == '__main__': | |
init() | |
while True: | |
try: | |
main_loop() | |
except: | |
login_email() | |
send_mail('阿三又改代码啦', 'error!', 'xxx@xxx') | |
time.sleep(30) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment