
@4piu
Created November 1, 2025 16:25
Download Telegram chat media with iyear/tdl
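Four files, shown in order below: a sync script that exports chat message lists with `tdl chat export`, a filter script that drops messages whose media already exists on disk, a download script that feeds the filtered lists to `tdl dl` and kills stalled transfers, and a shared `utils` module imported by all three. The gist viewer strips the filenames; run the scripts in that order and save the last block as utils.py.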
#!/usr/bin/env python3
"""
Sync Telegram chats using tdl (Telegram Downloader).
This script fetches the chat list, processes channels and groups,
exports chat data, and manages download folders.
"""
import json
import sys
import re
from datetime import datetime
from pathlib import Path
from utils import check_dependencies, run_command, setup_logging
# Configuration
script_dir = Path(__file__).parent.resolve()
DOWNLOAD_DIR = script_dir / "downloads"
CHAT_DIR = script_dir / "chat"
LOG_DIR = script_dir / "logs"
TMP_DIR = script_dir / "tmp"
CHAT_MAPPING_FILE = CHAT_DIR / "chat_mapping.txt"
CHAT_TIMESTAMPS_FILE = CHAT_DIR / "chat_timestamps.json"
unix_ts = int(datetime.now().timestamp())
timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
ERROR_LOG = LOG_DIR / f"sync-error {timestamp}.log"
STANDARD_LOG = LOG_DIR / f"sync {timestamp}.log"
def escape_filename(name):
    """Escape visible_name to be safe for use as a file path."""
    # Replace invalid characters for filenames (e.g., /, \, :, etc.)
    name = re.sub(r"[^\w\s-]", "_", name)  # Replace special chars with _
    return name.strip("_").strip()
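# Illustrative behavior of the helper above:
#   escape_filename("News & Updates!") -> "News _ Updates"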
def fetch_chat_list():
    """Fetch the chat list from tdl and return as JSON."""
    logger.info("Fetching chat list from Telegram...")
    result = run_command(
        ["tdl", "chat", "ls", "-o", "json"], capture_output=True, display_output=False
    )
    if result.returncode != 0:
        logger.error(
            f"Failed to fetch chat list. Exit code: {result.returncode}\n{result.stderr or result.stdout}"
        )
        sys.exit(1)
    try:
        chats = json.loads(result.stdout)
        return chats
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse chat list JSON: {e}")
        sys.exit(1)


def filter_chats(chats):
    """Filter chats to only include channels and groups."""
    filtered = [chat for chat in chats if chat.get("type") in ["channel", "group"]]
    logger.info(
        f"Found {len(filtered)} channels/groups out of {len(chats)} total chats."
    )
    return filtered


def rename_folder_if_needed(download_dir, chat_id, escaped_name):
    """Rename an existing download folder when the chat's visible name has changed."""
    download_base = Path(download_dir)
    # Find an existing folder with the same chat_id but a different name
    if download_base.exists():
        for folder in download_base.iterdir():
            if folder.is_dir() and folder.name.startswith(f"[{chat_id}] "):
                expected_name = f"[{chat_id}] {escaped_name}"
                if folder.name != expected_name:
                    new_path = download_base / expected_name
                    if new_path.exists():
                        logger.info(
                            f"Target folder '{new_path}' already exists. Skipping rename."
                        )
                    else:
                        logger.warning(
                            f"Renaming folder '{folder.name}' to '{expected_name}'"
                        )
                        try:
                            folder.rename(new_path)
                        except OSError as e:
                            logger.error(f"Failed to rename folder: {e}")
                break


def export_chat(chat_id, escaped_name, json_output, ts_begin=None, ts_end=None):
    """Export chat data using tdl."""
    logger.info(f"Exporting chat data: [{chat_id}] {escaped_name}")
    command = ["tdl", "chat", "export", "-c", str(chat_id), "-o", json_output]
    if ts_begin and not ts_end:
        ts_end = unix_ts
    if ts_begin and ts_end:
        command += ["-T", "time", "-i", f"{ts_begin},{ts_end}"]
    result = run_command(
        command,
        capture_output=False,
        display_output=True,
    )
    if result.returncode != 0:
        logger.error(f"Failed to export chat [{chat_id}] {escaped_name}. Skipping.")
        return False
    return True
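# An incremental export built above runs a command of the form:
#   tdl chat export -c <chat_id> -o chat/<chat_id>.json -T time -i <ts_begin>,<ts_end>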
def load_chat_timestamps():
    """Load per-chat last sync timestamps from JSON file."""
    if CHAT_TIMESTAMPS_FILE.exists():
        try:
            with open(CHAT_TIMESTAMPS_FILE, "r") as f:
                timestamps = json.load(f)
            logger.info(f"Loaded timestamps for {len(timestamps)} chats")
            return timestamps
        except (json.JSONDecodeError, IOError) as e:
            logger.warning(f"Failed to load chat timestamps: {e}")
            return {}
    else:
        logger.info("No chat timestamps file found. Will create new one.")
        return {}


def save_chat_timestamps(timestamps):
    """Save per-chat last sync timestamps to JSON file."""
    try:
        with open(CHAT_TIMESTAMPS_FILE, "w") as f:
            json.dump(timestamps, f, indent=2)
        logger.info(f"Saved timestamps for {len(timestamps)} chats")
        return True
    except IOError as e:
        logger.error(f"Failed to save chat timestamps: {e}")
        return False
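# chat_timestamps.json maps stringified chat IDs to the unix time of the last
# successful export, e.g. {"123456789": 1730477100} (values illustrative).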
def get_chat_last_sync(chat_timestamps, chat_id):
    """Get the last sync timestamp for a specific chat."""
    chat_id_str = str(chat_id)
    if chat_id_str in chat_timestamps:
        ts = chat_timestamps[chat_id_str]
        logger.info(
            f"Chat {chat_id} last synced at {ts} ({datetime.fromtimestamp(ts)})"
        )
        return ts
    else:
        logger.info(
            f"Chat {chat_id} has no previous sync timestamp. Will perform full sync."
        )
        return None


def merge_chat_jsons(original_json, new_json, output_json):
    """Merge new chat messages with existing ones, avoiding duplicates by message ID."""
    try:
        # Load original data
        if Path(original_json).exists():
            with open(original_json, "r") as f:
                original_data = json.load(f)
        else:
            logger.info(f"No existing data for {original_json}. Using new data only.")
            original_data = {"messages": []}
        # Load new data
        with open(new_json, "r") as f:
            new_data = json.load(f)
        # Create a set of existing message IDs for fast lookup
        existing_ids = {
            msg.get("id") for msg in original_data.get("messages", []) if msg.get("id")
        }
        # Add new messages that don't already exist
        original_messages = original_data.get("messages", [])
        new_messages = new_data.get("messages", [])
        added_count = 0
        for msg in new_messages:
            msg_id = msg.get("id")
            if msg_id and msg_id not in existing_ids:
                original_messages.append(msg)
                existing_ids.add(msg_id)
                added_count += 1
        # Sort messages by ID to maintain order
        original_messages.sort(key=lambda x: x.get("id", 0))
        # Update the original data
        original_data["messages"] = original_messages
        # Copy over other metadata from new data if present
        for key in new_data:
            if key != "messages":
                original_data[key] = new_data[key]
        # Write merged data to output
        with open(output_json, "w") as f:
            json.dump(original_data, f, indent=2, ensure_ascii=False)
        logger.info(
            f"Merged {added_count} new messages. Total messages: {len(original_messages)}"
        )
        return True
    except Exception as e:
        logger.error(f"Failed to merge chat JSONs: {e}")
        return False
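# Merge semantics, illustrated: {"messages": [{"id": 1}, {"id": 2}]} merged with
# {"messages": [{"id": 2}, {"id": 3}]} yields messages with ids 1, 2, 3 (sorted,
# deduplicated by id); top-level keys other than "messages" come from the newer export.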
def main():
    """Main sync process."""
    print("=" * 60)
    print("Telegram Chat Sync Script")
    print("=" * 60)
    # Check dependencies
    check_dependencies(["tdl", "python3"])
    # Create necessary directories
    DOWNLOAD_DIR.mkdir(exist_ok=True)
    CHAT_DIR.mkdir(exist_ok=True)
    TMP_DIR.mkdir(exist_ok=True)
    # Load per-chat timestamps
    chat_timestamps = load_chat_timestamps()
    # Fetch and filter chat list
    chats = fetch_chat_list()
    filtered_chats = filter_chats(chats)
    if not filtered_chats:
        logger.warning("No channels or groups found to sync.")
        return
    logger.info(f"Processing {len(filtered_chats)} chats...")
    # Process each chat
    successful_chats = []
    updated_timestamps = chat_timestamps.copy()
    for i, chat in enumerate(filtered_chats, 1):
        chat_id = chat.get("id")
        visible_name = chat.get("visible_name", "unnamed_chat")
        escaped_name = escape_filename(visible_name)
        logger.info(
            f"\n[{i}/{len(filtered_chats)}] Processing chat: {escaped_name} (ID: {chat_id})"
        )
        # Define output paths
        json_output = CHAT_DIR / f"{chat_id}.json"
        tmp_json_output = TMP_DIR / f"{chat_id}.json"
        filtered_json = CHAT_DIR / f"{chat_id}_filtered.json"  # written by the filter script
        download_dir = DOWNLOAD_DIR / f"[{chat_id}] {escaped_name}"
        # Handle folder renaming
        rename_folder_if_needed(DOWNLOAD_DIR, chat_id, escaped_name)
        # Get last sync timestamp for this specific chat
        chat_last_sync = get_chat_last_sync(chat_timestamps, chat_id)
        # Export chat data (to TMP_DIR if incremental, otherwise to CHAT_DIR)
        export_target = str(tmp_json_output) if chat_last_sync else str(json_output)
        if export_chat(
            chat_id,
            escaped_name,
            export_target,
            ts_begin=chat_last_sync,
            ts_end=unix_ts,
        ):
            synced = True
            # If incremental sync, merge with existing data
            if chat_last_sync:
                if merge_chat_jsons(
                    str(json_output), str(tmp_json_output), str(json_output)
                ):
                    logger.info(
                        f"Successfully merged incremental data for chat {chat_id}"
                    )
                else:
                    logger.error(f"Failed to merge data for chat {chat_id}")
                    synced = False
            if synced:
                # Save successful chat mapping
                successful_chats.append((chat_id, escaped_name))
                # Update the timestamp only after a clean export (and merge), so a
                # failed merge is retried on the next run instead of being skipped
                updated_timestamps[str(chat_id)] = unix_ts
        print("-" * 60)
    # Write new chat mapping
    with open(CHAT_MAPPING_FILE, "w") as f:
        for chat_id, escaped_name in successful_chats:
            f.write(f"{chat_id}\t{escaped_name}\n")
    # Save updated per-chat timestamps
    if successful_chats:
        save_chat_timestamps(updated_timestamps)
    # Summary
    print("\n" + "=" * 60 + "\nSync Complete!")
    print(f"Total chats processed: {len(filtered_chats)}")
    print(f"Successfully exported: {len(successful_chats)}")
    print(f"Failed: {len(filtered_chats) - len(successful_chats)}")
    print(f"\nChat data saved to: {CHAT_DIR}")
    print(f"Chat mapping saved to: {CHAT_MAPPING_FILE}")
    print(f"Chat timestamps saved to: {CHAT_TIMESTAMPS_FILE}")
    if chat_timestamps:
        print(f"Current sync: {datetime.fromtimestamp(unix_ts)}")
    if (len(filtered_chats) - len(successful_chats)) > 0:
        print(f"Error log: {ERROR_LOG}")
    print("=" * 60)


if __name__ == "__main__":
    LOG_DIR.mkdir(exist_ok=True)
    logger = setup_logging(ERROR_LOG, STANDARD_LOG)
    try:
        main()
    except KeyboardInterrupt:
        logger.info("\n\nExited!")
        sys.exit(0)
    except Exception as e:
        logger.error(f"\nUnexpected error: {e}")
        import traceback

        traceback.print_exc()
        sys.exit(1)
#!/usr/bin/env python3
"""
Filter message lists for all downloaded chats.
Extracts chat_id from directory names in downloads/ and filters messages
based on whether their files have already been downloaded.
"""
import json
import os
import re
import sys
from concurrent.futures import ProcessPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from utils import setup_logging
# Configuration
script_dir = Path(__file__).parent.resolve()
DOWNLOADS_DIR = script_dir / "downloads"
CHAT_DIR = script_dir / "chat"
LOG_DIR = script_dir / "logs"
timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
ERROR_LOG = LOG_DIR / f"filter-error {timestamp}.log"
STANDARD_LOG = LOG_DIR / f"filter {timestamp}.log"
def escape_filename(name):
    """Escape filename to match Go filenamify (https://github.com/flytam/filenamify)."""
    if not name:
        return None
    # Replacement character (matches Go default)
    replacement = "!"
    # Regex patterns from filenamify
    re_control_chars = re.compile(r"[\x00-\x1f\x80-\x9f]")
    re_relative_path = re.compile(r"^\.+")
    re_reserved = re.compile(r'[<>:"/\\|?*\x00-\x1F]')
    re_windows_reserved = re.compile(
        r"^(con|prn|aux|nul|com[0-9]|lpt[0-9])$", re.IGNORECASE
    )
    # Apply replacements
    str_ = re_reserved.sub(replacement, name)
    str_ = re_control_chars.sub(replacement, str_)
    str_ = re_relative_path.sub(replacement, str_)
    # Trim repeated replacement characters
    re_repeated = re.compile(r"(?:" + re.escape(replacement) + r"){2,}")
    str_ = re_repeated.sub(replacement, str_)
    # Strip outer replacement characters
    re_outer = re.compile(
        r"^" + re.escape(replacement) + r"+|" + re.escape(replacement) + r"+$"
    )
    str_ = re_outer.sub("", str_)
    # Handle Windows reserved names
    if re_windows_reserved.match(str_):
        str_ = str_ + replacement
    # Truncate to max length (100 runes ≈ characters for simplicity)
    str_ = str_[:100]
    # Ensure non-empty name
    return str_ or None
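# Illustrative behavior of the port above:
#   escape_filename('a<b>:c') -> 'a!b!c'  (reserved chars replaced, runs collapsed)
#   escape_filename('con')    -> 'con!'   (Windows reserved name)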
def filter_chat_messages(
    input_json, download_dir, output_json, chat_id=None, chat_name=None
):
    """
    Filter messages from a chat JSON file based on downloaded files.

    Args:
        input_json: Path to the input JSON file
        download_dir: Path to the download directory for this chat
        output_json: Path to save the filtered JSON
        chat_id: Chat ID for logging (optional)
        chat_name: Chat name for logging (optional)

    Returns:
        Tuple of (chat_id, chat_name, total_messages, filtered_messages, error)
    """
    # Read input JSON
    try:
        with open(input_json, "r", encoding="utf-8") as f:
            chat_history = json.load(f)
    except Exception as e:
        return (chat_id, chat_name, None, None, f"Error reading JSON {input_json}: {e}")
    # Filter messages
    filtered_messages = []
    chat_id = chat_history.get("id", chat_id)
    total_messages = len(chat_history.get("messages", []))
    for msg in chat_history.get("messages", []):
        message_id = msg.get("id")
        file_name = msg.get("file", None)
        if not (chat_id and message_id and file_name):
            continue  # Skip messages without required fields
        # Construct file path using tdl's naming pattern
        escaped_file_name = escape_filename(file_name)
        target_file = os.path.join(
            download_dir, f"{chat_id}_{message_id}_{escaped_file_name}"
        )
        # Include message only if file doesn't exist
        if not os.path.isfile(target_file):
            filtered_messages.append(msg)
    chat_history["messages"] = filtered_messages
    # Write filtered JSON
    try:
        with open(output_json, "w", encoding="utf-8") as f:
            json.dump(
                chat_history,
                f,
                ensure_ascii=False,
                separators=(",", ":"),
            )
        return (chat_id, chat_name, total_messages, len(filtered_messages), None)
    except Exception as e:
        return (
            chat_id,
            chat_name,
            None,
            None,
            f"Error writing JSON {output_json}: {e}",
        )
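# The target_file check above reconstructs tdl's download naming pattern,
# "<chat_id>_<message_id>_<escaped_file_name>", e.g. (path illustrative):
#   downloads/[123456789] My Channel/123456789_42_photo.jpg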
def extract_chat_id_from_dirname(dir_name):
    """
    Extract chat_id and chat_name from a directory name.

    Expected pattern: [chat_id] escaped_name

    Returns:
        Tuple of (chat_id, chat_name), or (None, None) if the pattern doesn't match
    """
    match = re.match(r"^\[(\d+)\]\s+(.+)$", dir_name)
    if match:
        return match.group(1), match.group(2)
    return None, None
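# Example: extract_chat_id_from_dirname("[123456789] My Channel")
#   -> ("123456789", "My Channel")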
def main():
    """Main filter process."""
    print("=" * 60)
    print("Telegram Chat Filter Script")
    print("=" * 60)
    # Check if required directories exist
    if not DOWNLOADS_DIR.exists():
        logger.error(f"Downloads directory not found: {DOWNLOADS_DIR}")
        sys.exit(1)
    if not CHAT_DIR.exists():
        logger.error(f"Chat directory not found: {CHAT_DIR}")
        sys.exit(1)
    logger.info(f"Scanning downloads directory: {DOWNLOADS_DIR}")
    # Collect all tasks to process
    tasks = []
    for dir_path in sorted(DOWNLOADS_DIR.iterdir()):
        # Skip if not a directory
        if not dir_path.is_dir():
            continue
        dir_name = dir_path.name
        chat_id, chat_name = extract_chat_id_from_dirname(dir_name)
        if not chat_id:
            logger.warning(
                f"Directory name doesn't match expected pattern [chat_id] name: {dir_name}"
            )
            continue
        # Define file paths
        input_json = CHAT_DIR / f"{chat_id}.json"
        output_json = CHAT_DIR / f"{chat_id}_filtered.json"
        # Check if input JSON exists
        if not input_json.exists():
            logger.warning(f" Input JSON not found: {input_json}")
            continue
        tasks.append(
            (str(input_json), str(dir_path), str(output_json), chat_id, chat_name)
        )
    logger.info(f"\nFound {len(tasks)} chats to process")
    logger.info("Starting parallel processing...")
    # Process chats in parallel
    processed_count = 0
    success_count = 0
    total_filtered = 0
    with ProcessPoolExecutor() as executor:
        # Submit all tasks
        future_to_chat = {
            executor.submit(filter_chat_messages, *task): task for task in tasks
        }
        # Process results as they complete
        for future in as_completed(future_to_chat):
            task = future_to_chat[future]
            chat_id, chat_name = task[3], task[4]
            try:
                result_chat_id, result_chat_name, total, filtered, error = (
                    future.result()
                )
                if error:
                    logger.error(
                        f" ✗ Failed to filter chat {result_chat_id} ({result_chat_name}): {error}"
                    )
                elif total is not None and filtered is not None:
                    logger.info(
                        f" ✓ Chat {result_chat_id} ({result_chat_name}): "
                        f"kept {filtered} of {total} messages to download"
                    )
                    success_count += 1
                    total_filtered += filtered
                else:
                    logger.error(
                        f" ✗ Failed to filter messages for chat_id: {result_chat_id}"
                    )
            except Exception as e:
                logger.error(
                    f" ✗ Exception processing chat {chat_id} ({chat_name}): {e}"
                )
            processed_count += 1
    # Summary
    print("\n" + "=" * 60)
    print("Filter Processing Complete!")
    print("=" * 60)
    print(f"Total directories processed: {processed_count}")
    print(f"Successfully filtered: {success_count}")
    print(f"Failed: {processed_count - success_count}")
    print(f"Total messages to download: {total_filtered}")
    print(f"\nFiltered data saved to: {CHAT_DIR}")
    print(f"Standard log: {STANDARD_LOG}")
    if processed_count - success_count > 0:
        print(f"Error log: {ERROR_LOG}")
    print("=" * 60)


if __name__ == "__main__":
    LOG_DIR.mkdir(exist_ok=True)
    logger = setup_logging(ERROR_LOG, STANDARD_LOG)
    try:
        main()
    except KeyboardInterrupt:
        logger.info("\n\nExited!")
        sys.exit(0)
    except Exception as e:
        logger.error(f"\nUnexpected error: {e}")
        import traceback

        traceback.print_exc()
        sys.exit(1)
#!/usr/bin/env python3
"""
Download media for all processed Telegram chats.
Monitors download processes and kills them if network stalls are detected.
"""
import os
import sys
import time
import signal
import subprocess
from datetime import datetime
from pathlib import Path
from threading import Thread, Event
from utils import check_dependencies, setup_logging
# Configuration
script_dir = Path(__file__).parent.resolve()
DOWNLOAD_DIR = script_dir / "downloads"
CHAT_DIR = script_dir / "chat"
LOG_DIR = script_dir / "logs"
CHAT_MAPPING = CHAT_DIR / "chat_mapping.txt"
# Download settings
CONCURRENT_DOWNLOADS = 4
CONNECTIONS_PER_DOWNLOAD = 2
DOWNLOAD_DELAY = "2s"
NETWORK_STALL_CHECK_INTERVAL = 1 # in seconds
NETWORK_STALL_THRESHOLD = 10000 # in B/s
NETWORK_STALL_COUNT = 60
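# With the defaults above, a download is killed after 60 consecutive 1-second
# samples below 10000 B/s (~10 kB/s), i.e. roughly one minute of stalled transfer.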
timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
ERROR_LOG = LOG_DIR / f"download-error {timestamp}.log"
STANDARD_LOG = LOG_DIR / f"download {timestamp}.log"
class NetworkMonitor:
    """Monitor network activity for a process and detect stalls."""

    def __init__(self, pid, chat_name, logger):
        self.pid = pid
        self.chat_name = chat_name
        self.logger = logger
        self.stall_count = 0
        self.prev_rx = self.get_net_rx()
        self.should_stop = Event()
        self.stalled = False

    def get_net_rx(self):
        """Get total network bytes received for the process."""
        net_dev_path = f"/proc/{self.pid}/net/dev"
        if not os.path.exists(net_dev_path):
            return 0
        try:
            with open(net_dev_path, "r") as f:
                lines = f.readlines()
            total_rx = 0
            for line in lines[2:]:  # Skip header lines
                parts = line.split()
                if len(parts) >= 2:
                    try:
                        total_rx += int(parts[1])
                    except ValueError:
                        continue
            return total_rx
        except (FileNotFoundError, PermissionError, IOError):
            return 0
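    # /proc/<pid>/net/dev has two header lines, then one line per interface, e.g.:
    #   eth0: 123456789 2345 0 0 ...   (parts[1] is cumulative RX bytes)
    # Note the file reflects the process's network namespace, not the process
    # alone, so other traffic in the same namespace also moves the counter.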
    def monitor(self):
        """Monitor the download process for network stalls."""
        self.logger.info(
            f"Monitoring download process (PID: {self.pid}) for {self.chat_name}"
        )
        while not self.should_stop.is_set():
            time.sleep(NETWORK_STALL_CHECK_INTERVAL)
            # Check if process is still alive
            try:
                os.kill(self.pid, 0)
            except OSError:
                # Process has terminated
                break
            curr_rx = self.get_net_rx()
            diff = curr_rx - self.prev_rx
            rate = diff // NETWORK_STALL_CHECK_INTERVAL
            if rate < NETWORK_STALL_THRESHOLD:
                self.stall_count += 1
                # self.logger.warning(
                #     f"Stall detected for {self.chat_name}: "
                #     f"{rate} B/s (count: {self.stall_count}/{NETWORK_STALL_COUNT})"
                # )
                if self.stall_count >= NETWORK_STALL_COUNT:
                    self.logger.error(
                        f"Download stalled for {self.chat_name}. Killing process {self.pid}."
                    )
                    try:
                        os.kill(self.pid, signal.SIGKILL)
                        self.stalled = True
                    except OSError:
                        pass
                    break
            else:
                # if self.stall_count > 0:
                #     self.logger.info(
                #         f"Download resumed for {self.chat_name}: {rate} B/s"
                #     )
                # Reset counter if download is active
                self.stall_count = 0
            self.prev_rx = curr_rx
        if not self.stalled:
            self.logger.info(f"Download process completed for {self.chat_name}")

    def stop(self):
        """Stop monitoring."""
        self.should_stop.set()
def download_chat_media(chat_id, escaped_name, filtered_json, download_dir, logger):
    """
    Download media for a single chat.

    Args:
        chat_id: The chat ID
        escaped_name: The escaped chat name
        filtered_json: Path to the filtered JSON file
        download_dir: Directory to download files to
        logger: Logger instance

    Returns:
        True if successful, False otherwise
    """
    if not filtered_json.exists():
        logger.error(f"Filtered JSON not found for chat [{chat_id}] {escaped_name}")
        return False
    logger.info(f"Downloading media for chat: {escaped_name}")
    # Build tdl command
    cmd = [
        "tdl",
        "--delay",
        DOWNLOAD_DELAY,
        "dl",
        "-f",
        str(filtered_json),
        "-d",
        str(download_dir),
        "--group",
        "--skip-same",
        "--takeout",
        "--continue",
        "-t",
        str(CONNECTIONS_PER_DOWNLOAD),
        "-l",
        str(CONCURRENT_DOWNLOADS),
    ]
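    # With the settings at the top of this file the list expands to:
    #   tdl --delay 2s dl -f <filtered.json> -d <dir> --group --skip-same \
    #       --takeout --continue -t 2 -l 4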
    try:
        # Start the download process
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
        )
        # Start network monitoring in a separate thread
        monitor = NetworkMonitor(process.pid, escaped_name, logger)
        monitor_thread = Thread(target=monitor.monitor, daemon=True)
        monitor_thread.start()
        # Stream output in real-time
        if process.stdout:
            for line in process.stdout:
                print(line, end="")
                # logger.debug(line.rstrip())
        # Wait for process to complete
        return_code = process.wait()
        # Stop monitoring
        monitor.stop()
        monitor_thread.join(timeout=2)
        if monitor.stalled:
            logger.error(
                f"Failed to download media for chat [{chat_id}] {escaped_name} due to network stall."
            )
            return False
        if return_code != 0:
            logger.error(
                f"Download failed for chat [{chat_id}] {escaped_name} with exit code {return_code}"
            )
            return False
        logger.info(
            f"✓ Successfully downloaded media for chat [{chat_id}] {escaped_name}"
        )
        return True
    except Exception as e:
        logger.error(
            f"Error downloading media for chat [{chat_id}] {escaped_name}: {e}"
        )
        return False
def load_chat_mapping(chat_mapping_file):
    """
    Load chat mapping from file.

    Returns:
        List of tuples (chat_id, escaped_name)
    """
    if not chat_mapping_file.exists():
        logger.error(f"Chat mapping file not found: {chat_mapping_file}")
        return []
    chats = []
    try:
        with open(chat_mapping_file, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line:
                    parts = line.split("\t")
                    if len(parts) == 2:
                        chats.append((parts[0], parts[1]))
                    else:
                        logger.warning(f"Invalid line in chat mapping: {line}")
        return chats
    except Exception as e:
        logger.error(f"Error reading chat mapping file: {e}")
        return []
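# chat_mapping.txt is written by the sync script, one tab-separated pair per
# line: "<chat_id>\t<escaped_name>"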
def main():
    """Main download process."""
    print("=" * 60)
    print("Telegram Media Download Script")
    print("=" * 60)
    # Check dependencies
    check_dependencies(["tdl"])
    # Create necessary directories
    DOWNLOAD_DIR.mkdir(exist_ok=True)
    CHAT_DIR.mkdir(exist_ok=True)
    logger.info("Starting media downloads...")
    logger.info("Configuration:")
    logger.info(f"  Concurrent downloads: {CONCURRENT_DOWNLOADS}")
    logger.info(f"  Connections per download: {CONNECTIONS_PER_DOWNLOAD}")
    logger.info(f"  Download delay: {DOWNLOAD_DELAY}")
    logger.info(f"  Network stall threshold: {NETWORK_STALL_THRESHOLD} B/s")
    logger.info(f"  Network stall count: {NETWORK_STALL_COUNT}")
    # Load chat mapping
    chats = load_chat_mapping(CHAT_MAPPING)
    if not chats:
        logger.warning("No chats found in chat mapping file.")
        return
    logger.info(f"\nFound {len(chats)} chats to download\n")
    # Download media for each chat
    success_count = 0
    for i, (chat_id, escaped_name) in enumerate(chats, 1):
        logger.info(
            f"\n[{i}/{len(chats)}] Processing chat: {escaped_name} (ID: {chat_id})"
        )
        filtered_json = CHAT_DIR / f"{chat_id}_filtered.json"
        download_dir = DOWNLOAD_DIR / f"[{chat_id}] {escaped_name}"
        if download_chat_media(
            chat_id, escaped_name, filtered_json, download_dir, logger
        ):
            success_count += 1
        logger.info("-" * 60)
    # Summary
    print("\n" + "=" * 60)
    print("Download Complete!")
    print("=" * 60)
    print(f"Total chats processed: {len(chats)}")
    print(f"Successfully downloaded: {success_count}")
    print(f"Failed: {len(chats) - success_count}")
    print(f"\nDownloads saved to: {DOWNLOAD_DIR}")
    print(f"Standard log: {STANDARD_LOG}")
    if len(chats) - success_count > 0:
        print(f"Error log: {ERROR_LOG}")
    print("=" * 60)


if __name__ == "__main__":
    LOG_DIR.mkdir(exist_ok=True)
    logger = setup_logging(ERROR_LOG, STANDARD_LOG)
    try:
        main()
    except KeyboardInterrupt:
        logger.info("\n\nExited!")
        sys.exit(0)
    except Exception as e:
        logger.error(f"\nUnexpected error: {e}")
        import traceback

        traceback.print_exc()
        sys.exit(1)
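# utils.py: shared helpers imported by the three scripts above
# (via "from utils import ..."); save this block as utils.py next to them.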
import logging
from pathlib import Path
import shutil
import subprocess
import sys
def setup_logging(error_log: Path, standard_log: Path) -> logging.Logger:
    """Setup logging configuration."""
    # Create a logger
    logger = logging.getLogger(__file__)
    logger.setLevel(logging.DEBUG)
    # Create formatters
    console_formatter = logging.Formatter("[%(levelname)s] %(message)s")
    file_formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
    # Console handler for all levels
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)
    console_handler.setFormatter(console_formatter)
    logger.addHandler(console_handler)
    # File handler for ERROR and above
    error_handler = logging.FileHandler(error_log)
    error_handler.setLevel(logging.ERROR)
    error_handler.setFormatter(file_formatter)
    logger.addHandler(error_handler)
    # File handler for everything below ERROR (DEBUG, INFO, WARNING)
    info_handler = logging.FileHandler(standard_log)
    info_handler.setLevel(logging.DEBUG)
    info_handler.addFilter(
        lambda record: record.levelno < logging.ERROR
    )  # Exclude ERROR and above; those go to the error log
    info_handler.setFormatter(file_formatter)
    logger.addHandler(info_handler)
    return logger
def check_dependencies(dependencies: list):
    """Check if required dependencies are installed."""
    for dep in dependencies:
        if not shutil.which(dep):
            print(f"ERROR: {dep} not found. Please install {dep}.", file=sys.stderr)
            sys.exit(1)
def run_command(cmd, capture_output=False, display_output=True):
    """
    Run a shell command.

    Args:
        cmd: Command to run (list or string)
        capture_output: If True, capture output and optionally echo it after the
            command finishes; if False, let it write to the terminal in real time
        display_output: If True, echo captured output (only used with capture_output)

    Returns:
        CompletedProcess object (stdout/stderr populated only when capture_output=True)
    """
    if capture_output:
        result = subprocess.run(cmd, capture_output=True, text=True)
        if display_output and result.stdout:
            print(result.stdout, end="")
        if display_output and result.stderr:
            print(result.stderr, end="", file=sys.stderr)
        return result
    else:
        # Real-time output: inherit the parent's stdout/stderr
        return subprocess.run(cmd)
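# Example usage of the helpers above (illustrative):
#   check_dependencies(["tdl"])
#   result = run_command(["tdl", "chat", "ls", "-o", "json"],
#                        capture_output=True, display_output=False)
#   if result.returncode == 0:
#       chats = json.loads(result.stdout)  # requires `import json` at the call site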