-
-
Save nihaals/bc90ab1e6d64c49d80055fcfcf0d2330 to your computer and use it in GitHub Desktop.
Rock's IPC Server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import base64 | |
import csv | |
import html | |
import imghdr | |
import logging | |
import math | |
import os | |
import pathlib | |
import random | |
import re | |
import string | |
import subprocess | |
import sys | |
import time | |
import traceback | |
import tracemalloc | |
from datetime import timedelta | |
from functools import wraps | |
from urllib.parse import urlparse | |
from uuid import uuid4 | |
import aiohttp | |
import discord | |
import paypalrestsdk | |
import rethinkdb | |
import ujson as json | |
import uvloop | |
import yaml | |
from itsdangerous import URLSafeSerializer | |
from PIL import Image, ImageDraw, ImageFont | |
from quart import (Quart, jsonify, redirect, render_template, request, | |
send_file, session, websocket) | |
from quart.ctx import copy_current_websocket_context | |
from quart.sessions import SessionInterface, SessionMixin | |
from requests_oauthlib import OAuth2Session | |
from rockutils import rockutils | |
from werkzeug.datastructures import CallbackDict | |
# Switch asyncio over to uvloop and start allocation tracing before any
# connections are opened.
uvloop.install()
tracemalloc.start()

# Site-wide configuration, read once at import time.
config = rockutils.load_json("cfg/config.json")
paypal_api = paypalrestsdk.configure(config['paypal'])
recaptcha_key = config['keys']['recaptcha']
_oauth2 = config['oauth']

# _domain = "beta.welcomer.fun"
_domain = "192.168.0.29:15007"
_debug = True

r = rethinkdb.RethinkDB()

# Main data connection. The database name is the configured name with a
# "debug" suffix while _debug is set. The conditional is parenthesised on
# purpose: without the parentheses the whole expression evaluated to ""
# whenever _debug was False, because `a + b if c else d` parses as
# `(a + b) if c else d`.
rethink = r.connect(host=config['db']['host'],
                    port=config['db']['port'],
                    db=config['db']['table'] + ("debug" if _debug else ""))

# Separate connection used only for the session store.
rethink_sessions = r.connect(host=config['db']['host'],
                             port=config['db']['port'],
                             db="Quart")
def empty(val):
    """Return True when *val* is None, an empty string or a single space."""
    blank_values = ('', ' ', None)
    return val in blank_values
class RethinkSession(CallbackDict, SessionMixin):
    """Session dictionary that remembers its id and whether it was changed."""

    def __init__(self, initial=None, sid=None, new=False):
        def mark_modified(changed):
            # Update callback handed to CallbackDict: flag the session dirty.
            changed.modified = True

        CallbackDict.__init__(self, initial, mark_modified)
        self.modified = False
        self.new = new
        self.sid = sid
class RethinkSessionInterface(SessionInterface):
    """Server-side session storage backed by the RethinkDB ``sessions`` table.

    The browser cookie only carries the session id. The session contents are
    JSON-encoded, signed with ``URLSafeSerializer`` and stored in the row
    whose id is ``prefix + sid``.
    """
    serializer = json
    session_class = RethinkSession

    def __init__(self, rethink, prefix=''):
        """Keep the connection and key prefix and build the signer.

        :param rethink: open RethinkDB connection used for every query.
        :param prefix: string prepended to the session id to form the row id.
        """
        self.rethink = rethink
        self.prefix = prefix
        self.serialize = URLSafeSerializer(_oauth2['client_secret'])

    def generate_sid(self):
        """Return a fresh random session id."""
        return str(uuid4())

    # def get_redis_expiration_time(self, app, session):
    #     return app.permanent_session_lifetime

    def open_session(self, app, request):
        """Load the session named by the request cookie, or start a new one."""
        sid = request.cookies.get(app.session_cookie_name)
        if not sid:
            return self.session_class(sid=self.generate_sid(), new=True)

        rockutils.prefix_print(
            f"Retreiving session {self.prefix + sid}. URL: {request.path}",
            prefix_colour="light blue")

        val = r.table("sessions").get(self.prefix + sid).run(self.rethink)
        if val is None:
            # Unknown id: keep the id from the cookie but start empty.
            return self.session_class(sid=sid, new=True)

        data = self.serializer.loads(self.serialize.loads(val['data']))
        return self.session_class(data, sid=sid)

    def save_session(self, app, session, response):
        """Persist the session and (re)issue its cookie on *response*.

        An empty session that was modified is deleted from the table and its
        cookie removed; an empty, untouched session is ignored.
        """
        domain = self.get_cookie_domain(app)
        path = self.get_cookie_path(app)
        key = self.prefix + session.sid

        if not session:
            if session.modified:
                rockutils.prefix_print(
                    f"Deleting session {self.prefix + session.sid}. URL: {request.path}",
                    prefix_colour="light red")
                r.table("sessions").get(key).delete().run(self.rethink)
                response.delete_cookie(app.session_cookie_name,
                                       domain=domain,
                                       path=path)
            return

        cookie_exp = self.get_expiration_time(app, session)
        val = self.serialize.dumps(self.serializer.dumps(dict(session)))

        rockutils.prefix_print(
            f"Updating session {self.prefix + session.sid}. URL: {request.path}",
            prefix_colour="light yellow")

        # Single upsert instead of "get, then replace or insert": one round
        # trip, and no window in which two requests both decide to insert.
        r.table("sessions").insert(
            {"id": key, "data": val}, conflict="replace").run(self.rethink)

        # Pass the same path that delete_cookie uses so a configured cookie
        # path is honoured when the cookie is set as well as when it is
        # removed.
        response.set_cookie(app.session_cookie_name, session.sid,
                            expires=cookie_exp, httponly=True,
                            domain=domain, path=path)
print("[init] Setting up quart")

# The Quart application; sessions are stored through the RethinkDB-backed
# interface and signed with the OAuth client secret.
app = Quart(__name__)
app.session_interface = RethinkSessionInterface(rethink_sessions)
app.secret_key = _oauth2['client_secret']

# Strip the whitespace that template block tags would otherwise leave behind.
app.jinja_env.trim_blocks = app.jinja_env.lstrip_blocks = True

# app.config['TEMPLATES_AUTO_RELOAD'] = True
app.config['MAX_CONTENT_LENGTH'] = 256 * 1024 * 1024  # 268435456 bytes

os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = 'true'

# Errors go to errors.log; the quart.serving logger is limited to errors too.
logging.basicConfig(
    filename='errors.log',
    level=logging.ERROR,
    format='[%(asctime)s] %(levelname)-8s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
log = logging.getLogger('quart.serving')
log.setLevel(logging.ERROR)
def normalize_form(v):
    """Coerce a submitted form value to a bool or int where possible.

    * ``"true"`` / ``"false"`` (any case) become ``True`` / ``False``.
    * A value starting with ``#`` followed by hex digits becomes that int.
    * Anything ``int()`` accepts becomes an int.
    * Everything else is returned unchanged.

    Only conversion failures are swallowed; KeyboardInterrupt and SystemExit
    are no longer caught.
    """
    lowered = str(v).lower()
    if lowered == "true":
        return True
    if lowered == "false":
        return False

    try:
        if v[0] == "#":
            return int(v[1:], 16)
    except (IndexError, KeyError, TypeError, ValueError):
        # Empty, not subscriptable, or not valid hex: fall through.
        pass

    try:
        return int(v)
    except (TypeError, ValueError, OverflowError):
        return v
def expect(v, t):
    """Return True when *v* is acceptable as a value of kind *t*.

    :param v: the value to check.
    :param t: ``"hex"`` (hex digits, optional leading ``#``), ``"int"``
        (anything ``int()`` accepts) or ``"bool"`` (``True``/``False`` or
        their ``"true"``/``"True"``/``"false"``/``"False"`` spellings).
    :return: ``True`` on a match, ``False`` otherwise, including for an
        unknown *t* and for empty or non-string input to the hex check,
        which previously raised.
    """
    if t == "hex":
        try:
            # Slicing (rather than v[0]) keeps the empty string from raising.
            digits = v[1:] if v[:1] == "#" else v
            int(digits, 16)
            return True
        except (IndexError, KeyError, TypeError, ValueError):
            return False

    if t == "int":
        try:
            int(v)
            return True
        except (TypeError, ValueError, OverflowError):
            return False

    if t == "bool":
        return v in (True, "true", "True", False, "false", "False")

    return False
def iterat():
    """Return a lowercase base64 token (padding removed) of the current time."""
    stamp = str(time.time() * 100000).encode("ascii")
    token = base64.b64encode(stamp).decode()
    return token.replace("=", "").lower()
def sub(s, b, e=None, a=False):
    """Slice the string form of *s*.

    With a truthy *e* the result is ``s[b:e]``. Otherwise it is ``s[:b]``
    when *a* is set and ``s[b:]`` when it is not.
    """
    text = str(s)
    if e:
        return text[b:e]
    return text[:b] if a else text[b:]
# Builtins and helpers made callable from every Jinja template.
app.jinja_env.globals.update({
    'json_loads': json.loads,
    'len': len,
    'str': str,
    'dict': dict,
    'bool': bool,
    'int': int,
    'hex': hex,
    'sub': sub,
    'iterat': iterat,
    'unesc': html.unescape,
    'ctime': time.time,
    'ceil': math.ceil,
    'enumerate': enumerate,
    'sorted': sorted,
    'dirlist': os.listdir,
    'exist': os.path.exists,
    'since_unix_str': rockutils.since_unix_str,
    'recaptcha_key': recaptcha_key,
})

# Shared in-process state for the IPC layer.
ipc_jobs = {}                  # job key -> replies collected so far
cluster_jobs = {}              # cluster name -> jobs waiting to be sent
clusters_initialized = set()   # names of clusters that have connected
last_ping = {}                 # cluster name -> time of last ping reply
user_cache = {}                # oauth access token -> cached user payload
cluster_status = {}            # cluster name -> last reported status code
cluster_data = {}              # cluster name -> data from last ping reply

# Display names for the status codes stored in cluster_status.
_status_name = dict(enumerate((
    "Connecting",
    "Ready",
    "Restarting",
    "Hung",
    "Resuming",
    "Stopped",
)))

discord_cache = {}             # cache key -> cached Discord API response
async def create_job(request=None, o="", a="", r="", timeout=10):
    """Queue an IPC job for one or more clusters and wait for their replies.

    :param request: optional request whose ``op``, ``args``, ``recep`` and
        ``timeout`` headers fill in any of *o*, *a*, *r*, *timeout* that were
        left at their defaults.
    :param o: opcode sent to the clusters.
    :param a: opcode arguments.
    :param r: recipients: ``"*"`` for every initialised cluster, a JSON list
        of cluster names, or a single cluster name.
    :param timeout: seconds to wait for every recipient to answer.
    :return: dict with the job key (``k``), the requested recipients (``r``),
        opcode (``o``), arguments (``a``), start time (``js``), the replies
        keyed by cluster (``d``) and the elapsed seconds (``jd``).
    """
    if request:
        headers = request.headers
        o = o if o != "" else headers.get("op")
        a = a if a != "" else headers.get("args")
        r = r if r != "" else headers.get("recep")
        if timeout == 10:
            # Keep the default when the header is missing or not a number
            # instead of crashing on int(None).
            try:
                timeout = int(headers.get("timeout"))
            except (TypeError, ValueError):
                pass

    job_key = "".join(random.choices(string.ascii_letters, k=32))

    if r == "*":
        recipients = list(clusters_initialized)
    else:
        try:
            candidates = [str(i).lower() for i in json.loads(r)]
        except Exception:
            # Not a JSON collection: treat the raw value as one cluster name.
            candidates = [str(r).lower()]
        recipients = [c for c in candidates if c in clusters_initialized]

    payload = {"o": o, "a": a, "k": job_key}
    time_start = time.time()
    # Use the unrounded start time so the full timeout is honoured.
    deadline = time_start + timeout

    ipc_jobs[job_key] = {}
    try:
        # A distinct loop variable keeps `r` intact for the reply below; it
        # used to be overwritten with the last recipient's name.
        for recipient in recipients:
            cluster_jobs[recipient].append(payload)

        while time.time() < deadline:
            if len(ipc_jobs[job_key]) == len(recipients):
                break
            await asyncio.sleep(0.05)

        responce_payload = {
            "k": job_key,
            "r": r,
            "o": o,
            "a": a,
            "js": time_start,
            "d": ipc_jobs[job_key]
        }
    finally:
        # Always drop the bookkeeping entry, also on cancellation or error.
        ipc_jobs.pop(job_key, None)

    responce_payload['jd'] = time.time() - time_start
    return responce_payload
async def sending(cluster):
    """Forward queued jobs for *cluster* over the current websocket.

    Polls ``cluster_jobs[cluster]`` every 50 ms, adds a ``PING`` job when more
    than 60 seconds have passed since the last one, and sends the pending
    jobs as one JSON list. Runs until sending fails; the error is printed.
    """
    rockutils.prefix_print(f"Started sending for {cluster}")
    _last_ping = 0
    try:
        while True:
            _t = time.time()
            _jobs = cluster_jobs[cluster]
            if _t - _last_ping > 60:
                _jobs.append({
                    "o": "PING",
                    "a": "",
                    "k": f"ping.{cluster}"
                })
                _last_ping = _t
            if _jobs:
                # Swap in a fresh queue BEFORE awaiting. Jobs appended by
                # other coroutines while the send is in flight then land in
                # the new list instead of being wiped by a reset afterwards.
                cluster_jobs[cluster] = []
                try:
                    await websocket.send(json.dumps(_jobs))
                except BaseException:
                    # Put the unsent batch back in front of anything queued
                    # meanwhile, so a failed send still leaves it queued.
                    cluster_jobs[cluster][:0] = _jobs
                    raise
            await asyncio.sleep(0.05)
    except Exception as e:
        traceback.print_exc()
        rockutils.prefix_print(str(e), prefix="IPC Sender",
                               prefix_colour="light red", text_colour="red")
async def receiving(cluster):
    """Handle messages arriving from *cluster* on the current websocket.

    Message kinds, selected by the ``o`` field:

    * ``SUBMIT`` with ``ping`` in its key: record ping time and cluster data.
    * ``STATUS_UPDATE``: record the cluster's status code.
    * ``SUBMIT`` with any key other than ``push``: store the reply under the
      waiting job in ``ipc_jobs``.
    * ``PUSH_OPCODE``: queue ``[opcode, args, recipients]`` for the named
      clusters.

    Runs until receiving or handling fails; the error is printed.
    """
    rockutils.prefix_print(f"Started receiving for {cluster}")
    try:
        while True:
            _data = json.loads(await websocket.receive())
            o = _data['o']
            if o == "SUBMIT" and "ping" in _data['k']:
                last_ping[cluster] = time.time()
                cluster_data[cluster] = _data['d']
                rockutils.prefix_print(
                    f"Retrieved PING from cluster {cluster}")
            elif o == "STATUS_UPDATE":
                d = _data['d']
                cluster_status[cluster] = d
                rockutils.prefix_print(
                    f"Cluster {cluster} is now {_status_name.get(d)}")
            elif o == "SUBMIT" and _data['k'] != "push":
                job_key = _data['k']
                sender = _data['r']
                if job_key in ipc_jobs:
                    ipc_jobs[job_key][sender] = _data['d']
            elif o == "PUSH_OPCODE":
                _opcode, _args, target = _data['d'][0], _data['d'][1], _data['d'][2]
                if target == "*":
                    recepients = list(clusters_initialized)
                else:
                    try:
                        candidates = [str(i).lower() for i in json.loads(target)]
                    except Exception:
                        # Not a JSON collection: a single cluster name.
                        candidates = [str(target).lower()]
                    recepients = [
                        c for c in candidates if c in clusters_initialized]
                for _r in recepients:
                    # Outgoing work belongs in cluster_jobs, the queue the
                    # sender loop drains. It used to go to ipc_jobs, which is
                    # never sent, so pushed opcodes were never delivered.
                    cluster_jobs[_r].append({
                        "o": _opcode,
                        "a": _args,
                        "k": "push"
                    })
            await asyncio.sleep(0.05)
    except Exception as e:
        traceback.print_exc()
        rockutils.prefix_print(str(e), prefix="IPC Receiver",
                               prefix_colour="light red", text_colour="red")
@app.route("/api/job/<auth>", methods=['POST', 'GET'])
async def ipc_job(auth):
    """Run an IPC job described by the request headers and return its result.

    Responds with 403 unless *auth* equals the configured IPC auth key.
    """
    if auth == config['ipc']['auth_key']:
        result = await create_job(request)
        return jsonify(result)
    return "Invalid authentication", 403
@app.websocket("/api/ipc/<cluster>/<auth>")
async def ipc_slave(cluster, auth):
    """Websocket endpoint a bot cluster connects to for IPC.

    After the auth key check the cluster is registered and the sender and
    receiver loops run side by side for the lifetime of the connection.
    """
    if auth != config['ipc']['auth_key']:
        return "Invalid authentication", 403

    await websocket.accept()

    cluster = str(cluster).lower()
    ipc_jobs[cluster] = []
    clusters_initialized.add(cluster)
    # Keep an existing queue, if there is one, across reconnects.
    cluster_jobs.setdefault(cluster, [])

    rockutils.prefix_print(f"Connected to cluster {cluster}")
    # loop = asyncio.get_event_loop()
    try:
        sender = copy_current_websocket_context(sending)
        receiver = copy_current_websocket_context(receiving)
        await asyncio.gather(sender(cluster), receiver(cluster))
    except Exception as e:
        traceback.print_exc()
        rockutils.prefix_print(str(e), prefix="IPC Slave",
                               prefix_colour="light red", text_colour="red")
############################################################# | |
# DASHBOARD | |
############################################################# | |
@app.errorhandler(404)
async def not_found_error(e):
    """Render the 404 page; URLs containing "static" get a bare "404" body."""
    print("404")
    if "static" in request.url:
        return "404"
    page = await render_template(
        'error.html',
        error="I cant seem to find this page. Maybe you went to the wrong place?",
        image="finding")
    return page, 404
@app.errorhandler(403)
async def forbidden_error(e):
    """Render the 403 page; URLs containing "static" get a bare "403" body."""
    print("403")
    if "static" in request.url:
        return "403"
    page = await render_template(
        'error.html',
        error="Hm, it seems you cant access this page. Good try though old chap",
        image="sir")
    return page, 403
@app.errorhandler(500)
async def internal_server_error(e):
    """Render the 500 page with a randomly chosen image.

    URLs containing "static" get a bare "500" body instead.
    """
    print("500")
    if "static" in request.url:
        return "500"
    picture = random.choice(["sad1", "sad2", "sad3"])
    page = await render_template(
        'error.html',
        error="Uh oh, seems something pretty bad just happened... Mr Developer, i dont feel so good...",
        image=picture)
    return page, 500
# Functions | |
def token_updater(token):
    """Store a refreshed OAuth2 token in the current session.

    Handed to ``OAuth2Session`` as its ``token_updater`` callback.
    """
    session['oauth2_token'] = token
def make_session(token=None, state=None, scope=None):
    """Build an ``OAuth2Session`` for the configured OAuth application.

    :param token: existing token dict, if the user is already authorised.
    :param state: OAuth ``state`` value to validate on the callback.
    :param scope: list of requested scopes.
    :return: a session with automatic token refresh; refreshed tokens are
        written back through :func:`token_updater`.
    """
    refresh_credentials = {
        'client_id': _oauth2['client_id'],
        'client_secret': _oauth2['client_secret'],
    }
    callback_url = _oauth2['redirect_uri'][_domain]
    return OAuth2Session(
        client_id=_oauth2['client_id'],
        token=token,
        state=state,
        scope=scope,
        redirect_uri=callback_url,
        auto_refresh_kwargs=refresh_credentials,
        auto_refresh_url=_oauth2['token_url'],
        token_updater=token_updater,
    )
async def get_user_info(id, refer=""):
    """Fetch the stored document for user *id* from the ``users`` table.

    :param id: user id; converted to ``str`` for the lookup.
    :param refer: optional label included in the log line.
    :return: the document, ``None`` when it is missing or empty, or ``False``
        when the lookup raised.
    """
    try:
        rockutils.prefix_print(
            f"{f'[Refer: {refer}] ' if refer != '' else ''}Getting information for U:{id}",
            prefix="User Info:Get",
            prefix_colour="light green")
        record = r.table("users").get(str(id)).run(rethink)
    except Exception as e:
        traceback.print_exc()
        rockutils.prefix_print(
            f"Error occured whilst retrieving info for U:{id}. {e}",
            prefix="User Info:Update",
            prefix_colour="red",
            text_colour="light red")
        return False
    else:
        return record or None
async def get_guild_info(id, refer=""):
    """Fetch the stored document for guild *id* from the ``guilds`` table.

    When no document is found, every cluster is asked to reload its cache for
    the guild (``cachereload`` job) and the lookup is repeated once.

    :param id: guild id; converted to ``str`` for the lookup.
    :param refer: optional label included in the log line.
    :return: the document, ``None`` when it is still missing, or ``False``
        when anything raised.
    """
    try:
        rockutils.prefix_print(
            f"{f'[Refer: {refer}] ' if refer != '' else ''}Getting information for G:{id}",
            prefix="Guild Info:Get",
            prefix_colour="light green")

        guild_key = str(id)
        record = r.table("guilds").get(guild_key).run(rethink)
        if not record:
            await create_job(o="cachereload", a=guild_key, r="*")
            record = r.table("guilds").get(guild_key).run(rethink)
        return record or None
    except Exception as e:
        traceback.print_exc()
        rockutils.prefix_print(
            f"Error occured whilst retrieving info for G:{id}. {e}",
            prefix="Guild Info:Update",
            prefix_colour="red",
            text_colour="light red")
    return False
async def update_guild_info(id, data, forceupdate=False, refer=""):
    """Write *data* to the ``guilds`` document for guild *id*.

    :param id: guild id; converted to ``str`` for the lookup.
    :param data: document contents to store.
    :param forceupdate: when true the document is ``update``d with *data*,
        otherwise it is ``replace``d by it.
    :param refer: optional label included in the log line.
    :return: ``True`` on success, ``False`` when the write raised.
    """
    try:
        rockutils.prefix_print(
            f"{f'[Refer: {refer}] ' if refer != '' else ''}Updating information for G:{id}",
            prefix="Guild Info:Update",
            prefix_colour="light green")

        document = r.table("guilds").get(str(id))
        t = time.time()
        query = document.update(data) if forceupdate else document.replace(data)
        query.run(rethink)
        te = time.time()

        # Point out writes that took longer than a second.
        if te - t > 1:
            rockutils.prefix_print(
                f"Updating guild info took {math.floor((te-t)*1000)}ms",
                prefix="Guild Info:Update",
                prefix_colour="red",
                text_colour="light red")
        return True
    except Exception as e:
        traceback.print_exc()
        rockutils.prefix_print(
            f"Error occured whilst updating info for G:{id}. {e}",
            prefix="Guild Info:Update",
            prefix_colour="red",
            text_colour="light red")
    return False
async def get_guild_donations(guild_info):
    """List the donation and partner perks that currently apply to a guild.

    ``"partner"`` is added when the guild owner's user document has
    ``m.p`` set. For every user id in ``guild_info['d']['de']``, one
    ``"donation"`` entry is added per tier (``m.1``, ``m.3``, ``m.5``) whose
    ``h`` flag is set and which is either unexpired (now < ``u``) or has
    ``p`` set.

    Missing or malformed documents are skipped. Only lookup and type errors
    are swallowed, so task cancellation is no longer silently discarded.

    :param guild_info: guild document as returned by ``get_guild_info``.
    :return: list of ``"partner"`` / ``"donation"`` strings (may repeat).
    """
    _time = time.time()
    valid_donations = []
    skippable = (KeyError, IndexError, TypeError)

    try:
        owner_info = await get_user_info(guild_info['d']['g']['o']['id'])
        if owner_info and owner_info['m']['p']:
            valid_donations.append("partner")
    except skippable:
        pass

    for donor_id in guild_info['d']['de']:
        try:
            donor_info = await get_user_info(donor_id)
            if not donor_info:
                continue
            for tier in ("1", "3", "5"):
                membership = donor_info['m'][tier]
                if membership['h'] and (
                        _time < membership['u'] or membership['p']):
                    valid_donations.append("donation")
        except skippable:
            pass
    return valid_donations
async def has_guild_donated(guild_info, donation=True, partner=True):
    """Return True when a guild currently has a partner or donation perk.

    :param guild_info: guild document as returned by ``get_guild_info``.
    :param donation: consider the donators listed in ``guild_info['d']['de']``;
        a tier (``m.1``, ``m.3``, ``m.5``) counts when its ``h`` flag is set
        and it is either unexpired (now < ``u``) or has ``p`` set.
    :param partner: consider the guild owner's ``m.p`` flag.
    :return: ``True`` on the first match, ``False`` otherwise.

    Missing or malformed documents are skipped. Only lookup and type errors
    are swallowed, so task cancellation is no longer silently discarded.
    """
    _time = time.time()
    skippable = (KeyError, IndexError, TypeError)

    try:
        if partner:
            owner_info = await get_user_info(guild_info['d']['g']['o']['id'])
            if owner_info and owner_info['m']['p']:
                return True
    except skippable:
        pass

    for donor_id in guild_info['d']['de']:
        try:
            donor_info = await get_user_info(donor_id)
            if donor_info and donation:
                for tier in ("1", "3", "5"):
                    membership = donor_info['m'][tier]
                    if membership['h'] and (
                            _time < membership['u'] or membership['p']):
                        return True
        except skippable:
            pass
    return False
async def cache_discord(url, bot_type, key=None, custom_token=None, default=None, cachetime=120):
    """GET a Discord API resource, caching the decoded JSON in memory.

    :param url: path below ``{api_base}/v6/``.
    :param bot_type: key into ``config['tokens']`` selecting the bot token.
    :param key: cache key; defaults to *url*. When *custom_token* is given
        the key is taken from ``custom_token['access_token']`` instead.
    :param custom_token: token used instead of the configured one.
        NOTE(review): it is interpolated into a ``Bot ...`` header as-is,
        which looks odd for a dict token - confirm the intended usage.
    :param default: value cached when Discord answers with an error payload;
        a fresh ``{}`` per call when omitted (previously one shared dict).
    :param cachetime: seconds a cached entry stays fresh.
    :return: ``(True, data)`` on success or cache hit, ``(False, default)``
        for an unknown *bot_type*, ``(False, code)`` when the payload carries
        an error ``code``, and ``(False, [])`` when the request raised.
    """
    if default is None:
        default = {}

    if bot_type not in config['tokens']:
        return False, default

    if custom_token:
        token = custom_token
        key = token['access_token']
    else:
        token = config['tokens'].get(bot_type)

    if not key:
        key = url

    url = f"{_oauth2['api_base']}/v6/{url}"
    _t = time.time()

    cached = discord_cache.get(key)
    # "s" holds the expiry time, so the entry is stale once _t has passed it.
    if cached is None or _t - cached['s'] > 0:
        try:
            rockutils.prefix_print(f"Retrieving {url}", prefix="Cacher")
            async with aiohttp.ClientSession() as _session:
                async with _session.get(url, headers={"Authorization": f"Bot {token}"}) as response:
                    data = await response.json()

            if isinstance(data, dict) and data.get("code"):
                rockutils.prefix_print(
                    f"Encountered bad response: {data}", prefix="Cacher")
                # Cache the fallback so a failing resource is not re-requested
                # on every call.
                discord_cache[key] = {
                    "d": default,
                    "s": _t + cachetime
                }
                return False, data.get("code", -1)

            discord_cache[key] = {
                "d": data,
                "s": _t + cachetime
            }
        except Exception as e:
            traceback.print_exc()
            rockutils.prefix_print(
                str(e), prefix="cache_discord", text_colour="red")
            return False, []

    return True, discord_cache[key]['d']
async def has_elevation(guild_id, member_id, guild_info=None, bot_type="main"):
    """Decide whether a member may manage a guild's settings.

    Access is granted when the member owns the guild, is listed in the guild
    document's staff users (``st.u``), or holds a role with the manage-guild,
    ban-members or administrator permission.

    :param guild_id: guild to check.
    :param member_id: member to check.
    :param guild_info: guild document; fetched when not supplied.
    :param bot_type: token to query Discord with; replaced by ``"donator"``
        when the guild document's ``d.b.hd`` flag is set, and by ``"debug"``
        whenever ``_debug`` is on.
    :return: ``True`` when elevated, otherwise ``False``.
    """
    if not guild_info:
        guild_info = await get_guild_info(guild_id)

    if isinstance(guild_info, dict) and guild_info['d']['b']['hd']:
        bot_type = "donator"
    if _debug:
        bot_type = "debug"

    guild_success, guild = await cache_discord(f"guilds/{guild_id}", bot_type, key=f"guild:{guild_id}", cachetime=600)
    if guild_success and "owner_id" in guild and int(guild['owner_id']) == int(member_id):
        return True

    # check staff list and if they are on it
    if guild_info and guild_info.get("st") and str(member_id) in guild_info['st']['u']:
        return True

    member_success, member = await cache_discord(f"guilds/{guild_id}/members/{member_id}", bot_type, key=f"member:{guild_id}:{member_id}")
    role_success, roles = await cache_discord(f"guilds/{guild_id}/roles", bot_type, key=f"role:{guild_id}")

    # get member and roles they have and check if they have certain roles
    if not (member_success and role_success and "roles" in member):
        return False

    for held_role_id in map(int, member['roles']):
        for role in roles:
            if int(role['id']) != held_role_id:
                continue
            permissions = discord.permissions.Permissions(role['permissions'])
            if permissions.manage_guild or permissions.ban_members or permissions.administrator:
                return True
    return False
def get_user(token):
    """Fetch the OAuth user's profile and guild list, cached for 60 seconds.

    :param token: OAuth2 token dict; its ``access_token`` is the cache key.
    :return: ``(True, {"user": ..., "guilds": ...})`` on success or cache hit,
        ``(False, data)`` when either request returned a non-2xx status, and
        ``(False, {})`` when the requests raised.

    Only successful responses are cached. Previously an error body was stored
    before the status check, so the following call within the cache window
    reported success with that error body.
    """
    _t = time.time()
    key = token['access_token']

    cached = user_cache.get(key)
    # "s" holds the expiry time; the entry is fresh until _t passes it.
    if cached is not None and _t - cached['s'] <= 0:
        return True, cached['d']

    try:
        oauth = make_session(token=token)
        user = oauth.get(_oauth2['api_base'] + '/users/@me')
        guilds = oauth.get(_oauth2['api_base'] + '/users/@me/guilds')
        data = {"user": user.json(), "guilds": guilds.json()}

        for response in (user, guilds):
            if str(response.status_code)[0] != "2":
                return False, data

        user_cache[key] = {
            "d": data,
            "s": _t + 60
        }
    except Exception as e:
        traceback.print_exc()
        print(f"[get_user] {e}")
        return False, {}

    # if "message" in data['user'] and "401: Unauthorized" in data['user'][
    #         'message']:
    #     return False, data
    return True, data
# has_elevation(guild id, member id, bot type) | |
# has_elevation(436243598925627392, 143090142360371200, "debug") | |
############################################################# | |
# DASHBOARD PAGES | |
############################################################# | |
@app.route("/logout")
async def _logout():
    """Empty the session and send the visitor back to the front page."""
    session.clear()
    home = "/"
    return redirect(home)
@app.route("/login")
async def _login():
    """Start the OAuth2 flow: remember the state and redirect to the provider."""
    oauth = make_session(scope=['identify', 'guilds'])
    target = _oauth2['authorization_url']
    authorization_url, state = oauth.authorization_url(target)
    session['oauth2_state'] = state
    return redirect(authorization_url)
@app.route("/callback")
async def _callback():
    """Finish the OAuth2 flow and populate the session.

    Exchanges the authorization response for a token, loads the user's
    profile and guilds, stores them in the session and redirects to the
    previously requested path (or ``/dashboard``). Any failure sends the
    visitor back to ``/login``.
    """
    try:
        oauth = make_session(
            state=session.get("oauth2_state"),
            scope=['identify', 'guilds'])
        token = oauth.fetch_token(
            _oauth2['token_url'],
            client_secret=_oauth2['client_secret'],
            authorization_response=request.url)
    except Exception as e:
        traceback.print_exc()
        print(f"[callback] {e}")
        return redirect("/login")

    user_success, user = get_user(token)
    if not user_success:
        return redirect("/login")

    profile = user['user']
    _t = math.ceil(time.time())

    session['oauth2_token'] = token
    session['oauth2_check'] = math.ceil(time.time())
    session['user_id'] = str(profile['id'])
    session['user'] = f"{profile['username']}#{profile['discriminator']}"
    session['user_data'] = profile
    session['guild_data'] = user['guilds']
    session['reloaded_data'] = _t
    session['dashboard_guild'] = "-"
    session['developer_mode'] = False
    session.permanent = True

    destination = "/dashboard"
    if session.get("previous_path"):
        destination = session['previous_path']
    return redirect(destination)
def valid_oauth_required(f):
    """Decorator: only run the view for visitors with a usable OAuth session.

    The visitor is redirected to ``/login`` when the session has no token,
    lacks any of ``user_id``, ``user_data``, ``guild_data`` or
    ``oauth2_check``, or when the last check is older than seven days
    (604800 seconds). The requested path is remembered in
    ``session['previous_path']`` first.

    The wrapper is a coroutine function and awaits the wrapped view itself,
    so async views are run to completion here rather than handed back to the
    framework as an un-awaited coroutine. Plain (sync) views still work.
    """
    import inspect

    required_keys = ('user_id', 'user_data', 'guild_data', 'oauth2_check')

    @wraps(f)
    async def decorated_function(*args, **kwargs):
        session['previous_path'] = request.path

        if session.get("oauth2_token") is None:
            return redirect("/login")

        incomplete = any(not session.get(c) for c in required_keys)
        # oauth2_check is known to be set and truthy once `incomplete` is
        # False, so the subtraction is safe.
        if incomplete or time.time() - session.get("oauth2_check") > 604800:
            return redirect("/login")

        result = f(*args, **kwargs)
        if inspect.isawaitable(result):
            result = await result
        return result
    return decorated_function
def valid_dash_user(f):
    """Decorator: only run the view if the user may manage the selected guild.

    The selected guild is ``session['dashboard_guild']`` and must appear in
    the session's OAuth guild list. Access is granted to the guild owner, to
    users whose OAuth permissions include manage-guild, ban-members or
    administrator, and to users for whom :func:`has_elevation` succeeds with
    either the main or the donator bot. Otherwise the visitor is redirected
    to ``/dashboard?missingpermission``.

    Fixes relative to the earlier version:

    * ``has_elevation`` is a coroutine and is now awaited. Un-awaited, the
      coroutine object was always truthy, so the elevation check could never
      deny access.
    * With developer mode on and no matching guild, the view is run instead
      of crashing on ``guild['owner']`` with ``guild`` being ``None``.
      NOTE(review): this follows the "no data and no developer mode"
      condition - confirm that developer mode is meant to bypass the check.
    * The wrapper is a coroutine function and awaits the wrapped view.
    """
    import inspect

    async def _run_view(*args, **kwargs):
        # Call the wrapped view, awaiting it when it is asynchronous.
        result = f(*args, **kwargs)
        if inspect.isawaitable(result):
            result = await result
        return result

    @wraps(f)
    async def decorated_function(*args, **kwargs):
        session['previous_path'] = request.path

        # Manually retrieving the guild data from oauth2 every
        # time is not required as oauth is updated every 2 minutes.
        # Handled by before_requests -> cache_data()
        guilds = session.get("guild_data")
        if not guilds:
            cache_data()
            guilds = session.get("guild_data")
            if not guilds:
                return redirect("/login")

        guild = None
        dashboard_guild = session.get("dashboard_guild")
        for item in guilds:
            if str(item.get("id")) == str(dashboard_guild):
                guild = item

        if not guild:
            # redirects to invalid guild if no data and no developer mode
            if not session.get("developer_mode", False):
                return redirect("/dashboard?invalidguild")
            return await _run_view(*args, **kwargs)

        # allow if they are the owner
        if guild['owner']:
            return await _run_view(*args, **kwargs)

        # check users permissions from oauth2
        permissions = discord.permissions.Permissions(guild['permissions'])
        if permissions.manage_guild or permissions.ban_members or permissions.administrator:
            return await _run_view(*args, **kwargs)

        # check if has elevation
        user_id = session.get("user_id")
        if not await has_elevation(guild['id'], user_id, bot_type="main"):
            if not await has_elevation(guild['id'], user_id, bot_type="donator"):
                return redirect("/dashboard?missingpermission")

        return await _run_view(*args, **kwargs)
    return decorated_function
@app.before_request
def cache_data():
    """Refresh the session's OAuth data before each non-static request.

    When the session holds a token and its data was loaded more than 120
    seconds ago, the user profile and guild list are fetched again. The
    session is then marked permanent with a seven day lifetime. Requests
    whose URL contains "static" are left alone.
    """
    # Checks all oauth2 information is up to date and does not check if it is
    # a static page
    if "static" in request.url:
        return

    print(f"Caching data for path {request.path}")
    _t = math.ceil(time.time())

    last_reload = session.get("reloaded_data")
    if last_reload and session.get("oauth2_token") and _t - last_reload > 120:
        user_success, user = get_user(session.get("oauth2_token"))
        if user_success:
            session['reloaded_data'] = _t
            session['user_id'] = str(user['user']['id'])
            session['user_data'] = user['user']
            session['guild_data'] = user['guilds']
            session.permanent = True

    session.permanent = True
    app.permanent_session_lifetime = timedelta(days=7)
# @app.route("/dashboard") | |
# @valid_oauth_required | |
# async def _dashboard(): | |
# return "I am dashboard" | |
# _args = list(dict(request.args).keys()) | |
# if len(_args) == 1: | |
# arg = _args[0] | |
# if arg == "missingpermission": | |
# pass | |
# elif arg == "invalidguild": | |
# pass | |
# elif arg == "missingdata": | |
# pass | |
# handle invalid guild | |
# handle missing guild info | |
# handle no permissions\ | |
def _render_preview(source_path, preview_path, caption, font_path):
    """Write a JPEG preview of *source_path* with *caption* drawn on it.

    The caption is centred horizontally, 20 pixels above the bottom edge, in
    white with a one pixel black outline. The font size starts at 40 and
    shrinks until the text is narrower than the image (or the size floor is
    reached). The source file is closed even when drawing fails.
    """
    with open(source_path, "rb") as source_file:
        image = Image.open(source_file).convert("RGBA")

        size = 40
        while True:
            font = ImageFont.truetype(font_path, size)
            size -= 1
            text_w, text_h = font.getsize(caption)
            if text_w < image.size[0] or size < 2:
                break

        image_w, image_h = image.size
        y = image_h - 20 - text_h
        x = math.floor((image_w - text_w) / 2)

        draw = ImageDraw.Draw(image)
        # Four diagonal offsets in black form the outline.
        for dx, dy in ((1, 1), (1, -1), (-1, 1), (-1, -1)):
            draw.text((x + dx, y + dy), caption, font=font, fill=(0, 0, 0))
        draw.text((x, y), caption, font=font, fill=(255, 255, 255))

        image.convert("RGB").save(preview_path, format="jpeg")


@app.route("/api/getbackground/<background>")
async def getbackground(background):
    """Serve a captioned preview image for the named background.

    Names without "custom" are looked up in ``<cdn>/Images``; names with it
    in ``<cdn>/CustomImages``, also trying ``.gif`` and then ``.jpg`` in
    place of ``.png``. Previews live under ``preview-samples/`` and are
    rendered on demand (always re-rendered when the source path contains
    "custom"). A missing background yields a generic "not found" preview.

    The caption is the file name without its ``.png``/``.gif``/``.jpg``
    extension; earlier only ``.jpg`` was ever removed because each
    replacement overwrote the previous one.
    """
    cdnpath = config['cdn']['location']
    font_path = os.path.join(cdnpath, "Fonts", "default.ttf")

    if "custom" not in background:
        f = os.path.join(cdnpath, "Images", background)
    else:
        f = os.path.join(cdnpath, "CustomImages", background)
        if not os.path.exists(f):
            background = background.replace(".png", ".gif")
            f = os.path.join(cdnpath, "CustomImages", background)
            if not os.path.exists(f):
                background = background.replace(".gif", ".jpg")
                f = os.path.join(cdnpath, "CustomImages", background)

    if not os.path.exists(f):
        bgloc = "preview-samples/none.png"
        if not os.path.exists(bgloc):
            _render_preview(
                os.path.join(cdnpath, "Images", "default.png"),
                bgloc,
                "This background could not be found",
                font_path)
    else:
        bgloc = f"preview-samples/{background}"
        if not os.path.exists(bgloc) or "custom" in f.lower():
            txt = background
            for extension in (".png", ".gif", ".jpg"):
                txt = txt.replace(extension, "")
            _render_preview(f, bgloc, txt, font_path)

    return await send_file(bgloc, cache_timeout=-1)
@app.route("/api/internal-status")
async def api_internal_status():
    """Return the raw per-cluster status codes as JSON."""
    status_snapshot = cluster_status
    return jsonify(status_snapshot)
@app.route("/api/status")
async def api_status():
    """Return a JSON list describing every expected cluster, sorted by name.

    Expected clusters are ``debug`` (``DB``), ``donator`` (``DN``) and the
    numbered clusters ``0 .. config['bot']['clusters'] - 1`` (``C<n>``). Each
    entry reports whether a ping has ever been received (``alive``), whether
    the last one is under 70 seconds old (``pingalive``), its age
    (``lastping``) and the stats from the cluster's last ping reply.

    Fixes relative to the earlier version:

    * entries are keyed by ``str`` so they match the (string) names clusters
      connect with; numbered clusters used to raise ``KeyError``;
    * clusters that are connected but not expected are skipped;
    * ``lowest_latency`` is the minimum (it used ``max``);
    * stats are built on a copy, so ``cluster_data`` is not mutated, and an
      empty latency list yields 0 instead of raising.
    """
    _time = time.time()
    special_names = {"debug": "DB", "donator": "DN"}

    clusters = {}
    for i in ["debug", "donator", *range(config['bot']['clusters'])]:
        clusters[str(i)] = {
            "alive": False,
            "pingalive": False,
            "stats": {},
            "lastping": 0,
            "id": i,
            "name": special_names.get(i, f"C{i}")
        }

    for cluster, ping in last_ping.items():
        entry = clusters.get(str(cluster))
        if entry is None:
            continue
        entry['alive'] = True
        if _time - ping < 70:
            entry['pingalive'] = True
        entry['lastping'] = _time - ping

    for cluster, data in cluster_data.items():
        entry = clusters.get(str(cluster))
        if entry is None:
            continue
        uptime = time.time() - data['init']
        latencies = [sample[1] for sample in data['latencies']]

        stats = dict(data)
        stats['uptime'] = uptime
        stats['displayed'] = rockutils.produce_human_timestamp(uptime)
        stats['highest_latency'] = max(latencies, default=0)
        stats['lowest_latency'] = min(latencies, default=0)
        entry['stats'] = stats

    return jsonify(list(sorted(clusters.values(), key=lambda o: o['name'])))
@app.route("/api/search", methods=["GET", "POST"])
async def api_search():
    """Search guilds across all clusters for the submitted ``term``.

    A term that parses as an integer is sent as a ``guildsfind`` job with
    mode ``0``; any other term uses mode ``1`` and the per-cluster results
    are merged. Terms of two characters or fewer return an empty list.
    """
    form = dict(await request.form)
    if "term" not in form:
        return jsonify(
            success=False,
            error="Missing search term at key 'term'")

    term = form.get("term")
    if not len(term) > 2:
        return jsonify(success=True, data=[])

    try:
        term = int(term)
    except BaseException:
        numeric = False
    else:
        numeric = True

    if numeric:
        result_payload = await create_job(request=None, o="guildsfind", a=[0, term], r="*")
        payload = []
        # The last successful cluster reply wins.
        for reply in result_payload['d'].values():
            if reply['success']:
                payload = [reply['results']]
    else:
        result_payload = await create_job(request=None, o="guildsfind", a=[1, term], r="*")
        payload = rockutils.merge_results_lists(result_payload['d'])

    return jsonify(success=True, data=payload)
@app.route("/api/resetconfig", methods=["POST"])
@valid_oauth_required
async def api_resetconfig():
    """Delete the selected guild's stored configuration.

    Requires elevation on the guild selected in the session. After the
    document is deleted every cluster is told to reload its cache for it.
    """
    user_id = session.get("user_id", None)
    guild_id = session.get("dashboard_guild", None)
    if not guild_id:
        return jsonify(success=False, error="Invalid guild key")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return jsonify(success=False, error="No guild information")

    # Elevation through either bot is enough; the donator bot is only
    # consulted when the main bot check fails.
    elevated = await has_elevation(guild_id, user_id, guild_info=guild_info, bot_type="main")
    if not elevated:
        elevated = await has_elevation(guild_id, user_id, guild_info=guild_info, bot_type="donator")
    if not elevated:
        return jsonify(success=False, error="Missing permissions")

    r.table("guilds").get(guild_id).delete().run(rethink)
    await create_job(o="cachereload", a=guild_id, r="*")
    return jsonify(success=True)
@app.route("/api/invites", methods=["POST"])
@valid_oauth_required
async def api_invites():
    """Return the selected guild's invites, most used first.

    Requires elevation on the guild selected in the session. Each invite
    from ``guild_info['d']['i']`` gets an integer ``duration`` plus
    human-readable ``created_at_str`` and ``expires_at_str`` fields
    (``"∞"`` when the duration is not positive).
    """
    user_id = session.get("user_id", None)
    guild_id = session.get("dashboard_guild", None)
    if not guild_id:
        return jsonify(success=False, error="Invalid guild key")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return jsonify(success=False, error="No guild information")

    # Elevation through either bot is enough; the donator bot is only
    # consulted when the main bot check fails.
    elevated = await has_elevation(guild_id, user_id, guild_info=guild_info, bot_type="main")
    if not elevated:
        elevated = await has_elevation(guild_id, user_id, guild_info=guild_info, bot_type="donator")
    if not elevated:
        return jsonify(success=False, error="Missing permissions")

    invites = []
    _time = time.time()
    for invite in guild_info['d']['i']:
        duration = int(invite.get('duration', 0))
        invite['duration'] = duration
        invite['created_at_str'] = rockutils.since_unix_str(
            invite['created_at'])
        if duration > 0:
            expires = rockutils.since_seconds_str(
                _time - (invite['created_at'] + duration), include_ago=False)
        else:
            expires = "∞"
        invite['expires_at_str'] = expires
        invites.append(invite)

    invites.sort(key=lambda o: o['uses'], reverse=True)
    return jsonify(success=True, data=invites)
@app.route("/api/punishments/<mtype>", methods=["GET", "POST"])
@valid_oauth_required
@valid_dash_user
async def api_punishments(mtype="false"):
    """View, export or reset the punishment CSV of the dashboard guild.

    ``mtype`` (case-insensitive; unknown values fall back to ``"false"``):
      * ``"false"``  - return every recorded punishment.
      * ``"true"``   - return only rows whose handled column is ``"false"``.
      * ``"export"`` - send ``punishments/<guild_id>.csv`` as an attachment.
      * ``"reset"``  - delete the CSV if it exists.

    CSV columns, by index:
    user id | user display | moderator id | moderator name | punishment type
    | reason | punishment start | punishment duration | is handled
    """
    guild_id = session.get("dashboard_guild", None)
    if not guild_id:
        return jsonify(success=False, error="Invalid guild key")

    mode = mtype.lower()
    if mode not in ('true', 'false', 'reset', 'export'):
        mode = "false"
    csv_path = f"punishments/{guild_id}.csv"

    if mode in ('true', 'false'):
        try:
            rows = []
            if os.path.exists(csv_path):
                # Read everything inside a context manager so the handle is
                # closed; the csv module asks for newline="" on its files.
                with open(csv_path, "r", newline="") as handle:
                    # Blank lines parse to empty rows; skip them instead of
                    # failing on the column lookups below.
                    rows = [row for row in csv.reader(handle) if row]

            if mode == "true":
                rows = [row for row in rows if row[8].lower() == "false"]

            rows.sort(key=lambda o: int(o[6]), reverse=True)

            # Output columns:
            # punishment type | user display | moderator display | reason |
            # date occurred | duration | handled
            data = []
            for audit in rows:
                duration = int(audit[7])
                if duration > 0:
                    duration_str = rockutils.since_seconds_str(
                        duration,
                        allow_secs=True, include_ago=False).strip()
                elif "temp" in audit[4].lower():
                    duration_str = "∞"
                else:
                    duration_str = "-"
                data.append([
                    f"{audit[4][0].upper()}{audit[4][1:].lower()}",
                    f"{audit[1]} ({audit[0]})",
                    f"{audit[3]} ({audit[2]})",
                    "-" if audit[5] in [None, "", " "] else audit[5].strip(),
                    rockutils.since_unix_str(int(audit[6])).strip(),
                    duration_str,
                    "Handled" if audit[8].lower() == "true" else "Ongoing",
                ])
            return jsonify(success=True, data=data)
        except Exception as e:
            exc_info = sys.exc_info()
            traceback.print_exception(*exc_info)
            return jsonify(success=False, error=str(e))
    elif mode == "export":
        if os.path.exists(csv_path):
            return await send_file(
                csv_path,
                as_attachment=True,
                attachment_filename=f"{guild_id}.csv")
        return jsonify(success=False)
    elif mode == "reset":
        # Success is reported whether or not there was a file to delete.
        if os.path.exists(csv_path):
            os.remove(csv_path)
        return jsonify(success=True)
@app.route("/api/logs/<mtype>", methods=["GET", "POST"])
@valid_oauth_required
@valid_dash_user
async def api_logs(mtype="false"):
    """View, export or reset the log CSV of the dashboard guild.

    ``mtype`` (case-insensitive; unknown values fall back to ``"view"``):
      * ``"view"``   - return every row of ``logs/<guild_id>.csv`` as JSON
        (an empty list when the file does not exist).
      * ``"export"`` - send the CSV as an attachment.
      * ``"reset"``  - delete the CSV if it exists.
    """
    guild_id = session.get("dashboard_guild", None)
    if not guild_id:
        return jsonify(success=False, error="Invalid guild key")

    mode = mtype.lower()
    if mode not in ('view', 'reset', 'export'):
        mode = "view"
    csv_path = f"logs/{guild_id}.csv"

    if mode == "view":
        try:
            data = []
            if os.path.exists(csv_path):
                # Use a context manager so the handle is closed after
                # reading; the csv module asks for newline="" on its files.
                with open(csv_path, "r", newline="") as handle:
                    data = list(csv.reader(handle))
            return jsonify(success=True, data=data)
        except Exception as e:
            exc_info = sys.exc_info()
            traceback.print_exception(*exc_info)
            return jsonify(success=False, error=str(e))
    elif mode == "export":
        if os.path.exists(csv_path):
            return await send_file(
                csv_path,
                as_attachment=True,
                attachment_filename=f"{guild_id}.csv")
        return jsonify(success=False)
    elif mode == "reset":
        # Success is reported whether or not there was a file to delete.
        if os.path.exists(csv_path):
            os.remove(csv_path)
        return jsonify(success=True)
@app.route("/api/invite/<id>", methods=["GET", "POST"])
@valid_oauth_required
async def api_invite(id):
    """Ask the clusters for an invite to guild ``id`` and return its code.

    Fails with a JSON error when the guild is unknown, when its
    ``d.b.ai`` setting is falsy, or when no cluster answers with both a
    truthy ``success`` and a truthy ``invite``. When several clusters
    answer, the last usable invite wins.
    """
    info = await get_guild_info(id)
    if not info:
        return jsonify(success=False, error="This guild is not valid")

    if not info['d']['b']['ai']:
        return jsonify(
            success=False,
            error="This server has disabled invites through welcomer")

    reply = await create_job(o="guildinvite", a=str(id), r="*")

    code = ""
    for answer in reply['d'].values():
        print(answer)
        usable = answer.get("success", False) and answer.get("invite", None)
        if usable:
            code = answer['invite']

    # code is only ever replaced by a truthy invite, so an empty string
    # means no cluster produced one.
    if not code:
        return jsonify(
            success=False,
            error="Failed to create an invite. The bot may be missing permissions")
    return jsonify(success=True, code=code)
# /api/logs/retrieve | |
# /api/logs/clear | |
# /api/logs/export | |
# /api/punishments/retrieve | |
# /api/punishments/clear | |
# /api/punishments/export | |
########################################################################## | |
@app.route("/donate/<dtype>/<months>")
@valid_oauth_required
async def donate_create(dtype, months):
    """Create a PayPal payment for a donation tier and redirect to approval.

    ``dtype`` selects the product (``"0"``, ``"1"``, ``"3"`` or ``"5"``) and
    ``months`` the duration; together they form the ``"<dtype>:<months>"``
    key looked up in the price table. Unknown combinations redirect to
    ``/donate?invaliddonation``. When PayPal rejects the payment, the error
    is sent to the donations webhook and the user is redirected to
    ``/donate?paymentcreateerror``.
    """
    donation_types = {
        "0": ["Welcomer Custom Background"],
        "1": ["Welcomer Pro x1"],
        "3": ["Welcomer Pro x3"],
        "5": ["Welcomer Pro x5"]
    }
    prices = {
        "0:1": 1,
        "1:1": 5,
        "1:3": 13.50,
        "1:6": 24.00,
        "3:1": 10,
        "3:3": 27,
        "3:6": 48,
        "5:1": 15,
        "5:3": 40.50,
        "5:6": 72.00,
    }

    dontype = dtype + ":" + str(months)
    donation = donation_types.get(dtype)
    price = prices.get(dontype)

    # Validate before anything indexes into `donation`; previously the
    # description was built from donation[0] ahead of the None check.
    if price is None or not donation:
        return redirect("/donate?invaliddonation")

    # The one-off custom background reads "... forever"; the others read
    # "... for N month(s)".
    if str(dtype) == "0":
        duration = "ever"
    else:
        duration = f" {months} month(s)"
    description = f"{donation[0]} for{duration}"
    patreon_webhook = config['webhooks']['donations']

    payer = {"payment_method": "paypal"}
    items = [{"name": dontype, "price": price,
              "currency": "GBP", "quantity": "1"}]
    amount = {"total": price, "currency": "GBP"}
    redirect_urls = {
        "return_url": f"https://{_domain}/donate/confirm?success=true",
        # Was hard-coded to http://127.0.0.1:15007, which sends a cancelling
        # user to their own machine; use the configured domain like
        # return_url does.
        "cancel_url": f"https://{_domain}/donate?cancel"}

    payment = paypalrestsdk.Payment({"intent": "sale",
                                     "payer": payer,
                                     "redirect_urls": redirect_urls,
                                     "transactions": [{"item_list": {"items": items},
                                                       "amount": amount,
                                                       "description": description}]},
                                    api=paypal_api)

    if payment.create():
        for link in payment.links:
            if link['method'] == "REDIRECT":
                return redirect(link["href"])
        # Payment was created but carries no approval link: fail visibly
        # instead of returning None from the handler.
        return redirect("/donate?paymentcreateerror")

    await rockutils.send_webhook(patreon_webhook, f"`{str(payment.error)}`")
    return redirect("/donate?paymentcreateerror")
@app.route("/donate/confirm")
@valid_oauth_required
async def donate_confirm():
    """Validate and execute a PayPal payment, then grant the membership.

    Reads ``paymentId`` and ``PayerID`` from the query string. The payment's
    item name (``"<dtype>:<months>"``) and total are checked against the
    price table before the payment is executed. On success the user's row in
    the ``users`` table is updated, the donation is announced through the
    first cluster that accepts the ``donationannounce`` job, and the user is
    redirected to ``/donate?success``. Each failure redirects to a distinct
    ``/donate?...`` marker; a payment that fails to execute redirects to
    ``/``.
    """
    user = session['user_data']
    userid = session['user_id']
    responce = await create_job(o="retrieveuser", a=userid, r="*")
    userinfo = r.table("users").get(str(userid)).run(rethink)
    if not userinfo:
        return redirect("/donate?userinfogone")

    try:
        payment = paypalrestsdk.Payment.find(request.args.get('paymentId'))
    except Exception:
        return redirect("/donate?invalidpaymentid")

    name = payment.transactions[0]['item_list']['items'][0]['name']
    amount = float(payment.transactions[0]["amount"]["total"])
    prices = {
        "0:1": 1,
        "1:1": 5,
        "1:3": 13.50,
        "1:6": 24.00,
        "3:1": 10,
        "3:3": 27,
        "3:6": 48,
        "5:1": 15,
        "5:3": 40.50,
        "5:6": 72.00,
    }
    if name not in prices or amount != prices.get(name):
        return redirect("/donate?invalidprice")

    dtype, months = name.split(":")
    try:
        months = int(months)
    except ValueError:
        return redirect("/donate?invalidmonth")
    if not dtype or not months:
        return redirect("/donate?invalidarg")

    donation_types = {
        "0": ["Welcomer Custom Background"],
        "1": ["Welcomer Pro x1"],
        "3": ["Welcomer Pro x3"],
        "5": ["Welcomer Pro x5"]
    }
    dname = donation_types.get(dtype)

    if not payment.execute({"payer_id": request.args.get('PayerID')}):
        return redirect("/")

    patreon_webhook = config['webhooks']['donations']
    message = f"""User: `{user.get('username')}` <@{userid}>\nDonation: **{dname[0]}** (£ **{amount}**)\nMonths: {months}"""
    await rockutils.send_webhook(patreon_webhook, message)

    try:
        # One if/elif chain: previously the "0" case was a separate `if`,
        # so its membership name was overwritten by the trailing `else`.
        if dtype == "0":
            userinfo['m']['hs'] = True
            membership_name = "Custom Background"
        elif dtype in ("1", "3", "5"):
            userinfo['m'][dtype]['h'] = True
            # NOTE(review): 2592000 is 30 days in seconds, so this yields
            # 31 * 30 days per month bought and is not offset from the
            # current time - confirm what consumers of 'u' expect.
            userinfo['m'][dtype]['u'] = 2592000 * (months * 31)
            membership_name = f"Welcomer Pro x{dtype}"
        else:
            membership_name = f"Donation (£{amount})"

        r.table("users").get(str(userid)).update(userinfo).run(rethink)

        # Clusters that reported success for the earlier retrieveuser job.
        receps = [
            cluster_id
            for cluster_id, cluster_data in responce['d'].items()
            if cluster_data.get("success", False)
        ]
        # Stop at the first cluster that returns a truthy announce result.
        for recep in receps:
            data = await create_job(o="donationannounce", a=[userid, membership_name], r=recep)
            if recep in data['d'] and data['d'][recep]:
                break
        return redirect("/donate?success")
    except Exception as e:
        await rockutils.send_webhook(patreon_webhook, f":warning: | <@143090142360371200> Problem processing donation for <@{userid}>\n`{e}`")
        return redirect("/donate?fatalexception")
########################################################################## | |
# @app.route("/dashboard/{}", methods=['GET', 'POST']) | |
# @valid_oauth_required | |
# @valid_dash_user | |
# async def dashboard_(): | |
# guild_id = session.get("dashboard_guild", None) | |
# guilds = list(sorted(session.get("guild_data", []), key=lambda o: o.get("name", ""))) | |
# guild = list(filter(lambda o, id=guild_id: o['id'] == guild_id, guilds))[0] | |
# if not guild_id: | |
# return redirect("/dashboard") | |
# guild_info = await get_guild_info(guild_id) | |
# guild_donations = await get_guild_donations(guild_info) | |
# if not guild_info: | |
# return redirect("/dashboard?missingdata") | |
# if request.method == "GET": | |
# _config = { | |
# "_all_translations": os.listdir("locale"), | |
# "_donations": guild_donations, | |
# "prefix": guild_info['d']['b']['p'], | |
# "locale": guild_info['d']['b']['l'], | |
# "forcedms": guild_info['d']['g']['fd'], | |
# "embedcolour": guild_info['d']['b']['ec'], | |
# "showstaff": guild_info['d']['b']['ss'], | |
# "allowinvites": guild_info['d']['b']['ai'], | |
# "splash": guild_info['d']['b']['s'] or "", | |
# "description": guild_info['d']['b']['d'] or "" | |
# } | |
# return await render_template("dashboard/botconfig.html", | |
# session=session, guild=guild, guilds=guilds, config=config) | |
# if request.method == "POST": | |
# form = dict(await request.form) | |
# try: | |
# update_payload = {} | |
# replaces = [ | |
# ["d.b.p", form['prefix'], False, None, "prefix" ], | |
# ["d.b.l", form['locale'], False, None, "locale" ], | |
# ["d.b.d", form['description'], False, None, "description" ], | |
# ["d.b.ec", form['embedcolour'], True, "hex", "embedcolour" ], | |
# ["d.b.ss", form['showstaff'], True, "bool", "showstaff" ], | |
# ["d.b.ai", form['allowinvites'], True, "bool", "allowinvites" ], | |
# ["d.g.fd", form['forcedms'], True, "bool", "forcedms" ], | |
# ] | |
# for replace in replaces: | |
# if replace[2] and replace[3] and expect(replace[1], replace[3]) or not replace[2]: | |
# if not replace[1] is None: | |
# value = normalize_form(replace[1]) | |
# oldvalue = rockutils.getvalue(replace[0], guild_info) | |
# rockutils.setvalue(replace[0], guild_info, value) | |
# if oldvalue != value: | |
# update_payload[replace[4]] = [oldvalue, value] | |
# if len(guild_donations) > 0: | |
# parsed_url = urlparse(form['splash']) | |
# if "imgur.com" in parsed_url.netloc: | |
# guild_info['d']['b']['s'] = form['splash'] | |
# if form['splash'] != rockutils.getvalue("d.b.s", guild_info): | |
# update_payload['splash'] = [guild_info['d']['b']['s'], form['splash']] | |
# await rockutils.logcsv([math.floor(time.time()), 'CONFIG_CHANGE', int(session.get("user_id", 0)), session.get("user_id", ""), json.dumps(update_payload)], f"{guild_id}.csv") | |
# await update_guild_info(guild_id, guild_info) | |
# await create_job(o="cachereload", a=guild_id, r="*") | |
# return jsonify(success=True) | |
# except Exception as e: | |
# return jsonify(success=False, error=str(e)) | |
@app.route("/dashboard/botconfig", methods=["GET", "POST"])
@valid_oauth_required
@valid_dash_user
async def dashboard_botconfig():
    """Render (GET) or update (POST) the bot settings of the dashboard guild.

    GET renders ``dashboard/botconfig.html`` with the current values.
    POST validates each submitted field, writes it into the guild info,
    and - when something changed - appends a ``CONFIG_CHANGE`` row to the
    guild's CSV log, stores the guild info and dispatches ``cachereload``.
    POST answers with a JSON ``success`` flag (and ``error`` on failure).
    """
    # Validate the session guild before using it; the lookup used to index
    # [0] on the filtered list before this check could run.
    guild_id = session.get("dashboard_guild", None)
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")
    guild_donations = await get_guild_donations(guild_info)

    if request.method == "GET":
        _config = {
            "_all_translations": os.listdir("locale"),
            "_donations": guild_donations,
            "prefix": guild_info['d']['b']['p'],
            "locale": guild_info['d']['b']['l'],
            "forcedms": guild_info['d']['g']['fd'],
            "embedcolour": guild_info['d']['b']['ec'],
            "showstaff": guild_info['d']['b']['ss'],
            "allowinvites": guild_info['d']['b']['ai'],
            "splash": guild_info['d']['b']['s'] or "",
            "description": guild_info['d']['b']['d'] or ""
        }
        return await render_template("dashboard/botconfig.html", session=session, guild=guild, guilds=guilds, config=_config)

    if request.method == "POST":
        form = dict(await request.form)
        try:
            update_payload = {}
            # [guild_info path, submitted value, needs type check,
            #  expected type, audit-log key]
            replaces = [
                ["d.b.p", form['prefix'], False, None, "bot.prefix"],
                ["d.b.l", form['locale'], False, None, "bot.locale"],
                ["d.g.fd", form['forcedms'], True, "bool", "bot.forcedms"],
                ["d.b.ss", form['showstaff'], True, "bool", "bot.showstaff"],
                ["d.b.d", form['description'], False, None, "bot.description"],
                ["d.b.ec", form['embedcolour'], True, "hex", "bot.embedcolour"],
                ["d.b.ai", form['allowinvites'], True, "bool", "bot.allowinvites"],
            ]
            for path, raw, typed, expected, audit_key in replaces:
                # Untyped fields are always accepted; typed ones must pass
                # expect() for their declared type.
                if not typed or (expected and expect(raw, expected)):
                    if raw is not None:
                        value = normalize_form(raw)
                        oldvalue = rockutils.getvalue(path, guild_info)
                        rockutils.setvalue(path, guild_info, value)
                        if oldvalue != value:
                            rockutils.setvalue(
                                audit_key, update_payload, [oldvalue, value])

            if len(guild_donations) > 0:
                splash = form['splash']
                # Match the host itself rather than a substring of netloc,
                # so e.g. "imgur.com.example.org" is not accepted.
                host = urlparse(splash).hostname or ""
                if host == "imgur.com" or host.endswith(".imgur.com"):
                    # Read the old value BEFORE overwriting it; comparing
                    # after the write could never detect a change.
                    old_splash = rockutils.getvalue("d.b.s", guild_info)
                    rockutils.setvalue("d.b.s", guild_info, splash)
                    if old_splash != splash:
                        rockutils.setvalue(
                            "bot.splash", update_payload,
                            [old_splash, splash])

            if len(update_payload) > 0:
                await rockutils.logcsv(
                    [math.floor(time.time()), 'CONFIG_CHANGE',
                     int(session.get("user_id", 0)), session.get("user", ""),
                     json.dumps(update_payload)],
                    f"{guild_id}.csv")
                await update_guild_info(guild_id, guild_info)
                await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            exc_info = sys.exc_info()
            traceback.print_exception(*exc_info)
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/guilddetails")
@valid_oauth_required
@valid_dash_user
async def dashboard_guilddetails():
    """Render the guild details page for the session's dashboard guild.

    A ``cachereload`` job is dispatched before the guild info is fetched.
    Redirects to ``/dashboard`` when no usable guild is selected and to
    ``/dashboard?missingdata`` when no guild info is available.
    """
    # Validate the session guild before using it; the lookup used to index
    # [0] on the filtered list before this check could run.
    guild_id = session.get("dashboard_guild", None)
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    await create_job(o="cachereload", a=guild_id, r="*")
    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")
    guild_donations = await get_guild_donations(guild_info)

    return await render_template("dashboard/guilddetails.html", session=session, guild=guild, guilds=guilds, config=guild_info, donations=guild_donations)
@app.route("/dashboard/invites")
@valid_oauth_required
@valid_dash_user
async def dashboard_invites():
    """Render the invites page for the session's dashboard guild.

    Redirects to ``/dashboard`` when no usable guild is selected and to
    ``/dashboard?missingdata`` when no guild info is available.
    """
    # Validate the session guild before using it; the lookup used to index
    # [0] on the filtered list before this check could run.
    guild_id = session.get("dashboard_guild", None)
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")
    guild_donations = await get_guild_donations(guild_info)

    return await render_template("dashboard/invites.html", session=session, guild=guild, guilds=guilds, config=guild_info, donations=guild_donations)
@app.route("/dashboard/welcomergeneral", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_welcomergeneral():
    """Render (GET) or update (POST) the general welcomer settings.

    GET renders ``dashboard/welcomergeneral.html`` with the current values
    and the guild's text channels formatted as ``#name (id)``.
    POST updates the badges flag, the welcomer channel and the custom embed;
    when something changed it appends a ``CONFIG_CHANGE`` row to the guild's
    CSV log, stores the guild info and dispatches ``cachereload``.
    """
    # Validate the session guild before using it; the lookup used to index
    # [0] on the filtered list before this check could run.
    guild_id = session.get("dashboard_guild", None)
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")

    if request.method == "GET":
        _config = {
            "channel": guild_info['w']['c'],
            "badges": guild_info['w']['b'],
            "invited": guild_info['w']['iv'],
            "imagesenabled": guild_info['w']['i']['e'],
            "textenabled": guild_info['w']['t']['e'],
            "dmsenabled": guild_info['w']['dm']['e'],
            "embedenabled": guild_info['w']['ue'],
            "customembed": guild_info['w']['em'][1],
        }
        text_channels = guild_info['d']['c']['t']
        channellist = [
            f"#{channel['name']} ({channel['id']})"
            for channel in sorted(text_channels, key=lambda o: o['position'])
        ]
        current = guild_info['w']['c']
        if current:
            # Placeholder label unless exactly one channel matches the id.
            _config['welcomerchannel'] = f"#deleted-channel ({current})"
            results = [o for o in text_channels
                       if str(o['id']) == str(current)]
            if len(results) == 1:
                _config['welcomerchannel'] = f"#{results[0]['name']} ({results[0]['id']})"
        else:
            _config['welcomerchannel'] = ""
        return await render_template("dashboard/welcomergeneral.html", session=session, guild=guild, guilds=guilds, config=_config, channellist=channellist)

    if request.method == "POST":
        form = dict(await request.form)
        try:
            update_payload = {}
            # [guild_info path, submitted value, needs type check,
            #  expected type, audit-log key]
            replaces = [
                ['w.b', form['badges'], True, "bool", "welcomer.badges"],
            ]
            for path, raw, typed, expected, audit_key in replaces:
                # Untyped fields are always accepted; typed ones must pass
                # expect() for their declared type.
                if not typed or (expected and expect(raw, expected)):
                    if raw is not None:
                        value = normalize_form(raw)
                        oldvalue = rockutils.getvalue(path, guild_info)
                        rockutils.setvalue(path, guild_info, value)
                        if oldvalue != value:
                            rockutils.setvalue(
                                audit_key, update_payload, [oldvalue, value])

            # The channel arrives as "#name (id)". A value that does not
            # parse leaves the setting alone; a parsed id that is not one of
            # the guild's text channels clears it.
            matches = re.findall(r"#.+ \(([0-9]+)\)", form['channel'])
            if len(matches) == 1:
                channel_id = matches[0]
                results = [o for o in guild_info['d']['c']['t']
                           if str(o['id']) == str(channel_id)]
                if len(results) != 1:
                    channel_id = None
                oldvalue = rockutils.getvalue("w.c", guild_info)
                rockutils.setvalue("w.c", guild_info, channel_id)
                if oldvalue != channel_id:
                    rockutils.setvalue(
                        "welcomer.channel", update_payload,
                        [oldvalue, channel_id])

            # Accept the embed only if it parses as YAML ("y") or JSON ("j");
            # JSON wins when both succeed. Only parser failures are ignored.
            value = form['customembed']
            loader_type = None
            try:
                yaml.load(value, Loader=yaml.SafeLoader)
                loader_type = "y"
            except Exception:
                pass
            try:
                json.loads(value)
                loader_type = "j"
            except Exception:
                pass
            if loader_type:
                oldvalue = rockutils.getvalue("w.em", guild_info)[1]
                rockutils.setvalue("w.em", guild_info, [loader_type, value])
                if oldvalue != value:
                    rockutils.setvalue(
                        "welcomer.customembed", update_payload, None)

            if len(update_payload) > 0:
                await rockutils.logcsv(
                    [math.floor(time.time()), 'CONFIG_CHANGE',
                     int(session.get("user_id", 0)), session.get("user", ""),
                     json.dumps(update_payload)],
                    f"{guild_id}.csv")
                await update_guild_info(guild_id, guild_info)
                await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            exc_info = sys.exc_info()
            traceback.print_exception(*exc_info)
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/welcomerimages", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_welcomerimages():
    """Render (GET) or update (POST) the welcomer image settings.

    GET renders ``dashboard/welcomerimages.html`` with the current values
    and the stems of the files in the CDN ``Images`` directory.
    POST optionally stores an uploaded background under the CDN
    ``CustomImages`` directory, then validates and applies the submitted
    fields; when something changed it appends a ``CONFIG_CHANGE`` row to
    the guild's CSV log, stores the guild info and dispatches
    ``cachereload``.
    """
    # Validate the session guild before using it; the lookup used to index
    # [0] on the filtered list before this check could run.
    guild_id = session.get("dashboard_guild", None)
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")
    guild_donations = await get_guild_donations(guild_info)

    if request.method == "GET":
        _config = {
            "enableimages": guild_info['w']['i']['e'],
            "textcolour": guild_info['w']['i']['c']['b'],
            "textbordercolour": guild_info['w']['i']['c']['bo'],
            "profileoutlinecolour": guild_info['w']['i']['c']['pb'],
            "imagebordercolour": guild_info['w']['i']['c']['ib'],
            "enableimageborder": guild_info['w']['i']['b'],
            "textalign": guild_info['w']['i']['a'],
            "texttheme": guild_info['w']['i']['t'],
            "profilebordertype": guild_info['w']['i']['pb'],
            "message": guild_info['w']['i']['m'],
            "background": guild_info['w']['i']['bg'],
        }
        image_dir = os.path.join(config['cdn']['location'], "Images")
        backgrounds = [pathlib.Path(o).stem for o in os.listdir(image_dir)]
        return await render_template("dashboard/welcomerimages.html", session=session, guild=guild, guilds=guilds, config=_config, backgrounds=backgrounds)

    if request.method == "POST":
        files = await request.files
        file = files.get('file', None)
        form = dict(await request.form)
        try:
            update_payload = {}
            # Any donation allows lossless PNG; a "donation" entry also
            # allows GIF output.
            _allow_hq = len(guild_donations) > 0
            _allow_gif = _allow_hq and "donation" in guild_donations

            if file:
                location = os.path.join(
                    config['cdn']['location'],
                    "CustomImages",
                    f"custom_{guild['id']}")
                isgif = imghdr.what(file.filename, h=file.read()) in [
                    "gif", "webp"]
                image = Image.open(file)
                # Best permitted format first: GIF, then PNG, otherwise a
                # low-quality JPEG.
                if _allow_gif and isgif:
                    location += ".gif"
                    image.save(location, "gif")
                elif _allow_hq:
                    location += ".png"
                    image.save(location, "png")
                else:
                    location += ".jpg"
                    image = image.convert("RGB")
                    image.save(location, "jpeg", quality=30,
                               optimize=True, progressive=True)
                form['background'] = f"custom_{guild['id']}"

            # [guild_info path, submitted value, needs type check,
            #  expected type, audit-log key]
            replaces = [
                ['w.i.e', form['enableimages'],
                 True, "bool", "welcomer.imagesenabled"],
                ['w.i.c.b', form['textcolour'], True,
                 "hex", "welcomerimg.textcolour"],
                ['w.i.c.bo', form['textbordercolour'], True,
                 "hex", "welcomerimg.textbordercolour"],
                ['w.i.c.pb', form['profileoutlinecolour'], True,
                 "hex", "welcomerimg.profileoutlinecolour"],
                ['w.i.c.ib', form['imagebordercolour'], True,
                 "hex", "welcomerimg.imagebordercolour"],
                ['w.i.b', form['enableimageborder'], True,
                 "bool", "welcomerimg.enableimageborder"],
                ['w.i.a', form['textalign'],
                 False, None, "welcomerimg.textalign"],
                ['w.i.t', form['texttheme'], True,
                 "int", "welcomerimg.texttheme"],
                ['w.i.pb', form['profilebordertype'], True,
                 "int", "welcomerimg.profilebordertype"],
                ['w.i.m', form['message'],
                 False, None, "welcomerimg.message"],
                ['w.i.bg', form['background'],
                 False, None, "welcomerimg.background"],
            ]
            for path, raw, typed, expected, audit_key in replaces:
                # Untyped fields are always accepted; typed ones must pass
                # expect() for their declared type.
                if not typed or (expected and expect(raw, expected)):
                    if raw is not None:
                        value = normalize_form(raw)
                        oldvalue = rockutils.getvalue(path, guild_info)
                        rockutils.setvalue(path, guild_info, value)
                        if oldvalue != value:
                            rockutils.setvalue(
                                audit_key, update_payload, [oldvalue, value])

            if len(update_payload) > 0:
                await rockutils.logcsv(
                    [math.floor(time.time()), 'CONFIG_CHANGE',
                     int(session.get("user_id", 0)), session.get("user", ""),
                     json.dumps(update_payload)],
                    f"{guild_id}.csv")
                await update_guild_info(guild_id, guild_info)
                await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            exc_info = sys.exc_info()
            traceback.print_exception(*exc_info)
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/welcomertext", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_welcomertext():
    """Render (GET) or update (POST) the welcomer text settings.

    GET renders ``dashboard/welcomertext.html`` with the current values and
    the guild's text channels formatted as ``#name (id)``.
    POST validates and applies the submitted flags, message and custom
    embed; when something changed it appends a ``CONFIG_CHANGE`` row to the
    guild's CSV log, stores the guild info and dispatches ``cachereload``.
    """
    # Validate the session guild before using it; the lookup used to index
    # [0] on the filtered list before this check could run.
    guild_id = session.get("dashboard_guild", None)
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")

    if request.method == "GET":
        _config = {
            "badges": guild_info['w']['b'],
            "invited": guild_info['w']['iv'],
            "textenabled": guild_info['w']['t']['e'],
            "embedenabled": guild_info['w']['ue'],
            "customembed": guild_info['w']['em'][1],
            "message": guild_info['w']['t']['m'],
        }
        text_channels = guild_info['d']['c']['t']
        channellist = [
            f"#{channel['name']} ({channel['id']})"
            for channel in sorted(text_channels, key=lambda o: o['position'])
        ]
        current = guild_info['w']['c']
        if current:
            # Placeholder label unless exactly one channel matches the id.
            _config['welcomerchannel'] = f"#deleted-channel ({current})"
            results = [o for o in text_channels
                       if str(o['id']) == str(current)]
            if len(results) == 1:
                _config['welcomerchannel'] = f"#{results[0]['name']} ({results[0]['id']})"
        else:
            _config['welcomerchannel'] = ""
        return await render_template("dashboard/welcomertext.html", session=session, guild=guild, guilds=guilds, config=_config, channellist=channellist)

    if request.method == "POST":
        form = dict(await request.form)
        try:
            update_payload = {}
            # [guild_info path, submitted value, needs type check,
            #  expected type, audit-log key]
            replaces = [
                ['w.b', form['badges'], True,
                 "bool", "welcomer.badges"],
                ['w.iv', form['invited'], True,
                 "bool", "welcomer.invited"],
                ['w.t.e', form['textenabled'], True,
                 "bool", "welcomer.textenabled"],
                ['w.ue', form['embedenabled'], True,
                 "bool", "welcomer.embedenabled"],
                ['w.t.m', form['message'], False,
                 None, "welcomer.message"],
            ]
            for path, raw, typed, expected, audit_key in replaces:
                # Untyped fields are always accepted; typed ones must pass
                # expect() for their declared type.
                if not typed or (expected and expect(raw, expected)):
                    if raw is not None:
                        value = normalize_form(raw)
                        oldvalue = rockutils.getvalue(path, guild_info)
                        rockutils.setvalue(path, guild_info, value)
                        if oldvalue != value:
                            rockutils.setvalue(
                                audit_key, update_payload, [oldvalue, value])

            # Accept the embed only if it parses as YAML ("y") or JSON ("j");
            # JSON wins when both succeed. Only parser failures are ignored.
            value = form['customembed']
            loader_type = None
            try:
                yaml.load(value, Loader=yaml.SafeLoader)
                loader_type = "y"
            except Exception:
                pass
            try:
                json.loads(value)
                loader_type = "j"
            except Exception:
                pass
            if loader_type:
                oldvalue = rockutils.getvalue("w.em", guild_info)[1]
                rockutils.setvalue("w.em", guild_info, [loader_type, value])
                if oldvalue != value:
                    rockutils.setvalue(
                        "welcomer.customembed", update_payload, None)

            if len(update_payload) > 0:
                await rockutils.logcsv(
                    [math.floor(time.time()), 'CONFIG_CHANGE',
                     int(session.get("user_id", 0)), session.get("user", ""),
                     json.dumps(update_payload)],
                    f"{guild_id}.csv")
                await update_guild_info(guild_id, guild_info)
                await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            exc_info = sys.exc_info()
            traceback.print_exception(*exc_info)
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/welcomerdms", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_welcomerdms():
    """Render (GET) or update (POST) the welcomer direct-message settings.

    GET renders ``dashboard/welcomerdms.html`` with the current values and
    the guild's text channels formatted as ``#name (id)``.
    POST validates and applies the submitted flags, message and custom
    embed; when something changed it appends a ``CONFIG_CHANGE`` row to the
    guild's CSV log, stores the guild info and dispatches ``cachereload``.
    """
    # Validate the session guild before using it; the lookup used to index
    # [0] on the filtered list before this check could run.
    guild_id = session.get("dashboard_guild", None)
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")

    if request.method == "GET":
        _config = {
            "dmsenabled": guild_info['w']['dm']['e'],
            "embedenabled": guild_info['w']['dm']['ue'],
            "customembed": guild_info['w']['dm']['em'][1],
            "message": guild_info['w']['dm']['m'],
        }
        text_channels = guild_info['d']['c']['t']
        channellist = [
            f"#{channel['name']} ({channel['id']})"
            for channel in sorted(text_channels, key=lambda o: o['position'])
        ]
        current = guild_info['w']['c']
        if current:
            # Placeholder label unless exactly one channel matches the id.
            _config['welcomerchannel'] = f"#deleted-channel ({current})"
            results = [o for o in text_channels
                       if str(o['id']) == str(current)]
            if len(results) == 1:
                _config['welcomerchannel'] = f"#{results[0]['name']} ({results[0]['id']})"
        else:
            _config['welcomerchannel'] = ""
        return await render_template("dashboard/welcomerdms.html", session=session, guild=guild, guilds=guilds, config=_config, channellist=channellist)

    if request.method == "POST":
        form = dict(await request.form)
        try:
            update_payload = {}
            # [guild_info path, submitted value, needs type check,
            #  expected type, audit-log key]
            # NOTE(review): the GET config exposes "dmsenabled" while the
            # form key read here is "dmenabled" - confirm against the
            # template's field name.
            replaces = [
                ['w.dm.e', form['dmenabled'], True,
                 "bool", "welcomerdms.enabled"],
                ['w.dm.ue', form['embedenabled'], True,
                 "bool", "welcomerdms.embedenabled"],
                ['w.dm.m', form['message'], False,
                 None, "welcomerdms.message"],
            ]
            for path, raw, typed, expected, audit_key in replaces:
                # Untyped fields are always accepted; typed ones must pass
                # expect() for their declared type.
                if not typed or (expected and expect(raw, expected)):
                    if raw is not None:
                        value = normalize_form(raw)
                        oldvalue = rockutils.getvalue(path, guild_info)
                        rockutils.setvalue(path, guild_info, value)
                        if oldvalue != value:
                            rockutils.setvalue(
                                audit_key, update_payload, [oldvalue, value])

            # Accept the embed only if it parses as YAML ("y") or JSON ("j");
            # JSON wins when both succeed. Only parser failures are ignored.
            value = form['customembed']
            loader_type = None
            try:
                yaml.load(value, Loader=yaml.SafeLoader)
                loader_type = "y"
            except Exception:
                pass
            try:
                json.loads(value)
                loader_type = "j"
            except Exception:
                pass
            if loader_type:
                oldvalue = rockutils.getvalue("w.dm.em", guild_info)[1]
                rockutils.setvalue("w.dm.em", guild_info, [loader_type, value])
                if oldvalue != value:
                    rockutils.setvalue(
                        "welcomerdms.customembed", update_payload, None)

            if len(update_payload) > 0:
                await rockutils.logcsv(
                    [math.floor(time.time()), 'CONFIG_CHANGE',
                     int(session.get("user_id", 0)), session.get("user", ""),
                     json.dumps(update_payload)],
                    f"{guild_id}.csv")
                await update_guild_info(guild_id, guild_info)
                await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            exc_info = sys.exc_info()
            traceback.print_exception(*exc_info)
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/punishments", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_punishments():
    """Render (GET) or update (POST) the punishment settings page.

    The guild is taken from the ``dashboard_guild`` session key. GET renders
    ``dashboard/punishmentmanage.html``. POST applies the ``forcereason`` and
    ``logmoderation`` form fields, logs any change, and returns JSON with a
    ``success`` flag (plus ``error`` when an exception was raised).
    """
    guild_id = session.get("dashboard_guild", None)
    # Guard first: looking the guild up with an empty/unknown id would
    # otherwise raise IndexError before the redirect could happen.
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")

    if request.method == "GET":
        _config = {
            "forcereason": guild_info['m']['fr'],
            "logmoderation": guild_info['lo']['m'],
        }
        hasfile = os.path.exists(f"punishments/{guild_id}.csv")
        return await render_template(
            "dashboard/punishmentmanage.html", session=session, guild=guild,
            guilds=guilds, config=_config, has_file=hasfile)

    if request.method == "POST":
        form = dict(await request.form)
        try:
            update_payload = {}
            # Each row: [guild_info path, submitted value, type-checked?,
            #            expected type name, key used in the change log]
            replaces = [
                ['m.fr', form['forcereason'], True,
                 "bool", "moderation.forcereason"],
                ['lo.m', form['logmoderation'], True,
                 "bool", "logging.logmoderation"],
            ]
            for path, raw, typed, kind, log_key in replaces:
                # Type-checked fields must pass expect(); others always apply.
                if typed and not (kind and expect(raw, kind)):
                    continue
                if raw is None:
                    continue
                value = normalize_form(raw)
                oldvalue = rockutils.getvalue(path, guild_info)
                rockutils.setvalue(path, guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue(
                        log_key, update_payload, [oldvalue, value])

            # Only write a log row when something actually changed.
            if update_payload:
                await rockutils.logcsv(
                    [
                        math.floor(time.time()),
                        'CONFIG_CHANGE',
                        int(session.get("user_id", 0)),
                        session.get("user", ""),
                        json.dumps(update_payload),
                    ],
                    f"{guild_id}.csv")
            await update_guild_info(guild_id, guild_info)
            await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            # Report the failure to the client but keep the trace on stderr.
            traceback.print_exc()
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/punishments/<ptype>")
@valid_oauth_required
@valid_dash_user
async def dashboard_punishments_view(ptype="recent"):
    """Render the punishment list page for the selected guild.

    ``ptype`` may be ``"recent"`` or ``"active"``; any other value is treated
    as ``"recent"``. The template receives ``isactive`` and a ``ptype`` that
    is the API path ``/api/punishments/true`` (active) or
    ``/api/punishments/false`` (recent).
    """
    isactive = ptype == "active"
    ptype = f"/api/punishments/{'true' if isactive else 'false'}"

    guild_id = session.get("dashboard_guild", None)
    # Guard first: looking the guild up with an empty/unknown id would
    # otherwise raise IndexError before the redirect could happen.
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    guild_info = await get_guild_info(guild_id)
    # Validate guild_info before handing it to get_guild_donations().
    if not guild_info:
        return redirect("/dashboard?missingdata")
    guild_donations = await get_guild_donations(guild_info)

    return await render_template(
        "dashboard/punishmentview.html", session=session, guild=guild,
        guilds=guilds, config=guild_info, donations=guild_donations,
        ptype=ptype, isactive=isactive)
@app.route("/dashboard/timerole", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_timerole():
    """Render (GET) or update (POST) the time-role settings page.

    GET renders ``dashboard/timerole.html`` with every guild role annotated
    with ``enabled`` and ``seconds`` (converted to whole minutes when
    numeric). POST reads a JSON document from the submitted form, updates
    ``tr.e`` and ``tr.r`` in the guild info, logs changes, and returns JSON
    with a ``success`` flag (plus ``error`` when an exception was raised).
    """
    guild_id = session.get("dashboard_guild", None)
    # Guard first: looking the guild up with an empty/unknown id would
    # otherwise raise IndexError before the redirect could happen.
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")

    # Index the guild's roles by id, defaulting each to "not a time role".
    roles = guild_info['d']['r']['r']
    setroles = {}
    for role in roles:
        role['enabled'] = False
        role['seconds'] = ""
        setroles[role['id']] = role
    # Overlay the configured time roles; entries are [role id, seconds].
    for entry in guild_info['tr']['r']:
        if entry[0] in setroles:
            setroles[entry[0]]['enabled'] = True
            setroles[entry[0]]['seconds'] = str(entry[1])
    # Show minutes instead of seconds; non-numeric values (e.g. "") stay.
    for role in setroles.values():
        try:
            role['seconds'] = math.floor(int(role['seconds']) / 60)
        except (TypeError, ValueError):
            pass
    # Drop the role whose id equals the guild id, without risking KeyError
    # when the membership test and the string key differ in type.
    if guild_id in setroles:
        setroles.pop(str(guild_id), None)

    if request.method == "GET":
        _config = {
            "enabled": guild_info['tr']['e'],
            "roles": setroles,
        }
        return await render_template(
            "dashboard/timerole.html", session=session, guild=guild,
            guilds=guilds, config=_config, roles=roles)

    if request.method == "POST":
        try:
            # The JSON document is the first entry yielded by the form;
            # parsing inside the try turns a malformed body into a JSON
            # error response instead of an unhandled exception.
            form = json.loads(list(await request.form)[0])
            update_payload = {}
            # Each row: [guild_info path, submitted value, type-checked?,
            #            expected type name, key used in the change log]
            replaces = [
                ['tr.e', form['enabled'], True, "bool", "timerole.enable"],
            ]
            for path, raw, typed, kind, log_key in replaces:
                # Type-checked fields must pass expect(); others always apply.
                if typed and not (kind and expect(raw, kind)):
                    continue
                if raw is None:
                    continue
                value = normalize_form(raw)
                oldvalue = rockutils.getvalue(path, guild_info)
                rockutils.setvalue(path, guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue(
                        log_key, update_payload, [oldvalue, value])

            # form['roles'] maps role id -> [enabled flag, minutes].
            timeroles = []
            for role_id, role in form['roles'].items():
                if not role[0]:
                    continue
                try:
                    seconds = int(math.floor(float(role[1])) * 60)
                except (TypeError, ValueError, OverflowError, LookupError):
                    # Unparseable durations are skipped, not fatal.
                    continue
                if seconds > 0:
                    timeroles.append([str(role_id), str(seconds)])

            oldvalue = rockutils.getvalue("tr.r", guild_info)
            rockutils.setvalue("tr.r", guild_info, timeroles)
            if timeroles != oldvalue:
                # TODO: log added / removed / edited roles individually.
                # Logged under the timerole key (was mistakenly "freerole").
                rockutils.setvalue("timerole.roles", update_payload, "")

            # Only write a log row when something actually changed.
            if update_payload:
                await rockutils.logcsv(
                    [
                        math.floor(time.time()),
                        'CONFIG_CHANGE',
                        int(session.get("user_id", 0)),
                        session.get("user", ""),
                        json.dumps(update_payload),
                    ],
                    f"{guild_id}.csv")
            await update_guild_info(guild_id, guild_info)
            await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            # Report the failure to the client but keep the trace on stderr.
            traceback.print_exc()
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/autorole", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_autorole():
    """Render (GET) or update (POST) the auto-role settings page.

    GET renders ``dashboard/autorole.html`` with every guild role annotated
    with an ``enabled`` flag. POST reads a JSON document from the submitted
    form, updates ``ar.e`` and ``ar.r`` in the guild info, logs changes, and
    returns JSON with a ``success`` flag (plus ``error`` when an exception
    was raised).
    """
    guild_id = session.get("dashboard_guild", None)
    # Guard first: looking the guild up with an empty/unknown id would
    # otherwise raise IndexError before the redirect could happen.
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")

    # Index the guild's roles by id, then flag the configured auto roles.
    roles = guild_info['d']['r']['r']
    setroles = {}
    for role in roles:
        role['enabled'] = False
        setroles[role['id']] = role
    for role_id in guild_info['ar']['r']:
        if role_id in setroles:
            setroles[role_id]['enabled'] = True
    # Drop the role whose id equals the guild id, without risking KeyError
    # when the membership test and the string key differ in type.
    if guild_id in setroles:
        setroles.pop(str(guild_id), None)

    if request.method == "GET":
        _config = {
            "enabled": guild_info['ar']['e'],
            "roles": setroles,
        }
        return await render_template(
            "dashboard/autorole.html", session=session, guild=guild,
            guilds=guilds, config=_config, roles=roles)

    if request.method == "POST":
        try:
            # The JSON document is the first entry yielded by the form;
            # parsing inside the try turns a malformed body into a JSON
            # error response instead of an unhandled exception.
            form = json.loads(list(await request.form)[0])
            update_payload = {}
            # Each row: [guild_info path, submitted value, type-checked?,
            #            expected type name, key used in the change log]
            replaces = [
                ['ar.e', form['enabled'], True, "bool", "autorole.enable"],
            ]
            for path, raw, typed, kind, log_key in replaces:
                # Type-checked fields must pass expect(); others always apply.
                if typed and not (kind and expect(raw, kind)):
                    continue
                if raw is None:
                    continue
                value = normalize_form(raw)
                oldvalue = rockutils.getvalue(path, guild_info)
                rockutils.setvalue(path, guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue(
                        log_key, update_payload, [oldvalue, value])

            # Keep the ids of every role submitted with a truthy flag.
            autorole = [role_id for role_id, enabled
                        in form['roles'].items() if enabled]
            oldvalue = rockutils.getvalue("ar.r", guild_info)
            rockutils.setvalue("ar.r", guild_info, autorole)
            if autorole != oldvalue:
                # TODO: log added / removed / edited roles individually.
                rockutils.setvalue("autorole.roles", update_payload, "")

            # Only write a log row when something actually changed.
            if update_payload:
                await rockutils.logcsv(
                    [
                        math.floor(time.time()),
                        'CONFIG_CHANGE',
                        int(session.get("user_id", 0)),
                        session.get("user", ""),
                        json.dumps(update_payload),
                    ],
                    f"{guild_id}.csv")
            await update_guild_info(guild_id, guild_info)
            await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            # Report the failure to the client but keep the trace on stderr.
            traceback.print_exc()
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/freerole", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_freerole():
    """Render (GET) or update (POST) the free-role settings page.

    GET renders ``dashboard/freerole.html`` with every guild role annotated
    with an ``enabled`` flag. POST reads a JSON document from the submitted
    form, updates ``fr.e`` and ``fr.r`` in the guild info, logs changes, and
    returns JSON with a ``success`` flag (plus ``error`` when an exception
    was raised).
    """
    guild_id = session.get("dashboard_guild", None)
    # Guard first: looking the guild up with an empty/unknown id would
    # otherwise raise IndexError before the redirect could happen.
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")

    # Index the guild's roles by id, then flag the configured free roles.
    roles = guild_info['d']['r']['r']
    setroles = {}
    for role in roles:
        role['enabled'] = False
        setroles[role['id']] = role
    for role_id in guild_info['fr']['r']:
        if role_id in setroles:
            setroles[role_id]['enabled'] = True
    # Drop the role whose id equals the guild id, without risking KeyError
    # when the membership test and the string key differ in type.
    if guild_id in setroles:
        setroles.pop(str(guild_id), None)

    if request.method == "GET":
        _config = {
            "enabled": guild_info['fr']['e'],
            "roles": setroles,
        }
        return await render_template(
            "dashboard/freerole.html", session=session, guild=guild,
            guilds=guilds, config=_config, roles=roles)

    if request.method == "POST":
        try:
            # The JSON document is the first entry yielded by the form;
            # parsing inside the try turns a malformed body into a JSON
            # error response instead of an unhandled exception.
            form = json.loads(list(await request.form)[0])
            update_payload = {}
            # Each row: [guild_info path, submitted value, type-checked?,
            #            expected type name, key used in the change log]
            replaces = [
                ['fr.e', form['enabled'], True, "bool", "freerole.enable"],
            ]
            for path, raw, typed, kind, log_key in replaces:
                # Type-checked fields must pass expect(); others always apply.
                if typed and not (kind and expect(raw, kind)):
                    continue
                if raw is None:
                    continue
                value = normalize_form(raw)
                oldvalue = rockutils.getvalue(path, guild_info)
                rockutils.setvalue(path, guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue(
                        log_key, update_payload, [oldvalue, value])

            # Keep the ids of every role submitted with a truthy flag.
            freerole = [role_id for role_id, enabled
                        in form['roles'].items() if enabled]
            oldvalue = rockutils.getvalue("fr.r", guild_info)
            rockutils.setvalue("fr.r", guild_info, freerole)
            if freerole != oldvalue:
                # TODO: log added / removed / edited roles individually.
                rockutils.setvalue("freerole.roles", update_payload, "")

            # Only write a log row when something actually changed.
            if update_payload:
                await rockutils.logcsv(
                    [
                        math.floor(time.time()),
                        'CONFIG_CHANGE',
                        int(session.get("user_id", 0)),
                        session.get("user", ""),
                        json.dumps(update_payload),
                    ],
                    f"{guild_id}.csv")
            await update_guild_info(guild_id, guild_info)
            await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            # Report the failure to the client but keep the trace on stderr.
            traceback.print_exc()
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/logs", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_logs():
    """Render (GET) or update (POST) the logging settings page.

    GET renders ``dashboard/logs.html`` and tells the template whether
    ``logs/<guild_id>.csv`` exists. POST applies the ``enabled``, ``audit``,
    ``moderation`` and ``joinleaves`` form fields, logs any change, and
    returns JSON with a ``success`` flag (plus ``error`` when an exception
    was raised).
    """
    guild_id = session.get("dashboard_guild", None)
    # Guard first: looking the guild up with an empty/unknown id would
    # otherwise raise IndexError before the redirect could happen.
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")

    if request.method == "GET":
        _config = {
            "enabled": guild_info['lo']['e'],
            "audit": guild_info['lo']['a'],
            "moderation": guild_info['lo']['m'],
            "joinleaves": guild_info['lo']['jl'],
        }
        hasfile = os.path.exists(f"logs/{guild_id}.csv")
        return await render_template(
            "dashboard/logs.html", session=session, guild=guild,
            guilds=guilds, config=_config, has_file=hasfile)

    if request.method == "POST":
        form = dict(await request.form)
        try:
            update_payload = {}
            # Each row: [guild_info path, submitted value, type-checked?,
            #            expected type name, key used in the change log]
            replaces = [
                ['lo.e', form['enabled'], True, "bool", "logging.enabled"],
                ['lo.a', form['audit'], True, "bool", "logging.audit"],
                ['lo.m', form['moderation'], True,
                 "bool", "logging.moderation"],
                ['lo.jl', form['joinleaves'], True,
                 "bool", "logging.joinleaves"],
            ]
            for path, raw, typed, kind, log_key in replaces:
                # Type-checked fields must pass expect(); others always apply.
                if typed and not (kind and expect(raw, kind)):
                    continue
                # This handler skips any falsy submitted value (not just
                # None); kept as-is to preserve its existing behaviour.
                if not raw:
                    continue
                value = normalize_form(raw)
                oldvalue = rockutils.getvalue(path, guild_info)
                rockutils.setvalue(path, guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue(
                        log_key, update_payload, [oldvalue, value])

            # Only write a log row when something actually changed.
            if update_payload:
                await rockutils.logcsv(
                    [
                        math.floor(time.time()),
                        'CONFIG_CHANGE',
                        int(session.get("user_id", 0)),
                        session.get("user", ""),
                        json.dumps(update_payload),
                    ],
                    f"{guild_id}.csv")
            await update_guild_info(guild_id, guild_info)
            await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            # Report the failure to the client but keep the trace on stderr.
            traceback.print_exc()
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/leaver", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_leaver():
    """Render (GET) or update (POST) the leaver settings page.

    GET renders ``dashboard/leaver.html`` with the text channels formatted
    as ``#name (id)``. POST applies the ``enabled``, ``message``, ``embed``
    and ``channel`` form fields, logs any change, and returns JSON with a
    ``success`` flag (plus ``error`` when an exception was raised).
    """
    guild_id = session.get("dashboard_guild", None)
    # Guard first: looking the guild up with an empty/unknown id would
    # otherwise raise IndexError before the redirect could happen.
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")

    text_channels = guild_info['d']['c']['t']

    if request.method == "GET":
        _config = {
            "enabled": guild_info['l']['e'],
            "message": guild_info['l']['t'],
            "embed": guild_info['l']['em'],
        }
        channellist = [
            f"#{channel['name']} ({channel['id']})"
            for channel in sorted(text_channels, key=lambda o: o['position'])
        ]
        current = guild_info['l']['c']
        if current:
            # Placeholder label, replaced below when the channel still exists.
            _config['channel'] = f"#deleted-channel ({current})"
            results = [c for c in text_channels
                       if str(c['id']) == str(current)]
            if len(results) == 1:
                _config['channel'] = (
                    f"#{results[0]['name']} ({results[0]['id']})")
        else:
            _config['channel'] = ""
        return await render_template(
            "dashboard/leaver.html", session=session, guild=guild,
            guilds=guilds, config=_config, channellist=channellist)

    if request.method == "POST":
        form = dict(await request.form)
        try:
            update_payload = {}
            # Each row: [guild_info path, submitted value, type-checked?,
            #            expected type name, key used in the change log]
            replaces = [
                ['l.e', form['enabled'], True, "bool", "leaver.enabled"],
                ['l.t', form['message'], False, None, "leaver.text"],
                ['l.em', form['embed'], True, "bool", "leaver.embed"],
            ]
            for path, raw, typed, kind, log_key in replaces:
                # Type-checked fields must pass expect(); others always apply.
                if typed and not (kind and expect(raw, kind)):
                    continue
                if raw is None:
                    continue
                value = normalize_form(raw)
                oldvalue = rockutils.getvalue(path, guild_info)
                rockutils.setvalue(path, guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue(
                        log_key, update_payload, [oldvalue, value])

            # The channel arrives as "#name (id)"; pull the id back out.
            matches = re.findall(r"#.+ \(([0-9]+)\)", form['channel'])
            if len(matches) == 1:
                value = matches[0]
                known = [c for c in text_channels
                         if str(c['id']) == str(value)]
                # An id that is not exactly one known channel is ignored.
                apply_channel = len(known) == 1
            else:
                # Anything that does not parse clears the channel.
                value = None
                apply_channel = True
            if apply_channel:
                oldvalue = rockutils.getvalue("l.c", guild_info)
                rockutils.setvalue("l.c", guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue(
                        "leaver.channel", update_payload, [oldvalue, value])

            # Only write a log row when something actually changed.
            if update_payload:
                await rockutils.logcsv(
                    [
                        math.floor(time.time()),
                        'CONFIG_CHANGE',
                        int(session.get("user_id", 0)),
                        session.get("user", ""),
                        json.dumps(update_payload),
                    ],
                    f"{guild_id}.csv")
            await update_guild_info(guild_id, guild_info)
            await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            # Report the failure to the client but keep the trace on stderr.
            traceback.print_exc()
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/staff", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_staff():
    """Render (GET) or update (POST) the staff settings page.

    GET renders ``dashboard/staff.html`` with the staff list returned by the
    ``guildstaff`` job, each entry annotated with ``preferdms``. POST applies
    the ``allowping`` field and the comma-separated ``staffids`` field, logs
    any change, and returns JSON with a ``success`` flag (plus ``error`` when
    an exception was raised).
    """
    guild_id = session.get("dashboard_guild", None)
    # Guard first: looking the guild up with an empty/unknown id would
    # otherwise raise IndexError before the redirect could happen.
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")

    if request.method == "GET":
        _config = {
            "allowping": guild_info['st']['ap'],
        }
        # Entries of st.u are [user id, flag]; stringify ids so join()
        # cannot fail on non-string ids.
        _config['staffids'] = ",".join(
            str(entry[0]) for entry in guild_info['st']['u'])
        result_payload = await create_job(
            request=None, o="guildstaff", a=int(guild_id), r="*", timeout=2)
        staff = []
        for data in result_payload['d'].values():
            if not data['success']:
                continue
            # Each successful response replaces the list collected so far.
            staff = []
            for _staff in data['data']:
                _staff['preferdms'] = True
                for userid, preferdms in guild_info['st']['u']:
                    if str(userid) == str(_staff['id']):
                        _staff['preferdms'] = preferdms
                staff.append(_staff)
        return await render_template(
            "dashboard/staff.html", session=session, guild=guild,
            guilds=guilds, config=_config, staff=staff)

    if request.method == "POST":
        form = dict(await request.form)
        try:
            update_payload = {}
            # Each row: [guild_info path, submitted value, type-checked?,
            #            expected type name, key used in the change log]
            replaces = [
                ['st.ap', form['allowping'], True,
                 "bool", "staff.allowping"],
            ]
            for path, raw, typed, kind, log_key in replaces:
                # Type-checked fields must pass expect(); others always apply.
                if typed and not (kind and expect(raw, kind)):
                    continue
                if raw is None:
                    continue
                value = normalize_form(raw)
                oldvalue = rockutils.getvalue(path, guild_info)
                rockutils.setvalue(path, guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue(
                        log_key, update_payload, [oldvalue, value])

            # "".split(",") yields [""]; drop blanks so an empty field means
            # "no staff" rather than a staff member with an empty id.
            ids = [part.strip() for part in form['staffids'].split(",")
                   if part.strip()]
            staffids = [str(entry[0]) for entry in guild_info['st']['u']]
            added = [staff_id for staff_id in ids
                     if staff_id not in staffids]
            removed = [staff_id for staff_id in staffids
                       if staff_id not in ids]

            # New staff default to True; existing staff keep their stored
            # flag. Compare as strings, since submitted ids are strings.
            stafflist = {staff_id: True for staff_id in ids}
            for staff_id, prefer_ping in guild_info['st']['u']:
                if str(staff_id) in ids:
                    stafflist[str(staff_id)] = prefer_ping
            staff_list = [[staff_id, prefer_ping]
                          for staff_id, prefer_ping in stafflist.items()]

            if added or removed:
                rockutils.setvalue("st.u", guild_info, staff_list)
                rockutils.setvalue(
                    "staff.staff", update_payload, [added, removed])

            # Only write a log row when something actually changed.
            if update_payload:
                await rockutils.logcsv(
                    [
                        math.floor(time.time()),
                        'CONFIG_CHANGE',
                        int(session.get("user_id", 0)),
                        session.get("user", ""),
                        json.dumps(update_payload),
                    ],
                    f"{guild_id}.csv")
            await update_guild_info(guild_id, guild_info)
            await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            # Report the failure to the client but keep the trace on stderr.
            traceback.print_exc()
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/rules", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_rules():
    """Render (GET) or update (POST) the rules settings page.

    GET renders ``dashboard/rules.html`` with the stored rules joined by
    newlines. POST applies the ``enabled`` field and the ``rules`` field
    (split on newlines), logs any change, and returns JSON with a
    ``success`` flag (plus ``error`` when an exception was raised).
    """
    guild_id = session.get("dashboard_guild", None)
    # Guard first: looking the guild up with an empty/unknown id would
    # otherwise raise IndexError before the redirect could happen.
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")

    if request.method == "GET":
        _config = {
            "enabled": guild_info['r']['e'],
            "rules": "\n".join(guild_info['r']['r']),
        }
        return await render_template(
            "dashboard/rules.html", session=session, guild=guild,
            guilds=guilds, config=_config)

    if request.method == "POST":
        form = dict(await request.form)
        try:
            update_payload = {}
            # Each row: [guild_info path, submitted value, type-checked?,
            #            expected type name, key used in the change log]
            replaces = [
                ['r.e', form['enabled'], True, "bool", "rules.enabled"],
                ['r.r', form['rules'].split("\n"), False,
                 None, "rules.rules"],
            ]
            for path, raw, typed, kind, log_key in replaces:
                # Type-checked fields must pass expect(); others always apply.
                if typed and not (kind and expect(raw, kind)):
                    continue
                if raw is None:
                    continue
                value = normalize_form(raw)
                oldvalue = rockutils.getvalue(path, guild_info)
                rockutils.setvalue(path, guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue(
                        log_key, update_payload, [oldvalue, value])

            # Only write a log row when something actually changed.
            if update_payload:
                await rockutils.logcsv(
                    [
                        math.floor(time.time()),
                        'CONFIG_CHANGE',
                        int(session.get("user_id", 0)),
                        session.get("user", ""),
                        json.dumps(update_payload),
                    ],
                    f"{guild_id}.csv")
            await update_guild_info(guild_id, guild_info)
            await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            # Report the failure to the client but keep the trace on stderr.
            traceback.print_exc()
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/automod", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_automod():
    """Render (GET) or update (POST) the automod settings page.

    GET renders ``dashboard/automod.html`` with the values stored under
    ``am`` in the guild info; the ``filter`` and ``regex`` lists are joined
    by newlines. POST applies the matching form fields (``filter`` and
    ``regex`` split on newlines), logs any change, and returns JSON with a
    ``success`` flag (plus ``error`` when an exception was raised).
    """
    guild_id = session.get("dashboard_guild", None)
    # Guard first: looking the guild up with an empty/unknown id would
    # otherwise raise IndexError before the redirect could happen.
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")

    if request.method == "GET":
        automod = guild_info['am']
        _config = {
            "enabled": automod['e'],
            "smartmod": automod['sm'],
            "automodfiltinvites": automod['g']['i'],
            "automodfilturlredir": automod['g']['ur'],
            "automodfilturls": automod['g']['ul'],
            "automodfiltipgrab": automod['g']['ig'],
            "automodfiltmasscap": automod['g']['mc'],
            "automodfiltmassmen": automod['g']['mm'],
            "automodfiltprofan": automod['g']['p'],
            "automodfiltfilter": automod['g']['f'],
            "thresholdmention": automod['t']['m'],
            "thresholdcaps": automod['t']['c'],
            "filter": "\n".join(automod['f']),
            "regex": "\n".join(automod['r']),
        }
        return await render_template(
            "dashboard/automod.html", session=session, guild=guild,
            guilds=guilds, config=_config)

    if request.method == "POST":
        form = dict(await request.form)
        try:
            update_payload = {}
            # Each row: [guild_info path, submitted value, type-checked?,
            #            expected type name, key used in the change log]
            replaces = [
                ['am.e', form['enabled'], True, "bool", "automod.enabled"],
                ['am.sm', form['smartmod'], True,
                 "bool", "automod.smartmod"],
                ['am.g.i', form['automodfiltinvites'], True,
                 "bool", "automod.automodfiltinvites"],
                ['am.g.ur', form['automodfilturlredir'], True,
                 "bool", "automod.automodfilturlredir"],
                ['am.g.ul', form['automodfilturls'], True,
                 "bool", "automod.automodfilturls"],
                ['am.g.ig', form['automodfiltipgrab'], True,
                 "bool", "automod.automodfiltipgrab"],
                ['am.g.mc', form['automodfiltmasscap'], True,
                 "bool", "automod.automodfiltmasscap"],
                ['am.g.mm', form['automodfiltmassmen'], True,
                 "bool", "automod.automodfiltmassmen"],
                ['am.g.p', form['automodfiltprofan'], True,
                 "bool", "automod.automodfiltprofan"],
                ['am.g.f', form['automodfiltfilter'], True,
                 "bool", "automod.automodfiltfilter"],
                ['am.t.m', form['thresholdmention'], True,
                 "int", "automod.thresholdmention"],
                ['am.t.c', form['thresholdcaps'], True,
                 "int", "automod.thresholdcaps"],
                ['am.f', form['filter'].split("\n"), False,
                 None, "automod.filter"],
                ['am.r', form['regex'].split("\n"), False,
                 None, "automod.regex"],
            ]
            for path, raw, typed, kind, log_key in replaces:
                # Type-checked fields must pass expect(); others always apply.
                if typed and not (kind and expect(raw, kind)):
                    continue
                if raw is None:
                    continue
                value = normalize_form(raw)
                oldvalue = rockutils.getvalue(path, guild_info)
                rockutils.setvalue(path, guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue(
                        log_key, update_payload, [oldvalue, value])

            # Only write a log row when something actually changed.
            if update_payload:
                await rockutils.logcsv(
                    [
                        math.floor(time.time()),
                        'CONFIG_CHANGE',
                        int(session.get("user_id", 0)),
                        session.get("user", ""),
                        json.dumps(update_payload),
                    ],
                    f"{guild_id}.csv")
            await update_guild_info(guild_id, guild_info)
            await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            # Report the failure to the client but keep the trace on stderr.
            traceback.print_exc()
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/borderwall", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_borderwall():
    """Render (GET) or update (POST) the borderwall settings page.

    GET renders ``dashboard/borderwall.html`` with the text channels
    formatted as ``#name (id)`` and every guild role annotated with an
    ``enabled`` flag. POST reads a JSON document from the submitted form,
    updates the ``bw`` settings (flags, wait time, messages, channel and
    roles), logs changes, and returns JSON with a ``success`` flag (plus
    ``error`` when an exception was raised).
    """
    guild_id = session.get("dashboard_guild", None)
    # Guard first: looking the guild up with an empty/unknown id would
    # otherwise raise IndexError before the redirect could happen.
    if not guild_id:
        return redirect("/dashboard")
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((o for o in guilds if o['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")

    # Index the guild's roles by id, then flag the configured roles.
    roles = guild_info['d']['r']['r']
    setroles = {}
    for role in roles:
        role['enabled'] = False
        setroles[role['id']] = role
    # bw.r may hold a single value instead of a list; normalise to a list.
    _roles = guild_info['bw']['r']
    if not isinstance(_roles, list):
        _roles = [_roles]
    for role_id in _roles:
        if role_id in setroles:
            setroles[role_id]['enabled'] = True
    # Drop the role whose id equals the guild id, without risking KeyError
    # when the membership test and the string key differ in type.
    if guild_id in setroles:
        setroles.pop(str(guild_id), None)

    text_channels = guild_info['d']['c']['t']

    if request.method == "GET":
        _config = {
            "channel": guild_info['bw']['c'],
            "enabled": guild_info['bw']['e'],
            "senddms": guild_info['bw']['d'],
            "waittime": guild_info['bw']['w'],
            "message": guild_info['bw']['m'],
            "messageverify": guild_info['bw']['mv'],
            "roles": setroles,
        }
        channellist = [
            f"#{channel['name']} ({channel['id']})"
            for channel in sorted(text_channels, key=lambda o: o['position'])
        ]
        current = guild_info['bw']['c']
        if current:
            # Placeholder label, replaced below when the channel still exists.
            _config['channel'] = f"#deleted-channel ({current})"
            results = [c for c in text_channels
                       if str(c['id']) == str(current)]
            if len(results) == 1:
                _config['channel'] = (
                    f"#{results[0]['name']} ({results[0]['id']})")
        else:
            _config['channel'] = ""
        return await render_template(
            "dashboard/borderwall.html", session=session, guild=guild,
            guilds=guilds, config=_config, channellist=channellist,
            roles=roles)

    if request.method == "POST":
        try:
            # The JSON document is the first entry yielded by the form;
            # parsing inside the try turns a malformed body into a JSON
            # error response instead of an unhandled exception.
            form = json.loads(list(await request.form)[0])
            update_payload = {}
            # Each row: [guild_info path, submitted value, type-checked?,
            #            expected type name, key used in the change log]
            replaces = [
                ['bw.e', form['enabled'], True,
                 "bool", "borderwall.enabled"],
                ['bw.d', form['senddms'], True,
                 "bool", "borderwall.senddms"],
                ['bw.w', form['waittime'], True,
                 "int", "borderwall.waittime"],
                ['bw.m', form['message'], False,
                 None, "borderwall.message"],
                ['bw.mv', form['messageverify'], False,
                 None, "borderwall.messageverify"],
            ]
            for path, raw, typed, kind, log_key in replaces:
                # Type-checked fields must pass expect(); others always apply.
                if typed and not (kind and expect(raw, kind)):
                    continue
                if raw is None:
                    continue
                value = normalize_form(raw)
                oldvalue = rockutils.getvalue(path, guild_info)
                rockutils.setvalue(path, guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue(
                        log_key, update_payload, [oldvalue, value])

            # The channel arrives as "#name (id)"; pull the id back out.
            matches = re.findall(r"#.+ \(([0-9]+)\)", form['channel'])
            if len(matches) == 1:
                value = matches[0]
                known = [c for c in text_channels
                         if str(c['id']) == str(value)]
                # An id that is not exactly one known channel is ignored.
                apply_channel = len(known) == 1
            else:
                # Anything that does not parse clears the channel.
                value = None
                apply_channel = True
            if apply_channel:
                oldvalue = rockutils.getvalue("bw.c", guild_info)
                rockutils.setvalue("bw.c", guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue(
                        "borderwall.channel", update_payload,
                        [oldvalue, value])

            # Keep the ids of every role submitted with a truthy flag.
            bwroles = [str(role_id) for role_id, enabled
                       in form['roles'].items() if enabled]
            oldvalue = rockutils.getvalue("bw.r", guild_info)
            rockutils.setvalue("bw.r", guild_info, bwroles)
            if bwroles != oldvalue:
                # TODO: log added / removed / edited roles individually.
                rockutils.setvalue("borderwall.roles", update_payload, "")

            # Only write a log row when something actually changed.
            if update_payload:
                await rockutils.logcsv(
                    [
                        math.floor(time.time()),
                        'CONFIG_CHANGE',
                        int(session.get("user_id", 0)),
                        session.get("user", ""),
                        json.dumps(update_payload),
                    ],
                    f"{guild_id}.csv")
            await update_guild_info(guild_id, guild_info)
            await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            # Report the failure to the client but keep the trace on stderr.
            traceback.print_exc()
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/tempchannel", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_tempchannel():
    """Show (GET) or save (POST) the temp-channel settings of the selected guild.

    The guild comes from ``session['dashboard_guild']``.  GET renders
    ``dashboard/tempchannel.html``.  POST reads the first form key as a JSON
    document containing ``enabled``, ``autopurge``, ``defaultlimit``,
    ``category`` and ``lobby`` and answers with ``{"success": ...}`` JSON.

    Redirects to ``/dashboard`` when no guild is selected, to
    ``/dashboard?invalidguild`` when the selected guild is not in the
    session's guild list and to ``/dashboard?missingdata`` when no guild
    info could be loaded.
    """
    guild_id = session.get("dashboard_guild", None)
    # Guard first: the guild lookup used to run before this check and raised
    # IndexError whenever no guild was selected.
    if not guild_id:
        return redirect("/dashboard")

    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((g for g in guilds if g['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard?invalidguild")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")

    if request.method == "GET":
        settings = guild_info['tc']
        channels = guild_info['d']['c']

        def label(selected, pool, placeholder):
            # "" when nothing is stored, "#name (id)" when exactly one entry
            # of `pool` matches, otherwise a "#<placeholder> (id)" marker.
            if not selected:
                return ""
            found = [c for c in pool if str(c['id']) == str(selected)]
            if len(found) == 1:
                return f"#{found[0]['name']} ({found[0]['id']})"
            return f"#{placeholder} ({selected})"

        _config = {
            "enabled": settings['e'],
            "category": label(settings['c'], channels['c'], "deleted-category"),
            "lobby": label(settings['l'], channels['t'], "deleted-channel"),
            "defaultlimit": settings['dl'],
            "autopurge": settings['ap'],
        }
        categorylist = [
            f"#{c['name']} ({c['id']})"
            for c in sorted(channels['c'], key=lambda o: o['position'])]
        channellist = [
            f"#{c['name']} ({c['id']})"
            for c in sorted(channels['t'], key=lambda o: o['position'])]
        return await render_template("dashboard/tempchannel.html", session=session, guild=guild, guilds=guilds, config=_config, channellist=channellist, categorylist=categorylist)

    if request.method == "POST":
        form = json.loads(list(await request.form)[0])
        try:
            update_payload = {}
            # [config path, submitted value, validate?, expected kind, log key]
            replaces = [
                ['tc.e', form['enabled'], True, "bool", "tempchannel.enabled"],
                ['tc.ap', form['autopurge'], False, None, "tempchannel.autopurge"],
                ['tc.dl', form['defaultlimit'], True, "int", "tempchannel.defaultlimit"]
            ]
            for path, submitted, validate, kind, log_key in replaces:
                # Entries flagged for validation are applied only when
                # expect() accepts the submitted value.
                if validate and not (kind and expect(submitted, kind)):
                    continue
                if submitted is None:
                    continue
                value = normalize_form(submitted)
                oldvalue = rockutils.getvalue(path, guild_info)
                rockutils.setvalue(path, guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue(log_key, update_payload,
                                       [oldvalue, value])

            # (form field, key in guild_info['d']['c'], config path, log key)
            selections = (
                ("category", "c", "tc.c", "tempchannel.category"),
                ("lobby", "t", "tc.l", "tempchannel.lobby"),
            )
            for field, pool_key, path, log_key in selections:
                found = re.findall(r"#.+ \(([0-9]+)\)", form[field])
                if len(found) == 1:
                    value = found[0]
                    known = [c for c in guild_info['d']['c'][pool_key]
                             if str(c['id']) == str(value)]
                    if len(known) != 1:
                        # An id that is not in the guild's list leaves the
                        # stored setting untouched.
                        continue
                else:
                    # No "#name (id)" selection submitted: clear the setting.
                    value = None
                oldvalue = rockutils.getvalue(path, guild_info)
                rockutils.setvalue(path, guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue(log_key, update_payload,
                                       [oldvalue, value])

            if update_payload:
                await rockutils.logcsv([math.floor(time.time()), 'CONFIG_CHANGE', int(session.get("user_id", 0)), session.get("user", ""), json.dumps(update_payload)], f"{guild_id}.csv")
                await update_guild_info(guild_id, guild_info)
                await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            # Report any failure to the client as JSON after logging it.
            traceback.print_exception(*sys.exc_info())
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/namepurge", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_namepurge():
    """Show (GET) or save (POST) the name-purge settings of the selected guild.

    The guild comes from ``session['dashboard_guild']``.  GET renders
    ``dashboard/namepurge.html`` with the filter list joined by newlines.
    POST reads the first form key as a JSON document containing ``enabled``,
    ``ignore`` and ``filter`` (newline separated) and answers with
    ``{"success": ...}`` JSON.

    Redirects to ``/dashboard`` when no guild is selected, to
    ``/dashboard?invalidguild`` when the selected guild is not in the
    session's guild list and to ``/dashboard?missingdata`` when no guild
    info could be loaded.
    """
    guild_id = session.get("dashboard_guild", None)
    # Guard first: the guild lookup used to run before this check and raised
    # IndexError whenever no guild was selected.
    if not guild_id:
        return redirect("/dashboard")

    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((g for g in guilds if g['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard?invalidguild")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")

    if request.method == "GET":
        settings = guild_info['np']
        _config = {
            "enabled": settings['e'],
            "ignore": settings['i'],
            "filter": "\n".join(settings['f']),
        }
        return await render_template("dashboard/namepurge.html", session=session, guild=guild, guilds=guilds, config=_config)

    if request.method == "POST":
        form = json.loads(list(await request.form)[0])
        try:
            update_payload = {}
            # [config path, submitted value, validate?, expected kind, log key]
            replaces = [
                ['np.e', form['enabled'], True, "bool", "namepurge.enabled"],
                ['np.i', form['ignore'], True, "bool", "namepurge.ignorebot"],
                ['np.f', form['filter'].split("\n"), False, None, "namepurge.filter"]
            ]
            for path, submitted, validate, kind, log_key in replaces:
                # Entries flagged for validation are applied only when
                # expect() accepts the submitted value.
                if validate and not (kind and expect(submitted, kind)):
                    continue
                if submitted is None:
                    continue
                value = normalize_form(submitted)
                oldvalue = rockutils.getvalue(path, guild_info)
                rockutils.setvalue(path, guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue(log_key, update_payload,
                                       [oldvalue, value])

            if update_payload:
                await rockutils.logcsv([math.floor(time.time()), 'CONFIG_CHANGE', int(session.get("user_id", 0)), session.get("user", ""), json.dumps(update_payload)], f"{guild_id}.csv")
                await update_guild_info(guild_id, guild_info)
                await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            # Report any failure to the client as JSON after logging it.
            traceback.print_exception(*sys.exc_info())
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/music", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_music():
    """Show (GET) or save (POST) the music settings of the selected guild.

    The guild comes from ``session['dashboard_guild']``.  GET renders
    ``dashboard/music.html`` with the DJ id list joined by commas.  POST
    reads the plain form fields ``djsenabled``, ``skiptype``, ``threshold``
    and ``djs`` (comma separated ids) and answers with ``{"success": ...}``
    JSON.

    Redirects to ``/dashboard`` when no guild is selected, to
    ``/dashboard?invalidguild`` when the selected guild is not in the
    session's guild list and to ``/dashboard?missingdata`` when no guild
    info could be loaded.
    """
    guild_id = session.get("dashboard_guild", None)
    # Guard first: the guild lookup used to run before this check and raised
    # IndexError whenever no guild was selected.
    if not guild_id:
        return redirect("/dashboard")

    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((g for g in guilds if g['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard?invalidguild")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")

    if request.method == "GET":
        settings = guild_info['mu']
        _config = {
            "djsenabled": settings['d']['e'],
            "djs": ",".join(settings['d']['l']),
            "skiptype": settings['st'],
            "threshold": settings['t'],
        }
        return await render_template("dashboard/music.html", session=session, guild=guild, guilds=guilds, config=_config)

    if request.method == "POST":
        form = dict(await request.form)
        try:
            update_payload = {}
            # [config path, submitted value, validate?, expected kind, log key]
            replaces = [
                ['mu.d.e', form['djsenabled'], True, "bool", "music.djsenabled"],
                ['mu.st', form['skiptype'], True, "int", "music.skiptype"],
                ['mu.t', form['threshold'], True, "int", "music.threshold"],
            ]
            for path, submitted, validate, kind, log_key in replaces:
                # Entries flagged for validation are applied only when
                # expect() accepts the submitted value.
                if validate and not (kind and expect(submitted, kind)):
                    continue
                if submitted is None:
                    continue
                value = normalize_form(submitted)
                oldvalue = rockutils.getvalue(path, guild_info)
                rockutils.setvalue(path, guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue(log_key, update_payload,
                                       [oldvalue, value])

            # Diff the submitted DJ ids against the stored list so the change
            # log records what was added and what was removed.
            ids = form['djs'].split(",")
            djids = guild_info['mu']['d']['l']
            added = [dj for dj in ids if dj not in djids]
            removed = [dj for dj in djids if dj not in ids]
            if added or removed:
                rockutils.setvalue("mu.d.l", guild_info, ids)
                rockutils.setvalue("music.djslist", update_payload,
                                   [added, removed])

            if update_payload:
                await rockutils.logcsv([math.floor(time.time()), 'CONFIG_CHANGE', int(session.get("user_id", 0)), session.get("user", ""), json.dumps(update_payload)], f"{guild_id}.csv")
                await update_guild_info(guild_id, guild_info)
                await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            # Report any failure to the client as JSON after logging it.
            traceback.print_exception(*sys.exc_info())
            return jsonify(success=False, error=str(e))
@app.route("/dashboard/serverstats", methods=['GET', 'POST'])
@valid_oauth_required
@valid_dash_user
async def dashboard_serverstats():
    """Show (GET) or save (POST) the server-stats settings of the selected guild.

    The guild comes from ``session['dashboard_guild']``.  GET renders
    ``dashboard/serverstats.html``.  POST reads the first form key as a JSON
    document containing ``enabled`` and ``category`` and answers with
    ``{"success": ...}`` JSON.

    Redirects to ``/dashboard`` when no guild is selected, to
    ``/dashboard?invalidguild`` when the selected guild is not in the
    session's guild list and to ``/dashboard?missingdata`` when no guild
    info could be loaded.
    """
    guild_id = session.get("dashboard_guild", None)
    # Guard first: the guild lookup used to run before this check and raised
    # IndexError whenever no guild was selected.
    if not guild_id:
        return redirect("/dashboard")

    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild = next((g for g in guilds if g['id'] == guild_id), None)
    if guild is None:
        return redirect("/dashboard?invalidguild")

    guild_info = await get_guild_info(guild_id)
    if not guild_info:
        return redirect("/dashboard?missingdata")

    if request.method == "GET":
        settings = guild_info['s']
        categories = guild_info['d']['c']['c']
        _config = {
            "enabled": settings['e'],
            "category": settings['ca'],
            "channels": settings['c'],
        }
        categorylist = [
            f"#{c['name']} ({c['id']})"
            for c in sorted(categories, key=lambda o: o['position'])]

        if settings['ca']:
            # Shown as deleted unless exactly one known category matches.
            _config['category'] = f"#deleted-category ({settings['ca']})"
            found = [c for c in categories
                     if str(c['id']) == str(settings['ca'])]
            if len(found) == 1:
                _config['category'] = f"#{found[0]['name']} ({found[0]['id']})"
        else:
            _config['category'] = ""
        return await render_template("dashboard/serverstats.html", session=session, guild=guild, guilds=guilds, config=_config, categorylist=categorylist)

    if request.method == "POST":
        form = json.loads(list(await request.form)[0])
        try:
            update_payload = {}
            # [config path, submitted value, validate?, expected kind, log key]
            replaces = [
                ['s.e', form['enabled'], True, "bool", "serverstats.enabled"],
            ]
            for path, submitted, validate, kind, log_key in replaces:
                # Entries flagged for validation are applied only when
                # expect() accepts the submitted value.
                if validate and not (kind and expect(submitted, kind)):
                    continue
                if submitted is None:
                    continue
                value = normalize_form(submitted)
                oldvalue = rockutils.getvalue(path, guild_info)
                rockutils.setvalue(path, guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue(log_key, update_payload,
                                       [oldvalue, value])

            found = re.findall(r"#.+ \(([0-9]+)\)", form['category'])
            apply_category = True
            if len(found) == 1:
                value = found[0]
                known = [c for c in guild_info['d']['c']['c']
                         if str(c['id']) == str(value)]
                # An id that is not in the guild's category list leaves the
                # stored setting untouched.
                apply_category = len(known) == 1
            else:
                # No "#name (id)" selection submitted: clear the setting.
                value = None
            if apply_category:
                oldvalue = rockutils.getvalue("s.ca", guild_info)
                rockutils.setvalue("s.ca", guild_info, value)
                if oldvalue != value:
                    rockutils.setvalue("serverstats.category", update_payload,
                                       [oldvalue, value])

            if update_payload:
                await rockutils.logcsv([math.floor(time.time()), 'CONFIG_CHANGE', int(session.get("user_id", 0)), session.get("user", ""), json.dumps(update_payload)], f"{guild_id}.csv")
                await update_guild_info(guild_id, guild_info)
                await create_job(o="cachereload", a=guild_id, r="*")
            return jsonify(success=True)
        except Exception as e:
            # Report any failure to the client as JSON after logging it.
            traceback.print_exception(*sys.exc_info())
            return jsonify(success=False, error=str(e))
# /dashboard/serverstats | |
# configured stats | |
# /dashboard/analytics | |
# enabled | |
# analytics listing | |
# change by (5 minute, 30 minute, 1 hour, 6 hour, 12 hour, 1 day, 1 week, 1 month) | |
# members joined | |
# members left | |
# member count | |
# messages sent | |
# bot count | |
# user count | |
# reset | |
# export | |
# ???? | |
# /dashboard/xp | |
# enabled | |
# reset | |
# (? may have xprole on the xp page or have it as a tab and have Manage and XPRole) | |
# /dashboard/xprole | |
# enabled | |
# roles (toggle then xp minimum) | |
# (? maybe a list saying how many users will have it) | |
# /formatting | |
# /customembeds | |
# /dashboard/logs/view | |
# /dashboard/api/xp/leaderboard | |
# /dashboard/api/xp/resetuser/<user id> | |
# /dashboard/api/xp/export | |
# /dashboard/api/xp/reset | |
# /profile | |
# /invite/<name> | |
# /guild/<id> | |
########################################################################## | |
@app.route("/")
async def index():
    """Render the landing page with guild and member totals over all clusters.

    Clusters that have not reported a ``guilds`` or ``members`` figure count
    as zero.
    """
    guilds = sum(v.get('guilds', 0) for v in cluster_data.values())
    members = sum(v.get('members', 0) for v in cluster_data.values())
    # The statements that used to follow this return (a `du` subprocess call
    # feeding a `cdntotal` template value) were unreachable and are dropped.
    return await render_template("index.html", session=session, guild_count=guilds, member_count=members)
@app.route("/documentation")
async def documentation():
    """Render the documentation page from ``cfg/documentation.yaml``.

    When the file cannot be read or parsed, the traceback is printed and the
    page is rendered with ``loaded=False`` and an empty ``docs`` mapping.
    """
    try:
        with open("cfg/documentation.yaml", 'r') as stream:
            data_loaded = yaml.load(stream, Loader=yaml.BaseLoader)
        loaded = True
    except Exception:
        # `Exception`, not `BaseException`: task cancellation and interpreter
        # exit must keep propagating instead of being turned into a page.
        traceback.print_exc()
        data_loaded = {}
        loaded = False
    return await render_template("documentation.html", session=session, loaded=loaded, docs=data_loaded)
@app.route("/status")
async def status():
    """Render the cluster status page.

    One entry is prepared for ``"debug"``, ``"donator"`` and every index in
    ``range(config['bot']['clusters'])``.  A cluster is ``alive`` once it
    appears in ``last_ping`` and ``pingalive`` when that ping is less than 70
    seconds old.  The template receives the clusters ordered online first,
    then degraded (alive but stale ping), then offline, each group sorted by
    the string form of the cluster key.
    """
    now = time.time()

    def blank():
        return {"alive": False, "pingalive": False, "stats": {}, "lastping": 0}

    expected = ["debug", "donator"] + list(range(config['bot']['clusters']))
    clusters = {key: blank() for key in set(expected)}

    for cluster, ping in last_ping.items():
        # setdefault: a cluster key outside the expected set used to raise
        # KeyError and take the whole page down.
        entry = clusters.setdefault(cluster, blank())
        age = now - ping
        entry['alive'] = True
        if age < 70:
            entry['pingalive'] = True
        entry['lastping'] = age

    for cluster, data in cluster_data.items():
        entry = clusters.setdefault(cluster, blank())
        # NOTE: `data` is the shared cluster_data entry; as before,
        # highest_latency is written into it.
        entry['stats'] = data
        # default=0 keeps the page rendering when a cluster has reported no
        # latencies yet (max() of an empty sequence raises ValueError).
        data['highest_latency'] = max(
            (item[1] for item in data.get('latencies', [])), default=0)

    # One stable sort is enough: the group filters below preserve its order.
    ordered = sorted(clusters.items(), key=lambda o: str(o[0]))
    online = [c for c in ordered if c[1]['alive'] and c[1]['pingalive']]
    degrade = [c for c in ordered if c[1]['alive'] and not c[1]['pingalive']]
    offline = [c for c in ordered if not c[1]['alive']]
    clusters = dict(online + degrade + offline)
    return await render_template("status.html", session=session, clusters=clusters, botconfig=config['bot'])
@app.route("/search")
async def search():
    """Render the search page for the current session."""
    page = await render_template("search.html", session=session)
    return page
@app.route("/guild/<id>")
@app.route("/g/<id>")
async def guild(id):
    """Render the public page of guild ``id``.

    When guild info exists, a ``guildstaff`` job is dispatched and the
    guild's donations are fetched.  ``staff`` is the ``data`` of the last
    job response flagged successful; otherwise it stays an empty list.
    """
    staff, guild_donations = [], []
    guild_info = await get_guild_info(id)
    is_guild = bool(guild_info)

    if is_guild:
        result_payload = await create_job(request=None, o="guildstaff", a=int(id), r="*", timeout=1)
        guild_donations = await get_guild_donations(guild_info)
        successful = [reply['data']
                      for reply in result_payload['d'].values()
                      if reply['success']]
        if successful:
            # Later responses override earlier ones.
            staff = successful[-1]

    return await render_template("guild.html", session=session, valid_guild=is_guild, id=id, guildinfo=guild_info, donations=guild_donations, staff=staff)
@app.route("/invite/<id>")
@app.route("/i/<id>")
async def invite(id):
    """Render the invite page for ``id``.

    The same value is passed to the template as both the invite id and the
    guild id, and is used to look the guild info up.
    """
    # add vanity invite stuff
    guild_info = await get_guild_info(id)
    context = {
        "session": session,
        "id": id,
        "guild_id": id,
        "guild_info": guild_info,
    }
    return await render_template("invite.html", **context)
@app.route("/borderwall")
@app.route("/borderwall/<id>")
async def borderwall(id=""):
    """Render the borderwall verification page for ``id``.

    The challenge widget is shown (``show_bw``) only when a borderwall
    record exists for ``id`` and its ``a`` flag is not set yet; otherwise an
    HTML notice is passed to the template as ``error``.  The visitor's IP is
    passed as ``ip`` (``"Unknown"`` when it cannot be determined).
    """
    error = ""
    borderwall_details = r.table("borderwall").get(id).run(rethink) or {}
    if not borderwall_details:
        valid_borderwall = False
        error = '<div class="primary primary-fill" role="info">The borderwall id specified is not valid </div>'
    elif borderwall_details['a']:
        valid_borderwall = False
        error = '<div class="success success-fill" role="info"><i class="mdi mdi-check"></i> This borderwall id has already been validated </div>'
    else:
        valid_borderwall = True

    # The first three names are WSGI environ keys, which only match if a
    # proxy sends headers literally named that way; they are kept first for
    # backward compatibility.  The actual proxy headers follow.
    ip = "Unknown"
    header_names = ('HTTP_CF_CONNECTING_IP', 'HTTP_X_FORWARDED_FOR',
                    'REMOTE_ADDR', 'CF-Connecting-IP', 'X-Forwarded-For')
    for header_name in header_names:
        candidate = request.headers.get(header_name, False)
        if candidate:
            # A forwarded-for header may list several hops; the first entry
            # is the original client.
            ip = candidate.split(",")[0].strip()
            break
    else:
        # No proxy header present: fall back to the socket peer address when
        # the request object exposes one.
        ip = getattr(request, "remote_addr", None) or "Unknown"

    return await render_template("borderwall.html", headers=request.headers, session=session, data=borderwall_details, borderwall_id=id, ip=ip, error=error, show_bw=valid_borderwall)
@app.route("/borderwall/<id>/authorize", methods=['POST', 'GET'])
async def borderwall_authorize(id):
    """Verify a reCAPTCHA token and mark borderwall ``id`` as passed.

    Reads ``token`` from the submitted form, checks it against the
    reCAPTCHA ``siteverify`` endpoint and, when the response is successful,
    has action ``"borderwall"`` and a score above 0.5, stores ``a=True`` and
    the client IP on the borderwall record and dispatches a
    ``borderwallverify`` job.  Always answers with ``{"success": ...}`` JSON,
    carrying an ``error`` text on failure.
    """
    form = dict(await request.form)
    token = form.get("token", False)
    borderwall_details = r.table("borderwall").get(id).run(rethink)
    if not token:
        return jsonify(success=False, error="Invalid token given")
    if not borderwall_details:
        return jsonify(success=False, error="Invalid borderwall code given")

    verify_params = {
        "secret": config['keys']['recaptcha_server'],
        "response": token
    }

    # `ip` was previously only bound inside the header loop, so the update
    # below failed with an unbound name whenever no header matched.
    # The first three names are WSGI environ keys kept for backward
    # compatibility; the actual proxy headers follow.
    ip = "Unknown"
    header_names = ('HTTP_CF_CONNECTING_IP', 'HTTP_X_FORWARDED_FOR',
                    'REMOTE_ADDR', 'CF-Connecting-IP', 'X-Forwarded-For')
    for header_name in header_names:
        candidate = request.headers.get(header_name, False)
        if candidate:
            ip = candidate.split(",")[0].strip()
            break
    else:
        ip = getattr(request, "remote_addr", None) or "Unknown"
    if ip != "Unknown":
        verify_params['remoteip'] = ip

    # siteverify takes `secret`/`response`/`remoteip` as POST parameters;
    # they used to be sent as HTTP headers on a GET request.
    async with aiohttp.ClientSession() as _session:
        async with _session.post("https://www.google.com/recaptcha/api/siteverify", data=verify_params) as res:
            gpayload = await res.json()

    if not gpayload.get('success'):
        return jsonify(success=False, error="Recaptcha failed")
    if gpayload.get('action') != "borderwall":
        return jsonify(success=False,
                       error="Invalid endpoint was used to verify")
    if not gpayload.get('score', 0) > 0.5:
        return jsonify(
            success=False,
            error="Your IP score was too low to verify. Please disable a VPN or Proxy if you are using one.")

    borderwall_details['a'] = True
    borderwall_details['ip'] = ip
    r.table("borderwall").get(id).update(borderwall_details).run(rethink)
    await create_job(o="borderwallverify", a=[borderwall_details['ui'], borderwall_details['gi'], id], r="*")
    return jsonify(success=True)
@app.route("/invitebot")
@app.route("/invitebot/<btype>")
async def invitebot(btype="bot"):
    """Redirect to the Discord OAuth2 invite URL of the requested bot.

    ``btype`` selects the client id from ``config['ids']``: ``"bot"`` uses
    ``main``, ``"beta"`` uses ``debug`` and ``"donator"`` uses ``donator``.
    Any other value redirects to ``/``.
    """
    id_keys = {"bot": "main", "beta": "debug", "donator": "donator"}
    id_key = id_keys.get(btype)
    if id_key is None:
        return redirect("/")
    id = config['ids'][id_key]
    return redirect(
        f"https://discordapp.com/oauth2/authorize?client_id={id}&scope=bot&permissions=8")
@app.route("/backgrounds")
async def backgrounds():
    """Render the backgrounds page.

    Lists the entries of the ``Images`` directory under
    ``config['cdn']['location']`` in sorted order, together with their count.
    """
    image_dir = os.path.join(config['cdn']['location'], "Images")
    names = os.listdir(image_dir)
    names.sort()
    return await render_template("backgrounds.html", session=session, backgrounds=names, background_count=len(names))
@app.route("/donate")
async def donate():
    """Render the donate page, optionally with a status banner.

    When the query string has exactly one key and it is a known status
    code, the matching HTML banner is passed to the template as ``error``;
    otherwise ``error`` is an empty string.
    """
    banners = {
        "invaliddonation": '<div class="alert alert-fill" role="info"><i class="mdi mdi-do-not-disturb"></i> An invalid donation type was specified in your request. If you have changed the donation url manually, change it back. </div>',
        "paymentcreateerror": '<div class="alert alert-fill-alert" role="alert"><i class="mdi mdi-do-not-disturb"></i> We were unable to create a payment for you to be able to use. Please try again. </div>',
        "userinfogone": '<div class="alert alert-fill-alert" role="alert"><i class="mdi mdi-do-not-disturb"></i> We were unable to retrieve your user information. Please try again. </div>',
        "invalidpaymentid": '<div class="alert alert-fill" role="info"><i class="mdi mdi-do-not-disturb"></i> Your request has returned an invalid payment id. Please attempt this again. Please note you have not been charged. </div>',
        "invalidprice": '<div class="alert alert-fill" role="info"><i class="mdi mdi-do-not-disturb"></i> Invalid price was specified in your request. If you have changed the donation url manually, change it back. </div>',
        "invalidmonth": '<div class="alert alert-fill" role="info"><i class="mdi mdi-do-not-disturb"></i> Invalid month was specified in your request. If you have changed the donation url manually, change it back. </div>',
        "invalidarg": '<div class="alert alert-fill-alert" role="alert"><i class="mdi mdi-do-not-disturb"></i> Your request is missing arguments. If you have changed the donation url manually, change it back. </div>',
        "fatalexception": '<div class="alert alert-fill-danger" role="danger"><i class="mdi mdi-do-not-disturb"></i> There was a problem assigning your donation role. Please visit the support server to fix this immediately as the payment has been processed </div>',
        "success": '<div class="alert alert-fill-success" role="alert"><i class="mdi mdi-check"></i> Thank you for donating! Your donation has gone through and been processed automatically. If you have not blocked your direct messages, you should receive a message from the bot and it should show on the support server under #donations </div>',
    }
    error = ""
    query_keys = list(dict(request.args).keys())
    if len(query_keys) == 1:
        error = banners.get(query_keys[0], "")
    return await render_template("donate.html", session=session, error=error)
@app.route("/dashboard")
@valid_oauth_required
async def dashboard():
    """Render the dashboard overview for the guild selected in the session.

    When the query string has exactly one key and it is a known status
    code, the matching HTML banner and numeric ``errortype`` are passed to
    the template.  Guild info is loaded only if the selected guild appears
    in the session's guild list; otherwise an empty mapping is passed.
    """
    notices = {
        "missingpermission": ('<div class="alert alert-fill-danger" role="alert"><i class="mdi mdi-do-not-disturb"></i> You do not have permission to view the dashboard of this server </div>', 1),
        "invalidguild": ('<div class="alert alert-fill-danger" role="alert"><i class="mdi mdi-alert-circle"></i> The selected guild could not be found </div>', 2),
        "missingdata": ('<div class="alert alert-fill-danger" role="alert"><i class="mdi mdi-database-remove"></i> Could not locate any data for this guild. <b>Please run a command on the server</b> </div>', 3),
    }
    error, errortype = "", 0
    query_keys = list(dict(request.args).keys())
    if len(query_keys) == 1:
        error, errortype = notices.get(query_keys[0], ("", 0))

    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild_id = session.get("dashboard_guild", 0)
    # Ids are compared as strings; when several entries match, the last wins.
    matching = [entry for entry in guilds
                if str(entry.get("id")) == str(guild_id)]
    guilddata = matching[-1] if matching else None

    guildinfo = await get_guild_info(guild_id) if guilddata else {}
    # return jsonify(guilds, guild_id, guilddata, guildinfo)
    return await render_template("dashboard/guilddetails.html", session=session, error=error, errortype=errortype, guilds=guilds, guild=guilddata, guildinfo=guildinfo)
@app.route("/dashboard/changeguild/<id>")
@valid_oauth_required
async def change_dashboard_guild(id):
    """Select guild ``id`` as the session's dashboard guild.

    Redirects to ``/dashboard?invalidguild`` when ``id`` is not in the
    session's guild list and to ``/dashboard?missingpermission`` when
    ``has_elevation`` is falsy for both the ``main`` and the ``donator`` bot.
    On success the id is stored in ``session['dashboard_guild']`` and the
    user is sent to ``/dashboard``.
    """
    guilds = sorted(session.get("guild_data", []),
                    key=lambda o: o.get("name", ""))
    guild_id = id
    # Ids are compared as strings; when several entries match, the last wins.
    matching = [entry for entry in guilds
                if str(entry.get("id")) == str(guild_id)]
    guilddata = matching[-1] if matching else None
    if not guilddata:
        return redirect("/dashboard?invalidguild")

    # The donator bot is only consulted when the main bot check fails.
    for bot_type in ("main", "donator"):
        if await has_elevation(guild_id, session.get("user_id"), guild_info=None, bot_type=bot_type):
            session['dashboard_guild'] = id
            return redirect("/dashboard")
    return redirect("/dashboard?missingpermission")
@app.route("/errors/<id>", methods=['GET', 'POST'])
async def error_view(id):
    """Show error record ``id``; a POST from an elevated user toggles its status.

    GET renders the record (an empty mapping when it does not exist).  POST
    first requires a usable OAuth2 session, redirecting to ``/login`` when
    the token or any cached session field is missing or the last check is
    older than 604800 seconds.  If the user id is listed under the
    ``support``, ``developer``, ``admins`` or ``trusted`` roles in
    ``cfg/config.json`` and the record exists, its ``status`` flips between
    ``"not handled"`` and ``"handled"`` and is written back.
    """
    error = r.table("errors").get(id).run(rethink) or {}

    if request.method == "POST":
        session['previous_path'] = request.path
        if session.get("oauth2_token") is None:
            return redirect("/login")

        required = ('user_id', 'user_data', 'guild_data', 'oauth2_check')
        should_check = any(not session.get(field) for field in required)
        if session.get("oauth2_check") is None or time.time() - \
                session.get("oauth2_check") > 604800:
            should_check = True
        if should_check:
            return redirect("/login")

        _config = rockutils.load_json("cfg/config.json")
        user_id = int(session['user_id'])
        # A list (not a generator) so every role list is looked up, exactly
        # as the chained checks did.
        elevated = any([
            user_id in _config['roles'][role]
            for role in ('support', 'developer', 'admins', 'trusted')])

        if elevated and error != {}:
            was_open = error['status'] == "not handled"
            error['status'] = "handled" if was_open else "not handled"
            r.table("errors").get(id).update(error).run(rethink)

    return await render_template("command_error.html", error=error)
@app.route("/errors")
@valid_oauth_required
async def error_list():
    """List all error records, split into handled and not handled.

    Only users whose id is listed under the ``support``, ``developer``,
    ``admins`` or ``trusted`` roles in ``cfg/config.json`` may view the
    list; everyone else receives an empty 401 response.
    """
    _config = rockutils.load_json("cfg/config.json")
    user_id = int(session['user_id'])
    # A list (not a generator) so every role list is looked up, exactly as
    # the chained checks did.
    elevated = any([
        user_id in _config['roles'][role]
        for role in ('support', 'developer', 'admins', 'trusted')])
    if not elevated:
        return "", 401

    results = []
    cursor = r.table("errors").run(rethink)
    # Drain the cursor; any exception from next() ends the loop.
    draining = True
    while draining:
        try:
            results.append(cursor.next())
        except Exception:
            draining = False

    not_handled = [row for row in results if row['status'] == "not handled"]
    handled = [row for row in results if row['status'] == "handled"]
    return await render_template("command_error_list.html", not_handled=not_handled, handled=handled)
# All routes are registered; start serving on every interface at the
# configured IPC port.
print("[init] Done")
ipc_port = config['ipc']['port']
app.run(host="0.0.0.0", port=ipc_port, debug=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment