Skip to content

Instantly share code, notes, and snippets.

@JimDennis
Last active August 29, 2015 14:08
Show Gist options
  • Save JimDennis/9d800d9eea1c5fa38c3b to your computer and use it in GitHub Desktop.
Save JimDennis/9d800d9eea1c5fa38c3b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from __future__ import print_function
import json, sys
# Usage text printed to stderr when the script is invoked without arguments;
# the %s is filled in with the program name (sys.argv[0]).
usage = \
'''Extract data from JSON structures using compact "paths" of keys/indexes
%s file [path [path ...]]
Given a JSON file and a list of paths (. separated list of keys) return the
item in the data structure extracted by traversing the path.
Given just a JSON filename, print a list of all paths found.
'''
class JSON_Walker(object):
    '''Callable that recursively walks a JSON data structure, recording a
    "path" (dot-separated sequence of dictionary keys / list indexes) for
    every leaf node, and supporting path-based lookups via indexing.

    >>> example = JSON_Walker(); example({"top": [{"frob": "baz",
    ... "foo": "bar"}, {"nums": [1, 2, 3]}]})
    ['top.0.frob', 'top.0.foo', 'top.1.nums.0', 'top.1.nums.1', 'top.1.nums.2']
    '''
    def __init__(self):
        self.nodes = list()  # accumulated leaf paths (no leading dot)
        self.data = None     # root of the structure given to __call__
    def __call__(self, data, path=''):
        '''Recursively traverse a JSON data structure, appending the
        "path" of each leaf node to self.nodes.

        Returns self.nodes (the accumulated list of paths).
        '''
        if self.data is None:
            self.data = data  # remember the root for later __getitem__ lookups
        if isinstance(data, list):
            for key in range(len(data)):
                self(data[key], '%s.%s' % (path, key))
        elif hasattr(data, 'keys') and callable(data.keys):
            for key in data.keys():
                self(data[key], '%s.%s' % (path, key))
        else:
            self.nodes.append(path[1:])  # drop the leading dot
        return self.nodes
    def __getitem__(self, item):
        '''Given a path (str) return the data node at that path; given an
        int, return the corresponding recorded path from self.nodes.

        Raises KeyError when no data has been walked yet or when the path
        does not lead to a (non-null) node.

        >>> example = JSON_Walker(); this = example({"top": [{"frob": "baz",
        ... "foo": "bar"}, {"nums": [1, 2, 3]}]}); example['.top.1.nums']
        [1, 2, 3]
        '''
        if self.data is None:
            raise KeyError("No data")
        if isinstance(item, int):
            return self.nodes[item]
        if item.startswith('.'):
            item = item[1:]  # tolerate an optional leading dot
        path = str(item).split('.')
        t = self.data  # read-only traversal: no copy needed (and lists have no .copy() in Py2)
        for i in path:
            if t is None:
                break
            elif isinstance(t, list):
                try:
                    i = int(i)
                except ValueError:
                    t = None  # non-numeric index into a list: not found
                    break
                try:
                    t = t[i]
                except IndexError:
                    t = None  # index out of range: not found
                    break
            elif hasattr(t, 'get'):
                t = t.get(i, None)
        if t is None:
            # NOTE: a legitimate JSON null leaf is indistinguishable from a
            # missing key here and also raises KeyError.
            raise KeyError("Key not found: %s" % path)
        return t
    def __repr__(self):
        # __repr__ must return a str; returning the list itself (as the
        # original did) raises TypeError whenever repr() is taken.
        return repr(self.nodes)
## Following is alternative implementation of the __call__() function
## in the previous class:
def get_json_keys(data, path=''):
    '''Given JSON data, return the list of paths leading to each leaf item.

    Recursively traverses a JSON data structure, appending each "path"
    (dot-joined keys/indices which could be used to extract the leaf data)
    to the module-global ``results`` list.  The global is created on demand
    (the original crashed with NameError unless the caller predefined it)
    and is also returned so callers need not touch the global at all.
    Note: paths here keep their leading dot, unlike JSON_Walker.

    >>> get_json_keys({"top": [{"frob": "baz", "foo": "bar"},
    ... {"nums": [1, 2, 3]}]})
    ['.top.0.frob', '.top.0.foo', '.top.1.nums.0', '.top.1.nums.1', '.top.1.nums.2']
    '''
    global results
    try:
        results
    except NameError:
        results = list()  # create the accumulator if the caller didn't
    if isinstance(data, list):
        for key in range(len(data)):
            get_json_keys(data[key], '%s.%s' % (path, key))
    elif hasattr(data, 'keys') and callable(data.keys):
        for key in data.keys():
            get_json_keys(data[key], '%s.%s' % (path, key))
    else:
        results.append(path)
    return results
def self_test(**opts):
    '''Run the module's doctests; keyword options pass through to testmod().'''
    from doctest import testmod
    return testmod(**opts)
if __name__ == '__main__':
    ## Alternative implementation requires a global variable! :(
    results = list()
    args = sys.argv[1:]
    if len(args):
        if args[0] in ('-t', '--test'):
            # Run the embedded doctests; exit status is the failure count.
            results = self_test(verbose=True)
            sys.exit(results[0])
        else:
            fn = args[0]
            try:
                f = open(fn, 'r')
            except EnvironmentError as e:
                print('Unable to read %s: %s' % (fn, e), file=sys.stderr)
                sys.exit(1)
            try:
                data = json.load(f)
            except ValueError as e:
                print('Unable to parse data from %s: %s' % (fn, e), file=sys.stderr)
                sys.exit(2)
            finally:
                f.close()  # don't leak the handle, even on a parse error
            walk = JSON_Walker()
            walk(data)
    else:
        print(usage % sys.argv[0], file=sys.stderr)
        sys.exit(0)
    if len(args) == 1:
        # No paths requested: list every path found in the structure.
        print('\n'.join(walk.nodes))
    else:  # It's more than 1 so:
        results = list()
        for each in args[1:]:
            try:
                results.append(str(walk[each]))
            except KeyError:
                print('NotFound(%s)' % each, file=sys.stderr)
        print(' '.join(results))
@JimDennis
Copy link
Author

Running this against the output of every aws ec2 describe-* command available to me I generated 32 files containing over 580,000 entries. The largest JSON output was from the aws ec2 describe-images command which generates 416,000 lines describing 18,922 images (and takes my xj.py code about two or three seconds to process).

Boiling this down (replacing all .xxxx. numeric index components with .*. and piping through sort -u) leaves me with 285 unique key-paths.

I'm thinking of adding two features to xj.py: one to extract keys matching a glob pattern, and another to match on glob patterns in values while extracting some other data. So, for example, when you want to find the PublicIpAddress for an instance using its InstanceId you might use:

xj myinstances.json '*.InstanceId=i-1234abcd:*.PublicIpAddress'

Instead of something like:

aws ec2 describe-instances --instance-id i-1234abcd > this.json
xj this.json Instances.0.NetworkInterfaces.0.PublicIpAddresses.0.PublicIpAddress

I'll also add a feature that handles - as a filename, slurping data in from stdin and using json.loads() instead of json.load() on a file.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment