Created
June 7, 2026 18:43
-
-
Save CypherpunkSamurai/2531f31631053023c323f1a5a6302b95 to your computer and use it in GitHub Desktop.
YTDLP with DOH
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
yt-dlp-doh: DNS over HTTPS (DoH) Wrapper for yt-dlp with Logging

This script enables DNS over HTTPS for yt-dlp by monkey-patching Python's
socket.getaddrinfo() function to resolve hostnames via encrypted DoH queries
instead of the system's default DNS resolver.

Features:
    ✓ Encrypted DNS resolution via RFC 8484 DoH endpoints
    ✓ Fallback to secondary DoH provider on failure
    ✓ Graceful fallback to system DNS if all DoH attempts fail
    ✓ Comprehensive logging with configurable levels and output
    ✓ Colored console output for better readability
    ✓ Optional log file output with rotation
    ✓ Environment variable configuration support

Supported DoH Providers (configurable):
    - Cloudflare: https://cloudflare-dns.com/dns-query
    - Google:     https://dns.google/dns-query
    - Quad9:      https://dns.quad9.net/dns-query
    - AdGuard:    https://dns.adguard.com/dns-query
    - Custom:     Any RFC 8484-compliant DoH endpoint

Dependencies:
    - dnspython[doh]>=2.4.0  (for dns.query.https)
    - httpx                  (HTTP/2 client for DoH; installed by the
                              dnspython[doh] extra)
    - colorlog (optional)    (colored console output)
    - yt-dlp                 (must be installed in the same Python environment)

Installation:
    pip install "dnspython[doh]>=2.4.0" yt-dlp colorlog

Usage:
    python yt-dlp-doh.py [options] <URL>
    python yt-dlp-doh.py --log-level DEBUG --log-file doh.log "URL"

Environment Variables:
    YT_DLP_DOH_ENDPOINT - Override DoH endpoint
    YT_DLP_DOH_TIMEOUT  - Override DoH query timeout (seconds)
    YT_DLP_LOG_LEVEL    - Set logging level (DEBUG/INFO/WARNING/ERROR)
    YT_DLP_LOG_FILE     - Set default log file path

Author: Community Contribution
License: Unlicense / Public Domain (same as yt-dlp)
GitHub: https://github.com/yt-dlp/yt-dlp
"""
from __future__ import annotations

import argparse
import ipaddress
import logging
import os
import socket
import sys
import typing
from logging.handlers import RotatingFileHandler
from urllib.parse import urlparse
# =============================================================================
# CONFIGURATION - Edit these values to customize behavior
# =============================================================================

# DoH resolvers (RFC 8484 compliant): the primary endpoint, and the one that
# is used when the primary fails.
DEFAULT_DOH_ENDPOINT = "https://cloudflare-dns.com/dns-query"
DEFAULT_DOH_FALLBACK = "https://dns.google/dns-query"

# Timeout, in seconds, applied to each DNS lookup.
DEFAULT_DOH_TIMEOUT = 5.0

# Logging defaults: level name, record layout and timestamp layout.
DEFAULT_LOG_LEVEL = "INFO"
DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
DEFAULT_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"

# Log file rotation (only relevant when file logging is enabled).
LOG_FILE_BACKUP_COUNT = 3  # number of rotated files to keep
LOG_FILE_MAX_BYTES = 10 * 1024**2  # 10 MB per file

# =============================================================================
# LOGGER SETUP - Centralized logging configuration
# =============================================================================

# Single named logger shared by every function in this script.
logger = logging.getLogger("yt-dlp-doh")
def _setup_logging(
    level: str = DEFAULT_LOG_LEVEL,
    log_file: str | None = None,
    console_output: bool = True,
) -> None:
    """
    Configure the logging system for yt-dlp-doh.

    Any handlers left over from a previous call are closed and removed, then
    an optional console handler (stderr) and an optional rotating file handler
    are attached to the module logger.

    Args:
        level: Logging level name (DEBUG, INFO, WARNING, ERROR, CRITICAL).
            Matching is case-insensitive; unknown names fall back to INFO.
        log_file: Optional path to log file for persistent output. The file
            handler records everything from DEBUG upwards, independent of
            ``level``.
        console_output: Whether to output logs to console (stderr) at
            ``level``.
    """
    # Resolve the level through an explicit table instead of getattr() on the
    # logging module, so that arbitrary attribute names (e.g. "BASIC_FORMAT")
    # cannot be mistaken for a level.
    known_levels = {
        "DEBUG": logging.DEBUG,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING,
        "WARN": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
        "FATAL": logging.CRITICAL,
    }
    console_level = known_levels.get(str(level).strip().upper(), logging.INFO)

    # Close (not just drop) existing handlers so repeated setup does not leak
    # open log files or duplicate output.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    # The logger's own level filters records before any handler sees them.
    # The file handler wants everything, so the logger must pass DEBUG through
    # whenever a log file is requested; the console handler still filters at
    # the requested level.
    logger.setLevel(logging.DEBUG if log_file else console_level)

    # Create formatter
    formatter = logging.Formatter(
        fmt=DEFAULT_LOG_FORMAT,
        datefmt=DEFAULT_LOG_DATE_FORMAT,
    )

    if console_output:
        # Try to enable colored output if colorlog is available
        try:
            import colorlog

            console_formatter = colorlog.ColoredFormatter(
                "%(log_color)s%(asctime)s [%(levelname)s] %(name)s: %(message)s",
                datefmt=DEFAULT_LOG_DATE_FORMAT,
                log_colors={
                    "DEBUG": "cyan",
                    "INFO": "green",
                    "WARNING": "yellow",
                    "ERROR": "red",
                    "CRITICAL": "bold_red",
                },
            )
        except ImportError:
            console_formatter = formatter

        console_handler = logging.StreamHandler(sys.stderr)
        console_handler.setFormatter(console_formatter)
        console_handler.setLevel(console_level)
        logger.addHandler(console_handler)

    # Add file handler if requested
    if log_file:
        try:
            # Ensure log directory exists
            log_dir = os.path.dirname(log_file)
            if log_dir:
                os.makedirs(log_dir, exist_ok=True)

            file_handler = RotatingFileHandler(
                log_file,
                maxBytes=LOG_FILE_MAX_BYTES,
                backupCount=LOG_FILE_BACKUP_COUNT,
                encoding="utf-8",
                delay=True,  # Delay file creation until first log message
            )
            file_handler.setFormatter(formatter)
            file_handler.setLevel(logging.DEBUG)  # Log everything to file
            logger.addHandler(file_handler)
            logger.debug(f"Logging to file: {os.path.abspath(log_file)}")
        except Exception as e:
            # File logging is best-effort: report the problem and carry on.
            logger.warning(f"Failed to setup file logging: {e}", exc_info=True)

    # With no handler attached, the logging module falls back to its
    # last-resort handler and prints warnings to stderr. A NullHandler keeps
    # the logger silent when console output was explicitly disabled.
    if not logger.handlers:
        logger.addHandler(logging.NullHandler())

    # Log startup info
    logger.info(
        f"yt-dlp-doh initialized | Log level: {logging.getLevelName(console_level)}"
    )
    if log_file:
        logger.info(f"Log file: {os.path.abspath(log_file)}")
# =============================================================================
# INTERNAL IMPLEMENTATION - DoH Resolution Logic
# =============================================================================

if typing.TYPE_CHECKING:
    from collections.abc import Callable

# The unpatched socket.getaddrinfo; stays None until it has been captured.
_original_getaddrinfo: Callable | None = None

# Effective settings at runtime. Starts out as the defaults and is updated
# from environment variables and command-line options.
_runtime_config: dict[str, typing.Any] = dict(
    doh_endpoint=DEFAULT_DOH_ENDPOINT,
    doh_fallback=DEFAULT_DOH_FALLBACK,
    doh_timeout=DEFAULT_DOH_TIMEOUT,
)
def _load_config_from_env() -> None:
    """
    Load configuration overrides from environment variables.

    YT_DLP_DOH_ENDPOINT and YT_DLP_DOH_TIMEOUT are written into
    ``_runtime_config``. YT_DLP_LOG_LEVEL and YT_DLP_LOG_FILE are only
    reported here at debug level; this function does not store them.

    Empty or whitespace-only values are treated as unset. A timeout that is
    not a positive, finite number is rejected with a warning and the current
    timeout is kept.
    """
    if env_endpoint := os.getenv("YT_DLP_DOH_ENDPOINT", "").strip():
        _runtime_config["doh_endpoint"] = env_endpoint
        logger.debug(f"Loaded DoH endpoint from env: {env_endpoint}")

    if env_timeout := os.getenv("YT_DLP_DOH_TIMEOUT", "").strip():
        try:
            timeout_seconds = float(env_timeout)
        except ValueError:
            timeout_seconds = None
        # float() also accepts "nan", "inf", "0" and negative numbers, none of
        # which is a usable timeout. The chained comparison is False for all
        # of them (including NaN).
        if timeout_seconds is not None and 0 < timeout_seconds < float("inf"):
            _runtime_config["doh_timeout"] = timeout_seconds
            logger.debug(f"Loaded DoH timeout from env: {env_timeout}s")
        else:
            logger.warning(f"Invalid YT_DLP_DOH_TIMEOUT value: {env_timeout}")

    if env_log_level := os.getenv("YT_DLP_LOG_LEVEL"):
        logger.debug(f"Log level from env: {env_log_level}")

    if env_log_file := os.getenv("YT_DLP_LOG_FILE"):
        logger.debug(f"Log file from env: {env_log_file}")
| def _is_ip_address(host: str) -> bool: | |
| """ | |
| Check if a string is already an IP address (IPv4 or IPv6). | |
| Returns True if the host is an IP literal, avoiding unnecessary DNS lookups. | |
| """ | |
| try: | |
| socket.inet_pton(socket.AF_INET, host) | |
| return True | |
| except OSError: | |
| pass | |
| try: | |
| socket.inet_pton(socket.AF_INET6, host) | |
| return True | |
| except OSError: | |
| pass | |
| return False | |
def _resolve_via_doh(
    hostname: str,
    family: int = socket.AF_UNSPEC,
    endpoint: str | None = None,
    timeout: float | None = None,
    label: str = "primary",
) -> list[tuple] | None:
    """
    Resolve a hostname using DNS over HTTPS (DoH).

    One A query and/or one AAAA query is sent, depending on ``family``. A
    failure of one query type is logged and does not prevent the other from
    being tried.

    Args:
        hostname: The domain name to resolve (e.g., "example.com")
        family: Socket address family (AF_INET, AF_INET6, or AF_UNSPEC)
        endpoint: DoH server URL (RFC 8484 compliant); defaults to the
            configured primary endpoint
        timeout: Query timeout in seconds; defaults to the configured timeout
        label: Label for logging ('primary' or 'fallback')

    Returns:
        List of addrinfo-shaped tuples
        ``(family, SOCK_STREAM, 0, "", sockaddr)`` with port 0 in the
        sockaddr, or None if the hostname is an IP literal, dnspython is not
        importable, or no address record was obtained.
    """
    endpoint = endpoint or _runtime_config["doh_endpoint"]
    timeout = timeout or _runtime_config["doh_timeout"]

    # Skip resolution if already an IP address
    if _is_ip_address(hostname):
        logger.debug(f"[{label}] Skipping DoH for IP literal: {hostname}")
        return None

    try:
        # dns.exception is imported explicitly because the handlers below
        # reference it; relying on another dns submodule to load it as a side
        # effect would be fragile.
        import dns.exception
        import dns.message
        import dns.query
        import dns.rdatatype
    except ImportError as e:
        logger.error(f"[{label}] dnspython not available: {e}", exc_info=True)
        return None

    # (display name, DNS record type, socket address family) per query
    query_types = []
    if family in (socket.AF_UNSPEC, socket.AF_INET):
        query_types.append(("A", dns.rdatatype.A, socket.AF_INET))
    if family in (socket.AF_UNSPEC, socket.AF_INET6):
        query_types.append(("AAAA", dns.rdatatype.AAAA, socket.AF_INET6))

    results: list[tuple] = []
    for qname, rdtype, af in query_types:
        found_before = len(results)
        try:
            logger.debug(f"[{label}] Querying {qname} for {hostname} via {endpoint}")
            query = dns.message.make_query(hostname, rdtype)
            response = dns.query.https(
                query,
                endpoint,
                timeout=timeout,
                post=True,  # Use POST for better compatibility
            )
            for answer in response.answer:
                # The answer section may hold other record sets as well (for
                # example CNAMEs); only the requested type carries addresses.
                if answer.rdtype != rdtype:
                    continue
                for rr in answer:
                    if af == socket.AF_INET:
                        sockaddr = (rr.address, 0)
                    else:  # AF_INET6
                        sockaddr = (rr.address, 0, 0, 0)
                    results.append((af, socket.SOCK_STREAM, 0, "", sockaddr))
                    logger.debug(f"[{label}] Resolved {hostname} -> {rr.address} ({qname})")
            if len(results) == found_before:
                logger.debug(f"[{label}] No {qname} records returned for {hostname}")
        except dns.exception.Timeout:
            logger.warning(f"[{label}] DoH query timeout for {hostname} ({qname}) after {timeout}s")
        except dns.exception.DNSException as e:
            logger.warning(f"[{label}] DNS error for {hostname} ({qname}): {type(e).__name__}: {e}")
        except Exception as e:
            # Best-effort: transport-level errors must not abort the lookup,
            # the caller decides what to do when nothing was resolved.
            logger.warning(
                f"[{label}] Unexpected error resolving {hostname} ({qname}): {type(e).__name__}: {e}",
                exc_info=True,
            )

    return results if results else None
def _doh_getaddrinfo(
    host: str,
    port: int,
    family: int = socket.AF_UNSPEC,
    type: int = socket.SOCK_STREAM,
    proto: int = 0,
    flags: int = 0,
) -> list[tuple]:
    """
    Monkey-patched replacement for socket.getaddrinfo() using DoH.

    The parameter names mirror socket.getaddrinfo() (including ``type``, which
    shadows the builtin inside this function) so keyword callers keep working.

    Resolution strategy:
        1. Hand the call to the system resolver unchanged when DoH does not
           apply: non-string or empty host, AI_NUMERICHOST, IP literal,
           localhost, the DoH endpoints' own hostnames, or a port that cannot
           be turned into a number
        2. Try primary DoH endpoint
        3. Try fallback DoH endpoint if primary fails
        4. Fall back to system resolver if all DoH attempts fail
        5. Format and return results with the requested port, socket type
           and protocol

    Returns:
        A list of ``(family, type, proto, canonname, sockaddr)`` tuples.

    Raises:
        Whatever the system resolver raises when it is used and fails.
    """
    global _original_getaddrinfo

    # Ensure we have the original function stored. If the patch was never
    # applied, socket.getaddrinfo still is the system resolver.
    if _original_getaddrinfo is None:
        _original_getaddrinfo = getattr(
            socket, "_getaddrinfo_original", socket.getaddrinfo
        )
    system_lookup = _original_getaddrinfo

    # socket.getaddrinfo() accepts None (e.g. passive sockets) and bytes
    # hosts; there is nothing to look up via DoH for those.
    if not isinstance(host, str) or not host:
        return system_lookup(host, port, family, type, proto, flags)

    # AI_NUMERICHOST means the caller forbids name resolution altogether.
    numeric_only = flags & getattr(socket, "AI_NUMERICHOST", 0)
    local_names = ("localhost", "localhost.localdomain", "::1")
    if numeric_only or _is_ip_address(host) or host.lower() in local_names:
        logger.debug(f"Skipping DoH for literal/local host: {host}")
        return system_lookup(host, port, family, type, proto, flags)

    # Never resolve a DoH server's own hostname through DoH: if the HTTP
    # client used for DoH resolves names via socket.getaddrinfo, that would
    # recurse into this function without end.
    # NOTE(review): recent dnspython appears to bootstrap the endpoint with
    # its own resolver, in which case this is purely defensive - confirm.
    doh_hosts = set()
    for url in (_runtime_config["doh_endpoint"], _runtime_config["doh_fallback"]):
        if not url:
            continue
        try:
            endpoint_host = urlparse(url).hostname
        except ValueError:
            endpoint_host = None
        if endpoint_host:
            doh_hosts.add(endpoint_host)
    if host.lower().rstrip(".") in doh_hosts:
        logger.debug(f"Using system DNS for DoH endpoint host: {host}")
        return system_lookup(host, port, family, type, proto, flags)

    # The port may be None, a numeric string or a service name; DoH results
    # need a number in the sockaddr tuple.
    port_number = _coerce_port(port)
    if port_number is None:
        logger.debug(f"Cannot interpret port {port!r} for {host}, using system resolver")
        return system_lookup(host, port, family, type, proto, flags)

    logger.debug(f"DoH lookup requested: {host}:{port} (family={family})")

    # Try primary DoH endpoint
    result = _resolve_via_doh(
        host,
        family,
        endpoint=_runtime_config["doh_endpoint"],
        timeout=_runtime_config["doh_timeout"],
        label="primary",
    )

    # Try fallback endpoint if primary failed
    if result is None and _runtime_config["doh_fallback"]:
        logger.info(f"Primary DoH failed for {host}, trying fallback")
        result = _resolve_via_doh(
            host,
            family,
            endpoint=_runtime_config["doh_fallback"],
            timeout=_runtime_config["doh_timeout"],
            label="fallback",
        )

    # Final fallback to system resolver
    if result is None:
        logger.warning(f"All DoH attempts failed for {host}, falling back to system DNS")
        try:
            fallback_result = system_lookup(host, port, family, type, proto, flags)
        except Exception as e:
            logger.error(f"System DNS fallback also failed for {host}: {e}", exc_info=True)
            raise
        logger.info(
            f"System DNS fallback succeeded for {host}: {[r[4][0] for r in fallback_result]}"
        )
        return fallback_result

    # Format results with the actual port and the socket type/protocol the
    # caller asked for. An unspecified type (0) keeps the SOCK_STREAM default.
    socktype = type or socket.SOCK_STREAM
    formatted_results = []
    for af, _socktype, _protocol, canonname, sockaddr in result:
        if af == socket.AF_INET:
            new_sockaddr = (sockaddr[0], port_number)
        elif af == socket.AF_INET6:
            new_sockaddr = (sockaddr[0], port_number, sockaddr[2], sockaddr[3])
        else:
            new_sockaddr = sockaddr
        formatted_results.append((af, socktype, proto, canonname, new_sockaddr))

    resolved_ips = [r[4][0] for r in formatted_results]
    logger.info(f"DoH resolved {host} -> {resolved_ips}")
    return formatted_results


def _coerce_port(port: typing.Any) -> int | None:
    """Return ``port`` as an integer (None -> 0), or None if that is impossible."""
    if port is None:
        return 0
    if isinstance(port, int):
        return port
    try:
        return int(port)  # numeric string such as "443"
    except (TypeError, ValueError):
        pass
    try:
        return socket.getservbyname(port)  # service name such as "https"
    except (OSError, TypeError):
        return None
def _patch_socket_getaddrinfo() -> None:
    """
    Install the DoH-aware resolver as socket.getaddrinfo.

    The resolver that was in place before is remembered twice: in the module
    global ``_original_getaddrinfo`` and as ``socket._getaddrinfo_original``.
    The presence of that socket attribute marks the module as patched, so a
    second call only logs a debug message.
    """
    global _original_getaddrinfo

    already_patched = hasattr(socket, "_getaddrinfo_original")
    if already_patched:
        logger.debug("socket.getaddrinfo already patched, skipping")
    else:
        # Remember the current resolver before replacing it.
        system_resolver = socket.getaddrinfo
        _original_getaddrinfo = system_resolver
        setattr(socket, "_getaddrinfo_original", system_resolver)

        # Swap in the DoH implementation.
        setattr(socket, "getaddrinfo", _doh_getaddrinfo)
        logger.info("✓ socket.getaddrinfo successfully patched with DoH support")
def _parse_args() -> tuple[list[str], argparse.Namespace]:
    """
    Parse command-line arguments, separating yt-dlp args from wrapper args.

    Logging is configured first, so that every configuration message emitted
    afterwards actually reaches the configured handlers. Settings are then
    applied in the order: defaults -> environment -> command line.

    Returns:
        Tuple of (yt-dlp argument list, parsed wrapper options)
    """

    def _positive_seconds(raw: str) -> float:
        """argparse type: a positive, finite number of seconds."""
        try:
            seconds = float(raw)
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid timeout value: {raw!r}") from None
        # The chained comparison also rejects NaN and infinity.
        if not 0 < seconds < float("inf"):
            raise argparse.ArgumentTypeError(
                f"timeout must be a positive number of seconds, got {raw!r}"
            )
        return seconds

    parser = argparse.ArgumentParser(
        prog="yt-dlp-doh",
        description="yt-dlp with DNS over HTTPS support and logging",
        add_help=False,  # Let yt-dlp handle --help
        allow_abbrev=False,
    )

    # Logging options
    log_group = parser.add_argument_group("Logging Options")
    log_group.add_argument(
        "--log-level",
        type=str.upper,  # accept "debug" as well as "DEBUG"
        default=None,
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help=f"Set logging level (default: {DEFAULT_LOG_LEVEL} or env YT_DLP_LOG_LEVEL)",
    )
    log_group.add_argument(
        "--log-file",
        type=str,
        default=None,
        help="Write logs to file (default: none or env YT_DLP_LOG_FILE)",
    )
    log_group.add_argument(
        "--no-console-log",
        action="store_true",
        help="Disable console output (logs only to file if specified)",
    )

    # DoH configuration options
    doh_group = parser.add_argument_group("DoH Configuration")
    doh_group.add_argument(
        "--doh-endpoint",
        type=str,
        default=None,
        help="Override primary DoH endpoint (default: Cloudflare or env YT_DLP_DOH_ENDPOINT)",
    )
    doh_group.add_argument(
        "--doh-fallback",
        type=str,
        default=None,
        help="Override fallback DoH endpoint (default: Google)",
    )
    doh_group.add_argument(
        "--doh-timeout",
        type=_positive_seconds,
        default=None,
        help=f"Override DoH query timeout in seconds (default: {DEFAULT_DOH_TIMEOUT}s or env)",
    )
    doh_group.add_argument(
        "--no-fallback",
        action="store_true",
        # This option only clears the secondary DoH endpoint; it does not
        # switch off the system resolver.
        # NOTE(review): a strict "never use system DNS" mode would need
        # support in the resolver itself.
        help="Disable the fallback DoH endpoint (system DNS is still used if DoH fails)",
    )

    # Parse known args, leave rest for yt-dlp
    wrapper_args, yt_dlp_args = parser.parse_known_args()

    # Determine final logging config and set logging up before anything else
    # logs.
    log_level = (
        wrapper_args.log_level
        or os.getenv("YT_DLP_LOG_LEVEL")
        or DEFAULT_LOG_LEVEL
    )
    log_file = wrapper_args.log_file or os.getenv("YT_DLP_LOG_FILE")
    _setup_logging(
        level=log_level,
        log_file=log_file,
        console_output=not wrapper_args.no_console_log,
    )

    # Apply configuration in order: defaults -> env -> CLI
    _load_config_from_env()
    if wrapper_args.doh_endpoint:
        _runtime_config["doh_endpoint"] = wrapper_args.doh_endpoint
        logger.info(f"DoH endpoint set to: {wrapper_args.doh_endpoint}")
    if wrapper_args.doh_fallback is not None:
        _runtime_config["doh_fallback"] = wrapper_args.doh_fallback
        logger.info(f"DoH fallback set to: {wrapper_args.doh_fallback}")
    if wrapper_args.doh_timeout is not None:
        _runtime_config["doh_timeout"] = wrapper_args.doh_timeout
        logger.info(f"DoH timeout set to: {wrapper_args.doh_timeout}s")
    if wrapper_args.no_fallback:
        # Applied last so it wins over --doh-fallback.
        _runtime_config["doh_fallback"] = None
        logger.info("Fallback DoH endpoint disabled")

    # Log final configuration
    logger.debug(
        f"Final DoH config: endpoint={_runtime_config['doh_endpoint']}, "
        f"fallback={_runtime_config['doh_fallback']}, "
        f"timeout={_runtime_config['doh_timeout']}s"
    )
    return yt_dlp_args, wrapper_args
def main() -> int:
    """
    Script entry point.

    Parses the wrapper options (which also configures logging), installs the
    DoH resolver patch and only then imports and runs yt-dlp with the
    remaining arguments.

    Returns:
        Whatever yt-dlp's ``main`` returns, 1 if yt-dlp cannot be imported or
        an unexpected exception occurs, 130 on Ctrl+C.
    """
    try:
        # Wrapper options are consumed here; the rest belongs to yt-dlp.
        passthrough_args = _parse_args()[0]

        # The patch has to be in place before yt-dlp gets imported.
        _patch_socket_getaddrinfo()

        try:
            from yt_dlp import main as run_yt_dlp
        except ImportError:
            logger.critical(
                "yt-dlp is not installed in this Python environment. Install it with: pip install yt-dlp",
                exc_info=True,
            )
            exit_code = 1
        else:
            # yt-dlp is called without arguments, so sys.argv is rewritten to
            # contain only its own options (presumably that is where it reads
            # them from).
            sys.argv = ["yt-dlp", *passthrough_args]
            args_preview = " ".join(passthrough_args[:3])
            logger.info(f"Delegating to yt-dlp with args: {args_preview}...")
            exit_code = run_yt_dlp()
        return exit_code
    except KeyboardInterrupt:
        logger.warning("Interrupted by user")
        return 130
    except Exception as exc:
        logger.critical(f"Unhandled exception: {type(exc).__name__}: {exc}", exc_info=True)
        return 1
# Run main() only when executed as a script; its return value becomes the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment