-
-
Save tjaskula/70eb32871dbbb8b860138ced3fbed20e to your computer and use it in GitHub Desktop.
Dump of ProtoBuf serialization spike for http://github.com/bartelink/FunDomain
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module FunDomain.Persistence.NEventStore.NesGateway | |
open FunDomain.Persistence.Serialization | |
open NEventStore | |
open NEventStore.Persistence | |
open NEventStore.Persistence.Sql.SqlDialects | |
open Microsoft.FSharp.Reflection | |
open System | |
open System.Collections.Generic | |
/// Opaque position marker: produced by Streamer.read and handed back to Streamer.append
type Token =
    { CommitSequence : int
      StreamRevision : int }
/// Names a stream in NEventStore by an optional bucket plus the stream's own id
type StreamId =
    { Bucket : string option
      StreamId : string }
/// NEventStore ISerialize implementation that forwards to protobuf-net's static Serializer
type ProtobufSerializer() =
    interface NEventStore.Serialization.ISerialize with
        /// Writes `graph` to `output` via ProtoBuf.Serializer.Serialize
        member __.Serialize<'T>(output, graph) =
            ProtoBuf.Serializer.Serialize<'T>(output, graph)

        /// Reads a 'T from `input` via ProtoBuf.Serializer.Deserialize
        member __.Deserialize<'T> input =
            ProtoBuf.Serializer.Deserialize<'T>(input)
/// Helpers for pushing F# discriminated unions through protobuf-net
module ProtoBufAdapter =
    open ProtoBuf
    open ProtoBuf.Meta

    /// Decodes a 'a from the protobuf-encoded bytes in `data`
    let deserializeUnion<'a> (data:byte[]) =
        // `use` disposes the stream once decoding is done, mirroring serializeUnion
        use stream = new IO.MemoryStream(data)
        Serializer.Deserialize<'a>(stream)

    /// Encodes `o` with protobuf-net and returns the resulting bytes
    let serializeUnion (o:'a) =
        use stream = new IO.MemoryStream()
        Serializer.Serialize<'a>(stream, o)
        stream.ToArray()

    /// Registers the nested case classes of the union 'TMessage as protobuf-net sub-types in `model`.
    /// Each registered case gets field number 1000 + its Tag, has its "item" member added and
    /// has constructor use switched off; the base type is then compiled in place.
    /// Cases for which no nested class named after the case can be found are skipped.
    let registerSerializableDuInModel<'TMessage> (model:RuntimeTypeModel) =
        let baseType = model.[typeof<'TMessage>]
        for case in typeof<'TMessage> |> FSharpType.GetUnionCases do
            match case.Name |> case.DeclaringType.GetNestedType with
            | null ->
                // GetNestedType returns null when the union has no public nested type with the
                // case's name (typically cases declared without fields); passing that null on to
                // AddSubType / the model indexer would throw, so there is nothing to register.
                ()
            | caseType ->
                baseType.AddSubType(1000 + case.Tag, caseType) |> ignore
                let caseTypeModel = model.[caseType]
                caseTypeModel.Add("item").UseConstructor <- false
        baseType.CompileInPlace()

    /// Registers the cases of 'TMessage in protobuf-net's default runtime model
    let registerSerializableDu<'TMessage> () = registerSerializableDuInModel<'TMessage> RuntimeTypeModel.Default
/// Wrapper yielded by create* functions with read/append functions matching FunDomain.CommandHandler requirements
type StreamerProtobuf private (inner') =
    // Hoop jumping a la C++ pimpl pattern - if we don't do this, we're foisting an NEventStore package reference on all downstream users
    let inner : IPersistStreams = unbox inner'

    // A stream without an explicit bucket lives in the bucket named "default"
    let defaultBucket bucketId = defaultArg bucketId "default"

    // Fetches the commits of the stream whose revisions fall within [minRevision, maxRevision]
    let load {Bucket=bucketId; StreamId=streamId} minRevision maxRevision =
        inner.GetFrom(bucketId |> defaultBucket, streamId, minRevision, maxRevision)

    // Persists a commit attempt, discarding whatever the persistence engine returns
    let commit = inner.Commit >> ignore

    // Loads `count` revisions starting at `startIndex`, returning the decoded events, the
    // token of the last commit read (None when nothing was read), and an unused None slot
    let readStream streamId startIndex count =
        let minRevision,maxRevision = startIndex,startIndex+count-1
        async {
            let commits = load streamId minRevision maxRevision |> Array.ofSeq
            let events =
                commits
                |> Seq.collect (fun commit -> commit.Events)
                |> Seq.map (fun message -> message.Body |> unbox |> ProtoBufAdapter.deserializeUnion)
                |> List.ofSeq
            let tokenOption =
                if commits.Length = 0 then
                    None
                else
                    let lastCommit = commits.[commits.Length - 1]
                    Some {CommitSequence=lastCommit.CommitSequence; StreamRevision=lastCommit.StreamRevision}
            return events, tokenOption, None }

    // Serializes `events` and commits them as the commit following `token`
    // (or as the first commit of the stream when `token` is None)
    let appendToStream {Bucket=bucketId; StreamId=streamId} streamMeta token events =
        let commitId,commitStamp,commitHeaders = streamMeta
        async {
            // Materialized once so the events can be both counted and committed
            let eventMessages =
                events
                |> Seq.map (fun event ->
                    let body = event |> ProtoBufAdapter.serializeUnion |> box
                    EventMessage(Body=body))
                |> List.ofSeq
            let priorRevision, priorSequence =
                match token with
                | Some { StreamRevision = revision; CommitSequence = sequence } -> revision, sequence
                | None -> 0, 0
            // The stream revision is the revision of the last event in the commit, so it must
            // advance by the number of events rather than by a fixed 1. `max 1` keeps the
            // previous value for an empty batch, so that case behaves exactly as before.
            let updatedStreamRevision = priorRevision + max 1 eventMessages.Length
            let attempt =
                CommitAttempt(
                    bucketId |> defaultBucket, streamId,
                    updatedStreamRevision,
                    commitId,
                    priorSequence + 1,
                    commitStamp,
                    commitHeaders,
                    eventMessages)
            commit attempt }

    /// Boxes the persistence engine so NEventStore types stay out of the public constructor signature
    static member internal wrap persister = StreamerProtobuf( box persister)

    /// Reads the whole stream synchronously; returns the token of the last commit (if any) and the decoded events
    member this.read stream =
        let events,version,_ =
            readStream stream 0 Int32.MaxValue
            |> Async.RunSynchronously
        version,events

    /// Appends `events` synchronously after `token`, using a fresh commit id, the current UTC time and no headers
    member this.append stream token events =
        let commitMetadata() =
            let commitId = Guid.NewGuid()
            let commitDateTime = DateTime.UtcNow
            let commitHeaders = null
            commitId,commitDateTime,commitHeaders
        let metadata = commitMetadata()
        appendToStream stream metadata token events
        |> Async.RunSynchronously
/// Wraps the Advanced persistence API of an existing NEventStore store in a StreamerProtobuf
let createFromStoreProtobuf (inner:IStoreEvents) =
    StreamerProtobuf.wrap inner.Advanced
/// Wires up an NEventStore store with in-memory persistence, output-window logging and
/// the ProtobufSerializer, then wraps it in a StreamerProtobuf
let createInMemoryProtobuf () =
    let store =
        Wireup.Init()
            .LogToOutputWindow()
            .UsingInMemoryPersistence()
            .UsingCustomSerialization(ProtobufSerializer())
            .Build()
    createFromStoreProtobuf store
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Scenarios | |
open FunUno.UnoGame // Commands, replay, handle | |
open FunUno.UnoGame.Events // Digit | |
open FunDomain // CommandHandler | |
open FunDomain.Persistence.NEventStore.NesGateway // createInMemory, StreamId | |
open Xunit | |
/// The commands of one complete round for game `gameId`: a four-player start followed by five card plays
let fullGameActions gameId =
    // Shorthand for a PlayCard command against this game
    let play player card = PlayCard { GameId=gameId; Player=player; Card=card }
    [ StartGame { GameId=gameId; PlayerCount=4; TopCard=Digit(3, Red) }
      play 0 (Digit(3, Blue))
      play 1 (Digit(8, Blue))
      play 2 (Digit(8, Yellow))
      play 3 (Digit(4, Yellow))
      play 0 (Digit(4, Green)) ]
/// Stream identifier for a game: no explicit bucket, stream id is the game id rendered as a string
let streamId gameId =
    { Bucket = None
      StreamId = string gameId }
/// Drives every command of a full round through a command handler backed by the in-memory protobuf store
[<Fact>]
let ``Can run a full round using NEventStore's InMemoryPersistence Protobuf`` () =
    let domainHandler = CommandHandler.create replay handle
    let store = createInMemoryProtobuf()
    // Plug the store's read/append members into the domain handler
    let persistingHandler = domainHandler store.read store.append

    let gameId = 42
    let stream = streamId gameId
    for command in fullGameActions gameId do
        printfn "Processing %A against Stream %A" command stream
        persistingHandler stream command
/// Registers the game's unions with protobuf-net and then runs both full-round scenarios
[<Fact>]
let warmup () =
    // Registration comes first; the scenarios below run only after it has completed
    ProtoBufAdapter.registerSerializableDu<FunUno.UnoGame.Event>()
    ProtoBufAdapter.registerSerializableDu<FunUno.UnoGame.Events.Card>()
    ProtoBufAdapter.registerSerializableDu<FunUno.UnoGame.Events.Color>()

    ``Can run a full round using NEventStore's InMemoryPersistence Protobuf`` ()
    // NOTE(review): the scenario below is not defined in the visible source - confirm where it lives
    ``Can run a full round using NEventStore's InMemoryPersistence`` ()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module FunDomain.Tests.ProtobufNetSerialization | |
open ProtoBuf | |
open ProtoBuf.Meta | |
open Swensen.Unquote | |
open Xunit | |
open System.IO | |
open Microsoft.FSharp.Reflection | |
/// Sample protobuf contract: a string plus an optional int
[<ProtoContract>]
[<CLIMutable>]
type MessageA =
    { [<ProtoMember(1)>]
      X : string
      [<ProtoMember(2)>]
      Y : int option }
/// Sample protobuf contract: two strings
[<ProtoContract; CLIMutable>]
type MessageB =
    { [<ProtoMember(1)>]
      A : string
      [<ProtoMember(2)>]
      B : string }
/// Union over the two sample contracts; its cases are registered as protobuf-net sub-types below
[<ProtoContract>]
type Message =
    | MessageA of MessageA
    | MessageB of MessageB
/// Encodes `msg` with a Fixed32 length prefix and returns the bytes written
let serialize msg =
    use buffer = new MemoryStream()
    Serializer.SerializeWithLengthPrefix(buffer, msg, PrefixStyle.Fixed32)
    buffer.ToArray()
let deserialize<'TMessage> bytes = | |
use ms = new MemoryStream(buffer=bytes) | |
Serializer.DeserializeWithLengthPrefix<'TMessage>(ms, PrefixStyle.Fixed32) | |
/// Registers the nested case classes of the union 'TMessage as protobuf-net sub-types in `model`.
/// Each registered case gets field number 1000 + its Tag, has its "item" member added and
/// has constructor use switched off; the base type is then compiled in place.
/// Cases for which no nested class named after the case can be found are skipped.
let registerSerializableDuInModel<'TMessage> (model:RuntimeTypeModel) =
    let baseType = model.[typeof<'TMessage>]
    for case in typeof<'TMessage> |> FSharpType.GetUnionCases do
        match case.Name |> case.DeclaringType.GetNestedType with
        | null ->
            // GetNestedType returns null when the union has no public nested type with the
            // case's name (typically cases declared without fields); passing that null on to
            // AddSubType / the model indexer would throw, so there is nothing to register.
            ()
        | caseType ->
            baseType.AddSubType(1000 + case.Tag, caseType) |> ignore
            let caseTypeModel = model.[caseType]
            caseTypeModel.Add("item").UseConstructor <- false
    baseType.CompileInPlace()
/// Registers the cases of 'TMessage in protobuf-net's default runtime model
let registerSerializableDu<'TMessage> () =
    RuntimeTypeModel.Default |> registerSerializableDuInModel<'TMessage>

// Module-level statement: registers Message when this module initializes
registerSerializableDu<Message> ()
/// A MessageA whose string field is null is compared with its serialize/deserialize roundtrip
[<Fact>]
let ``MessageA roundtrips with null`` () =
    let original = { X = null; Y = None }
    let bytes = serialize original
    test <@ original = deserialize bytes @>
/// A MessageA whose string field is empty is compared with its serialize/deserialize roundtrip
[<Fact>]
let ``MessageA roundtrips with Empty`` () =
    let original = { X = ""; Y = None }
    let bytes = serialize original
    test <@ original = deserialize bytes @>
/// A MessageA carrying Some value in its option field is compared with its roundtrip
[<Fact>]
let ``MessageA roundtrips with Some`` () =
    let original = { X = "foo"; Y = Some 32 }
    let bytes = serialize original
    test <@ original = deserialize bytes @>
/// A MessageA carrying None in its option field is compared with its roundtrip
[<Fact>]
let ``MessageA roundtrips with None`` () =
    let original = { X = "foo"; Y = None }
    let bytes = serialize original
    test <@ original = deserialize bytes @>
/// A MessageB is compared with its serialize/deserialize roundtrip
[<Fact>]
let ``MessageB roundtrips`` () =
    let original = { A = "bar"; B = "baz" }
    let bytes = serialize original
    test <@ original = deserialize bytes @>
/// Roundtrips one value of each Message case and checks each comes back equal
[<Fact>]
let ``roundtrip pair`` () =
    let first = MessageA { X = "foo"; Y = Some 32 }
    let first' = deserialize (serialize first)
    test <@ first' = first @>

    let second = MessageB { A = "bar"; B = "baz" }
    let second' = deserialize (serialize second)
    test <@ second' = second @>
/// Repeats the pair roundtrip 1000 times
[<Fact>]
let many () =
    for _iteration = 1 to 1000 do
        ``roundtrip pair`` ()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module FunDomain.Tests.ProtobufNetSerializationFacts | |
open ProtoBuf | |
open ProtoBuf.Meta | |
open Swensen.Unquote | |
open Xunit | |
open System.IO | |
open Microsoft.FSharp.Reflection | |
/// Sample protobuf contract: a string plus an optional int
[<ProtoContract>]
[<CLIMutable>]
type MessageA =
    { [<ProtoMember(1)>]
      X : string
      [<ProtoMember(2)>]
      Y : int option }
/// Sample protobuf contract: two strings
[<ProtoContract; CLIMutable>]
type MessageB =
    { [<ProtoMember(1)>]
      A : string
      [<ProtoMember(2)>]
      B : string }
/// Union over the two sample contracts; its cases are registered as protobuf-net sub-types below
[<ProtoContract>]
type Message =
    | MessageA of MessageA
    | MessageB of MessageB
/// Encodes `msg` with a Fixed32 length prefix and returns the bytes written
let serialize msg =
    use buffer = new MemoryStream()
    Serializer.SerializeWithLengthPrefix(buffer, msg, PrefixStyle.Fixed32)
    buffer.ToArray()
/// Decodes one Fixed32 length-prefixed 'TMessage from `bytes`
let deserialize<'TMessage> bytes =
    use source = new MemoryStream(buffer=bytes)
    Serializer.DeserializeWithLengthPrefix<'TMessage>(source, PrefixStyle.Fixed32)
/// Registers the nested case classes of the union 'TMessage as protobuf-net sub-types in `model`.
/// Each registered case gets field number 1000 + its Tag, has its "item" member added and
/// has constructor use switched off; the base type is then compiled in place.
/// Cases for which no nested class named after the case can be found are skipped.
let registerSerializableDuInModel<'TMessage> (model:RuntimeTypeModel) =
    let baseType = model.[typeof<'TMessage>]
    for case in typeof<'TMessage> |> FSharpType.GetUnionCases do
        match case.Name |> case.DeclaringType.GetNestedType with
        | null ->
            // GetNestedType returns null when the union has no public nested type with the
            // case's name (typically cases declared without fields); passing that null on to
            // AddSubType / the model indexer would throw, so there is nothing to register.
            ()
        | caseType ->
            baseType.AddSubType(1000 + case.Tag, caseType) |> ignore
            let caseTypeModel = model.[caseType]
            caseTypeModel.Add("item").UseConstructor <- false
    baseType.CompileInPlace()
/// Registers the cases of 'TMessage in protobuf-net's default runtime model
let registerSerializableDu<'TMessage> () =
    RuntimeTypeModel.Default |> registerSerializableDuInModel<'TMessage>

// Module-level statement: registers Message when this module initializes
registerSerializableDu<Message> ()
/// A MessageA whose string field is null is compared with its serialize/deserialize roundtrip
[<Fact>]
let ``MessageA roundtrips with null`` () =
    let original = { X = null; Y = None }
    let bytes = serialize original
    test <@ original = deserialize bytes @>
/// A MessageA whose string field is empty is compared with its serialize/deserialize roundtrip
[<Fact>]
let ``MessageA roundtrips with Empty`` () =
    let original = { X = ""; Y = None }
    let bytes = serialize original
    test <@ original = deserialize bytes @>
/// A MessageA carrying Some value in its option field is compared with its roundtrip
[<Fact>]
let ``MessageA roundtrips with Some`` () =
    let original = { X = "foo"; Y = Some 32 }
    let bytes = serialize original
    test <@ original = deserialize bytes @>
/// A MessageA carrying None in its option field is compared with its roundtrip
[<Fact>]
let ``MessageA roundtrips with None`` () =
    let original = { X = "foo"; Y = None }
    let bytes = serialize original
    test <@ original = deserialize bytes @>
/// A MessageB is compared with its serialize/deserialize roundtrip
[<Fact>]
let ``MessageB roundtrips`` () =
    let original = { A = "bar"; B = "baz" }
    let bytes = serialize original
    test <@ original = deserialize bytes @>
/// Roundtrips one value of each Message case and checks each comes back equal
[<Fact>]
let ``roundtrip pair`` () =
    let first = MessageA { X = "foo"; Y = Some 32 }
    let first' = deserialize (serialize first)
    test <@ first' = first @>

    let second = MessageB { A = "bar"; B = "baz" }
    let second' = deserialize (serialize second)
    test <@ second' = second @>
/// Repeats the pair roundtrip 1000 times
[<Fact>]
let many () =
    for _iteration = 1 to 1000 do
        ``roundtrip pair`` ()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace FunUno | |
open ProtoBuf | |
/// Uno game event contracts annotated for protobuf-net
module UnoGame =

    /// Payload records and card types carried by the game events
    module Events =

        /// Payload of the GameStarted event
        [<ProtoContract; CLIMutable>]
        type GameStartedEvent =
            { [<ProtoMember(1)>]
              GameId : int
              [<ProtoMember(2)>]
              PlayerCount : int
              [<ProtoMember(3)>]
              TopCard : Card }

        /// Payload of the CardPlayed event
        and [<ProtoContract; CLIMutable>] CardPlayedEvent =
            { [<ProtoMember(1)>]
              GameId : int
              [<ProtoMember(2)>]
              Player : int
              [<ProtoMember(3)>]
              Card : Card }

        /// Card colours
        and [<ProtoContract>] Color =
            | Red
            | Green
            | Blue
            | Yellow

        /// The kinds of card.
        // NOTE(review): DigitCard and KickbackCard are not declared in the visible source - confirm where they are defined
        and [<ProtoContract>] Card =
            | Digit of DigitCard
            | Kickback of KickbackCard

    /// Events of the game, each wrapping its payload record from Events
    [<ProtoContract>]
    type Event =
        | GameStarted of Events.GameStartedEvent
        | CardPlayed of Events.CardPlayedEvent
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment