-
-
Save gkliska/b2ac86103e32ead44ceb to your computer and use it in GitHub Desktop.
PoC: Convert an XML file to CSV, and a CSV file to XML or another format, for OpenERP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Author: Stéphane Wirtel (@matrixise) | |
# Basic implementation of CSV Renderer by Yannick Vaucher (@yvaucher) (Camptocamp) | |
import os | |
from collections import OrderedDict | |
from pprint import pprint as pp | |
import itertools | |
from lxml import etree | |
import csv | |
import json | |
import yaml | |
class Record(object):
    """A single data record exchanged between parsers and renderers.

    ``attributes`` holds metadata about the record; it always contains the
    ``model`` and ``xml_id`` keys and parsers may add further keys.
    ``fields`` maps field names to their values.
    """

    def __init__(self, model, xml_id=None):
        """Create an empty record for *model*, optionally tagged with *xml_id*."""
        self.attributes = {'model': model, 'xml_id': xml_id}
        self.fields = {}

    def field(self, name, value):
        """Set (or overwrite) the value stored for field *name*."""
        self.fields[name] = value

    @property
    def model(self):
        """Model name taken from ``attributes`` (``None`` when absent)."""
        return self.attributes.get('model')

    @property
    def xml_id(self):
        """XML id taken from ``attributes`` (``None`` when absent)."""
        return self.attributes.get('xml_id')

    def __repr__(self):
        return str(self.to_dict())

    def to_dict(self):
        """Return the record as a plain dict.

        A missing/empty xml id is reported as ``False``.  The ``attributes``
        and ``fields`` entries are the record's own dicts, not copies.
        """
        return {
            'model': self.model,
            'xml_id': self.xml_id or False,
            'attributes': self.attributes,
            'fields': self.fields,
        }

    def to_json(self):
        """Return the ``to_dict()`` representation serialized as JSON text."""
        return json.dumps(self.to_dict())

    def to_yaml(self):
        """Return the ``to_dict()`` representation serialized as YAML text."""
        # PyYAML's serializer is ``dump``; there is no ``dumps`` function, so
        # the previous ``yaml.dumps`` call always raised AttributeError.
        return yaml.dump(self.to_dict())
class Parser(object):
    """Common base of the file parsers.

    Checks that *filename* is non-empty and points at an existing path, then
    remembers it together with the optional *module* name.
    """

    def __init__(self, filename, module=None):
        # Both checks are plain asserts, so they vanish under ``python -O``.
        assert len(filename) > 0
        assert os.path.exists(filename), "The file %s does not exist" % (filename,)

        self.module = module
        self.filename = filename
class XmlParser(Parser):
    """Parse an XML data file whose root element is ``<openerp>``.

    Every supported child of each ``<data>`` element is turned into a
    ``Record``.
    """

    # Children of <data> that are converted into records; others are skipped.
    TAGS = ('record', 'menuitem', 'act_window', 'report',)

    # Spellings of the ``noupdate`` attribute that are treated as "false".
    _FALSE_VALUES = ('', '0', 'false', 'off')

    @staticmethod
    def create_record(node, update=True):
        """Build a ``Record`` from an XML *node*.

        The node's ``id`` and ``model`` attributes become the record's xml id
        and model.  The tag name and the *update* flag are stored in the
        record attributes as ``xml_tag`` and ``update``.  Each child element
        contributes one field named after its ``name`` attribute, whose value
        is the child's text or, when the text is empty, its ``ref`` attribute.
        """
        record = Record(xml_id=node.get('id'),
                        model=node.get('model'))
        record.attributes['xml_tag'] = node.tag
        record.attributes['update'] = update
        # Only real elements are fields: a comment or processing instruction
        # nested in the node has no ``name`` and would otherwise be stored
        # under the key ``None`` with the comment text as value.
        for field in node.iterchildren(tag=etree.Element):
            # TODO: handle the case when we receive multiattribute
            record.field(field.get('name'), field.text or field.get('ref'))
        return record

    def parse(self):
        """Yield one ``Record`` per supported child of every ``<data>`` node.

        Raises ``AssertionError`` when the root element is not ``<openerp>``.
        """
        # Hand lxml the raw bytes so it applies the encoding declared in the
        # XML prolog itself; on Python 3 it refuses already-decoded text that
        # still carries an encoding declaration.
        with open(self.filename, 'rb') as fp:
            content = fp.read()

        tree = etree.fromstring(content)
        assert tree.tag == 'openerp'

        for tag_data in tree.findall('data'):
            # The attribute is a string, so it must be interpreted explicitly:
            # a plain truth test would treat noupdate="0" as enabled.
            raw_flag = (tag_data.get('noupdate') or '').strip().lower()
            no_update = raw_flag not in self._FALSE_VALUES

            for item in tag_data:
                # Comments expose a non-string ``tag`` and never match TAGS.
                if item.tag in self.TAGS:
                    yield XmlParser.create_record(item, not no_update)
class CSVParser(Parser):
    """Parse a CSV data file into ``Record`` objects.

    The model name is the file's base name without its extension.  The first
    row is taken as the header naming the columns; an ``id`` column (the
    column ``CSVRenderer`` writes first) carries the record's xml id.
    """

    @staticmethod
    def create_record(model, line, header=None):
        """Build a ``Record`` for *model* from one CSV row.

        :param model: model name given to the record.
        :param line: list of cell values for the row.
        :param header: optional list of column names matching *line*.  When
            omitted the column names are unknown and the record is returned
            without any field (the historical behaviour).
        """
        record_id = None
        named_cells = []
        if header is not None:
            # zip() stops at the shorter sequence, so short or over-long rows
            # simply yield fewer fields instead of failing.
            for name, value in zip(header, line):
                if name == 'id':
                    record_id = value or None
                else:
                    named_cells.append((name, value))

        record = Record(model, xml_id=record_id)
        for name, value in named_cells:
            record.field(name, value)
        return record

    def parse(self):
        """Yield one ``Record`` per data row; an empty file yields nothing."""
        model, _ext = os.path.splitext(os.path.basename(self.filename))
        with open(self.filename, 'r') as fp:
            reader = csv.reader(fp)
            header = next(reader, None)
            if header is None:
                return
            for row in reader:
                # csv.reader returns an empty list for a blank line.
                if not row:
                    continue
                yield CSVParser.create_record(model, row, header)
class Renderer(object):
    """Base class of the renderers: stores the iterable of items to write."""

    def __init__(self, iterable):
        # Kept as given; it is not consumed or copied here.
        self.iterable = iterable
class XmlRenderer(Renderer):
    """Write the records as an ``<openerp><data>...</data></openerp>`` document."""

    def save(self, file_like):
        """Serialize every item of ``self.iterable`` and write it to *file_like*.

        The document is produced as UTF-8 encoded bytes, so *file_like* must
        accept bytes (e.g. a file opened in binary mode).
        """
        root = etree.Element('openerp')
        data_node = etree.SubElement(root, 'data')
        for item in self.iterable:
            self._serialize_record(data_node, item)
        document = etree.tostring(root, pretty_print=True,
                                  xml_declaration=True, encoding='utf-8')
        file_like.write(document)

    def _serialize_record(self, parent, item):
        """Append a ``<record>`` element describing *item* under *parent*.

        Besides the model, the xml id (when set) and one ``<field>`` child
        per entry of ``item.fields`` are written, so the record's data is no
        longer dropped from the output.
        """
        record = etree.SubElement(parent, 'record')
        if item.xml_id:
            record.set('id', item.xml_id)
        record.set('model', item.attributes['model'])

        for name, value in item.fields.items():
            field = etree.SubElement(record, 'field')
            if name is not None:
                field.set('name', name)
            if value is not None:
                # '%s' formatting keeps text as is and converts other values
                # to text, on both Python 2 and Python 3.
                field.text = '%s' % (value,)
class CSVRenderer(Renderer):
    """Write the records as CSV: an ``id`` column followed by the fields."""

    def save(self, file_like):
        """Write a header row, then one row per item, to *file_like*.

        The columns are the field names of the first item; fields that only
        appear on later items are not written.  Falsy field values are
        written as an empty cell.
        """
        import sys  # local: only needed to choose the cell conversion

        if sys.version_info[0] < 3:
            # The Python 2 csv module works on byte strings.
            def to_cell(value):
                return unicode(value).encode('utf-8')  # noqa: F821
        else:
            to_cell = str

        csvwriter = csv.writer(file_like, delimiter=',', quotechar='"')

        # ``None`` (not a falsy list) marks "header not written yet";
        # otherwise a first item without fields would make the header row be
        # written again for every following item.
        headers = None
        for item in self.iterable:
            if headers is None:
                # list() is required on Python 3, where keys() is a view that
                # cannot be concatenated to a list.
                headers = list(item.fields)
                csvwriter.writerow(['id'] + headers)

            line = [item.xml_id]
            line.extend(to_cell(item.fields.get(col) or '') for col in headers)
            csvwriter.writerow(line)
# File extension -> parser class used to read files of that format.
frontends = {'.csv': CSVParser, '.xml': XmlParser}

# File extension -> renderer class used to write files of that format.
backends = {
    '.csv': CSVRenderer,
    # '.xml': XmlRenderer,
}
def parse(filename, module=None):
    """Parse *filename* with the parser registered for its extension.

    The extension is looked up case-insensitively in ``frontends`` (so
    ``data.XML`` is handled like ``data.xml``).

    :param filename: path of the file to parse; must not be empty.
    :param module: optional module name handed over to the parser.
    :returns: whatever the selected parser's ``parse()`` returns.
    :raises Exception: when no parser is registered for the extension.
    """
    assert len(filename) != 0
    ext = os.path.splitext(filename)[-1]
    frontend_class = frontends.get(ext.lower())
    if frontend_class is None:
        raise Exception('There is no parser for this extension (%s)' % (ext,))
    return frontend_class(filename=filename, module=module).parse()
if __name__ == '__main__':
    # Demo run: convert the better-zip XML data file into a CSV file.
    # The output file is opened before the input is parsed.
    with open('res.better.zip.csv', 'w') as output:
        records = itertools.chain(parse('l10n_ch_better_zip.xml'))
        renderer = CSVRenderer(records)
        renderer.save(output)

    # Reverse direction (CSV -> XML), currently disabled:
    # with XmlRenderer('ir.model.access.xml') as render:
    #     render.save(itertools.chain(parse('ir.model.access.csv')))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment