Last active
March 22, 2021 07:45
-
-
Save vytas7/7e0a9d24a56a503f90e63d75c14d00fd to your computer and use it in GitHub Desktop.
Testing a simple Falcon+MongoDB document API
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# # docker run --rm -it -p 27017:27017 mongo | |
# $ gunicorn --workers 8 --worker-class=meinheld.gmeinheld.MeinheldWorker test:app | |
import itertools | |
import random | |
import falcon | |
import pymongo | |
def take(n, iterable):
    """Collect at most ``n`` leading items of ``iterable`` into a new list.

    Only the items that end up in the result are pulled from ``iterable``,
    so an iterator passed in can be resumed afterwards.
    """
    head = itertools.islice(iterable, n)
    return [*head]
class Documents:
    """Falcon resource exposing a MongoDB collection as a small document API.

    One instance serves several routes; the responder that runs for a route
    is chosen by the ``suffix`` given when the route is registered
    (``document`` for single-document lookup, ``random`` for random inserts).
    """

    # Maximum number of documents returned by one listing request.
    _PAGE_SIZE = 100

    def __init__(self):
        # MongoClient() without arguments uses PyMongo's default host/port.
        self._client = pymongo.MongoClient()
        self._documents = self._client.database.documents

    @staticmethod
    def _serialize(doc):
        """Swap the BSON ``_id`` of ``doc`` for a string ``id`` (in place).

        Returns the same dict so the call can be used inline.
        """
        doc['id'] = str(doc.pop('_id'))
        return doc

    def _create(self, resp, document):
        """Insert ``document`` and answer 201 Created with its location."""
        ref = self._documents.insert_one(document)
        resp.location = f'/documents/{ref.inserted_id}'
        resp.status = falcon.HTTP_CREATED

    def on_get(self, req, resp):
        """List up to ``_PAGE_SIZE`` documents together with the total count.

        ``next`` is a placeholder link when more documents exist than fit in
        one page, otherwise ``None``.
        """
        count = self._documents.count_documents({})
        next_link = '/not-implemented' if count > self._PAGE_SIZE else None
        documents = take(self._PAGE_SIZE, self._documents.find())

        resp.media = {
            'documents': [self._serialize(doc) for doc in documents],
            'next': next_link,
            'total': count,
        }

    def on_get_document(self, req, resp, document_id):
        """Return the document whose id is ``document_id``.

        Raises:
            falcon.HTTPNotFound: if ``document_id`` is not a valid ObjectId
                string or no document with that id exists.
        """
        # bson is distributed as part of PyMongo, so this adds no dependency.
        from bson import ObjectId
        from bson.errors import InvalidId

        # The URL carries the id as text, while insert_one() stores generated
        # ids as ObjectId; querying with the raw string would never match.
        try:
            object_id = ObjectId(document_id)
        except InvalidId:
            raise falcon.HTTPNotFound() from None

        doc = self._documents.find_one({'_id': object_id})
        if doc is None:
            raise falcon.HTTPNotFound()

        resp.media = self._serialize(doc)

    def on_post(self, req, resp):
        """Store the request body as a new document.

        Raises:
            falcon.HTTPBadRequest: if the body is not a JSON object.
        """
        document = req.get_media()
        # insert_one() only accepts mappings; reject arrays/scalars with a
        # client error instead of letting the driver's TypeError become a 500.
        if not isinstance(document, dict):
            raise falcon.HTTPBadRequest(
                title='Invalid document',
                description='The request body must be a JSON object.',
            )
        self._create(resp, document)

    def on_post_random(self, req, resp):
        """Store a new document holding a random nine-digit number."""
        number = random.randint(100_000_000, 999_999_999)
        self._create(resp, {'random': number})

    # Don't do this...
    # This is just because I was too lazy to script `wrk` to send POSTs.
    on_get_random = on_post_random
class MissingFunctionality:
    """Placeholder resource for endpoints that have not been written yet."""

    def on_get(self, req, resp):
        """Answer with a plain-text note and Falcon's ``HTTP_784`` status."""
        resp.status = falcon.HTTP_784
        resp.content_type = falcon.MEDIA_TEXT
        resp.text = 'Patches welcome!'
# WSGI application object, e.g. served with `gunicorn test:app`.
app = falcon.App()

# A single Documents instance backs every /documents route; the suffix
# selects which of its responders handles the route.
documents = Documents()
app.add_route('/documents', documents)
app.add_route('/documents/random', documents, suffix='random')
app.add_route('/documents/{document_id}', documents, suffix='document')

# Target of the placeholder `next` link in the document listing.
app.add_route('/not-implemented', MissingFunctionality())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment