Created
February 10, 2019 21:32
-
-
Save phatboyg/a062e5dda9fc39c7a91c7522dce4cec6 to your computer and use it in GitHub Desktop.
Sample showing Azure Service Bus with Lock Timeout
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<Project Sdk="Microsoft.NET.Sdk">

  <!-- Console executable for the slow-consumer / lock-timeout sample. -->
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>netcoreapp2.2</TargetFramework>
  </PropertyGroup>

  <!-- MassTransit core, its Azure Service Bus transport, and the log4net integration;
       all three are pinned to the same version. -->
  <ItemGroup>
    <PackageReference Include="MassTransit" Version="5.3.1" />
    <PackageReference Include="MassTransit.Azure.ServiceBus.Core" Version="5.3.1" />
    <PackageReference Include="MassTransit.Log4Net" Version="5.3.1" />
  </ItemGroup>

</Project>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.IO; | |
using System.Text; | |
using System.Threading.Tasks; | |
using GreenPipes; | |
using log4net; | |
using log4net.Config; | |
using MassTransit; | |
using MassTransit.Azure.ServiceBus.Core; | |
using MassTransit.Internals.Extensions; | |
using MassTransit.Log4NetIntegration; | |
using MassTransit.Log4NetIntegration.Logging; | |
namespace ConsoleApp2 | |
{ | |
/// <summary>
/// Console host for the sample. Creates an Azure Service Bus backed MassTransit bus with a
/// single receive endpoint for <see cref="SlowConsumer"/>, publishes a batch of
/// <see cref="SlowMessage"/> instances with increasing delays, then waits for Enter before
/// stopping the bus.
/// </summary>
class Program
{
    /// <summary>
    /// Identifier generated once per process. It is stamped on every published message so that
    /// <see cref="SlowConsumer"/> can ignore messages whose RunId does not match this process.
    /// </summary>
    internal static readonly Guid RunId = NewId.NextGuid();

    /// <summary>
    /// Number of messages published by <see cref="Main"/>; message <c>i</c> carries a delay of
    /// <c>i</c> minutes, so delays range from 0 to <c>MessageCount - 1</c> minutes.
    /// </summary>
    const int MessageCount = 25;

    /// <summary>
    /// Entry point: configures and starts the bus, publishes the messages, blocks until the
    /// user presses Enter, and always stops the bus on the way out.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
        {
            // Placeholder connection string: namespace, key name and key must be filled in.
            string connectionString =
                "Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=";

            var host = cfg.Host(connectionString, h => { h.OperationTimeout = TimeSpan.FromSeconds(60); });

            cfg.UseLog4Net();

            cfg.ReceiveEndpoint(host, "SlowConsumerService", e =>
            {
                // The lock is far shorter than the longest consumer run (up to 24 minutes),
                // so the sample depends on the lock being renewed while the consumer works.
                // NOTE(review): exact renewal semantics come from the transport - confirm
                // against the MassTransit Azure Service Bus documentation.
                e.LockDuration = TimeSpan.FromSeconds(60);
                e.MaxAutoRenewDuration = TimeSpan.FromMinutes(30);
                e.PrefetchCount = 100;
                e.Consumer<SlowConsumer>();
            });
        });

        bus.Start();
        try
        {
            // Keep every publish task so that a failed publish is observed instead of being
            // silently dropped; the publishes are still started back-to-back as before.
            var publishes = new Task[MessageCount];
            for (int i = 0; i < MessageCount; i++)
            {
                publishes[i] = bus.Publish<SlowMessage>(new {Delay = TimeSpan.FromMinutes(i), RunId});
            }

            // Main is kept synchronous so the entry-point signature is unchanged; blocking
            // here is safe because a console application has no synchronization context.
            Task.WaitAll(publishes);

            Console.ReadLine();
        }
        finally
        {
            // Stop the bus even when publishing throws, so the process shuts down cleanly.
            bus.Stop();
        }
    }

    /// <summary>
    /// Configures log4net from an embedded XML document (root level INFO, a single console
    /// appender with the pattern <c>%m%n</c>) and installs the MassTransit log4net logger.
    /// </summary>
    /// <remarks>
    /// NOTE(review): nothing in this file calls this method; <see cref="Main"/> only calls
    /// <c>cfg.UseLog4Net()</c>. Confirm whether it should be invoked at startup.
    /// </remarks>
    static void ConfigureLogger()
    {
        const string logConfig = @"<?xml version=""1.0"" encoding=""utf-8"" ?>
<log4net>
  <root>
    <level value=""INFO"" />
    <appender-ref ref=""console"" />
  </root>
  <appender name=""console"" type=""log4net.Appender.ColoredConsoleAppender"">
    <layout type=""log4net.Layout.PatternLayout"">
      <conversionPattern value=""%m%n"" />
    </layout>
  </appender>
</log4net>";

        using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(logConfig)))
        {
            var logRepository = LogManager.GetRepository(System.Reflection.Assembly.GetEntryAssembly());
            XmlConfigurator.Configure(logRepository, stream);
        }

        Log4NetLogger.Use();
    }
}
/// <summary>
/// Consumer that deliberately takes a long time: for a message belonging to the current run
/// it writes a status line and waits one minute, once per whole minute of the message's
/// delay, then writes a completion line including the elapsed time.
/// </summary>
public class SlowConsumer :
    IConsumer<SlowMessage>
{
    /// <summary>
    /// Handles one <see cref="SlowMessage"/>. Messages whose RunId differs from
    /// <see cref="Program.RunId"/> are ignored without any output.
    /// </summary>
    /// <param name="context">Consume context carrying the message and the transport payload.</param>
    /// <returns>A task that completes when all one-minute waits have elapsed.</returns>
    public async Task Consume(ConsumeContext<SlowMessage> context)
    {
        // Captured before anything else so the reported duration covers the whole call.
        DateTime consumeStart = DateTime.UtcNow;

        SlowMessage message = context.Message;
        if (message.RunId != Program.RunId)
        {
            return;
        }

        BrokeredMessageContext brokered = context.GetPayload<BrokeredMessageContext>();

        // Whole minutes of delay still to wait; a delay under one minute skips the loop.
        int minutesLeft = (int) (message.Delay.TotalSeconds / 60);
        while (minutesLeft-- > 0)
        {
            await Console.Out.WriteLineAsync(
                $"Message: (delay: {context.Message.Delay.ToFriendlyString(),8}, locked-until: {brokered.LockedUntil,24}, queued: {consumeStart,24})");

            await Task.Delay(TimeSpan.FromMinutes(1));
        }

        await Console.Out.WriteLineAsync(
            $"M--DONE: (delay: {context.Message.Delay.ToFriendlyString(),8}, locked-until: {brokered.LockedUntil,24}, queued: {consumeStart,24}, duration:{DateTime.UtcNow - consumeStart})");
    }
}
/// <summary>
/// Message contract for the sample: tells the consumer how long to stay busy and which
/// process run published the message.
/// </summary>
public interface SlowMessage
{
    /// <summary>How long the consumer should keep working on this message.</summary>
    TimeSpan Delay { get; }

    /// <summary>Identifier of the publishing run; compared with <c>Program.RunId</c> by the consumer.</summary>
    Guid RunId { get; }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment