Instantly share code, notes, and snippets.
Last active
August 29, 2015 14:08
-
Star
0
(0)
You must be signed in to star a gist -
Fork
0
(0)
You must be signed in to fork a gist
-
Save arnaudcordier/385a86e283194f7ef2bd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""
Manage files and the underlying tree of directories.
You can use it to manage a lot of files with a «distributed» amount of files per directory.

Usage:
    # import
    from hiddenFileSystem import HiddenFileSystem

    # Instantiate with a directory name:
    base_directory = "myDocs"
    fs = HiddenFileSystem(base_directory)

    # Write
    file_name = "myFile.txt"
    content = "Oh my lord, it works\n"
    fs.write(file_name, content)
    print "Path of", file_name, fs.get_path(file_name)

    # read
    content = fs.read(file_name)
    print "content of", file_name, content,

    # move an existing file
    newFileName = 'test.txt'
    with open(newFileName, 'w', encoding='utf-8') as newFile:
        newFile.write(content)
    print "Path of", newFileName, fs.move(newFileName)

    # iterate over the files
    for name in fs.files():
        print name

    # delete
    fs.delete(file_name)
    fs.delete(newFileName)

You can open a file, but you have to close it!

Written by ArnAud, licence MIT
"""
import hashlib | |
import os | |
import errno | |
from codecs import open | |
class HiddenFileSystem(object):
    """Store files under a tree of hashed sub-directories.

    A file named ``file_name`` lives in ``base_dir/h0/h1/.../file_name``
    where ``h0``, ``h1``, ... are the first ``depth`` hexadecimal characters
    of the MD5 digest of the file name. Files are therefore spread over at
    most ``16 ** depth`` leaf directories.
    """

    def __init__(self, base_dir, depth=3):
        """Bind the file system to an existing directory.

        base_dir -- existing directory that holds the tree
        depth    -- number of hashed directory levels below ``base_dir``

        Raises OSError when ``base_dir`` is not a directory.
        """
        if not os.path.isdir(base_dir):
            raise OSError("%s is not a valid directory" % (base_dir,))
        self.base_dir = base_dir
        self.depth = depth

    def open(self, file_name, mode='r', encoding='utf8', create_path=False):
        """Return an open file handle on ``file_name``.

        The caller owns the handle and must close it. When ``create_path``
        is true the hashed directories are created first.
        """
        directory = self._get_path(file_name, create_path=create_path)
        # ``open`` here is the module-level name, not this method.
        return open(os.path.join(directory, file_name), mode, encoding=encoding)

    def read(self, file_name, encoding='utf8'):
        """Return the decoded content of ``file_name``."""
        with self.open(file_name, encoding=encoding) as handle:
            return handle.read()

    def write(self, file_name, content, encoding='utf8'):
        """Write ``content`` to ``file_name`` and return True.

        Byte strings are decoded with ``encoding`` first; text is written
        as is. The hashed directories are created when missing.
        """
        # Decode before opening so a decoding error cannot truncate the file.
        if isinstance(content, bytes):
            content = content.decode(encoding)
        with self.open(file_name, 'w', encoding=encoding, create_path=True) as handle:
            handle.write(content)
        return True

    def delete(self, file_name):
        """Remove ``file_name`` and the hashed directories it leaves empty.

        Raises OSError (from ``os.remove``) when the file does not exist.
        """
        directory = self._get_path(file_name)
        os.remove(os.path.join(directory, file_name))
        # Climb exactly as many levels as were hashed, so the base directory
        # is never removed, whatever the spelling of ``base_dir``.
        for _ in self._hash_dirs(file_name):
            try:
                os.rmdir(directory)
            except OSError:
                # Directory still holds other entries: stop climbing.
                break
            directory = os.path.dirname(directory)

    def move(self, file_path):
        """Move an existing file into the file system; return its new path.

        Raises OSError when ``file_path`` is not an existing file.
        """
        if not os.path.isfile(file_path):
            raise OSError("%s is not a file" % (file_path,))
        file_name = os.path.basename(file_path)
        directory = self._get_path(file_name, create_path=True)
        new_path = os.path.join(directory, file_name)
        os.rename(file_path, new_path)
        return new_path

    def get_path(self, file_name):
        """Return the full path of ``file_name``, or None if it is not stored."""
        directory = self._get_path(file_name, True)
        if not directory:
            return None
        return os.path.join(directory, file_name)

    def files(self, full_path=False):
        """Iterate over the stored file names (``os.walk`` version).

        Yields bare names, or full paths when ``full_path`` is true.
        """
        for directory, _, names in os.walk(self.base_dir):
            for name in names:
                yield os.path.join(directory, name) if full_path else name

    def filesr(self, full_path=False, dirname=None):
        """Iterate over the stored file names (recursive version).

        ``dirname`` is the directory to list; it defaults to ``base_dir``.
        """
        if dirname is None:
            dirname = self.base_dir
        for name in os.listdir(dirname):
            pathname = os.path.join(dirname, name)
            if os.path.isdir(pathname):
                # Recurse with filesr itself: files() has no dirname argument.
                for found in self.filesr(full_path, pathname):
                    yield found
            else:
                yield pathname if full_path else name

    def filesi(self, full_path=False):
        """Iterate over the stored file names (iterative version)."""
        pending = [self.base_dir]
        while pending:
            current = pending.pop()
            if os.path.isdir(current):
                pending.extend(os.path.join(current, entry)
                               for entry in os.listdir(current))
            elif full_path:
                yield current
            else:
                yield os.path.basename(current)

    def _hash_dirs(self, file_name):
        """Return the hashed directory names of ``file_name`` as a string.

        Each character of the result is one directory level.
        """
        # md5 needs bytes; encoding explicitly also accepts non-ASCII text.
        if not isinstance(file_name, bytes):
            file_name = file_name.encode('utf-8')
        return hashlib.md5(file_name).hexdigest()[0:self.depth]

    def _get_path(self, file_name, only_existing=False, create_path=False):
        """Return the directory that holds (or would hold) ``file_name``.

        only_existing -- return None when the file is not stored there
        create_path   -- create the directory tree when it is missing
        """
        path = self.base_dir
        for dir_name in self._hash_dirs(file_name):
            path = os.path.join(path, dir_name)
        if only_existing and not os.path.isfile(os.path.join(path, file_name)):
            return None
        if create_path:
            self._create_path(path)
        return path

    def _create_path(self, path):
        """Create ``path`` and its parents; an existing directory is fine."""
        try:
            os.makedirs(path)
        except OSError as exc:
            # Only "already exists as a directory" is tolerated.
            if exc.errno != errno.EEXIST or not os.path.isdir(path):
                raise

    @staticmethod
    def find_depth(base_dir):
        """Return the depth of the tree found under ``base_dir``.

        The tree is followed along the first entry of each directory.
        Raises OSError when no depth of at least 1 can be found, which
        includes an empty ``base_dir``.
        """
        current = base_dir
        depth = -1
        while os.path.isdir(current):
            entries = os.listdir(current)
            if not entries:
                # Empty root: nothing to measure, reported below as OSError.
                break
            current = os.path.join(current, entries[0])
            depth += 1
            if os.path.isdir(current) and not os.listdir(current):
                depth += 1
                break
        if depth < 1:
            raise OSError("%s is not a valid directory" % (base_dir,))
        return depth

    @staticmethod
    def migrate(base_dir, depth):
        """Move every file of ``base_dir`` to a new tree of another depth.

        The new tree is created next to the old one as ``<base_dir>-<depth>``.
        Returns the new directory, or None when ``base_dir`` already has the
        requested depth.
        """
        old_depth = HiddenFileSystem.find_depth(base_dir)
        if old_depth == depth:
            return None
        base_dir = os.path.dirname(base_dir + os.sep)  # strip trailing separator
        new_rep = "%s-%s" % (base_dir, depth)
        old_fs = HiddenFileSystem(base_dir, old_depth)
        old_fs._create_path(new_rep)
        new_fs = HiddenFileSystem(new_rep, depth)
        for name in old_fs.files(True):
            new_fs.move(name)
        return new_rep

    @staticmethod
    def statistic(total_files=19, not_enough=100, too_mutch=20000):
        """Print, for growing numbers of files, the depths worth using.

        total_files -- numbers of files go from 10**4 up to 10**(total_files-1)
        not_enough  -- depths giving fewer files per directory are not shown
        too_mutch   -- depths giving more files per directory are not shown
        """
        # The column widths depend only on the arguments: build them once.
        header = "Number of files :{: %d,}" % (total_files + 1 + (total_files // 3))
        row = "\tDepth:{: 3}, number of dir:{: %d}, files per dir: {: %d}" % (
            total_files - 2, len(str(too_mutch)) + 1)
        for exponent in range(4, total_files):
            nfiles = 10 ** exponent
            print(header.format(nfiles))
            depth = 0
            while True:
                depth += 1
                ndir = 16 ** depth
                file_per_dir = nfiles // ndir
                if file_per_dir > too_mutch:
                    continue
                if file_per_dir < not_enough:
                    break
                print(row.format(depth, ndir, file_per_dir))

    @staticmethod
    def monitor(base_dir):
        """Print the number of directories and files found under ``base_dir``.

        Also prints how many directories hold each number of files.
        """
        print('********** Hidden File System monitoring **********')
        total_file = 0
        total_dir = 0
        nb_dir_of_nFiles = {}
        for _, _, files in os.walk(base_dir):
            n_files = len(files)
            nb_dir_of_nFiles[n_files] = nb_dir_of_nFiles.get(n_files, 0) + 1
            total_file += n_files
            total_dir += 1
        print('%s directories, %s files, depth %d in %s' % (
            total_dir, total_file, HiddenFileSystem.find_depth(base_dir), base_dir))
        print('\n'.join('%s directories of %s files' % (nb_dir_of_nFiles[n_files], n_files)
                        for n_files in sorted(nb_dir_of_nFiles)))
        print('********** ***************************** **********')
if __name__ == '__main__':
    # Command line front end: choose a depth, monitor or migrate a tree,
    # or show where a file is (or would be) stored.
    import sys
    import getopt

    try:
        optlist, args = getopt.getopt(sys.argv[1:], 'mMs')
    except getopt.GetoptError as exc:
        # Unknown option: report it, then fall through to the usage text.
        print("Error: %s" % (exc,))
        optlist, args = [], []
    opt = [o[0] for o in optlist]
    show_usage = False

    # make sure argument is a directory
    is_dir = False
    if len(args) > 0:
        base_dir = args.pop(0)
        is_dir = os.path.isdir(base_dir)

    # show help to choose depth
    if '-s' in opt:
        print("What Depth should you use:")
        HiddenFileSystem.statistic()

    # monitor a given directory
    elif '-m' in opt and is_dir:
        HiddenFileSystem.monitor(base_dir)

    # migrate to another depth
    elif '-M' in opt and is_dir and len(args) > 0:
        depth = args[0]
        if not depth.isdigit():
            print("Migration: depth should be an integer")
            show_usage = True
        else:
            new_dir = HiddenFileSystem.migrate(base_dir, int(depth))
            # migrate() returns None when the tree already has that depth:
            # monitor the unchanged tree instead of crashing on None.
            HiddenFileSystem.monitor(new_dir if new_dir is not None else base_dir)

    # information about a file
    elif is_dir and len(args) > 0:
        file_name = args[0]
        fs = HiddenFileSystem(base_dir, HiddenFileSystem.find_depth(base_dir))
        file_path = fs.get_path(file_name)
        if not file_path:
            print("Your file does not exist.")
            path = fs._get_path(file_name)
            file_path = os.path.join(path, file_name)
            print("But it would be %s" % (file_path,))
        else:
            print(file_path)

    # help
    else:
        show_usage = True

    if show_usage:
        print("Usages:")
        print(" HiddenFileSystem -s => show help to choose depth")
        print(" HiddenFileSystem -m base_dir => monitor an existing HFS")
        print(" HiddenFileSystem -M base_dir newdepth => migrate base_dir HFS to another depth")
        print(" HiddenFileSystem base_dir file_name => show information about file_name in base_dir HFS")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment