Last active
February 17, 2021 00:20
-
-
Save ChengLuffy/fc279531510c6e22315db0d64cd5c568 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#coding:UTF-8 | |
import json | |
import re | |
import base64 | |
import time | |
import requests | |
from bs4 import BeautifulSoup | |
HEADERS = { | |
"X-Requested-With": "XMLHttpRequest", | |
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36" | |
"(KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36", | |
} | |
PATH = r"/config/ss.txt" | |
YMLPATH = r"/config/clash.yml" | |
SURGEPATH = r"/config/surge.txt" | |
def parse_ishadow(): | |
""" | |
parse website:http://isx.yt/ | |
""" | |
try: | |
url = "http://isx.yt/" | |
req = requests.get(url, headers=HEADERS).text.encode( | |
"ISO-8859-1" | |
).decode( | |
"utf-8" | |
) | |
bs = BeautifulSoup(req, "lxml").find( | |
"div", class_="portfolio-items" | |
).get_text().strip() | |
except Exception as e: | |
print(e) | |
else: | |
server = re.findall(r"(?<=IP Address:)\S+", bs) | |
server_port = re.findall(r"(?<=Port:)\d+", bs) | |
password = re.findall(r"(?<=Password:) *\S*", bs) | |
method = re.findall(r"(?<=Method:)\S+", bs) | |
accounts = [] | |
# some account without password | |
for i, value in enumerate(password): | |
if value != " ": | |
node = method[i] + ":" + value + "@" + server[i] + ":" + server_port[i] | |
node_base64 = base64.b64encode(node.encode(encoding="utf-8")) | |
link = "ss://" + node_base64.decode() | |
accounts.append(link) | |
configs.extend(accounts) | |
saveSSTXT() | |
saveClashYml() | |
saveSurgeList() | |
def saveSurgeList(): # 保存 surge 外部代理订阅文件 | |
str = "\n".join(configs) | |
nodes = getAllNodesByString(str) | |
ret = "#%s\n" % (time.asctime( time.localtime(time.time()) )) | |
for node in nodes: | |
name = node[0] | |
server = node[1] | |
port = node[2] | |
cipher = node[3] | |
pwd = node[4] | |
ret += "%s = ss, %s, %s, encrypt-method=%s, password=%s\n" % (name, server, port, cipher, pwd) | |
with open(SURGEPATH, 'wt') as f: | |
f.write(ret) | |
f.close | |
def saveClashYml(): # 保存 clash 订阅文件 | |
str = "\n".join(configs) | |
nodes = getAllNodesByString(str) | |
getClash(nodes) | |
def saveSSTXT(): # 保存 ss 订阅文件 | |
str = "\n".join(configs) | |
result = base64.b64encode(str.encode(encoding="utf-8")).decode() | |
with open(PATH, "w+", encoding="utf-8") as f: | |
f.write(result) | |
f.close() | |
def getBasefile(url): # 获取订阅链接加密文本 | |
try: | |
html = requests.get(url) | |
html.raise_for_status | |
html.encoding = html.apparent_encoding | |
return str(html.text) | |
except: | |
with open(url, 'rt') as f: | |
data = f.read() | |
return data | |
def getAllLinks(url): # 从加密文本解析出所有ss链接 | |
links = getBasefile(url) | |
result = decodeInfo(links) | |
alllinks = result.split('\n') | |
alllinks = list(set(alllinks)) | |
if len(alllinks[-1]) < 10: | |
alllinks.pop() | |
return alllinks | |
def getAllNodes(url): # 从ss链接汇总得到所有节点信息 | |
allnodes = [] | |
links = getAllLinks(url) | |
for ss in links: | |
link = ss.split('//')[1].split("'")[0] | |
# node = getNode(link) if ss.split(':')[0] == "ss" else getNodeR(link) | |
if ss.split(':')[0] == "ss": | |
node = getNode(link) | |
allnodes.append(node) | |
else: | |
node = getNodeR(link) | |
if checkNode(node): | |
node = node[:-2] | |
allnodes.append(node) | |
else: | |
continue | |
return allnodes | |
def getAllNodesByString(content): # 文本解析,得到所有节点信息 | |
allnodes = [] | |
links = content.split('\n') | |
links = list(set(links)) | |
if len(links[-1]) < 10: | |
links.pop() | |
for ss in links: | |
link = ss.split('//')[1].split("'")[0] | |
# node = getNode(link) if ss.split(':')[0] == "ss" else getNodeR(link) | |
if ss.split(':')[0] == "ss": | |
node = getNode(link) | |
allnodes.append(node) | |
else: | |
node = getNodeR(link) | |
if checkNode(node): | |
node = node[:-2] | |
allnodes.append(node) | |
else: | |
continue | |
return allnodes | |
def getNode(link): # 从ss链接中得到节点信息 | |
info = decodeInfo(link) | |
method = info.split(':')[0] | |
pwd = info.split("@")[0].split(":")[1] | |
server = info.split("@")[1].split(":")[0] | |
port = info.split(':')[2] | |
remark = server | |
node = [remark, server, port, method, pwd] | |
return node | |
def getNodeR(link): # 从ssr链接中得到节点信息 | |
info = decodeInfo(link) | |
pwd = decodeInfo(info.split('/')[0].split(':')[-1]).split("'")[1] | |
server = info.split(':')[0].split("'")[1] | |
port = info.split(':')[1] | |
protocol = info.split(':')[2] | |
method = info.split(':')[3] | |
obfs = info.split(':')[4] | |
remark = getName(info.split('&')[2].split('=')[1]) | |
# print(server, port, method, pwd, protocol, obfs, remark) | |
node = [remark, server, port, method, pwd, protocol, obfs] | |
return node | |
def getName(info): # 得到节点名称(有待合并) | |
lens = len(info) | |
if lens % 4 == 1: | |
info = info + "===" | |
elif lens % 4 == 2: | |
info = info + "==" | |
elif lens % 4 == 3: | |
info = info + "=" | |
result = base64.urlsafe_b64decode(info).decode('utf-8', errors='ignore') | |
return result | |
def checkNode(node): # 检查节点是否是ss节点 | |
obfs = node[6] | |
pro = node[5] | |
if checkObfs(obfs) and checkPro(pro): | |
return True | |
else: | |
return False | |
def checkObfs(str): # 检查是否为ss混淆 | |
if str == "plain" or str.split('_')[-1] == "compatible": | |
return True | |
else: | |
return False | |
def checkPro(str): # 检查是否为ss协议 | |
if str == "origin" or str.split('_')[-1] == "compatible": | |
return True | |
else: | |
return False | |
def decodeInfo(info): # 解码加密内容 | |
lens = len(info) | |
if lens % 4 == 1: | |
info = info + "===" | |
elif lens % 4 == 2: | |
info = info + "==" | |
elif lens % 4 == 3: | |
info = info + "=" | |
result = base64.b64decode(info).decode() | |
return result | |
def setNodes(nodes): # 设置节点 | |
proxies = [] | |
for node in nodes: | |
name = node[0] | |
server = node[1] | |
port = node[2] | |
cipher = node[3] | |
pwd = node[4] | |
proxy = " - { name: " + str( | |
name).strip() + ", type: ss, server: " + str( | |
server) + ", port: " + str(port) + ", cipher: " + str( | |
cipher) + ", password: " + str(pwd) + " }\n" | |
proxies.append(proxy) | |
proxies.insert(0, '\nproxies:\n') | |
return proxies | |
def setPG(nodes): # 设置策略组 auto,Fallback-auto,Proxy | |
proxy_names = [] | |
for node in nodes: | |
proxy_names.append(node[0]) | |
auto = " - { name: 'Auto', type: url-test, proxies: " + str( | |
proxy_names | |
) + ", url: 'http://www.gstatic.com/generate_204', interval: 300 }\n" | |
Fallback = " - { name: 'Fallback-Auto', type: fallback, proxies: " + str( | |
proxy_names | |
) + ", url: 'http://www.gstatic.com/generate_204', interval: 300 }\n" | |
Select = " - { name: 'Select', type: select, proxies: " + str( | |
proxy_names) + " }\n" | |
Proxy = " - { name: 'Proxy', type: select, proxies: [Auto, Fallback-Auto, Select] }\n" | |
ProxyGroup = ['\nproxy-groups:\n', auto, Fallback, Select, Proxy] | |
# ProxyGroup.insert(0, 'Proxy Group:\n') | |
return ProxyGroup | |
def getClash(nodes): | |
gener = getBasefile( | |
'https://gist.githubusercontent.com/ChengLuffy/db93213e2bea17f892725247ec617016/raw/General.yml') | |
with open(YMLPATH, "w", encoding="utf8") as f: | |
f.writelines(gener) | |
f.close() | |
info = setNodes(nodes) + setPG(nodes) | |
with open(YMLPATH, "a", encoding="utf8") as f: | |
f.writelines(info) | |
f.close() | |
rules = getBasefile( | |
'https://gist.githubusercontent.com/ChengLuffy/7c497ed2f5e4cc2a8664cd8803b4af21/raw/rules.yml') | |
with open(YMLPATH, "a", encoding="utf8") as f: | |
f.writelines(rules) | |
f.close() | |
def main(): | |
parse_ishadow() | |
if __name__ == "__main__": | |
configs = [] | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment