Skip to content

Instantly share code, notes, and snippets.

@MirzaLeka
Last active May 18, 2025 20:40
Show Gist options
  • Save MirzaLeka/45e10ea8c738ec168898cd21090880d1 to your computer and use it in GitHub Desktop.
Save MirzaLeka/45e10ea8c738ec168898cd21090880d1 to your computer and use it in GitHub Desktop.
ASP .NET Worker Service for Apache ActiveMQ
namespace AMQSubscriberWorker.Services
{
public interface IOrderService
{
PizzaOrder? DeserializeOrder(string pizzaOrder);
}
}
using System.Text.Json;
namespace AMQSubscriberWorker.Services
{
public class OrderService : IOrderService
{
public PizzaOrder? DeserializeOrder(string pizzaOrder)
{
try
{
var order = JsonSerializer.Deserialize<PizzaOrder>(pizzaOrder);
if (order is null)
{
return null;
}
return order;
}
catch(Exception)
{
return null;
}
}
}
}
namespace AMQSubscriberWorker
{
public class PizzaOrder
{
public Guid ID { get; set; }
public string Name { get; set; }
public DateTime DateOrdered { get; set; }
}
}
using AMQSubscriberWorker.Services;
namespace AMQSubscriberWorker
{
public class Program
{
public static void Main(string[] args)
{
var builder = Host.CreateApplicationBuilder(args);
// Registering the OrderService
builder.Services.AddScoped<IOrderService, OrderService>();
// Registering the background service
builder.Services.AddHostedService<Worker>();
var host = builder.Build();
host.Run();
}
}
}
using AMQSubscriberWorker.Services;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using Microsoft.Extensions.Logging;
using System.Data.Common;
namespace AMQSubscriberWorker
{
public class Worker(ILogger<Worker> logger, IServiceProvider serviceProvider) : BackgroundService
{
private readonly IServiceProvider _serviceProvider = serviceProvider;
private readonly ILogger<Worker> _logger = logger;
private readonly string _brokerUri = "ssl://b-<YOUR-OPENWIRE-AMQ-URI>.amazonaws.com:61617";
private readonly string _username = "<YOUR-AMQ-USERNAME>";
private readonly string _password = "<YOUR-AMQ-USER-PASSWORD>";
private readonly string _queueName = "<YOUR-QUEUE>";
private IConnection _connection;
private Apache.NMS.ISession _session;
private IMessageConsumer _consumer;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
while (!stoppingToken.IsCancellationRequested)
{
var factory = new ConnectionFactory(_brokerUri);
_connection = factory.CreateConnection(_username, _password);
_connection.Start();
_session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
var destination = _session.GetQueue(_queueName);
_consumer = _session.CreateConsumer(destination);
_consumer.Listener += OnMessage;
await Task.Delay(Timeout.Infinite, stoppingToken);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("Operation was gracefully canceled.");
}
catch (Exception ex)
{
_logger.LogCritical(ex, "Critical Exception!");
CloseSession();
}
}
private void CloseSession()
{
_consumer?.Close();
_session?.Close();
_connection?.Close();
}
private void OnMessage(IMessage message)
{
if (message is ITextMessage textMessage)
{
var text = textMessage.Text;
using var scope = _serviceProvider.CreateScope();
var orderService = scope.ServiceProvider.GetRequiredService<IOrderService>();
var order = orderService.DeserializeOrder(text);
if (order is not null)
{
_logger.LogInformation("Order received: {Name}, at: {Date}", order.Name, order.DateOrdered);
}
}
}
}
}
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.ActiveMQ.Commands;
using Microsoft.Extensions.Logging;
using System.Data.Common;
namespace AMQSubscriberWorker
{
public class Worker(ILogger<Worker> logger) : BackgroundService
{
private readonly ILogger<Worker> _logger = logger;
private readonly string _brokerUri = "ssl://b-<YOUR-OPENWIRE-AMQ-URI>.amazonaws.com:61617";
private readonly string _username = "<YOUR-AMQ-USERNAME>";
private readonly string _password = "<YOUR-AMQ-USER-PASSWORD>";
private readonly string _queueName = "<YOUR-QUEUE>";
private IConnection _connection;
private Apache.NMS.ISession _session;
private IMessageConsumer _consumer;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
while (!stoppingToken.IsCancellationRequested)
{
var factory = new ConnectionFactory(_brokerUri);
_connection = factory.CreateConnection(_username, _password);
_connection.Start();
_session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
var destination = _session.GetQueue(_queueName);
_consumer = _session.CreateConsumer(destination);
_consumer.Listener += OnMessage;
await Task.Delay(Timeout.Infinite, stoppingToken);
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("Operation was gracefully canceled.");
}
catch (Exception ex)
{
_logger.LogCritical(ex, "Critical Exception!");
CloseSession();
}
}
private void CloseSession()
{
_consumer?.Close();
_session?.Close();
_connection?.Close();
}
private void OnMessage(IMessage message)
{
if (message is ITextMessage textMessage)
{
var text = textMessage.Text;
_logger.LogInformation("Message received: {Text}", text);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment