Created
August 4, 2017 06:14
-
-
Save drhenner/44a04f838ba612778db1e4dc093d0e48 to your computer and use it in GitHub Desktop.
Message Bus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
################################################################### | |
# A Topic is a superclass that all Communications inherit from | |
# | |
# A Communication Channel will accept a number of firms_topics it wants communications for. | |
# e.g. a mobile device will, by default, want to hear all communications.
# If the device adds a filter, it will no longer accept communications for
# that type of message.
# | |
# If it accepts a given firms_topic and there is a message to send, the firms_topic
# has one topic ("belongs_to :topic") that performs the method `perform`.
# The `perform` method generally takes the params (channel_id, object_id, message, options)
# | |
# -------------- -------------- ------------------ | |
# | Firm | | FirmsTopic | | Topic | | |
# | |-------------------------------<| |>-------| | | |
# | | | | | medium | | |
# | | | | | message_type | | |
# | | | | | type | | |
# | | | | | name | | |
# | | | | | description | | |
# -------------- -------------- ------------------ | |
# | | | |
# | | | |
# / \ / \ | |
# -------------- ------------------ ------------------ | |
# | User | | Communication | | Notification | | |
# | |-------<| Channel |-------<| Filters | | |
# | | | | | | | |
# | | | type | | | | |
# | | | name | | | | |
# | | | meta | | | | |
# | | | options | | | | |
# | | | ------------- | | | | |
# | | | medium (email) | | | | |
# -------------- ------------------ ------------------ | |
# | |
################################################################### | |
# Base class for every deliverable notification. The `type` column holds the
# name of the concrete sub-class (see the `type:` entries in INFO), and each
# sub-class implements `perform` to deliver one notification over its medium.
class Topic < ActiveRecord::Base
  # has_many :firms_topics # shouldn't be called as a method this would return all firm's firms_topics

  validates :description, presence: true
  validates :name, presence: true
  validates :medium, presence: true
  # A message type may exist only once per (medium, type) combination.
  validates :message_type, presence: true, uniqueness: { scope: [:medium, :type] }

  MESSAGE_ADDED         = 'message_added'.freeze
  TASK_ASSIGNED         = 'task_assigned'.freeze
  TASK_COMPLETED        = 'task_completed'.freeze
  TASK_DUE_TODAY        = 'task_due_today'.freeze
  TASK_READY            = 'task_ready'.freeze
  INTEGRATION_INSTALLED = 'integration_installed'.freeze

  # Message types accepted by User#add_to_message_bus.
  # Note that INTEGRATION_INSTALLED is not part of this list.
  MESSAGE_TYPES = [
    MESSAGE_ADDED,
    TASK_ASSIGNED,
    TASK_COMPLETED,
    TASK_DUE_TODAY,
    TASK_READY
  ].freeze

  # Attribute sets describing the known topics (one hash per Topic row).
  INFO = [
    { type: 'Topics::EmailMessageAdded', medium: :email, message_type: MESSAGE_ADDED, name: 'Message Added', description: 'Send notification to email when a message is added to a task that I follow.' },
    { type: 'Topics::EmailTaskCompleted', medium: :email, message_type: TASK_COMPLETED, name: 'Task Completed', description: 'Send notification to email when a task is Completed that I follow' },
    { type: 'Topics::MobileMessageAdded', medium: :mobile, message_type: MESSAGE_ADDED, name: 'Message Added', description: 'Send notification to device when a message is added to a task that I follow.' },
    { type: 'Topics::MobileTaskCompleted', medium: :mobile, message_type: TASK_COMPLETED, name: 'Task Completed', description: 'Send notification to device when a task is Completed that I follow' },
    #{ type: 'Topics::SlackTaskAssigned', medium: :slack, message_type: TASK_ASSIGNED, name: 'Task Assigned', description: 'Send personal notification to Slack when a task is assigned to me'},
    #{ type: 'Topics::SlackIntegrationInstalled', medium: :slack, message_type: INTEGRATION_INSTALLED, name: 'Integration Installed', description: 'Send personal notification to Slack when Slack integration is installed'},
    #{ type: 'Topics::SlackMessageAdded', medium: :slack, message_type: MESSAGE_ADDED, name: 'Message Added', description: 'Send notification to Slack when a message is added to a task that I follow.'},
  ].freeze

  # Delivers one notification. Must be overridden by every sub-class.
  #
  # channel_id - id of the CommunicationChannel to deliver to
  # object_id  - id of the record the notification is about
  # message    - optional message payload
  # options    - optional Hash of topic-specific options
  #
  # Raises RuntimeError when the sub-class does not override it.
  def perform(channel_id, object_id, message = nil, options = {})
    raise 'please implement `perform` in sub-class'
  end
end
#################################################################### | |
# Join record stating that a Firm offers a given Topic. The `active` flag lets
# a firm switch a topic off without deleting the row.
class FirmsTopic < ActiveRecord::Base
  belongs_to :firm
  belongs_to :topic

  # A topic may be linked to a firm only once.
  validates :topic_id, presence: true, uniqueness: { scope: :firm_id }
  validates :firm_id, presence: true

  delegate :description,
           :name,
           :medium,
           :message_type, to: :topic

  # Marks this firm/topic link as inactive.
  #
  # Uses `update` because `update_attributes` is deprecated and has been
  # removed from newer Rails versions; both run validations.
  #
  # Returns true when the record was saved, false otherwise.
  def deactivate!
    update(active: false)
  end
end
########################################################## | |
# A way of reaching a user (sub-classed per medium, see TYPES). A channel
# receives every active firms_topic of the user's firm for its medium, minus
# the ones the user filtered out through notification_filters.
class CommunicationChannel < ActiveRecord::Base
  include Bitfields

  TYPES = ['MobileChannel', 'EmailChannel', 'SlackChannel'].freeze

  belongs_to :user
  has_many :notification_filters
  has_many :filtered_firms_topics, -> { where(firms_topics: { active: true }) },
           through: :notification_filters,
           class_name: 'FirmsTopic', source: :firms_topic

  scope :active, -> { where(active: true) }

  validates :name, presence: true, uniqueness: { scope: :user_id }
  validates :user_id, presence: true

  bitfield :options, 1 => :allows_daily_digest

  # meta data about the device (for things like OS/model/???? )
  serialize :meta, Hash

  delegate :firm, to: :user, allow_nil: false

  attr_accessor :email, :device_token, :endpoint_arn

  # Switches the channel off (validations are skipped by update_attribute).
  def deactivate!
    update_attribute(:active, false)
  end

  # Switches the channel back on (validations are skipped by update_attribute).
  def activate!
    update_attribute(:active, true)
  end

  # 1) if the device(communication_channel) is marked inactive then it will not get ANY notification firms_topics
  # 2) if the firm shut off the notifications the device(communication_channel) will not get the notification firms_topic
  # 3) otherwise grab all firms_topics that the user has not filtered that are available for the firm
  #
  # Always returns a relation: an inactive channel yields `FirmsTopic.none`
  # (as `firms_topics_for` does), so callers can keep chaining `where`.
  def allowed_firms_topics
    return FirmsTopic.none unless active

    firm.firms_topics
        .includes(:topic)
        .where(topics: { medium: medium })
        .where.not(firms_topics: { id: notification_filters.pluck(:firms_topic_id) })
  end

  # True when the channel is active and the given topic is among the allowed ones.
  def allows_firms_topic?(topic_id)
    return false unless active

    allowed_firms_topics.where(topics: { id: topic_id }).exists?
  end

  # Allowed firms_topics narrowed down to a single message type.
  def firms_topics_for(message_type)
    allowed_firms_topics.where(topics: { message_type: message_type })
  end

  # Runs `perform` on the topic of every matching firms_topic for this channel.
  #
  # Returns an Array with the result of each `perform` call.
  def add_to_message_bus(message_type, object_id, message = nil, options = {})
    firms_topics_for(message_type).map do |firms_topic|
      firms_topic.topic.perform(id, object_id, message, options)
    end
  end

  # The medium name used to match topics; sub-class should implement this method.
  def medium
    raise NotImplementedError, "Add `medium` method to #{self.class}"
  end

  # No identifier at this level; returns nil unless a sub-class overrides it.
  def identifier
    #
  end

  def identifier_name=(val)
    # do nothing
  end
end
############################################### | |
# A user's opt-out: the given channel should not receive the given firms_topic.
class NotificationFilter < ActiveRecord::Base
  belongs_to :communication_channel # null false in DB
  belongs_to :firms_topic           # null false in DB

  # Expose the linked firms_topic's descriptive attributes directly on the filter.
  delegate(*%i[topic_id description name medium message_type], to: :firms_topic)
end
################################################ | |
# A firm owns one FirmsTopic per Topic; `firms_topics` exposes only the active
# ones, `all_firms_topics` exposes every row.
class Firm < ActiveRecord::Base
  has_many :firms_topics, -> { where(firms_topics: { active: true }) }
  has_many :all_firms_topics, class_name: 'FirmsTopic'

  after_commit :assign_all_firms_topics, on: :create

  private

  # Links the newly created firm to every known Topic, active by default.
  #
  # The lookup goes through `all_firms_topics` so that an existing but
  # deactivated link is found instead of triggering a duplicate create that
  # the FirmsTopic uniqueness validation would reject.
  def assign_all_firms_topics
    Topic.find_each do |topic|
      all_firms_topics.where(topic_id: topic.id).first_or_create(active: true)
    end
  end
end
################################################ | |
class User < ActiveRecord::Base
  has_many :communication_channels
  has_many :email_channels
  has_many :mobile_channels
  has_many :notification_filters, through: :communication_channels
  # `:source` is only meaningful together with `:through` (newer Rails versions
  # reject it otherwise), so the class name alone selects the channel type.
  has_many :slack_channels, class_name: 'SlackChannel'

  # @user.add_to_message_bus
  #
  # This method does all the heavy lifting for sending messages/information to
  # users.
  #
  # The message bus is NOT responsible for knowing if the user was supposed to
  # receive the message to begin with. If you put a message on the bus and the
  # user has any type of communication turned on, they WILL receive the message.
  # HENCE, the message bus does not check permissions, it just sends messages to all
  # communication channels turned on for a given message_type.
  #
  # NOTE: the actual delivery happens in the `perform` method of the Topic
  #       sub-classes (Topics::*).
  #
  # message_type - one of Topic::MESSAGE_TYPES (String or Symbol)
  # object_id    - id of the record the message is about
  # message      - optional message payload handed to each topic
  # options      - optional Hash handed to each topic
  #
  # Raises RuntimeError when message_type is not recognized.
  def add_to_message_bus(message_type, object_id, message = nil, options = {})
    unless Topic::MESSAGE_TYPES.include?(message_type.to_s)
      raise "#{message_type} must be a recognized message type: #{Topic::MESSAGE_TYPES}"
    end

    communication_channels.where(active: true).each do |channel|
      channel.add_to_message_bus(message_type, object_id, message, options)
    end
  end
end
################################################ | |
# Channel that reaches the user by email. The address is stored in the
# serialized `meta` hash under the 'email' key.
class EmailChannel < CommunicationChannel
  validate :email_present

  def email
    meta['email']
  end

  def email=(value)
    meta['email'] = value
  end

  def medium
    'email'
  end

  # The channel's identifier is its email address. This mirrors `identifier=`,
  # which writes the email; the reader previously had an empty body and
  # always returned nil.
  def identifier
    email
  end

  def identifier=(val)
    self.email = val
  end

  def identifier_name
    'email'
  end

  # Validation: the stored email must be present and match Devise's email
  # pattern. Always returns true; failures are reported through `errors`.
  def email_present
    if email.blank? || !email.match(Devise.email_regexp)
      errors.add(:base, 'This is not a valid email')
    end
    true
  end
end
############################################ | |
# endpoint_arn | |
# - an endpoint token for a device or mobile app on one of the supported push notification services, such as GCM and APNS | |
# | |
# device_token | |
# - REFERENCE: http://www.jamesransom.net/?p=27 | |
# - To obtain your device token for testing on chrome refer to the README at | |
# * https://github.com/scalus/sample_push_notification | |
# Channel that reaches the user through a push notification sent with AWS SNS.
# The device token, the SNS endpoint ARN and the sandbox flag are stored in the
# serialized `meta` hash under symbol keys.
class MobileChannel < CommunicationChannel
  include ActionView::Helpers::NumberHelper

  validates :endpoint_arn, :device_token, presence: true, if: :active?

  # Make sure an endpoint exists for the current device token before validating.
  before_validation :obtain_endpoint_arn

  def medium
    'mobile'
  end

  def identifier=(val)
    self.device_token = val
  end

  def identifier_name
    'mobile'
  end

  def identifier_name=(val)
    #
  end

  def endpoint_arn=(endpoint_arn)
    meta[:endpoint_arn] = endpoint_arn
  end

  def endpoint_arn
    meta[:endpoint_arn]
  end

  # Stores the token and remembers whether it changed, so that a fresh
  # endpoint is requested on the next validation.
  def device_token=(device_token)
    @device_token_changed = (meta[:device_token] != device_token)
    meta[:device_token] = device_token
  end

  def device_token
    meta[:device_token]
  end

  def is_sandbox
    meta[:is_sandbox]
  end

  def is_sandbox=(flag)
    meta[:is_sandbox] = !!flag
  end

  # Requests an endpoint ARN if needed and saves the record.
  # Returns the result of `save`.
  def obtain_endpoint_arn!
    obtain_endpoint_arn
    save
  end

  def sns_client
    @sns ||= Aws::SNS::Client.new
  end

  # Publishes a push notification to this channel's endpoint.
  #
  # hash - notification attributes; `:default` is the fallback text and the
  #        remaining keys become the APNS `aps` dictionary.
  #
  # With `message_structure: 'json'` SNS expects each per-protocol value to be
  # a JSON-encoded *string*, so the APNS payloads are serialized on their own
  # before the outer message is serialized. The caller's hash is not modified.
  def publish(hash)
    aps_attributes = hash.dup
    default_text = aps_attributes.delete(:default)
    apns_payload = { aps: aps_attributes }.to_json

    sns_message = {
      default: default_text,
      APNS_SANDBOX: apns_payload,
      APNS: apns_payload
    }
    sns_client.publish(target_arn: endpoint_arn, message: sns_message.to_json, message_structure: 'json')
  end

  private

  # Creates an SNS platform endpoint when a device token is present and either
  # no endpoint exists yet or the token changed. Always returns true, so it
  # never reports a failure to the callback chain by its return value.
  def obtain_endpoint_arn
    if device_token.present? && (endpoint_arn.blank? || @device_token_changed)
      # Sandbox devices must be registered with the sandbox platform
      # application (the two ARNs were swapped before).
      platform_arn = is_sandbox ? Settings.aws.sns.ios.sandbox_arn : Settings.aws.sns.ios.application_arn
      endpoint = sns_client.create_platform_endpoint(
        platform_application_arn: platform_arn,
        token: device_token
      )
      self.endpoint_arn = endpoint[:endpoint_arn]
    end
    true
  end
end
####################################### | |
# Common parent of the email-delivered topics; adds nothing to Topic itself.
class EmailTopic < Topic; end
####################################### | |
# Emails the channel's user when a comment is added.
class Topics::EmailMessageAdded < EmailTopic
  # Queues the "new comment" email for the user owning the channel.
  #
  # channel_id - id of the CommunicationChannel
  # comment_id - id of the comment that was added
  # message    - unused here; accepted to keep the Topic#perform signature
  # options    - Hash, may contain :recipient_names (defaults to [])
  #
  # Raises ActiveRecord::RecordNotFound when the channel does not exist.
  def perform(channel_id, comment_id, message = nil, options = {})
    channel = CommunicationChannel.find(channel_id)

    # reverse_merge returns a new hash, so its result has to be kept;
    # otherwise the default is silently dropped.
    options = ActiveSupport::HashWithIndifferentAccess.new(options)
                                                      .reverse_merge(recipient_names: [])

    send_message(channel.user_id, comment_id, options[:recipient_names])
  end

  private

  # Hands the delivery over to TaskMailer through `delay`.
  def send_message(user_id, comment_id, recipient_names)
    TaskMailer.delay.new_comment_notification(user_id, comment_id, recipient_names: recipient_names)
  end
end
####################################### | |
# Emails the channel's user when a task is completed.
class Topics::EmailTaskCompleted < EmailTopic
  # Queues the "task resolved" email for the user owning the channel.
  #
  # channel_id - id of the CommunicationChannel
  # task_id    - id of the completed task
  # message    - unused here; accepted to keep the Topic#perform signature
  # options    - Hash; :updater_id is expected, :recipient_names defaults to []
  #              and :new_comment defaults to false
  #
  # Raises ActiveRecord::RecordNotFound when the channel or task does not exist.
  def perform(channel_id, task_id, message = nil, options = {}) # updater_id is a required option
    channel = CommunicationChannel.find(channel_id)
    task = Task.find(task_id)

    # reverse_merge returns a new hash, so its result has to be kept;
    # otherwise the defaults are silently dropped.
    options = ActiveSupport::HashWithIndifferentAccess.new(options).reverse_merge(
      updater_id: nil,
      recipient_names: [],
      new_comment: false
    )

    send_message(channel.user_id, task.id, options[:updater_id], options[:new_comment], options[:recipient_names])
  end

  private

  # Hands the delivery over to TaskMailer through `delay`.
  def send_message(user_id, task_id, updater_id, new_comment, recipient_names)
    TaskMailer.delay.resolved_notification(user_id,
                                           task_id,
                                           updater_id,
                                           new_comment: new_comment,
                                           recipient_names: recipient_names)
  end
end
################################### | |
# Common parent of the push-notification topics.
class MobileTopic < Topic
  # The sub-classes build their notification titles with `truncate`, which is
  # provided by TextHelper; without this include the call raises NoMethodError.
  include ActionView::Helpers::TextHelper

  # Returns a new SNS client on every call.
  def sns
    Aws::SNS::Client.new
  end
end
################################### | |
# Sends a push notification to a mobile channel when a comment is added.
class Topics::MobileMessageAdded < MobileTopic
  # channel_id - id of the MobileChannel to notify
  # comment_id - id of the comment that was added
  # message    - unused here; accepted to keep the Topic#perform signature
  # options    - unused here; accepted to keep the Topic#perform signature
  #
  # Raises ActiveRecord::RecordNotFound when the channel or comment does not exist.
  def perform(channel_id, comment_id, message = nil, options = {})
    @channel = MobileChannel.find(channel_id)
    @comment = Comment.find(comment_id)
    send_message
  end

  private

  # Publishes through MobileChannel#publish, exactly like
  # Topics::MobileTaskCompleted does. The previous direct SNS call passed a
  # Hash as `message` although `message_structure: 'json'` requires a JSON
  # string, and carried a leftover `subject: 'test'`.
  def send_message
    title = truncate("New Message: #{@comment.commentable.title}", length: 40, omission: '...')
    message = { default: title, sound: 'default', badge: 1, alert: title }
    @channel.publish(message)
  end
end
################################### | |
# Sends a push notification to a mobile channel when a task is completed.
class Topics::MobileTaskCompleted < MobileTopic
  # Loads the channel and the task, then publishes the notification.
  # `message` and `options` are accepted but not used.
  def perform(channel_id, task_id, message, options)
    @channel = MobileChannel.find(channel_id)
    @task = Task.find(task_id)
    send_message
  end

  private

  # Publishes the payload through the channel.
  def send_message
    @channel.publish(notification_payload)
  end

  # Payload whose fallback text and alert are both the shortened task title.
  def notification_payload
    headline = truncate("Completed: #{@task.title}", length: 40, omission: '...')
    { default: headline, sound: 'default', badge: 1, alert: headline }
  end
end
################################### | |
require 'slack' | |
# Common parent of the Slack-delivered topics. Sub-classes are expected to set
# @firm before calling `slack` or `token`.
class SlackTopic < Topic
  include Rails.application.routes.url_helpers
  include ActionView::Helpers::TextHelper
  include ApplicationHelper

  private

  # Configures the Slack module with the firm's token and returns the module.
  def slack
    Slack.configure { |config| config.token = token.token }
    Slack
  end

  # Latest Slack integration token of @firm, memoized once one is found.
  def token
    @token ||= IntegrationToken
               .where(application: Integrations::SlackService::APPLICATION, firm_id: @firm.id)
               .last
  end
end
################################### | |
# Shared text/link helpers for Slack topics that talk about a task.
# The helpers read @task and @firm (and optionally @user and @follower), which
# the sub-class is expected to assign before calling them.
class Topics::SlackTask < SlackTopic | |
# Returns a "Tasklist" section naming the task's project, or nil when the
# task has no project.
def task_list_text | |
if @task.project | |
"\n*Tasklist*\n #{@task.project.title.html_safe}" | |
end | |
end | |
# Absolute URL of the task: the riveter tasks URL on the firm's main host,
# followed by "/<task id>".
def task_url | |
riveter_tasks_url(host: @firm.main_host) + '/' + @task.id.to_s | |
end | |
# Returns the available action links in Slack's "<url|label>" format, joined
# with a bullet separator (an empty string when no link applies).
# NOTE(review): no code visible in this file assigns @user, so the
# "Complete Task" link may never be produced - confirm against the callers.
def task_actions | |
links = [] | |
links << "<#{ complete_task_with_token_url(@task, @user, { host: @firm.main_host }) }|Complete Task>" if @user.present? && @task.can_be_email_completed_by?(@user) | |
# NOTE(review): the condition on the next line was mangled by e-mail
# obfuscation ("[email protected]_follower?"). It was presumably a negated
# predicate called on @follower; restore it from the original source.
links << "<#{ unfollow_task_with_token_url(@follower, { host: @firm.main_host }) }|Unfollow Task>" if @follower.present? && [email protected]_follower? | |
links.join(' • ') | |
end | |
# Returns "for <company name>" when the task has a company, else an empty string.
def for_company_text | |
@task && @task.company ? "for #{@task.company.name.html_safe}" : '' | |
end | |
end | |
################################### | |
module Topics
  # Posts a Slack message to the channel's Slack user when a comment is added
  # to a task.
  class SlackMessageAdded < SlackTask
    # Loads everything the message needs (channel, comment, task, author, firm
    # and the channel user's follower record), then posts the message.
    # `message` and `options` are accepted but not used.
    #
    # Returns the 'ok' field of Slack's response.
    def perform(channel_id, comment_id, message = nil, options = {})
      @channel  = CommunicationChannel.includes(:user).find(channel_id)
      @comment  = Comment.find(comment_id)
      @task     = @comment.task
      @author   = User.find_by_id(@comment.created_by)
      @firm     = @task.firm
      @follower = @task.task_followers.where(follower_id: @channel.user_id).first
      send_message
    end

    private

    # Calls Slack's chat.postMessage with a single attachment.
    def send_message
      reply = slack.post('chat.postMessage',
                         channel: @channel.slack_user_id,
                         token: token.token,
                         username: 'Scalus Notifications',
                         icon_url: Settings.slack.bot_logo_url,
                         attachments: [attachment].to_json)
      reply['ok']
    end

    # The attachment describing the task and the new comment.
    def attachment
      {
        pretext: pretext,
        fallback: "#{pretext} #{task_url}",
        color: '#EEE',
        title: @task.title.html_safe,
        text: text,
        title_link: task_url,
        mrkdwn_in: ['text']
      }
    end

    # Headline naming the author when one was found.
    def pretext
      if @author
        "#{@author.name_or_email} added a new message to this task"
      else
        'A new message was added to this task'
      end
    end

    # Body: shortened comment, task link, action links and task list, one per line.
    def text
      excerpt = truncate(@comment.try(:body).to_s.html_safe, length: 50)
      [excerpt, task_url, task_actions, task_list_text].join("\n")
    end
  end
end
################################### | |
################################### |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment