Skip to content

Instantly share code, notes, and snippets.

@ericmoritz
Created July 30, 2011 17:05
Show Gist options
  • Save ericmoritz/1115749 to your computer and use it in GitHub Desktop.
Save ericmoritz/1115749 to your computer and use it in GitHub Desktop.
riak mapping interface
try:
    from collections.abc import Mapping
except ImportError:  # Python 2 has no collections.abc
    from collections import Mapping
def force_str(str_or_unicode):
    """Return ``str_or_unicode`` as a native ``str`` where that applies.

    On Python 2, ``unicode`` values (subclasses included) are encoded to a
    UTF-8 byte ``str``. On Python 3 there is no separate ``unicode`` type,
    so the value is returned unchanged. Values of any other type are always
    returned as they are.

    :param str_or_unicode: the value to normalise, typically a bucket key.
    :returns: a UTF-8 encoded ``str`` for Python 2 ``unicode`` input,
        otherwise the input itself.
    """
    try:
        text_type = unicode  # noqa: F821 -- only defined on Python 2
    except NameError:
        # Python 3: ``str`` is already the text type, nothing to encode.
        return str_or_unicode
    # isinstance (rather than an exact type check) also covers subclasses.
    if isinstance(str_or_unicode, text_type):
        return str_or_unicode.encode("utf-8")
    return str_or_unicode
class RiakMapping(Mapping):
    """Dictionary-style access to the objects of a single Riak bucket.

    ``mapping[key]`` reads, ``mapping[key] = data`` writes and
    ``del mapping[key]`` deletes the object stored under ``key``. Whenever a
    fetched object has siblings, they are passed to :meth:`merge` and the
    merged object is stored through :meth:`store_internal` before the
    operation continues.

    Subclasses must override :meth:`merge` if siblings can occur, and may
    override :meth:`transform_data`, :meth:`store_internal`,
    :meth:`delete_internal`, :meth:`__iter__` and :meth:`__len__`.
    """

    def __init__(self, bucket, content_type="application/json"):
        """Wrap ``bucket``.

        :param bucket: bucket object; only its ``get(key)`` method is used.
        :param content_type: content type set on every object stored
            through :meth:`store_internal`.
        """
        self._bucket = bucket
        self.content_type = content_type

    def __getitem__(self, key):
        """Return the (transformed) data stored under ``key``.

        :raises KeyError: if the object does not exist.
        """
        key = force_str(key)
        obj, _ = self._get_object_or_merge(key)
        if not obj.exists():
            raise KeyError("%s not found" % (key, ))
        return self.transform_data(obj, obj.get_data())

    def __setitem__(self, key, data):
        """Store ``data`` under ``key``, merging with any siblings first."""
        key = force_str(key)
        obj, merged = self._get_object_or_merge(key, data=data)
        # If the record was merged, ``data`` was handed to merge() and the
        # merged object has already been stored.
        if not merged:
            self.store_internal(obj, data)

    def __delitem__(self, key):
        """Delete the object stored under ``key``.

        :raises KeyError: if the object neither exists nor has siblings.
        """
        key = force_str(key)
        obj = self._bucket.get(key)
        if not (obj.exists() or obj.has_siblings()):
            raise KeyError("%s not found" % (key, ))
        self.delete_internal(obj)

    def _get_object_or_merge(self, key, data=None):
        """Fetch ``key`` and resolve siblings if there are any.

        :param key: bucket key, passed to ``bucket.get`` as given.
        :param data: pending value of a store operation, forwarded to
            :meth:`merge` so it can take part in the resolution.
        :returns: ``(obj, merged)``; ``merged`` is True when siblings were
            merged and the merged object has been stored.
        """
        obj = self._bucket.get(key)
        if not obj.has_siblings():
            return obj, False

        siblings = [obj.get_sibling(i)
                    for i in range(obj.get_sibling_count())]
        resolved = self.merge(siblings, data=data)
        # store_internal is a hook of this mapping (not a method of the
        # Riak object), so subclass overrides apply to merged writes too.
        self.store_internal(resolved, resolved.get_data())
        return resolved, True

    ##
    # Extend these if needed
    ##
    def __iter__(self):
        """Iteration over keys is not supported by default. Extend
        this if you have a specific mechanism for retrieving the
        bucket keys."""
        raise NotImplementedError()

    def __len__(self):
        """Mapping length is not supported by default. Extend this if
        you have a specific mechanism for counting the bucket keys."""
        raise NotImplementedError()

    def transform_data(self, obj, data):
        """Use this method for manipulating the retrieved data.

        The default implementation returns ``data`` unchanged.
        """
        return data

    def store_internal(self, obj, data):
        """Use this method for manipulating the stored data and for
        mutating the RiakObject prior to storing, such as adding links
        and indexing.

        The default implementation sets the mapping's content type and
        ``data`` on ``obj`` and stores it.
        """
        obj.set_content_type(self.content_type)
        obj.set_data(data)
        obj.store()

    def delete_internal(self, obj):
        """Use this method for any clean up prior to deletion of the
        object. The default implementation just deletes ``obj``."""
        obj.delete()

    def merge(self, siblings, data=None):
        """Use this method for merging conflicted siblings.

        :param siblings: list of the sibling objects of one key.
        :param data: if the conflict occurs prior to storage, the value
            about to be stored; otherwise None.
        :returns: the object to store; its ``get_data()`` must return the
            merged value.
        :raises NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError()
from riakmap import RiakMapping
import unittest
import riak
# Module-level client shared by both test cases below. RiakClient() is
# called without arguments, so the library's default settings apply.
client = riak.RiakClient()
class LastWriteWinsTest(unittest.TestCase):
    """Set/get/delete checks for a plain RiakMapping on the ``test``
    bucket, using the module-level ``client``."""

    def setUp(self):
        # Bind the mapping to the bucket, then clear every key the tests
        # touch so each test starts from a clean slate.
        self.bucket = client.bucket("test")
        self.mapping = RiakMapping(self.bucket)
        for stale_key in ("one", "not-found"):
            self.bucket.get(stale_key, r=riak.ALL).delete(rw=riak.ALL)

    def test_set(self):
        # A value written through the mapping is readable from the bucket.
        payload = {"index": 1}
        self.mapping['one'] = payload
        stored = self.bucket.get('one')
        self.assertEqual(stored.get_data(), payload)

    def test_get(self):
        # A value written through the mapping reads back unchanged.
        payload = {"index": 1}
        self.mapping['one'] = payload
        self.assertEqual(self.mapping['one'], payload)

    def test_get_not_found(self):
        # Reading a missing key raises KeyError.
        self.assertRaises(KeyError, self.mapping.__getitem__, 'not-found')

    def test_delete(self):
        # A deleted key no longer exists in the bucket.
        payload = {"index": 1}
        self.mapping['one'] = payload
        del self.mapping['one']
        remaining = self.bucket.get('one')
        self.assertFalse(remaining.exists())

    def test_delete_not_found(self):
        # Deleting a missing key raises KeyError.
        self.assertRaises(KeyError, self.mapping.__delitem__, 'not-found')
## This isn't the brightest lightbulb in the house as conflict-resolution algorithms go.
class MergingMapping(RiakMapping):
    """RiakMapping whose conflict resolution is a set union of the
    sibling values."""

    def merge(self, siblings, data=None):
        """Union the data of all ``siblings`` (plus ``data`` when given),
        write the sorted result onto the first sibling and return it."""
        payloads = [sibling.get_data() for sibling in siblings]
        # If the merge happens ahead of a store(), fold that value in too.
        if data is not None:
            payloads.append(data)
        combined = set().union(*payloads)
        survivor = siblings[0]
        survivor.set_data(sorted(combined))
        return survivor
class MergeTest(unittest.TestCase):
    """Checks for MergingMapping on the ``sibling-test`` bucket, where
    the key ``split`` is set up with conflicting writes before each test."""

    def setUp(self):
        self.bucket = client.bucket("sibling-test")
        self.bucket.set_allow_multiples(True)
        self.mapping = MergingMapping(self.bucket)

        # Start from a clean slate for the keys the tests touch.
        obj = self.bucket.get("split", r=riak.ALL)
        obj.delete(rw=riak.ALL)

        obj = self.bucket.get("not-found", r=riak.ALL)
        obj.delete(rw=riak.ALL)

        # Create a pair of siblings: two writes to the same key under
        # different client ids.
        client.set_client_id("herp")
        obj = self.bucket.new("split", data=[1])
        obj.store()

        client.set_client_id("derp")
        obj = self.bucket.new("split", data=[2])
        obj.store()

    def test_set(self):
        # Storing into a conflicted key merges the new value with the
        # siblings.
        expected = [1, 2, 3]
        self.mapping['split'] = [3]

        obj = self.bucket.get('split')
        self.assertEqual(obj.get_data(), expected)

    def test_get(self):
        # Reading a conflicted key returns the merged sibling values.
        expected = [1, 2]
        result = self.mapping['split']
        self.assertEqual(result, expected)

    def test_get_not_found(self):
        def inner():
            self.mapping['not-found']
        self.assertRaises(KeyError, inner)

    def test_delete(self):
        del self.mapping['split']

        # Check the key that was actually deleted; the original looked up
        # 'one', which this test case never writes.
        obj = self.bucket.get('split')
        self.assertFalse(obj.exists())

    def test_delete_not_found(self):
        def inner():
            del self.mapping['not-found']
        self.assertRaises(KeyError, inner)
# Run the test cases defined above when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment