Skip to content

Instantly share code, notes, and snippets.

@remram44
Last active April 7, 2020 15:15
Show Gist options
  • Save remram44/1029c5f662050aba73ba62197790ca1c to your computer and use it in GitHub Desktop.
Save remram44/1029c5f662050aba73ba62197790ca1c to your computer and use it in GitHub Desktop.
Filtering YAML loader
import json
import sys
import yaml
import yaml.events
import yaml.nodes
class FilterLoader(yaml.SafeLoader):
def __init__(self, stream):
super(FilterLoader, self).__init__(stream)
self.__filters = set()
self.__indent = 0
self.__path = []
self.__discard = False
def add_filter(self, filter):
self.__filters.add(tuple(filter))
def _check_filter(self):
return tuple(self.__path) not in self.__filters
def compose_sequence_node(self, anchor):
start_event = self.get_event()
tag = start_event.tag
if tag is None or tag == '!':
tag = self.resolve(yaml.nodes.SequenceNode, None, start_event.implicit)
node = yaml.nodes.SequenceNode(tag, [],
start_event.start_mark, None,
flow_style=start_event.flow_style)
if anchor is not None:
self.anchors[anchor] = node
index = 0
orig_index = 0
while not self.check_event(yaml.events.SequenceEndEvent):
self.__path.append(orig_index)
if self.__discard:
self.compose_node(node, index)
elif self._check_filter():
node.value.append(self.compose_node(node, index))
else:
self.__discard = True
self.compose_node(node, index)
self.__discard = False
self.__path.pop(-1)
index += 1
orig_index += 1
end_event = self.get_event()
node.end_mark = end_event.end_mark
return node
def compose_mapping_node(self, anchor):
start_event = self.get_event()
tag = start_event.tag
if tag is None or tag == '!':
tag = self.resolve(yaml.nodes.MappingNode, None, start_event.implicit)
node = yaml.nodes.MappingNode(tag, [],
start_event.start_mark, None,
flow_style=start_event.flow_style)
if anchor is not None:
self.anchors[anchor] = node
while not self.check_event(yaml.events.MappingEndEvent):
item_key = self.compose_node(node, None)
self.__path.append(item_key.value)
if self.__discard:
self.compose_node(node, item_key)
elif self._check_filter():
item_value = self.compose_node(node, item_key)
node.value.append((item_key, item_value))
else:
self.__discard = True
self.compose_node(node, item_key)
self.__discard = False
self.__path.pop(-1)
end_event = self.get_event()
node.end_mark = end_event.end_mark
return node
def main():
with open(sys.argv[1]) as fp:
loader = FilterLoader(fp)
for filter in sys.argv[2:]:
loader.add_filter(filter.split('.'))
obj = loader.get_data()
while obj:
json.dump(obj, sys.stdout)
obj = loader.get_data()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment