Created
June 18, 2020 06:04
-
-
Save Multihuntr/3a377ec1449c903e2aab82e00b8049b3 to your computer and use it in GitHub Desktop.
A list-like wrapper for dictionary-like objects
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
class ListLike(collections.abc.MutableSequence):
    '''
    A list-like interface wrapping dictionary-like objects.

    Uses integer keys, like you would for a list, has range checking,
    negative indexing and slicing working as you'd expect.
    i.e.
        with shelve.open('foo.dat') as sf:
            sf.clear()
            q = ListLike(sf)
            q.extend(['Hello', 'world', 'string'])
            q[0]    # gives 'Hello'
            q[-1]   # gives 'string'
            q[-2:]  # gives a generator with ['world', 'string']

    This implementation is intended to be used with shelve.Shelf,
    so, internally, the keys are stored as strings ('0', '1', ...), while
    all of the functions still expect integers.

    Differences from the builtin list that are kept on purpose:
      * slicing returns a lazy generator instead of a new ListLike/list,
        so that large stores are never copied implicitly;
      * `pop()` without an argument removes the FIRST item (index 0);
      * an index of an unsupported type raises IndexError (list raises
        TypeError);
      * deleting / assigning slices only supports a step of 1.
    '''

    def __init__(self, base=None):
        '''
        base: the dictionary-like object to wrap. It is expected to hold
              only the keys '0' .. 'n-1'. When omitted, a new private dict
              is used (a `{}` default would be shared by every instance).
        '''
        self.base = {} if base is None else base

    @staticmethod
    def _key(idx):
        ''' Translate an integer position into the stored string key. '''
        return f'{idx}'

    def clear(self):
        ''' Remove every item from the underlying mapping. '''
        self.base.clear()

    def append(self, item):
        ''' Store `item` at the end of the sequence. '''
        self.base[self._key(len(self))] = item

    def extend(self, items):
        '''
        Append every element of `items`, which may be any iterable
        (including generators, which have no len()).
        '''
        # The starting offset is fixed before iterating, so the new keys do
        # not depend on the length changing while we write.
        n = len(self)
        for offset, item in enumerate(items):
            self.base[self._key(n + offset)] = item

    def count(self, item):
        ''' Return how many stored elements compare equal to `item`. '''
        return sum(1 for e in self if e == item)

    def index(self, item):
        '''
        Return the position of the first element equal to `item`.
        Raises ValueError when there is no such element.
        '''
        for i, e in enumerate(self):
            if e == item:
                return i
        raise ValueError(f'{item} is not in this list')

    def insert(self, idx, item):
        '''
        Insert `item` before position `idx`.
        Like list.insert, out-of-range positions are clamped, so
        `insert(len(q), x)` appends and inserting into an empty
        sequence works.
        '''
        n = len(self)
        if idx < 0:
            idx += n
        idx = max(0, min(n, idx))
        # Shift the tail up by one, starting from the end so that nothing
        # is overwritten before it has been moved.
        for i in range(n, idx, -1):
            self.base[self._key(i)] = self.base[self._key(i - 1)]
        self.base[self._key(idx)] = item

    def pop(self, idx=0):
        '''
        Remove and return the item at `idx` (default: the first item).
        Raises IndexError when `idx` is out of range.
        '''
        idx = self._fix_idx(idx)
        n = len(self)
        result = self.base[self._key(idx)]
        # Shift everything after `idx` down by one, then drop the last key.
        for i in range(idx, n - 1):
            self.base[self._key(i)] = self.base[self._key(i + 1)]
        del self.base[self._key(n - 1)]
        return result

    def remove(self, item):
        '''
        Remove the first element (in list order) equal to `item`.
        Raises ValueError when there is no such element, like list.remove.
        '''
        # Searching through index() walks positions 0..n-1; the mapping's
        # own iteration order is unrelated to the list order.
        self.pop(self.index(item))

    def _fix_idx(self, idx):
        '''
        Resolve a possibly negative position to a value in [0, len).
        Raises IndexError when the position is out of range.
        '''
        n = len(self)
        if idx < 0:
            idx += n
        if idx < 0 or idx >= n:
            raise IndexError("Index out of range")
        return idx

    def _fix_idx_range(self, low, high):
        '''
        Resolve possibly negative range bounds and clamp them so that
        0 <= low <= high <= len.
        '''
        n = len(self)
        if low < 0:
            low += n
        if high < 0:
            high += n
        low = max(0, min(n, low))
        # Never let `high` fall below `low`: an inverted range is empty.
        high = max(low, min(n, high))
        return low, high

    def _slice_bounds(self, idx):
        '''
        Resolve a slice with step 1 (or no step) to clamped (low, high)
        bounds with low <= high.
        Raises NotImplementedError for any other step.
        '''
        start, stop, step = idx.indices(len(self))
        if step != 1:
            raise NotImplementedError
        return start, max(start, stop)

    def _del_range(self, low, high, step=1):
        ''' Delete the items at positions low .. high-1. '''
        if step != 1:
            raise NotImplementedError
        low, high = self._fix_idx_range(low, high)
        diff = high - low
        if diff == 0:
            return
        n = len(self)
        # Move the tail down over the deleted span ...
        for i in range(high, n):
            self.base[self._key(i - diff)] = self.base[self._key(i)]
        # ... then drop the now-duplicated keys at the end.
        for i in range(n - diff, n):
            del self.base[self._key(i)]

    def _insert_range(self, low, items):
        ''' Insert all of `items` (any iterable) before position `low`. '''
        items = list(items)
        diff = len(items)
        if diff == 0:
            return
        n = len(self)
        # Move the tail up by `diff`, from the end backwards so that no
        # value is overwritten before it has been moved.
        for i in range(n - 1 + diff, low - 1 + diff, -1):
            self.base[self._key(i)] = self.base[self._key(i - diff)]
        for offset, item in enumerate(items):
            self.base[self._key(low + offset)] = item

    def __len__(self):
        return len(self.base)

    def __delitem__(self, idx):
        ''' Support `del q[i]` and `del q[a:b]` (step 1 only). '''
        if isinstance(idx, int):
            self.pop(idx)
        elif isinstance(idx, slice):
            low, high = self._slice_bounds(idx)
            self._del_range(low, high)
        else:
            raise IndexError(f'indices must be integers or slices, not {type(idx).__name__}')

    def __getitem__(self, idx):
        '''
        `q[i]` returns the stored item; `q[a:b:c]` returns a generator over
        the selected items (positions are resolved when the slice is taken).
        '''
        if isinstance(idx, int):
            idx = self._fix_idx(idx)
            return self.base[self._key(idx)]
        elif isinstance(idx, slice):
            # slice.indices handles None, 0, negative and out-of-range
            # bounds, as well as negative steps, exactly like list does.
            k_idxs = range(*idx.indices(len(self)))
            return (self[i] for i in k_idxs)
        else:
            raise IndexError(f'indices must be integers or slices, not {type(idx).__name__}')

    def __setitem__(self, idx, item):
        '''
        `q[i] = x` overwrites one item; `q[a:b] = iterable` replaces the
        range with the iterable's elements (step 1 only).
        '''
        if isinstance(idx, int):
            idx = self._fix_idx(idx)
            self.base[self._key(idx)] = item
        elif isinstance(idx, slice):
            low, high = self._slice_bounds(idx)
            # Materialise first: `item` may be a lazy slice of this very
            # object, which must be read before anything is shifted.
            items = list(item)
            self._del_range(low, high)
            self._insert_range(low, items)
        else:
            raise IndexError(f'indices must be integers or slices, not {type(idx).__name__}')

    def __iter__(self):
        return self[:]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import shelve | |
import tempfile | |
import os.path | |
from listlike import ListLike | |
class A:
    ''' Tiny custom object whose repr is the two constructor arguments joined as text. '''

    def __init__(self, some, val):
        # Only the concatenated string form of both arguments is kept.
        self.stored = ''.join((str(some), str(val)))

    def __repr__(self):
        return self.stored
def mainTest():
    '''
    Walk through typical ListLike usage on top of a temporary shelve file.

    Doubles as a usage example: every step is checked with an assert, and
    the operations that are expected to fail must actually raise.
    '''
    # persist list data to `foo.dat`, arbitrary size
    with tempfile.TemporaryDirectory() as tempdir:
        with shelve.open(os.path.join(tempdir, 'foo.dat')) as sf:
            sf.clear()
            q = ListLike(sf)

            # normal Python list operations work as expected
            q.append('x')
            assert len(q) == 1
            q.extend([1, 2, 3])
            q.remove('x')
            assert len(q) == 3
            q.pop(0)
            assert len(q) == 2
            q.append('Hello!')
            assert q[2] == 'Hello!'

            # technically you can use the list() to create an actual list
            assert list(q) == [2, 3, 'Hello!']
            # but if your sf is super large, this obviously is bad.
            # I use it here for illustration purposes.

            # still, you can use all the normal list operations
            # (that I remembered to implement/check)
            assert 2 in q  # => True
            del q[2]
            assert list(q[1:2]) == [3]
            assert list(q[-1:]) == [3]

            # except addition, 'cause we don't want to copy large data
            try:
                q + [10]
            except TypeError:
                pass
            else:
                # without this branch the check would pass even if `+` worked
                raise AssertionError('ListLike + list should raise TypeError')

            # but, iadd works fine
            q += [10]  # same as q.extend([10])

            # normal index range rules
            try:
                q[100]
            except IndexError:
                pass
            else:
                raise AssertionError('out-of-range index should raise IndexError')

            q.extend([0, 1, 2, 3, 4, 5])
            # both of the following work as intended
            k1 = [item for item in q if item > 2]
            k2 = list(filter(lambda item: item > 2, q))
            assert list(q) == [2, 3, 10, 0, 1, 2, 3, 4, 5]
            assert k1 == [3, 10, 3, 4, 5]
            assert k2 == [3, 10, 3, 4, 5]
            assert k1 == k2

            # the values get pickled, so they can be any arbitrary object
            q.append(A(1, 2))
            q.append(A('Hello', ' there'))
            q.append(A('Hello', 2))
            print(q[-2])  # => Hello there
            # reminder: pickle stores each instance's attribute state plus a
            # reference to its class (not the class's code), so the class
            # must still be importable when the data is loaded, and stored
            # instances keep the attributes they were saved with even if the
            # class' __init__ changes later.
def test0():
    ''' All the basic operations, mirrored step by step on a plain list '''
    with tempfile.TemporaryDirectory() as tmp, shelve.open(os.path.join(tmp, 'foo.dat')) as shelf:
        shelf.clear()
        wrapped = ListLike(shelf)
        reference = [0, 1, 2, 3, 4, 5, 6, 7, 8, 1, 1, 1]
        wrapped.extend(reference)

        # read-only queries
        assert wrapped.count(1) == 4
        assert wrapped.index(8) == 8

        # insert at a positive position
        wrapped.insert(9, 9)
        reference.insert(9, 9)
        assert list(wrapped) == reference

        # insert at a negative position
        wrapped.insert(-2, 25)
        reference.insert(-2, 25)
        assert list(wrapped) == reference

        # pop from a negative position
        wrapped.pop(-2)
        reference.pop(-2)
        assert list(wrapped) == reference

        # remove by value
        wrapped.remove(1)
        reference.remove(1)
        assert list(wrapped) == reference
def _test_get(similist, idx):
    ''' Check that reading position `idx` gives the same item as the plain list `similist`. '''
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, 'foo.dat')
        with shelve.open(db_path) as shelf:
            shelf.clear()
            wrapped = ListLike(shelf)
            wrapped.extend(similist)
            assert wrapped[idx] == similist[idx]
def _test_get_range(similist, idx):
    ''' Check that the slice `idx` selects the same items as it does on the plain list `similist`. '''
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, 'foo.dat')
        with shelve.open(db_path) as shelf:
            shelf.clear()
            wrapped = ListLike(shelf)
            wrapped.extend(similist)
            # the wrapper hands back a generator, so materialise it to compare
            selected = list(wrapped[idx])
            assert selected == similist[idx]
def test_get():
    ''' Getting single items and ranges '''
    # single positions: first, negative, positive, last
    for position in (0, -5, 7, -1):
        _test_get(list(range(9)), position)

    # (start, stop) pairs, including bounds outside the list and inverted ranges
    bounds = (
        (0, 1), (-10, 5), (5, 15), (-2, -1),
        (-10, -20), (-20, -10), (10, 20), (20, 10),
    )
    for start, stop in bounds:
        _test_get_range(list(range(9)), slice(start, stop))
def _test_delete(similist, idx):
    '''
    Delete `idx` (a position or a slice) from both a ListLike and the plain
    list `similist` (which is modified in place), then check that they
    agree and that the stored keys are exactly '0' .. 'n-1'.
    '''
    with tempfile.TemporaryDirectory() as tmp, shelve.open(os.path.join(tmp, 'foo.dat')) as shelf:
        shelf.clear()
        wrapped = ListLike(shelf)
        wrapped.extend(similist)

        del wrapped[idx]
        del similist[idx]

        assert list(wrapped) == similist
        expected_keys = {f'{i}' for i in range(len(similist))}
        assert set(wrapped.base.keys()) == expected_keys
def test_deletes():
    ''' Deleting single items and ranges '''
    # every case starts from a fresh list because the helper mutates it
    for position in (0, -5, 7, -1):
        _test_delete(list(range(9)), position)

    bounds = (
        (0, 1), (-10, 5), (5, 15), (-2, -1),
        (-10, -20), (-20, -10), (10, 20), (20, 10),
    )
    for start, stop in bounds:
        _test_delete(list(range(9)), slice(start, stop))
def _test_assign(similist, idx, item):
    '''
    Assign `item` to `idx` (a position or a slice) on both a ListLike and
    the plain list `similist` (which is modified in place), then check that
    they agree and that the stored keys are exactly '0' .. 'n-1'.
    '''
    with tempfile.TemporaryDirectory() as tmp, shelve.open(os.path.join(tmp, 'foo.dat')) as shelf:
        shelf.clear()
        wrapped = ListLike(shelf)
        wrapped.extend(similist)

        wrapped[idx] = item
        similist[idx] = item

        assert list(wrapped) == similist
        expected_keys = {f'{i}' for i in range(len(similist))}
        assert set(wrapped.base.keys()) == expected_keys
def test_assigns():
    ''' Assigning to single items and ranges '''
    # every case starts from a fresh list because the helper mutates it
    for position in (0, -5, 7, -1):
        _test_assign(list(range(9)), position, 10)

    bounds = (
        (0, 1), (-10, 5), (5, 15), (-2, -1),
        (-10, -20), (-20, -10), (10, 20), (20, 10),
    )
    # first a one-element replacement, then a four-element one
    for replacement in ([10], [10, 11, 12, 13]):
        for start, stop in bounds:
            _test_assign(list(range(9)), slice(start, stop), replacement)
def test_addition():
    ''' Addition: `+` must be rejected, `+=` must extend in place '''
    with tempfile.TemporaryDirectory() as tempdir:
        with shelve.open(os.path.join(tempdir, 'foo.dat')) as sf:
            sf.clear()
            q = ListLike(sf)
            q.extend([1, 2, 3, 4])

            try:
                q + [10]
            except TypeError:
                pass
            else:
                # without this branch the check would pass even if `+` worked
                raise AssertionError('ListLike + list should raise TypeError')

            q += [10]
            assert list(q) == [1, 2, 3, 4, 10]
def test_order():
    ''' Insert order has no correlation with list order '''
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, 'foo.dat')
        with shelve.open(db_path) as shelf:
            wrapped = ListLike(shelf)
            wrapped += [1, 2, 3, 4, 5, 6, 7]
            # interleave new values between the existing ones
            for position, value in ((1, 8), (3, 9), (5, 10)):
                wrapped.insert(position, value)
            assert list(wrapped) == [1, 8, 2, 9, 3, 10, 4, 5, 6, 7]
if __name__ == '__main__':
    # Run the usage walkthrough first, then each test, in a fixed order.
    for case in (
        mainTest,
        test0,
        test_get,
        test_deletes,
        test_assigns,
        test_addition,
        test_order,
    ):
        case()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Warning: Many of the list operations involve shuffling indices, which causes a lot of the data to be read and rewritten, so they are not very performant. The simple case of only using `.append()` / `.extend()` and iterating through the dataset is fine.