Skip to content

Instantly share code, notes, and snippets.

@yszheda
Created April 9, 2026 09:09
Show Gist options
  • Select an option

  • Save yszheda/848e9924e02dc6262a16e25e3ada888b to your computer and use it in GitHub Desktop.

Select an option

Save yszheda/848e9924e02dc6262a16e25e3ada888b to your computer and use it in GitHub Desktop.
sync_files.py
#!/usr/bin/env python3
"""
跨平台文件同步脚本 - 类似 rsync 的单向同步工具
支持按文件扩展名筛选,保留目录结构,按修改时间判断是否复制
"""
import argparse
import os
import shutil
import sys
import tempfile
from pathlib import Path
from typing import List, Optional, Sequence, Set, Tuple
# Windows consoles often default to a legacy code page that cannot encode the
# Chinese messages this script prints, so switch the output streams to UTF-8.
# stderr is included because copy/delete failures are reported there.
if sys.platform == 'win32':
    for _stream in (sys.stdout, sys.stderr):
        # reconfigure() only exists on io.TextIOWrapper; the stream can be
        # None or a replacement object without that method, so look it up
        # defensively instead of crashing at import time.
        _reconfigure = getattr(_stream, "reconfigure", None)
        if _reconfigure is not None:
            _reconfigure(encoding='utf-8')
def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the command-line options of the sync tool.

    Args:
        argv: Argument strings to parse. ``None`` (the default) lets argparse
            read ``sys.argv[1:]``, which is what the script entry point uses;
            passing a list makes the parser usable from other code and tests.

    Returns:
        Namespace with ``source``, ``target`` and ``extensions`` (the raw
        comma-separated string), plus the boolean flags ``delete``,
        ``verbose`` and ``dry_run``.
    """
    parser = argparse.ArgumentParser(
        description="跨平台文件同步工具,支持按扩展名筛选"
    )

    # Required path/filter options.
    parser.add_argument(
        "-s", "--source",
        required=True,
        help="源目录路径",
    )
    parser.add_argument(
        "-t", "--target",
        required=True,
        help="目标目录路径",
    )
    parser.add_argument(
        "-e", "--extensions",
        required=True,
        help="文件扩展名过滤器,逗号分隔 (如 .jpg,.png,.md)",
    )

    # Optional behaviour switches; all default to False.
    parser.add_argument(
        "-d", "--delete",
        action="store_true",
        help="开启镜像模式:删除目标目录中源目录没有的文件",
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="详细输出模式",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="试运行:显示将要执行的操作但不实际执行",
    )
    return parser.parse_args(argv)
class Logger:
    """Console reporter with a terse mode and a verbose mode."""

    def __init__(self, verbose: bool = False):
        # True enables debug-level lines and destination paths in the output.
        self.verbose = verbose

    def info(self, message: str):
        """Print *message* unconditionally."""
        print(message)

    def debug(self, message: str):
        """Print *message* only when verbose mode is enabled."""
        if not self.verbose:
            return
        print(message)

    def scan_progress(self, file_path: Path):
        """Report the file currently being scanned (verbose only)."""
        self.debug(f"扫描中:{file_path}")

    def copy_file(self, rel_path: Path, src: Path, dst: Path):
        """Report a copy; verbose mode also shows the destination path."""
        line = f"复制:{rel_path} -> {dst}" if self.verbose else f"复制:{rel_path}"
        print(line)

    def skip_file(self, rel_path: Path, reason: str = "已存在"):
        """Report a skipped file together with the reason (verbose only)."""
        self.debug(f"跳过:{rel_path} ({reason})")

    def delete_file(self, rel_path: Path, dst: Path = None):
        """Report a deletion; the destination is shown only in verbose mode
        and only when the caller supplied it."""
        show_destination = self.verbose and dst
        if not show_destination:
            print(f"删除:{rel_path}")
            return
        print(f"删除:{rel_path} -> {dst}")

    def summary(self, copied: int, skipped: int, deleted: int = 0, total: int = 0):
        """Print the final counters; the deletion line appears only when
        at least one file was deleted."""
        report = [
            f"\n扫描完成:发现 {total} 个匹配文件",
            f"同步完成:已复制 {copied} 个文件,跳过 {skipped} 个文件",
        ]
        if deleted > 0:
            report.append(f"删除 {deleted} 个文件")
        for line in report:
            print(line)
def parse_extensions(extensions_str: str) -> Set[str]:
    """Turn a comma-separated extension list into a normalized set.

    Each entry is stripped of surrounding whitespace and lower-cased. Empty
    entries (e.g. from a trailing comma) are dropped, because an empty string
    in the set would match every file that has no extension at all. Entries
    given without a leading dot (``jpg``) get one added, since they are
    compared against ``os.path.splitext`` results, which always include it.

    Args:
        extensions_str: Text such as ``".jpg,.png,.md"``.

    Returns:
        Set of lower-case, dot-prefixed extensions; empty if nothing usable
        was supplied.
    """
    normalized = set()
    for raw_entry in extensions_str.split(","):
        ext = raw_entry.strip().lower()
        if not ext:
            continue
        if not ext.startswith("."):
            ext = "." + ext
        normalized.add(ext)
    return normalized
def scan_files(source_dir: Path, extensions: Set[str]) -> List[Path]:
    """Recursively collect the files below *source_dir* with a wanted extension.

    Args:
        source_dir: Directory to walk.
        extensions: Lower-case extensions including the dot,
            e.g. ``{'.jpg', '.png', '.md'}``.

    Returns:
        Paths of all matching files, in ``os.walk`` order.
    """
    # The file's extension is lower-cased before the lookup, so the match is
    # case-insensitive as long as *extensions* holds lower-case entries.
    return [
        Path(dirpath) / name
        for dirpath, _subdirs, names in os.walk(source_dir)
        for name in names
        if os.path.splitext(name)[1].lower() in extensions
    ]
def needs_copy(src_file: Path, dst_file: Path) -> bool:
    """Decide whether *src_file* has to be copied over *dst_file*.

    Rules:
        - destination missing: copy
        - destination present but source modified more recently: copy
        - otherwise: do not copy

    Args:
        src_file: File in the source tree.
        dst_file: Corresponding path in the target tree.

    Returns:
        True when a copy is required.

    Raises:
        OSError: If the source (or an existing destination) cannot be
            stat'ed, e.g. because of missing permissions.
    """
    # Stat the destination once instead of exists() followed by stat(): this
    # avoids a crash when the file disappears between the two calls. A parent
    # path component that is a regular file also counts as "missing", as it
    # did with Path.exists().
    try:
        dst_mtime_ns = dst_file.stat().st_mtime_ns
    except (FileNotFoundError, NotADirectoryError):
        return True

    # Compare integer nanoseconds: the float st_mtime cannot represent
    # sub-microsecond differences for present-day timestamps, so a slightly
    # newer source could otherwise look unchanged.
    return src_file.stat().st_mtime_ns > dst_mtime_ns
def copy_file(src: Path, dst: Path) -> bool:
    """Copy *src* to *dst*, creating parent directories as needed.

    The data is first written to a temporary file in the destination
    directory and then moved into place with ``os.replace``. A copy that
    fails half-way therefore never leaves a truncated *dst* behind. That
    matters here because such a file would carry a fresh modification time
    and be treated as up to date by later mtime-based comparisons.

    Note: because the destination is replaced rather than overwritten in
    place, an existing symlink or hard link at *dst* is replaced by a regular
    file instead of being written through.

    Args:
        src: Source file path.
        dst: Destination file path.

    Returns:
        True on success, False on failure (the error is printed to stderr).
    """
    tmp_path = None
    try:
        # Make sure the destination directory exists.
        dst.parent.mkdir(parents=True, exist_ok=True)

        # The temp file lives in the same directory so the final rename stays
        # on one filesystem. A short fixed prefix keeps the name well below
        # file-name length limits regardless of dst.name.
        fd, tmp_name = tempfile.mkstemp(
            prefix=".sync-", suffix=".tmp", dir=dst.parent
        )
        os.close(fd)
        tmp_path = Path(tmp_name)

        # copy2 copies the content and then the metadata (including mtime).
        shutil.copy2(src, tmp_path)
        os.replace(tmp_path, dst)
        return True
    except OSError as e:
        # OSError already covers IOError (an alias), PermissionError and
        # shutil.Error.
        print(f"错误:复制 {src} 失败:{e}", file=sys.stderr)
        if tmp_path is not None:
            try:
                tmp_path.unlink()
            except OSError:
                # Best-effort cleanup; the temp file may already be gone.
                pass
        return False
def sync_files(
    source_dir: Path,
    target_dir: Path,
    extensions: Set[str],
    logger: Logger,
    delete: bool = False,
    dry_run: bool = False
) -> Tuple[int, int, int]:
    """Synchronize matching files from *source_dir* into *target_dir*.

    Args:
        source_dir: Source directory.
        target_dir: Target directory (created when missing, unless dry_run).
        extensions: Set of extensions to include.
        logger: Logger used to report each action.
        delete: Also remove matching target files that have no source
            counterpart (mirror mode).
        dry_run: Report what would be done without touching the filesystem.

    Returns:
        ``(copied, skipped, deleted)`` counters. In dry-run mode the counters
        reflect the operations that would have been performed. Files whose
        check or copy fails are counted in neither ``copied`` nor ``skipped``.
    """
    copied = 0
    skipped = 0
    deleted = 0

    # Scan the source tree.
    src_files = scan_files(source_dir, extensions)

    # Make sure the target directory exists.
    if not dry_run:
        target_dir.mkdir(parents=True, exist_ok=True)

    # Handle every source file.
    for src_file in src_files:
        rel_path = src_file.relative_to(source_dir)
        dst_file = target_dir / rel_path

        # A single unreadable entry (e.g. a dangling symlink returned by the
        # scan) must not abort the whole run: report it and move on.
        try:
            copy_needed = needs_copy(src_file, dst_file)
        except OSError as e:
            print(f"错误:无法检查 {src_file}:{e}", file=sys.stderr)
            continue

        if not copy_needed:
            logger.skip_file(rel_path, "目标已存在且更新")
            skipped += 1
            continue

        logger.copy_file(rel_path, src_file, dst_file)
        # Dry runs count the would-be copy; real runs count only successes.
        if dry_run or copy_file(src_file, dst_file):
            copied += 1

    # Mirror mode: remove extra files from the target.
    if delete:
        deleted = delete_extra_files(
            source_dir, target_dir, extensions, logger, dry_run
        )
    return copied, skipped, deleted
def delete_extra_files(
    source_dir: Path,
    target_dir: Path,
    extensions: Set[str],
    logger: Logger,
    dry_run: bool = False
) -> int:
    """Remove matching files from *target_dir* that have no source counterpart.

    Only files whose extension is in *extensions* are considered; anything
    else in the target tree is left alone.

    Args:
        source_dir: Source directory used as the reference.
        target_dir: Directory to clean up.
        extensions: Set of extensions to consider.
        logger: Logger used to report each deletion.
        dry_run: Report deletions without performing them.

    Returns:
        Number of files deleted (or that would be deleted in a dry run).
    """
    deleted = 0

    # Scan the target tree.
    for dst_file in scan_files(target_dir, extensions):
        rel_path = dst_file.relative_to(target_dir)
        if (source_dir / rel_path).exists():
            continue

        # Pass the full path so verbose mode can show it, the same way copy
        # logging receives its destination.
        logger.delete_file(rel_path, dst_file)

        if dry_run:
            deleted += 1  # dry runs are counted as well
            continue

        try:
            dst_file.unlink()
        except OSError as e:
            # OSError already covers IOError (an alias) and PermissionError.
            print(f"错误:删除 {dst_file} 失败:{e}", file=sys.stderr)
        else:
            deleted += 1
    return deleted
def main():
    """Script entry point: validate the arguments, run the sync, print a summary.

    Exits with status 1 when the source is missing or not a directory, or
    when the target path exists but is not a directory.
    """
    args = parse_args()
    logger = Logger(verbose=args.verbose)

    # Validate the source directory. Fatal errors go to stderr, like the
    # copy/delete error messages elsewhere in this script.
    source_dir = Path(args.source)
    if not source_dir.exists():
        print(f"错误:源目录不存在:{source_dir}", file=sys.stderr)
        sys.exit(1)
    if not source_dir.is_dir():
        print(f"错误:源路径不是目录:{source_dir}", file=sys.stderr)
        sys.exit(1)

    # Validate the target up front: a regular file at that path would
    # otherwise surface later as an unhandled FileExistsError from mkdir().
    target_dir = Path(args.target)
    if target_dir.exists() and not target_dir.is_dir():
        print(f"错误:目标路径不是目录:{target_dir}", file=sys.stderr)
        sys.exit(1)

    # Parse the extension filter.
    extensions = parse_extensions(args.extensions)
    logger.info(f"过滤扩展名:{', '.join(sorted(extensions))}")

    # Scan the source tree.
    logger.info("开始扫描源目录...")
    matched_files = scan_files(source_dir, extensions)
    logger.info(f"扫描完成:发现 {len(matched_files)} 个匹配文件")

    # Announce the run mode.
    if args.dry_run:
        logger.info("【试运行模式】不会实际执行任何操作")
    if args.delete:
        logger.info("【镜像模式】将删除目标目录多余文件")

    # Run the synchronization.
    logger.info("开始同步...")
    copied, skipped, deleted = sync_files(
        source_dir=source_dir,
        target_dir=target_dir,
        extensions=extensions,
        logger=logger,
        delete=args.delete,
        dry_run=args.dry_run
    )

    # Print the summary. The total is the number of files found by the scan;
    # copied + skipped would undercount whenever a copy failed.
    logger.summary(copied, skipped, deleted, len(matched_files))
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment