Last active
September 2, 2020 02:41
-
-
Save d-lua-stuff/aa97987d0fd6f82b6a8994f8d0272ef9 to your computer and use it in GitHub Desktop.
A script for extracting dump.lua output from *.xml log files. See https://board.dualthegame.com/index.php?/topic/20052-lua-all-global-variables/ for more info. License: WTFPL
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Extracts dump.lua output from *.xml log files | |
# Works with Python 2.7 and 3.6 | |
# Dumped global variable members are internally represented by a directed graph, which can contain cycles
import argparse | |
import errno | |
import os | |
import re | |
from collections import deque | |
from six.moves.html_parser import HTMLParser | |
# Captures everything between a "Lua globals dump: " marker line and the end of
# the enclosing <message> element; an optional '"' and/or '|' right before the
# closing tag is left out of the capture. Used with re.S so '.' spans newlines.
DUMP_PATTERN = r'(?<=Lua globals dump: \n).*?(?="?\|?</message>)'

# One dump line: (leading whitespace)(name)<two or more whitespace chars>(value)
LINE_PATTERN = r'(^\s*)([^\s]+)\s{2,}(.*)$'

# Name prefix used when matching members of _G against linked elements
UNIT_NAME_PATTERN = r'^Unit_'

# Indentation width of one nesting level in the dump text being parsed
INPUT_SPACES_PER_INDENT = 2

# Indentation width of one nesting level in the generated element files
OUTPUT_SPACES_PER_INDENT = 4

# Name column is left-aligned and padded to 50 characters, followed by the value
OUTPUT_LINE_FORMAT = '{0:<50}{1}'
class Node(object):
    """A vertex of the dump graph: a name, a value and a list of linked nodes.

    Instances compare and hash by identity, so they can be used as dict keys
    while the graph is being traversed.
    """

    __slots__ = ('name', 'value', 'nodes')

    def __init__(self, name, value):
        self.nodes = []  # directly linked nodes, in insertion order
        self.value = value
        self.name = name

    def __repr__(self):
        return 'Node({0!r}, {1!r}) # {2} leaves'.format(self.name, self.value, len(self.nodes))

    def any_leaf_with_name(self, leaf_name):
        """Return True if a directly linked node is named `leaf_name`."""
        for leaf in self.nodes:
            if leaf.name == leaf_name:
                return True
        return False
def get_args(argv=None):
    """Parse the command line arguments.

    argv -- optional list of argument strings; when None (the default),
            argparse reads them from sys.argv[1:].

    Returns the argparse namespace, whose `log_file` attribute holds the path
    of the log file to parse. argparse exits the process on invalid arguments.
    """
    parser = argparse.ArgumentParser(description="Extract element APIs from logs")
    parser.add_argument('log_file', type=str, help="Log file to parse")
    return parser.parse_args(argv)
def mkdir_p(path):
    """Create directory `path` including missing parents.

    An already existing directory is not an error; any other OSError
    (including `path` existing as a non-directory) is re-raised.
    """
    try:
        os.makedirs(path)
    except OSError as err:
        already_a_directory = err.errno == errno.EEXIST and os.path.isdir(path)
        if not already_a_directory:
            raise
def prepare_out_dir(log_file):
    """Create and return the output directory for `log_file`.

    The directory is the log file path with its extension removed, so
    "logs/a.xml" yields "logs/a". Raises OSError if it cannot be created.
    """
    out_dir, extension = os.path.splitext(log_file)
    if not extension:
        # Without an extension the directory path would be the log file's own
        # path, and creating a directory over an existing file always fails.
        out_dir += '_dumps'
    mkdir_p(out_dir)
    return out_dir
def find_node(start_node, predicate):
    """Return the first node reachable from `start_node` that satisfies `predicate`.

    start_node -- node to start from; it is tested first
    predicate  -- callable taking a node and returning a truthy value on a match

    Linked nodes are pushed to the front of the queue in their original order,
    so a node's linked nodes are examined before its remaining siblings.
    Cycles are handled: every node is queued, and therefore tested, at most once.
    Returns None when no reachable node matches.
    """
    to_visit = deque([start_node])
    # Every node that has ever been queued; a set gives O(1) membership tests
    # where scanning the deque would be linear in its length.
    seen = {start_node}
    while to_visit:
        node = to_visit.popleft()
        if predicate(node):
            return node
        unseen_linked_nodes = []
        for linked_node in node.nodes:
            if linked_node not in seen:
                seen.add(linked_node)
                unseen_linked_nodes.append(linked_node)
        # extendleft() inserts one by one at the front, which reverses the
        # order; feeding it the reversed list keeps the original order.
        to_visit.extendleft(reversed(unseen_linked_nodes))
    return None
def find_node_with_name(start_node, name):
    """Return the first node found from `start_node` whose name equals `name`, or None."""
    def has_name(candidate):
        return candidate.name == name

    return find_node(start_node, has_name)
def find_node_with_value(start_node, value):
    """Return the first node found from `start_node` whose value equals `value`, or None."""
    def has_value(candidate):
        return candidate.value == value

    return find_node(start_node, has_value)
def contains_linked_node(node, predicate):
    """Return True if any node directly linked from `node` satisfies `predicate`.

    Only `node.nodes` is inspected; deeper levels are not searched.
    """
    return any(predicate(linked_node) for linked_node in node.nodes)
def contains_linked_node_with_value_and_matching_name(node, value, name_pattern):
    """Return True if `node` directly links a node whose value equals `value`
    and whose name matches the regular expression `name_pattern`."""
    def is_wanted(linked_node):
        return linked_node.value == value and re.match(name_pattern, linked_node.name)

    return contains_linked_node(node, is_wanted)
def find_node_with_linked_node_with_matching_name(start_node, name_pattern):
    """Return the first node found from `start_node` that directly links a node
    whose name matches the regular expression `name_pattern`, or None."""
    def contains_named_linked_node(node):
        return any(re.match(name_pattern, linked_node.name) for linked_node in node.nodes)

    return find_node(start_node, contains_named_linked_node)
def dump_to_graph(dump):
    """Parse the text of one dump into a graph of Node objects.

    dump -- dump text; nesting is expressed by leading whitespace, with
            INPUT_SPACES_PER_INDENT spaces per level

    Returns a synthetic root node named "[dump]" whose linked nodes are the
    top-level entries. A "[see above]" line makes the preceding node share the
    list of linked nodes of the first node found with the same value, which is
    how cycles end up in the graph.

    Raises ValueError for a line that cannot be interpreted or for a
    "[see above]" reference that cannot be resolved.
    """
    start_node = Node("[dump]", None)
    stack = []  # ancestors of the line being processed; stack[-1] is its parent
    prev_indent = None
    prev_node = start_node

    for line in dump.split('\n'):
        if not line:
            continue

        match = re.match(LINE_PATTERN, line)
        if match is None:
            if not line.endswith('[see above]'):
                raise ValueError("Don't know what to do with this line: " + repr(line))

            # Explicit checks instead of assert statements, which are removed
            # when Python runs with -O and would let a bad dump through.
            if prev_node.nodes:
                raise ValueError("'[see above]' follows a node that already has members: " + repr(prev_node))
            refd_node = find_node_with_value(start_node, prev_node.value)
            if refd_node is prev_node and refd_node.name == "_G":
                # An early version of the dumping script immediately started with the _G members
                refd_node = start_node
            elif refd_node is None or refd_node is prev_node:
                raise ValueError("Cannot resolve '[see above]' for " + repr(prev_node))
            # Share the list object itself, so both nodes link the same members
            prev_node.nodes = refd_node.nodes
            continue

        indent, name, value = match.groups()
        node = Node(name, value)

        if prev_indent is None or len(indent) > len(prev_indent):
            # One level deeper: the previous node becomes the current parent
            stack.append(prev_node)
        else:
            # Same level or shallower: drop one ancestor per level closed
            times_to_pop = (len(prev_indent) - len(indent)) // INPUT_SPACES_PER_INDENT
            for _ in range(times_to_pop):
                stack.pop()

        stack[-1].nodes.append(node)
        prev_indent = indent
        prev_node = node

    return start_node
def dump_linked_nodes(node, visited_nodes = None, indent = ""):
    """Render the nodes linked from `node` as a list of indented text lines.

    node          -- node whose linked nodes are rendered (recursively)
    visited_nodes -- dict used as a set of nodes already rendered; it is
                     updated in place, and a new one is created when None
    indent        -- prefix for lines at this level

    A node that was already rendered and has linked nodes is replaced by a
    single "[see above]" line, which also stops recursion on cycles. Values
    starting with "table:" or "function:" are shortened to "table" or
    "function".
    """
    # Compare against None explicitly: `visited_nodes or {}` would also
    # replace an empty dict supplied by the caller, who then would never
    # see the visited nodes recorded in it.
    if visited_nodes is None:
        visited_nodes = {}

    if node in visited_nodes and node.nodes:
        return [indent + "[see above]"]
    visited_nodes[node] = True

    next_indent = indent + " " * OUTPUT_SPACES_PER_INDENT
    lines = []
    for linked_node in node.nodes:
        name = linked_node.name
        value = linked_node.value
        is_slot = linked_node.any_leaf_with_name("export") and linked_node.any_leaf_with_name("unit")

        if value.startswith("table:"):
            value = "table"
        if value.startswith("function:"):
            value = "function"
        lines.append(OUTPUT_LINE_FORMAT.format(indent + name, value))

        if name == "export" and value == "table":
            lines.append(next_indent + "[same functions]")
        elif name == "unit" and value == "table" and indent == "":
            lines.append(next_indent + "[reference to self]")
        elif is_slot:
            lines.append(next_indent + "[slot skipped]")
        else:
            lines += dump_linked_nodes(linked_node, visited_nodes, next_indent)
    return lines
def prepare_log_contents(log_contents):
    """Normalize raw log contents before the dumps are extracted.

    log_contents -- contents of the log file, as bytes or as text

    Returns text with every non-ASCII character dropped, CRLF turned into LF,
    runs of newlines collapsed into one, and HTML entities unescaped.
    """
    try:
        # Available since Python 3.4; HTMLParser.unescape() was removed in 3.9
        from html import unescape
    except ImportError:
        # Python 2 has no html module
        from six.moves.html_parser import HTMLParser
        unescape = HTMLParser().unescape

    # logs sometimes contain invalid characters (not unicode, maybe binary or garbage)
    if isinstance(log_contents, bytes):
        log_contents = log_contents.decode('ascii', errors='ignore')
    else:
        # Already text (has no usable decode() on Python 3): strip non-ASCII
        # characters with an encode/decode round trip instead.
        log_contents = log_contents.encode('ascii', 'ignore').decode('ascii')

    # CRLF to LF
    log_contents = log_contents.replace('\r\n', '\n')
    # remove consecutive newlines
    log_contents = re.sub(r'\n{2,}', '\n', log_contents)
    return unescape(log_contents)
def run():
    """Extract every dump from the log file named on the command line.

    Each dump is written to "<log file without extension>/dump <index>.txt".
    When the dump parses and contains both _G and a unit start handler, one
    file per linked element is also written to the "dump <index>" directory
    next to it. Problems with a single dump are printed and that dump's
    element files are skipped; the remaining dumps are still processed.
    """
    args = get_args()

    # Read bytes, not text: the log may contain garbage that no text codec
    # accepts, and prepare_log_contents() does the (lossy) ASCII decoding.
    with open(args.log_file, 'rb') as f:
        log_contents = prepare_log_contents(f.read())

    dumps = re.findall(DUMP_PATTERN, log_contents, flags=re.S)
    if not dumps:
        return

    out_dir = prepare_out_dir(args.log_file)

    for dump_index, dump in enumerate(dumps):
        dump_name = 'dump %s' % dump_index
        dump_file = os.path.join(out_dir, '%s.txt' % dump_name)

        with open(dump_file, 'w') as f:
            f.write(dump)

        try:
            start_node = dump_to_graph(dump)
        except Exception as e:
            # Exception rather than BaseException, so that Ctrl+C and
            # sys.exit() still terminate the script.
            print("Error parsing dump; skipping linked element dumping")
            print(e)
            continue

        g_node = find_node_with_name(start_node, '_G')
        unit_node = find_node_with_linked_node_with_matching_name(start_node, r'^unit_start\d+')

        if g_node is None:
            print("Cannot find _G; skipping linked element dumping")
            continue

        if unit_node is None:
            print("Cannot find the unit start handler; skipping linked element dumping")
            continue

        elements_dir = os.path.join(out_dir, dump_name)
        mkdir_p(elements_dir)

        for linked_node in unit_node.nodes:
            # Keep only nodes whose value also appears in _G under a name
            # matching UNIT_NAME_PATTERN
            is_element_node = contains_linked_node_with_value_and_matching_name(g_node, linked_node.value, UNIT_NAME_PATTERN)
            if not is_element_node:
                continue

            element_dump_lines = dump_linked_nodes(linked_node)
            element_dump = "\n".join(element_dump_lines)
            element_file = os.path.join(elements_dir, '%s.txt' % linked_node.name)

            with open(element_file, 'w') as f:
                f.write(element_dump)


if __name__ == '__main__':
    run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment