Created
October 24, 2014 15:29
-
-
Save vdcrim/f8b6e2dd9abd1a48d9b6 to your computer and use it in GitHub Desktop.
Update the timestamps of a directory tree with the ones from a different tree
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Update the timestamps of a directory tree with the ones from a different | |
tree. Optionally timestamps can be changed only for directories or for | |
files, or only when they are older or newer. | |
Requirements: | |
- python 3.3+ | |
- pywin32 (Windows, to sync also creation date, optional) | |
Copyright (C) 2014 Diego Fernández Gosende <[email protected]> | |
This program is free software: you can redistribute it and/or modify | |
it under the terms of the GNU General Public License as published by | |
the Free Software Foundation, either version 3 of the License, or | |
(at your option) any later version. | |
This program is distributed in the hope that it will be useful, | |
but WITHOUT ANY WARRANTY; without even the implied warranty of | |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
GNU General Public License for more details. | |
You should have received a copy of the GNU General Public License along | |
with this program. If not, see <http://www.gnu.org/licenses/gpl-3.0.html>. | |
""" | |
# Script version; shown by --version and in the verbose settings header.
version = '0.1.0'
# Path fragments that are never synced. They are lowercased further down in
# this file and matched as substrings of the lowercased target path.
excluded_paths = '$RECYCLE.BIN', 'System Volume Information'
import os | |
import sys | |
import builtins | |
def print(*args, **kwargs):
    """Print like the builtin, escaping characters the target can't encode.

    Each positional argument is converted to ``str`` and round-tripped
    through the encoding of the destination stream with the
    ``backslashreplace`` error handler, so characters the stream cannot
    represent are written as escape sequences instead of raising
    ``UnicodeEncodeError``.

    The destination is the ``file`` keyword argument when given, otherwise
    ``sys.stdout``. When the stream reports no encoding, UTF-8 is assumed.
    All keyword arguments are forwarded to the builtin ``print`` unchanged.
    """
    stream = kwargs.get('file')
    if stream is None:
        stream = sys.stdout
    # Use the encoding of the stream actually written to (e.g. sys.stderr),
    # not unconditionally the one of sys.stdout, which may also be None.
    encoding = getattr(stream, 'encoding', None) or 'utf-8'
    builtins.print(*(str(arg).encode(encoding, 'backslashreplace')
                     .decode(encoding)
                     for arg in args), **kwargs)
# Select the timestamp backend. With pywin32 the creation time can be read
# and written too; otherwise os.stat / os.utime are used and only access and
# modification times can be changed.
try:
    import win32file
    import win32con
    import pywintypes
except ImportError as err:
    import datetime
    pywin32_err = str(err)
    AccessDeniedError = PermissionError

    def get_file_times(filename):
        """Return the ``st_ctime_ns``, ``st_atime_ns`` and ``st_mtime_ns``
        values of *filename* as a 3-tuple of integer nanoseconds."""
        stat = os.stat(filename)
        return stat.st_ctime_ns, stat.st_atime_ns, stat.st_mtime_ns

    def set_file_times(filename, times):
        """Set the access and modification time of *filename*.

        *times* is a 3-item sequence ``(ctime, atime, mtime)`` in integer
        nanoseconds; the first item is ignored by this backend.
        """
        # os.utime only accepts a real 2-tuple for ``ns``; callers may pass
        # a list, so convert explicitly.
        os.utime(filename, ns=tuple(times[1:]))
else:
    pywin32_err = None
    AccessDeniedError = pywintypes.error

    def _open_for_file_times(filename, access):
        """Open *filename* with the requested access and return the handle.

        FILE_FLAG_BACKUP_SEMANTICS is passed so that directories can be
        opened as well.
        """
        return win32file.CreateFileW(
            filename,
            access,
            win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE
            | win32con.FILE_SHARE_DELETE,
            None,
            win32con.OPEN_EXISTING,
            win32con.FILE_FLAG_BACKUP_SEMANTICS,
            None)

    def get_file_times(filename):
        """Return the three values reported by ``win32file.GetFileTime``
        for *filename* as a ``(c, a, m)`` tuple."""
        filehandle = _open_for_file_times(filename, win32con.GENERIC_READ)
        try:
            c, a, m = win32file.GetFileTime(filehandle)
        finally:
            # Always release the handle, even if the query fails
            filehandle.close()
        return c, a, m

    def set_file_times(filename, times):
        """Pass the three items of *times* to ``win32file.SetFileTime`` for
        *filename*."""
        filehandle = _open_for_file_times(filename, win32con.GENERIC_WRITE)
        try:
            win32file.SetFileTime(filehandle, *times)
        finally:
            # Always release the handle, even if the update fails
            filehandle.close()
def _get_timestamp_str(ts):
    """Return a printable representation of the timestamp *ts*.

    Without pywin32, *ts* is treated as a number of nanoseconds since the
    epoch and rendered in local time. With pywin32, *ts* must provide
    ``astimezone()`` (presumably the datetime-like objects returned by
    ``win32file.GetFileTime``) and is rendered as ``<date> <time>``.
    """
    if not pywin32_err:
        local_ts = ts.astimezone()
        return '{} {}'.format(local_ts.date(), local_ts.time())
    seconds = ts / 10**9
    return str(datetime.datetime.fromtimestamp(seconds))
def sync_file_timestamps(source_path, target_path, ctime=None, atime=None,
                         mtime=None, time_crit=None, dry_run=None,
                         verbose=None):
    """Copy timestamps from *source_path* to *target_path*.

    ctime, atime, mtime -- whether to sync creation, access and
        modification time (each defaults to True). Creation time can only
        be written through pywin32 and is ignored when it is unavailable.
    time_crit -- 'older': only replace a timestamp when the source one is
        older; 'newer': only when the source one is newer; None: whenever
        they differ.
    dry_run -- only report, don't change anything (default False).
    verbose -- print both paths and what happens to every timestamp
        (default False).

    Access-denied errors while reading or writing the timestamps are
    reported (when verbose) and otherwise ignored. Returns None.
    """
    dry_run = False if dry_run is None else dry_run
    verbose = False if verbose is None else verbose
    ctime = True if ctime is None else ctime
    atime = True if atime is None else atime
    mtime = True if mtime is None else mtime
    if pywin32_err:
        # The os.utime fallback cannot write the creation time. Dropping the
        # request here avoids reporting a change that can't be applied and
        # avoids calling os.utime with nothing but None values.
        ctime = False
    selected = (ctime, atime, mtime)
    if not any(selected):
        return
    if verbose:
        print('\n{}\n{}'.format(source_path, target_path))
    try:
        s_times = get_file_times(source_path)
        t_times = get_file_times(target_path)
    except AccessDeniedError as err:
        if verbose:
            print(' {}'.format(err.strerror))
        return

    # New value for each timestamp, None meaning "leave as it is"
    n_times = [None, None, None]
    time_types = ('ctime', 'atime', 'mtime')
    for i, (time_type, wanted, s_time, t_time) in enumerate(
            zip(time_types, selected, s_times, t_times)):
        if not wanted:
            continue
        if time_crit == 'older':
            replace = t_time > s_time
        elif time_crit == 'newer':
            replace = t_time < s_time
        else:
            replace = time_crit is None and t_time != s_time
        if replace:
            if verbose:
                print(' {}: old: {}\n new: {}'.format(time_type,
                      _get_timestamp_str(t_time), _get_timestamp_str(s_time)))
            n_times[i] = s_time
        elif verbose:
            print(' {}: unchanged'.format(time_type))

    # Compare against None explicitly: a timestamp of 0 is a valid value
    if dry_run or all(n_time is None for n_time in n_times):
        return
    if pywin32_err:
        # os.utime always writes both access and modification time, so keep
        # the target's current value for the one that is not being synced.
        if n_times[1] is None:
            n_times[1] = t_times[1]
        if n_times[2] is None:
            n_times[2] = t_times[2]
    try:
        # Pass a tuple: the os.utime fallback slices this sequence and
        # os.utime rejects anything but a tuple for its ``ns`` argument.
        set_file_times(target_path, tuple(n_times))
    except AccessDeniedError as err:
        if verbose:
            print(' {}'.format(err.strerror))
def sync_tree_timestamps(source_tree, target_tree, ctime=None, atime=None,
                         mtime=None, type_crit=None, time_crit=None,
                         dry_run=None, verbose=None):
    """Sync the timestamps of every entry of *target_tree* that also exists
    at the same relative path in *source_tree*.

    type_crit -- 'files' to process only files, 'directories' to process
        only directories; any other value processes both.
    The remaining arguments are handed to sync_file_timestamps unchanged.

    Entries whose lowercased target path contains one of the items of
    excluded_paths are skipped.
    """
    for target_dirpath, dirnames, filenames in os.walk(target_tree):
        # Per directory: files first, then subdirectories. Each batch pairs
        # the entry names with the check the source counterpart must pass.
        batches = []
        if type_crit != 'directories':
            batches.append((filenames, os.path.isfile))
        if type_crit != 'files':
            batches.append((dirnames, os.path.isdir))
        for names, exists_in_source in batches:
            for name in names:
                target_entry = os.path.join(target_dirpath, name)
                lowered = target_entry.lower()
                if any(excluded in lowered for excluded in excluded_paths):
                    continue
                relative = os.path.relpath(target_entry, target_tree)
                source_entry = os.path.join(source_tree, relative)
                if exists_in_source(source_entry):
                    sync_file_timestamps(source_entry, target_entry, ctime,
                                         atime, mtime, time_crit, dry_run,
                                         verbose)
# Lowercase the exclusions once so they can be matched against lowercased
# target paths.
excluded_paths = [entry.lower() for entry in excluded_paths]
if __name__ == '__main__':
    import argparse
    import shutil
    import atexit

    # Wait for the user before the process exits. This is undone further
    # down when not running verbosely.
    atexit.register(input, '\nPress Return to finish...')

    # Check arguments as paths. Added 'directory' and 'executable' keywords
    class CheckPathAction(argparse.Action):
        """argparse action that expands and validates path arguments.

        Extra keywords accepted by add_argument:
        directory -- True: the path must be a directory; False: it must be
            a file; None (default): it only has to exist.
        executable -- True: the value must be found by shutil.which
            (forces directory=False).
        """

        def __init__(self, option_strings, dest, **kwargs):
            self.is_directory = kwargs.pop('directory', None)
            self.is_executable = kwargs.pop('executable', False)
            if self.is_executable:
                self.is_directory = False
            argparse.Action.__init__(self, option_strings, dest, **kwargs)

        def __call__(self, parser, namespace, values, option_string=None):
            # Choose the existence check matching the requested path type
            if self.is_directory is None:
                path_type = 'path'
                path_exists = os.path.exists
            elif self.is_directory:
                path_type = 'directory'
                path_exists = os.path.isdir
            else:
                path_type = 'file'
                if self.is_executable:
                    path_exists = shutil.which
                else:
                    path_exists = os.path.isfile

            def checked(path):
                # Expand '~' and environment variables, then validate;
                # parser.error() exits on failure.
                path = os.path.expandvars(os.path.expanduser(path))
                if not path_exists(path):
                    parser.error('the parameter passed is not a {}\n {}\n'
                                 .format(path_type, path))
                return path

            if isinstance(values, str):
                values = checked(values)
            else:
                for i, path in enumerate(values):
                    values[i] = checked(path)
            setattr(namespace, self.dest, values)

    # Parse command line, check and report settings
    name = os.path.basename(__file__)
    # The module docstring doubles as help text: everything before the
    # copyright notice is the description, the rest is the license epilog.
    description, license1, license2 = __doc__.rpartition('\nCopyright')
    license_text = license1 + license2
    parser = argparse.ArgumentParser(
        prog=name, description=description, epilog=license_text,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('-V', '--version', action='version',
                        version='{} v{}\n{}'.format(name, version,
                                                    license_text))
    parser.add_argument('-v', '--verbose',
                        action='store_true', default=False,
                        help='show detailed info')
    parser.add_argument('-dr', '--dry-run',
                        action='store_true', default=False,
                        help='don\'t change timestamps, implies --verbose')
    parser.add_argument('-nc', '--no-ctime', dest='ctime',
                        action='store_false', default=True,
                        help='don\'t sync creation time (Windows)')
    parser.add_argument('-na', '--no-atime', dest='atime',
                        action='store_false', default=True,
                        help='don\'t sync access time')
    parser.add_argument('-nm', '--no-mtime', dest='mtime',
                        action='store_false', default=True,
                        help='don\'t sync modification time')
    type_crit = parser.add_mutually_exclusive_group()
    type_crit.add_argument('-f', '--files', dest='type_crit',
                           action='store_const', const='files',
                           help='only update file timestamps')
    type_crit.add_argument('-d', '--directories', dest='type_crit',
                           action='store_const', const='directories',
                           help='only update directory timestamps')
    time_crit = parser.add_mutually_exclusive_group()
    time_crit.add_argument('-o', '--older', dest='time_crit',
                           action='store_const', const='older',
                           help='only update timestamps if the ones in '
                                'source_tree are older')
    time_crit.add_argument('-n', '--newer', dest='time_crit',
                           action='store_const', const='newer',
                           help='only update timestamps if the ones in '
                                'source_tree are newer')
    parser.add_argument('source_tree',
                        action=CheckPathAction, directory=True,
                        help='specify the source directory tree')
    parser.add_argument('target_tree',
                        action=CheckPathAction, directory=True,
                        help='specify the target directory tree')
    settings = parser.parse_args()

    # The help text promises that --dry-run implies --verbose; without this
    # a dry run would neither change nor report anything.
    if settings.dry_run:
        settings.verbose = True

    if settings.verbose:
        ts_list = []
        if settings.ctime and os.name == 'nt':
            ts_list.append('creation')
        if settings.atime:
            ts_list.append('access')
        if settings.mtime:
            ts_list.append('modification')
        settings_str = ('{} v{}\n\n'
                        'Source tree: {s.source_tree}\n'
                        'Target tree: {s.target_tree}\n'
                        'Timestamps to be updated: {}\n').format(
                            name, version, ', '.join(ts_list), s=settings)
        if settings.type_crit is not None:
            settings_str += 'Only sync {}\n'.format(settings.type_crit)
        if settings.time_crit is not None:
            settings_str += 'Only sync {} timestamps\n'.format(
                settings.time_crit)
        if settings.dry_run:
            settings_str += 'Dry run\n'
        print(settings_str)
    else:
        # Nothing was printed, so there is no reason to wait at exit
        atexit.unregister(input)

    # Creation time needs pywin32; warn about it only on Windows
    if pywin32_err and settings.ctime:
        if os.name == 'nt':
            print('Error importing pywin32: {}\nCan\'t change creation date\n'
                  .format(pywin32_err), file=sys.stderr)
        settings.ctime = False
    if not (settings.ctime or settings.atime or settings.mtime):
        parser.error('at least one timestamp must be updated')

    # Sync timestamps
    sync_tree_timestamps(settings.source_tree, settings.target_tree,
                         settings.ctime, settings.atime, settings.mtime,
                         settings.type_crit, settings.time_crit,
                         settings.dry_run, settings.verbose)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment