Last active
May 20, 2016 23:01
-
-
Save mzywiol/e49a6e348bdc527e64ff to your computer and use it in GitHub Desktop.
Command line tool to create and manage symlink collections for easy creation and update.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from os import path | |
import argparse | |
import csv | |
import re | |
import sys | |
import os | |
import collections | |
DEFAULT_LINKS_FILE = 'collection.csv' | |
parser = argparse.ArgumentParser( | |
description='Manage dir and file base for easy copying or creating filesystem links.', | |
prog='CollectionManager', | |
epilog='Made by mzywiol 2014-2016') | |
restore = parser.add_mutually_exclusive_group() | |
parser.add_argument('--file', '-f', | |
default=DEFAULT_LINKS_FILE, | |
help='file with collection information. Defaults to %s' % DEFAULT_LINKS_FILE, | |
metavar='LINK_FILE.csv') | |
parser.add_argument('--empty', '-e', | |
action='store_true', | |
help='clear the collection file') | |
parser.add_argument('--add', '-a', | |
nargs='?', | |
const='.', | |
help='source file or directory to collection', | |
metavar='SOURCE') | |
parser.add_argument('--delete', '-d', | |
help='delete entries pointing to targets by given filter', | |
metavar='FILTER') | |
parser.add_argument('--unlink', '-u', | |
help='delete entries with sources by given filter', | |
metavar='FILTER') | |
parser.add_argument('--list', '-l', '-lt', | |
nargs='?', | |
const='', | |
help='list entries from the collection, optionally filtered by target', | |
metavar='FILTER') | |
parser.add_argument('--list-source', '-ls', | |
help='list entries from the collection, optionally filtered by source', | |
metavar='FILTER') | |
parser.add_argument('--target', '-t', | |
help='name the entry will get in recreated collection. Defaults to same as source.', | |
metavar='LINK_NAME') | |
restore.add_argument('--restore-links', '-r', | |
nargs='?', | |
const='.', | |
help='create links based on collection file in given directory (defaults to current dir).', | |
metavar='RESTORE_ROOT') | |
restore.add_argument('--copy', '-c', | |
nargs='?', | |
const='.', | |
help='copy files from collection file into given directory (defaults to current dir).', | |
metavar='TARGET_ROOT') | |
parser.add_argument('--hard', | |
action='store_true', | |
help='will create hard file links') | |
parser.add_argument('--verbose', '-v', | |
action="store_true", | |
help='will print out messages') | |
args = parser.parse_args() | |
def confirm_yes_no(message): | |
# return True | |
while True: | |
print('%s [y/n]?' % message) | |
choice = input().lower() | |
if choice == 'y': | |
return True | |
if choice == 'n': | |
return False | |
def log(message): | |
if args.verbose: | |
print(message) | |
def path_to_dirs(path_to_split): | |
cur_path = path_to_split | |
dirs = [] | |
while True: | |
base_name = path.basename(cur_path) | |
if base_name == '': | |
break | |
dirs.append(base_name) | |
cur_path = path.dirname(cur_path) | |
return dirs | |
def save_list_to_file(link_map, links_file_name): | |
ordered_links = collections.OrderedDict(sorted(link_map.items())) | |
with open(links_file_name, 'w+', encoding='utf-8') as csvfile: | |
csvwriter = csv.writer(csvfile, quotechar='"', quoting=csv.QUOTE_ALL) | |
for tgt in ordered_links: | |
csvwriter.writerow([ordered_links[tgt], tgt]) | |
def read_map_from_file(links_file_name): | |
link_map = {} | |
if not path.exists(links_file_name): | |
return link_map | |
with open(links_file_name, "r", encoding='utf-8') as csvfile: | |
csv_reader = csv.reader(csvfile, quotechar='"') | |
for row in csv_reader: | |
if len(row) == 0: | |
continue | |
#print row | |
if row[1] in link_map: | |
log('Substituting link %s: removing source %s, adding %s' % (row[1], link_map[row[1]], row[0])) | |
link_map[row[1]] = row[0] | |
return link_map | |
def none_or_empty(str): | |
if not str: #TODO: can be done better? | |
return True | |
else: | |
return False | |
def or_(value, value_if_null_or_empty): #TODO: substitute with lambda? | |
return value if not none_or_empty(value) else value_if_null_or_empty | |
def escape_apos(string): | |
return string.replace('\'', '\\\'') | |
def substitute(substitutions, to_substitute): #TODO: explain substitutions in help | |
sub_match = re.match(r'%(N+)(\d*)(?:-(\d*))?', to_substitute) | |
if sub_match == None: | |
return to_substitute | |
lvl = or_(int(len(sub_match.group(1))) - 1, 0) | |
sub = substitutions[lvl] | |
bgn = int(or_(sub_match.group(2), 1))-1 | |
end = int(or_(sub_match.group(3), len(sub))) | |
return eval('\'%s\'[%d:%d]' % (escape_apos(sub), bgn, end)) | |
def substitute_all(source, substitution): | |
chopped = re.split(r'(%N+\d*-?\d*)', source) | |
for i in range(0, len(chopped)): | |
chopped[i] = substitute(substitution, chopped[i]) | |
return ''.join(chopped) | |
def parse_target(target, source_path): | |
target = substitute_all(target, path_to_dirs(source_path)) | |
return target | |
def add_link(path_to_add, target_name, links_file_name): | |
# save absolute path to add | |
path_added = path.abspath(path_to_add) | |
source_name = path.split(path_added)[1] | |
# find out target path | |
if target_name == None: | |
target_name = source_name | |
else: | |
# use target name provided | |
target_name = parse_target(target_name, path_added) | |
if target_name == '': | |
sys.exit("Error: target name cannot be empty.") | |
link_map = read_map_from_file(links_file_name) | |
link_map[target_name] = path_added | |
save_list_to_file(link_map, links_file_name) | |
print('%s -> %s' % (path_added, target_name)) | |
def list_file(links_file_name, sel, filter_by_target=True): | |
link_map = read_map_from_file(links_file_name) | |
pattern = re.compile('.*' + sel + '.*') | |
filtered_keys = [key for key in filter(lambda key: pattern.match(key if filter_by_target else link_map[key]) != None, link_map)] | |
print('%d links %sfound in file %s.' % (len(filtered_keys), ('with filter %s ' % sel) if sel else '', links_file_name)) | |
for key in filtered_keys: | |
print('%s -> %s' % (link_map[key], key)) | |
def delete_by_filter(links_file_name, sel, filter_by_target=True): | |
link_map = read_map_from_file(links_file_name) | |
pattern = re.compile('.*' + sel + '.*') | |
to_delete = [] | |
for tgt in link_map: | |
filtered = tgt if filter_by_target else link_map[tgt] | |
if pattern.match(filtered) != None: | |
to_delete.append(tgt) | |
for to_del in to_delete: | |
del link_map[to_del] | |
save_list_to_file(link_map, links_file_name) | |
def make_diretory_posix(path): | |
dirs = path.split('/') | |
curdir = dirs[0] | |
for d in dirs: | |
curdir += d + '/' | |
os.system('mkdir "%s"' % curdir) | |
def make_directory_windows(path): | |
os.system('mkdir "%s"' % path) | |
def make_directory(path): | |
if os.name == 'nt': | |
make_directory_windows(path) | |
elif os.name == 'posix': | |
make_directory_posix(path) | |
def create_link_posix(target, link_name, hardlink=False): | |
symb_flag = "-s" if not hardlink else "" | |
os.system('ln "%s" "%s"' % (target, link_name)) | |
def create_file_link_windows(target, link_name, hardlink=False): | |
hard_flag = "/h" if hardlink else "" | |
os.system('mklink %s "%s" "%s"' % (hard_flag, link_name, target)) | |
def create_file_link(target, link_name, hardlink=False): | |
if os.name == 'nt': | |
create_file_link_windows(target, link_name, hardlink) | |
elif os.name == 'posix': | |
create_link_posix(target, link_name, hardlink) | |
def create_directory_link_windows(target, link_name): | |
os.system('mklink /d "%s" "%s"' % (link_name, target)) | |
def create_directory_link(target, link_name): | |
if os.name == 'nt': | |
create_directory_link_windows(target, link_name) | |
elif os.name == 'posix': | |
create_link_posix(target, link_name) | |
def restore_links(links_dir, links_file_name, hardlinks=False): | |
if not path.exists(links_dir): | |
sys.exit('Restore root %s does not exist.' % links_dir) | |
link_map = read_map_from_file(links_file_name) | |
print('Creating %d links from %s in %s' % (len(link_map), links_file_name, links_dir)) | |
if len(link_map) == 0: | |
return | |
if not confirm_yes_no('Continue?'): | |
sys.exit('Restoration aborted.') | |
for tgt in link_map: | |
source_path = link_map[tgt] | |
target_path = path.join(links_dir, tgt) | |
if path.exists(target_path): | |
log('Target %s exists. Link not created.' % target_path) | |
continue | |
if not path.exists(source_path): | |
log('Source %s does not exist. Skipping...' % source_path) | |
continue | |
# if path to target does not exist, create all necessary directories | |
link_name = path.dirname(target_path) | |
if not path.exists(link_name): | |
make_directory(link_name) | |
# create appropriate link | |
if path.isdir(source_path): | |
create_directory_link(source_path, target_path) | |
elif path.isfile(source_path): | |
create_file_link(source_path, target_path, hardlinks) | |
def copy_file_windows(source, target): | |
os.system('robocopy "%s" "%s" /E' % (source, target)) | |
def copy_file(source_path, target_path): | |
if os.name == 'nt': | |
copy_file_windows(source_path, target_path) | |
elif os.name == 'posix': | |
print('Copy file to be implemented for posix') | |
def copy_directory_windows(source_path, target_path): | |
os.system('mkdir "%s"' % target_path) | |
copy_file_windows(source_path, target_path) | |
def copy_directory(source_path, target_path): | |
if os.name == 'nt': | |
copy_directory_windows(source_path, target_path) | |
elif os.name == 'posix': | |
print('Copy directory to be implemented for posix') | |
def copy_links(links_dir, links_file_name): | |
if not path.exists(links_dir): | |
sys.exit('Restore root %s does not exist.' % links_dir) | |
link_map = read_map_from_file(links_file_name) | |
print('Copying %d files and directories from collection %s into %s' % (len(link_map), links_file_name, links_dir)) | |
if len(link_map) == 0: | |
return | |
if not confirm_yes_no('Continue?'): | |
sys.exit('Copy aborted.') | |
for tgt in link_map: | |
source_path = link_map[tgt] | |
target_path = path.join(links_dir, tgt) | |
if path.exists(target_path): | |
log('Target %s exists. Skipping...' % target_path) | |
continue | |
if not path.exists(source_path): | |
log('Source %s does not exist. Skipping...' % source_path) | |
continue | |
# if path to target does not exist, create all necessary directories | |
link_name = path.dirname(target_path) | |
if not path.exists(link_name): | |
make_directory(link_name) | |
# create appropriate link | |
if path.isdir(source_path): | |
copy_directory(source_path, target_path) | |
elif path.isfile(source_path): | |
copy_file(source_path, target_path) | |
def clear_file(file_to_clear): | |
f = open(file_to_clear, 'w', encoding='utf-8') | |
f.close() | |
### MAIN PROCEDURE | |
if args.empty: | |
# empty links file | |
clear_file(args.file) | |
if args.delete != None: | |
delete_by_filter(args.file, args.delete) | |
if args.unlink != None: | |
delete_by_filter(args.file, args.unlink, filter_by_target=False) | |
if args.add != None: | |
# add link to links file | |
add_link(args.add, args.target, args.file) | |
if args.list != None: | |
list_file(args.file, args.list) | |
elif args.list_source != None: | |
list_file(args.file, args.list_source, filter_by_target=False) | |
if args.restore_links != None: | |
restore_dir = path.abspath(args.restore_links) | |
print('Restoring links from: %s in %s' % (args.file, restore_dir)) | |
restore_links(restore_dir, args.file, args.hard) | |
elif args.copy != None: | |
copy_links(args.copy, args.file) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Now it's not just about links, so we're operating on collections now. They are still mapped source -> destination, but I added option of copying files listed in collection to the target paths, not only creating links to them.
Also figured out some encoding problems, extracted command line logging to separate function.
What remains: add copy support for Linux systems and logging to actual file.