Last active
May 10, 2022 12:07
-
-
Save brantburnett/9a39ae2077bc0203c17068c517564b8f to your computer and use it in GitHub Desktop.
RabbitMQ message bus OpenTelemetry tracing example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections; | |
using System.Collections.Generic; | |
namespace CenterEdge.Common.Events | |
{ | |
/// <summary> | |
/// Collection of properties associated with the event, such as tracing headers. | |
/// </summary> | |
public class EventProperties : IDictionary<string, string?> | |
{ | |
// Backing store for the properties; keys are compared with StringComparer.OrdinalIgnoreCase.
private readonly Dictionary<string, string?> _inner =
    new Dictionary<string, string?>(StringComparer.OrdinalIgnoreCase);

// The same dictionary instance viewed through its IDictionary interface, which exposes
// the KeyValuePair-based collection members used by the explicit implementations.
private IDictionary<string, string?> InnerAsIDictionary
{
    get { return _inner; }
}
/// <inheritdoc /> | |
public IEnumerator<KeyValuePair<string, string?>> GetEnumerator()
    // Enumeration is delegated directly to the backing dictionary.
    => _inner.GetEnumerator();
// Non-generic enumeration, delegated to the backing dictionary's IEnumerable implementation.
IEnumerator IEnumerable.GetEnumerator()
{
    IEnumerable untyped = _inner;
    return untyped.GetEnumerator();
}
// Adds a key/value pair through the backing dictionary's ICollection implementation.
void ICollection<KeyValuePair<string, string?>>.Add(KeyValuePair<string, string?> item)
    => InnerAsIDictionary.Add(item);
/// <inheritdoc /> | |
public void Clear()
    // Removes every entry from the backing dictionary.
    => _inner.Clear();
// Checks for a key/value pair through the backing dictionary's ICollection implementation.
bool ICollection<KeyValuePair<string, string?>>.Contains(KeyValuePair<string, string?> item)
    => InnerAsIDictionary.Contains(item);
// Copies the entries into the supplied array, starting at arrayIndex, via the backing dictionary.
void ICollection<KeyValuePair<string, string?>>.CopyTo(KeyValuePair<string, string?>[] array, int arrayIndex)
    => InnerAsIDictionary.CopyTo(array, arrayIndex);
// Removes a key/value pair through the backing dictionary's ICollection implementation.
bool ICollection<KeyValuePair<string, string?>>.Remove(KeyValuePair<string, string?> item)
    => InnerAsIDictionary.Remove(item);
/// <inheritdoc /> | |
public int Count
{
    // Number of entries currently held by the backing dictionary.
    get { return _inner.Count; }
}
/// <inheritdoc /> | |
public bool IsReadOnly
{
    // Reports whatever the backing dictionary's IDictionary view reports.
    get { return InnerAsIDictionary.IsReadOnly; }
}
/// <inheritdoc /> | |
public void Add(string key, string? value)
    // Delegates to the backing dictionary's Add.
    => _inner.Add(key, value);
/// <inheritdoc /> | |
public bool ContainsKey(string key)
    // Key lookup is delegated to the backing dictionary.
    => _inner.ContainsKey(key);
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using CenterEdge.Common.Events; | |
using CenterEdge.Common.MessageBus.Events; | |
using Microsoft.Extensions.Logging; | |
using OpenTelemetry; | |
using OpenTelemetry.Context.Propagation; | |
namespace CenterEdge.Common.MessageBus
{
    /// <summary>
    /// Abstract base implementation for an <see cref="IEventPublisher"/> which includes standard handling
    /// like tracing propagation.
    /// </summary>
    public abstract class EventPublisherBase : IEventPublisher
    {
        /// <summary>
        /// Logger.
        /// </summary>
        protected ILogger Logger { get; }

        /// <summary>
        /// Create a new EventPublisherBase.
        /// </summary>
        /// <param name="logger">Logger.</param>
        /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
        protected EventPublisherBase(ILogger logger)
        {
            Logger = logger ?? throw new ArgumentNullException(nameof(logger));
        }

        /// <summary>
        /// Validates the event, injects the current trace context and baggage into its
        /// <see cref="IServiceEvent.Properties"/>, and hands it to <see cref="PublishEventCoreAsync{T}"/>.
        /// </summary>
        /// <typeparam name="T">The type of event, must implement <see cref="IServiceEvent"/>.</typeparam>
        /// <param name="e">The event to publish.</param>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <exception cref="ArgumentNullException"><paramref name="e"/> is null.</exception>
        /// <exception cref="ArgumentException">The event's topic is null, empty, or whitespace.</exception>
        /// <exception cref="MessageBusPublishException">Any exception raised while propagating context or publishing; the original is the inner exception.</exception>
        public async Task PublishEventAsync<T>(T e, CancellationToken cancellationToken = default) where T : class, IServiceEvent
        {
            if (e is null)
            {
                throw new ArgumentNullException(nameof(e));
            }

            if (string.IsNullOrWhiteSpace(e.Topic))
            {
                throw new ArgumentException("Topic is required", nameof(e));
            }

            try
            {
                e.Properties ??= new EventProperties();

                using (var activity = MessageBusActivitySources.PublishActivitySource.StartActivity($"{typeof(T).FullName} send", ActivityKind.Producer))
                {
                    // Depending on Sampling (and whether a listener is registered or not), the activity above may not be created.
                    // If it is created, then propagate its context. If it is not created, then propagate the Current context, if any.
                    ActivityContext contextToInject = activity?.Context ?? Activity.Current?.Context ?? default;

                    // Inject the activity context into the message properties to propagate trace context
                    Propagators.DefaultTextMapPropagator.Inject(new PropagationContext(contextToInject, Baggage.Current), e.Properties, InjectTraceContext);

                    await PublishEventCoreAsync(e, activity, cancellationToken).ConfigureAwait(false);
                }
            }
            catch (Exception ex)
            {
                // Log with a constant message template so the event type is captured as a structured
                // value; the rendered text is the same as the exception message below.
                Logger.LogError(ex, "Error publishing event '{EventType}' to message bus", e.GetType());

                var message = $"Error publishing event '{e.GetType()}' to message bus";
                throw new MessageBusPublishException(message, ex);
            }
        }

        /// <summary>
        /// Core implementation for publishing an event. This is called by <see cref="PublishEventAsync{T}"/> after
        /// contract validation, tracing propagation, etc.
        /// </summary>
        /// <typeparam name="T">The type of event, must implement from <see cref="IServiceEvent"/>.</typeparam>
        /// <param name="e">The event.</param>
        /// <param name="activity">Tracing activity. May be null when no activity was started.</param>
        /// <param name="cancellationToken">Cancellation token.</param>
        protected abstract Task PublishEventCoreAsync<T>(T e, Activity? activity, CancellationToken cancellationToken = default) where T : class, IServiceEvent;

        // PublishEventCoreAsync would have RabbitMQ specifics in its implementation,
        // including a call to RabbitMQHelpers.AddMessageTags to fill in standardized tags on the activity

        // Setter callback handed to the propagator: stores each propagation header in the event properties.
        private static void InjectTraceContext(EventProperties properties, string key, string value)
        {
            properties[key] = value;
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.Linq; | |
using System.Threading.Tasks; | |
using CenterEdge.Common.Events; | |
using CenterEdge.Common.MessageBus.Events; | |
using Microsoft.Extensions.DependencyInjection; | |
using Microsoft.Extensions.Logging; | |
using OpenTelemetry; | |
using OpenTelemetry.Context.Propagation; | |
namespace CenterEdge.Common.MessageBus
{
    /// <summary>
    /// Base implementation for <see cref="IEventSubscriber" /> which includes standard handling like tracing propagation.
    /// </summary>
    /// <typeparam name="T">Type of event, must implement <see cref="IServiceEvent"/>.</typeparam>
    public abstract class EventSubscriberBase<T> : IEventSubscriber
        where T : class, IServiceEvent
    {
        private readonly EventDelegate _eventHandler;
        private readonly IServiceProvider _serviceProvider;

        // Computed once per closed generic type instance rather than on every received message.
        private readonly string _activityName = $"{typeof(T).FullName} receive";

        /// <summary>
        /// Creates a new EventSubscriberBase.
        /// </summary>
        /// <param name="eventHandler">Delegate that will process the event.</param>
        /// <param name="serviceProvider">Service provider for supporting services.</param>
        /// <exception cref="ArgumentNullException"><paramref name="eventHandler" /> or <paramref name="serviceProvider" /> is null.</exception>
        protected EventSubscriberBase(EventDelegate eventHandler, IServiceProvider serviceProvider)
        {
            _eventHandler = eventHandler ?? throw new ArgumentNullException(nameof(eventHandler));
            _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
        }

        /// <summary>
        /// Handles an event received from the bus, called by the inherited class after it applies its processing.
        /// </summary>
        /// <param name="message">The event to handle.</param>
        /// <param name="internalMessage">The internal message object received from the bus.</param>
        protected virtual async Task HandleEvent(T message, object? internalMessage)
        {
            // Rebuild the publisher's trace context and baggage from the headers carried in the event properties.
            var parentContext =
                Propagators.DefaultTextMapPropagator.Extract(default, message.Properties, ExtractTraceContext);
            Baggage.Current = parentContext.Baggage;

            using (var activity = MessageBusActivitySources.ReceiveActivitySource.StartActivity(_activityName, ActivityKind.Consumer,
                       parentContext.ActivityContext))
            {
                AddActivityTags(activity, message);

                // Each event is processed inside its own dependency injection scope.
                using (var scope = _serviceProvider.CreateScope())
                {
                    var context = new DefaultEventContext<T>(scope.ServiceProvider, message, internalMessage);

                    var eventContextProvider = scope.ServiceProvider.GetRequiredService<IEventContextProvider>();
                    eventContextProvider.CurrentEvent = context;

                    await _eventHandler(context).ConfigureAwait(false);
                }
            }
        }

        /// <summary>
        /// Add any provider-specific tags to the trace activity.
        /// </summary>
        /// <param name="activity">The activity to update. May be null.</param>
        /// <param name="message">The message received from the bus.</param>
        protected virtual void AddActivityTags(Activity? activity, T message)
        {
            // For the RabbitMQ implementation of this abstract, it would call
            // RabbitMQHelpers.AddMessageTags to fill in standardized tags on the activity
        }

        /// <inheritdoc />
        public abstract void Dispose();

        // Getter callback handed to the propagator. The propagator contract expects a sequence of
        // non-null strings, so a missing key and a key stored with a null value are both reported
        // as "no values" rather than yielding a null element.
        private static IEnumerable<string> ExtractTraceContext(EventProperties? properties, string key)
        {
            if (properties != null && properties.TryGetValue(key, out var value) && value != null)
            {
                return new[] { value };
            }

            return Enumerable.Empty<string>();
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Generic; | |
namespace CenterEdge.Common.Events
{
    /// <summary>
    /// Base interface implemented by service events.
    /// </summary>
    public interface IServiceEvent
    {
        /// <summary>
        /// Gets or sets the topic of this event.
        /// </summary>
        string Topic { get; set; }

        /// <summary>
        /// Gets or sets the properties related to the event, such as tracing headers. May be null.
        /// </summary>
        EventProperties? Properties { get; set; }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Diagnostics; | |
namespace CenterEdge.Common.MessageBus
{
    /// <summary>
    /// Holds the <see cref="ActivitySource"/> instances, and their public names, used for
    /// message bus tracing.
    /// </summary>
    public static class MessageBusActivitySources
    {
        /// <summary>
        /// Activity source name used for message publishing.
        /// </summary>
        public const string PublishActivitySourceName = "CenterEdge.Common.MessageBus.Publish";

        /// <summary>
        /// Activity source name used for message receiving.
        /// </summary>
        public const string ReceiveActivitySourceName = "CenterEdge.Common.MessageBus.Receive";

        // Source that publish-side activities are started from.
        internal static readonly ActivitySource PublishActivitySource =
            new ActivitySource(PublishActivitySourceName, "1.0.0");

        // Source that receive-side activities are started from.
        internal static readonly ActivitySource ReceiveActivitySource =
            new ActivitySource(ReceiveActivitySourceName, "1.0.0");
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using OpenTelemetry.Trace; | |
namespace CenterEdge.Common.MessageBus
{
    /// <summary>
    /// Extensions for <see cref="TracerProviderBuilder" />.
    /// </summary>
    public static class MessageBusTracerProviderBuilderExtensions
    {
        /// <summary>
        /// Add instrumentation for the message bus by registering the publish and receive activity sources.
        /// </summary>
        /// <param name="builder">The <see cref="TracerProviderBuilder" />.</param>
        /// <returns>The <see cref="TracerProviderBuilder" />.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="builder" /> is null.</exception>
        public static TracerProviderBuilder AddMessageBusInstrumentation(this TracerProviderBuilder builder)
        {
            // Fail with a descriptive exception instead of a NullReferenceException further down.
            if (builder == null)
            {
                throw new ArgumentNullException(nameof(builder));
            }

            // ReSharper disable once SuspiciousTypeConversion.Global
            if (builder is IDeferredTracerProviderBuilder deferredTracerProviderBuilder)
            {
                // Deferred builders register the sources through their Configure callback.
                return deferredTracerProviderBuilder.Configure(static (_, deferredBuilder) =>
                {
                    deferredBuilder.AddSource(MessageBusActivitySources.PublishActivitySourceName, MessageBusActivitySources.ReceiveActivitySourceName);
                });
            }

            return builder.AddSource(MessageBusActivitySources.PublishActivitySourceName, MessageBusActivitySources.ReceiveActivitySourceName);
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Diagnostics; | |
using CenterEdge.Common.Events; | |
using EasyNetQ; | |
namespace CenterEdge.Common.MessageBus.RabbitMQ.Events.Internal
{
    internal static class RabbitMQHelpers
    {
        /// <summary>
        /// Sets the messaging tags for a RabbitMQ message on the supplied activity. Does nothing when
        /// <paramref name="activity"/> is null.
        /// </summary>
        /// <param name="activity">The activity to tag. May be null.</param>
        /// <param name="bus">Bus whose exchange naming convention supplies the destination name.</param>
        /// <param name="message">The event whose type and topic are recorded.</param>
        public static void AddMessageTags(Activity? activity, IBus bus, IServiceEvent message)
        {
            if (activity == null)
            {
                return;
            }

            // These tags are added demonstrating the semantic conventions of the OpenTelemetry messaging specification
            // See:
            // * https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#messaging-attributes
            // * https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#rabbitmq
            activity
                .SetTag("messaging.system", "rabbitmq")
                .SetTag("messaging.destination_kind", "queue")
                .SetTag("messaging.destination", bus.Advanced.Conventions.ExchangeNamingConvention(message.GetType()))
                .SetTag("messaging.rabbitmq.routing_key", message.Topic);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment