Created
May 19, 2022 20:45
-
-
Save kraftwerk28/290185b0248e818007ac55f8be01a02b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import emoji | |
import gzip | |
from io import BytesIO | |
import json | |
import logging | |
from pyrogram import filters, types | |
from pyrogram.raw.functions.messages import GetStickerSet | |
from pyrogram.raw.functions.upload import GetFile | |
from pyrogram.raw.types import ( | |
InputStickerSetAnimatedEmoji, Document, | |
StickerSet, InputDocumentFileLocation, | |
) | |
log = logging.getLogger(__name__) | |
@app.on_message( | |
filters.reply & | |
filters.regex( | |
r"^\.amp(?:l(?:i(?:fy?)?)?)?" | |
r"(?:\s+(-?\d+))?" | |
r"(?:\s+(reply))?" | |
r"?(?:\s+(-?\d+))?$", | |
) | |
) | |
async def on_amplify_sticker(client: Client, message: types.Message): | |
# if message.from_user is not None and message.from_user.is_self: | |
# await client.remember_my_reply(message) | |
log.info("Amplify command") | |
re_match, = message.matches | |
reply = message.reply_to_message | |
try: | |
amplify_amount = int(re_match[1]) | |
except Exception: | |
amplify_amount = 8 | |
log.info(f"Amount: {amplify_amount}") | |
reply_to_msg_id = message.id | |
if "reply" in (re_match[1], re_match[2]): | |
reply_to_msg_id = reply.id | |
if message.from_user.is_self: | |
await message.delete() | |
text = reply.text if reply.text is not None else reply.caption | |
if (reply.sticker and reply.sticker.is_animated) or reply.document: | |
log.info("Amplifying sticker...") | |
sticker_io = await client.download_media(reply, in_memory=True) | |
sticker_filename = sticker_io.name | |
sticker_io.seek(0) | |
tgs_json = json.loads(gzip.decompress(sticker_io.read())) | |
elif text is not None: | |
msg_emojis = emoji.emoji_lis(text) | |
log.info( | |
"Emoji in the message: " | |
f"{','.join(e['emoji'] for e in msg_emojis)}" | |
) | |
nth_emoji = 1 | |
if re_match[3] is not None: | |
nth_emoji = int(re_match[3]) | |
try: | |
target_emoji = msg_emojis[nth_emoji - 1]["emoji"] | |
except IndexError: | |
return | |
log.info(f"Amplifying emoji #{nth_emoji} ({target_emoji})") | |
t = InputStickerSetAnimatedEmoji() | |
req = GetStickerSet(stickerset=t, hash=0) | |
stickerset: StickerSet = await app.invoke(req) | |
pack = next( | |
(p for p in stickerset.packs if p.emoticon == target_emoji), | |
None | |
) | |
if pack is None: | |
log.info(f"Pack for this {target_emoji} not found.") | |
return | |
sticker_doc: Document = next( | |
(d for d in stickerset.documents if d.id == pack.documents[0]), | |
None | |
) | |
if sticker_doc is None: | |
return | |
input_doc = InputDocumentFileLocation( | |
id=sticker_doc.id, | |
access_hash=sticker_doc.access_hash, | |
file_reference=sticker_doc.file_reference, | |
thumb_size="", | |
) | |
req = GetFile(location=input_doc, offset=0, limit=0x100000) | |
msession = await app.media_session_for_dc(sticker_doc.dc_id) | |
sticker_file = await msession.invoke(req) | |
tgs_json = json.loads(gzip.decompress(sticker_file.bytes)) | |
sticker_filename = "AnimatedSticker.tgs" | |
else: | |
return | |
def json_rec(o): | |
if type(o) is dict: | |
for k, v in o.items(): | |
if type(v) in (float, int): | |
if k == "x": | |
o[k] = v | |
elif k == "y": | |
o[k] = v * amplify_amount | |
else: | |
json_rec(v) | |
elif type(o) is list: | |
for v in o: | |
json_rec(v) | |
json_rec(tgs_json) | |
tgs_result = gzip.compress(json.dumps(tgs_json).encode("utf-8")) | |
result_io = BytesIO(tgs_result) | |
result_io.name = sticker_filename | |
sent = await client.send_sticker( | |
message.chat.id, | |
result_io, | |
reply_to_message_id=reply_to_msg_id, | |
) | |
if sent.document.mime_type == "application/x-bad-tgsticker": | |
await sent.delete() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment