Created
May 12, 2022 02:16
-
-
Save thehesiod/4574d96d4ccb8398977e8cf88ba363f7 to your computer and use it in GitHub Desktop.
async file syncer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import os | |
from pathlib import Path | |
from typing import Dict, Optional | |
import logging | |
import shutil | |
from functools import partial | |
import hashlib | |
import dataclasses | |
from asyncpool import AsyncPool | |
def md5_hash(path: Path, size: int = 2**10) -> str:
    """Return the hexadecimal MD5 digest of the file at *path*.

    The file is opened in binary mode and consumed in chunks, so the whole
    file never has to be held in memory at once.

    Args:
        path: File to hash.
        size: Number of bytes requested per read. Must be non-zero. A
            negative value follows the usual ``read()`` semantics and
            consumes the rest of the file in a single call.

    Returns:
        The digest as a string of hexadecimal digits.

    Raises:
        ValueError: If *size* is zero.
        OSError: If the file cannot be opened or read.
    """
    if size == 0:
        # read(0) always returns b'', so the loop below would stop at once
        # and every file would appear to have the digest of empty input.
        raise ValueError("size must be non-zero")

    digest = hashlib.md5()
    with path.open('rb') as f:
        while chunk := f.read(size):
            digest.update(chunk)
    return digest.hexdigest()
@dataclasses.dataclass
class PathInfo:
    """A filesystem path bundled with a stat result recorded for it."""

    # Location of the file this record describes.
    path: Path
    # Stat result stored alongside the path (the caller in this file passes
    # the value of ``path.lstat()``).
    lstat: os.stat_result
    # Optional string, ``None`` unless supplied. Presumably a content digest
    # such as an MD5 hex string -- nothing visible here sets it; confirm.
    hash: Optional[str] = None
async def main(
    source_folder: Path = Path(r'\\192.168.1.79@8081\DavWWWRoot\DCIM\Camera'),
    dest_folder: Path = Path(r'E:\Pictures\Phones\Dayana'),
) -> None:
    """Move the entries of *source_folder* into *dest_folder*.

    For each entry directly inside *source_folder*:

    * if *dest_folder* has no entry of the same name, the source entry is
      moved there;
    * if it has one with the same size and the same MD5 digest, the source
      entry is deleted as an already-synced duplicate;
    * otherwise the two differ and the entry is rejected with an
      ``AssertionError`` -- neither file is touched.

    Destination entries whose name starts with ``.`` are not indexed, so a
    source entry of the same name is treated as having no counterpart.

    Args:
        source_folder: Folder to drain. Defaults to the WebDAV share the
            script was written for.
        dest_folder: Folder that receives the files. Defaults to the local
            pictures folder the script was written for.
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger()

    # Index the destination once, keyed by file name, so that each source
    # entry can be matched without touching the destination folder again.
    dest_files: Dict[str, PathInfo] = dict()
    for idx, dst_path in enumerate(dest_folder.glob('*')):
        if idx % 100 == 0:
            print(f"Gather Dest folder file {idx}")

        if dst_path.name.startswith('.'):
            continue

        # Internal invariant: one folder listing cannot repeat a name.
        assert dst_path.name not in dest_files
        dest_files[dst_path.name] = PathInfo(dst_path, dst_path.lstat())

    async def _process_path(src_path: Path) -> None:
        """Move, de-duplicate or reject a single source entry."""
        dst_path = dest_folder / src_path.name
        dst_file_info = dest_files.get(src_path.name)

        if dst_file_info is None:
            logger.info(f'moving {src_path} to {dst_path}')
            await asyncio.to_thread(shutil.move, src_path, dst_path)
            return

        # Stat in a worker thread: a direct call would block the event loop
        # for as long as the source filesystem takes to answer.
        src_lstat = await asyncio.to_thread(src_path.lstat)
        if src_lstat.st_size == dst_file_info.lstat.st_size:
            # Sizes match, so hashing is worth it; hash both sides at once.
            src_md5, dst_md5 = await asyncio.gather(
                asyncio.to_thread(md5_hash, src_path),
                asyncio.to_thread(md5_hash, dst_path),
            )
            if src_md5 == dst_md5:
                logger.info(f'deleting {src_path} already exists at {dst_path}')
                await asyncio.to_thread(src_path.unlink)
                return

        # Same name, different content: no policy has been decided for this
        # case. Raise explicitly rather than `assert False`, which is
        # stripped under `python -O` and would silently skip the conflict.
        raise AssertionError(
            f'{src_path} and {dst_path} have the same name but different content'
        )

    # AsyncPool is the third-party `asyncpool` class; the positional
    # arguments are presumably (loop, worker count, pool name, logger,
    # worker coroutine) -- confirm against the installed version.
    async with AsyncPool(None, 7, 'file-workpool', logger, _process_path) as wp:
        for idx, src_path in enumerate(source_folder.glob('*')):
            if idx % 100 == 0:
                print(f"Processing folder {idx}")

            await wp.push(src_path)
if __name__ == '__main__':
    # Start the sync only when executed as a script, not when imported.
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment