Fast, recursive duplicate file finder written in Python 3. It finds duplicated files in a whole directory tree. It is not memory aware: with a very large number of files (tested successfully with thousands) it may be shut down by the OOM killer.
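For example, assuming the script below is saved as duplicates.py, a typical run over one or more directory trees (the paths here are only illustrative) could look like:

python3 duplicates.py /home/user/Documents /mnt/backup

Duplicate groups are written to collisions.csv in the working directory, one row per duplicate file.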
#!/usr/bin/env python3
"""
Fast duplicate file finder.
Usage: duplicates.py <folder> [<folder>...]

Based on https://stackoverflow.com/a/36113168/300783
Modified for Python3 with some small code improvements.
"""
import sys
import hashlib
from collections import defaultdict
from pathlib import Path


def chunk_reader(fobj, chunk_size=1024):
    """Generator that reads a file in chunks of bytes."""
    while True:
        chunk = fobj.read(chunk_size)
        if not chunk:
            return
        yield chunk


def get_hash(filename, first_chunk_only=False, hash_algo=hashlib.sha1):
    """Return the digest of a file, hashing only its first 1024 bytes if requested."""
    hashobj = hash_algo()
    with open(filename, "rb") as f:
        if first_chunk_only:
            hashobj.update(f.read(1024))
        else:
            for chunk in chunk_reader(f):
                hashobj.update(chunk)
    return hashobj.digest()


def recursive_checker(paths):
    """Walk each path recursively and group file paths by file size."""
    files_by_size = defaultdict(list)
    for path in paths:
        pathlib_path = Path(path).resolve()
        for file in pathlib_path.rglob("*"):
            try:
                full_path = file.resolve()
                if not full_path.is_file():
                    continue  # skip directories, sockets, broken symlinks, etc.
                file_size = full_path.stat().st_size
                files_by_size[file_size].append(full_path)
            except OSError:
                # not accessible (permissions, etc.) - skip it
                continue
    return files_by_size


def check_for_duplicates(paths):
    files_by_small_hash = defaultdict(list)
    files_by_full_hash = defaultdict(list)
    files_by_size = recursive_checker(paths)
    print("DISTINCT FILE SIZES:", len(files_by_size))
    # For all files that share a size, hash their first 1024 bytes
    for files in files_by_size.values():
        if len(files) < 2:
            continue  # this file size is unique, no need to spend cpu cycles on it
        for filename in files:
            try:
                small_hash = get_hash(filename, first_chunk_only=True)
            except OSError:
                # the file might have become inaccessible since it was listed
                continue
            files_by_small_hash[small_hash].append(filename)
    # For files that share a first-1024-byte hash, hash the whole file
    for files in files_by_small_hash.values():
        if len(files) < 2:
            # the hash of the first 1k bytes is unique -> skip this file
            continue
        for filename in files:
            try:
                full_hash = get_hash(filename, first_chunk_only=False)
            except OSError:
                # the file might have become inaccessible since it was listed
                continue
            # files that share the same full hash are duplicates
            files_by_full_hash[full_hash].append(filename)
    # Full-hash collisions are duplicates; write them to a CSV file
    with open("collisions.csv", "w", encoding="utf-8") as f:
        for hash_value, files in files_by_full_hash.items():
            # skip unique files
            if len(files) < 2:
                continue
            # write one row per duplicate file
            for file_path in files:
                f.write("|" + hash_value.hex() + "|" + "?" + "|" + str(file_path) + "|" + "\n")
    return


if __name__ == "__main__":
    if sys.argv[1:]:
        check_for_duplicates(sys.argv[1:])
    else:
        print("Usage: %s <folder> [<folder>...]" % sys.argv[0])