Created
May 11, 2012 11:02
-
-
Save stringfellow/2658992 to your computer and use it in GitHub Desktop.
TMS to OSM directory conversion script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import shutil | |
import argparse | |
import commands | |
from datetime import datetime | |
try: | |
import progressbar as pb | |
except ImportError: | |
pb = False | |
class AlreadyCopiedFrom(Exception):
    """Presumably signals a source file that was already copied.

    NOTE(review): nothing in the visible code raises or catches this type.
    """
class AlreadyCopiedTo(Exception):
    """Signals that a destination path was already used by an earlier file."""
class BadPath(Exception):
    """Signals that the input or output directory is unusable."""
class FilesystemDiver(object):
    """Crawls the filesystem below ``path`` and reports what it finds."""

    def __init__(self, path):
        # Root directory that every query on this object starts from.
        self.path = path

    def get_folders_in_filepath(self):
        """Return the names of the directories directly inside ``self.path``.

        Only entries for which ``os.path.isdir`` is true are returned, so
        dangling symlinks and special files are not reported as folders.
        Names are returned in the order ``os.listdir`` produces them.
        """
        return [name for name in os.listdir(self.path)
                if os.path.isdir(os.path.join(self.path, name))]

    def yield_filepath_info(self):
        """Yield ``(root, dirs, files)`` for every directory under the path.

        The tuples are exactly those produced by ``os.walk(self.path)``.
        """
        for root, dirs, files in os.walk(self.path):
            yield (root, dirs, files)
class TMS2OSM(object):
    """Convert a TMS tile directory tree to the OSM ``z/x/y.png`` layout.

    Each input tile is expected at
    ``<in_dir>/<z>/<x1>/<x2>/<x3>/<y1>/<y2>/<y3>.png`` and is moved to
    ``<out_dir>/<z>/<x>/<y>.png``, where ``x`` and ``y`` are the integers
    formed by concatenating their three path components.

    ``args`` is the parsed command line; the attributes read from it are
    ``in_dir``, ``out_dir``, ``dry_run``, ``verbose``, ``progress``,
    ``force`` and ``perform_checks``.
    """

    def __init__(self, args):
        self.args = args
        # Number of files handed to rewrite() so far.
        self.count = 0
        self.in_path = args.in_dir
        # Without --out-dir the output lands in a sibling of the input dir.
        self.out_path = args.out_dir or os.path.join(
            self.in_path, '../', 'OSM_ZXY')
        # Bookkeeping, only filled in when --perform-checks is given:
        # destinations in first-seen order, destination -> first source,
        # and every destination that was produced more than once.
        self.copied_to = []
        self.copied_to_from = {}
        self.duplicates = []

    def reformat(self, path_list):
        """Re-join a split TMS path into OSM ``[z, x, y]`` integers.

        E.g., given: [11, 000, 000, 010, 000, 026, 013]
        Result is    [11, 10, 26013]

        The first component is z, the next three form x and the last three
        form y. All zeros are significant until the final triplet has been
        assembled; only then are the leading ones stripped (by ``int``).

        Raises IndexError when fewer than seven components are given and
        ValueError when a component is not numeric.
        """
        z = path_list[0]
        x = "%s%s%s" % (path_list[1], path_list[2], path_list[3])
        y = "%s%s%s" % (path_list[4], path_list[5], path_list[6])
        return [int(z), int(x), int(y)]

    def rewrite(self, filename, osm_parts):
        """Move ``filename`` to the OSM path described by ``osm_parts``.

        ``osm_parts`` is the ``[z, x, y]`` triple from ``reformat``. In
        dry-run mode nothing is created or moved. With --perform-checks a
        destination that was already produced is printed and recorded in
        ``self.duplicates``; the later file is still moved to that same
        destination afterwards.
        """
        new_dir = os.path.join(
            self.out_path, str(osm_parts[0]), str(osm_parts[1]))
        new_filename = os.path.join(new_dir, str(osm_parts[2]) + '.png')

        if not self.args.dry_run and not os.path.exists(new_dir):
            os.makedirs(new_dir)

        if self.args.perform_checks:
            # copied_to_from holds exactly the entries of copied_to as its
            # keys, so the dict gives the same answer as scanning the list
            # but in constant time per file.
            if new_filename in self.copied_to_from:
                print("%s -> %s" % (filename, new_filename))
                self.duplicates.append(new_filename)
            else:
                self.copied_to.append(new_filename)
                self.copied_to_from[new_filename] = filename

        self.count += 1
        if self.args.progress:
            if pb:
                self.progress.update(self.count)
            elif self.count % 50000 == 0:
                delta = datetime.now() - self.start
                # Force float division (Python 2 would truncate to 0) and
                # survive an empty tree.
                if self.maxval:
                    fraction = float(self.count) / self.maxval
                else:
                    fraction = 0.0
                print("[%s] Done %s (%s)" % (delta, self.count, fraction))

        if self.args.dry_run:
            if self.args.verbose:
                print("Pretending to move %s to %s" % (
                    filename, new_filename))
            return

        if self.args.verbose:
            print("Moving %s to %s" % (filename, new_filename))
        shutil.move(filename, new_filename)

    def check_paths(self):
        """Validate the input and output directories before any work.

        Creates the output directory when it is missing (unless this is a
        dry run). Returns True on success.

        Raises BadPath when the input path does not exist, when the output
        path exists but is not a directory, or when the output directory
        is non-empty and --force was not given.
        """
        if not os.path.exists(self.in_path):
            raise BadPath("The in-path (%s) wasn't found." % self.in_path)
        if not os.path.exists(self.out_path):
            print("Making directory %s" % self.out_path)
            if not self.args.dry_run:
                os.makedirs(self.out_path)
        elif not os.path.isdir(self.out_path):
            raise BadPath(
                "Out-path (%s) is not a directory!" % self.out_path)
        elif os.listdir(self.out_path) and not self.args.force:
            raise BadPath("Out-path (%s) is non-empty!" % self.out_path)
        return True

    def get_max_count(self):
        """Return the number of files found below the input directory.

        Counts with ``os.walk`` rather than a shell pipeline, so paths with
        spaces or shell metacharacters are safe and the total matches the
        files that ``run`` will iterate over.
        """
        return sum(len(files) for _, _, files in os.walk(self.in_path))

    def _tms_parts(self, root, filename):
        """Return the TMS path components of a tile, relative to in_path.

        ``root`` is the directory holding the tile and ``filename`` its
        file name; a trailing ``.png`` is dropped from the last component.
        """
        # relpath is immune to a trailing slash on in_path and to in_path
        # recurring deeper in the tree, unlike a textual prefix replace.
        rel_dir = os.path.relpath(root, self.in_path)
        parts = [] if rel_dir == os.curdir else rel_dir.split(os.sep)
        if filename.endswith('.png'):
            filename = filename[:-len('.png')]
        parts.append(filename)
        return parts

    def run(self):
        """Walk the input tree and move every file to its OSM location.

        Files are discovered through ``FilesystemDiver``. When
        --perform-checks is given, a duplicate report is printed at the
        end.
        """
        self.start = datetime.now()
        self.check_paths()
        self.maxval = self.get_max_count()

        show_bar = bool(self.args.progress and pb)
        if show_bar:
            self.progress = pb.ProgressBar(maxval=self.maxval).start()

        filesystem = FilesystemDiver(self.in_path)
        for root, _dirs, files in filesystem.yield_filepath_info():
            for fn in files:
                file_path = os.path.join(root, fn)
                osm_parts = self.reformat(self._tms_parts(root, fn))
                self.rewrite(file_path, osm_parts)

        if show_bar:
            self.progress.finish()

        if self.args.perform_checks:
            for d in self.duplicates:
                print("Duplicate destination: %s"
                      "\n\t\tThe first file was: %s" % (
                          d, self.copied_to_from[d]))
            print("%s duplicates" % len(self.duplicates))
if __name__ == "__main__": | |
parser = argparse.ArgumentParser( | |
description='Convert TMS directories/files to OSM') | |
parser.add_argument("in_dir") | |
parser.add_argument("--out-dir") | |
parser.add_argument("--dry-run", action="store_true") | |
parser.add_argument("--verbose", action="store_true") | |
parser.add_argument("--progress", action="store_true") | |
parser.add_argument( | |
"--force", action="store_true", help="Don't care if out-dir is empty") | |
parser.add_argument( | |
"--perform-checks", | |
action="store_true", | |
help=("Check every file to make sure we haven't already " | |
"moved it (causes slowdown).")) | |
args = parser.parse_args() | |
converter = TMS2OSM(args) | |
converter.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment