Skip to content

Instantly share code, notes, and snippets.

@x0x8x
Last active June 8, 2020 16:26
Show Gist options
  • Save x0x8x/fa4b3223b874f33fba9ff6562a79d647 to your computer and use it in GitHub Desktop.
Save x0x8x/fa4b3223b874f33fba9ff6562a79d647 to your computer and use it in GitHub Desktop.
Telegram UserBot using Telethon framework and meval for eval func.
import logging, traceback, sys, io, os, html, textwrap, asyncio, inspect, random, re, time, subprocess, glob
from datetime import datetime
from asyncio import sleep
from contextlib import redirect_stdout
from io import StringIO
from utils import *
from telethon import TelegramClient, events, utils
from telethon.events import NewMessage, MessageEdited
from telethon.utils import parse_username, get_display_name
from telethon.tl import functions, types, custom
from telethon.tl.custom import Message
from telethon.tl.functions import *
from telethon.tl.functions.channels import *
from telethon.errors import *
client = TelegramClient(session, api_id, api_hash)
@client.on(events.NewMessage(outgoing=True, pattern=r"(\,x\s).+"))
@client.on(events.MessageEdited(outgoing=True, pattern=r"(\,x\s).+"))
async def eval(event: Message):
client = event.client
message = event.message
cmd = event.pattern_match.group(1)
code: str = event.raw_text
code = code.replace(cmd, "", 1)
caption = "<b>Evaluated expression:</b>\n<code>{}</code>\n\n<b>Result:</b>\n".format(
code
)
preserve_stdout = sys.stdout
sys.stdout = StringIO()
try:
res = str(await meval(code, locals()))
except Exception:
caption = "<b>Evaluation failed:</b>\n<code>{}</code>\n\n<b>Result:</b>\n".format(
code
)
etype, value, tb = sys.exc_info()
res = "".join(traceback.format_exception(etype, value, None, 0))
sys.stdout = preserve_stdout
try:
val = sys.stdout.getvalue()
except AttributeError:
val = None
sys.stdout = preserve_stdout
try:
await event.edit(
caption + f"<code>{html.escape(res)}</code>", parse_mode="html"
)
except MessageTooLongError:
res = textwrap.wrap(res, 4096 - len(caption))
await event.reply(caption + f"<code>{res[0]}</code>", parse_mode="html")
for part in res[1::]:
await asyncio.sleep(3)
await event.reply(f"<code>{part}</code>", parse_mode="html")
else:
await event.reply(caption, parse_mode="html")
client.start()
client.run_until_disconnected()
import asyncio, os, time, traceback, ast, importlib.util, types
from asyncio import get_event_loop
from asyncio.futures import Future
from html import escape
from functools import wraps, partial
from typing import Callable, Awaitable, Coroutine
from traceback import format_exc
loop = asyncio.get_event_loop()
def aiowrap(fn: Callable) -> Callable[[], Awaitable]:
@wraps(fn)
def decorator(*args, **kwargs):
wrapped = partial(fn, *args, **kwargs)
return get_event_loop().run_in_executor(None, wrapped)
return decorator
async def meval(code, local_vars):
locs = {}
globs = globals().copy()
global_args = "_globs"
while global_args in globs.keys():
global_args = "_" + global_args
local_vars[global_args] = {}
for glob in ["__name__", "__package__"]:
local_vars[global_args][glob] = globs[glob]
root = ast.parse(code, "exec")
code = root.body
if isinstance(code[-1], ast.Expr):
code[-1] = ast.copy_location(ast.Return(code[-1].value), code[-1])
glob_copy = ast.Expr(
ast.Call(
func=ast.Attribute(
value=ast.Call(
func=ast.Name(id="globals", ctx=ast.Load()), args=[], keywords=[]
),
attr="update",
ctx=ast.Load(),
),
args=[],
keywords=[
ast.keyword(arg=None, value=ast.Name(id=global_args, ctx=ast.Load()))
],
)
)
ast.fix_missing_locations(glob_copy)
code.insert(0, glob_copy)
args = []
for a in list(map(lambda x: ast.arg(x, None), local_vars.keys())):
ast.fix_missing_locations(a)
args += [a]
args = ast.arguments(
args=[],
vararg=None,
kwonlyargs=args,
kwarg=None,
defaults=[],
kw_defaults=[None for i in range(len(args))],
)
if int.from_bytes(importlib.util.MAGIC_NUMBER[:-2], "little") >= 3410:
args.posonlyargs = []
fun = ast.AsyncFunctionDef(
name="tmp", args=args, body=code, decorator_list=[], returns=None
)
ast.fix_missing_locations(fun)
mod = ast.parse("")
mod.body = [fun]
comp = compile(mod, "<string>", "exec")
exec(comp, {}, locs)
r = await locs["tmp"](**local_vars)
if isinstance(r, types.CoroutineType) or isinstance(r, Future):
r = await r
try:
globals().clear()
finally:
globals().update(**globs)
return r
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment