Created
February 24, 2025 02:17
-
-
Save Notookk/123b6322b6ea91b43e19a2d4abc66914 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
# Bot credentials | |
API_ID = '25193832' | |
API_HASH = 'e154b1ccb0195edec0bc91ae7efebc2f' | |
# MongoDB connection string | |
DB_URI = 'mongodb+srv://copy:[email protected]/?retryWrites=true&w=majority&appName=Cluster0' | |
LOG_CHANNEL = '-1002240372506' | |
TOKEN = "7691103794:AAG453Q33UQ1cByVMQ-22ZneBtMHIbP3KZ0" | |
OWNER_ID = 7875192045 # Change to actual owner ID | |
ALERT_CHANNEL_ID = "-1002329693689" | |
MEDIA_DIR = "../media" | |
DB_PATH = "nsfw_bot.db" | |
AUTH_USERS = [7875192045] | |
DB_NAME = "BroadcastBot" | |
BROADCAST_AS_COPY = True | |
BOT_OWNER_ID = 7875192045 | |
OWNER_IDS = [7875192045, 6656608288, 6545754981] # Add as many as you want! |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import asyncio | |
import logging | |
import requests | |
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, MessageEntity | |
from telegram.ext import Application, MessageHandler, CommandHandler, filters, CallbackQueryHandler, CallbackContext | |
from telegram.constants import ParseMode | |
from telegram import ChatPermissions, InputMediaPhoto, InputMediaVideo | |
from telegram.ext import ContextTypes | |
from datetime import datetime, timedelta | |
from database import ( | |
is_approved, update_violations, add_approved_user, remove_approved_user, get_user_violations, Database) | |
from config import ALERT_CHANNEL_ID | |
EXEMPT_USER_IDS = [7875192045] | |
GROUP_CHAT_IDS = [] | |
# Initialize the database | |
db = Database("users.db") | |
logging.basicConfig( | |
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | |
level=logging.INFO, | |
) | |
OWNER_USER_ID = 7875192045 | |
GROUP_CHAT_IDS = set() | |
# Set a maximum length for messages | |
MAX_MESSAGE_LENGTH = 200 | |
# List of video file URLs to send randomly | |
VIDEO_LIST = [ | |
"https://telegra.ph/file/1722b8e21ef54ef4fbc23.mp4", | |
"https://telegra.ph/file/ac7186fffc5ac5f764fc1.mp4", | |
"https://telegra.ph/file/4156557a73657501918c4.mp4", | |
"https://telegra.ph/file/0d896710f1f1c02ad2549.mp4", | |
"https://telegra.ph/file/03ac4a6e94b5b4401fa5a.mp4", | |
] | |
# Function to create the main inline keyboard | |
def get_main_inline_keyboard(): | |
keyboard = [ | |
[ | |
InlineKeyboardButton("‣ʜᴇʟᴘ‣", callback_data="help"), | |
InlineKeyboardButton("‣ᴀᴅᴅ ᴍᴇ‣", url="https://t.me/copyright_ro_bot?startgroup=true"), | |
], | |
[ | |
InlineKeyboardButton("‣ʀᴇᴘᴏ‣", callback_data="repo"), | |
InlineKeyboardButton("‣ᴏᴡɴᴇʀ‣", callback_data="owner"), | |
] | |
] | |
return InlineKeyboardMarkup(keyboard) | |
# Function to create the "Back" button to return to the main menu | |
def get_back_inline_keyboard(): | |
keyboard = [[InlineKeyboardButton("‣ʙᴀᴄᴋ‣", callback_data="back")]] | |
return InlineKeyboardMarkup(keyboard) | |
# Function to check if a user is exempt from deletion | |
def is_exempt_user(user_id): | |
return user_id in EXEMPT_USER_IDS | |
# Handler for the /start command | |
async def start_command(update: Update, context: CallbackContext): | |
"""Handles the /start command, sends an animated message, and tracks the user.""" | |
message = update.message | |
user = update.effective_user | |
# ✅ Step 1: Store user in the database and mark them as started | |
if user: | |
await db.add_user(user.id, user.username or "Unknown") | |
await db.mark_user_started(user.id) | |
print(f"✅ User started the bot: {user.id} - {user.username}") | |
# ✅ Step 2: Animate the message "dιиg dιиg" | |
accha = await message.reply_text(text="❤️🔥ᴅιиg ᴅιиg ꨄ︎ ѕтαятιиg••") | |
await asyncio.sleep(0.2) | |
await accha.edit_text("💛ᴅιиg ᴅιиg ꨄ︎ sтαятιиg•••") | |
await asyncio.sleep(0.2) | |
await accha.edit_text("🩵ᴅιиg ᴅιиg ꨄ︎ sтαятιиg•••••") | |
await asyncio.sleep(0.2) | |
await accha.edit_text("🤍ᴅιиg ᴅιиg ꨄ︎ sтαятιиg••••••••") | |
await asyncio.sleep(0.2) | |
await accha.delete() | |
# ✅ Step 3: Select a random video from the VIDEO_LIST | |
video_url = random.choice(VIDEO_LIST) | |
user_first_name = update.effective_user.first_name | |
user_username = update.effective_user.id | |
# ✅ Step 4: Prepare the final message caption | |
caption = "Hey [{user_first_name}](tg://user?id={user_username}), 🥀\n" \ | |
"𝐓ʜɪs ɪs [˹𝑪𝒐𝒑𝒚𝒓𝒊𝒈𝒉𝒕 ✗ 𝜝𝒐𝒕˼](https://t.me/copyright_ro_bot) 🤍\n" \ | |
"➻ 𝐀 𝐅ᴀsᴛ & 𝐏ᴏᴡᴇʀғᴜʟ 𝐓ᴇʟᴇɢʀᴀᴍ 𝐒ᴇᴄᴜʀɪᴛʏ 𝐁ᴏᴛ.\n" \ | |
"𝐅ᴀsᴛ 𝐍sғᴡ 𝐌ᴏᴅᴇʟ ɪɴsᴛᴀʟʟᴇᴅ 𝐇ᴇʟᴩs 𝐓ᴏ 𝐏ʀᴏᴛᴇᴄᴛ 𝐘ᴏᴜʀ 𝐆ʀᴏᴜᴘ 𝐅ʀᴏᴍ 𝐏ᴏʀɴᴏɢʀᴀᴘʜʏ 𝐀ɴᴅ 𝐂ᴏᴘʏʀɪɢʜᴛ\n" \ | |
"──────────────────\n" \ | |
"๏ 𝐂ʟɪᴄᴋ 𝐎ɴ 𝐓ʜᴇ 𝐇ᴇʟᴩ 𝐁ᴜᴛᴛᴏɴ 𝐓ᴏ 𝐆ᴇᴛ ɪɴғᴏʀᴍᴀᴛɪᴏɴ 𝐀ʙᴏᴜᴛ 𝐂ᴏᴍᴍᴀɴᴅs.".format(user_first_name=user_first_name, user_username=user_username) | |
# ✅ Step 5: Send the video with the caption and inline keyboard | |
await message.reply_video( | |
video=video_url, | |
caption=caption, | |
parse_mode="Markdown", | |
reply_markup=get_main_inline_keyboard() | |
) | |
# Handler for button presses | |
async def button_handler(update: Update, context): | |
query = update.callback_query | |
await query.answer() | |
if query.data == "help": | |
help_text = ( "💫Here are some commands:\n\n" "● ᴛʜɪs ʙᴏᴛ ᴀᴜᴛᴏᴍᴀᴛɪᴄᴀʟʟʏ ᴅᴇʟᴇᴛᴇs ᴇᴅɪᴛᴇᴅ ᴍᴇssᴀɢᴇs, ʟᴏɴɢ ᴍᴇssᴀɢᴇs, ᴀɴᴅ sʜᴀʀᴇᴅ ʟɪɴᴋs ᴏʀ ᴘᴅғs ᴀɴᴅ ᴀʟsᴏ 18+ ᴄᴏɴᴛᴇɴᴛs 🍃\n" "● ɪғ ʏᴏᴜ ᴡᴀɴᴛ ᴛᴏ ᴀᴅᴅ ʏᴏᴜʀsᴇʟᴠᴇ ᴀs sᴜᴅᴏ ᴅᴍ - @xotikop_bot💛\n" "● ᴛʏᴘᴇ /command ᴛᴏ ɢᴇᴛ ᴄᴏᴍᴍᴀɴᴅs\n\n" "#𝐒ᴀфᴇ ᴇᴄᴏ🍃 , #𝐗ᴏᴛɪᴋ❤️🔥" ) | |
await query.message.edit_caption(help_text, reply_markup=get_back_inline_keyboard()) | |
elif query.data == "back": | |
video_url = random.choice(VIDEO_LIST) | |
user_first_name = update.effective_user.first_name | |
user_username = update.effective_user.id | |
caption = "Hey [{user_first_name}](https://t.me/{user_username}), 🥀\n" \ | |
"𝐓ʜɪs ɪs [˹𝑪𝒐𝒑𝒚𝒓𝒊𝒈𝒉𝒕 ✗ 𝜝𝒐𝒕˼](https://t.me/copyright_ro_bot) 🤍\n" \ | |
"➻ 𝐀 𝐅ᴀsᴛ & 𝐏ᴏᴡᴇʀғᴜʟ 𝐓ᴇʟᴇɢʀᴀᴍ 𝐒ᴇᴄᴜʀɪᴛʏ 𝐁ᴏᴛ.\n" \ | |
"𝐅ᴀsᴛ 𝐍sғᴡ 𝐌ᴏᴅᴇʟ ɪɴsᴛᴀʟʟᴇᴅ 𝐇ᴇʟᴩs 𝐓ᴏ 𝐏ʀᴏᴛᴇᴄᴛ 𝐘ᴏᴜʀ 𝐆ʀᴏᴜᴘ 𝐅ʀᴏᴍ 𝐏ᴏʀɴᴏɢʀᴀᴘʜʏ 𝐀ɴᴅ 𝐂ᴏᴘʏʀɪɢʜᴛ\n" \ | |
"──────────────────\n" \ | |
"๏ 𝐂ʟɪᴄᴋ 𝐎ɴ 𝐓ʜᴇ 𝐇ᴇʟᴩ 𝐁ᴜᴛᴛᴏɴ 𝐓ᴏ 𝐆ᴇᴛ ɪɴғᴏʀᴍᴀᴛɪᴏɴ 𝐀ʙᴏᴜᴛ 𝐂ᴏᴍᴍᴀɴᴅs.".format(user_first_name=user_first_name, user_username=user_username) | |
await query.message.edit_caption(caption, reply_markup=get_main_inline_keyboard(), parse_mode="Markdown") | |
elif query.data == "owner": | |
await query.message.edit_caption("❤️🔥 𝙢𝙮 𝙤𝙬𝙣𝙚𝙧 𝙞𝙨 𝙘𝙤𝙢𝙢𝙞𝙣𝙜 ● ") | |
await query.message.edit_caption("🍁 𝙢𝙮 𝙤𝙬𝙣𝙚𝙧 𝙞𝙨 𝙘𝙤𝙢𝙢𝙞𝙣𝙜 ●● ") | |
await query.message.edit_caption("🌙 𝙢𝙮 𝙤𝙬𝙣𝙚𝙧 𝙞𝙨 𝙘𝙤𝙢𝙢𝙞𝙣𝙜 ●●● ") | |
await query.message.edit_caption("💫 𝙢𝙮 𝙤𝙬𝙣𝙚𝙧 𝙞𝙨 𝙘𝙤𝙢𝙢𝙞𝙣𝙜 ●●●● ") | |
image_url = "https://files.catbox.moe/0jb630.jpg" | |
await query.message.edit_media( | |
InputMediaPhoto(media=image_url) | |
) | |
await query.message.edit_caption( | |
f"𓆰 𝙃𝙀𝙍𝙀 𝙄𝙎 \n 𓆩𝙈𝙔 𝘾𝙐𝙏𝙀𓆪 𝙊𝙒𝙉𝙀𝙍𓂃🍃🥂꯭ ⎯᪵\n\n", | |
reply_markup=InlineKeyboardMarkup([ | |
[ | |
InlineKeyboardButton("⭑ᴀʙᴏᴜᴛ⭑", url="https://t.me/love_mhe"), | |
InlineKeyboardButton("⭑ʜᴇᴀᴠᴇɴ⭑", url="https://t.me/vibes_I"), | |
], | |
[ | |
InlineKeyboardButton("⭑ᴘᴀʀᴀᴅɪsᴇ⭑", url="https://t.me/links_of"), | |
InlineKeyboardButton("⭑ǫᴜɪᴄᴋ⭑", url="https://t.me/addlist/Hbspf10i_LliZjI0"), | |
], | |
[ | |
InlineKeyboardButton("Back", callback_data="back") | |
] | |
]) | |
) | |
elif query.data == "repo": | |
video_link = "https://files.catbox.moe/jeufr1.mp4" | |
caption = "BAHUT TEEZ HO RHE HO BAHUT TEZZ" | |
await query.message.edit_media( | |
InputMediaVideo(media=video_link, caption=caption), | |
reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("Back", callback_data="back")]]) | |
) | |
# Function to resolve user input | |
async def resolve_user(context: ContextTypes.DEFAULT_TYPE, update: Update, user_input): | |
""" | |
Resolve a user ID from various input types: numeric ID, username, or mention. | |
""" | |
try: | |
# Case 1: Input is numeric (user ID) | |
if str(user_input).isdigit(): | |
return int(user_input) | |
# Case 2: Input is a username | |
async def get_user_id_from_username(bot, username): | |
try: | |
# Add some debug logging here | |
print(f"Searching for user with username {username}") | |
print(f"Username: {username}") | |
search_results = await bot.search(username) | |
print(f"Search results: {search_results}") | |
if search_results and len(search_results) > 0: | |
user_id = search_results[0].id | |
print(f"Found user with ID {user_id}") | |
return user_id | |
else: | |
print(f"User not found") | |
return None | |
except Exception as e: | |
print(f"Error: {e}") | |
return None | |
# Case 3: Input is a mention (from message entities) | |
if update.message.entities: | |
for entity in update.message.entities: | |
if entity.type == MessageEntity.MENTION: | |
mention_user_id = entity.user.id if entity.user else None | |
if mention_user_id: | |
return mention_user_id | |
# Case 4: Replied message (fetch user ID) | |
if update.message.reply_to_message: | |
return update.message.reply_to_message.from_user.id | |
# Final fallback | |
await update.message.reply_text( | |
"❌ Could not resolve the user. Provide a valid numeric ID, username, or reply to a user's message." | |
) | |
return None | |
except Exception as e: | |
print(f"Unexpected error in resolve_user: {e}") | |
await update.message.reply_text("❌ An error occurred while resolving the user.") | |
return None | |
import requests | |
def get_user_id_from_username(bot, username): | |
try: | |
response = requests.get(f"https://t.me/{username}") | |
if response.status_code == 200: | |
return response.url.split("/")[-1] | |
else: | |
return None | |
except Exception as e: | |
print(f"Error: {e}") | |
return None | |
# Command to add a user to sudo | |
# async def add_user(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# """ | |
# Adds a user to the sudo list. Accepts user ID, username, or mention. | |
# Only the owner can add users. | |
# """ | |
# if update.message.from_user.id != OWNER_USER_ID: | |
# await update.message.reply_text("❌ You don't have permission to add sudo users!") | |
# return | |
# # Check if the command includes arguments or is a reply | |
# user_input = None | |
# if len(context.args) == 1: | |
# user_input = context.args[0] # Get the username/user_id from the arguments | |
# elif update.message.reply_to_message: | |
# user_input = update.message.reply_to_message.from_user.id # Get the user ID from the replied message | |
# if not user_input: | |
# await update.message.reply_text( | |
# "❌ Usage: /add <username>, <user_id>, or reply to a user's message with /add." | |
# ) | |
# return | |
# # Resolve the user ID | |
# resolved_user_id = await resolve_user(context, update, user_input) | |
# if resolved_user_id is None: | |
# await update.message.reply_text("❌ Unable to resolve the user. Please ensure the input is correct.") | |
# return | |
# # Add the user to the sudo list if not already present | |
# if resolved_user_id not in EXEMPT_USER_IDS: | |
# EXEMPT_USER_IDS.append(resolved_user_id) | |
# await update.message.reply_text(f"✅ User {resolved_user_id} has been added to the sudo list!") | |
# else: | |
# await update.message.reply_text("❌ This user is already in the sudo list.") | |
# async def remove_user(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
# """ | |
# Removes a user from the sudo list. Accepts user ID, username, or mention. | |
# Only the owner can remove users. | |
# """ | |
# if update.message.from_user.id != OWNER_USER_ID: | |
# await update.message.reply_text("❌ You don't have permission to remove sudo users!") | |
# return | |
# # Check if the command includes arguments or is a reply | |
# user_input = None | |
# if len(context.args) == 1: | |
# user_input = context.args[0] # Get the username/user_id from the arguments | |
# elif update.message.reply_to_message: | |
# user_input = update.message.reply_to_message.from_user.id # Get the user ID from the replied message | |
# if not user_input: | |
# await update.message.reply_text( | |
# "❌ Usage: /remove <username>, <user_id>, or reply to a user's message with /remove." | |
# ) | |
# return | |
# # Resolve the user ID | |
# resolved_user_id = await resolve_user(context, update, user_input) | |
# if resolved_user_id is None: | |
# await update.message.reply_text("❌ Unable to resolve the user. Please ensure the input is correct.") | |
# return | |
# # Remove the user from the sudo list if present | |
# if resolved_user_id in EXEMPT_USER_IDS: | |
# EXEMPT_USER_IDS.remove(resolved_user_id) | |
# await update.message.reply_text(f"✅ User {resolved_user_id} has been removed from the sudo list!") | |
# else: | |
# await update.message.reply_text("❌ This user is not in the sudo list.") | |
# Command to mute a user | |
async def mute_user(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
""" | |
Mutes a user in the chat. Accepts user ID, username, or mention. | |
Usage: /mute <user_id|username|mention> <duration_in_minutes> | |
""" | |
if update.message.from_user.id != OWNER_USER_ID: | |
await update.message.reply_text("❌ You need to be the owner or admin to use this command!") | |
return | |
# Check if the user provided an input or mentioned/replied to someone | |
user_input = None | |
if context.args: | |
user_input = context.args[0] # Get the user identifier from arguments | |
elif update.message.reply_to_message: | |
user_input = update.message.reply_to_message.from_user.id # Get the user ID from a reply | |
elif update.message.entities: | |
for entity in update.message.entities: | |
if entity.type == "mention": | |
user_input = update.message.text[entity.offset : entity.offset + entity.length] # Extract mention text | |
if not user_input: | |
await update.message.reply_text( | |
"❌ Usage: /mute <user_id|username|mention> <duration_in_minutes>, or reply to a user's message." | |
) | |
return | |
# Default duration is 60 minutes (1 hour) | |
try: | |
duration = int(context.args[1]) if len(context.args) > 1 else 60 | |
if duration <= 0: | |
raise ValueError("Duration must be a positive integer.") | |
except ValueError: | |
await update.message.reply_text("❌ Invalid duration. Please provide a positive number of minutes.") | |
return | |
# Resolve user ID | |
resolved_user_id = await resolve_user(context, update, user_input) | |
if resolved_user_id is None: | |
await update.message.reply_text( | |
f"❌ Unable to resolve the user '{user_input}'. Ensure the input is correct and the user has started the bot." | |
) | |
return | |
# Mute the user | |
try: | |
permissions = ChatPermissions(can_send_messages=False) | |
until_date = datetime.now() + timedelta(minutes=duration) | |
await context.bot.restrict_chat_member( | |
chat_id=update.effective_chat.id, | |
user_id=resolved_user_id, | |
permissions=permissions, | |
until_date=until_date | |
) | |
await update.message.reply_text( | |
f"✅ User {user_input} ({resolved_user_id}) has been muted for {duration} minutes." | |
) | |
except Exception as e: | |
print(f"Error while muting user {resolved_user_id}: {e}") | |
await update.message.reply_text("❌ Failed to mute the user. Please check the bot's permissions.") | |
async def unmute_user(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
""" | |
Unmutes a user in the chat. Accepts user ID, username, or mention. | |
Usage: /unmute <user_id|username|mention>, or reply to a user's message. | |
""" | |
if update.message.from_user.id != OWNER_USER_ID: | |
await update.message.reply_text("❌ You need to be the owner or admin to use this command!") | |
return | |
# Check if the user provided an input or mentioned/replied to someone | |
user_input = None | |
if context.args: | |
user_input = context.args[0] # Get the user identifier from arguments | |
elif update.message.reply_to_message: | |
user_input = update.message.reply_to_message.from_user.id # Get the user ID from a reply | |
elif update.message.entities: | |
for entity in update.message.entities: | |
if entity.type == "mention": | |
user_input = update.message.text[entity.offset : entity.offset + entity.length] # Extract mention text | |
if not user_input: | |
await update.message.reply_text( | |
"❌ Usage: /unmute <user_id|username|mention>, or reply to a user's message." | |
) | |
return | |
# Resolve user ID | |
resolved_user_id = await resolve_user(context, update, user_input) | |
if resolved_user_id is None: | |
await update.message.reply_text( | |
f"❌ Unable to resolve the user '{user_input}'. Ensure the input is correct and the user has started the bot." | |
) | |
return | |
# Unmute the user | |
try: | |
permissions = ChatPermissions(can_send_messages=True) # Grant permission to send messages | |
await context.bot.restrict_chat_member( | |
chat_id=update.effective_chat.id, | |
user_id=resolved_user_id, | |
permissions=permissions | |
) | |
await update.message.reply_text( | |
f"✅ User {user_input} ({resolved_user_id}) has been unmuted." | |
) | |
except Exception as e: | |
print(f"Error while unmuting user {resolved_user_id}: {e}") | |
await update.message.reply_text("❌ Failed to unmute the user. Please check the bot's permissions.") | |
import psutil | |
from psutil import cpu_percent, virtual_memory, disk_usage | |
import time | |
# /ping Command | |
async def ping_u(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
start = time.perf_counter() | |
image_url = "https://files.catbox.moe/tizgai.jpg" | |
event = await update.message.reply_photo(image_url, caption="ᴩɪɴɢɪɴɢ ʙᴀʙʏ ●🍃") | |
await event.edit_caption("ᴩɪɴɢɪɴɢ ʙᴀʙʏ ●●🍃") | |
await event.edit_caption("ᴩɪɴɢɪɴɢ ʙᴀʙʏ ●●●🍃") | |
await event.edit_caption("ᴩɪɴɢɪɴɢ ʙᴀʙʏ ●●●●🍃") | |
await event.edit_caption("ᴩɪɴɢɪɴɢ ʙᴀʙʏ ●●●●●🍃") | |
await asyncio.sleep(2) | |
end = time.perf_counter() | |
ms = (end - start) * 1000 | |
cpu = psutil.cpu_percent(interval=0.1) | |
ram = psutil.virtual_memory().percent | |
disk = psutil.disk_usage('/').percent | |
ping_text = f"\n\n<b><u>System Stats of copyright bot :</u></b>\n\n 🍃 Uptime : {ms}\n»»————- ★ - ★ ————-««\n ❣️ Ram : {ram}\n»»————- ★ - ★ ————-««\n ❣️ Cpu : {cpu}\n»»————- ★ - ★ ————-««\n ❣️ Disk : {disk}\n»»————- ★ - ★ ————-««\n 🍃 Py-TgCalls : {ms}ms\n»»————- ★ - ★ ————-««" | |
ping_text += " ◈ ━━━━━━━ ⸙ - ⸙ ━━━━━━━ ◈\n🍃 \n◈ ━━━━━━━ ⸙ - ⸙ ━━━━━━━ ◈\n\n ᴍᴀᴅᴇ ᴡɪᴛʜ 🖤 ʙʏ <a href=\"https://t.me/xazoc\">||ᴀʀɪ||❣️</a>" | |
await event.edit_caption(caption=ping_text, parse_mode=ParseMode.HTML) | |
MAX_MESSAGE_LENGTH = 1000 # Example maximum length for messages | |
# Ensure the handler is registered in your bot | |
async def delete_edited_messages(update: Update, context: CallbackContext): | |
if update.edited_message: | |
user_id = update.edited_message.from_user.id | |
# Check if the user is exempt from deletion | |
if is_exempt_user(user_id): | |
return # Do nothing if the user is exempt | |
user_mention = update.edited_message.from_user.mention_html() | |
# Delete the edited message | |
try: | |
await context.bot.delete_message( | |
chat_id=update.edited_message.chat_id, | |
message_id=update.edited_message.message_id | |
) | |
# Notify the group about the deleted edited message | |
await context.bot.send_message( | |
chat_id=update.edited_message.chat_id, | |
text=f"🚫 {user_mention}, edited messages are not allowed and have been deleted!", | |
parse_mode=ParseMode.HTML | |
) | |
except Exception as e: | |
# Log any error that occurs during deletion | |
logger.error(f"Error deleting edited message: {e}") | |
# Handler to delete links, PDFs, long messages, and notify the user | |
async def delete_invalid_messages(update: Update, context: CallbackContext): | |
user_id = update.message.from_user.id | |
# Check if the user is exempt from deletion | |
if is_exempt_user(user_id): | |
return # Do nothing if the user is exempt | |
user_mention = update.message.from_user.mention_html() | |
# Check if the message contains a link or PDF | |
if (update.message.entities and any(entity.type in [MessageEntity.URL, MessageEntity.TEXT_LINK] for entity in update.message.entities)) or \ | |
update.message.document: | |
await update.message.delete() | |
# Notify the group about the deleted message | |
await context.bot.send_message( | |
chat_id=update.message.chat_id, | |
text=f"🚫 {user_mention}, links or PDFs are not allowed and have been deleted!", | |
parse_mode=ParseMode.HTML | |
) | |
# Check if the message exceeds the maximum length | |
elif len(update.message.text) > MAX_MESSAGE_LENGTH: | |
await update.message.delete() | |
# Notify the group about the deleted message | |
await context.bot.send_message( | |
chat_id=update.message.chat_id, | |
text=f"🚫 {user_mention}, long messages are not allowed and have been deleted!", | |
parse_mode=ParseMode.HTML | |
) | |
# # Handler to add user ID to the EXEMPT_USER_IDS list | |
# async def add_user_command(update: Update, context): | |
# # Only allow the owner to use this command | |
# if update.message.from_user.id != OWNER_USER_ID: | |
# await update.message.reply_text("❌ You don't have permission to add users!") | |
# return | |
# slang_words = [ | |
# "anal", "anus", "arse", "ass", "asses", "assfucker", "assfukka", "asshole", "arsehole", "asswhole", | |
# "assmunch", "auto erotic", "autoerotic", "ballsack", "bastard", "beastial", "bestial", "bhen ka lode", | |
# "betichod", "bhenchod", "bellend", "bdsm", "beastiality", "bestiality", "bitch", "bitches", "bitchin", | |
# "bitching", "bimbo", "bimbos", "blow job", "blowjob", "blowjobs", "blue waffle", "boob", "boobs", | |
# "booobs", "boooobs", "booooobs", "booooooobs", "breasts", "booty call", "brown shower", "brown showers", | |
# "boner", "bondage", "buceta", "bukake", "bukkake", "bullshit", "bull shit", "busty", "butthole", | |
# "carpet muncher", "cawk", "chink", "chut", "chutiya", "cipa", "clit", "clits", "clitoris", "cnut", | |
# "cock", "chod dunga", "cocks", "cockface", "cockhead", "cockmunch", "cockmuncher", "cocksuck", | |
# "cocksucked", "cocksucking", "cocksucks", "cocksucker", "cokmuncher", "coon", "cow girl", "cow girls", | |
# "cowgirl", "cowgirls", "crap", "crotch", "cum", "cummer", "cumming", "cuming", "cums", "cumshot", | |
# "cunilingus", "cunillingus", "cunnilingus", "cunt", "cuntlicker", "cuntlicking", "cunts", "damn", | |
# "dick", "dickhead", "dildo", "dildos", "dink", "dinks", "deepthroat", "deep throat", "dog style", | |
# "doggie style", "doggiestyle", "doggy style", "doggystyle", "donkeyribber", "doosh", "douche", "duche", | |
# "dyke", "ejaculate", "ejaculated", "ejaculates", "ejaculating", "ejaculatings", "ejaculation", | |
# "ejakulate", "erotic", "erotism", "fag", "fuddi", "fuddu", "faggot", "fagging", "faggit", "faggitt", | |
# "faggs", "fagot", "fagots", "fags", "fatass", "femdom", "fingering", "footjob", "foot job", "fuck", | |
# "fucks", "fucker", "fuckers", "fucked", "fuckhead", "fuckheads", "fuckin", "fucking", "fcuk", | |
# "fcuker", "fcuking", "felching", "fellate", "fellatio", "fingerfuck", "fingerfucked", "fingerfucker", | |
# "fingerfuckers", "fingerfucking", "fingerfucks", "fistfuck", "fistfucked", "fistfucker", "fistfuckers", | |
# "fistfucking", "fistfuckings", "fistfucks", "flange", "fook", "fooker", "fucka", "fuk", "fuks", "fuker", | |
# "fukker", "fukkin", "fukking", "futanari", "futanary", "gangbang", "gangbanged", "gang bang", "gokkun", | |
# "golden shower", "goldenshower", "gaysex", "gand", "gand mara", "goatse", "handjob", "hand job", | |
# "hentai", "hooker", "hoer", "homo", "horny", "incest", "jackoff", "jack off", "jerkoff", "jerk off", | |
# "jizz", "knob", "kinbaku", "labia", "lund", "lun", "lawda", "lavda", "masturbate", "masochist", "mofo", | |
# "mothafuck", "motherfuck", "motherfucker", "mothafucka", "mothafuckas", "mothafuckaz", "mothafucked", | |
# "mothafucker", "mothafuckers", "mothafuckin", "mothafucking", "mothafuckings", "mothafucks", | |
# "mother fucker", "motherfucked", "motherfucker", "motherfuckers", "motherfuckin", "motherfucking", | |
# "motherfuckings", "motherfuckka", "motherfucks", "milf", "muff", "nigga", "nigger", "nigg", "nipple", | |
# "nipples", "nob", "nob jokey", "nobhead", "nobjocky", "nobjokey", "numbnuts", "nutsack", "nude", | |
# "nudes", "orgy", "orgasm", "orgasms", "panty", "panties", "penis", "playboy", "porn", "porno", | |
# "pornography", "pron", "pussy", "pussies", "rape", "raping", "rapist", "rectum", "retard", "rimming", | |
# "sadist", "sadism", "schlong", "scrotum", "sex", "semen", "shemale", "she male", "shibari", "shibary", | |
# "shit", "shitdick", "shitfuck", "shitfull", "shithead", "shiting", "shitings", "shits", "shitted", | |
# "shitters", "shitting", "shittings", "shitty", "shota", "skank", "slut", "sluts", "smut", "smegma", | |
# "spunk", "strip club", "stripclub", "tit", "tits", "titties", "titty", "titfuck", "tittiefucker", | |
# "titties", "tittyfuck", "tittywank", "titwank", "threesome", "three some", "throating", "twat", | |
# "twathead", "twatty", "twunt", "viagra", "vagina", "vulva", "wank", "wanker", "wanky", "whore", | |
# "whoar", "xxx", "xx", "yaoi", "yury", "sexy", "Myr", "Myru", "Myre", "Andi", "Kunna", "Kunne", | |
# "Pelayadi", "Polayadi", "Phoonda", "Phoonde", "Kundi", "Pooru", "Poori", "Pooran", "Umb", "poothichi", | |
# "vedichi", "pulayaadi", "pelichi", "koothichi", "Oomb", "oombu", "Oomban", "Umban" | |
# ] | |
# async def delete_slang_words(update: Update, context: CallbackContext): | |
# message = update.message | |
# text = message.text.lower() | |
# if any(word in text for word in slang_words): | |
# await message.delete() | |
# await context.bot.send_message(chat_id=update.effective_chat.id, text=f"🚫 <a href='tg://user?id={message.from_user.id}'>{message.from_user.first_name}</a>, Slang words are not allowed and have been deleted!🍃", parse_mode="HTML") | |
# keyboard = [[InlineKeyboardButton("👤 View Profile", url=f"tg://user?id={message.from_user.id}")]] | |
# reply_markup = InlineKeyboardMarkup(keyboard) | |
# text = f"🚫{message.from_user.first_name}, Slang has been deleted! 🍃\nDeleted text: {message.text}\nReason: Slang words are not allowed" | |
# spoiler_offset = text.find(message.text) | |
# entities = [MessageEntity(type=MessageEntity.SPOILER, offset=spoiler_offset, length=len(message.text))] | |
# await context.bot.send_message(ALERT_CHANNEL_ID, text=text, entities=entities, reply_markup=reply_markup) | |
async def commands(update: Update, context: CallbackContext): | |
await update.message.reply_text( | |
"Available commands:\n\n" | |
"/start - Start the bot\n" | |
"/ping - Check the bot's latency\n" | |
"/userinfo - Get information about a user\n" | |
"/myinfo - Get information about yourself\n" | |
"/mute - Mute a user in the chat\n" | |
"/unmute - Unmute a user in the chat\n" | |
"/broadcast - Broadcast a message to all users and groups\n" | |
"/add - Add a user to the database\n" | |
"/remove - Remove a user from the database\n" | |
"/sudolist - Get a list of approved users\n" | |
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlite3 | |
from config import DB_PATH | |
import aiosqlite | |
import datetime | |
import asyncio | |
import motor.motor_asyncio | |
def init_db(): | |
conn = sqlite3.connect(DB_PATH) | |
cursor = conn.cursor() | |
cursor.execute("CREATE TABLE IF NOT EXISTS approved_users (user_id INTEGER PRIMARY KEY)") | |
cursor.execute("CREATE TABLE IF NOT EXISTS user_violations (user_id INTEGER, category TEXT, count INTEGER, PRIMARY KEY(user_id, category))") | |
cursor.execute("CREATE TABLE IF NOT EXISTS users (user_id INTEGER PRIMARY KEY)") | |
conn.commit() | |
conn.close() | |
def is_approved(user_id): | |
conn = sqlite3.connect(DB_PATH) | |
cursor = conn.cursor() | |
cursor.execute("SELECT user_id FROM approved_users WHERE user_id = ?", (user_id,)) | |
result = cursor.fetchone() | |
conn.close() | |
return result is not None | |
def update_violations(user_id, category): | |
conn = sqlite3.connect(DB_PATH) | |
cursor = conn.cursor() | |
cursor.execute("INSERT INTO user_violations (user_id, category, count) VALUES (?, ?, 1) ON CONFLICT(user_id, category) DO UPDATE SET count = count + 1", (user_id, category)) | |
conn.commit() | |
conn.close() | |
def add_approved_user(user_id): | |
conn = sqlite3.connect(DB_PATH) | |
cursor = conn.cursor() | |
cursor.execute("INSERT OR IGNORE INTO approved_users (user_id) VALUES (?)", (user_id,)) | |
conn.commit() | |
conn.close() | |
def remove_approved_user(user_id): | |
conn = sqlite3.connect(DB_PATH) | |
cursor = conn.cursor() | |
cursor.execute("DELETE FROM approved_users WHERE user_id = ?", (user_id,)) | |
conn.commit() | |
conn.close() | |
def get_user_violations(user_id): | |
conn = sqlite3.connect(DB_PATH) | |
cursor = conn.cursor() | |
cursor.execute("SELECT category, count FROM user_violations WHERE user_id = ?", (user_id,)) | |
results = cursor.fetchall() | |
conn.close() | |
return results | |
def get_all_users(): | |
conn = sqlite3.connect(DB_PATH) | |
cursor = conn.cursor() | |
cursor.execute("SELECT user_id FROM approved_users") | |
result = cursor.fetchall() | |
conn.close() | |
return [user[0] for user in result] | |
class Database: | |
"""SQLite3 database handler for managing Telegram bot users & groups.""" | |
def __init__(self, db_path="users.db"): | |
self.db_path = db_path | |
async def init_db(self): | |
"""Initialize the database and ensure all necessary columns exist.""" | |
async with aiosqlite.connect(self.db_path) as db: | |
# ✅ Create users table with started_bot column | |
await db.execute( | |
"""CREATE TABLE IF NOT EXISTS users ( | |
id INTEGER PRIMARY KEY, | |
username TEXT, | |
started_bot INTEGER DEFAULT 0 | |
)""" | |
) | |
# ✅ Ensure `started_bot` column exists (for old databases) | |
try: | |
await db.execute("SELECT started_bot FROM users LIMIT 1") | |
except aiosqlite.OperationalError: | |
print("🛠 Adding missing 'started_bot' column to users table...") | |
await db.execute("ALTER TABLE users ADD COLUMN started_bot INTEGER DEFAULT 0") | |
# ✅ Create groups table | |
await db.execute( | |
"""CREATE TABLE IF NOT EXISTS groups ( | |
id INTEGER PRIMARY KEY, | |
title TEXT | |
)""" | |
) | |
await db.commit() | |
async def add_user(self, user_id: int, username: str = None): | |
"""Add a user when they interact with the bot (avoid duplicates).""" | |
async with aiosqlite.connect(self.db_path) as db: | |
await db.execute( | |
"INSERT OR IGNORE INTO users (id, username) VALUES (?, ?)", | |
(user_id, username), | |
) | |
await db.commit() | |
async def mark_user_started(self, user_id: int): | |
"""Mark a user as having started the bot.""" | |
async with aiosqlite.connect(self.db_path) as db: | |
await db.execute( | |
"UPDATE users SET started_bot = 1 WHERE id = ?", | |
(user_id,), | |
) | |
await db.commit() | |
async def add_group(self, group_id: int, title: str): | |
"""Add a new group when a message is sent there (avoiding duplicates).""" | |
async with aiosqlite.connect(self.db_path) as db: | |
await db.execute( | |
"INSERT OR IGNORE INTO groups (id, title) VALUES (?, ?)", | |
(group_id, title), | |
) | |
await db.commit() | |
async def get_all_users(self) -> list[int]: | |
"""Get all user IDs stored in the database.""" | |
async with aiosqlite.connect(self.db_path) as db: | |
cursor = await db.execute("SELECT id FROM users") | |
users = await cursor.fetchall() | |
return [row[0] for row in users] if users else [] | |
async def get_users_who_started(self) -> list[int]: | |
"""Get all users who have started the bot.""" | |
async with aiosqlite.connect(self.db_path) as db: | |
cursor = await db.execute("SELECT id FROM users WHERE started_bot = 1") | |
users = await cursor.fetchall() | |
return [row[0] for row in users] if users else [] | |
async def get_all_groups(self) -> list[int]: | |
"""Get all stored group IDs.""" | |
async with aiosqlite.connect(self.db_path) as db: | |
cursor = await db.execute("SELECT id FROM groups") | |
groups = await cursor.fetchall() | |
return [row[0] for row in groups] if groups else [] | |
async def delete_user(self, user_id: int): | |
"""Delete a user from the database.""" | |
async with aiosqlite.connect(self.db_path) as db: | |
await db.execute("DELETE FROM users WHERE id = ?", (user_id,)) | |
await db.commit() | |
async def get_all_notif_user(self) -> list[int]: | |
"""Retrieve all user IDs who have started the bot.""" | |
async with aiosqlite.connect(self.db_path) as db: | |
cursor = await db.execute("SELECT id FROM users WHERE started_bot = 1") | |
users = await cursor.fetchall() | |
return [row[0] for row in users] if users else [] | |
async def get_total_counts(self) -> dict: | |
"""Get the total count of users and groups in the database.""" | |
async with aiosqlite.connect(self.db_path) as db: | |
user_cursor = await db.execute("SELECT COUNT(*) FROM users") | |
total_users = (await user_cursor.fetchone())[0] | |
group_cursor = await db.execute("SELECT COUNT(*) FROM groups") | |
total_groups = (await group_cursor.fetchone())[0] | |
return {"users": total_users, "groups": total_groups} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import asyncio | |
import nest_asyncio | |
from telegram.ext import ApplicationBuilder, Application, CommandHandler, MessageHandler, CallbackQueryHandler, filters | |
from config import TOKEN | |
from database import Database | |
from handlers.copyright import ( | |
start_command, | |
button_handler, | |
mute_user, | |
unmute_user, | |
ping_u, | |
get_user_id_from_username, | |
delete_edited_messages, | |
delete_invalid_messages, | |
commands | |
) | |
from handlers.nsfw import * | |
from handlers.utils import * | |
from handlers.broadcast import * | |
# ✅ Fix event loop conflict | |
nest_asyncio.apply() | |
# ✅ Configure Logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s - %(levelname)s - %(message)s", | |
) | |
logger = logging.getLogger(__name__) | |
# ✅ Initialize Database | |
db = Database() | |
async def main(): | |
"""Main function to initialize the bot and start polling.""" | |
await db.init_db() # ✅ Ensure database is ready before starting bot | |
application = ApplicationBuilder().token(TOKEN).build() | |
# ✅ Register Handlers | |
application.add_handler(CommandHandler("start", start_command)) | |
application.add_handler(CallbackQueryHandler(button_handler)) | |
application.add_handler(CommandHandler("command", commands)) | |
application.add_handler(CommandHandler("mute", mute_user)) | |
application.add_handler(CommandHandler("unmute", unmute_user)) | |
application.add_handler(CommandHandler("ping", ping_u)) | |
application.add_handler(CommandHandler("info", get_user_id_from_username)) | |
application.add_handler(CommandHandler("userinfo", user_info)) | |
application.add_handler(CommandHandler("myinfo", my_info)) | |
application.add_handler(CommandHandler("sudolist", get_approved_users_list)) | |
application.add_handler(CommandHandler("add", add_approved)) | |
application.add_handler(CommandHandler("remove", remove_approved)) | |
application.add_handler(CommandHandler("broadcast", broadcast)) | |
#application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, delete_slang_words)) | |
application.add_handler(MessageHandler(filters.ALL, handle_media)) | |
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, track_user)) | |
application.add_handler(MessageHandler(filters.ALL, track_group)) | |
application.add_handler(CommandHandler("listusers", list_users_and_groups)) | |
application.add_handler(MessageHandler(filters.UpdateType.EDITED_MESSAGE, delete_edited_messages)) | |
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, delete_invalid_messages)) | |
logger.info("🤖 Bot is running...✅") | |
await application.run_polling(timeout=30) # Increase timeout to 30 seconds | |
logger.info("🤖 Bot stopped...✅") | |
if __name__ == "__main__": | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(main()) # ✅ Uses existing event loop (NO conflict) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import logging | |
import ffmpeg | |
import imageio | |
import numpy as np | |
from PIL import Image | |
from lottie import parsers | |
from telegram import constants | |
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup | |
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters, CallbackContext | |
from config import TOKEN, OWNER_ID, ALERT_CHANNEL_ID, MEDIA_DIR | |
from database import is_approved, update_violations, add_approved_user, remove_approved_user, get_user_violations, get_all_users | |
from .predict import detect_nsfw | |
from database.database import init_db | |
# Ensure media directory exists | |
os.makedirs(MEDIA_DIR, exist_ok=True) | |
# Setup database | |
init_db() | |
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
logger = logging.getLogger(__name__) | |
# ✅ Convert WebP static stickers to PNG | |
def convert_webp_to_png(file_path): | |
try: | |
png_path = file_path.replace(".webp", ".png") | |
img = Image.open(file_path).convert("RGB") | |
img.save(png_path, "PNG") | |
return png_path | |
except Exception as e: | |
logger.error(f"Error converting WebP to PNG: {e}") | |
return None | |
# ✅ Extract frame from animated WEBM stickers | |
def extract_frame_from_webm(input_path): | |
"""Extracts the first frame from a .webm video file and saves it as a .jpg image.""" | |
output_path = input_path.replace(".webm", ".jpg") # Set output as JPG | |
try: | |
reader = imageio.get_reader(input_path, format="webm") | |
frame = reader.get_next_data() # Get the first frame | |
reader.close() | |
# Convert frame to numpy array (ensure it's writable) | |
frame = np.array(frame, dtype=np.uint8) | |
# Save frame as JPEG | |
imageio.imwrite(output_path, frame, format="JPEG") | |
return output_path | |
except Exception as e: | |
print(f"Error extracting frame from WEBM: {e}") | |
return None | |
# ✅ Convert TGS stickers (Lottie) to PNG | |
def convert_tgs_to_png(file_path): | |
output_path = file_path.replace(".tgs", ".png") | |
try: | |
animation = parse_tgs(file_path) | |
frame = animation.render_frame(0) | |
frame.save(output_path, "PNG") | |
return output_path | |
except Exception as e: | |
logger.error(f"Error converting TGS to PNG: {e}") | |
return None | |
# ✅ Handle all media (images, videos, stickers, GIFs, animated stickers) | |
async def handle_media(update: Update, context: CallbackContext): | |
"""Handles all media types and scans for NSFW content.""" | |
user = update.message.from_user | |
chat_id = update.message.chat_id | |
media_files = update.message.effective_attachment | |
if is_approved(user.id): | |
return # Skip NSFW scan for approved users | |
if not isinstance(media_files, (list, tuple)): | |
media_files = [media_files] | |
for file in media_files: | |
if not hasattr(file, "file_id"): | |
continue | |
file_obj = await context.bot.get_file(file.file_id) | |
file_path = os.path.join(MEDIA_DIR, f"{user.id}_{file.file_id}") | |
await file_obj.download_to_drive(file_path) | |
if not os.path.exists(file_path): | |
logger.error(f"Downloaded file missing: {file_path}") | |
continue | |
# Handle Stickers Separately | |
if update.message.sticker: | |
if update.message.sticker.is_animated: | |
# Convert `.tgs` animated sticker to PNG | |
new_path = convert_tgs_to_png(file_path) | |
elif update.message.sticker.is_video: | |
# Convert `.webm` video sticker to an image frame | |
new_path = extract_frame_from_webm(file_path) | |
else: | |
# Convert `.webp` static sticker to PNG | |
new_path = convert_webp_to_png(file_path) | |
if new_path: | |
file_path = new_path | |
else: | |
continue | |
# Run NSFW detection | |
result = detect_nsfw(file_path) | |
if result is None: | |
continue | |
max_category = max(result, key=result.get) | |
if max_category in ["porn", "sexy", "hentai"]: | |
await update.message.delete() | |
update_violations(user.id, max_category) | |
alert_message = f""" | |
╭───────────────── | |
╰──●𝙽𝚂𝙵𝚆 𝙳𝙴𝚃𝙴𝙲𝚃𝙴𝙳 🔞 | |
╭✠╼━━━━━━❖━━━━━━━✠╮ | |
│☾𝚄𝚜𝚎𝚛: {user.id} | |
│☾𝚄𝚜𝚎𝚛𝚗𝚊𝚖𝚎: @{user.username if user.username else 'None'} | |
│☾𝙳𝚎𝚝𝚊𝚒𝚕𝚜: | |
│☾𝙳𝚛𝚊𝚠𝚒𝚗𝚐𝚜: {result['drawings']:.2f} | |
│☾𝙽𝚎𝚞𝚝𝚛𝚊𝚕: {result['neutral']:.2f} | |
│☾𝙿𝚘𝚛𝚗: {result['porn']:.2f} | |
│☾𝙷𝚎𝚗𝚝𝚊𝚒: {result['hentai']:.2f} | |
│☾𝚂𝚎𝚡𝚢: {result['sexy']:.2f} | |
╰✠╼━━━━━━❖━━━━━━━✠╯""" | |
await context.bot.send_message(chat_id, alert_message, parse_mode="Markdown") | |
channel_alert = f""" | |
NSFW DETECTED 🔞 | |
User: {user.id} | |
Username: @{user.username if user.username else 'None'} | |
Details: | |
Drawings: {result['drawings']:.2f} | |
Neutral: {result['neutral']:.2f} | |
Porn: {result['porn']:.2f} | |
Hentai: {result['hentai']:.2f} | |
Sexy: {result['sexy']:.2f} | |
Chat: {chat_id} | |
""" | |
keyboard = [[InlineKeyboardButton("👤 View Profile", url=f"tg://user?id={user.id}")]] | |
reply_markup = InlineKeyboardMarkup(keyboard) | |
await context.bot.send_message(ALERT_CHANNEL_ID, channel_alert, reply_markup=reply_markup) | |
# Delete the scanned file | |
if os.path.exists(file_path): | |
os.remove(file_path) | |
else: | |
logger.warning(f"File not found for deletion: {file_path}") | |
# ✅ Owner Commands | |
async def add_approved(update: Update, context: CallbackContext): | |
if update.message.from_user.id != OWNER_ID: | |
await update.message.reply_text("» ᴀᴡᴡ, ᴛʜɪs ɪs ɴᴏᴛ ғᴏʀ ʏᴏᴜ ʙᴀʙʏ.") | |
return | |
try: | |
user_id = int(context.args[0]) | |
add_approved_user(user_id) | |
await update.message.reply_text(f"✅ User {user_id} added to approved list.") | |
except (IndexError, ValueError): | |
await update.message.reply_text("❌ Usage: /approve <user_id>") | |
async def remove_approved(update: Update, context: CallbackContext): | |
if update.message.from_user.id != OWNER_ID: | |
await update.message.reply_text("» ᴀᴡᴡ, ᴛʜɪs ɪs ɴᴏᴛ ғᴏʀ ʏᴏᴜ ʙᴀʙʏ.") | |
return | |
try: | |
user_id = int(context.args[0]) | |
remove_approved_user(user_id) | |
await update.message.reply_text(f"❌ User {user_id} removed from approved list.") | |
except (IndexError, ValueError): | |
await update.message.reply_text("❌ Usage: /remove <user_id>") | |
async def my_info(update: Update, context: CallbackContext): | |
user_id = update.message.from_user.id | |
violations = get_user_violations(user_id) | |
if not violations: | |
await update.message.reply_text("✅ You have a clean record.") | |
else: | |
info = "📊 **NSFW Violation History**\n" | |
for category, count in violations: | |
info += f"🔸 {category}: {count} times\n" | |
await update.message.reply_text(info, parse_mode="Markdown") | |
async def user_info(update: Update, context: CallbackContext): | |
message = update.message | |
args = context.args | |
if len(args) == 0: | |
username_or_userid = message.from_user.id | |
else: | |
username_or_userid = args[0] | |
violation_history = get_user_violations(username_or_userid) | |
if violation_history: | |
reply_text = f"User {username_or_userid} has violation history:\n" | |
reply_text += "\n".join(f"{violation[0]}: {violation[1]}" for violation in violation_history) | |
else: | |
reply_text = f"User {username_or_userid} is clear." | |
# Send the reply to the user | |
await message.reply_text(reply_text) | |
from telegram.constants import ParseMode | |
async def get_approved_users_list(update: Update, context: CallbackContext): | |
approved_users_list = [] | |
for i, user_id in enumerate(get_all_users()): | |
user = await context.bot.get_chat(user_id) | |
if is_approved(user_id): | |
user_name = user.first_name | |
if user.username: | |
user_url = f"https://t.me/{user.username}" | |
else: | |
user_url = f"https://t.me/{user_id}" | |
user_link = f"{i+1}. [{user_name}]({user_url})" | |
approved_users_list.append(user_link) | |
if not approved_users_list: | |
await update.message.reply_text("❌ No approved users found.") | |
else: | |
approved_users_str = "\n".join(approved_users_list) | |
message = f"""``` ✨Approved Users✨ ``` | |
╭✠╼━━━━━━❖━━━━━━━✠━━━━━━ | |
│ | |
{approved_users_str} | |
│ | |
╰✠╼━━━━━━❖━━━━━━━✠━━━━━━ | |
💫Total Approved Users: {len(approved_users_list)} | |
""" | |
await update.message.reply_text(f""""{message}""", parse_mode=ParseMode.MARKDOWN, disable_web_page_preview=True) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import tensorflow as tf | |
import tensorflow_hub as hub | |
from tensorflow import keras | |
import numpy as np | |
from PIL import Image | |
MODEL_PATH = os.path.join(os.path.dirname(__file__), 'nsfw_model', 'nsfw_mobilenet2.224x224.h5') | |
IMAGE_DIM = 224 | |
# Load Model | |
def load_model(): | |
return tf.keras.models.load_model(MODEL_PATH, custom_objects={'KerasLayer': hub.KerasLayer}, compile=False) | |
model = load_model() | |
def classify(model, image_path): | |
img = keras.preprocessing.image.load_img(image_path, target_size=(IMAGE_DIM, IMAGE_DIM)) | |
img = keras.preprocessing.image.img_to_array(img) / 255.0 | |
img = np.expand_dims(img, axis=0) | |
categories = ['drawings', 'hentai', 'neutral', 'porn', 'sexy'] | |
predictions = model.predict(img)[0] | |
os.remove(image_path) | |
return {category: float(predictions[i]) for i, category in enumerate(categories)} | |
def detect_nsfw(image_path): | |
return classify(model, image_path) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import logging | |
from telegram import User | |
from config import MEDIA_DIR | |
# Set up logging | |
logging.basicConfig( | |
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | |
level=logging.INFO, | |
) | |
logger = logging.getLogger(__name__) | |
def mention_user(user: User): | |
"""Creates a clickable mention for a Telegram user.""" | |
return f"[{user.first_name}](tg://user?id={user.id})" | |
def clean_media_folder(): | |
"""Ensures media directory is clean before storing new files.""" | |
if not os.path.exists(MEDIA_DIR): | |
os.makedirs(MEDIA_DIR) | |
for file in os.listdir(MEDIA_DIR): | |
os.remove(os.path.join(MEDIA_DIR, file)) | |
def log_message(update, nsfw_category=None): | |
"""Logs incoming messages and NSFW detections.""" | |
user = update.message.from_user | |
chat = update.message.chat | |
content_type = update.message.effective_attachment | |
log_msg = (f"User: {user.id} | Chat: {chat.id} | " | |
f"Media Type: {content_type} | NSFW: {nsfw_category if nsfw_category else 'Safe'}") | |
logger.info(log_msg) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import cv2 | |
def capture_screenshot(video_path: str, output_dir: str = "screenshots"): | |
# Ensure the output directory exists | |
os.makedirs(output_dir, exist_ok=True) | |
# Load the video | |
vid_obj = cv2.VideoCapture(video_path) | |
fps = vid_obj.get(cv2.CAP_PROP_FPS) | |
if not fps or fps <= 0: | |
fps = 30 # Default to 30 FPS if unable to read | |
frame_interval = int(fps * 10) # Capture every 10 seconds | |
frames = [] | |
count = 0 | |
try: | |
while True: | |
success, frame = vid_obj.read() | |
if not success: | |
break | |
if count % frame_interval == 0: | |
frame_path = os.path.join(output_dir, f"screenshot_{count}.png") | |
cv2.imwrite(frame_path, frame) | |
frames.append(frame_path) | |
count += 1 | |
except Exception as e: | |
print(f"Error capturing screenshots: {e}") | |
finally: | |
vid_obj.release() | |
return frames | |
import subprocess | |
import os | |
def convert_webm_to_png(webm_path: str) -> str: | |
try: | |
png_path = os.path.splitext(webm_path)[0] + ".png" | |
command = [ | |
"ffmpeg", | |
"-i", webm_path, | |
"-vf", "fps=1", | |
"-vframes", "1", | |
png_path | |
] | |
subprocess.run(command, check=True) | |
return png_path if os.path.exists(png_path) else None | |
except Exception as e: | |
logging.error(f"Failed to convert webm to PNG: {e}") | |
return None |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment