Skip to content

Instantly share code, notes, and snippets.

@9jaswag
Last active January 22, 2024 13:43
Show Gist options
  • Save 9jaswag/e2447404e27916cbfe5faca1ad272e53 to your computer and use it in GitHub Desktop.
Save 9jaswag/e2447404e27916cbfe5faca1ad272e53 to your computer and use it in GitHub Desktop.
Bunny AMQP listener
require "bunny"
# --- Basic listener -------------------------------------------------------
# Opens a connection, binds a queue to a fanout exchange, and prints every
# delivery, acknowledging each one manually.
# NOTE(review): `rabbitmq_config` is not defined in this file; it is assumed
# to be provided by the surrounding application — confirm before running.
conn = Bunny.new(rabbitmq_config)
conn.start

channel = conn.create_channel
queue = channel.queue("queue.from.PC", exclusive: false, auto_delete: false)
exchange = channel.fanout("exchange.name")
queue.bind(exchange)

queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
  puts "Received #{payload}, message properties are #{properties.inspect}"
  # Worker.perform_async(id: payload.id)

  # Acknowledge only this message (second argument false => not "multiple").
  # FYI: there is a shortcut, Bunny::Channel#ack
  channel.acknowledge(delivery_info.delivery_tag, false)
end

# The subscription above is expected not to block (Bunny's default), so keep
# the main thread alive briefly to receive deliveries before closing.
sleep 1.0
conn.close
# === using a consumer instance
# Consumer that remembers whether it has been cancelled.
#
# Subclasses Bunny::Consumer and keeps a single flag, @cancelled, which is
# set by #handle_cancellation and read back by #cancelled?.
class RabbitMqListener < Bunny::Consumer
  # @return [true, nil] the current value of @cancelled; this class only
  #   ever assigns it +true+, inside #handle_cancellation.
  def cancelled?
    @cancelled
  end

  # Marks this consumer as cancelled.
  # NOTE(review): presumably invoked by Bunny when the broker cancels the
  # consumer — confirm against the Bunny::Consumer documentation.
  #
  # @param _basic_cancel [Object] ignored
  # @return [true]
  def handle_cancellation(_basic_cancel)
    @cancelled = true
  end
end
# Opens a second connection and consumes "pc.queue" through an explicit
# RabbitMqListener instance running on its own thread.
# NOTE(review): `rabbitmq_config` is not defined in this file — confirm
# where it comes from.
connection = Bunny.new(rabbitmq_config)
connection.start

listener_thread = Thread.new do
  consumer_channel = connection.create_channel
  pc_queue = consumer_channel.queue("pc.queue")

  # Positional arguments after the tag are presumably no_ack and exclusive
  # (per Bunny::Consumer#initialize — verify), followed by consumer arguments.
  consumer = RabbitMqListener.new(
    consumer_channel, pc_queue, "consumer_tag", false, false, { test_arg: 'test' }
  )

  # Print each payload, then acknowledge that single delivery.
  consumer.on_delivery do |delivery_info, _metadata, payload|
    puts payload
    consumer_channel.ack(delivery_info.delivery_tag, false)
  end

  # Attach the consumer to the queue so deliveries reach the handler above.
  pc_queue.subscribe_with(consumer)

  # Park this thread indefinitely while messages arrive.
  sleep
end

# Let an exception raised inside the listener thread take down the process.
listener_thread.abort_on_exception = true

# Demo only: give the listener half a second, then shut the connection down.
sleep 0.5
connection.close
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment