Created
April 23, 2017 12:09
-
-
Save ranman/9753da70e1be9b31b99c36e1def78c29 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from base64 import b64encode, b64decode | |
from hashlib import sha256 | |
import hmac | |
import json | |
import twitter | |
import os | |
import boto3 | |
lex = boto3.client('lex-runtime') | |
CONSUMER_KEY = os.getenv('CONSUMER_KEY') | |
CONSUMER_SECRET = os.getenv('CONSUMER_SECRET') | |
ACCESS_KEY = os.getenv('ACCESS_KEY') | |
ACCESS_SECRET = os.getenv('ACCESS_SECRET') | |
BOT_NAME = os.getenv('BOT_NAME') | |
BOT_ALIAS = os.getenv('BOT_ALIAS') | |
t_api = twitter.Api( | |
CONSUMER_KEY, CONSUMER_SECRET, | |
ACCESS_KEY, ACCESS_SECRET | |
) | |
def sign_crc(crc): | |
h = hmac.new(bytes(CONSUMER_SECRET, 'ascii'), bytes(crc, 'ascii'), digestmod=sha256) | |
return json.dumps({ | |
"response_token": "sha256=" + b64encode(h.digest()).decode() | |
}) | |
def verify_request(event, context): | |
crc = event['headers']['X-Twitter-Webhooks-Signature'] | |
h = hmac.new(bytes(CONSUMER_SECRET, 'ascii'), bytes(event['body'], 'utf-8'), digestmod=sha256) | |
crc = b64decode(crc[7:]) # 7 is len('sha256') | |
return hmac.compare_digest(h.digest(), crc) | |
def lex_it(dm): | |
if 'message_create' not in dm: | |
return | |
dm = dm['message_create'] | |
response = lex.post_text( | |
botName=BOT_NAME, | |
botAlias=BOT_ALIAS, | |
userId=dm['sender_id'], | |
inputText=dm['message_data']['text'] | |
) | |
if response['intentName'] == 'WhereRandall' and \ | |
response['dialogState'] == 'Fulfilled': | |
msg = response['message'] | |
t_api.PostDirectMessage(msg, user_id=dm['sender_id']) | |
def webhook(event, context): | |
resp = { | |
'statusCode': 200, | |
'body': '' | |
} | |
if event.get('path') != "/twitter/webhook": | |
resp['statusCode'] = 404 | |
return resp | |
if event.get('httpMethod') == 'GET': | |
if not event['queryStringParameters'] or 'crc_token' not in event['queryStringParameters']: | |
resp['statusCode'] = 400 | |
return resp | |
resp['body'] = sign_crc(event['queryStringParameters']['crc_token']) | |
return resp | |
if not verify_request(event, context): | |
print("Could not verify the crc in the header") | |
resp['statusCode'] = 400 | |
resp['body'] = json.dumps({"error": "bad header: X-Twitter-Webhooks-Signature"}) | |
return resp | |
dm_events = json.loads(event['body']) | |
for dm in dm_events['direct_message_events']: | |
if dm['type'] == 'message_create': | |
lex_it(dm) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import os | |
import foursquare | |
import arrow | |
client = foursquare.Foursquare(access_token=os.getenv('ACCESS_TOKEN')) | |
def get_loc(): | |
checkin = client.users.checkins(params={'limit': 1})['checkins']['items'][0] | |
venue = checkin['venue'] | |
ts = checkin['createdAt'] | |
return { | |
'name': venue['name'], # name | |
'loc': [ # latitude and longitude | |
venue['location']['lat'], | |
venue['location']['lng'] | |
], | |
'ts': ts #when | |
} | |
def api_handler(event, context): | |
return get_loc() | |
def alexa_handler(event, context): | |
loc = get_loc() | |
msg = "Randall was last at {} {}".format(loc['name'], arrow.get(loc['ts']).humanize()) | |
resp = { | |
'version': '1.0', | |
'sessionAttributes': {}, | |
'response': { | |
'outputSpeech': { | |
'type': 'PlainText', | |
'text': msg | |
}, | |
'card': { | |
'type': 'Simple', | |
'title': 'SessionSpeechlet - Randall Location', | |
'content': 'SessionSpeechlet - ' + msg, | |
}, | |
'shouldEndSession': True | |
} | |
} | |
return resp | |
def lex_handler(event, context): | |
loc = get_loc() | |
msg = "Randall was last at {} {}".format(loc['name'], arrow.get(loc['ts']).humanize()) | |
return { | |
"dialogAction": { | |
"type": "Close", | |
"fulfillmentState": "Fulfilled", | |
"message": { | |
"contentType": "PlainText", | |
"content": msg | |
} | |
}} | |
event_function_map = { | |
'alexa': alexa_handler, | |
'lex': lex_handler, | |
'api': api_handler | |
} | |
def lambda_handler(event, context): | |
if 'bot' in event: | |
return event_function_map['lex'](event, context) | |
elif event.get('request', {}).get('type') == "IntentRequest": | |
return event_function_map['alexa'](event, context) | |
else: | |
return event_function_map['api'](event, context) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment