Created
April 2, 2025 03:04
-
-
Save Notookk/21130fd5c3796ec5cb35a52b62bf1b55 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
# Bot credentials | |
API_ID = '25193832' | |
API_HASH = 'e154b1ccb0195edec0bc91ae7efebc2f' | |
# MongoDB connection string | |
DB_URI = 'mongodb+srv://copy:[email protected]/?retryWrites=true&w=majority&appName=Cluster0' | |
LOG_CHANNEL = '-1002240372506' | |
TOKEN = "7488527811:AAEPMstK_YmEXlB2nOhaBYElaQimhoiYXQc" | |
OWNER_ID = 7379318591 # Change to actual owner ID | |
ALERT_CHANNEL_ID = "-1002469829347" | |
MEDIA_DIR = "../media" | |
DB_PATH = "nsfw_bot.db" | |
AUTH_USERS = [7379318591] | |
DB_NAME = "BroadcastBot" | |
BROADCAST_AS_COPY = True | |
BOT_OWNER_ID = 7379318591 | |
OWNER_IDS = [7379318591, 6656608288, 6545754981] # Add as many as you want! |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import logging | |
import ffmpeg | |
import imageio | |
import numpy as np | |
from PIL import Image | |
from lottie import parsers | |
from telegram import constants | |
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup | |
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters, CallbackContext | |
from config import TOKEN, OWNER_ID, ALERT_CHANNEL_ID, MEDIA_DIR | |
from database import is_approved, update_violations, add_approved_user, remove_approved_user, get_user_violations, get_all_users | |
from .predict import detect_nsfw | |
from database.database import init_db | |
# Ensure media directory exists | |
os.makedirs(MEDIA_DIR, exist_ok=True) | |
# Setup database | |
init_db() | |
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
logger = logging.getLogger(__name__) | |
# ✅ Convert WebP static stickers to PNG | |
def convert_webp_to_png(file_path): | |
try: | |
png_path = file_path.replace(".webp", ".png") | |
img = Image.open(file_path).convert("RGB") | |
img.save(png_path, "PNG") | |
return png_path | |
except Exception as e: | |
logger.error(f"Error converting WebP to PNG: {e}") | |
return None | |
# ✅ Extract frame from animated WEBM stickers | |
def extract_frame_from_webm(input_path): | |
"""Extracts the first frame from a .webm video file and saves it as a .jpg image.""" | |
output_path = input_path.replace(".webm", ".jpg") # Set output as JPG | |
try: | |
reader = imageio.get_reader(input_path, format="webm") | |
frame = reader.get_next_data() # Get the first frame | |
reader.close() | |
# Convert frame to numpy array (ensure it's writable) | |
frame = np.array(frame, dtype=np.uint8) | |
# Save frame as JPEG | |
imageio.imwrite(output_path, frame, format="JPEG") | |
return output_path | |
except Exception as e: | |
print(f"Error extracting frame from WEBM: {e}") | |
return None | |
# ✅ Convert TGS stickers (Lottie) to PNG | |
def convert_tgs_to_png(file_path): | |
output_path = file_path.replace(".tgs", ".png") | |
try: | |
animation = parse_tgs(file_path) | |
frame = animation.render_frame(0) | |
frame.save(output_path, "PNG") | |
return output_path | |
except Exception as e: | |
logger.error(f"Error converting TGS to PNG: {e}") | |
return None | |
# ✅ Handle all media (images, videos, stickers, GIFs, animated stickers) | |
async def handle_media(update: Update, context: CallbackContext): | |
"""Handles all media types and scans for NSFW content.""" | |
user = update.message.from_user | |
chat_id = update.message.chat_id | |
media_files = update.message.effective_attachment | |
if is_approved(user.id): | |
return # Skip NSFW scan for approved users | |
if not isinstance(media_files, (list, tuple)): | |
media_files = [media_files] | |
for file in media_files: | |
if not hasattr(file, "file_id"): | |
continue | |
file_obj = await context.bot.get_file(file.file_id) | |
file_path = os.path.join(MEDIA_DIR, f"{user.id}_{file.file_id}") | |
await file_obj.download_to_drive(file_path) | |
if not os.path.exists(file_path): | |
logger.error(f"Downloaded file missing: {file_path}") | |
continue | |
# Handle Stickers Separately | |
if update.message.sticker: | |
if update.message.sticker.is_animated: | |
# Convert `.tgs` animated sticker to PNG | |
new_path = convert_tgs_to_png(file_path) | |
elif update.message.sticker.is_video: | |
# Convert `.webm` video sticker to an image frame | |
new_path = extract_frame_from_webm(file_path) | |
else: | |
# Convert `.webp` static sticker to PNG | |
new_path = convert_webp_to_png(file_path) | |
if new_path: | |
file_path = new_path | |
else: | |
continue | |
# Run NSFW detection | |
result = detect_nsfw(file_path) | |
if result is None: | |
continue | |
max_category = max(result, key=result.get) | |
if max_category in ["porn", "sexy", "hentai"]: | |
await update.message.delete() | |
update_violations(user.id, max_category) | |
alert_message = f""" | |
╭───────────────── | |
╰──●𝙽𝚂𝙵𝚆 𝙳𝙴𝚃𝙴𝙲𝚃𝙴𝙳 🔞 | |
╭✠╼━━━━━━❖━━━━━━━✠╮ | |
│☾𝚄𝚜𝚎𝚛: {user.id} | |
│☾𝚄𝚜𝚎𝚛𝚗𝚊𝚖𝚎: @{user.username if user.username else 'None'} | |
│☾𝙳𝚎𝚝𝚊𝚒𝚕𝚜: | |
│☾𝙳𝚛𝚊𝚠𝚒𝚗𝚐𝚜: {result['drawings']:.2f} | |
│☾𝙽𝚎𝚞𝚝𝚛𝚊𝚕: {result['neutral']:.2f} | |
│☾𝙿𝚘𝚛𝚗: {result['porn']:.2f} | |
│☾𝙷𝚎𝚗𝚝𝚊𝚒: {result['hentai']:.2f} | |
│☾𝚂𝚎𝚡𝚢: {result['sexy']:.2f} | |
╰✠╼━━━━━━❖━━━━━━━✠╯""" | |
await context.bot.send_message(chat_id, alert_message, parse_mode="Markdown") | |
channel_alert = f""" | |
NSFW DETECTED 🔞 | |
User: {user.id} | |
Username: @{user.username if user.username else 'None'} | |
Details: | |
Drawings: {result['drawings']:.2f} | |
Neutral: {result['neutral']:.2f} | |
Porn: {result['porn']:.2f} | |
Hentai: {result['hentai']:.2f} | |
Sexy: {result['sexy']:.2f} | |
Chat: {chat_id} | |
""" | |
keyboard = [[InlineKeyboardButton("👤 View Profile", url=f"tg://user?id={user.id}")]] | |
reply_markup = InlineKeyboardMarkup(keyboard) | |
await context.bot.send_message(ALERT_CHANNEL_ID, channel_alert, reply_markup=reply_markup) | |
# Delete the scanned file | |
if os.path.exists(file_path): | |
os.remove(file_path) | |
else: | |
logger.warning(f"File not found for deletion: {file_path}") | |
# ✅ Owner Commands | |
async def add_approved(update: Update, context: CallbackContext): | |
if update.message.from_user.id != OWNER_ID: | |
await update.message.reply_text("» ᴀᴡᴡ, ᴛʜɪs ɪs ɴᴏᴛ ғᴏʀ ʏᴏᴜ ʙᴀʙʏ.") | |
return | |
try: | |
user_id = int(context.args[0]) | |
add_approved_user(user_id) | |
await update.message.reply_text(f"✅ User {user_id} added to approved list.") | |
except (IndexError, ValueError): | |
await update.message.reply_text("❌ Usage: /approve <user_id>") | |
async def remove_approved(update: Update, context: CallbackContext): | |
if update.message.from_user.id != OWNER_ID: | |
await update.message.reply_text("» ᴀᴡᴡ, ᴛʜɪs ɪs ɴᴏᴛ ғᴏʀ ʏᴏᴜ ʙᴀʙʏ.") | |
return | |
try: | |
user_id = int(context.args[0]) | |
remove_approved_user(user_id) | |
await update.message.reply_text(f"❌ User {user_id} removed from approved list.") | |
except (IndexError, ValueError): | |
await update.message.reply_text("❌ Usage: /remove <user_id>") | |
async def my_info(update: Update, context: CallbackContext): | |
user_id = update.message.from_user.id | |
violations = get_user_violations(user_id) | |
if not violations: | |
await update.message.reply_text("✅ You have a clean record.") | |
else: | |
info = "📊 **NSFW Violation History**\n" | |
for category, count in violations: | |
info += f"🔸 {category}: {count} times\n" | |
await update.message.reply_text(info, parse_mode="Markdown") | |
async def user_info(update: Update, context: CallbackContext): | |
message = update.message | |
args = context.args | |
if len(args) == 0: | |
username_or_userid = message.from_user.id | |
else: | |
username_or_userid = args[0] | |
violation_history = get_user_violations(username_or_userid) | |
if violation_history: | |
reply_text = f"User {username_or_userid} has violation history:\n" | |
reply_text += "\n".join(f"{violation[0]}: {violation[1]}" for violation in violation_history) | |
else: | |
reply_text = f"User {username_or_userid} is clear." | |
# Send the reply to the user | |
await message.reply_text(reply_text) | |
from telegram.constants import ParseMode | |
async def get_approved_users_list(update: Update, context: CallbackContext): | |
approved_users_list = [] | |
for i, user_id in enumerate(get_all_users()): | |
user = await context.bot.get_chat(user_id) | |
if is_approved(user_id): | |
user_name = user.first_name | |
if user.username: | |
user_url = f"https://t.me/{user.username}" | |
else: | |
user_url = f"https://t.me/{user_id}" | |
user_link = f"{i+1}. [{user_name}]({user_url})" | |
approved_users_list.append(user_link) | |
if not approved_users_list: | |
await update.message.reply_text("❌ No approved users found.") | |
else: | |
approved_users_str = "\n".join(approved_users_list) | |
message = f"""``` ✨Approved Users✨ ``` | |
╭✠╼━━━━━━❖━━━━━━━✠━━━━━━ | |
│ | |
{approved_users_str} | |
│ | |
╰✠╼━━━━━━❖━━━━━━━✠━━━━━━ | |
💫Total Approved Users: {len(approved_users_list)} | |
""" | |
await update.message.reply_text(f""""{message}""", parse_mode=ParseMode.MARKDOWN, disable_web_page_preview=True) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import tensorflow as tf | |
import tensorflow_hub as hub | |
from tensorflow import keras | |
import numpy as np | |
from PIL import Image | |
MODEL_PATH = os.path.join(os.path.dirname(__file__), 'nsfw_model', 'nsfw_mobilenet2.224x224.h5') | |
IMAGE_DIM = 224 | |
# Load Model | |
def load_model(): | |
return tf.keras.models.load_model(MODEL_PATH, custom_objects={'KerasLayer': hub.KerasLayer}, compile=False) | |
model = load_model() | |
def classify(model, image_path): | |
img = keras.preprocessing.image.load_img(image_path, target_size=(IMAGE_DIM, IMAGE_DIM)) | |
img = keras.preprocessing.image.img_to_array(img) / 255.0 | |
img = np.expand_dims(img, axis=0) | |
categories = ['drawings', 'hentai', 'neutral', 'porn', 'sexy'] | |
predictions = model.predict(img)[0] | |
os.remove(image_path) | |
return {category: float(predictions[i]) for i, category in enumerate(categories)} | |
def detect_nsfw(image_path): | |
return classify(model, image_path) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
nsfw_bot/ | |
├── main.py # Main entry point of the bot | |
├── database/ | |
│ ├── __init__.py # Initialization for database package | |
│ └── db.py # MongoDB database operations | |
├── handlers/ | |
│ ├── __init__.py # Initialization for handlers package | |
│ ├── nsfw_handler.py # NSFW detection logic | |
│ └── stats_handler.py # Statistics handler | |
├── models/ | |
│ ├── __init__.py # Initialization for models package | |
│ └── nsfw_model.py # Model and processor loading | |
├── utils/ | |
│ ├── __init__.py # Initialization for utils package | |
│ └── video_utils.py # Utility functions for handling video frames | |
├── requirements.txt # Dependencies list | |
└── config.py # Configuration file for constants like API tokens, DB URL, etc. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import logging | |
from telegram import User | |
from config import MEDIA_DIR | |
# Set up logging | |
logging.basicConfig( | |
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | |
level=logging.INFO, | |
) | |
logger = logging.getLogger(__name__) | |
def mention_user(user: User): | |
"""Creates a clickable mention for a Telegram user.""" | |
return f"[{user.first_name}](tg://user?id={user.id})" | |
def clean_media_folder(): | |
"""Ensures media directory is clean before storing new files.""" | |
if not os.path.exists(MEDIA_DIR): | |
os.makedirs(MEDIA_DIR) | |
for file in os.listdir(MEDIA_DIR): | |
os.remove(os.path.join(MEDIA_DIR, file)) | |
def log_message(update, nsfw_category=None): | |
"""Logs incoming messages and NSFW detections.""" | |
user = update.message.from_user | |
chat = update.message.chat | |
content_type = update.message.effective_attachment | |
log_msg = (f"User: {user.id} | Chat: {chat.id} | " | |
f"Media Type: {content_type} | NSFW: {nsfw_category if nsfw_category else 'Safe'}") | |
logger.info(log_msg) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import cv2 | |
def capture_screenshot(video_path: str, output_dir: str = "screenshots"): | |
# Ensure the output directory exists | |
os.makedirs(output_dir, exist_ok=True) | |
# Load the video | |
vid_obj = cv2.VideoCapture(video_path) | |
fps = vid_obj.get(cv2.CAP_PROP_FPS) | |
if not fps or fps <= 0: | |
fps = 30 # Default to 30 FPS if unable to read | |
frame_interval = int(fps * 10) # Capture every 10 seconds | |
frames = [] | |
count = 0 | |
try: | |
while True: | |
success, frame = vid_obj.read() | |
if not success: | |
break | |
if count % frame_interval == 0: | |
frame_path = os.path.join(output_dir, f"screenshot_{count}.png") | |
cv2.imwrite(frame_path, frame) | |
frames.append(frame_path) | |
count += 1 | |
except Exception as e: | |
print(f"Error capturing screenshots: {e}") | |
finally: | |
vid_obj.release() | |
return frames | |
import subprocess | |
import os | |
def convert_webm_to_png(webm_path: str) -> str: | |
try: | |
png_path = os.path.splitext(webm_path)[0] + ".png" | |
command = [ | |
"ffmpeg", | |
"-i", webm_path, | |
"-vf", "fps=1", | |
"-vframes", "1", | |
png_path | |
] | |
subprocess.run(command, check=True) | |
return png_path if os.path.exists(png_path) else None | |
except Exception as e: | |
logging.error(f"Failed to convert webm to PNG: {e}") | |
return None |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment