Skip to content

Instantly share code, notes, and snippets.

@dennyabrain
Created May 15, 2017 19:57
Show Gist options
  • Save dennyabrain/cc446c83663935237d268ad6b1548b24 to your computer and use it in GitHub Desktop.
Save dennyabrain/cc446c83663935237d268ad6b1548b24 to your computer and use it in GitHub Desktop.
/**
* Created by nivsherf on 07/07/2016.
*/
'use strict';
var watsonController = require('../controllers/watson-controller'),
sns = require('../controllers/sns-controller'),
trainingController = require('../controllers/training-controller'),
conversation = require('../models/conversation'),
question = require('../models/question'),
user = require('../models/user'),
config = require('config'),
io = require('socket.io-emitter')(config.redis),
log = require('../commons/logger'),
Promise = require('bluebird');
/**
 * Announces a conversation to the experts and flags it as routed.
 *
 * Resolves the conversation's user id, emits a 'new conversation' event on the
 * '/expert' socket namespace, starts a best-effort removal of the cached possible
 * answers and finally marks the conversation as routed.
 *
 * @param conversationId - id of the conversation to hand over
 * @returns whatever conversation.markAsRouted(conversationId) yields, chained after the user id lookup
 */
function routeToExpert(conversationId) {
    log.info('routing conversation ' + conversationId);

    const announceAndMark = ownerId => {
        const announcement = {
            conversation_id: conversationId,
            user_id: ownerId
        };
        io.of('/expert').emit('new conversation', announcement);

        // Cleanup is fire-and-forget: a failure is only logged and never blocks the routing.
        const cleanup = conversation.deletePossibleAnswers(conversationId);
        cleanup.catch(err => log.error('Error deleting possible answers for conversation %d', conversationId, err));

        return conversation.markAsRouted(conversationId);
    };

    return conversation.getUserId(conversationId).then(announceAndMark);
}
/**
 * Pushes a user's message to a specific expert over the '/expert' socket namespace.
 *
 * @param conversationId - conversation the message belongs to
 * @param expertId - expert whose room ('id:<expertId>') receives the event
 * @param userId - id of the user who wrote the message
 * @param input - message body, forwarded as-is
 */
function sendMsgToExpert(conversationId, expertId, userId, input) {
    log.info('sending question to expert');
    /* TODO - handle the case where the expert disconnects during a conversation and is
       no longer available. If queueing is the chosen solution it can be done with
       conversation.addToQueue(conversationId, input); draining the queue upon
       reconnecting is already taken care of. */
    const expertRoom = 'id:' + expertId;
    const envelope = {
        user_id: userId,
        conversation_id: conversationId,
        body: input
    };
    io.of('/expert').to(expertRoom).emit('message', envelope);
}
/**
 * Delivers a message to a user, over the socket when the user is available and as a
 * push notification otherwise.
 *
 * @param conversationId - conversation the message belongs to
 * @param userId - recipient; also selects the socket room ('id:<userId>')
 * @param from - sender label placed in the payload (callers in this file pass 'watson' or 'expert')
 * @param input - message body; usually an array of strings, but a plain string is accepted too
 * @param expertId - id of the answering expert, if any (forwarded in the socket payload)
 * @returns promise that settles once the socket event was emitted or the push attempt finished;
 *          push failures are logged, not propagated
 */
function sendMsgToUser(conversationId, userId, from, input, expertId) {
    const lookups = [
        user.isAvailable(userId),
        conversation.getExpertConsoleConversationId(conversationId),
        conversation.getUserARN(conversationId)
    ];
    return Promise.all(lookups)
        .spread((isAvailable, expertConsoleConversationId, arn) => {
            if (isAvailable) {
                const payload = {
                    conversation_id: conversationId,
                    question_id: expertConsoleConversationId,
                    from: from,
                    expert_id: expertId,
                    body: input
                };
                log.info(`Sending message to userId ${userId}, payload: ${JSON.stringify(payload)}`);
                io.of('/user').to('id:' + userId).emit('message', payload);
                return undefined;
            }
            log.info('user #%s not avail - sending push notification.', userId);
            // The notification carries the first message only. Indexing a plain string with [0]
            // would yield just its first character, so strings are passed through whole.
            const pushText = Array.isArray(input) ? input[0] : input;
            return sns.sendNewMessagePushNotification(userId, pushText, conversationId, arn)
                .catch(err => log.error('Error sending push notification.', err));
        });
}
/**
 * Emits a 'tag' event to the user's socket room when the user is available;
 * otherwise only logs that the event was skipped.
 *
 * @param conversationId - conversation that was tagged
 * @param userId - recipient; selects the socket room ('id:<userId>')
 * @param tagId - tag to report to the client
 * @returns promise that settles after the availability and expert-console id lookups were handled
 */
function sendTagToUser(conversationId, userId, tagId) {
    const lookups = [
        user.isAvailable(userId),
        conversation.getExpertConsoleConversationId(conversationId)
    ];

    return Promise.all(lookups).spread((online, consoleConversationId) => {
        if (!online) {
            log.info('user #%s not avail - not sending tag event.', userId);
            return;
        }
        const userRoom = 'id:' + userId;
        const tagEvent = {
            conversation_id: conversationId,
            question_id: consoleConversationId,
            tag_id: tagId
        };
        io.of('/user').to(userRoom).emit('tag', tagEvent);
    });
}
/**
 * Tells the user's client that a conversation has ended by emitting a
 * 'conversation ended' event; when the user is not available the event is
 * skipped and only a log line is written.
 *
 * @param conversationId - conversation that ended
 * @param userId - recipient; selects the socket room ('id:<userId>')
 * @returns promise that settles after the availability and expert-console id lookups were handled
 */
function notifyUserOnConversationEnd(conversationId, userId) {
    const lookups = [
        user.isAvailable(userId),
        conversation.getExpertConsoleConversationId(conversationId)
    ];

    return Promise.all(lookups).spread((online, consoleConversationId) => {
        if (!online) {
            log.info('user #%s not avail - not sending conversation ended event.', userId);
            return;
        }
        const userRoom = 'id:' + userId;
        const endedEvent = {
            conversation_id: conversationId,
            question_id: consoleConversationId
        };
        io.of('/user').to(userRoom).emit('conversation ended', endedEvent);
    });
}
/**
 * Stores every text message of a Watson conversation response in the expert console
 * (with senderId 1) and saves the response context for the conversation.
 *
 * Message persistence is best-effort: each failure is logged and does not affect
 * the returned promise.
 *
 * @param response - Watson conversation response; reads context.conversation_id, context and output.text
 * @param isPrivate - accepted for interface compatibility; not used by this function
 * @returns whatever conversation.setContext(conversationId, context) yields
 */
function persistWatsonConversationResponse(response, isPrivate) {
    const watsonContext = response.context;
    const conversationId = watsonContext.conversation_id;
    const reportFailure = err => log.error('Error persisting message.', err);

    response.output.text.forEach(message => {
        const record = {conversationId, senderId: 1, message};
        conversation.saveMessageInExpertConsole(record).catch(reportFailure);
    });

    return conversation.setContext(conversationId, watsonContext);
}
/**
 * Coordinates a conversation between an app user, the Watson services and human experts:
 * creates conversations, relays messages in both directions, tags questions and
 * signals the end of a conversation.
 */
class ConversationController {
    /**
     * Marks the user as available for socket delivery.
     *
     * @param userId
     * @returns the result of user.setIsAvailable(userId, true)
     */
    userLogin(userId) {
        return user.setIsAvailable(userId, true);
    }

    /**
     * Marks the user as unavailable for socket delivery.
     *
     * @param userId
     * @returns the result of user.setIsAvailable(userId, false)
     */
    userLogout(userId) {
        return user.setIsAvailable(userId, false);
    }

    /**
     * Initiates a new conversation for the user with a given question
     *
     * The question is classified, passed to Watson's conversation tool, stored in the
     * expert console and cached; Watson's reply is sent to the user, the question is
     * tagged and, when no classified answer reached the confidence threshold, the
     * conversation is routed to an expert.
     *
     * @param userId
     * @param msg - the socket.io msg object with the question asked as the message body
     * @returns promise that settles when the whole flow finished; rejects (after logging) on failure
     */
    createConversation(userId, msg) {
        log.info(`creating new conversation. userId: ${userId}, msg: ${JSON.stringify(msg)}`);
        let conversationId;
        let shouldRouteToExpert = true;

        //Classifying the question with the Q&A classifier
        const classify = watsonController.getAnswers(msg.body);

        //Starting the conversation with Watson's conversation tool
        const converse = classify
            .then(possibleAnswers => {
                const context = {};
                // An empty classifier result means "no confident answer": keep routing to an expert
                // instead of failing on possibleAnswers[0].
                const bestAnswer = possibleAnswers && possibleAnswers[0];
                if (bestAnswer) {
                    log.info('confidence: %s, type: %s', bestAnswer.confidence, typeof bestAnswer.confidence);
                    if (bestAnswer.confidence >= 0.55) {
                        context.answer = bestAnswer.answer;
                        shouldRouteToExpert = false;
                    }
                } else {
                    log.info('classifier returned no possible answers - routing to expert');
                }
                return watsonController.converse(msg.body, context);
            })
            .then(converseResult => {
                // Logged inside the chain so a converse failure does not surface as a second,
                // unhandled rejection on a side branch.
                log.info('converse result: ', converseResult);
                return converseResult;
            });

        const createConversationInExpertConsole = converse.then(converseResult =>
            conversation.createConversationInExpertConsole(converseResult.context.conversation_id, userId, msg.body, msg.is_private));
        // The error object is passed as-is: JSON.stringify() of an Error yields '{}'.
        createConversationInExpertConsole.catch(err => log.error('Error creating conversation in DB:', err));

        const persistUserMessage = createConversationInExpertConsole
            .then(questionId => conversation.saveMessageInExpertConsole({questionId, senderId: userId, message: msg.body}));

        return Promise.all([classify, converse, createConversationInExpertConsole, persistUserMessage])
            .spread((possibleAnswers, converseResult, questionId) => {
                conversationId = converseResult.context.conversation_id;
                log.info('conversation id = %s', conversationId);

                //Storing conversation data in cache (best effort - failures are only logged)
                conversation.setUserId(conversationId, userId)
                    .catch(err => log.error('Error setting userId', err));
                conversation.setExpertConsoleConversationId(conversationId, questionId)
                    .catch(err => log.error('Error setting expert console conversationId', err));
                conversation.setPossibleAnswers(conversationId, possibleAnswers)
                    .catch(err => log.error('Error setting possible answers.', err));
                conversation.setOriginalQuestion(conversationId, msg.body)
                    .catch(err => log.error('Error saving original message', err));
                if (msg.arn) {
                    Promise.resolve(conversation.setUserARN(conversationId, msg.arn))
                        .catch(err => log.error('Error setting user ARN', err));
                }
                msg.conversation_id = conversationId;

                //In case watson couldn't answer the question - setting the question status to 'new'
                const updateQuestionStatus = shouldRouteToExpert ?
                    conversation.updateConversationStatusExpertConsole({questionId, status: 'new'}) : Promise.resolve();
                return updateQuestionStatus
                    .then(() => persistWatsonConversationResponse(converseResult))
                    .then(() => converseResult.output.text);
            })
            .then(watsonMessages => sendMsgToUser(conversationId, userId, 'watson', watsonMessages))
            .then(() => this.tagQuestion(msg.body, userId, conversationId))
            .then(() => {
                if (shouldRouteToExpert) {
                    log.info('routing to expert!');
                    return routeToExpert(conversationId);
                }
                return undefined;
            })
            .catch(err => {
                log.error('Error initiating conversation: ', err);
                throw err;
            });
    }

    /**
     * Handles a message written by an expert: remembers the expert on the conversation,
     * persists the message and forwards it to the user.
     *
     * @param msg - the socket.io msg object; reads conversation_id and body
     * @param expertId - id of the expert who wrote the message
     * @returns promise that settles once the message was handed to sendMsgToUser (or skipped)
     */
    processExpertMessage(msg, expertId) {
        log.info('New expert message: %s, expertId: %s', msg, expertId);
        const conversationId = msg.conversation_id;
        Promise.resolve(conversation.setExpertId(conversationId, expertId))
            .catch(err => log.error('Error setting expertId', err));

        // The ARN is looked up together with the user id: the delivery check below needs both,
        // and reading an undeclared `arn` would throw a ReferenceError in strict mode.
        return Promise.all([conversation.getUserId(conversationId), conversation.getUserARN(conversationId)])
            .spread((userId, arn) => {
                conversation.saveMessageInExpertConsole({conversationId, senderId: expertId, message: msg.body})
                    .catch(err => log.error('Error persisting expert message.', err));
                if (userId || arn) {
                    return sendMsgToUser(conversationId, userId, 'expert', [msg.body], expertId);
                }
                log.warn('No userId and arn for conversation #' + msg.conversation_id);
                //TODO - emit error
                return undefined;
            });
    }

    /**
     * Handles a message written by the user inside an existing conversation.
     *
     * The message is persisted; then, depending on the conversation state, it is forwarded
     * to the attached expert, or - while the conversation is still in the "Watson stage" -
     * passed through Watson's conversation tool and its feedback result is acted upon.
     *
     * @param userId
     * @param msg - the socket.io msg object; reads conversation_id, body, arn and is_private
     * @returns promise that settles when handling (including the Watson stage) finished
     */
    processUserMessage(userId, msg) {
        log.info('new user message from user #%d: ', userId, JSON.stringify(msg));
        const conversationId = msg.conversation_id;
        return Promise.all([
            conversation.saveMessageInExpertConsole({conversationId, senderId: userId, message: msg.body}),
            conversation.getExpertId(conversationId),
            conversation.getOriginalQuestion(conversationId),
            conversation.getContext(conversationId),
            conversation.isRouted(conversationId),
            (msg.arn ? conversation.setUserARN(conversationId, msg.arn) : Promise.resolve())
        ])
            .then(([dbRes, expertId, originalQ, context, isRouted]) => {
                log.info('data: expertId: %d, isRouted: %s, context:', expertId, isRouted, context);
                if (expertId) {
                    //if an expert ID is attached to this conversation, we try to send the message via socket
                    sendMsgToExpert(conversationId, expertId, userId, msg.body);
                    return undefined;
                }
                if (isRouted) {
                    // Routed but no expert attached yet: nothing more is done here beyond the persistence above.
                    return undefined;
                }
                //if conversation is not routed (still in "Watson stage"), we pass it through the conversation tool.
                // The chain is returned so callers can wait for it and observe its failures.
                return watsonController.converse(msg.body, context).then(converseResult => {
                    log.info('converseResult: ', converseResult);
                    const gotFeedback = value =>
                        converseResult.entities.some(cur => cur.entity === 'feedback' && cur.value === value);

                    if (gotFeedback('positive')) {
                        log.info('positive feedback!');
                        //if we got positive feedback, then we're adding the user input to the watson codex and setting the conversation state to 'support'
                        return Promise.all([
                            conversation.getPossibleAnswers(conversationId),
                            conversation.updateConversationStatusExpertConsole({conversationId, status: 'support'})
                        ])
                            .spread((possibleAnswers) => {
                                //letting the user client know conversation ended
                                Promise.resolve(notifyUserOnConversationEnd(conversationId, userId))
                                    .catch(err => log.error('Error notifying user on conversation end.', err));
                                //the correct answer is the first in the array - the one with the greatest confidence score
                                const answer = possibleAnswers[0].id;
                                //adding the Q&A pair to the watson codex and resetting conversation state
                                log.info('Received positive feedback - clearing history and adding Q&A to watson codex. Q: %s, A: %s', originalQ, answer);
                                Promise.resolve(conversation.deletePossibleAnswers(conversationId))
                                    .catch(err => log.error('Error deleting possible answers for conversation %d', conversationId, err));
                                return trainingController.addVariation({
                                    text: originalQ,
                                    question_id: answer
                                }).catch(err => log.error('failed to add to watson codex: ', err));
                            })
                            .catch(err => log.error('Error closing conversation.', err));
                    }

                    if (gotFeedback('negative')) {
                        //got negative feedback - persisting and sending back "We're sorry..." message,
                        // setting conversation status to 'new' and routing to expert
                        log.info('negative feedback - routing question.');
                        Promise.resolve(persistWatsonConversationResponse(converseResult, msg.is_private))
                            .catch(err => log.error('Error persisting watson response.', err));
                        Promise.resolve(sendMsgToUser(conversationId, userId, 'watson', converseResult.output.text))
                            .catch(err => log.error('Error sending message to user.', err));
                        return conversation.updateConversationStatusExpertConsole({conversationId, status: 'new'})
                            .then(() => routeToExpert(conversationId));
                    }

                    log.info('unnecessary state - handle with a dummy response');
                    // Wrapped in an array like every other sendMsgToUser call, so the client payload
                    // and the push-notification text keep a consistent shape.
                    return sendMsgToUser(conversationId, userId, 'watson', ['Could you please clarify whether that answered your question?']);
                });
            });
    }

    /**
     * Tags the original question, reports the tag to the user and persists it on the conversation.
     *
     * @param question - text of the original question
     * @param userId
     * @param conversationId
     * @returns promise for the tag persistence; tagging/persistence errors are logged, not propagated
     */
    tagQuestion(question, userId, conversationId) {
        log.info('Tagging original Q:', question);
        // Success is logged inside the chain, so a getTag failure is reported exactly once per
        // consumer below rather than also rejecting an unobserved side branch.
        const getTag = watsonController.getTag(question).then(tagId => {
            log.info('Tagged! Q: %s, tag: %s', question, tagId);
            return tagId;
        });
        getTag.then(tagId => sendTagToUser(conversationId, userId, tagId))
            .catch(err => log.error('Error sending tag to user, conversation id %d', conversationId, err));
        return getTag.then(tagId => conversation.persistConversationTag(conversationId, tagId))
            .catch(err => log.error('error tagging question for conversation id %d', conversationId, err));
    }

    /**
     * Called when an expert wraps up a conversation; notifies the user's client.
     *
     * @param conversationId
     * @returns promise that settles once the user notification was handled
     */
    wrapUpConversation(conversationId) {
        log.info("conversation #" + conversationId + " wrapped up by expert.");
        return conversation.getUserId(conversationId)
            .then(userId => notifyUserOnConversationEnd(conversationId, userId));
    }
}
// Export a single ConversationController instance, created when this module is loaded.
module.exports = new ConversationController();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment