Created
November 23, 2016 21:19
-
-
Save dz0ny/cf9d101e821e13b78520725b7ced208e to your computer and use it in GitHub Desktop.
TinyORM based on tinydb and jsonmodels
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
import arrow | |
import jsonmodels.models | |
import tinydb | |
from six import add_metaclass | |
from tinydb_serialization import SerializationMiddleware | |
from tinydb_serialization import Serializer | |
class DateTimeSerializer(Serializer):
    """Serializer that stores ``datetime`` values as ISO-format text via arrow."""

    # Type of the objects this serializer is responsible for.
    OBJ_CLASS = datetime

    def encode(self, obj):
        """Return the ISO-format string representation of ``obj``."""
        moment = arrow.get(obj)
        return moment.isoformat()

    def decode(self, s):
        """Parse the stored string ``s`` and return it as a ``datetime``."""
        moment = arrow.get(s)
        return moment.datetime
class DatabaseProxy(object):
    """
    Proxy that is used during context binding.

    A target database is attached with ``proxy(db)`` and detached with
    ``proxy(None)``. Attribute lookups that are not found on the proxy
    itself are forwarded to the attached database; while nothing is
    attached they raise ``AttributeError``.
    """

    __slots__ = ['db']

    def __init__(self):
        self.proxy(None)

    def proxy(self, db):
        """Attach ``db`` as the forwarding target (``None`` detaches)."""
        self.db = db

    def __getattr__(self, attr):
        """Forward ``attr`` to the attached database.

        Raises:
            AttributeError: if no database is attached.
        """
        # __getattr__ only runs after normal lookup failed. If the failed
        # lookup was for the slot itself, the slot was never assigned (the
        # instance was created without running __init__); reading self.db
        # again here would re-enter __getattr__ and recurse without end.
        if attr in self.__slots__:
            raise AttributeError('Cannot use uninitialized DatabaseProxy.')
        if self.db is None:
            raise AttributeError('Cannot use uninitialized DatabaseProxy.')
        return getattr(self.db, attr)

    def __setattr__(self, attr, value):
        """Allow assignment to the declared slots only.

        Raises:
            AttributeError: if ``attr`` is not listed in ``__slots__``.
        """
        if attr not in self.__slots__:
            raise AttributeError('Cannot set attribute on DatabaseProxy.')
        return super(DatabaseProxy, self).__setattr__(attr, value)
class Database(object):
    """
    Basic database that can be used globally or per request.

    Example:

        database = Database('db.json')

        class Cat(TinyModel):
            name = fields.StringField(required=True)

            class Meta:
                database = database

        Cat(name='Chuck').save()
    """

    def __init__(self, db_file):
        """Open the database for ``db_file``.

        The special name ``':memory:'`` selects in-memory storage; any
        other value is passed to TinyDB together with JSON storage.
        """
        in_memory = ':memory:' == db_file

        if in_memory:
            backend = tinydb.storages.MemoryStorage
        else:
            backend = tinydb.storages.JSONStorage

        # Wrap the storage so datetime values go through DateTimeSerializer.
        middleware = SerializationMiddleware(backend)
        middleware.register_serializer(DateTimeSerializer(), 'TinyDateTime')

        if in_memory:
            self.db = tinydb.TinyDB(storage=middleware)
        else:
            self.db = tinydb.TinyDB(db_file, storage=middleware)

    def __repr__(self):
        """Return a short description naming the class of ``db._storage``."""
        own_name = self.__class__.__name__
        storage_name = self.db._storage.__class__.__name__
        return '<{}: storage={}>'.format(own_name, storage_name)
class ProxyDatabase(Database):
    """
    Base proxy to be used where more control is needed over the
    currently active database.

    Used as a context manager: on entry, the ``Meta.database`` proxy of
    every listed model is bound to this database; on exit the database is
    closed and the proxies are unbound again.

    Example:

        class Cat(TinyModel):
            name = fields.StringField(required=True)

            class Meta:
                database = DatabaseProxy()

        with ProxyDatabase(':memory:', [Cat]):
            Cat(name='Chuck').save()
    """

    def __init__(self, db_file, models):
        """Open ``db_file`` and remember the ``models`` to bind on entry."""
        super(ProxyDatabase, self).__init__(db_file)
        self.models = models

    def __enter__(self):
        """Bind every model's proxy to this database and return the db."""
        for model in self.models:
            # setup proxy binding
            model.Meta.database.proxy(self.db)
        return self.db

    def __exit__(self, *args):
        """Close the database and unbind every model's proxy."""
        try:
            self.db.close()
        finally:
            # Unbind even when close() raises, so that no model is left
            # pointing at this database after the block has ended.
            for model in self.models:
                # remove proxy binding
                model.Meta.database.proxy(None)
class TablenameMeta(type):
    """Metaclass that ensures each created class has a ``__tablename__``.

    When the class body does not define ``__tablename__``, the lower-cased
    class name is used.
    """

    def __new__(cls, name, parents, attrs):
        if '__tablename__' not in attrs:
            attrs['__tablename__'] = name.lower()
        return super(TablenameMeta, cls).__new__(cls, name, parents, attrs)
@add_metaclass(TablenameMeta)
class ModelWrapper(jsonmodels.models.Base):
    """jsonmodels base class created through ``TablenameMeta``."""

    def __repr__(self):
        """Return a short description with the class and table name."""
        own_name = self.__class__.__name__
        table_name = self.__tablename__
        return '<{}: __tablename__={}>'.format(own_name, table_name)
class TinyModel(ModelWrapper):
    """Base model for all inherited models.

    The methods below read ``Meta.database`` and ``__tablename__`` from
    the model class to locate the table that holds its records.
    """

    def __init__(self, eid=None, *args, **kwargs):
        """Create a model instance.

        Args:
            eid: id of the stored record this instance represents, or
                ``None`` for an instance that has not been saved yet.
            *args, **kwargs: passed through to the parent initializer.
        """
        super(TinyModel, self).__init__(*args, **kwargs)
        self.eid = eid

    @property
    def table(self):
        """Returns tinydb.table instance for this model"""
        return self.Meta.database.table(self.__tablename__)

    @classmethod
    def _from_element(cls, element):
        """Build a model instance from one stored record."""
        return cls(eid=element.eid, **element)

    @classmethod
    def get(cls, cond):
        """Returns model instance for provided cond.

        Returns ``None`` when the table lookup yields no record.
        """
        table = cls.Meta.database.table(cls.__tablename__)
        res = table.get(cond)
        # Without this guard a miss would fail on ``res.eid``.
        if res is None:
            return None
        return cls._from_element(res)

    @classmethod
    def search(cls, cond):
        """Returns model instances matching cond"""
        table = cls.Meta.database.table(cls.__tablename__)
        return [cls._from_element(row) for row in table.search(cond)]

    @classmethod
    def all(cls):
        """Returns all model instances"""
        table = cls.Meta.database.table(cls.__tablename__)
        return [cls._from_element(row) for row in table.all()]

    def save(self):
        """Saves or updates this model instance to database.

        Inserts when the instance has no ``eid`` yet, otherwise updates
        the record with that ``eid``. Returns whatever the underlying
        ``insert`` / ``update`` call returns.
        """
        if not self.eid:
            # Remember the id returned by insert so that a later save()
            # updates this record instead of inserting a duplicate.
            self.eid = self.table.insert(self.to_struct())
            return self.eid
        return self.table.update(self.to_struct(), eids=[self.eid])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment