Skip to content

Instantly share code, notes, and snippets.

@Notookk
Created February 27, 2025 07:49
Show Gist options
  • Save Notookk/917bd7b37b419e397f922bafa77b0ffd to your computer and use it in GitHub Desktop.
Save Notookk/917bd7b37b419e397f922bafa77b0ffd to your computer and use it in GitHub Desktop.
import asyncio
import os
import re
import json
from typing import Union
import yt_dlp
from pyrogram.enums import MessageEntityType
from pyrogram.types import Message
from youtubesearchpython.__future__ import VideosSearch
from BrandrdXMusic.utils.database import is_on_off
from BrandrdXMusic.utils.formatters import time_to_seconds
import os
import glob
import random
import logging
def cookie_txt_file():
    """Pick a random cookie file from ``<cwd>/cookies`` and log the choice.

    Every ``*.txt`` file directly inside the ``cookies`` folder of the current
    working directory is a candidate. The full path of the chosen file is
    appended to ``cookies/logs.csv``.

    Returns:
        str: ``"cookies/<file name>"``, i.e. a path relative to the current
        working directory.

    Raises:
        FileNotFoundError: if the folder contains no ``.txt`` file.
    """
    folder_path = f"{os.getcwd()}/cookies"
    log_path = f"{os.getcwd()}/cookies/logs.csv"
    txt_files = glob.glob(os.path.join(folder_path, "*.txt"))
    if not txt_files:
        raise FileNotFoundError("No .txt files found in the specified folder.")
    # Use a distinct local name: the original rebound ``cookie_txt_file``
    # inside the function, shadowing the function itself.
    chosen = random.choice(txt_files)
    with open(log_path, "a") as file:
        file.write(f"Choosen File : {chosen}\n")
    # basename() copes with whichever separator glob produced, whereas
    # splitting on "/" breaks when the path contains backslashes.
    return f"cookies/{os.path.basename(chosen)}"
async def check_file_size(link):
    """Return the summed ``filesize`` of all formats yt-dlp reports for *link*.

    Runs ``yt-dlp --cookies <file> -J <link>`` and adds up the ``filesize``
    field of every entry in the resulting ``formats`` list.

    Args:
        link: URL handed to yt-dlp unchanged.

    Returns:
        The total of all known sizes, or ``None`` when yt-dlp exits with a
        non-zero status or reports no formats.
    """

    async def get_format_info(link):
        # Dump the metadata as JSON (-J) without downloading anything.
        proc = await asyncio.create_subprocess_exec(
            "yt-dlp",
            "--cookies", cookie_txt_file(),
            "-J",
            link,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            print(f'Error:\n{stderr.decode()}')
            return None
        return json.loads(stdout.decode())

    def parse_size(formats):
        # The key can be present with a null value; count that as 0 instead
        # of crashing on ``int + None``.
        return sum(fmt.get('filesize') or 0 for fmt in formats)

    info = await get_format_info(link)
    if info is None:
        return None

    formats = info.get('formats', [])
    if not formats:
        print("No formats found.")
        return None

    return parse_size(formats)
async def shell_cmd(cmd):
    """Run *cmd* through the shell and return its decoded output.

    Returns stdout when stderr is empty, or when stderr contains the phrase
    "unavailable videos are hidden" (compared case-insensitively). For any
    other non-empty stderr, the stderr text is returned instead of stdout.
    """
    process = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    raw_out, raw_err = await process.communicate()

    if not raw_err:
        return raw_out.decode("utf-8")

    err_text = raw_err.decode("utf-8")
    if "unavailable videos are hidden" in err_text.lower():
        # This particular notice is treated as harmless: keep stdout.
        return raw_out.decode("utf-8")
    return err_text
class YouTubeAPI:
    """Helpers for looking up, resolving and downloading YouTube media.

    Metadata lookups go through ``VideosSearch``. Stream URLs, playlists,
    format lists and downloads go through yt-dlp (command line or Python
    API), each time with a cookie file chosen by ``cookie_txt_file()``.

    Most methods take ``videoid``: when truthy, ``link`` is treated as a bare
    id and prefixed with the matching base URL.
    """

    def __init__(self):
        self.base = "https://www.youtube.com/watch?v="
        self.regex = r"(?:youtube\.com|youtu\.be)"
        # Not referenced by any method of this class.
        self.status = "https://www.youtube.com/oembed?url="
        self.listbase = "https://youtube.com/playlist?list="
        # ESC followed by a single control byte or a CSI sequence (terminal
        # escape codes). Not referenced by any method of this class.
        self.reg = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _resolve(self, link, videoid, base=None):
        """Prefix a bare id with *base* and drop everything from the first "&"."""
        if videoid:
            link = (base or self.base) + link
        return link.split("&")[0]

    @staticmethod
    async def _first_result(link):
        """Return the single ``VideosSearch`` hit for *link*.

        Raises:
            ValueError: if the search returns no result. The original code
                failed here with an ``UnboundLocalError``.
        """
        search = VideosSearch(link, limit=1)
        hits = (await search.next())["result"]
        if not hits:
            raise ValueError(f"No YouTube search result for {link!r}")
        return hits[0]

    @staticmethod
    async def _stream_url(link):
        """Run ``yt-dlp -g`` for *link* and return raw ``(stdout, stderr)`` bytes."""
        proc = await asyncio.create_subprocess_exec(
            "yt-dlp",
            "--cookies", cookie_txt_file(),
            "-g",
            "-f",
            "best[height<=?720][width<=?1280]",
            f"{link}",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        return await proc.communicate()

    # ------------------------------------------------------------------
    # public API
    # ------------------------------------------------------------------

    async def exists(self, link: str, videoid: Union[bool, str] = None):
        """Return True when *link* mentions ``youtube.com`` or ``youtu.be``."""
        if videoid:
            link = self.base + link
        return bool(re.search(self.regex, link))

    async def url(self, message_1: Message) -> Union[str, None]:
        """Extract the first link from a message or the message it replies to.

        The message itself is checked before its reply target. For a message
        with ``entities``, the first URL entity is sliced out of the text (or
        caption). Otherwise the first TEXT_LINK caption entity's ``url`` is
        returned. Returns ``None`` when nothing matches.
        """
        messages = [message_1]
        if message_1.reply_to_message:
            messages.append(message_1.reply_to_message)

        for message in messages:
            if message.entities:
                for entity in message.entities:
                    if entity.type == MessageEntityType.URL:
                        text = message.text or message.caption
                        # Return straight away. The original tested the
                        # offset for truthiness, so a URL at offset 0 was
                        # treated as "not found" and the reply message
                        # could override it.
                        start = entity.offset
                        return text[start : start + entity.length]
            elif message.caption_entities:
                for entity in message.caption_entities:
                    if entity.type == MessageEntityType.TEXT_LINK:
                        return entity.url
        return None

    async def details(self, link: str, videoid: Union[bool, str] = None):
        """Return ``(title, duration_min, duration_sec, thumbnail, vidid)``.

        ``duration_sec`` is 0 when the search result carries no duration.

        Raises:
            ValueError: if the search returns no result.
        """
        link = self._resolve(link, videoid)
        result = await self._first_result(link)
        title = result["title"]
        duration_min = result["duration"]
        # Strip the query string from the thumbnail URL.
        thumbnail = result["thumbnails"][0]["url"].split("?")[0]
        vidid = result["id"]
        if str(duration_min) == "None":
            duration_sec = 0
        else:
            duration_sec = int(time_to_seconds(duration_min))
        return title, duration_min, duration_sec, thumbnail, vidid

    async def title(self, link: str, videoid: Union[bool, str] = None):
        """Return the title of the search hit for *link*.

        Raises:
            ValueError: if the search returns no result.
        """
        link = self._resolve(link, videoid)
        return (await self._first_result(link))["title"]

    async def duration(self, link: str, videoid: Union[bool, str] = None):
        """Return the ``duration`` field of the search hit for *link*.

        Raises:
            ValueError: if the search returns no result.
        """
        link = self._resolve(link, videoid)
        return (await self._first_result(link))["duration"]

    async def thumbnail(self, link: str, videoid: Union[bool, str] = None):
        """Return the first thumbnail URL (query string removed) for *link*.

        Raises:
            ValueError: if the search returns no result.
        """
        link = self._resolve(link, videoid)
        result = await self._first_result(link)
        return result["thumbnails"][0]["url"].split("?")[0]

    async def video(self, link: str, videoid: Union[bool, str] = None):
        """Ask yt-dlp for a direct stream URL (at most 1280x720).

        Returns:
            ``(1, url)`` with the first line of yt-dlp's output when it
            printed anything, otherwise ``(0, stderr_text)``.
        """
        link = self._resolve(link, videoid)
        stdout, stderr = await self._stream_url(link)
        if stdout:
            return 1, stdout.decode().split("\n")[0]
        return 0, stderr.decode()

    async def playlist(self, link, limit, user_id, videoid: Union[bool, str] = None):
        """Return the non-empty output lines of a flat playlist id listing.

        Args:
            link: playlist URL, or playlist id when *videoid* is truthy.
            limit: passed to ``--playlist-end``.
            user_id: unused here; kept for interface compatibility.
            videoid: when truthy, *link* is prefixed with the playlist base.

        Note:
            The text comes from ``shell_cmd``, which may hand back yt-dlp's
            stderr instead of stdout, so the lines are not guaranteed to be
            video ids.
        """
        import shlex  # function-scope import keeps this block self-contained

        link = self._resolve(link, videoid, base=self.listbase)
        # The link comes from the user and is run through a shell, so every
        # interpolated value is quoted.
        command = (
            "yt-dlp -i --get-id --flat-playlist "
            f"--cookies {shlex.quote(cookie_txt_file())} "
            f"--playlist-end {shlex.quote(str(limit))} "
            f"--skip-download {shlex.quote(link)}"
        )
        output = await shell_cmd(command)
        # Build a new list: the original removed items from the list it was
        # iterating, which skips consecutive blank lines.
        return [line for line in output.split("\n") if line]

    async def track(self, link: str, videoid: Union[bool, str] = None):
        """Return ``(track_details, vidid)`` for the search hit of *link*.

        ``track_details`` has the keys ``title``, ``link``, ``vidid``,
        ``duration_min`` and ``thumb``.

        Raises:
            ValueError: if the search returns no result.
        """
        link = self._resolve(link, videoid)
        result = await self._first_result(link)
        vidid = result["id"]
        track_details = {
            "title": result["title"],
            "link": result["link"],
            "vidid": vidid,
            "duration_min": result["duration"],
            "thumb": result["thumbnails"][0]["url"].split("?")[0],
        }
        return track_details, vidid

    async def formats(self, link: str, videoid: Union[bool, str] = None):
        """List the non-DASH formats yt-dlp reports for *link*.

        A format is kept only if it has all of ``format``, ``filesize``,
        ``format_id``, ``ext`` and ``format_note`` (values may still be
        ``None``) and its ``format`` text does not contain "dash".

        Returns:
            ``(formats_available, link)`` where *link* is the normalised URL.
        """
        link = self._resolve(link, videoid)
        ytdl_opts = {"quiet": True, "cookiefile": cookie_txt_file()}
        required = ("format", "filesize", "format_id", "ext", "format_note")
        formats_available = []
        # NOTE(review): extract_info() is a synchronous call made directly
        # inside this coroutine, unlike download(), which uses an executor.
        with yt_dlp.YoutubeDL(ytdl_opts) as ydl:
            info = ydl.extract_info(link, download=False)
            for fmt in info["formats"]:
                # Explicit key check replaces the original bare try/except.
                if any(key not in fmt for key in required):
                    continue
                if "dash" in str(fmt["format"]).lower():
                    continue
                formats_available.append(
                    {
                        "format": fmt["format"],
                        "filesize": fmt["filesize"],
                        "format_id": fmt["format_id"],
                        "ext": fmt["ext"],
                        "format_note": fmt["format_note"],
                        "yturl": link,
                    }
                )
        return formats_available, link

    async def slider(
        self,
        link: str,
        query_type: int,
        videoid: Union[bool, str] = None,
    ):
        """Return ``(title, duration_min, thumbnail, vidid)`` for one search hit.

        Searches with a limit of 10 and picks the hit at index *query_type*.
        """
        link = self._resolve(link, videoid)
        search = VideosSearch(link, limit=10)
        hits = (await search.next()).get("result")
        chosen = hits[query_type]
        title = chosen["title"]
        duration_min = chosen["duration"]
        vidid = chosen["id"]
        thumbnail = chosen["thumbnails"][0]["url"].split("?")[0]
        return title, duration_min, thumbnail, vidid

    async def download(
        self,
        link: str,
        mystic,
        video: Union[bool, str] = None,
        videoid: Union[bool, str] = None,
        songaudio: Union[bool, str] = None,
        songvideo: Union[bool, str] = None,
        format_id: Union[bool, str] = None,
        title: Union[bool, str] = None,
    ) -> Union[str, tuple]:
        """Download *link* (or resolve a stream URL) with yt-dlp.

        Branches are checked in this order:

        * ``songvideo``: download ``<format_id>+140`` merged to mp4 and
          return ``"downloads/<title>.mp4"``.
        * ``songaudio``: download ``format_id``, extract mp3 at 192 and
          return ``"downloads/<title>.mp3"``.
        * ``video``: return ``(path_or_url, direct)``. When ``is_on_off(1)``
          is truthy the file is downloaded. Otherwise ``yt-dlp -g`` is tried
          first and, if it prints a URL, that URL is returned with
          ``direct=False``. If it prints nothing, the file is downloaded.
        * otherwise: download best audio and return ``(path, True)``.

        Downloads run in the default executor so the event loop is not
        blocked. ``mystic`` is not used in this method.
        """
        if videoid:
            link = self.base + link
        loop = asyncio.get_running_loop()

        def fetch_by_id(format_selector):
            """Download to ``downloads/<id>.<ext>``, reusing an existing file."""
            options = {
                "format": format_selector,
                "outtmpl": "downloads/%(id)s.%(ext)s",
                "geo_bypass": True,
                "nocheckcertificate": True,
                "quiet": True,
                "cookiefile": cookie_txt_file(),
                "no_warnings": True,
            }
            ydl = yt_dlp.YoutubeDL(options)
            info = ydl.extract_info(link, False)
            target = os.path.join("downloads", f"{info['id']}.{info['ext']}")
            if not os.path.exists(target):
                ydl.download([link])
            return target

        def fetch_song(format_selector, outtmpl, **extra):
            """Download to a title-based path with the shared song options."""
            options = {
                "format": format_selector,
                "outtmpl": outtmpl,
                "geo_bypass": True,
                "nocheckcertificate": True,
                "quiet": True,
                "no_warnings": True,
                "cookiefile": cookie_txt_file(),
                "prefer_ffmpeg": True,
            }
            options.update(extra)
            yt_dlp.YoutubeDL(options).download([link])

        def song_video_dl():
            fetch_song(
                f"{format_id}+140",
                f"downloads/{title}",
                merge_output_format="mp4",
            )

        def song_audio_dl():
            fetch_song(
                format_id,
                f"downloads/{title}.%(ext)s",
                postprocessors=[
                    {
                        "key": "FFmpegExtractAudio",
                        "preferredcodec": "mp3",
                        "preferredquality": "192",
                    }
                ],
            )

        if songvideo:
            await loop.run_in_executor(None, song_video_dl)
            return f"downloads/{title}.mp4"

        if songaudio:
            await loop.run_in_executor(None, song_audio_dl)
            return f"downloads/{title}.mp3"

        if video:
            if not await is_on_off(1):
                stdout, _ = await self._stream_url(link)
                if stdout:
                    # direct=False: the first element is a URL, not a file.
                    return stdout.decode().split("\n")[0], False
            downloaded_file = await loop.run_in_executor(
                None,
                fetch_by_id,
                "(bestvideo[height<=?720][width<=?1280][ext=mp4])+(bestaudio[ext=m4a])",
            )
            return downloaded_file, True

        downloaded_file = await loop.run_in_executor(
            None, fetch_by_id, "bestaudio/best"
        )
        return downloaded_file, True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment