Last active
February 20, 2023 21:35
-
-
Save prggmr/2161849 to your computer and use it in GitHub Desktop.
Tool to import enormous XML files into mysql using lxml.etree
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def does_exist(tbl, where = None, where_field = 'id'): | |
"""Builds a MySQL SELECT statement | |
""" | |
where_clause = "WHERE `%s` = %%s" % where_field | |
query = "SELECT COUNT(`id`) AS total FROM %s %s" % ( | |
tbl, where_clause | |
) | |
cur.execute(query, (where,)) | |
return cur.rownumber != 0 | |
def insert_query(tbl, fields): | |
fields_insert_values = [] | |
fields_tmp = [] | |
fields_tmp_2 = [] | |
for _field, _val in fields.iteritems(): | |
fields_tmp.append('`%s`' % _field) | |
fields_tmp_2.append('%s') | |
fields_insert_values.append(_val) | |
query = "INSERT INTO %s (%s) VALUES (%s)" % ( | |
tbl, ','.join(fields_tmp), ','.join(fields_tmp_2) | |
) | |
try: | |
cur.execute(query, (fields_insert_values)) | |
con.commit() | |
return True | |
except Exception, e: | |
con.rollback() | |
return e | |
def update_query(tbl, fields, id): | |
fields_insert_values = [] | |
fields_tmp = [] | |
for _field, _val in fields.iteritems(): | |
fields_tmp.append('`%s` = %%s' % _field) | |
fields_insert_values.append(_val) | |
query = "UPDATE %s SET %s WHERE id = %%s" % ( | |
tbl, ','.join(fields_tmp) | |
) | |
fields_insert_values.append(id) | |
try: | |
cur.execute(query, (fields_insert_values)) | |
con.commit() | |
return True | |
except Exception, e: | |
con.rollback() | |
return e | |
class Import(object): | |
""" | |
Imports XML Data from a Content Feed. | |
There are two seperate ways of importing a feed FULL and INC. | |
A FULL will only insert data and not look for any changes. This should ONLY be | |
used when doing a **full** sync. | |
A INC will look for data changes and update the database as needed, along with | |
inserting new data it receives. This should be used when importing the | |
incremental feed updates provided by MN each day. | |
""" | |
def __init__(self, type, source): | |
self._log = logging.getLogger('import') | |
self._type = type | |
self._source = source | |
self._log.info("Import %s started" % self._source) | |
if self._type == IMPORT_FULL: | |
self._log.debug("FULL IMPORT PERFORMED") | |
# Has the file been uncompressed | |
if '.gz' in self._source: | |
self._log.info("Compressed data file encountered.") | |
# Uncompress file | |
try: | |
self._log.info("Starting uncompression of data file.") | |
self._source = gzip.open(self._source, 'rb') | |
self._log.info("Uncompression finished.") | |
except Exception, error: | |
self._log.critical(error) | |
sys.exit(2) | |
elements = etree.iterparse( | |
self._source, events = ('start', 'end') | |
) | |
lookup_element = ( | |
( | |
'parent_node', | |
( | |
('tbl_column', 'node_name'), | |
), | |
'tbl_name' | |
), | |
( | |
'parent_node', | |
( | |
""" | |
Allow for choosing elements in the case of | |
<parent_node><node_name><sub_node</node_name> | |
""" | |
('tbl_column', 'node_name', 'sub_node'), | |
), | |
'table_name', | |
custom_insert_function | |
), | |
) | |
root = None | |
fields = {} | |
lookup = None | |
func = None | |
where = None | |
exists_function = None | |
parent = None | |
for event, element in elements: | |
if event == 'start': | |
if root is None: | |
for tup in lookup_element: | |
if tup[0] == element.tag: | |
lookup = tup[1] | |
root = element.tag | |
fields = {} | |
tbl = tup[2] | |
func = self._insert_record | |
try: | |
exists_function = tup[3] | |
except IndexError: | |
exists_function = None | |
else: | |
for field in lookup: | |
use = False | |
if len(field) == 3: | |
if parent is None: | |
parent = element.tag | |
if parent == field[1] and element.tag == field[2]: | |
use = True | |
else: | |
if element.tag == field[1]: | |
use = True | |
if use: | |
try: | |
fields[field[0]] = element.text.encode('ascii','ignore') | |
except AttributeError: | |
fields[field[0]] = element.text | |
break | |
if event == 'end': | |
if parent is not None: | |
if element.tag == parent: | |
parent = None | |
if element.tag == root: | |
func(root, fields, tbl, exists_function) | |
lookup = None | |
fields = {} | |
func = None | |
root = None | |
exists_function = None | |
element.clear() | |
while element.getprevious() is not None: | |
del element.getparent()[0] | |
del element | |
self._log.info("Import complete") | |
def _cleanup(self, element): | |
element.clear() | |
while element.getprevious() is not None: | |
del element.getparent()[0] | |
del element | |
def _insert_record(self, type, fields, tbl, exists_function = None): | |
""" | |
Import a Record | |
:returns boolean: | |
""" | |
count = 0 | |
failed = False | |
query = None | |
if exists_function is None: | |
exists = does_exist(tbl, fields['id']) | |
else: | |
exists = exists_function(fields) | |
if exists is False: | |
query = insert_query(tbl, fields) | |
else: | |
self._log.info("Skipping %s [%s]" % (tbl, fields['id'])) | |
query = update_query(tbl, fields, fields['id']) | |
if isinstance(failed, bool) is False: | |
self._log.error("Failed to import %s %s (%s)" % | |
(type, fields, failed) | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi. I wanted to use your code to import a 15GB XML file to MySQL. I am new to this part of database migration so bare with me. Here is the error I get when running the snippet in VS Code:
line 26
except Exception, e:
^^^^^^^^^^^^
SyntaxError: multiple exception types must be parenthesized