Skip to content

Instantly share code, notes, and snippets.

@kkroesch
Last active October 23, 2024 11:08
Show Gist options
  • Save kkroesch/f5ac9bfd0938a0699358a8049a1c5bf5 to your computer and use it in GitHub Desktop.
Save kkroesch/f5ac9bfd0938a0699358a8049a1c5bf5 to your computer and use it in GitHub Desktop.
RabbitMQ Producer and Consumer with Tokio Runtime.
use futures_util::stream::StreamExt;
use lapin::{
options::{BasicAckOptions, BasicConsumeOptions, QueueDeclareOptions},
types::FieldTable,
Connection, ConnectionProperties,
};
use std::error::Error;
use tracing::{info, instrument};
use tracing_subscriber;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
tracing_subscriber::fmt::init();
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
let connection = Connection::connect(&addr, ConnectionProperties::default()).await?;
info!("Verbunden mit RabbitMQ an {}", addr);
let channel = connection.create_channel().await?;
info!("Channel erstellt: {}", channel.id());
let queue_name = "Application";
channel
.queue_declare(
queue_name,
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await?;
info!("Queue deklariert: {}", queue_name);
let mut consumer = channel
.basic_consume(
queue_name,
"app_consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
info!("Warte auf Nachrichten...");
while let Some(result) = consumer.next().await {
match result {
Ok(delivery) => {
let data = delivery.data.clone();
let message = String::from_utf8_lossy(&data);
info!("Nachricht erhalten: {}", message);
verarbeite_nachricht(&message).await;
delivery
.ack(BasicAckOptions::default())
.await
.expect("Fehler beim Bestätigen der Nachricht");
}
Err(error) => {
eprintln!("Fehler beim Empfangen der Nachricht: {}", error);
}
}
}
Ok(())
}
#[instrument]
async fn verarbeite_nachricht(message: &str) {
println!("Verarbeite Nachricht: {}", message);
}
use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties};
use tracing_subscriber;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Logging initialisieren
tracing_subscriber::fmt::init();
// AMQP-Adresse
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
// Verbindung herstellen
let connection = Connection::connect(&addr, ConnectionProperties::default()).await?;
println!("Verbunden mit RabbitMQ an {}", addr);
// Channel erstellen
let channel = connection.create_channel().await?;
println!("Channel erstellt: {:?}", channel.id());
// Queue deklarieren
let queue_name = "Application";
let queue = channel
.queue_declare(
queue_name,
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await?;
println!("Queue deklariert: {:?}", queue);
// Nachrichtendaten
let payload = b"Hallo, Welt!";
// Nachricht senden
let confirm = channel
.basic_publish(
"",
queue_name,
BasicPublishOptions::default(),
&payload.to_vec(),
BasicProperties::default(),
)
.await?
.await?; // Auf Bestätigung warten
println!("Nachricht gesendet");
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment