Created
August 23, 2015 13:41
-
-
Save miohtama/e85ef31110ce2e0d7c38 to your computer and use it in GitHub Desktop.
PrestoDoctor OAuth implementation with Authomatic, Pyramid and Python 3.x
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Prestodoctor OAuth on Authomatic implementation with data import and mapping to internal database.""" | |
from argparse import Namespace | |
import time | |
from authomatic.core import json_qs_parser | |
from authomatic.providers.oauth2 import OAuth2 | |
from websauna.system.model import now | |
from websauna.system.user.social import EmailSocialLoginMapper, \ | |
NotSatisfiedWithData | |
from trees.models import UserMedia | |
__author__ = "Mikko Ohtamaa <[email protected]>" | |
__license__ = "AGPL" | |
class PrestodoctorAuthomatic(OAuth2):
    """Authomatic OAuth2 provider for Prestodoctor.

    After the regular OAuth2 user creation this provider queries three
    Prestodoctor API endpoints (user, recommendation, photo id) and stores
    the parsed responses on the Authomatic user object.

    * Docs: https://github.com/PrestoDoctor/omniauth-prestodoctor/blob/master/lib/omniauth/strategies/prestodoctor.rb
    """

    user_authorization_url = 'https://prestodoctor.com/oauth/authorize'
    access_token_url = 'https://prestodoctor.com/oauth/token'
    user_info_url = 'https://prestodoctor.com/api/v1/user'

    #: Root of the endpoints queried in ``_update_or_create_user``
    info_base_url = "https://prestodoctor.com/api/v1/user"

    #: Comes from config file
    user_info_scope = []

    def _x_scope_parser(self, scope):
        """Return the space separated scopes requested from Prestodoctor.

        The ``scope`` argument is not consulted; the same three scopes are
        always returned.
        """
        return 'user_info recommendation photo_id'

    def _update_or_create_user(self, data, credentials=None, content=None):
        """Create the Authomatic user and attach Prestodoctor specific data.

        :param data: Passed through to the parent implementation
        :param credentials: Passed through to the parent implementation
        :param content: Passed through to the parent implementation
        :return: ``self.user`` with ``base_data``, ``recommendation_data`` and ``photo_data`` set
        """
        super()._update_or_create_user(data, credentials, content)

        def fetch(suffix):
            # All endpoints live under the same base URL and are parsed the same way
            response = self.access(self.info_base_url + suffix, content_parser=json_qs_parser)
            return response.data

        self.user.base_data = fetch("")

        # Recommendation data might be empty if the user has not done medical evaluation yet
        recommendation = fetch("/recommendation")
        self.user.recommendation_data = recommendation or {}

        photo = fetch("/photo_id")
        self.user.photo_data = photo or {}

        return self.user
class PrestodoctorMapper(EmailSocialLoginMapper):
    """Map Prestodoctor external users to our database."""

    def import_social_media_user(self, user):
        """Convert incoming Authomatic object to info dictionary.

        :param user: Authomatic user carrying ``base_data``, ``photo_data`` and ``recommendation_data``
        :return: Copy of the base data with copies of the photo id and
            recommendation dictionaries stored under the ``photo_id`` and
            ``recommendation`` keys
        """
        # Merge all separate data sources to a single dictionary
        info = user.base_data.copy()
        info["photo_id"] = user.photo_data.copy()
        info["recommendation"] = user.recommendation_data.copy()
        return info

    def update_first_login_social_data(self, user, data):
        """Update user full name on the first login only.

        :param user: Our user object
        :param data: Info dictionary; must contain ``first_name`` and ``last_name``
        """
        super().update_first_login_social_data(user, data)
        user.full_name = data["first_name"] + " " + data["last_name"]

    def update_full_presto_data(self, user, info: Namespace):
        """Download copy of Prestodoctor photo files to local server.

        Set user's medical license verified if it looks good.

        Both the recommendation and the photo id may be empty (the provider
        stores ``{}`` when Prestodoctor returns nothing), so the fields read
        here are looked up defensively instead of raising ``AttributeError``.

        :param user: Our user object
        :param info: Prestodoctor data in dotted notation, with ``recommendation`` and ``photo_id`` namespaces
        """
        # NOTE(review): the indentation of this method was lost in the copy
        # this was reconstructed from. The trust branch below is assumed to
        # cover verification, license number and the medical upload marker
        # -- confirm against the original file.
        user.license_initial_upload_completed_at = now()

        # Trust Prestodoctor licenses if they are not expired.
        # ``expires`` is compared against time.time(), so presumably epoch seconds.
        expires = getattr(info.recommendation, "expires", None)
        if expires is not None and expires > time.time():
            user.license_verified_by = None
            user.license_verified_at = now()
            user.presto_license_number = info.recommendation.id_num
            user.medical_license_upload_completed_at = now()

        # Download copy of government issued id so the driver can check this is the right person.
        # Skipped when Prestodoctor gave us no photo id, as there is nothing to fetch.
        photo_url = getattr(info.photo_id, "url", None)
        if photo_url:
            driving_license = UserMedia.fetch_from_url(self.registry, photo_url, user=user)
            driving_license.approved_by = None
            driving_license.approved_at = now()
            driving_license.store_bbb_copy(self.registry, "driving")  # Backwards compatibility

        # Set a marker for tests so we know we don't do this operation twice
        user.user_data["social"]["prestodoctor"]["full_data_updated_at"] = now().isoformat()

    def update_every_login_social_data(self, user, data):
        """Update user data every time they relogin.

        Copies the non-empty Prestodoctor fields to ``user.user_data`` and runs
        the heavy :meth:`update_full_presto_data` import only when the
        recommendation issue date differs from the one seen on the previous login.

        :param user: Our user object
        :param data: Info dictionary as built by :meth:`import_social_media_user`
        """
        # If the user has been using our system before, get the current active prestodoctor recommendation issued date.
        # This must be read before the parent call below, which presumably overwrites the stored data.
        last_known_recommendation_issued = user.user_data["social"].get("prestodoctor", {}).get("recommendation", {}).get("issued", None)

        super().update_every_login_social_data(user, data)

        # Convert data to dotted notation to keep the mapping below short
        # http://stackoverflow.com/a/16279578/315168
        info = Namespace(**data)
        info.address = Namespace(**info.address)
        info.photo_id = Namespace(**info.photo_id)
        info.recommendation = Namespace(**info.recommendation)

        # Map Presto fields to our fields
        mappings = {
            "dob": info.dob,
            "photo_url": info.photo,
            "country": "US",
            "zipcode": info.address.zip5,
            "zip4": info.address.zip4,
            "gender": None,
            "first_name": info.first_name,
            "last_name": info.last_name,
            "full_name": info.first_name + " " + info.last_name,
            "city": info.address.city,
            "state": info.address.state,
            "postal_code": info.address.zip5,
            "address": info.address.address1,
            "apartment": info.address.address2,
            "external_data_updated": now().isoformat()
        }

        # TODO: Set user medical license verified if Presto gives us its number

        # Update internal structure. Only override existing value if we have data from Presto.
        # E.g. phone number might be missing, but we have it, so we don't want to replace existing phone number with empty string.
        for key, value in mappings.items():
            if value:
                user.user_data[key] = value

        if user.user_data["social"]["prestodoctor"]["recommendation"].get("issued") != last_known_recommendation_issued:
            # The prestodoctor evaluation issue has changed, do the heavy data update
            self.update_full_presto_data(user, info)

    def capture_social_media_user(self, request, result):
        """Extract social media information from the Authomatic login result in order to associate the user account.

        :param request: Current request, passed on to the user lookup
        :param result: Authomatic login result
        :return: The user returned by ``get_or_create_user_by_social_medial_email``
        :raise NotSatisfiedWithData: If Prestodoctor did not give us an email address
        """
        # Should not happen
        assert not result.error

        email = result.user.base_data.get("email")

        if not email:
            # We cannot login if Prestodoctor doesn't give us email as we use it for the user mapping
            raise NotSatisfiedWithData("Email address is needed in order to user this service and we could not get one from your social media provider. Please try to sign up with your email instead.")

        user = self.get_or_create_user_by_social_medial_email(request, result.user)
        return user
#: This map is to satisfy Authomatic module loader.
#: It lists the provider classes defined in this module.
PROVIDER_ID_MAP = [PrestodoctorAuthomatic]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pytest | |
import transaction | |
from trees.tests.utils import start_order, fill_in_delivery_details, \ | |
confirm_delivery | |
from trees.usermodels import User | |
# Prestodoctor test account details used by the functional tests in this module.
# NOTE(review): all values are empty here -- presumably blanked out before
# publishing; fill them in before running the tests.

# Login email of a Prestodoctor user who has a medical recommendation
PRESTO_USER_WITH_RECOMMENDATION = ""

# Login email of a Prestodoctor user who has not been evaluated yet
PRESTO_USER_WITHOUT_RECOMMENDATION = ""

# Password typed into the Prestodoctor login form
PRESTO_PASSWORD = ""

# Phone number entered in the delivery details of the order tests
PHONE_NUMBER = ""
@pytest.fixture
def non_evaluated_user_browser(request, browser_instance_getter):
    """Browser instance requested separately for the non-evaluated user tests.

    Selenium/splinter/pytest-splinter does not properly clean the browser between tests.

    https://github.com/pytest-dev/pytest-splinter/issues/49
    """
    dedicated_browser = browser_instance_getter(request, non_evaluated_user_browser)
    return dedicated_browser
def do_presto_login(browser, presto_user=PRESTO_USER_WITH_RECOMMENDATION, presto_password=PRESTO_PASSWORD):
    """Sign in on the Prestodoctor login page and grant access if asked.

    This will cause an alert in your Presto login management which you need to clear later.

    :param browser: Splinter browser currently showing the Prestodoctor sign in page
    :param presto_user: Value typed into the email field
    :param presto_password: Value typed into the password field
    """
    submit_button = "input[name='commit']"

    assert browser.is_text_present("Sign in to your account")

    # Fill in Presto login page
    credentials = (
        ("user[email]", presto_user),
        ("user[password]", presto_password),
    )
    for field_name, value in credentials:
        browser.fill(field_name, value)

    browser.find_by_css(submit_button).click()

    # First time login pops up allow permission dialog.
    if browser.is_text_present("Authorization required"):
        browser.find_by_css(submit_button).click()
def do_presto_login_if_needed(browser, presto_user=PRESTO_USER_WITH_RECOMMENDATION, presto_password=PRESTO_PASSWORD):
    """Fill in the Prestodoctor login form only when it is actually shown.

    For subsequent tests the Prestodoctor auth keys remain active and we don't
    need to enter username and password again. Redirecting through OAuth
    provider endpoint is enough.

    :param browser: Splinter browser that was just sent to the OAuth provider
    :param presto_user: Passed on to ``do_presto_login``
    :param presto_password: Passed on to ``do_presto_login``
    """
    login_form_shown = browser.is_text_present("Sign in to your account")
    if not login_form_shown:
        return

    do_presto_login(browser, presto_user, presto_password)
def test_presto_login(web_server, browser, DBSession, init):
    """Login / sign up with Prestodoctor.

    Presto application must be configured as web application, running in http://localhost:8521/.

    The Prestodoctor credentials are taken from the module level constants.

    Example invocation: py.test trees -s --splinter-webdriver=firefox --splinter-make-screenshot-on-failure=false --ini=test.ini -k test_presto_login

    :param web_server: Py.test fixture, gives HTTP address where the functional test web server is running, ``http://localhost:8521/``
    :param browser: Py.test Splinter Browser fixture
    :param DBSession: Py.test SQLAlchemy session
    :param init: Websauna configuration object
    """
    login_url = "{}/login".format(web_server)

    # Initiate Presto login with Authomatic
    browser.visit(login_url)
    browser.find_by_css(".btn-login-prestodoctor").click()

    do_presto_login_if_needed(browser)

    assert browser.is_text_present("You are now logged in")

    # See that we got somewhat sane data
    with transaction.manager:
        assert DBSession.query(User).count() == 1
        account = DBSession.query(User).get(1)

        assert account.first_login
        assert account.email == PRESTO_USER_WITH_RECOMMENDATION
        assert account.activated_at
        assert account.last_login_ip == "127.0.0.1"

        # Check user basic data
        assert account.full_name == 'Test Oauth1'
        presto_data = account.user_data["social"]["prestodoctor"]
        assert presto_data["dob"] == -621648001
        assert account.address == "123 MARKET ST"
        assert account.city == "SAN FRANCISCO"
        assert account.state == "CA"
        assert account.zipcode == "94105"

        # License details
        assert account.presto_license_number == 692624515
        assert account.medical_license_upload_completed_at
        assert account.driving_license_upload_completed_at
        assert account.license_initial_upload_completed_at

    # Generated by our backend on successful oauth login
    assert browser.is_text_present("You are now logged in")

    browser.find_by_css("#nav-logout").click()
    assert browser.is_text_present("You are now logged out")
def test_presto_double_login(web_server, browser, DBSession, init):
    """Login Presto user twice and see we do heavy data import only once.

    The ``full_data_updated_at`` marker stored on the first login must be
    unchanged after the second login.
    """

    def login_through_presto():
        # Initiate Presto login with Authomatic
        browser.visit("{}/login".format(web_server))
        browser.find_by_css(".btn-login-prestodoctor").click()
        do_presto_login_if_needed(browser)
        assert browser.is_text_present("You are now logged in")

    login_through_presto()

    # See that we got somewhat sane data
    with transaction.manager:
        assert DBSession.query(User).count() == 1
        account = DBSession.query(User).get(1)

        # Grab timestamp of full data update
        first_marker = account.user_data["social"]["prestodoctor"]["full_data_updated_at"]

    browser.find_by_css("#nav-logout").click()

    # Go again
    login_through_presto()

    with transaction.manager:
        assert DBSession.query(User).count() == 1
        account = DBSession.query(User).get(1)

        # The marker must not have moved on the second login
        second_marker = account.user_data["social"]["prestodoctor"]["full_data_updated_at"]
        assert second_marker == first_marker
def test_presto_non_evaluated_user(web_server, non_evaluated_user_browser, DBSession, init):
    """Login Presto user who has no evaluation done yet.

    Basic profile data must be imported, but all license related fields must stay empty.
    """
    browser = non_evaluated_user_browser
    login_url = "{}/login".format(web_server)

    # Initiate Presto login with Authomatic
    browser.visit(login_url)
    browser.find_by_css(".btn-login-prestodoctor").click()

    do_presto_login_if_needed(browser, presto_user=PRESTO_USER_WITHOUT_RECOMMENDATION)

    assert browser.is_text_present("You are now logged in")

    # See that we got somewhat sane data
    with transaction.manager:
        assert DBSession.query(User).count() == 1
        account = DBSession.query(User).get(1)

        assert account.first_login
        assert account.email == PRESTO_USER_WITHOUT_RECOMMENDATION
        assert account.activated_at
        assert account.last_login_ip == "127.0.0.1"

        # Check user basic data
        assert account.full_name == 'Test Oauth2'
        presto_data = account.user_data["social"]["prestodoctor"]
        assert presto_data["dob"] == -621648001
        assert account.address == "123 MARKET ST"
        assert account.city == "SAN FRANCISCO"
        assert account.state == "CA"
        assert account.zipcode == "94105"

        # License details should be empty
        assert not account.presto_license_number
        assert not account.medical_license_upload_completed_at
        assert not account.driving_license_upload_completed_at
        assert not account.license_initial_upload_completed_at
def test_presto_order(web_server, browser, DBSession, init):
    """Do direct-to-buy now with licenced presto user.

    This should go directly to thank you page, no medical evaluation questions needed.
    """
    start_order(web_server, browser, init, login_needed=False)

    assert browser.is_text_present("Sign in to buy")

    login_button = browser.find_by_css(".btn-login-prestodoctor")
    login_button.click()

    do_presto_login_if_needed(browser)

    # Generated by our backend on successful oauth login
    assert browser.is_text_present("You are now logged in")

    # Assert we are on the order page
    assert browser.is_text_present("Checkout")

    fill_in_delivery_details(browser, phone_number=PHONE_NUMBER, email=None)
    confirm_delivery(browser, membership=True)

    assert browser.is_element_visible_by_css("#thank-you")
def test_presto_order_non_evaluated_user(web_server, non_evaluated_user_browser, DBSession, init):
    """Do direct-to-buy now with a presto user who has no evaluation done.

    After confirming the delivery the ``#medical-recommendation`` element
    must be visible instead of the thank you page.
    """
    browser = non_evaluated_user_browser

    start_order(web_server, browser, init, login_needed=False)

    assert browser.is_text_present("Sign in to buy")

    login_button = browser.find_by_css(".btn-login-prestodoctor")
    login_button.click()

    do_presto_login_if_needed(browser, presto_user=PRESTO_USER_WITHOUT_RECOMMENDATION)

    # Generated by our backend on successful oauth login
    assert browser.is_text_present("You are now logged in")

    # Assert we are on the order page
    assert browser.is_text_present("Checkout")

    fill_in_delivery_details(browser, phone_number=PHONE_NUMBER, email=None)
    confirm_delivery(browser, membership=True)

    assert browser.is_element_visible_by_css("#medical-recommendation")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment