Skip to content

Instantly share code, notes, and snippets.

@polyvertex
Last active May 6, 2025 07:54
Show Gist options
  • Save polyvertex/91e455b34e6a92affa982c89c94159e8 to your computer and use it in GitHub Desktop.
Save polyvertex/91e455b34e6a92affa982c89c94159e8 to your computer and use it in GitHub Desktop.
arty.py - A collection wrapper to easily get values from nested collections in a pythonic way
# Copyright (c) Jean-Charles Lefebvre
# SPDX-License-Identifier: MIT
import re
from collections.abc import Iterable, Mapping, Sequence, Sized
# Public API of this module: only the wrapper class is exported.
__all__ = ["Arty"]
# The type of ``None``; lets type validation accept ``None`` where a type is
# expected (``isinstance()`` itself rejects ``None`` as a class argument).
_NONE_TYPE = type(None)
# Verbose-mode template of the delimiter regex used to split a query string.
# The literal word QSEP is a placeholder that gets substituted by the escaped
# separator. Alternatives go from the most specific (separator + bracket +
# quote) to the least specific (separator alone, then end of string) so that
# the longest delimiter is consumed as a single match.
_QSEP_REGEX_TMPL = r"""
(
QSEP\s{0,64}\[\s{0,64}\" | \"\s{0,64}\]\s{0,64}QSEP
|
QSEP\s{0,64}\[\s{0,64}\' | \'\s{0,64}\]\s{0,64}QSEP
|
QSEP\s{0,64}\[ | \]\s{0,64}QSEP
|
\[\s{0,64}\" | \"\s{0,64}\]
|
\[\s{0,64}\' | \'\s{0,64}\]
|
\[ | \]
|
QSEP
|
\Z
)
"""

# The template relies on verbose mode to ignore its own layout whitespace.
_QSEP_REGEX_FLAGS = re.X

# Delimiter regexes compiled ahead of time for the usual separators, keyed by
# the separator string itself.
_QSEP_REGEXES = dict(
    (
        _sep,
        re.compile(
            _QSEP_REGEX_TMPL.replace("QSEP", re.escape(_sep)),
            _QSEP_REGEX_FLAGS),
    )
    for _sep in (".", ",", "/", "\\"))
# Verbose-mode regex fragment matching a Python integer literal meant to be
# converted with int(s, base=0): an optional sign followed by a decimal, hex
# (0x), binary (0b) or octal (0o) number, with single underscores allowed
# between digit groups.
# NOTE: the decimal branch also matches non-zero literals with leading zeros
# (e.g. "010"), which int(s, base=0) rejects with ValueError.
_RXPAT_INT = r"""
[\+\-]?
(?:
(?:
[0-9]{1,64}
(?:_[0-9]{1,64}){0,64}
)
|
(?:
0[xX]
[0-9a-fA-F]{1,64}
(?:_[0-9a-fA-F]{1,64}){0,64}
)
|
(?:
0[bB]
[01]{1,64}
(?:_[01]{1,64}){0,64}
)
|
(?:
0[oO]
[0-7]{1,64}
(?:_[0-7]{1,64}){0,64}
)
)
"""
# Regex matching a whole query part that designates a sequence position.
# Three forms are accepted, each optionally surrounded by whitespace:
#   * a bare integer optionally wrapped in underscores, e.g. "_0"
#     (named group "index")
#   * an integer in square brackets, e.g. "[0]" (named group "index_sb")
#   * a slice in square brackets, e.g. "[1:2]" or "[::2]", where every
#     component may be empty (named groups "start", "stop" and "step")
# The doubled braces are f-string escapes for the regex quantifiers.
# Compiled with re.A (ASCII-only character classes) and re.X (verbose mode).
_RX_INDEX_OR_SLICE = re.compile(
rf"""
\A
\s{{0,64}}
(?:
_{{0,64}}
(?P<index>{_RXPAT_INT})
_{{0,64}}
|
# same than above with square brackets
\[
\s{{0,64}}
(?P<index_sb>{_RXPAT_INT})
\s{{0,64}}
\]
|
\[
\s{{0,64}}
(?P<start>(?:|{_RXPAT_INT}))
\s{{0,64}}
\:
\s{{0,64}}
(?P<stop>(?:|{_RXPAT_INT}))
\s{{0,64}}
(?:
\:
\s{{0,64}}
(?P<step>(?:|{_RXPAT_INT}))
)?
\s{{0,64}}
\]
)
\s{{0,64}}
\Z
""",
re.A | re.X)
class Arty:
    """
    A collection wrapper to easily get values from nested collections in a
    pythonic way.

    Useful to access JSON data structures for instance.

    .. code-block:: python

        foo = {"a": 1, "b": ["foo", "bar", {"hello": 11, "world": 22}], "c": 2}
        foo = Arty(foo)  # wrap!

        # access by attribute name
        assert foo.a == 1

        # index as an attribute name
        # note the use of the underscore character as a prefix to form a valid
        # python identifier
        assert foo.b._0 == "foo"
        assert foo.b._2.hello == 11

        # access by key still supported
        assert foo["a"] == 1
        assert foo["b"][0] == "foo"
        assert foo["b"][0:2] == ["foo", "bar"]
        assert foo.b[0:2] == ["foo", "bar"]

        # accessing by key escapes from Arty's auto-wrapping
        assert isinstance(foo.b, Arty)
        assert isinstance(foo["b"], list)

        # explicit access to wrapped object using the dunder property
        assert isinstance(foo.__, dict)
        assert foo.__["a"] == 1

        # use method q() to get the first available among specified path(s)
        assert foo.q("a") == 1
        assert foo.q("a", "c") == 1
        assert foo.q("c", "a") == 2
        assert foo.q("UNKNOWN", "a") == 1
        assert foo.q("UNKNOWN", "a", "UNKNOWN2") == 1
        assert foo.q("UNKNOWN", "a", "c") == 1

        # a query to q() and qall() can be a string expression
        assert foo.q("b[0]") == "foo"
        assert foo.q("UNKNOWN", "b[0]") == "foo"
        assert foo.q("b._0") == "foo"
        assert foo.q("b.[0]") == "foo"
        assert foo.q("b._2['hello']") == 11  # string keys discouraged
        assert foo.q("b._2.hello") == 11  # preferred variant

        # a query to q() and qall() can be an iterable of keys
        assert foo.q(["b", 0]) == "foo"
        assert foo.q(("b", 0)) == "foo"
        assert foo.q(("b", 0), "UNKNOWN") == "foo"
        assert foo.q("UNKNOWN", ("b", 0)) == "foo"
        assert foo.q(["b", "2", "hello"]) == 11

        # use method qall() to get multiple values at once
        assert foo.qall("a") == [1]
        assert foo.qall("a", "c") == [1, 2]
        assert foo.qall("c", "a") == [2, 1]

        # qall() requires all specified queries to succeed
        try:
            assert foo.qall("UNKNOWN", "a", "c") == [1, 2]
            raise AssertionError
        except KeyError:
            pass  # ok
    """

    # double-underscore slot names are name-mangled (e.g. ``_Arty__wrapped``)
    # so they cannot clash with keys reached through ``__getattr__()``
    __slots__ = ("__weakref__", "__wrapped", "__is_map", "__path")

    #: Mapping and Sequence types to be wrapped by `Arty.__getattr__()`
    #: automatically before being returned.
    #:
    #: Specified types must be subscriptable. That is, they must implement
    #: ``__getitem__``.
    #:
    #: Default value should work for every extracted JSON stream.
    #:
    #: This value can be modified at runtime at class level, or overridden in
    #: a subclass. It cannot be set on a plain `Arty` instance because
    #: ``__slots__`` leaves instances without a ``__dict__``.
    AUTOWRAP_TYPES = (Mapping, list, tuple)

    #: Enable path tracking (enabled by default).
    #:
    #: Useful to give extra information upon `KeyError` and `IndexError`
    #: typically when querying nested collections. When enabled, error messages
    #: include the absolute path to the missing key, index or attribute, as
    #: queried by the caller.
    #:
    #: Disable to slightly improve speed and memory footprint at the cost of
    #: error messages having no hint about queried path.
    #:
    #: This value can be modified at runtime at class level, or overridden in
    #: a subclass. It cannot be set on a plain `Arty` instance because
    #: ``__slots__`` leaves instances without a ``__dict__``.
    PATH_TRACKING = True

    #: Sentinel meaning "argument not passed", so that `None` remains usable as
    #: a regular value for the *default* and *types* arguments of `q()`.
    UNSET = object()

    def __init__(self, wrapped, *, path=None):
        """
        Wrap the *wrapped* collection.

        *wrapped* must be a `Mapping` or a `Sequence`, otherwise `ValueError`
        is raised.

        *path* is an optional iterable of `str` items locating *wrapped*
        inside of the outermost collection. It is only stored when
        `PATH_TRACKING` is enabled. `TypeError` is raised if it holds a
        non-`str` item.
        """
        self.__wrapped = wrapped
        self.__is_map = isinstance(wrapped, Mapping)

        if not self.__is_map and not isinstance(wrapped, Sequence):
            raise ValueError("wrapped")

        if not self.PATH_TRACKING or not path:
            self.__path = ()
        else:
            # copy or fully consume *path*; tuple() raises TypeError by itself
            # if *path* is not iterable
            self.__path = tuple(path)

            # explicit check instead of an assert, which is stripped by -O
            if not all(isinstance(p, str) for p in self.__path):
                raise TypeError("path items must be str")

    def __repr__(self):
        if not self.__path:
            path = f"#{id(self.__wrapped)}"
        else:
            path = ".".join(map(str, self.__path))  # map(str) for extra safety

        return "<{}.{}:/{}>".format(
            type(self).__name__,
            type(self.__wrapped).__name__,
            path)

    def __len__(self):
        return len(self.__wrapped)

    def __contains__(self, item):
        return item in self.__wrapped

    def __iter__(self):
        return iter(self.__wrapped)

    def __getitem__(self, key):
        """
        Direct access to wrapped object's ``__getitem__()`` method.

        No extra wrapping involved here, such that it can be used as a
        deterministic way to escape from automatic wrapping mechanism
        implemented in `__getattr__()`.
        """
        return self.__wrapped[key]

    def __getattr__(self, name):
        """
        Resolve attribute *name* as a key (mapping) or an index (sequence) of
        the wrapped collection.

        For a sequence, *name* is stripped of its underscores and converted
        with ``int(..., base=0)``, so that ``_0`` designates index ``0``.

        The value is wrapped into a new `Arty` object when its type is listed
        in `AUTOWRAP_TYPES`, otherwise it is returned as-is.

        Raise `AttributeError` if the key or index does not exist, and
        `ValueError` if *name* cannot be converted to an index for a sequence.
        Dunder names that are not an index (e.g. ``__deepcopy__``) raise
        `AttributeError` instead, so that `hasattr()`, `copy` and `pickle` can
        probe the object safely.
        """
        # This method is only called once regular attribute lookup failed. A
        # failed lookup of one of our own name-mangled slots means the
        # instance is not initialized yet (e.g. while being rebuilt by copy or
        # pickle), in which case reading self.__is_map below would call this
        # method again and recurse until RecursionError.
        if name.startswith("_Arty__"):
            raise AttributeError(name)

        # if wrapped object is a sequence, *name* must be converted to an index,
        # otherwise, keep *key* as-is
        if self.__is_map:
            key = name
        else:
            try:
                key = int(name.strip("_"), base=0)
            except ValueError as exc:
                # protocol probes must get the exception type that getattr()
                # and hasattr() know how to handle
                if (len(name) > 4
                        and name.startswith("__")
                        and name.endswith("__")):
                    raise AttributeError(name) from None

                raise ValueError(
                    f"malformed index or slice: {name}; error: {exc}") from None

        try:
            value = self.__wrapped.__getitem__(key)
        except (IndexError, KeyError):
            path = ".".join((*self.__path, name))
            raise AttributeError(f"key or index not found at: {path}") from None

        if isinstance(value, self.AUTOWRAP_TYPES):
            path = None if not self.PATH_TRACKING else (*self.__path, name)
            value = type(self)(value, path=path)

        return value

    @property
    def __(self):
        """
        Access to the wrapped object itself.

        This can be used as an escape from `Arty.__getattr__` auto-wrap
        mechanism, in addition to `Arty.__getitem__`.
        """
        return self.__wrapped

    def q(self, *queries, default=UNSET, qsep=".", wrap=False, types=UNSET):
        """
        Query the current structure, optionally multiple times, in order to get
        the first existing of the queried values.

        Each query is either a string expression, whose parts are delimited by
        *qsep* and square brackets, or an iterable of keys and indexes. *qsep*
        is either a `str` or a compiled regex used as the delimiter pattern.

        If *wrap* is true, the resulting value is wrapped into an `Arty` object
        when its type is listed in `AUTOWRAP_TYPES`.

        If *types* is specified, the resulting value is checked against it and
        `TypeError` is raised upon mismatch. *types* is a type, `None`, or an
        iterable of those.

        Return *default* if specified, and when none of the queries succeeded.
        Otherwise, raise `KeyError`, even when the underlying object is a
        Sequence.

        Raise `ValueError` if no query is passed, if a query is empty or
        malformed, or if a string query part cannot be converted to an index or
        a slice where a sequence is met.

        .. note::
            This method is useful and allows for less verbosity on caller side,
            when one specific value is required but stored differently depending
            on JSON structure version, or on the queried API.
        """
        if not queries:
            raise ValueError("queries")

        cls = type(self)

        for query in queries:
            query = self.__parse_query(query, qsep=qsep)
            last_idx = len(query) - 1
            path_tail = []
            value = self

            for idx, qpart in enumerate(query):
                assert isinstance(value, cls)

                # convert key to an int or slice if *value* is a sequence
                if not value.__is_map and isinstance(qpart, str):
                    try:
                        key = self.__parse_index_or_slice(qpart)
                    except ValueError as exc:
                        raise ValueError(
                            f"invalid query part for a sequence; {exc}"
                        ) from None
                else:
                    key = qpart

                try:
                    value = value.__getitem__(key)
                except (IndexError, KeyError):
                    break  # this query failed, try next one

                if self.PATH_TRACKING:
                    path_tail.append(self.__query_part_to_string(key))

                # *value* must be wrapped as long as query is not done
                # note: no *path* passed to intermediate collections
                if idx < last_idx:
                    try:
                        value = cls(value)
                    except ValueError:
                        # *value* is not a collection, which invalidates the
                        # remaining of the query
                        break
            else:
                # every part of the query got resolved
                if types is not self.UNSET:
                    self.__validate_type(value, types)

                if wrap and isinstance(value, self.AUTOWRAP_TYPES):
                    path = (
                        None if not self.PATH_TRACKING
                        else self.__path + tuple(path_tail))
                    value = cls(value, path=path)

                return value

        if default is self.UNSET:
            # tell the caller what was searched and from where
            where = ".".join(self.__path) if self.__path else "<root>"
            raise KeyError(f"no match at {where} for queries: {queries!r}")

        return default

    def qall(self, *queries, **kwargs):
        """
        Like `q()`, but all *queries* are expected to succeed and results are
        returned in a `list` with queries order being preserved.

        *kwargs* are passed as-is to every `q()` call.
        """
        return [self.q(query, **kwargs) for query in queries]

    @staticmethod
    def __parse_query(query, *, qsep):
        """
        Normalize *query* into a sized collection of query parts.

        A non-`str` *query* must be an iterable of keys. It is returned as-is
        when it is `Sized`, or copied to a `tuple` otherwise.

        A `str` *query* is split on the delimiters matched by *qsep*, which is
        either a compiled regex or a separator string. Bracketed parts that
        are not quoted are returned with their brackets (e.g. ``"[0]"``),
        whereas quoted ones are returned bare since they are string keys.

        Parts are not converted to indexes here because the type of collection
        met at each position is not known in advance.

        .. note::
            Chained subscripts like ``a[0][1]`` are rejected as an empty query
            part. Use a separator in between (i.e. ``a[0].1``) or pass an
            iterable of keys instead.

        Raise `ValueError` if *query* is empty or malformed, or if *qsep* is
        of an unsupported type.
        """
        if not isinstance(query, str):
            # query is not a string so assume it is an iterable of keys
            if not isinstance(query, Iterable):
                raise ValueError("query not an iterable")

            # *query* length is required by caller, so fully consume and copy
            # the iterable now
            if not isinstance(query, Sized):
                query = tuple(query)

            # same outcome than for an empty string query; an empty query
            # would otherwise resolve to the wrapper itself
            if len(query) == 0:
                raise ValueError("empty query")

            return query

        # prepare regex object
        if isinstance(qsep, re.Pattern):
            regex = qsep
        elif isinstance(qsep, str):
            try:
                regex = _QSEP_REGEXES[qsep]
            except KeyError:
                # TODO XXX: this could be optimized if needed by implementing a
                # threading.Lock-protected dict-based global cache;
                # i.e. dict[qsep] = regex
                regex = re.compile(
                    _QSEP_REGEX_TMPL.replace("QSEP", re.escape(qsep)),
                    _QSEP_REGEX_FLAGS)
        else:
            raise ValueError("invalid qsep type")

        # parse query string
        start = 0  # offset of the query part being read
        in_slice = False  # an opening square bracket is pending
        in_slice_quote = None  # quote char of the pending bracket if any
        parsed_query = []

        for match in regex.finditer(query):
            delim_start, delim_end = match.span()

            # two delimiters in a row, or a leading delimiter
            if delim_start == start:
                raise ValueError("empty query part")

            delim = match[1]
            qpart = query[start:delim_start]
            start = delim_end

            if in_slice:
                if "[" in delim:
                    raise ValueError("nested slice")
                if "]" not in delim:
                    raise ValueError("slice not closed")

                if in_slice_quote:
                    if in_slice_quote not in delim:
                        raise ValueError(
                            "string key not terminated or too exotic")
                else:
                    # keep brackets so that the part reads as an index or a
                    # slice later on
                    qpart = f"[{qpart}]"

                in_slice = False
                in_slice_quote = None
            elif "[" in delim:
                in_slice = True
                if '"' in delim:
                    in_slice_quote = '"'
                elif "'" in delim:
                    in_slice_quote = "'"
                else:
                    in_slice_quote = None
            elif "]" in delim:
                raise ValueError("closing unopened slice")

            # do not parse index or slice here because the collection type at
            # this position in the query is not known in advance, so we are
            # done here, just append this query part to the result
            parsed_query.append(qpart)

            # stop before the end-of-string match gets read as an empty part
            if delim_end >= len(query):
                break

        if not parsed_query:
            raise ValueError("empty query")

        return parsed_query

    @staticmethod
    def __parse_index_or_slice(expression):
        """
        Convert string *expression* to an `int` or a `slice` object.

        Supported forms are those matched by ``_RX_INDEX_OR_SLICE``: a bare
        integer optionally surrounded by underscores, an integer in square
        brackets, or a slice in square brackets.

        Raise `ValueError` if *expression* is of none of these forms, or if
        `int()` rejects one of its numbers.
        """
        rem = _RX_INDEX_OR_SLICE.fullmatch(expression)
        if not rem:
            raise ValueError("index or slice string expected")

        for name in ("index", "index_sb"):
            index = rem[name]
            if index:
                assert not rem["start"]
                assert not rem["stop"]
                assert not rem["step"]
                return int(index, base=0)

        # slice components are either missing (None) or empty when omitted
        start = rem["start"]
        stop = rem["stop"]
        step = rem["step"]

        return slice(
            None if not start else int(start, base=0),
            None if not stop else int(stop, base=0),
            None if not step else int(step, base=0))

    @staticmethod
    def __query_part_to_string(qpart):
        """
        Return a short `str` representation of *qpart* for path tracking.

        Integers and slices are rendered in square brackets. A string is
        returned as-is if it is an identifier, or truncated to 16 characters
        if that prefix is an identifier. Anything else is rendered as an
        opaque ``<<type:#id>>`` tag.
        """
        if isinstance(qpart, int):
            return f"[{qpart}]"

        if isinstance(qpart, slice):
            start = "" if qpart.start is None else str(qpart.start)
            stop = "" if qpart.stop is None else str(qpart.stop)
            if qpart.step is None:
                return f"[{start}:{stop}]"
            return f"[{start}:{stop}:{qpart.step}]"

        if isinstance(qpart, str):
            if qpart.isidentifier():
                return qpart
            if qpart[:16].isidentifier():
                return f"{qpart[:16]}<...>"

        return f"<<{type(qpart).__name__}:#{id(qpart)}>>"

    @staticmethod
    def __validate_type(value, types):
        """
        Like `isinstance()`, but just returns nothing on success, or raises
        `TypeError` instead of returning a `bool` value.

        Also, *types* can be (or contain) `None`, so to validate `None` *value*.

        Raise `ValueError` when *types* value itself is invalid.
        """
        def _validate_types_item(type_):
            if type_ is None:
                return _NONE_TYPE  # allow case isinstance(value, (None, ))
            if isinstance(type_, type):
                return type_
            raise ValueError("not a type")

        def _mismatch():
            return TypeError(f"unexpected value type: {type(value).__name__}")

        # fastpath: in case *types* is valid, or in case *value* gets validated
        # by isinstance() before reaching an invalid item in *types*
        try:
            if isinstance(value, types):
                return
        except TypeError:
            pass  # *types* needs to be normalized first
        else:
            raise _mismatch()  # isinstance() returned False

        # check *types* before calling isinstance() again
        if types is None or isinstance(types, type):
            types = (_validate_types_item(types), )
        elif isinstance(types, Mapping):
            raise ValueError("types is a map, tuple expected")
        elif isinstance(types, Iterable):
            # copy iterable or consume iterator, while validating its items
            types = tuple(_validate_types_item(type_) for type_ in types)
        else:
            raise ValueError("types type unsupported")

        # last try
        try:
            if isinstance(value, types):
                return
        except Exception as exc:
            raise RuntimeError(
                f"isinstance call raised {type(exc).__name__}: {exc}") from None

        raise _mismatch()
def _selftest():
    """
    Smoke test running the usage examples of the `Arty` class docstring.

    Raise `AssertionError` upon the first failed check. Checks rely on
    ``assert`` statements, so they are skipped when python runs with ``-O``.
    """
    foo = {"a": 1, "b": ["foo", "bar", {"hello": 11, "world": 22}], "c": 2}
    foo = Arty(foo)  # wrap!

    # access by attribute name
    assert foo.a == 1

    # index as an attribute name
    # note the use of the underscore character as a prefix to form a valid
    # python identifier
    assert foo.b._0 == "foo"
    assert foo.b._2.hello == 11

    # access by key still supported
    assert foo["a"] == 1
    assert foo["b"][0] == "foo"
    assert foo["b"][0:2] == ["foo", "bar"]
    assert foo.b[0:2] == ["foo", "bar"]

    # accessing by key escapes from Arty's auto-wrapping
    assert isinstance(foo.b, Arty)
    assert isinstance(foo["b"], list)

    # explicit access to wrapped object using the dunder property
    assert isinstance(foo.__, dict)
    assert foo.__["a"] == 1

    # use method q() to get the first available among specified path(s)
    assert foo.q("a") == 1
    assert foo.q("a", "c") == 1
    assert foo.q("c", "a") == 2
    assert foo.q("UNKNOWN", "a") == 1
    assert foo.q("UNKNOWN", "a", "UNKNOWN2") == 1
    assert foo.q("UNKNOWN", "a", "c") == 1

    # a query to q() and qall() can be a string expression
    assert foo.q("b[0]") == "foo"
    assert foo.q("UNKNOWN", "b[0]") == "foo"
    assert foo.q("b._0") == "foo"
    assert foo.q("b.[0]") == "foo"
    assert foo.q("b._2['hello']") == 11  # string keys discouraged
    assert foo.q("b._2.hello") == 11  # preferred variant

    # a query to q() and qall() can be an iterable of keys
    assert foo.q(["b", 0]) == "foo"
    assert foo.q(("b", 0)) == "foo"
    assert foo.q(("b", 0), "UNKNOWN") == "foo"
    assert foo.q("UNKNOWN", ("b", 0)) == "foo"
    assert foo.q(["b", "2", "hello"]) == 11

    # use method qall() to get multiple values at once
    assert foo.qall("a") == [1]
    assert foo.qall("a", "c") == [1, 2]
    assert foo.qall("c", "a") == [2, 1]

    # qall() requires all specified queries to succeed
    try:
        foo.qall("UNKNOWN", "a", "c")
    except KeyError:
        pass  # ok
    else:
        raise AssertionError("qall() must raise KeyError upon a failed query")


if __name__ == "__main__":
    _selftest()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment