Skip to content

Instantly share code, notes, and snippets.

@pfedotovsky
Created July 17, 2018 13:05
Show Gist options
  • Save pfedotovsky/3d9917d878e26fcf1079052148b6ddac to your computer and use it in GitHub Desktop.
Save pfedotovsky/3d9917d878e26fcf1079052148b6ddac to your computer and use it in GitHub Desktop.
MassTransit extension to support messages from other systems
namespace RabbitMqClient
{
using System;
using System.IO;
using System.Net.Mime;
using System.Runtime.Serialization;
using System.Text;
using GreenPipes;
using MassTransit;
using MassTransit.Serialization;
using MassTransit.Util;
using Newtonsoft.Json;
/// <summary>
/// Message deserializer for plain JSON bodies: the entire body is parsed as the
/// message itself and then wrapped in a <c>PlainMessageEnvelope</c> that is tagged
/// with the URN of the message type supplied at construction.
/// </summary>
internal sealed class PlainJsonDeserializer : IMessageDeserializer
{
    // Cached so that reading the ContentType property does not allocate on every access.
    private static readonly ContentType JsonContentType = new ContentType("application/json");

    private readonly JsonSerializer deserializer;
    private readonly IObjectTypeDeserializer objectTypeDeserializer;

    // The URN depends only on the message type, so it is computed once up front
    // instead of once per received message.
    private readonly string messageUrn;

    /// <summary>
    /// Creates a deserializer that labels every received message as <paramref name="messageType"/>.
    /// </summary>
    /// <param name="messageType">The message type whose URN is attached to each envelope.</param>
    /// <exception cref="ArgumentNullException"><paramref name="messageType"/> is <c>null</c>.</exception>
    public PlainJsonDeserializer(Type messageType)
    {
        // Fail fast: without this guard a null type would only surface later, on every
        // received message, wrapped in a generic SerializationException.
        if (messageType == null)
        {
            throw new ArgumentNullException(nameof(messageType));
        }

        this.deserializer = new JsonSerializer();
        this.objectTypeDeserializer = new ObjectTypeDeserializer(this.deserializer);
        this.messageUrn = new MessageUrn(messageType).ToString();
    }

    /// <summary>Gets the content type handled by this deserializer (<c>application/json</c>).</summary>
    public ContentType ContentType => JsonContentType;

    /// <summary>Adds nothing to the probe output; intentionally a no-op.</summary>
    /// <param name="context">The probe context (unused).</param>
    public void Probe(ProbeContext context)
    {
    }

    /// <summary>
    /// Reads the body of <paramref name="receiveContext"/> as JSON and returns a consume
    /// context built around the parsed message.
    /// </summary>
    /// <param name="receiveContext">The receive context that supplies the body stream and transport headers.</param>
    /// <returns>A <see cref="JsonConsumeContext"/> carrying the parsed message.</returns>
    /// <exception cref="SerializationException">
    /// Any failure while reading or parsing the body; the original exception is kept as the inner exception.
    /// </exception>
    public ConsumeContext Deserialize(ReceiveContext receiveContext)
    {
        try
        {
            var messageEncoding = this.GetMessageEncoding(receiveContext);

            object message;
            using (var bodyStream = receiveContext.GetBodyStream())

            // Encoding is explicit (BOM detection off), buffer is 1024, and leaveOpen is true
            // so that the enclosing using statement alone is responsible for the body stream.
            using (var reader = new StreamReader(bodyStream, messageEncoding, false, 1024, true))
            using (var jsonReader = new JsonTextReader(reader))
            {
                // No target type is given: the body is parsed into whatever untyped
                // representation the serializer produces for the JSON it encounters.
                message = this.deserializer.Deserialize(jsonReader);
            }

            // A fresh array per message, so envelopes never share a mutable instance.
            return new JsonConsumeContext(
                this.deserializer,
                this.objectTypeDeserializer,
                receiveContext,
                new PlainMessageEnvelope(message) { MessageType = new[] { this.messageUrn } });
        }
        catch (JsonSerializationException ex)
        {
            throw new SerializationException(
                "A JSON serialization exception occurred while deserializing the message envelope",
                ex);
        }
        catch (SerializationException)
        {
            // Already the exception type callers expect; rethrow without re-wrapping.
            throw;
        }
        catch (Exception ex)
        {
            throw new SerializationException("An exception occurred while deserializing the message envelope", ex);
        }
    }

    /// <summary>
    /// Picks the text encoding for the message body from the <c>Content-Encoding</c>
    /// transport header, defaulting to UTF-8 when the header is missing or blank.
    /// </summary>
    /// <param name="receiveContext">The receive context whose transport headers are inspected.</param>
    /// <returns>The encoding to use when reading the body.</returns>
    /// <remarks>
    /// The header value is passed to <see cref="Encoding.GetEncoding(string)"/> as an encoding
    /// name. A value that is not a recognised encoding name makes that call throw, which
    /// <see cref="Deserialize"/> reports as a <see cref="SerializationException"/>.
    /// </remarks>
    private Encoding GetMessageEncoding(ReceiveContext receiveContext)
    {
        var contentEncoding = receiveContext.TransportHeaders.Get("Content-Encoding", default(string));

        return string.IsNullOrWhiteSpace(contentEncoding) ? Encoding.UTF8 : Encoding.GetEncoding(contentEncoding);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment