Skip to content

Instantly share code, notes, and snippets.

@brunofrank
Created March 31, 2026 09:19
Show Gist options
  • Select an option

  • Save brunofrank/fe57a36631581eb6738f4cf3e70939e6 to your computer and use it in GitHub Desktop.

Select an option

Save brunofrank/fe57a36631581eb6738f4cf3e70939e6 to your computer and use it in GitHub Desktop.
PubSub classes for internal message exchange
# Checks out every checked-in delivery man once a tenant's delivery channel
# has been closed and no recent order is still pending.
#
# Subscribed to 'store/change_status' events whose `kind` is 'delivery' and
# whose `status` is 'closed'. While recent work is still pending, the service
# re-enqueues itself to try again one hour later.
class AutoCheckOutDeliveryManService
  include Interactor
  include Workable
  include PubSub::Concerns::LifecycleEventSubscriber

  subscribe_to 'store/change_status', kind: 'delivery', status: 'closed'

  # event: Store::ChangeStatusEvent
  def call
    # NOTE(review): `tenant.with` presumably scopes the queries below to this
    # tenant -- confirm against the Tenant model.
    tenant.with do
      can_checkout? ? checkout_delivery_man : reschedule_for(1.hour)
    end
  end

  private

  # Tenant referenced by the received event (the event id is the tenant id).
  def tenant
    @tenant ||= Tenant.find(event.id)
  end

  # Checks out, in batches, every delivery man that is currently checked in.
  def checkout_delivery_man
    User.delivery_man.checked_in.find_each(&:check_out!)
  end

  # True when neither an order nor an order group created within the last
  # three hours is still unfinished. Order groups are only queried when no
  # unfinished order was found.
  def can_checkout?
    unfinished_recent_orders.empty? && undelivered_recent_order_groups.empty?
  end

  # Orders from the last three hours that are not delivered, canceled or auto-done.
  def unfinished_recent_orders
    Order.where(created_at: 3.hours.ago...).where.not(status: %i[delivered canceled auto_done])
  end

  # Order groups from the last three hours that are not delivered yet.
  def undelivered_recent_order_groups
    OrderGroup.where(created_at: 3.hours.ago...).where.not(status: :delivered)
  end
end
# Iterates over every tenant and opens or closes its delivery and in-store
# channels according to the tenant's opening hours, publishing a
# 'store/change_status' event for each change it makes.
class AutoOpenCloseStoresService
  include Interactor
  include Workable

  def call
    Tenant.find_each do |tenant|
      open_stores_within_opening_hours(tenant)
      close_stores_outside_opening_hours(tenant)
    end
  end

  private

  # Opens each closed channel while the tenant is within its opening hours.
  #
  # The `*_status_changed_at < opens_at` comparison keeps a store closed when
  # the user closed it manually (i.e. after the opening time).
  #
  # NOTE(review): both channels call the same `tenant.open!` -- confirm that
  # it affects the intended channel.
  def open_stores_within_opening_hours(tenant)
    if tenant.delivery_closed? && opening_window?(tenant) &&
       tenant.delivery_status_changed_at < tenant.opens_at
      tenant.open!
      announce(tenant, 'delivery', 'open')
    end

    if tenant.in_store_closed? && opening_window?(tenant) &&
       tenant.in_store_status_changed_at < tenant.opens_at
      tenant.open!
      announce(tenant, 'in_store', 'open')
    end
  end

  # Closes each open channel while the tenant is outside its opening hours.
  #
  # The `*_status_changed_at < closes_at` comparison keeps a store open when
  # the user opened it manually (i.e. after the closing time).
  #
  # NOTE(review): both channels call the same `tenant.close!` -- confirm that
  # it affects the intended channel.
  def close_stores_outside_opening_hours(tenant)
    if tenant.delivery_open? && closing_window?(tenant) &&
       tenant.delivery_status_changed_at < tenant.closes_at
      tenant.close!
      announce(tenant, 'delivery', 'closed')
    end

    if tenant.in_store_open? && closing_window?(tenant) &&
       tenant.in_store_status_changed_at < tenant.closes_at
      tenant.close!
      announce(tenant, 'in_store', 'closed')
    end
  end

  # Tenant is inside its opening hours and has an opening time configured.
  def opening_window?(tenant)
    tenant.within_opening_hours? && tenant.opens_at.present?
  end

  # Tenant is outside its opening hours and has a closing time configured.
  def closing_window?(tenant)
    !tenant.within_opening_hours? && tenant.closes_at.present?
  end

  # Publishes the status change of one channel (`kind`) of the tenant.
  def announce(tenant, kind, status)
    PubSub.publish 'store/change_status', { id: tenant.id, kind: kind, status: status }
  end
end
module PubSub
  module Concerns
    # Publishes 'model/created', 'model/updated' and 'model/deleted' events
    # through PubSub after the including record is committed.
    module LifecycleEventBroadcaster
      extend ActiveSupport::Concern

      included do
        after_commit :publish_created, on: :create
        # only emit if attribute(s) changed
        after_commit :publish_updated, on: :update, if: ->(record) { record.previous_changes.keys.present? }
        after_commit :publish_deleted, on: :destroy
      end

      # Announces that the record was created.
      def publish_created
        broadcast_lifecycle_event('model/created')
      end

      # Announces that the record was updated, listing the changed attribute names.
      def publish_updated
        broadcast_lifecycle_event('model/updated', changed_fields: previous_changes.keys) # see ActiveModel::Dirty
      end

      # Announces that the record was destroyed, carrying its attributes along.
      def publish_deleted
        broadcast_lifecycle_event('model/deleted', model_attributes: attributes)
      end

      # Class name of the record without its namespace.
      def model_class
        self.class.name.demodulize
      end

      # Tenant id for the record: its own `tenant_id`, else the id of its
      # `tenant`, else its own id when the record is itself a Tenant.
      # Returns nil when none of these apply.
      def infer_tenant_id
        return tenant_id if respond_to?(:tenant_id)
        return tenant.try(:id) if respond_to?(:tenant)

        id if self.class.name.demodulize == 'Tenant'
      end

      private

      # Publishes `event_name` with the payload common to every lifecycle
      # event, followed by any event-specific `extra` attributes.
      def broadcast_lifecycle_event(event_name, **extra)
        PubSub.publish(
          event_name,
          id: id,
          model_class: model_class,
          tenant_id: infer_tenant_id,
          **extra
        )
      end
    end
  end
end
module PubSub
  module Concerns
    # Lets a class react to events published through PubSub.
    #
    # Usage:
    #
    #   class MyClass
    #     include PubSub::Concerns::LifecycleEventSubscriber
    #
    #     subscribe_to 'model/created', method_to_call: :method_name
    #
    #     def self.method_name(event)
    #       model = event.model_class.constantize.find(event.id)
    #       # do something here
    #     end
    #   end
    #
    module LifecycleEventSubscriber
      extend ActiveSupport::Concern

      # Event carried by the interactor context, or nil when the context has
      # none. A Hash payload (as produced by `serialize_event`) is turned back
      # into an event object; anything else is returned as is.
      def event
        return unless context.respond_to?(:event)

        @event ||= begin
          raw = context.event
          raw.is_a?(Hash) ? self.class.deserialize_event(raw) : raw
        end
      end

      # Re-enqueues this class' Worker with the current event after `time`.
      def reschedule_for(time)
        job_args = { 'event' => self.class.serialize_event(event) }
        self.class::Worker.perform_in(time, job_args)
      end

      class_methods do
        # Subscribes the class to `event_name`.
        #
        # method_to_call: optional name of a class method that receives the
        #                 event directly (any delay is ignored in that case).
        # options:        :delay is the time to wait before the callback runs;
        #                 every other key is a filter, and the subscription
        #                 only fires when all event attributes match, ie:
        #                 subscribe_to 'model/created', method_to_call: :method_name, model_class: 'Deal'
        #
        # Without a delay the callback runs inline through `call`.
        # NOTE(review): an earlier comment said "a delay of zero forces the
        # task to run inline", but the code checks `delay.present?` -- confirm
        # how a zero delay is meant to behave.
        def subscribe_to(event_name, method_to_call: nil, **options)
          delay = options[:delay]
          filters = options.except(:delay) # everything but the special options
          log_event_subscribed(event_name)

          PubSub.subscribe event_name do |event|
            log_event_received(event_name, event)
            # An empty filter set matches every event.
            next unless filters.all? { |attribute, expected| matches?(event.send(attribute), expected) }

            send_delayed(method_to_call, event, delay: delay)
          end
        end

        # Dispatches `event` to the subscriber:
        #   * an explicit method name wins and is called right away;
        #   * with a delay, the job goes to the class' Worker (Workable
        #     classes) or to the class' own `perform_in`, when available;
        #   * otherwise the class is called inline with the raw event.
        def send_delayed(method_to_call, event, delay:)
          return send(method_to_call, event) if method_to_call.present?
          return call({ 'event' => event }) if delay.blank?

          if self.included_modules.include?(Workable)
            self::Worker.perform_in(delay, { 'event' => serialize_event(event) })
          elsif respond_to?(:perform_in)
            perform_in(delay, { 'event' => serialize_event(event) })
          else
            call({ 'event' => event })
          end
        end

        # Hash form of an event: its class name plus its attributes.
        def serialize_event(event)
          { 'event_class' => event.class.name, 'event_attributes' => event.attributes }
        end

        # Rebuilds an event from a Hash or JSON String produced by
        # `serialize_event`. Other payload types are returned untouched, and
        # the parsed data is returned when it lacks a class name or attributes.
        def deserialize_event(payload)
          case payload
          when String then data = JSON.parse(payload)
          when Hash then data = payload
          else return payload
          end

          event_class = data['event_class']
          event_attributes = data['event_attributes']
          return data unless event_class.present? && event_attributes.present?

          event_class.constantize.new(event_attributes)
        end

        # Logs the subscription (development environment only).
        def log_event_subscribed(event_name)
          return unless Rails.env.development?

          ZenLogger.info('Subscribing', key: 'PubSub', klass: self.name) { { event_name: event_name } }
        end

        # Logs a received event (development environment only).
        def log_event_received(event_name, event)
          return unless Rails.env.development?

          ZenLogger.info("Received event '#{event_name}'", key: 'PubSub', klass: self.name) do
            { event: event.inspect }
          end
        end

        # Check whether value matches a given filter, using the BROADEST possible logic
        # value: string OR array
        # filter: string OR array
        # examples:
        #   matches?('a', 'a')                -> true # simple equality check
        #   matches?('a', ['a', '123'])       -> true # if the one array contains the other value, they match
        #   matches?(['a', 'b'], 'a')         -> true # inverse also works
        #   matches?(['a', 'c'], ['a', 'b'])  -> true # if two arrays share a single value, they match!
        def matches?(value, filter)
          if filter.is_a?(Array)
            value.is_a?(Array) ? filter.any? { |candidate| value.include?(candidate) } : filter.include?(value)
          elsif value.is_a?(Array)
            value.include?(filter)
          else
            value == filter
          end
        end
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment