Last active
November 27, 2019 16:27
-
-
Save ralfw/628d2b3ba93825eecde46b08575915e1 to your computer and use it in GitHub Desktop.
DDC 2019 - Event-Orientation Workshop
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.IO; | |
using System.Linq; | |
using System.Threading; | |
using Newtonsoft.Json; // NuGet package dependency | |
namespace eventorientation | |
{ | |
/// <summary>
/// Contract of an event store: events can be recorded (optionally guarded by an
/// expected version), counted, and replayed, optionally filtered by event type.
/// </summary>
/// <remarks>
/// Semantics beyond the signatures are defined by the implementing class,
/// e.g. <c>FilesInFolderEventstore</c>.
/// </remarks>
public interface IEventstore : IDisposable
{
    /// <summary>
    /// Notification carrying a version, an event number and a batch of events.
    /// </summary>
    event Action<Version, long, Event[]> OnRecorded;

    /// <summary>Records events without stating an expected version.</summary>
    /// <param name="events">The events to record.</param>
    /// <returns>The resulting version and the final event number.</returns>
    (Version version, long finalEventNumber) Record(params Event[] events);

    /// <summary>Records events while stating the version the caller expects.</summary>
    /// <param name="expectedVersion">The version the caller expects to be current.</param>
    /// <param name="events">The events to record.</param>
    /// <returns>The resulting version and the final event number.</returns>
    (Version version, long finalEventNumber) Record(Version expectedVersion, params Event[] events);

    /// <summary>Looks up the version registered under <paramref name="id"/>.</summary>
    Version Version(string id);

    /// <summary>Number of events in the store.</summary>
    long Length { get; }

    /// <summary>Replays events, optionally restricted to the given event types.</summary>
    Event[] Replay(params Type[] eventTypes);

    /// <summary>
    /// Replays events starting at <paramref name="firstEventNumber"/>, optionally
    /// restricted to the given event types.
    /// </summary>
    Event[] Replay(long firstEventNumber, params Type[] eventTypes);

    /// <summary>
    /// Replays events and additionally returns versions whose ids are derived
    /// from the replayed events via <paramref name="mapToVersionId"/>.
    /// </summary>
    (Version[] versions, Event[] events) ReplayWithVersion(Func<Event, string> mapToVersionId, params Type[] eventTypes);

    /// <summary>
    /// Same as the overload without <paramref name="firstEventNumber"/>, but
    /// starting at the given event number.
    /// </summary>
    (Version[] versions, Event[] events) ReplayWithVersion(long firstEventNumber, Func<Event, string> mapToVersionId, params Type[] eventTypes);
}
/// <summary>
/// Base class of all events. Every new instance starts out with a freshly
/// generated GUID string as its <see cref="EventId"/>.
/// </summary>
public abstract class Event
{
    /// <summary>
    /// Identifier of this event; settable so it can be overwritten after construction.
    /// </summary>
    public string EventId { get; set; } = Guid.NewGuid().ToString();

    /// <summary>Only derived event types can be instantiated.</summary>
    protected Event()
    {
    }
}
/// <summary>
/// Pairs a version id with a version number. The number "*" is what the public
/// constructor assigns when no concrete number is given.
/// </summary>
public class Version
{
    // Number assigned by the public constructor.
    private const string UnspecifiedNumber = "*";

    /// <summary>Identifier of the versioned thing; never null, empty or whitespace.</summary>
    public string Id { get; }

    /// <summary>Version number as text.</summary>
    public string Number { get; }

    /// <summary>Creates a version with the number "*".</summary>
    /// <exception cref="InvalidOperationException">The id is null, empty or whitespace.</exception>
    public Version(string id) : this(id, UnspecifiedNumber)
    {
    }

    /// <summary>Creates a version with an explicit number.</summary>
    /// <exception cref="InvalidOperationException">The id is null, empty or whitespace.</exception>
    internal Version(string id, string number)
    {
        var idIsMissing = string.IsNullOrWhiteSpace(id);
        if (idIsMissing)
        {
            throw new InvalidOperationException("Id of version must not be empty/null!");
        }

        Number = number;
        Id = id;
    }
}
/// <summary>
/// <see cref="IEventstore"/> implementation that keeps its data in two sub-folders
/// ("events" and "versions") of a root path. Recording and replaying are guarded
/// by a reader/writer lock.
/// </summary>
public class FilesInFolderEventstore : IEventstore
{
    private const string DefaultPath = "eventstore.db";

    /// <summary>
    /// Invoked at the end of every <c>Record</c> call, outside the write lock, with
    /// the new version (may be null), the final event number and the recorded events.
    /// </summary>
    public event Action<Version, long, Event[]> OnRecorded = (v, f, e) => { };

    private readonly Lock _lock;
    private readonly FilesInFolderEventRepository _repo;
    private readonly Versions _vers;

    /// <summary>Creates a store rooted at the relative path "eventstore.db".</summary>
    public FilesInFolderEventstore() : this(DefaultPath) {}

    /// <summary>Creates a store rooted at <paramref name="path"/>.</summary>
    public FilesInFolderEventstore(string path) {
        _repo = new FilesInFolderEventRepository(Path.Combine(path, "events"));
        _vers = new Versions(Path.Combine(path, "versions"));
        _lock = new Lock();
    }

    /// <summary>Records events without an expected version; the returned version is null.</summary>
    public (Version version, long finalEventNumber) Record(params Event[] events) => Record(null, events);

    /// <summary>
    /// Updates the version (if one is given) and stores the events under the write lock,
    /// then raises <see cref="OnRecorded"/>.
    /// </summary>
    /// <param name="expectedVersion">Version to check and advance; null skips versioning.</param>
    /// <param name="events">Events to append; must not be null.</param>
    /// <returns>The updated version (null if none was given) and the event count after storing.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="events"/> is null.</exception>
    public (Version version, long finalEventNumber) Record(Version expectedVersion, params Event[] events) {
        // Fail before any side effect: previously a null array blew up only after
        // the version file had already been advanced.
        if (events == null) throw new ArgumentNullException(nameof(events));

        Version newVersion = null;
        long finalEventNumber = -1;
        _lock.TryWrite(() => {
            // The version check comes first so a mismatch prevents any event from being stored.
            newVersion = _vers.Update(expectedVersion);
            finalEventNumber = Store_events();
        });
        OnRecorded(newVersion, finalEventNumber, events);
        return (newVersion, finalEventNumber);

        long Store_events() {
            var n = _repo.Count;
            foreach (var e in events) _repo.Store(n++, e);
            return _repo.Count;
        }
    }

    /// <summary>Returns the version currently stored for <paramref name="id"/>.</summary>
    public Version Version(string id) => _vers[id];

    /// <summary>Number of events currently in the repository.</summary>
    public long Length => _repo.Count;

    /// <summary>Replays all events, optionally restricted to the given types.</summary>
    public Event[] Replay(params Type[] eventTypes) => Replay(-1, eventTypes);

    /// <summary>
    /// Replays events from <paramref name="firstEventNumber"/> onwards (negative values
    /// start at 0), optionally restricted to the given types.
    /// </summary>
    public Event[] Replay(long firstEventNumber, params Type[] eventTypes)
        => _lock.TryRead(() => Filter(AllEvents(firstEventNumber), eventTypes).ToArray());

    /// <summary>Replays all events together with the versions derived from them.</summary>
    public (Version[] versions, Event[] events) ReplayWithVersion(Func<Event, string> mapToVersionId, params Type[] eventTypes)
        => ReplayWithVersion(-1, mapToVersionId, eventTypes);

    /// <summary>
    /// Replays events and, for each distinct non-empty id produced by
    /// <paramref name="mapToVersionId"/>, looks up the current version.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="mapToVersionId"/> is null.</exception>
    public (Version[] versions, Event[] events) ReplayWithVersion(long firstEventNumber, Func<Event, string> mapToVersionId, params Type[] eventTypes) {
        if (mapToVersionId == null) throw new ArgumentNullException(nameof(mapToVersionId));

        Version[] versions = new Version[0];
        var events = _lock.TryRead(() => {
            var filteredEvents = Filter(AllEvents(firstEventNumber), eventTypes).ToArray();
            // Versions are read inside the same read lock as the events.
            versions = Retrieve_versions(filteredEvents).ToArray();
            return filteredEvents;
        });
        return (versions, events);

        // The parameter is deliberately not called "events": that would shadow the
        // enclosing local, which is a compile error before C# 8.
        IEnumerable<Version> Retrieve_versions(IEnumerable<Event> replayed) {
            var idsRetrieved = new HashSet<string>();
            foreach (var e in replayed) {
                var versionId = mapToVersionId(e);
                if (string.IsNullOrEmpty(versionId)) continue;
                // HashSet.Add returns false for ids that were already handled.
                if (!idsRetrieved.Add(versionId)) continue;
                yield return _vers[versionId];
            }
        }
    }

    // Lazily loads events [firstEventNumber, Count); the count is captured once up front.
    private IEnumerable<Event> AllEvents(long firstEventNumber) {
        var n = _repo.Count;
        for (var i = firstEventNumber < 0 ? 0 : firstEventNumber; i < n; i++)
            yield return _repo.Load(i);
    }

    // No types given (null or empty) means "no filter"; otherwise exact type match.
    private IEnumerable<Event> Filter(IEnumerable<Event> events, Type[] eventTypes) {
        if (eventTypes == null || eventTypes.Length == 0) return events;
        var eventTypes_ = new HashSet<Type>(eventTypes);
        return events.Where(e => eventTypes_.Contains(e.GetType()));
    }

    /// <summary>Disposes both owned disposables (event repository and versions).</summary>
    public void Dispose() {
        _repo.Dispose();
        _vers.Dispose();
    }
}
/// <summary>
/// Converts events to and from a two-part text format: the first line is the
/// assembly-qualified type name, everything after the first newline is the JSON payload.
/// </summary>
internal static class EventSerialization {
    private const char HeaderSeparator = '\n';

    /// <summary>Serializes an event as "&lt;assembly-qualified type name&gt;\n&lt;json&gt;".</summary>
    public static string Serialize(this Event e) {
        var eventName = e.GetType().AssemblyQualifiedName;
        var data = JsonConvert.SerializeObject(e);
        return eventName + HeaderSeparator + data;
    }

    /// <summary>Restores an event from text produced by <see cref="Serialize"/>.</summary>
    /// <exception cref="InvalidOperationException">
    /// The type named in the first line cannot be resolved or is not an <see cref="Event"/>.
    /// </exception>
    public static Event Deserialize(this string e) {
        // Only the first newline separates header from payload; the payload is kept verbatim.
        var separatorIndex = e.IndexOf(HeaderSeparator);
        var eventName = (separatorIndex < 0 ? e : e.Substring(0, separatorIndex)).Trim();
        var data = separatorIndex < 0 ? "" : e.Substring(separatorIndex + 1);

        // The type name comes from the stored text, so only Event subtypes are accepted.
        // Previously an unknown type surfaced as an InvalidCastException without context.
        var eventType = Type.GetType(eventName);
        if (eventType == null || !typeof(Event).IsAssignableFrom(eventType))
            throw new InvalidOperationException($"Cannot deserialize event: '{eventName}' is not a known event type!");

        return (Event)JsonConvert.DeserializeObject(data, eventType);
    }
}
/// <summary>
/// Keeps one text file per version id in a folder; the file content is the
/// current version number.
/// </summary>
internal class Versions : IDisposable
{
    private const string AnyNumber = "*";
    private const string FirstNumber = "1";

    private readonly string _path;

    /// <summary>Uses <paramref name="path"/> as version folder, creating it if necessary.</summary>
    public Versions(string path) {
        _path = path;
        // CreateDirectory does nothing when the directory already exists.
        Directory.CreateDirectory(_path);
    }

    /// <summary>
    /// Current version for <paramref name="id"/>; a version with number "*" if no file exists yet.
    /// </summary>
    public Version this[string id] {
        get {
            var filename = VersionFilename(id);
            return File.Exists(filename)
                ? new Version(id, File.ReadAllText(filename))
                : new Version(id);
        }
    }

    /// <summary>
    /// Advances the stored number for the id of <paramref name="expectedVersion"/> by one
    /// (or starts at "1" if there is no file yet) and returns the new version.
    /// </summary>
    /// <param name="expectedVersion">
    /// Null means "no versioning" and yields null. The number "*" matches any stored number.
    /// </param>
    /// <exception cref="InvalidOperationException">
    /// A file exists and its number differs from the expected, non-"*" number.
    /// </exception>
    public Version Update(Version expectedVersion) {
        if (expectedVersion == null) return null;

        var filename = VersionFilename(expectedVersion.Id);
        // NOTE(review): when no file exists the expected number is not checked at all - confirm intended.
        var nextNumber = File.Exists(filename)
            ? NextNumber(expectedVersion, File.ReadAllText(filename))
            : FirstNumber;

        var updatedVersion = new Version(expectedVersion.Id, nextNumber);
        File.WriteAllText(filename, updatedVersion.Number);
        return updatedVersion;
    }

    // Verifies the expectation against the stored number and returns its successor.
    // The number is persisted data, so it is parsed and formatted culture-invariantly.
    private static string NextNumber(Version expectedVersion, string currentNumber) {
        if (expectedVersion.Number != AnyNumber && expectedVersion.Number != currentNumber)
            throw new InvalidOperationException($"Expected version '{expectedVersion.Number}' for {expectedVersion.Id} does not match current '{currentNumber}'!");

        var invariant = System.Globalization.CultureInfo.InvariantCulture;
        return (int.Parse(currentNumber, invariant) + 1).ToString(invariant);
    }

    // NOTE(review): the id is used as file name verbatim - presumably ids never contain path characters; verify against callers.
    private string VersionFilename(string id) => Path.Combine(_path, id + ".txt");

    /// <summary>Nothing to release; present to satisfy <see cref="IDisposable"/>.</summary>
    public void Dispose() {}
}
/// <summary>
/// Stores each event as its own text file in a folder. The file name is the
/// event index as 16 lowercase hex digits plus ".txt".
/// </summary>
internal class FilesInFolderEventRepository : IDisposable
{
    private const string EventFileExtension = ".txt";

    private readonly string _path;

    /// <summary>Uses <paramref name="path"/> as event folder, creating it if necessary.</summary>
    public FilesInFolderEventRepository(string path) {
        _path = path;
        // CreateDirectory does nothing when the directory already exists.
        Directory.CreateDirectory(_path);
    }

    /// <summary>Serializes <paramref name="e"/> and writes it as event number <paramref name="index"/>.</summary>
    /// <exception cref="InvalidOperationException">
    /// The index is negative or an event with this index already exists.
    /// </exception>
    public void Store(long index, Event e) {
        var text = EventSerialization.Serialize(e);
        Store(index, text);
    }

    private void Store(long index, string text) {
        if (index < 0) throw new InvalidOperationException("Event index must be >= 0!");
        var filepath = FilepathFor(index);
        // Stored events are never overwritten.
        if (File.Exists(filepath)) throw new InvalidOperationException($"Event with index {index} has already been stored and cannot be overwritten!");
        File.WriteAllText(filepath, text);
    }

    /// <summary>Reads and deserializes the event stored under <paramref name="index"/>.</summary>
    public Event Load(long index) {
        var text = File.ReadAllText(FilepathFor(index));
        return EventSerialization.Deserialize(text);
    }

    /// <summary>
    /// Number of event files in the folder. Only "*.txt" files are counted so that
    /// unrelated files dropped into the folder do not shift the next event index.
    /// </summary>
    public long Count => Directory.GetFiles(_path, "*" + EventFileExtension).Length;

    private string FilepathFor(long index) => Path.Combine(_path, $"{index:x16}{EventFileExtension}");

    /// <summary>Nothing to release; present to satisfy <see cref="IDisposable"/>.</summary>
    public void Dispose() { }
}
/// <summary>
/// Thin wrapper around a <see cref="ReaderWriterLock"/> that runs a delegate while
/// holding the writer or reader lock and always releases the lock afterwards.
/// Lock acquisition uses a timeout of 5000 msec.
/// </summary>
internal class Lock
{
    private const int AcquisitionTimeoutMsec = 5000;

    private readonly ReaderWriterLock _rwLock = new ReaderWriterLock();

    public Lock()
    {
    }

    /// <summary>Runs <paramref name="f"/> while holding the writer lock.</summary>
    public void TryWrite(Action f)
    {
        _rwLock.AcquireWriterLock(AcquisitionTimeoutMsec);
        try
        {
            f();
        }
        finally
        {
            // Released even if f throws.
            _rwLock.ReleaseWriterLock();
        }
    }

    /// <summary>Runs <paramref name="f"/> while holding a reader lock and returns its result.</summary>
    public T TryRead<T>(Func<T> f)
    {
        _rwLock.AcquireReaderLock(AcquisitionTimeoutMsec);
        try
        {
            var result = f();
            return result;
        }
        finally
        {
            // Released even if f throws.
            _rwLock.ReleaseReaderLock();
        }
    }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace eventorientation | |
{ | |
/// <summary>Root marker interface of all message types.</summary>
public interface IMessage
{
}

/// <summary>Marker for messages that can be handed in for handling.</summary>
public interface IIncoming : IMessage
{
}

/// <summary>Marker for messages that are produced as output.</summary>
public interface IOutgoing : IMessage
{
}

/// <summary>Base class of incoming messages (commands and queries derive from it).</summary>
public abstract class Request : IIncoming
{
}

/// <summary>Base class of outgoing messages (command statuses and query results derive from it).</summary>
public abstract class Response : IOutgoing
{
}

/// <summary>Base class of messages that are both incoming and outgoing.</summary>
public abstract class Notification : IIncoming, IOutgoing
{
}

/// <summary>Base class of commands.</summary>
public abstract class Command : Request
{
}

/// <summary>Base class of the responses to commands.</summary>
public abstract class CommandStatus : Response
{
}

/// <summary>Command status without further data.</summary>
public class Success : CommandStatus
{
}

/// <summary>Command status that additionally carries a value.</summary>
public class Success<T> : Success
{
    /// <summary>The value passed to the constructor.</summary>
    public T Value { get; }

    public Success(T value)
    {
        Value = value;
    }
}

/// <summary>Command status that carries an error message.</summary>
public class Failure : CommandStatus
{
    /// <summary>The message passed to the constructor.</summary>
    public string Errormessage { get; }

    public Failure(string errormessage)
    {
        Errormessage = errormessage;
    }
}

/// <summary>Base class of queries.</summary>
public abstract class Query : Request
{
}

/// <summary>Base class of the responses to queries.</summary>
public abstract class QueryResult : Response
{
}

/// <summary>Response type without any data.</summary>
public class NoResponse : Response
{
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
namespace eventorientation | |
{ | |
/// <summary>Base class of the models handed to message handlers.</summary>
public abstract class MessageModel
{
}

/// <summary>Model type without any data.</summary>
public sealed class EmptyMessageModel : MessageModel
{
}
/// <summary>
/// Builds the model (and the version belonging to it) for an incoming message,
/// given access to the event store.
/// </summary>
public delegate (TMessageModel model, Version version) LoadDelegate<in TIncoming, TMessageModel>(
    IEventstore es,
    TIncoming message)
    where TIncoming : IIncoming
    where TMessageModel : MessageModel;

/// <summary>Receives a batch of events together with a version and access to the event store.</summary>
public delegate void UpdateDelegate(IEventstore es, Event[] events, Version version);

/// <summary>Produces the result of a query from the query and its model.</summary>
public delegate TQueryResult HandleQueryDelegate<in TQuery, in TMessageModel, out TQueryResult>(
    TQuery query,
    TMessageModel model)
    where TQuery : Query
    where TMessageModel : MessageModel
    where TQueryResult : QueryResult;

/// <summary>
/// Processes a command against its model and yields a status, the events to
/// record and notifications.
/// </summary>
public delegate (CommandStatus commandStatus, Event[] events, Notification[] notifications) HandleCommandDelegate<in TCommand, in TMessageModel>(
    TCommand command,
    TMessageModel model)
    where TCommand : Command
    where TMessageModel : MessageModel;

/// <summary>Translates a notification (and its model) into commands.</summary>
public delegate Command[] HandleNotificationDelegate<in TNotification, in TMessageModel>(
    TNotification notification,
    TMessageModel model)
    where TNotification : Notification
    where TMessageModel : MessageModel;

/// <summary>Uniform shape of a registered pipeline: incoming message in, response plus notifications out.</summary>
public delegate (Response response, Notification[] notifications) PipelineDelegate(IIncoming message);
/// <summary>
/// Object-oriented form of a query pipeline; its three methods match the
/// load, handle-query and update delegates.
/// </summary>
public interface IQueryPipeline<in TQuery, TMessageModel, out TQueryResult>
    where TQuery : Query
    where TMessageModel : MessageModel
    where TQueryResult : QueryResult
{
    /// <summary>Builds the model and its version for the query.</summary>
    (TMessageModel model, Version version) Load(IEventstore es, TQuery query);

    /// <summary>Produces the query result from the query and its model.</summary>
    TQueryResult Project(TQuery query, TMessageModel model);

    /// <summary>Receives a batch of events together with a version.</summary>
    void Update(IEventstore es, Event[] events, Version version);
}
/// <summary>
/// Object-oriented form of a command pipeline; its three methods match the
/// load, handle-command and update delegates.
/// </summary>
public interface ICommandPipeline<in TCommand, TMessageModel>
    where TCommand : Command
    where TMessageModel : MessageModel
{
    /// <summary>Builds the model and its version for the command.</summary>
    (TMessageModel model, Version version) Load(IEventstore es, TCommand command);

    /// <summary>Processes the command and yields a status, events and notifications.</summary>
    (CommandStatus commandStatus, Event[] events, Notification[] notifications) Execute(TCommand command, TMessageModel model);

    /// <summary>Receives a batch of events together with a version.</summary>
    void Update(IEventstore es, Event[] events, Version version);
}
/// <summary>
/// Dispatches incoming messages to the pipeline registered for their exact runtime
/// type. Query, command and notification pipelines are registered up front; update
/// delegates of all query and command pipelines are collected and invoked together.
/// </summary>
public class MessagePump
{
    private readonly IEventstore _es;
    private readonly Dictionary<Type, PipelineDelegate> _pipelines;

    // Stays null until the first non-null update delegate has been registered,
    // hence every invocation goes through "?.Invoke".
    private event UpdateDelegate _update;

    private bool _hasBeenStarted;

    /// <summary>Creates a pump whose pipelines work against <paramref name="es"/>.</summary>
    public MessagePump(IEventstore es) {
        _es = es;
        _pipelines = new Dictionary<Type, PipelineDelegate>();
    }

    /// <summary>
    /// Registers a query pipeline for <typeparamref name="TQuery"/>: load the model,
    /// process the query, respond with the result and no notifications.
    /// </summary>
    /// <exception cref="ArgumentException">A pipeline for the message type is already registered.</exception>
    public void RegisterQueryPipeline<TQuery,TMessageModel,TQueryResult>(
        LoadDelegate<TQuery,TMessageModel> load,
        HandleQueryDelegate<TQuery,TMessageModel,TQueryResult> process,
        UpdateDelegate update
    )
        where TQuery : Query
        where TMessageModel : MessageModel
        where TQueryResult : QueryResult
    {
        _pipelines.Add(typeof(TQuery), msg => {
            // Pipelines are looked up by exact runtime type, so the cast cannot fail.
            var query = (TQuery)msg;
            var (model, _) = load(_es, query);
            var result = process(query, model);
            return (result, new Notification[0]);
        });
        _update += update;
    }

    /// <summary>Registers the three methods of <paramref name="pipeline"/> as a query pipeline.</summary>
    public void RegisterQueryPipeline<TQuery, TMessageModel, TQueryResult>(IQueryPipeline<TQuery, TMessageModel, TQueryResult> pipeline)
        where TQuery : Query
        where TMessageModel : MessageModel
        where TQueryResult : QueryResult
        => RegisterQueryPipeline<TQuery,TMessageModel,TQueryResult>(pipeline.Load, pipeline.Project, pipeline.Update);

    /// <summary>
    /// Registers a command pipeline for <typeparamref name="TCommand"/>: load the model,
    /// process the command, record the resulting events with the loaded version and,
    /// if there were events, pass them to all registered update delegates.
    /// </summary>
    /// <exception cref="ArgumentException">A pipeline for the message type is already registered.</exception>
    public void RegisterCommandPipeline<TCommand, TMessageModel>(
        LoadDelegate<TCommand, TMessageModel> load,
        HandleCommandDelegate<TCommand, TMessageModel> process,
        UpdateDelegate update
    )
        where TCommand : Command
        where TMessageModel : MessageModel
    {
        _pipelines.Add(typeof(TCommand), msg => {
            var command = (TCommand)msg;
            var (model, version) = load(_es, command);
            var (commandStatus, events, notifications) = process(command, model);
            // NOTE(review): Record is called even when no events were produced - confirm this is intended.
            var (newVersion, _) = _es.Record(version, events);
            if (events.Any()) _update?.Invoke(_es, events, newVersion);
            return (commandStatus, notifications);
        });
        _update += update;
    }

    /// <summary>Registers the three methods of <paramref name="pipeline"/> as a command pipeline.</summary>
    public void RegisterCommandPipeline<TCommand, TMessageModel>(ICommandPipeline<TCommand, TMessageModel> pipeline)
        where TCommand : Command
        where TMessageModel : MessageModel
        => RegisterCommandPipeline<TCommand, TMessageModel>(pipeline.Load, pipeline.Execute, pipeline.Update);

    /// <summary>
    /// Registers a notification pipeline for <typeparamref name="TNotification"/>: load the
    /// model, translate the notification into commands, handle each command through this
    /// pump and respond with <see cref="NoResponse"/> plus all notifications they produced.
    /// </summary>
    /// <exception cref="ArgumentException">A pipeline for the message type is already registered.</exception>
    public void RegisterNotificationPipeline<TNotification, TMessageModel>(
        LoadDelegate<TNotification, TMessageModel> load,
        HandleNotificationDelegate<TNotification, TMessageModel> process
    )
        where TNotification : Notification
        where TMessageModel : MessageModel
    {
        _pipelines.Add(typeof(TNotification), msg => {
            var notification = (TNotification)msg;
            var (model, _) = load(_es, notification);
            var commands = process(notification, model);
            var allNotifications = commands.SelectMany(cmd => Handle(cmd).notifications);
            return (new NoResponse(), allNotifications.ToArray());
        });
    }

    /// <summary>
    /// On the first call, invokes all registered update delegates once with an empty
    /// event array and a null version; later calls do nothing. Called implicitly by
    /// <see cref="Handle(IIncoming)"/>.
    /// </summary>
    public void Start() {
        if (_hasBeenStarted) return;
        // Without any registered update delegate there is nothing to invoke.
        _update?.Invoke(_es, new Event[0], null);
        _hasBeenStarted = true;
    }

    /// <summary>Runs the pipeline registered for the runtime type of <paramref name="input"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
    /// <exception cref="KeyNotFoundException">No pipeline is registered for the message type.</exception>
    public (Response response, Notification[] notifications) Handle(IIncoming input) {
        if (input == null) throw new ArgumentNullException(nameof(input));
        Start();
        var messageType = input.GetType();
        if (!_pipelines.TryGetValue(messageType, out var pipeline))
            throw new KeyNotFoundException($"No pipeline registered for message type '{messageType.FullName}'!");
        return pipeline(input);
    }

    /// <summary>
    /// Same as <see cref="Handle(IIncoming)"/>, with the response converted to
    /// <typeparamref name="TResponse"/>; the response is null if it is of a different type.
    /// </summary>
    public (TResponse response, Notification[] notifications) Handle<TResponse>(IIncoming input)
        where TResponse : Response
    {
        var result = Handle(input);
        return (result.response as TResponse, result.notifications);
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment