Created
May 11, 2012 00:15
-
-
Save emacsen/2656735 to your computer and use it in GitHub Desktop.
OSM Tiger expansion code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from xml.sax.handler import ContentHandler | |
import os | |
import codecs | |
from xml.sax.saxutils import escape | |
class OSMHandler(ContentHandler):
    """Base SAX handler that filters OSM XML and writes changed objects out.

    The handler accumulates one node/way/relation at a time (its XML
    attributes, tags, node references and relation members).  When the
    object's closing tag is seen, ``selectElement`` decides whether the
    object is interesting and ``transformElement`` may modify it; objects
    for which ``self.fixed`` was set are written to numbered ``.osm`` files
    inside the output directory, at most ``max_objects_per_file`` per file.

    Subclasses are expected to override ``selectElement`` and
    ``transformElement``.
    """

    # Values written into the <osm> and <delete> wrapper elements.
    api_version = '0.6'
    generator = 'pyxbot'

    def __init__(self, file_prefix):
        """Set up the handler; ``file_prefix`` is the output directory."""
        ContentHandler.__init__(self)
        self.path = file_prefix
        self.file_prefix = file_prefix
        self.object_counter = 0
        self.clear()
        self.max_objects_per_file = 1000
        self.file_counter = 0
        self.out = None
        # Name of the most recently opened output file (None until _open).
        self.fname = None

    @staticmethod
    def _quote(value):
        """Return ``value`` as text that is safe inside a "..." attribute."""
        # escape() handles &, < and >; the extra entity covers the quote
        # character that delimits the attribute value.
        return escape(u'%s' % (value,), {u'"': u'&quot;'})

    def _format_attrs(self, mapping):
        """Render a mapping as space separated ``key="value"`` pairs."""
        return u' '.join([u'%s="%s"' % (key, self._quote(val))
                          for key, val in mapping.items()])

    def _format_tags(self):
        """Render the current tags as ``<tag/>`` child elements."""
        return u''.join([u' <tag k="%s" v="%s" />\n' %
                         (self._quote(key), self._quote(val))
                         for key, val in self.tags.items()])

    def _open(self):
        """Open the next numbered output file and write the XML header."""
        if not os.path.isdir(self.path):
            os.mkdir(self.path)
        self.fname = self.path + '/' + "%04d.osm" % self.file_counter
        self.out = codecs.open(self.fname, 'w', "utf-8")
        self.out.write('<?xml version=\'1.0\' encoding=\'UTF-8\'?>\n')
        self.out.write('<osm version="%s" generator="%s">\n' %
                       (self.api_version, self.generator))

    def _close(self):
        """Finish the current output file and advance the file counter.

        Does nothing when no output file is open.
        """
        if self.out is None:
            return
        self.out.write('</osm>\n')
        self.out.flush()
        self.out.close()
        self.out = None
        self.object_counter = 0
        self.file_counter = self.file_counter + 1

    def bump_version(self):
        """Increment the current object's ``version`` attribute by one."""
        self.attrs['version'] = str(int(self.attrs['version']) + 1)

    def remove_user_changeset(self):
        """Drop the authorship attributes from the current object."""
        for key in ('changeset', 'uid', 'user', 'timestamp'):
            self.attrs.pop(key, None)

    # The output methods don't do any kind of data validation
    def _str_node(self):
        "Return a node as a string"
        attrs = self._format_attrs(self.attrs)
        if not self.tags:
            return u'<node %s />\n' % attrs
        return (u'<node %s >\n' % attrs +
                self._format_tags() +
                u'</node>\n')

    def _str_way(self):
        "Return a way as a string"
        refs = u''.join([u' <nd ref="%s" />\n' % self._quote(nodeid)
                         for nodeid in self.nodes])
        return (u'<way %s >\n' % self._format_attrs(self.attrs) +
                refs +
                self._format_tags() +
                u'</way>\n')

    def _str_relation(self):
        "Return a relation as a string"
        attrs = self._format_attrs(self.attrs)
        if not (self.members or self.tags):
            return u'<relation %s />\n' % attrs
        members = u''.join([u' <member %s />\n' % self._format_attrs(member)
                            for member in self.members])
        return (u'<relation %s >\n' % attrs +
                members +
                self._format_tags() +
                u'</relation>\n')

    def emit(self):
        """Write the current element to the open output file.

        Raises ValueError when the current element type is not one of
        node, way or relation.
        """
        renderers = {'node': self._str_node,
                     'way': self._str_way,
                     'relation': self._str_relation}
        try:
            render = renderers[self.type]
        except KeyError:
            raise ValueError('Cannot emit element of type %r' % (self.type,))
        self.out.write(render())

    def clear(self):
        "Initialize the state machine"
        self.type = None
        self.tags = {}
        self.nodes = []
        self.members = []
        self.attrs = {}
        self.fixed = None

    def startElement(self, tag, attrs):
        "This function is called at the start of the element (as per SAX)"
        if tag in ('node', 'way', 'relation'):
            self.type = tag
            self.attrs = dict(attrs)
        elif tag == 'tag':
            self.tags[attrs.get('k')] = attrs.get('v')
        elif tag == 'member':
            self.members.append(dict(attrs))
        elif tag == 'nd':
            self.nodes.append(attrs.get('ref'))

    def selectElement(self):
        """Select whether or not we care about the OSM object (True or
        False). Override this function in your handler"""
        return False

    def transformElement(self):
        """Transform the element. Override this function in your
        handler"""
        pass

    def deleteElement(self):
        """Write the current element wrapped in a <delete> block. Please
        use with caution!"""
        self.out.write('<delete version="%s" generator="%s">\n' %
                       (self.api_version, self.generator))
        self.emit()
        self.out.write('</delete>\n')

    def endElement(self, tag):
        """As per the SAX handler, this method is where any work is
        done. You may want to override it, but probably not"""
        # If there's no open output, we need to open it
        if not self.out:
            self._open()
        if tag == 'way':
            self.nodes = tuple(self.nodes)
        elif tag == 'relation':
            self.members = tuple(self.members)
        if tag in ('node', 'way', 'relation'):
            if self.selectElement():
                self.transformElement()
                if self.fixed:
                    self.emit()
                    self.object_counter = self.object_counter + 1
                    # Roll over to a new file once this one is full.
                    if self.object_counter >= self.max_objects_per_file:
                        self._close()
            # Reset the per-object state only once the object is complete;
            # child elements (tag/nd/member) must keep accumulating.
            self.clear()

    def endDocument(self):
        """Close the output file that is still open, if any."""
        self._close()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""This is the base library that can be used to run various OSM bots
which are implemented as plugins"""
import sys | |
from xml.sax.handler import ContentHandler | |
from xml.sax import make_parser | |
from xml.sax.saxutils import quoteattr | |
import argparse | |
from pyxbot import OSMHandler | |
from os import remove | |
import codecs | |
def add_or_incr(dct, item):
    """Count one more occurrence of ``item`` in the dictionary ``dct``.

    ``dct`` maps items to occurrence counts and is modified in place; an
    item that is not present yet starts at 1.
    """
    # dict.get works on Python 2 and 3 alike, unlike dict.has_key.
    dct[item] = dct.get(item, 0) + 1
# Maps the abbreviated road type found in tiger:name_type to the full word
# that should appear in the name tag.
# Thank you https://www.usps.com/send/official-abbreviations.htm
road_types = {
    'Aly': 'Alley',
    'Anx': 'Annex',  ## From USPS
    'Arc': 'Arcade',  ## From USPS
    'Ave': 'Avenue',
    'Bch': 'Beach',  ## From USPS
    'Blf': 'Bluff',  ## From USPS
    'Blfs': 'Bluffs',  ## From USPS
    'Blvd': 'Boulevard',
    'Bnd': 'Bend',  ## From USPS
    # NOTE(review): USPS lists BR as Branch and BRG as Bridge; confirm that
    # the TIGER data being processed follows the same convention.
    'Br': 'Branch',
    'Brg': 'Bridge',
    'Byp': 'Bypass',
    'Byu': 'Bayou',  ## From USPS
    'Cir': 'Circle',
    'Cres': 'Crescent',
    'Cswy': 'Causeway',
    'Ct': 'Court',
    'Ctr': 'Center',
    'Cv': 'Cove',
    'Dr': 'Drive',
    'Expy': 'Expressway',
    'Expwy': 'Expressway',
    'FMRd': 'Farm to Market Road',
    'Fwy': 'Freeway',
    'Grd': 'Grade',
    'Hbr': 'Harbor',
    'Holw': 'Hollow',
    'Hwy': 'Highway',
    'Ln': 'Lane',
    'Lndg': 'Landing',
    'Mal': 'Mall',
    'Mtwy': 'Motorway',
    'Ovps': 'Overpass',
    'Pky': 'Parkway',
    'Pkwy': 'Parkway',
    'Pl': 'Place',
    'Plz': 'Plaza',
    'Rd': 'Road',
    'Rdg': 'Ridge',
    'RMRd': 'Ranch to Market Road',
    'Rte': 'Route',
    'Skwy': 'Skyway',
    'Sq': 'Square',
    'St': 'Street',
    'Ter': 'Terrace',
    'Tfwy': 'Trafficway',
    'Thfr': 'Thoroughfare',
    'Thwy': 'Thruway',
    'Tpke': 'Turnpike',
    'Trce': 'Trace',
    'Trl': 'Trail',
    'Tunl': 'Tunnel',
    'Unp': 'Underpass',
    'Wkwy': 'Walkway',
    'Xing': 'Crossing',
    ### NOT EXPANDED
    # These map to themselves so they count as recognized road types
    # without changing the name.
    'Way': 'Way',
    'Walk': 'Walk',
    'Loop': 'Loop',
    'Oval': 'Oval',
    'Ramp': 'Ramp',
    'Row': 'Row',
    'Run': 'Run',
    'Pass': 'Pass',
    'Spur': 'Spur',
    'Path': 'Path',
    'Pike': 'Pike',
    'Rue': 'Rue',
    'Mall': 'Mall',
}
# Compass abbreviations used by the TIGER direction prefix/suffix tags,
# mapped to the spelled-out word used in the expanded name.
directions = dict([
    ('N', 'North'),
    ('S', 'South'),
    ('E', 'East'),
    ('W', 'West'),
    ('NE', 'Northeast'),
    ('NW', 'Northwest'),
    ('SE', 'Southeast'),
    ('SW', 'Southwest'),
])
class TigerRoadExpansionHandler(OSMHandler):
    """Expand abbreviated road types and directions in TIGER road names.

    Ways tagged with highway=*, tiger:name_base and name are examined.  An
    abbreviation named by tiger:name_type, tiger:name_direction_prefix or
    tiger:name_direction_suffix is replaced in the name by its full word,
    but only when it occurs exactly once as a whitespace separated word of
    the name.  Ways that cannot be handled safely are recorded in
    ``checkme_ways`` for manual review.
    """

    def __init__(self, file_prefix):
        OSMHandler.__init__(self, file_prefix)
        self.roads = 0          # ways that passed the basic tag filter
        self.num_fixed = 0      # ways whose name was actually changed
        self.checkme_ways = []  # dicts with 'name', 'id' and 'reason'
        self.unrecognized_tags = {}
        self.unrecognized_direction_tags = {}
        self.ambigious_expansions = {}

    def _flag_for_review(self, reason):
        """Record the current way in ``checkme_ways`` with ``reason``."""
        self.checkme_ways.append({'name': self.tags.get('name'),
                                  'id': self.attrs['id'],
                                  'reason': reason})

    def _usable_road_type(self, name):
        """Return the tiger:name_type abbreviation if it can be expanded.

        Returns None when the tag is missing, unknown, occurs more than
        once in the name, or does not occur in the name at all.
        """
        road_type = self.tags.get('tiger:name_type')
        if not road_type:
            return None
        # If we have a name_type that we haven't seen, store it.
        if road_type not in road_types:
            add_or_incr(self.unrecognized_tags, road_type)
            self._flag_for_review('Unknown road_type (%s)' % road_type)
            return None
        occurrences = self.namel.count(road_type)
        # If the name is ambiguous, store it.
        if occurrences > 1:
            add_or_incr(self.ambigious_expansions, name)
            self._flag_for_review('Ambigious expansion')
            return None
        if occurrences < 1:
            # Nothing to expand.  Only flag the way when the expanded word
            # is missing from the name as well.
            if road_types[road_type] not in self.namel:
                self._flag_for_review(
                    'Road type (%s) not in name' % road_type)
            return None
        return road_type

    def _usable_direction(self, tag_key, name):
        """Return the direction abbreviation in ``tag_key`` if expandable.

        Returns None when the tag is missing, is not a known direction, or
        does not occur exactly once in the name.
        """
        direction = self.tags.get(tag_key)
        if not direction:
            return None
        if direction not in directions:
            add_or_incr(self.unrecognized_direction_tags, direction)
            return None
        occurrences = self.namel.count(direction)
        if occurrences > 1:
            add_or_incr(self.ambigious_expansions, name)
            return None
        if occurrences < 1:
            return None
        return direction

    def selectElement(self):
        """Return True when the current way has something to expand.

        As a side effect this stores the split name in ``self.namel`` and
        the usable abbreviations in ``self.road_type``,
        ``self.dir_tag_prefix`` and ``self.dir_tag_suffix`` for
        ``transformElement``.
        """
        tags = self.tags
        # We only care about ways with highway=* tags that have
        # tiger:name_base
        if not (self.type == 'way' and 'highway' in tags and
                'tiger:name_base' in tags):
            return False
        # Of those, we only care about those with a name
        if 'name' not in tags:
            return False
        name = tags['name']
        self.roads += 1
        self.namel = name.split()

        self.road_type = self._usable_road_type(name)
        # Same with the direction tags
        self.dir_tag_prefix = self._usable_direction(
            'tiger:name_direction_prefix', name)
        self.dir_tag_suffix = self._usable_direction(
            'tiger:name_direction_suffix', name)
        return bool(self.road_type or self.dir_tag_suffix or
                    self.dir_tag_prefix)

    def transformElement(self):
        """Apply the expansions chosen by ``selectElement`` to the name.

        When the name changes, the way's version is bumped, its authorship
        attributes are removed and ``self.fixed`` is set so that the way
        gets written out.  A direction that cannot be applied flags the way
        for review and leaves it untouched.
        """
        name = self.tags['name']
        namel = self.namel

        if self.road_type:
            namel[namel.index(self.road_type)] = road_types[self.road_type]

        for position, abbrev in (('prefix', self.dir_tag_prefix),
                                 ('suffix', self.dir_tag_suffix)):
            if not abbrev:
                continue
            if abbrev not in directions:
                self._flag_for_review(
                    'Direction %s (%s) not in directions list' %
                    (position, abbrev))
                return
            # An earlier expansion may already have consumed the only
            # occurrence of this abbreviation.
            if abbrev not in namel:
                self._flag_for_review(
                    'Direction %s (%s) not in name' % (position, abbrev))
                return
            namel[namel.index(abbrev)] = directions[abbrev]

        newname = ' '.join(namel)
        if newname != name:
            self.tags['name'] = newname
            self.bump_version()
            self.remove_user_changeset()
            self.fixed = True
            self.num_fixed += 1

    def endDocument(self):
        """Close the output and delete it when no way was changed."""
        self._close()
        if self.num_fixed == 0:
            remove(self.fname)
def _csv_field(value):
    """Return ``value`` as text, quoted when it would break a CSV row."""
    text = u'%s' % (value,)
    if any(char in text for char in u',"\r\n'):
        text = u'"%s"' % text.replace(u'"', u'""')
    return text


def main():
    """Run the TIGER name expansion over an OSM XML file.

    Reads the file given with --input (``-`` for standard input), writes
    the changed ways below --outdir and, when any way needs manual review,
    lists those ways in the --checkways CSV file.

    Returns 0 so the result can be passed to ``sys.exit``.
    """
    argparser = argparse.ArgumentParser(description="Tiger expansion bot")
    argparser.add_argument('--input', dest='infname', required=True,
                           help='The input filename')
    argparser.add_argument('--outdir', dest='outdirname',
                           default='processed', help='The output directory')
    argparser.add_argument('--checkways', dest='checkways_fname',
                           default='ways.csv',
                           help="Unfixable way csv file")
    args = argparser.parse_args()

    read_stdin = args.infname == '-'
    if read_stdin:
        # Hand the parser raw bytes where the platform exposes them, so
        # the XML declaration decides the encoding.
        source = getattr(sys.stdin, 'buffer', sys.stdin)
        args.infname = 'expansion'
    else:
        source = open(args.infname, 'rb')

    if not args.outdirname:
        args.outdirname = args.infname
    dirname = args.outdirname

    parser = make_parser()
    handler = TigerRoadExpansionHandler(dirname)
    parser.setContentHandler(handler)
    try:
        parser.parse(source)
    finally:
        if not read_stdin:
            source.close()

    # Summary counters remain available on the handler: roads, num_fixed,
    # unrecognized_tags and ambigious_expansions.
    if handler.checkme_ways:
        with codecs.open(args.checkways_fname, 'w', 'utf-8') as fd:
            fd.write('ID,Name,Reason\n')
            for way in handler.checkme_ways:
                fd.write(u"%s,%s,%s\n" % (_csv_field(way['id']),
                                          _csv_field(way['name']),
                                          _csv_field(way['reason'])))
    return 0


if __name__ == '__main__':
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment