Last active
August 31, 2021 20:52
-
-
Save raihanba13/f74196983f7459a58c08d2d0723b87eb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""This code sample demonstrates an implementation of the Lex Code Hook Interface | |
in order to serve a bot which manages dentist appointments. | |
Bot, Intent, and Slot models which are compatible with this sample can be found in the Lex Console | |
as part of the 'MakeAppointment' template. | |
For instructions on how to set up and test this bot, as well as additional samples, | |
visit the Lex Getting Started documentation http://docs.aws.amazon.com/lex/latest/dg/getting-started.html. | |
""" | |
import json | |
import dateutil.parser | |
import datetime | |
import time | |
import os | |
import math | |
import random | |
import logging | |
logger = logging.getLogger() | |
logger.setLevel(logging.DEBUG) | |
""" --- Helpers to build responses which match the structure of the necessary dialog actions --- """ | |
def elicit_slot(session_attributes, intent_name, slots, slot_to_elicit, message, response_card): | |
return { | |
'sessionAttributes': session_attributes, | |
'dialogAction': { | |
'type': 'ElicitSlot', | |
'intentName': intent_name, | |
'slots': slots, | |
'slotToElicit': slot_to_elicit, | |
'message': message, | |
'responseCard': response_card | |
} | |
} | |
def confirm_intent(session_attributes, intent_name, slots, message, response_card): | |
return { | |
'sessionAttributes': session_attributes, | |
'dialogAction': { | |
'type': 'ConfirmIntent', | |
'intentName': intent_name, | |
'slots': slots, | |
'message': message, | |
'responseCard': response_card | |
} | |
} | |
def close(session_attributes, fulfillment_state, message): | |
response = { | |
'sessionAttributes': session_attributes, | |
'dialogAction': { | |
'type': 'Close', | |
'fulfillmentState': fulfillment_state, | |
'message': message | |
} | |
} | |
return response | |
def delegate(session_attributes, slots): | |
return { | |
'sessionAttributes': session_attributes, | |
'dialogAction': { | |
'type': 'Delegate', | |
'slots': slots | |
} | |
} | |
def build_response_card(title, subtitle, options): | |
""" | |
Build a responseCard with a title, subtitle, and an optional set of options which should be displayed as buttons. | |
""" | |
buttons = None | |
if options is not None: | |
buttons = [] | |
for i in range(min(5, len(options))): | |
buttons.append(options[i]) | |
return { | |
'contentType': 'application/vnd.amazonaws.card.generic', | |
'version': 1, | |
'genericAttachments': [{ | |
'title': title, | |
'subTitle': subtitle, | |
'buttons': buttons | |
}] | |
} | |
""" --- Helper Functions --- """ | |
def parse_int(n): | |
try: | |
return int(n) | |
except ValueError: | |
return float('nan') | |
def try_ex(func): | |
""" | |
Call passed in function in try block. If KeyError is encountered return None. | |
This function is intended to be used to safely access dictionary. | |
Note that this function would have negative impact on performance. | |
""" | |
try: | |
return func() | |
except KeyError: | |
return None | |
def increment_time_by_thirty_mins(appointment_time): | |
hour, minute = list(map(int, appointment_time.split(':'))) | |
return '{}:00'.format(hour + 1) if minute == 30 else '{}:30'.format(hour) | |
def get_random_int(minimum, maximum): | |
""" | |
Returns a random integer between min (included) and max (excluded) | |
""" | |
min_int = math.ceil(minimum) | |
max_int = math.floor(maximum) | |
return random.randint(min_int, max_int - 1) | |
def get_availabilities(date): | |
""" | |
Helper function which in a full implementation would feed into a backend API to provide query schedule availability. | |
The output of this function is an array of 30 minute periods of availability, expressed in ISO-8601 time format. | |
In order to enable quick demonstration of all possible conversation paths supported in this example, the function | |
returns a mixture of fixed and randomized results. | |
On Mondays, availability is randomized; otherwise there is no availability on Tuesday / Thursday and availability at | |
10:00 - 10:30 and 4:00 - 5:00 on Wednesday / Friday. | |
""" | |
day_of_week = dateutil.parser.parse(date).weekday() | |
availabilities = [] | |
available_probability = 0.3 | |
if day_of_week == 0: | |
start_hour = 10 | |
while start_hour <= 16: | |
if random.random() < available_probability: | |
# Add an availability window for the given hour, with duration determined by another random number. | |
appointment_type = get_random_int(1, 4) | |
if appointment_type == 1: | |
availabilities.append('{}:00'.format(start_hour)) | |
elif appointment_type == 2: | |
availabilities.append('{}:30'.format(start_hour)) | |
else: | |
availabilities.append('{}:00'.format(start_hour)) | |
availabilities.append('{}:30'.format(start_hour)) | |
start_hour += 1 | |
if day_of_week == 2 or day_of_week == 4: | |
availabilities.append('10:00') | |
availabilities.append('16:00') | |
availabilities.append('16:30') | |
return availabilities | |
def isvalid_date(date): | |
try: | |
dateutil.parser.parse(date) | |
return True | |
except ValueError: | |
return False | |
def is_available(appointment_time, duration, availabilities): | |
""" | |
Helper function to check if the given time and duration fits within a known set of availability windows. | |
Duration is assumed to be one of 30, 60 (meaning minutes). Availabilities is expected to contain entries of the format HH:MM. | |
""" | |
if duration == 30: | |
return appointment_time in availabilities | |
elif duration == 60: | |
second_half_hour_time = increment_time_by_thirty_mins(appointment_time) | |
return appointment_time in availabilities and second_half_hour_time in availabilities | |
# Invalid duration ; throw error. We should not have reached this branch due to earlier validation. | |
raise Exception('Was not able to understand duration {}'.format(duration)) | |
def get_duration(appointment_type): | |
appointment_duration_map = {'cleaning': 30, 'root canal': 60, 'whitening': 30} | |
return try_ex(lambda: appointment_duration_map[appointment_type.lower()]) | |
def get_availabilities_for_duration(duration, availabilities): | |
""" | |
Helper function to return the windows of availability of the given duration, when provided a set of 30 minute windows. | |
""" | |
duration_availabilities = [] | |
start_time = '10:00' | |
while start_time != '17:00': | |
if start_time in availabilities: | |
if duration == 30: | |
duration_availabilities.append(start_time) | |
elif increment_time_by_thirty_mins(start_time) in availabilities: | |
duration_availabilities.append(start_time) | |
start_time = increment_time_by_thirty_mins(start_time) | |
return duration_availabilities | |
def build_validation_result(is_valid, violated_slot, message_content): | |
return { | |
'isValid': is_valid, | |
'violatedSlot': violated_slot, | |
'message': {'contentType': 'PlainText', 'content': message_content} | |
} | |
def validate_book_appointment(appointment_type, date, appointment_time): | |
if appointment_type and not get_duration(appointment_type): | |
return build_validation_result(False, 'AppointmentType', 'I did not recognize that, can I book you a root canal, cleaning, or whitening?') | |
if appointment_time: | |
if len(appointment_time) != 5: | |
return build_validation_result(False, 'Time', 'I did not recognize that, what time would you like to book your appointment?') | |
hour, minute = appointment_time.split(':') | |
hour = parse_int(hour) | |
minute = parse_int(minute) | |
if math.isnan(hour) or math.isnan(minute): | |
return build_validation_result(False, 'Time', 'I did not recognize that, what time would you like to book your appointment?') | |
if hour < 10 or hour > 16: | |
# Outside of business hours | |
return build_validation_result(False, 'Time', 'Our business hours are ten a.m. to five p.m. What time works best for you?') | |
if minute not in [30, 0]: | |
# Must be booked on the hour or half hour | |
return build_validation_result(False, 'Time', 'We schedule appointments every half hour, what time works best for you?') | |
if date: | |
if not isvalid_date(date): | |
return build_validation_result(False, 'Date', 'I did not understand that, what date works best for you?') | |
elif datetime.datetime.strptime(date, '%Y-%m-%d').date() <= datetime.date.today(): | |
return build_validation_result(False, 'Date', 'Appointments must be scheduled a day in advance. Can you try a different date?') | |
elif dateutil.parser.parse(date).weekday() == 5 or dateutil.parser.parse(date).weekday() == 6: | |
return build_validation_result(False, 'Date', 'Our office is not open on the weekends, can you provide a work day?') | |
return build_validation_result(True, None, None) | |
def build_time_output_string(appointment_time): | |
hour, minute = appointment_time.split(':') # no conversion to int in order to have original string form. for eg) 10:00 instead of 10:0 | |
if int(hour) > 12: | |
return '{}:{} p.m.'.format((int(hour) - 12), minute) | |
elif int(hour) == 12: | |
return '12:{} p.m.'.format(minute) | |
elif int(hour) == 0: | |
return '12:{} a.m.'.format(minute) | |
return '{}:{} a.m.'.format(hour, minute) | |
def build_available_time_string(availabilities): | |
""" | |
Build a string eliciting for a possible time slot among at least two availabilities. | |
""" | |
prefix = 'We have availabilities at ' | |
if len(availabilities) > 3: | |
prefix = 'We have plenty of availability, including ' | |
prefix += build_time_output_string(availabilities[0]) | |
if len(availabilities) == 2: | |
return '{} and {}'.format(prefix, build_time_output_string(availabilities[1])) | |
return '{}, {} and {}'.format(prefix, build_time_output_string(availabilities[1]), build_time_output_string(availabilities[2])) | |
def build_options(slot, appointment_type, date, booking_map): | |
""" | |
Build a list of potential options for a given slot, to be used in responseCard generation. | |
""" | |
day_strings = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] | |
if slot == 'AppointmentType': | |
return [ | |
{'text': 'cleaning (30 min)', 'value': 'cleaning'}, | |
{'text': 'root canal (60 min)', 'value': 'root canal'}, | |
{'text': 'whitening (30 min)', 'value': 'whitening'} | |
] | |
elif slot == 'Date': | |
# Return the next five weekdays. | |
options = [] | |
potential_date = datetime.date.today() | |
while len(options) < 5: | |
potential_date = potential_date + datetime.timedelta(days=1) | |
if potential_date.weekday() < 5: | |
options.append({'text': '{}-{} ({})'.format((potential_date.month), potential_date.day, day_strings[potential_date.weekday()]), | |
'value': potential_date.strftime('%A, %B %d, %Y')}) | |
return options | |
elif slot == 'Time': | |
# Return the availabilities on the given date. | |
if not appointment_type or not date: | |
return None | |
availabilities = try_ex(lambda: booking_map[date]) | |
if not availabilities: | |
return None | |
availabilities = get_availabilities_for_duration(get_duration(appointment_type), availabilities) | |
if len(availabilities) == 0: | |
return None | |
options = [] | |
for i in range(min(len(availabilities), 5)): | |
options.append({'text': build_time_output_string(availabilities[i]), 'value': build_time_output_string(availabilities[i])}) | |
return options | |
""" --- Functions that control the bot's behavior --- """ | |
def make_appointment(intent_request): | |
""" | |
Performs dialog management and fulfillment for booking a dentists appointment. | |
Beyond fulfillment, the implementation for this intent demonstrates the following: | |
1) Use of elicitSlot in slot validation and re-prompting | |
2) Use of confirmIntent to support the confirmation of inferred slot values, when confirmation is required | |
on the bot model and the inferred slot values fully specify the intent. | |
""" | |
appointment_type = intent_request['currentIntent']['slots']['AppointmentType'] | |
date = intent_request['currentIntent']['slots']['Date'] | |
appointment_time = intent_request['currentIntent']['slots']['Time'] | |
source = intent_request['invocationSource'] | |
output_session_attributes = intent_request['sessionAttributes'] if intent_request['sessionAttributes'] is not None else {} | |
booking_map = json.loads(try_ex(lambda: output_session_attributes['bookingMap']) or '{}') | |
if source == 'DialogCodeHook': | |
# Perform basic validation on the supplied input slots. | |
slots = intent_request['currentIntent']['slots'] | |
validation_result = validate_book_appointment(appointment_type, date, appointment_time) | |
if not validation_result['isValid']: | |
slots[validation_result['violatedSlot']] = None | |
return elicit_slot( | |
output_session_attributes, | |
intent_request['currentIntent']['name'], | |
slots, | |
validation_result['violatedSlot'], | |
validation_result['message'], | |
build_response_card( | |
'Specify {}'.format(validation_result['violatedSlot']), | |
validation_result['message']['content'], | |
build_options(validation_result['violatedSlot'], appointment_type, date, booking_map) | |
) | |
) | |
if not appointment_type: | |
return elicit_slot( | |
output_session_attributes, | |
intent_request['currentIntent']['name'], | |
intent_request['currentIntent']['slots'], | |
'AppointmentType', | |
{'contentType': 'PlainText', 'content': 'What type of appointment would you like to schedule?'}, | |
build_response_card( | |
'Specify Appointment Type', 'What type of appointment would you like to schedule?', | |
build_options('AppointmentType', appointment_type, date, None) | |
) | |
) | |
if appointment_type and not date: | |
return elicit_slot( | |
output_session_attributes, | |
intent_request['currentIntent']['name'], | |
intent_request['currentIntent']['slots'], | |
'Date', | |
{'contentType': 'PlainText', 'content': 'When would you like to schedule your {}?'.format(appointment_type)}, | |
build_response_card( | |
'Specify Date', | |
'When would you like to schedule your {}?'.format(appointment_type), | |
build_options('Date', appointment_type, date, None) | |
) | |
) | |
if appointment_type and date: | |
# Fetch or generate the availabilities for the given date. | |
booking_availabilities = try_ex(lambda: booking_map[date]) | |
if booking_availabilities is None: | |
booking_availabilities = get_availabilities(date) | |
booking_map[date] = booking_availabilities | |
output_session_attributes['bookingMap'] = json.dumps(booking_map) | |
appointment_type_availabilities = get_availabilities_for_duration(get_duration(appointment_type), booking_availabilities) | |
if len(appointment_type_availabilities) == 0: | |
# No availability on this day at all; ask for a new date and time. | |
slots['Date'] = None | |
slots['Time'] = None | |
return elicit_slot( | |
output_session_attributes, | |
intent_request['currentIntent']['name'], | |
slots, | |
'Date', | |
{'contentType': 'PlainText', 'content': 'We do not have any availability on that date, is there another day which works for you?'}, | |
build_response_card( | |
'Specify Date', | |
'What day works best for you?', | |
build_options('Date', appointment_type, date, booking_map) | |
) | |
) | |
message_content = 'What time on {} works for you? '.format(date) | |
if appointment_time: | |
output_session_attributes['formattedTime'] = build_time_output_string(appointment_time) | |
# Validate that proposed time for the appointment can be booked by first fetching the availabilities for the given day. To | |
# give consistent behavior in the sample, this is stored in sessionAttributes after the first lookup. | |
if is_available(appointment_time, get_duration(appointment_type), booking_availabilities): | |
return delegate(output_session_attributes, slots) | |
message_content = 'The time you requested is not available. ' | |
if len(appointment_type_availabilities) == 1: | |
# If there is only one availability on the given date, try to confirm it. | |
slots['Time'] = appointment_type_availabilities[0] | |
return confirm_intent( | |
output_session_attributes, | |
intent_request['currentIntent']['name'], | |
slots, | |
{ | |
'contentType': 'PlainText', | |
'content': '{}{} is our only availability, does that work for you?'.format | |
(message_content, build_time_output_string(appointment_type_availabilities[0])) | |
}, | |
build_response_card( | |
'Confirm Appointment', | |
'Is {} on {} okay?'.format(build_time_output_string(appointment_type_availabilities[0]), date), | |
[{'text': 'yes', 'value': 'yes'}, {'text': 'no', 'value': 'no'}] | |
) | |
) | |
available_time_string = build_available_time_string(appointment_type_availabilities) | |
return elicit_slot( | |
output_session_attributes, | |
intent_request['currentIntent']['name'], | |
slots, | |
'Time', | |
{'contentType': 'PlainText', 'content': '{}{}'.format(message_content, available_time_string)}, | |
build_response_card( | |
'Specify Time', | |
'What time works best for you?', | |
build_options('Time', appointment_type, date, booking_map) | |
) | |
) | |
return delegate(output_session_attributes, slots) | |
# Book the appointment. In a real bot, this would likely involve a call to a backend service. | |
duration = get_duration(appointment_type) | |
booking_availabilities = booking_map[date] | |
if booking_availabilities: | |
# Remove the availability slot for the given date as it has now been booked. | |
booking_availabilities.remove(appointment_time) | |
if duration == 60: | |
second_half_hour_time = increment_time_by_thirty_mins(appointment_time) | |
booking_availabilities.remove(second_half_hour_time) | |
booking_map[date] = booking_availabilities | |
output_session_attributes['bookingMap'] = json.dumps(booking_map) | |
else: | |
# This is not treated as an error as this code sample supports functionality either as fulfillment or dialog code hook. | |
logger.debug('Availabilities for {} were null at fulfillment time. ' | |
'This should have been initialized if this function was configured as the dialog code hook'.format(date)) | |
return close( | |
output_session_attributes, | |
'Fulfilled', | |
{ | |
'contentType': 'PlainText', | |
'content': 'Okay, I have booked your appointment. We will see you at {} on {}'.format(build_time_output_string(appointment_time), date) | |
} | |
) | |
""" --- Intents --- """ | |
def dispatch(intent_request): | |
# here intent is not important as we have only one | |
# the input data does not have any intent | |
# this is the site code | |
""" | |
Called when the user specifies an intent for this bot. | |
""" | |
site_id_str = intent_request['inputTranscript'] | |
site_id_int = text2int(site_id_str) | |
print(site_id_int) | |
try: | |
int(site_id_int) | |
except Exception as e: | |
print(e) | |
return False | |
dynamodb = boto3.resource('dynamodb') | |
get_hazards_table = dynamodb.Table('GetHazards') | |
hazard_data = get_hazards_table.get_item( | |
Key={ | |
'siteId': str(site_id_int) | |
} | |
) | |
hazard_message = hazard_data['Item']['hazards'] | |
print(hazard_data) | |
return { | |
"dialogAction": { | |
"type": "Close", | |
"fulfillmentState": "Fulfilled", | |
"message": { | |
"contentType": "SSML", | |
"content": hazard_message | |
} | |
} | |
# return True | |
logger.debug('dispatch userId={}, intentName={}'.format(intent_request['userId'], intent_request['currentIntent']['name'])) | |
intent_name = intent_request['currentIntent']['name'] | |
# Dispatch to your bot's intent handlers | |
if intent_name == 'MakeAppointment': | |
return make_appointment(intent_request) | |
raise Exception('Intent with name ' + intent_name + ' not supported') | |
def is_number(x): | |
if type(x) == str: | |
x = x.replace(',', '') | |
try: | |
float(x) | |
except: | |
return False | |
return True | |
def text2int (textnum, numwords={}): | |
units = [ | |
'zero', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', | |
'nine', 'ten', 'eleven', 'twelve', 'thirteen', 'fourteen', 'fifteen', | |
'sixteen', 'seventeen', 'eighteen', 'nineteen', | |
] | |
tens = ['', '', 'twenty', 'thirty', 'forty', 'fifty', 'sixty', 'seventy', 'eighty', 'ninety'] | |
scales = ['hundred', 'thousand', 'million', 'billion', 'trillion'] | |
ordinal_words = {'first':1, 'second':2, 'third':3, 'fifth':5, 'eighth':8, 'ninth':9, 'twelfth':12} | |
ordinal_endings = [('ieth', 'y'), ('th', '')] | |
if not numwords: | |
numwords['and'] = (1, 0) | |
for idx, word in enumerate(units): numwords[word] = (1, idx) | |
for idx, word in enumerate(tens): numwords[word] = (1, idx * 10) | |
for idx, word in enumerate(scales): numwords[word] = (10 ** (idx * 3 or 2), 0) | |
textnum = textnum.replace('-', ' ') | |
current = result = 0 | |
curstring = '' | |
onnumber = False | |
lastunit = False | |
lastscale = False | |
def is_numword(x): | |
if is_number(x): | |
return True | |
if word in numwords: | |
return True | |
return False | |
def from_numword(x): | |
if is_number(x): | |
scale = 0 | |
increment = int(x.replace(',', '')) | |
return scale, increment | |
return numwords[x] | |
for word in textnum.split(): | |
if word in ordinal_words: | |
scale, increment = (1, ordinal_words[word]) | |
current = current * scale + increment | |
if scale > 100: | |
result += current | |
current = 0 | |
onnumber = True | |
lastunit = False | |
lastscale = False | |
else: | |
for ending, replacement in ordinal_endings: | |
if word.endswith(ending): | |
word = "%s%s" % (word[:-len(ending)], replacement) | |
if (not is_numword(word)) or (word == 'and' and not lastscale): | |
if onnumber: | |
# Flush the current number we are building | |
curstring += repr(result + current) + " " | |
curstring += word + " " | |
result = current = 0 | |
onnumber = False | |
lastunit = False | |
lastscale = False | |
else: | |
scale, increment = from_numword(word) | |
onnumber = True | |
if lastunit and (word not in scales): | |
# Assume this is part of a string of individual numbers to | |
# be flushed, such as a zipcode "one two three four five" | |
curstring += repr(result + current) | |
result = current = 0 | |
if scale > 1: | |
current = max(1, current) | |
current = current * scale + increment | |
if scale > 100: | |
result += current | |
current = 0 | |
lastscale = False | |
lastunit = False | |
if word in scales: | |
lastscale = True | |
elif word in units: | |
lastunit = True | |
if onnumber: | |
curstring += repr(result + current) | |
return curstring | |
""" --- Main handler --- """ | |
from pprint import pprint | |
from boto3.dynamodb.conditions import Key | |
import boto3 | |
def lambda_handler(event, context): | |
""" | |
Route the incoming request based on intent. | |
The JSON body of the request is provided in the event slot. | |
""" | |
# By default, treat the user request as coming from the America/New_York time zone. | |
os.environ['TZ'] = 'America/New_York' | |
time.tzset() | |
logger.debug('event.bot.name={}'.format(event['bot']['name'])) | |
pprint(event) | |
return dispatch(event) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment