Created
November 3, 2009 16:17
-
-
Save kerspoon/225171 to your computer and use it in GitHub Desktop.
Plaintext table parser
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/local/bin/python | |
from pyparsing import * | |
import StringIO | |
from decimal import Decimal | |
import string | |
import logging | |
logger = logging.getLogger(__name__) | |
# Sample input for parse_stream: a single "[people]" section with a heading
# row (#h), a column-type row (#t), a defaults row (#d) and five data rows.
# The last data row leaves its "size" cell empty.
testme = """
# comment
# ----------------------------------------
[people]
#h id , name , size , active , type
#t integer, word , decimal, boolean, word
#d 0 , bob , 0.00 , f , NA
# ----------------------------------------
1 , frank , 2.63 , t , none
2 , dave , 1 , FALSE , tree
3 , jon , 4 , 1 , list
4 , mike , 1.85 , 0 , tree
5 , tim , , trUE , goat
# ----------------------------------------
"""
# A comma-separated file format with separate sections.
# Comments start with the '#' character.
# Sections have named headers as well as data types
# and default values.
class ToBoolean(TokenConverter):
    """Converter that turns a matched token into a Python bool."""

    # Accepted spellings; the token is lower-cased before comparison.
    _TRUE_WORDS = ("t", "true", "1")
    _FALSE_WORDS = ("f", "false", "0")

    def postParse(self, instring, loc, tokenlist):
        """Convert the first token of *tokenlist* to a boolean.

        Returns:
            True for "t", "true" or "1"; False for "f", "false" or "0".
            Letter case is ignored.

        Raises:
            ValueError: if the token is none of the accepted spellings.
        """
        # str.lower() replaces string.lower(), which no longer exists in
        # Python 3.
        tok = tokenlist[0].lower()
        if tok in self._TRUE_WORDS:
            return True
        if tok in self._FALSE_WORDS:
            return False
        # ValueError is still an Exception, so existing handlers keep working.
        raise ValueError("not a boolean value: %r" % (tokenlist[0],))
class ToInteger(TokenConverter):
    """Converter that turns a matched token into an int."""

    def postParse(self, instring, loc, tokenlist):
        """Return the first token of *tokenlist* passed through int()."""
        first_token = tokenlist[0]
        return int(first_token)
class ToDecimal(TokenConverter):
    """Converter that turns a matched token into a decimal.Decimal."""

    def postParse(self, instring, loc, tokenlist):
        """Return the first token of *tokenlist* passed through Decimal()."""
        first_token = tokenlist[0]
        return Decimal(first_token)
# ---- grammar elements shared by the parser and by JBSection.convert ----

decimal_sep = "."
sign = oneOf("+ -")
# Punctuation allowed in a bare word after its first character.
symbols = "_-."

# Boolean spellings: the letter forms match in any case, the digits literally.
bool_true = Or([CaselessLiteral(text) for text in ("true", "t")] + [Literal("1")])
bool_false = Or([CaselessLiteral(text) for text in ("false", "f")] + [Literal("0")])
boolean = ToBoolean(Or([bool_true, bool_false]))

# Optionally signed run of digits, joined into one token before conversion.
integer = ToInteger(Combine(Optional(sign) + Word(nums)))

# Optionally signed digits with an optional fraction and optional exponent.
_fraction = Optional(decimal_sep + Word(nums))
_exponent = Optional(oneOf("E e") + Optional(sign) + Word(nums))
decimal = ToDecimal(Combine(Optional(sign) + Word(nums) + _fraction + _exponent))

# A word starts with an alphanumeric and may continue with the symbols above.
word = Word(alphanums, alphanums + symbols)

# Single- or double-quoted string.
qstring = sglQuotedString | dblQuotedString
def parse_stream(stream):
    """Parse a sectioned plaintext table from *stream*.

    Args:
        stream: an open file-like object; it is handed to pyparsing's
            parseFile.

    Returns:
        The JBFile that the parse actions filled in while parsing.

    Example:
        >>> thefile = parse_stream(open("myfile.jb"))
        >>> thefile.getData("phonebook", 0, "area-code")
    """
    logger.debug("Parsing stream: %s", stream)

    newfile = JBFile()

    # A cell is either a bare word or a quoted string.  Or() takes its
    # alternatives as ONE list; a second positional argument is not treated
    # as another alternative, which previously left quoted strings rejected.
    permittedvalue = Or([Word(alphanums + symbols), qstring])

    comment = Group(Literal('#') + restOfLine).suppress()
    commentlines = ZeroOrMore(comment)

    datatypenames = oneOf("decimal word integer boolean qstring")

    # "[name]" opens a new section.
    titleline = Literal("[").suppress() + word + Literal("]").suppress()
    titleline.setParseAction(newfile.nextSection)

    # "#h" column headings.
    headingsline = Literal("#h").suppress() + delimitedList(word)
    headingsline.setParseAction(newfile.addHeading)

    # "#t" column types; an empty cell means "word".
    typeline = Literal("#t").suppress() + delimitedList(
        Optional(datatypenames, default="word"))
    typeline.setParseAction(newfile.addTypes)

    # "#d" column defaults; an empty cell yields None.
    defaultsline = Literal("#d").suppress() + delimitedList(
        Optional(permittedvalue, default=None))
    defaultsline.setParseAction(newfile.addDefaults)

    # A data row; an empty cell yields None.
    # NOTE(review): csvbody can match without consuming any input -- confirm
    # that OneOrMore(commentlines + csvbody) terminates at end of input.
    csvbody = delimitedList(Optional(permittedvalue, default=None))
    csvbody.setParseAction(newfile.addLine)

    # Types may only follow headings, and defaults may only follow types.
    infolines = Optional(headingsline + Optional(typeline + Optional(defaultsline)))
    csvlines = OneOrMore(commentlines + csvbody)

    parser = commentlines + OneOrMore(titleline + infolines + csvlines) + commentlines
    parser.parseFile(stream)
    return newfile
class JBSection(object):
    """One named section of a table file.

    Holds the column headings, column type names, per-column defaults and
    the converted data rows.  The metadata attributes stay None until the
    matching add* method has been called.
    """

    def __init__(self, name):
        """Create an empty section called *name*."""
        self.name = name
        self.data = []
        # Optional metadata.  Initialised here so that a section lacking a
        # "#h", "#t" or "#d" line does not fail with AttributeError later.
        self.headings = None
        self.num_columns = None
        self.type_name = None
        self.defaults = None

    @staticmethod
    def _is_blank(item):
        """Return True when *item* stands for an empty cell (None or "")."""
        return item is None or item == ""

    def _check_width(self, row, what):
        """Raise AssertionError unless *row* has one item per heading.

        The check is skipped while no headings have been declared.
        AssertionError is raised explicitly (rather than via ``assert``) so
        the exception type is unchanged but the check survives ``python -O``.
        """
        if self.num_columns is not None and len(row) != self.num_columns:
            raise AssertionError(
                "%s has %d items, expected %d"
                % (what, len(row), self.num_columns))

    def addHeading(self, headings):
        """Record the column headings and derive the column count."""
        logger.debug("headings %s", headings)
        # Copy into a plain list: the parser hands over a pyparsing result
        # object, and getData needs list.index() for lookup by name.
        self.headings = list(headings)
        self.num_columns = len(self.headings)

    def addTypes(self, types):
        """Record one type name per column."""
        logger.debug("types %s", types)
        self._check_width(types, "type row")
        self.type_name = list(types)

    def convert(self, column, item):
        """Convert *item* according to the declared type of *column*.

        Returns:
            None for an empty cell (None or ""); *item* unchanged when no
            types have been declared; otherwise the first token produced by
            the grammar for the column's type.

        Raises:
            ValueError: if the column's type name is not a known type.
        """
        newtype = None if self.type_name is None else self.type_name[column]
        if self._is_blank(item):
            return None
        if newtype is None:
            return item
        # Looked up at call time so the class does not depend on definition
        # order of the module-level grammar elements.
        grammars = {
            "decimal": decimal,
            "integer": integer,
            "boolean": boolean,
            "qstring": qstring,
            "word": word,
        }
        if newtype not in grammars:
            raise ValueError("unknown column type: %r" % (newtype,))
        # Anchor at both ends so the whole cell must match the type.
        anchored = StringStart() + grammars[newtype] + StringEnd()
        return anchored.parseString(str(item))[0]

    def ConvertTypes(self, line):
        """Return *line* as a list with every cell converted to its column type."""
        self._check_width(line, "row")
        return [self.convert(column, item) for column, item in enumerate(line)]

    def FillInDefaults(self, line):
        """Return a copy of *line* with empty cells replaced by column defaults.

        Both None and "" count as empty, matching convert().  When no
        defaults have been declared the cells are returned unchanged.
        """
        self._check_width(line, "row")
        if self.defaults is None:
            return list(line)
        return [
            self.defaults[column] if self._is_blank(item) else item
            for column, item in enumerate(line)
        ]

    def addDefaults(self, defaults):
        """Record the per-column defaults, converted to the column types."""
        logger.debug("defaults %s", defaults)
        self._check_width(defaults, "defaults row")
        self.defaults = self.ConvertTypes(defaults)

    def addLine(self, line):
        """Append a data row after filling in defaults and converting types."""
        logger.debug("line %s", line)
        self._check_width(line, "row")
        self.data.append(self.ConvertTypes(self.FillInDefaults(line)))

    def getData(self, row, column):
        """Return one cell; *column* is an int index or a str heading name.

        Raises:
            TypeError: if *column* is neither an int nor a str.
        """
        if isinstance(column, int):
            return self.data[row][column]
        if isinstance(column, str):
            return self.data[row][self.headings.index(column)]
        raise TypeError(
            "column must be an int index or a str heading, got %r" % (column,))
def test_JBSection():
    """Smoke-test JBSection with four integer columns."""
    news = JBSection("news")
    news.addHeading("a b c d".split())
    news.addTypes("integer integer integer integer".split())
    news.addDefaults("0 123 -0 -1".split())
    news.addLine(["1", "1", "1", "1"])
    news.addLine(["", "1", "", ""])

    assert news.defaults == [0, 123, 0, -1]
    assert news.data[0] == [1, 1, 1, 1]
    assert len(news.data[1]) == 4
    assert news.data[1][1] == 1
    assert news.getData(0, "a") == 1
    assert news.getData(0, 3) == 1


# Run the self-test only when executed as a script, not on every import.
if __name__ == "__main__":
    test_JBSection()
class JBFile(object):
    """A parsed table file: section name mapped to its JBSection.

    nextSection, addHeading, addTypes, addDefaults and addLine take a token
    sequence and act on the section most recently opened by nextSection.
    """

    def __init__(self):
        """Create a file with no sections."""
        self.sections = {}
        # Section currently being filled; set by nextSection.
        self.current_section = None

    def nextSection(self, tokens):
        """Open a new section named tokens[0] and make it current.

        Raises:
            AssertionError: if a section of that name already exists.
        """
        name = tokens[0]
        logger.debug("Pushing section: %s", name)
        # Raised explicitly so the duplicate check is not stripped by -O.
        if name in self.sections:
            raise AssertionError("more than one section called " + name)
        self.current_section = self.sections[name] = JBSection(name)

    def addHeading(self, tokens):
        """Forward the headings row to the current section."""
        self.current_section.addHeading(tokens)

    def addTypes(self, tokens):
        """Forward the types row to the current section."""
        self.current_section.addTypes(tokens)

    def addDefaults(self, tokens):
        """Forward the defaults row to the current section."""
        self.current_section.addDefaults(tokens)

    def addLine(self, tokens):
        """Forward a data row to the current section.

        A row consisting of a single None (a completely empty match) is
        ignored.
        """
        if len(tokens) == 1 and tokens[0] is None:
            return
        self.current_section.addLine(tokens)

    def getData(self, section_name, row, column):
        """Return one cell of the named section (see JBSection.getData)."""
        section = self.sections[section_name]
        return section.getData(row, column)
# stuff = parse_stream(StringIO.StringIO(testme)) | |
# print stuff.getData("people", 3, "name") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment