Created
April 2, 2023 21:36
-
-
Save nenodias/36728ca2a1066d832cc1db298db0b1f4 to your computer and use it in GitHub Desktop.
Python Pika - AMQP Producer/Consumer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pika | |
from pika.exchange_type import ExchangeType | |
# Queue topology -----------------------------------------------------------
# Work queue read by the consumer and written to by the producer.
queue_name = "queue_estoque_novo_pedido"
# Used by get_channel() as the dead-letter exchange name, the dead-letter
# queue name and the routing key that binds the two.
dlq_name = "dlq_estoque_novo_pedido"

# Broker connection settings -------------------------------------------------
# All four are empty strings in this file; get_channel() passes them to pika
# unchanged.
username = ""
password = ""
host = ""
vhost = ""
def get_channel():
    """Connect to the broker and declare the work queue and its dead-letter path.

    Declares, in order:
      * a topic exchange named ``dlq_name``;
      * the durable work queue ``queue_name``, configured to dead-letter into
        that exchange with routing key ``dlq_name``;
      * the durable queue ``dlq_name``, bound to the exchange with the same
        routing key.

    Returns:
        tuple: ``(channel, connection)``. The caller owns the connection and
        must close it.

    Raises:
        Exception: whatever pika raises while connecting or declaring. The
        connection opened here is closed before the error propagates, so a
        failed setup does not leak a socket.
    """
    auth = pika.PlainCredentials(username=username, password=password)
    params = pika.ConnectionParameters(credentials=auth, host=host, virtual_host=vhost)
    connection = pika.BlockingConnection(params)
    try:
        channel = connection.channel()
        # NOTE(review): the exchange is declared non-durable while both queues
        # are durable; left unchanged because redeclaring an existing exchange
        # with different attributes is rejected by the broker.
        channel.exchange_declare(exchange=dlq_name, exchange_type=ExchangeType.topic)
        channel.queue_declare(
            queue=queue_name,
            durable=True,
            arguments={
                "x-dead-letter-exchange": dlq_name,
                "x-dead-letter-routing-key": dlq_name,
            },
        )
        channel.queue_declare(queue=dlq_name, durable=True)
        channel.queue_bind(queue=dlq_name, exchange=dlq_name, routing_key=dlq_name)
    except Exception:
        # Nobody else holds a reference to the connection yet, so release it
        # here before re-raising.
        if connection.is_open:
            connection.close()
        raise
    return (channel, connection)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pika, json | |
from typing import List | |
from pika.channel import Channel | |
from pika import BasicProperties | |
from pika.spec import Basic | |
import config | |
class Item(object):
    """One line of an order.

    Attributes are stored exactly as passed in; no validation or conversion
    is performed.
    """

    def __init__(self, description: str, price: float, quantity: float):
        self.description = description
        self.price = price
        self.quantity = quantity

    def __repr__(self) -> str:
        # Without this, printing an item (or an order containing items) only
        # shows the default "<... object at 0x...>" form.
        return (
            f"{type(self).__name__}(description={self.description!r}, "
            f"price={self.price!r}, quantity={self.quantity!r})"
        )
# Type alias for the collection of items carried by an order.
ListItem = List[Item]


class Order(object):
    """An order: a total amount plus the items it is made of.

    Both values are stored as given; the total is not recomputed from the
    items.
    """

    def __init__(self, total : float, items: ListItem):
        self.total = total
        self.items = items

    def __repr__(self) -> str:
        # The consumer prints the order with an f-string; give it a readable
        # form instead of the default object address.
        return f"{type(self).__name__}(total={self.total!r}, items={self.items!r})"
def consumer(ch: Channel, method: Basic.Deliver, properties: BasicProperties, body):
    """Handle one delivery: parse it into an ``Order`` or divert it to the DLQ.

    Args:
        ch: channel the message was delivered on; also used to publish to the
            dead-letter exchange and to acknowledge the delivery.
        method: delivery metadata; its ``delivery_tag`` is used for the ack.
        properties: message properties (only printed).
        body: raw message payload, expected to be a JSON document with a
            ``total`` and an ``items`` list of
            ``description``/``price``/``quantity`` objects.

    A payload that cannot be parsed or is missing fields is republished to the
    ``config.dlq_name`` exchange as a persistent message carrying the error
    text in an ``exception`` header.

    The delivery is acknowledged only after the message was either processed
    or successfully handed to the dead-letter exchange. If that hand-off
    itself raises, the exception propagates and the delivery stays
    unacknowledged, so the message is not silently dropped.
    """
    print("Consumer")
    print(f'body={body}')
    print(f'method={method}')
    print(f'properties={properties}')
    try:
        dados = json.loads(body)
        order = Order(
            total=dados["total"],
            items=[
                Item(description=i["description"], price=i["price"], quantity=i["quantity"])
                for i in dados["items"]
            ],
        )
    except Exception as ex:
        # Any failure to build the order marks the payload as bad: park it on
        # the dead-letter exchange together with the reason.
        props = BasicProperties(
            delivery_mode=2,
            headers={"exception": f"{ex}"},
        )
        ch.basic_publish(config.dlq_name, config.dlq_name, body=body, properties=props)
    else:
        print(f"{order}")
        print(" [x] Message successfully received!")
    # Reached only when no exception escaped above (deliberately not a
    # `finally`): acking after a failed DLQ publish would lose the message.
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Script entry: consume from the work queue until interrupted or until an
# error occurs, then release the broker connection.
connection = None
try:
    channel, connection = config.get_channel()
    channel.basic_consume(queue=config.queue_name, auto_ack=False, on_message_callback=consumer)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
except KeyboardInterrupt:
    # CTRL+C is the advertised way to stop; exit quietly and let the
    # `finally` clause close the connection.
    pass
except Exception as ex:
    print(f" [x] Excepton: {type(ex)}:{ex}")
finally:
    # `connection` is still None when get_channel() itself failed, and may
    # already be closed when the broker dropped it; closing in either state
    # would raise and hide the original error.
    if connection is not None and connection.is_open:
        connection.close()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pika | |
import config | |
# Sample order payload, in the JSON shape the consumer parses.
dados = """{
"total": 462,
"items":[
{"description":"Pelo", "price":0.50, "quantity":60},
{"description":"Chumbinho", "price":4.80, "quantity":90}
]
}"""

# Script entry: publish the sample order to the work queue through the
# default exchange, then close the connection.
channel, connection = config.get_channel()
try:
    channel.basic_publish(
        exchange='',
        routing_key=config.queue_name,
        body=dados,
        # The queue is declared durable; delivery_mode=2 marks the message
        # persistent too, matching what the consumer does for DLQ messages.
        properties=pika.BasicProperties(delivery_mode=2),
    )
    print(f" [x] Sent {dados}")
finally:
    # Close the connection even when the publish fails.
    connection.close()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pika==1.3.1 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment