Created
January 1, 2016 22:09
-
-
Save phatboyg/53c4e173eba91f7f7825 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace SagaLockupRepro | |
{ | |
using System; | |
using System.Threading; | |
using Automatonymous; | |
using MassTransit; | |
using MassTransit.NLogIntegration; | |
using MassTransit.RabbitMqTransport; | |
using MassTransit.Saga; | |
internal class Program | |
{ | |
private static void Main() | |
{ | |
var rand = new Random(); | |
IBusControl sagaBus = Bus.Factory.CreateUsingRabbitMq(c => | |
{ | |
IRabbitMqHost host = c.Host(new Uri("rabbitmq://localhost"), hc => { }); | |
c.UseNLog(); | |
c.UseInMemoryScheduler(); | |
c.ReceiveEndpoint(host, "saga_endpoint", h => | |
{ | |
h.PurgeOnStartup = true; | |
h.StateMachineSaga(new Saga(), new InMemorySagaRepository<SagaState>()); | |
}); | |
}); | |
IBusControl bus = Bus.Factory.CreateUsingRabbitMq(c => | |
{ | |
IRabbitMqHost host = c.Host(new Uri("rabbitmq://localhost"), hc => { }); | |
c.UseNLog(); | |
c.ReceiveEndpoint(host, "server_endpoint", h => | |
{ | |
h.PurgeOnStartup = true; | |
h.Handler<SomeMessage2>(async m => | |
{ | |
Thread.Sleep(rand.Next(200)); | |
m.Respond(new SomeResponse2(m.Message.CorrelationId)); | |
}); | |
h.Handler<SomeMessage3>(async m => | |
{ | |
Thread.Sleep(rand.Next(200)); | |
m.Respond(new SomeResponse3(m.Message.CorrelationId)); | |
}); | |
}); | |
}); | |
sagaBus.Start(); | |
try | |
{ | |
bus.Start(); | |
try | |
{ | |
for (var i = 0; i < 1000; i++) | |
{ | |
sagaBus.Publish(new SomeMessage1(Guid.NewGuid(), i)); | |
Thread.Sleep(15); | |
} | |
Console.WriteLine("Published"); | |
Console.ReadLine(); | |
} | |
finally | |
{ | |
bus.Stop(); | |
} | |
} | |
finally | |
{ | |
sagaBus.Stop(); | |
} | |
} | |
} | |
public class SagaState : | |
SagaStateMachineInstance | |
{ | |
private Guid _correlationId; | |
public SagaState(Guid correlationId) | |
{ | |
_correlationId = correlationId; | |
} | |
public Guid? TimeoutToken { get; set; } | |
public State CurrentState { get; set; } | |
public int Index { get; set; } | |
public Guid CorrelationId | |
{ | |
get { return _correlationId; } | |
set { _correlationId = value; } | |
} | |
} | |
public sealed class Saga : | |
MassTransitStateMachine<SagaState> | |
{ | |
public Saga() | |
{ | |
State(() => WaitingForSecondMessage); | |
State(() => WaitingForThirdMessage); | |
Event(() => OnFirstMessage); | |
Event(() => OnSecondMessage); | |
Event(() => OnThirdMessage); | |
Schedule(() => ScheduleTimeout, x => x.TimeoutToken, x => { x.Delay = TimeSpan.FromSeconds(60); }); | |
Initially( | |
When(OnFirstMessage) | |
.Then(context => context.Instance.Index = context.Data.Index) | |
.TransitionTo(WaitingForSecondMessage) | |
.Publish(s => new SomeMessage2(s.Instance.CorrelationId))); | |
During(WaitingForSecondMessage, | |
When(OnSecondMessage) | |
.TransitionTo(WaitingForThirdMessage) | |
.Schedule(ScheduleTimeout, c => new Timeout(c.Instance.CorrelationId)) | |
.Publish(s => new SomeMessage3(s.Instance.CorrelationId))); | |
During(WaitingForThirdMessage, | |
When(ScheduleTimeout.Received) | |
.Then(x => { Console.WriteLine($"Timed out: {x.Instance.Index}"); }).Finalize()); | |
During(WaitingForThirdMessage, | |
When(OnThirdMessage) | |
.Unschedule(ScheduleTimeout) | |
.Finalize()); | |
DuringAny( | |
When(Final.Enter) | |
.Then(x => { Console.WriteLine($"Done: {x.Instance.Index}"); })); | |
SetCompletedWhenFinalized(); | |
} | |
public State WaitingForSecondMessage { get; set; } | |
public State WaitingForThirdMessage { get; set; } | |
public Event<SomeMessage1> OnFirstMessage { get; set; } | |
public Event<SomeResponse2> OnSecondMessage { get; set; } | |
public Event<SomeResponse3> OnThirdMessage { get; set; } | |
public Schedule<SagaState, Timeout> ScheduleTimeout { get; set; } | |
} | |
public class SomeMessage1 : CorrelatedBy<Guid> | |
{ | |
private Guid _correlationId; | |
private int _index; | |
public SomeMessage1(Guid correlationId, int index) | |
{ | |
_index = index; | |
_correlationId = correlationId; | |
} | |
public int Index | |
{ | |
get { return _index; } | |
} | |
public Guid CorrelationId | |
{ | |
get { return _correlationId; } | |
} | |
} | |
public class SomeMessage2 : CorrelatedBy<Guid> | |
{ | |
private Guid _correlationId; | |
public SomeMessage2(Guid correlationId) | |
{ | |
_correlationId = correlationId; | |
} | |
public Guid CorrelationId | |
{ | |
get { return _correlationId; } | |
} | |
} | |
public class SomeResponse2 : CorrelatedBy<Guid> | |
{ | |
private Guid _correlationId; | |
public SomeResponse2(Guid correlationId) | |
{ | |
_correlationId = correlationId; | |
} | |
public Guid CorrelationId | |
{ | |
get { return _correlationId; } | |
} | |
} | |
public class SomeMessage3 : CorrelatedBy<Guid> | |
{ | |
private Guid _correlationId; | |
public SomeMessage3(Guid correlationId) | |
{ | |
_correlationId = correlationId; | |
} | |
public Guid CorrelationId | |
{ | |
get { return _correlationId; } | |
} | |
} | |
public class SomeResponse3 : CorrelatedBy<Guid> | |
{ | |
private Guid _correlationId; | |
public SomeResponse3(Guid correlationId) | |
{ | |
_correlationId = correlationId; | |
} | |
public Guid CorrelationId | |
{ | |
get { return _correlationId; } | |
} | |
} | |
public class Timeout : CorrelatedBy<Guid> | |
{ | |
private Guid _correlationId; | |
public Timeout(Guid correlationId) | |
{ | |
_correlationId = correlationId; | |
} | |
public Guid CorrelationId | |
{ | |
get { return _correlationId; } | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment