-
-
Save sairion/3d8f88d7816a43b3d6d7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import uuid
from datetime import datetime

from flask import current_app, request, jsonify, abort
from flask.ext.sqlalchemy import SQLAlchemy
from sqlalchemy import not_
from sqlalchemy.dialects.postgresql import UUID
# Module-wide Flask-SQLAlchemy handle; every model and session use below goes
# through it.
# NOTE(review): ``current_app`` is a context-bound proxy, so this presumably
# requires the module to be imported inside an application context - confirm.
db = SQLAlchemy(current_app)
class Model(db.Model):
    """Abstract base model with bulk assignment and dict serialization.

    Subclasses gain:

    * ``set_columns(**kwargs)`` - assign columns and relationships from a
      plain mapping (for example a decoded JSON body).
    * ``to_dict(show=..., hide=...)`` - serialize the row to a plain dict,
      exposing only ``id`` plus the explicitly requested fields.
    """

    __abstract__ = True

    # When True, ``_set_columns`` overwrites columns that already hold a
    # truthy value; when False, only columns whose value is falsy are filled.
    _overwrite_columns = True

    def __init__(self, **kwargs):
        """Create an instance, assigning any columns/relationships given."""
        self._set_columns(**kwargs)

    def _set_columns(self, **kwargs):
        """Assign table columns and relationships from ``kwargs``.

        Keys that match neither a column nor a relationship are ignored.

        * Column keys are assigned with ``setattr`` (subject to
          ``_overwrite_columns``).
        * To-many relationship keys are synchronized via
          ``_sync_collection``.
        * To-one relationship keys are applied to the related object through
          its ``set_columns``; if there is no related object yet, one is
          created and attached first.

        NOTE: every column present in ``kwargs`` is written, including
        ``id``. Callers handling untrusted input must validate it first.
        """
        for key in self.__table__.columns.keys():
            if key in kwargs and (self._overwrite_columns
                                  or not getattr(self, key)):
                setattr(self, key, kwargs[key])

        relationships = self.__mapper__.relationships
        for rel in relationships.keys():
            if rel not in kwargs:
                continue
            if relationships[rel].uselist:
                self._sync_collection(rel, kwargs[rel])
                continue
            related = getattr(self, rel)
            if related is None:
                # Nothing attached yet: build the related row instead of
                # failing with AttributeError on None.
                related = relationships[rel].mapper.class_()
                setattr(self, rel, related)
            related.set_columns(**kwargs[rel])

    def _sync_collection(self, rel, items):
        """Make the to-many relationship ``rel`` match ``items``.

        ``items`` is a list of mappings; a single mapping is accepted as
        shorthand for a one-element list. Mappings carrying an ``id`` update
        the matching related row, mappings without one create a new related
        row, and related rows not mentioned at all are deleted.

        Assumes the relationship attribute is a query object supporting
        ``filter``/``filter_by``/``append`` (i.e. declared with
        ``lazy='dynamic'``).
        """
        if isinstance(items, dict):
            items = [items]

        query = getattr(self, rel)
        # ``mapper.class_`` resolves the target class even when the
        # relationship was declared with a string name.
        cls = self.__mapper__.relationships[rel].mapper.class_

        valid_ids = []
        for item in items:
            if 'id' in item:
                query.filter_by(id=item['id']).update(item)
                valid_ids.append(item['id'])
            else:
                col = cls()
                col.set_columns(**item)
                query.append(col)
                # Flush so the new row's default primary key is populated.
                db.session.flush()
                valid_ids.append(col.id)

        # Delete related rows that were not in ``items``. An IN criterion
        # cannot always be evaluated in Python, so let the database decide
        # which session objects are affected.
        query.filter(not_(cls.id.in_(valid_ids))).delete(
            synchronize_session='fetch')

    def set_columns(self, **kwargs):
        """Public bulk update: assign from ``kwargs`` and touch ``modified``.

        The ``modified`` timestamp (naive UTC) is only written when the table
        actually has a column of that name.
        """
        self._set_columns(**kwargs)
        if 'modified' in self.__table__.columns:
            self.modified = datetime.utcnow()

    def __repr__(self):
        """Return ``ClassName(id)``, or a JSON dump of all columns.

        The JSON form is used only for tables without an ``id`` column.
        Datetimes are rendered as ``YYYY-MM-DDTHH:MM:SSZ``; any other value
        the standard-library encoder cannot handle (UUID, Decimal, ...) is
        rendered with ``str``.
        """
        columns = self.__table__.columns.keys()
        if 'id' in columns:
            return '%s(%s)' % (self.__class__.__name__, self.id)

        data = {}
        for key in columns:
            val = getattr(self, key)
            if isinstance(val, datetime):
                val = val.strftime('%Y-%m-%dT%H:%M:%SZ')
            data[key] = val
        return json.dumps(data, default=str)

    def to_dict(self, show=None, hide=None, path=None):
        """Serialize this row to a plain dict.

        Only ``id`` is included by default; every other column or
        relationship must be listed in ``show``. Anything listed in ``hide``
        is omitted, ``id`` included.

        :param show: field names to include, either relative
            (``'first_name'``, ``'posts.title'``) or fully qualified with the
            lower-cased table name (``'user.posts.title'``).
        :param hide: field names to exclude, same syntax as ``show``.
        :param path: dotted prefix of this object inside the output tree;
            defaults to the lower-cased table name. Used by the recursion.
        :returns: dict mapping field name to value. To-many relationships
            become lists of dicts; to-one relationships become a dict, or
            ``None`` when nothing is attached.

        The caller's ``show`` and ``hide`` lists are never modified.
        """
        if not path:
            path = self.__tablename__.lower()

        def prepend_path(item):
            # Qualify ``item`` with ``path`` unless it already is. The whole
            # prefix is compared, not just the first segment, so qualified
            # names survive being handed down to nested calls.
            item = item.lower()
            if not item:
                return item
            if item == path or item.startswith('%s.' % path):
                return item
            if item[0] != '.':
                item = '.%s' % item
            return '%s%s' % (path, item)

        # Work on fresh lists so the caller's arguments stay untouched.
        show = [prepend_path(x) for x in (show or [])]
        hide = [prepend_path(x) for x in (hide or [])]

        data = {}
        for key in self.__table__.columns.keys():
            # ``show``/``hide`` entries are lower-cased, so compare likewise.
            check = ('%s.%s' % (path, key)).lower()
            if check not in hide and (key == 'id' or check in show):
                data[key] = getattr(self, key)

        relationships = self.__mapper__.relationships
        for key in relationships.keys():
            check = ('%s.%s' % (path, key)).lower()
            if check in hide or check not in show:
                continue
            # Once expanded here, the relationship is hidden for the rest of
            # this call and for everything serialized below it.
            hide.append(check)
            child_path = '%s.%s' % (path, key.lower())
            if relationships[key].uselist:
                data[key] = [
                    item.to_dict(show=list(show), hide=list(hide),
                                 path=child_path)
                    for item in getattr(self, key)
                ]
            else:
                related = getattr(self, key)
                if related is not None and hasattr(related, 'to_dict'):
                    # Serialize the related row instead of leaking a model
                    # instance into the (JSON-bound) result.
                    related = related.to_dict(show=list(show),
                                              hide=list(hide),
                                              path=child_path)
                data[key] = related
        return data
class User(Model):
    """A user row: UUID primary key, first/last name, and a posts relation."""

    # Primary key; a fresh uuid4 is generated when none is supplied.
    id = db.Column(
        UUID(as_uuid=True),
        primary_key=True,
        default=uuid.uuid4,
    )

    first_name = db.Column(db.String(120))
    last_name = db.Column(db.String(120))

    # Relation to Post, declared lazy='dynamic'; also installs a ``user``
    # backref on Post.
    posts = db.relationship(
        'Post',
        backref='user',
        lazy='dynamic',
    )
class Post(Model):
    """A post row: UUID primary key, owning user id, title and text."""

    # Primary key; a fresh uuid4 is generated when none is supplied.
    id = db.Column(
        UUID(as_uuid=True),
        primary_key=True,
        default=uuid.uuid4,
    )

    # Required foreign key to the ``user`` table.
    user_id = db.Column(
        UUID(as_uuid=True),
        db.ForeignKey('user.id'),
        nullable=False,
    )

    title = db.Column(db.String(200))
    text = db.Column(db.String())
def requested_columns(request):
    """Return the field names listed in the ``show`` query parameter.

    ``show`` is a comma-separated list such as ``first_name,posts.title``.
    Whitespace around each name is stripped and empty entries are dropped,
    so ``"a, b,,c"`` yields ``['a', 'b', 'c']``.

    :param request: object exposing a mapping-like ``args`` attribute.
    :returns: list of names; empty when ``show`` is missing or blank.
    """
    show = request.args.get('show', None)
    if not show:
        return []
    names = (part.strip() for part in show.split(','))
    return [name for name in names if name]
@current_app.route('/users/<string:user_id>', methods=['GET'])
def read_user(user_id):
    """Return one user as JSON.

    Responds 404 when ``user_id`` is not a well-formed UUID or no such user
    exists. The optional ``show`` query parameter selects which fields are
    included besides ``id``.
    """
    # Parse the id up front: a malformed value can never match a row, and
    # handing the raw string to the UUID column would surface as a server
    # error instead of "not found".
    try:
        user_uuid = uuid.UUID(user_id)
    except ValueError:
        abort(404)

    # get user from database
    user = User.query.filter_by(id=user_uuid).first()
    if user is None:
        abort(404)

    # return user as json
    show = requested_columns(request)
    return jsonify(data=user.to_dict(show=show))
@current_app.route('/users/<string:user_id>', methods=['PUT'])
def update_user(user_id):
    """Update one user from a JSON object body and return it as JSON.

    Responds 404 when ``user_id`` is not a well-formed UUID or no such user
    exists, and 400 when the body is not a JSON object. The optional
    ``show`` query parameter selects which fields are returned besides
    ``id``.
    """
    # Parse the id up front so a malformed value yields 404 rather than a
    # server error from the UUID column.
    try:
        user_uuid = uuid.UUID(user_id)
    except ValueError:
        abort(404)

    # get user from database
    user = User.query.filter_by(id=user_uuid).first()
    if user is None:
        abort(404)

    # ``set_columns`` is called with ``**payload``, so anything other than a
    # JSON object (missing body, list, scalar) must be rejected here.
    payload = request.get_json()
    if not isinstance(payload, dict):
        abort(400)

    # TODO: validate json user input using WTForms before applying it.
    # Until then every column named in the body (including ``id``) is
    # written as-is.
    # form = UserUpdateForm.from_json(payload)
    # if not form.validate():
    #     return jsonify(errors=form.errors), 400

    # update user in database
    user.set_columns(**payload)
    db.session.commit()

    # return user as json
    show = requested_columns(request)
    return jsonify(data=user.to_dict(show=show))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment