Skip to content

Instantly share code, notes, and snippets.

@PizzaShift
Forked from prggmr/python_gxmlimport.py
Created February 20, 2023 21:16
Show Gist options
  • Save PizzaShift/0fbb7d4d8300af24f4c6bf8cc472a855 to your computer and use it in GitHub Desktop.
Save PizzaShift/0fbb7d4d8300af24f4c6bf8cc472a855 to your computer and use it in GitHub Desktop.
Tool to import enormous XML files into MySQL using lxml.etree
def does_exist(tbl, where=None, where_field='id'):
    """Report whether ``tbl`` holds a row whose ``where_field`` equals ``where``.

    Executes ``SELECT COUNT(`id`) AS total FROM <tbl> WHERE `<where_field>` = %s``
    on the module-level cursor ``cur`` and inspects the counted value.

    :param tbl: table name. It is interpolated into the SQL text as-is, so it
        must come from trusted configuration, never from feed data.
    :param where: value compared against ``where_field``; sent as a bound
        parameter.
    :param where_field: column name, interpolated between backticks.
    :returns: ``True`` when the count is non-zero, ``False`` otherwise.
    """
    where_clause = "WHERE `%s` = %%s" % where_field
    query = "SELECT COUNT(`id`) AS total FROM %s %s" % (tbl, where_clause)
    cur.execute(query, (where,))
    # COUNT() always produces exactly one result row, so the cursor position
    # (``rownumber``) says nothing about existence; the counted value itself
    # has to be read.
    row = cur.fetchone()
    if row is None:
        return False
    # Works for both tuple-style and dict-style cursors.
    total = row['total'] if isinstance(row, dict) else row[0]
    return bool(total)
def insert_query(tbl, fields):
    """Insert one row into ``tbl`` and commit.

    Builds ``INSERT INTO <tbl> (`col`, ...) VALUES (%s, ...)`` from ``fields``
    and runs it on the module-level cursor ``cur``; values are sent as bound
    parameters, column names are interpolated between backticks.

    :param tbl: table name, interpolated into the SQL text as-is.
    :param fields: mapping of column name to value.
    :returns: ``True`` after a successful commit; otherwise the caught
        exception instance (the transaction is rolled back first). The
        exception is returned, not raised.
    """
    # Snapshot the items once so column names and values stay paired.
    items = list(fields.items())
    columns = ','.join('`%s`' % name for name, _ in items)
    placeholders = ','.join(['%s'] * len(items))
    values = [value for _, value in items]
    query = "INSERT INTO %s (%s) VALUES (%s)" % (tbl, columns, placeholders)
    try:
        cur.execute(query, values)
        con.commit()
        return True
    except Exception as e:
        con.rollback()
        return e
def update_query(tbl, fields, id):
    """Update the row of ``tbl`` whose ``id`` column equals ``id`` and commit.

    Builds ``UPDATE <tbl> SET `col` = %s, ... WHERE id = %s`` from ``fields``
    and runs it on the module-level cursor ``cur``; values (and the id) are
    sent as bound parameters, column names are interpolated between backticks.

    :param tbl: table name, interpolated into the SQL text as-is.
    :param fields: mapping of column name to new value.
    :param id: value matched against the ``id`` column.
    :returns: ``True`` after a successful commit; otherwise the caught
        exception instance (the transaction is rolled back first). The
        exception is returned, not raised.
    """
    # Snapshot the items once so assignments and values stay paired.
    items = list(fields.items())
    assignments = ','.join('`%s` = %%s' % name for name, _ in items)
    values = [value for _, value in items]
    # The id is the last placeholder in the statement, so it goes last.
    values.append(id)
    query = "UPDATE %s SET %s WHERE id = %%s" % (tbl, assignments)
    try:
        cur.execute(query, values)
        con.commit()
        return True
    except Exception as e:
        con.rollback()
        return e
class Import(object):
    """
    Imports XML Data from a Content Feed.

    There are two separate ways of importing a feed: FULL and INC.

    A FULL will only insert data and not look for any changes. This should ONLY be
    used when doing a **full** sync.

    An INC will look for data changes and update the database as needed, along with
    inserting new data it receives. This should be used when importing the
    incremental feed updates provided by MN each day.

    NOTE(review): in this class the import type only triggers a debug log
    line; every record goes through the same exists -> insert/update path
    regardless of the type. Confirm whether FULL is meant to skip the lookup.
    """

    def __init__(self, type, source):
        """Run the import; all work happens during construction.

        :param type: import mode; compared against ``IMPORT_FULL`` for logging.
        :param source: path of the XML feed. When the path contains ``.gz`` it
            is opened through ``gzip.open`` and parsed from that handle.

        If opening the gzip file fails the error is logged as critical and the
        process exits with status 2.
        """
        self._log = logging.getLogger('import')
        self._type = type
        self._source = source
        self._log.info("Import %s started", self._source)
        if self._type == IMPORT_FULL:
            self._log.debug("FULL IMPORT PERFORMED")

        # Has the file been uncompressed
        opened = None
        if '.gz' in self._source:
            self._log.info("Compressed data file encountered.")
            # gzip.open only opens the handle; data is decompressed as the
            # parser reads from it.
            try:
                self._log.info("Starting uncompression of data file.")
                opened = gzip.open(self._source, 'rb')
                self._source = opened
                self._log.info("Uncompression finished.")
            except Exception as error:
                self._log.critical(error)
                sys.exit(2)

        try:
            elements = etree.iterparse(
                self._source, events=('start', 'end')
            )
            # Each entry: (record tag, field specs, table name[, exists function]).
            # A field spec is (column, tag) or (column, parent tag, tag).
            lookup_element = (
                (
                    'parent_node',
                    (
                        ('tbl_column', 'node_name'),
                    ),
                    'tbl_name'
                ),
                (
                    'parent_node',
                    (
                        # Allow for choosing elements in the case of
                        # <parent_node><node_name><sub_node</node_name>
                        # (This was a bare string literal placed directly in
                        # front of the tuple, which Python parses as calling
                        # the string.)
                        ('tbl_column', 'node_name', 'sub_node'),
                    ),
                    'table_name',
                    custom_insert_function
                ),
            )
            self._parse(elements, lookup_element)
        finally:
            # Release the gzip handle opened above, even if parsing fails.
            if opened is not None:
                opened.close()
        self._log.info("Import complete")

    def _parse(self, elements, lookup_element):
        """Walk the (event, element) stream and hand each finished record to
        ``_insert_record``.

        :param elements: iterable of ``(event, element)`` pairs with ``start``
            and ``end`` events.
        :param lookup_element: record definitions, see ``__init__``.
        """
        root = None
        fields = {}
        lookup = None
        func = None
        tbl = None
        exists_function = None
        parent = None
        # One entry per element opened inside the current record: the column
        # it maps to, or None. Matching is decided on 'start', but the text is
        # only read on the matching 'end', because an element's text is not
        # guaranteed to be parsed yet when its start event is delivered.
        pending = []

        for event, element in elements:
            if event == 'start':
                if root is None:
                    # Not inside a record: see whether this tag opens one.
                    # No break, so with duplicate tags the last entry wins.
                    for tup in lookup_element:
                        if tup[0] == element.tag:
                            lookup = tup[1]
                            root = element.tag
                            fields = {}
                            tbl = tup[2]
                            func = self._insert_record
                            exists_function = tup[3] if len(tup) > 3 else None
                            pending = []
                else:
                    column = None
                    for field in lookup:
                        if len(field) == 3:
                            # First element seen while no parent is tracked
                            # becomes the parent for (column, parent, tag)
                            # specs.
                            if parent is None:
                                parent = element.tag
                            use = (parent == field[1]
                                   and element.tag == field[2])
                        else:
                            use = element.tag == field[1]
                        if use:
                            column = field[0]
                            break
                    pending.append(column)
            if event == 'end':
                if pending:
                    column = pending.pop()
                    if column is not None:
                        try:
                            # Non-ASCII characters are dropped on purpose
                            # ('ignore'), as in the original import.
                            fields[column] = element.text.encode(
                                'ascii', 'ignore')
                        except AttributeError:
                            # element.text is None for empty elements.
                            fields[column] = element.text
                if parent is not None and element.tag == parent:
                    parent = None
                if element.tag == root:
                    func(root, fields, tbl, exists_function)
                    lookup = None
                    fields = {}
                    func = None
                    root = None
                    exists_function = None
                    pending = []
                    self._cleanup(element)

    def _cleanup(self, element):
        """Free a processed element: clear it and delete the leading children
        of its parent until the element has no previous sibling left."""
        element.clear()
        while element.getprevious() is not None:
            del element.getparent()[0]

    def _insert_record(self, type, fields, tbl, exists_function = None):
        """
        Import a Record

        Inserts the record when it does not exist yet, otherwise updates the
        row whose id is ``fields['id']``. A failure reported by the query
        helper is logged as an error.

        :param type: record tag, used only in the failure log message.
        :param fields: mapping of column name to value; must contain ``id``
            unless ``exists_function`` is given and reports the record as new.
        :param tbl: target table name.
        :param exists_function: optional callable taking ``fields`` and
            returning a truthy value when the record already exists; when
            omitted ``does_exist(tbl, fields['id'])`` is used.
        :returns boolean: True when the insert/update succeeded, else False.
        """
        if exists_function is None:
            exists = does_exist(tbl, fields['id'])
        else:
            exists = exists_function(fields)

        if not exists:
            failed = insert_query(tbl, fields)
        else:
            # NOTE(review): the message says "Skipping" but the row is updated
            # right after; kept verbatim until the intended behavior is
            # confirmed.
            self._log.info("Skipping %s [%s]", tbl, fields['id'])
            failed = update_query(tbl, fields, fields['id'])

        # The query helpers return True on success and the caught exception
        # otherwise, so anything that is not a bool is a failure.
        if not isinstance(failed, bool):
            self._log.error(
                "Failed to import %s %s (%s)", type, fields, failed
            )
            return False
        return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment