Last active
March 21, 2025 12:27
-
-
Save Lulalaby/5e8ec692f53eea35bea14323074fe722 to your computer and use it in GitHub Desktop.
Script to export files and medias from a telegram chat
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Telegram Message Downloader | |
--------------------------------------------------------------------------- | |
This script downloads all media from your 'Saved Messages' or any other chat on Telegram. | |
It ensures that multiple media files in a single message (like albums) are downloaded properly. | |
Obtain your API ID and API hash at https://my.telegram.org/apps.
Usage: | |
python telegram_downloader_fixed.py --api-id 123456 --api-hash abcdef123456 --drives "C:\\,D:\\,E:\\" --min-free 5 | |
Features: | |
- Sequential downloads (avoid connection overhead). | |
- Handles multiple media files per message (albums). | |
- Automatically switches drives when space is low. | |
- Rich progress bar with speed, filename, and percentage. | |
- Pause (P) and Resume (R) support; a pause takes effect once the current file has finished downloading.
Author: Lala Sabathil | |
License: MIT | |
""" | |
import os | |
import sys | |
import argparse | |
import shutil | |
import time | |
import asyncio | |
import keyboard | |
from telethon import TelegramClient | |
from rich.progress import Progress, BarColumn, TimeElapsedColumn, TransferSpeedColumn, TextColumn | |
import threading | |
# Module-level pause flag. It is set/cleared by check_for_pause_or_resume
# (keyboard callback) and polled by download_media after each file.
paused = False
def parse_args():
    """Build the command-line interface and return the parsed namespace.

    Options are declared in a table and registered in order, so the
    generated ``--help`` output lists them in the same sequence.
    """
    cli = argparse.ArgumentParser(description="Download all media from Telegram Saved Messages.")
    option_table = (
        ("--api-id", {"required": True, "type": int,
                      "help": "Telegram API ID (from my.telegram.org)."}),
        ("--api-hash", {"required": True, "type": str,
                        "help": "Telegram API Hash (from my.telegram.org)."}),
        ("--drives", {"type": str, "default": "C:\\",
                      "help": "Comma-separated list of drives."}),
        ("--min-free", {"type": int, "default": 5,
                        "help": "Minimum free space (GB) before switching drives."}),
        ("--session-name", {"type": str, "default": "saved_media_downloader",
                            "help": "Telethon session name."}),
        ("--chat-id", {"type": str, "default": 'me',
                       "help": "The target chat id, defaults to 'me'"}),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
def format_size(num_bytes: float) -> str:
    """Render a byte count as a two-decimal string with a binary unit.

    The value is divided by 1024 until it drops below 1024 or the unit
    list (B..TB) is exhausted, in which case the result is given in PB.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    remaining = num_bytes
    position = 0
    while position < len(units):
        if remaining < 1024.0:
            return f"{remaining:.2f} {units[position]}"
        remaining = remaining / 1024.0
        position += 1
    # Anything that survived five divisions is expressed in petabytes.
    return f"{remaining:.2f} PB"
def get_available_drive(drives_list, min_free_space_gb):
    """Return the first usable drive with more than ``min_free_space_gb`` GB free.

    Args:
        drives_list: Iterable of drive/path strings. Surrounding whitespace
            is stripped; blank entries (e.g. from a trailing comma) are skipped.
        min_free_space_gb: Threshold in gigabytes. A drive qualifies only when
            its free space is strictly greater than this value.

    Returns:
        The stripped drive string of the first qualifying drive, or ``None``
        when no drive qualifies.
    """
    for raw_drive in drives_list:
        drive = raw_drive.strip()
        if not drive:
            continue
        try:
            free_bytes = shutil.disk_usage(drive).free
        except OSError:
            # Drive is missing, unmounted or unreadable: try the next one
            # instead of aborting the whole scan.
            continue
        if free_bytes / (1024 ** 3) > min_free_space_gb:
            return drive
    return None
# Path separators and characters that are not allowed in Windows filenames.
_UNSAFE_FILENAME_CHARS = '<>:"/\\|?*'


def _sanitize_filename(name):
    """Replace path separators, reserved and control characters with '_'."""
    return "".join(
        "_" if ch in _UNSAFE_FILENAME_CHARS or ord(ch) < 32 else ch
        for ch in str(name)
    )


def get_proper_filename(message, media_index=None):
    """
    Generates a filename for media. Handles multiple media in a single message using an index,
    and includes the message ID to ensure uniqueness across albums with identical doc attributes.
    Format: YYYY-mm-dd_HH-MM-SS_msgId[_index]_[original_name or extension]

    The original file name comes from the sender, so it is sanitised before use:
    path separators and characters that are invalid in filenames are replaced
    with '_' so the result is always a single path component.

    Args:
        message: Object exposing ``date``, ``id``, ``document``, ``photo``,
            ``video`` and ``audio`` attributes.
        media_index: Optional index appended after the message id.

    Returns:
        The generated file name (no directory part).
    """
    msg_date = message.date.strftime("%Y-%m-%d_%H-%M-%S")
    # message.id ensures uniqueness even if album items share doc attributes
    base_part = f"{msg_date}_{message.id}"
    index_part = f"_{media_index}" if media_index is not None else ""
    prefix = f"{base_part}{index_part}"

    document = message.document
    if document and document.attributes:
        for attr in document.attributes:
            original_name = getattr(attr, "file_name", None)
            # Skip missing/empty names so the type-based fallbacks below apply.
            if original_name:
                return f"{prefix}_{_sanitize_filename(original_name)}"

    if message.photo:
        return f"{prefix}.jpg"
    if message.video:
        return f"{prefix}.mp4"
    if message.audio:
        return f"{prefix}.ogg"
    return f"{prefix}.bin"
def check_for_pause_or_resume(e):
    """Keyboard callback: 'P' pauses and 'R' resumes (case-insensitive).

    Args:
        e: Key event object; only its ``name`` attribute is read. Events
            whose name is missing or ``None`` are ignored.
    """
    global paused
    # Compare case-insensitively so Shift/Caps Lock do not defeat the hotkeys,
    # and tolerate events that carry no key name.
    key = (getattr(e, "name", None) or "").lower()
    if key == 'p':
        paused = True
        print("\nDownload paused. Press 'R' to resume.")
    elif key == 'r':
        paused = False
        print("\nDownload resumed.")
def listen_for_keypresses():
    """Register the pause/resume key handler, then block waiting on the keyboard hook."""
    handler = check_for_pause_or_resume
    keyboard.on_press(handler)
    # Blocks the calling thread; intended to be run from a background thread.
    keyboard.wait()
async def download_media(message, drives_list, min_free_space_gb, task_id, progress):
    """Download the media attached to a single Telegram message.

    The file is stored under ``<drive>/TelegramDownloads/YYYY/MM/DD/`` on the
    first drive reported by ``get_available_drive``. Messages without media
    are ignored. Any error is printed and swallowed so that one bad message
    does not stop the overall run.

    Album handling: the previous album branch tested ``grouped_id`` on
    ``message.media`` (the attribute lives on the message itself, so the
    branch never ran) and passed an unsupported ``grouped_id=`` filter to
    ``get_messages``. It has been removed; each album item is its own
    message and is downloaded when the caller iterates over it.

    Args:
        message: Telethon message to process.
        drives_list: Candidate drives/paths for storing downloads.
        min_free_space_gb: Minimum free space (GB) a drive must exceed.
        task_id: Progress task to update.
        progress: Progress object exposing ``update(task_id, ...)``.
    """
    if not message.media:
        return

    try:
        drive = get_available_drive(drives_list, min_free_space_gb)
        if not drive:
            print("\nNo available drive with sufficient space!")
            return

        folder = os.path.join(
            drive,
            "TelegramDownloads",
            message.date.strftime("%Y"),
            message.date.strftime("%m"),
            message.date.strftime("%d"),
        )
        os.makedirs(folder, exist_ok=True)

        file_name = get_proper_filename(message)
        file_path = os.path.join(folder, file_name)
        progress.update(task_id, filename=file_name, total=1, completed=0)

        def report_progress(current, total):
            progress.update(task_id, completed=current, total=total)

        start_time = time.monotonic()
        downloaded_path = await message.download_media(
            file=file_path,
            progress_callback=report_progress,
        )
        # Measure before any pause wait so paused time does not skew the speed.
        elapsed = time.monotonic() - start_time

        if downloaded_path and os.path.exists(downloaded_path):
            file_size_bytes = os.path.getsize(downloaded_path)
            if file_size_bytes > 0:
                file_size = format_size(file_size_bytes)
                speed = file_size_bytes / elapsed if elapsed > 0 else 0
                speed_str = format_size(speed) + "/s"
                print(f"\nDownloaded: {downloaded_path} | Size: {file_size} | Time: {elapsed:.2f}s | Speed: {speed_str}")

        # Pausing takes effect between files: hold here until resumed.
        if paused:
            print("\nWaiting for resume...")
            while paused:
                await asyncio.sleep(1)
    except Exception as e:
        print(f"\nError downloading media from message {message.id}: {e}")
async def main():
    """Parse arguments, connect to Telegram and download media oldest-first.

    Changes over the naive flow:
    - A purely numeric ``--chat-id`` (optionally with a leading '-') is
      converted to ``int`` so it is resolved as an ID rather than as a
      username/phone string.
    - Blank entries in ``--drives`` are dropped and the rest are stripped.
    - The client is always disconnected, even if an error occurs.
    """
    args = parse_args()

    drives_list = [entry.strip() for entry in args.drives.split(",") if entry.strip()]

    chat = args.chat_id
    digits = chat[1:] if chat.startswith("-") else chat
    if digits.isascii() and digits.isdigit():
        chat = int(chat)

    client = TelegramClient(args.session_name, args.api_id, args.api_hash)
    await client.start()
    try:
        target = await client.get_entity(chat)

        print("Starting sequential download from your account...")
        print("\nPress 'p' to pause the download process after a file finished downloading, and 'r' to resume it!")

        # Daemon thread: the keyboard hook must not keep the process alive.
        listener_thread = threading.Thread(target=listen_for_keypresses, daemon=True)
        listener_thread.start()

        with Progress(
            TextColumn("{task.fields[filename]}"),
            BarColumn(),
            "[progress.percentage]{task.percentage:>3.0f} %",
            TimeElapsedColumn(),
            TransferSpeedColumn(),
        ) as progress:
            task_id = progress.add_task("Downloading", total=1, filename="Preparing...")
            # reverse=True iterates from the oldest message to the newest.
            async for message in client.iter_messages(target, reverse=True):
                await download_media(message, drives_list, args.min_free, task_id, progress)

        print("\nAll media downloads completed!")
    finally:
        await client.disconnect()
if __name__ == '__main__':
    # Script entry point. Ctrl+C ends the run with a short message and the
    # conventional SIGINT exit status instead of a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nInterrupted by user; exiting.")
        sys.exit(130)
Comments are disabled for this gist.