Created
November 22, 2023 12:57
-
-
Save fcallejon/bf72b3e8599d82e14f090d01852b0c8c to your computer and use it in GitHub Desktop.
F# | A very basic and awful event sourcing (with snapshot) implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open System.Threading.Tasks | |
type TaskResult<'T, 'E> = Task<Result<'T, 'E>> | |
module Task = | |
let inline map ([<InlineIfLambda>] f) t = | |
task { | |
let! r = t | |
return f r | |
} | |
module TaskResult = | |
let inline map ([<InlineIfLambda>] f) t = (Task.map << Result.map) f t | |
let inline bind ([<InlineIfLambda>] f) t = | |
task { | |
match! t with | |
| Ok v -> return! f v | |
| Error e -> return Error e | |
} | |
type EventOffset = int64 | |
type GetState<'TId, 'TSstate> = 'TId -> TaskResult<'TSstate, exn> | |
type SaveState<'TState> = 'TState -> TaskResult<unit, exn> | |
type GetPendingEvents<'TId, 'TEvent> = 'TId -> EventOffset -> TaskResult<'TEvent list, exn> | |
type SaveEvents<'TEvent> = 'TEvent list -> TaskResult<unit, exn> | |
[<RequireQualifiedAccess>] | |
module SRTPAggregates = | |
let inline getState aggregate () = ( ^a: (member GetState: unit -> TaskResult<'TSstate, exn>) aggregate) | |
let inline saveState aggregate state = ( ^a: (member SaveState: 'TState -> TaskResult<unit, exn>) aggregate, state) | |
let inline getPendingEvents aggregate state = ( ^a: (member GetPendingEvents: 'TState -> TaskResult<'TEvent list, exn>) aggregate, state) | |
let inline saveEvents aggregate events = ( ^a: (member SaveEvents: 'TEvent list -> TaskResult<unit, exn>) aggregate, events) | |
let inline execute aggregate state cmd = ( ^a: (member Execute: 'TState -> 'TCommand -> 'TEvent list) aggregate, state, cmd) | |
let inline apply aggregate state event = ( ^a: (member Apply: 'TState -> 'TEvent -> 'TState) aggregate, state, event) | |
let inline processAggregate aggregate cmd = | |
let getPending state = | |
let joinEvents pending = | |
execute aggregate state cmd | |
|> List.append pending | |
getPendingEvents aggregate state | |
|> TaskResult.map joinEvents | |
|> TaskResult.map (fun eventsToApply -> state, eventsToApply) | |
let applyAndSaveEvents (state, events) = | |
let newState = List.fold (apply aggregate) state events | |
saveEvents aggregate events | |
|> TaskResult.map (fun _ -> newState) | |
getState aggregate () | |
|> TaskResult.bind getPending | |
|> TaskResult.bind applyAndSaveEvents | |
|> TaskResult.bind (fun e -> saveState aggregate e |> TaskResult.map (fun _ -> e)) | |
[<AbstractClass>] | |
type TypedAggregate<'TId, 'TState, 'TCommand, 'TEvent> (id: 'TId) = | |
abstract member GetState: unit -> TaskResult<'TState, exn> | |
abstract member SaveState: 'TState -> TaskResult<unit, exn> | |
abstract member GetPendingEvents: unit -> TaskResult<'TEvent list, exn> | |
abstract member SaveEvents: 'TEvent list -> TaskResult<unit, exn> | |
abstract member Execute: 'TState -> 'TCommand -> 'TEvent list | |
abstract member Apply: 'TState -> 'TEvent -> 'TState | |
member this.Process cmd = | |
let getPending state = | |
let joinEvents pending = | |
this.Execute state cmd | |
|> List.append pending | |
this.GetPendingEvents () | |
|> TaskResult.map joinEvents | |
|> TaskResult.map (fun eventsToApply -> state, eventsToApply) | |
let applyAndSaveEvents (state, events) = | |
let newState = List.fold this.Apply state events | |
this.SaveEvents events | |
|> TaskResult.map (fun _ -> newState) | |
this.GetState () | |
|> TaskResult.bind getPending | |
|> TaskResult.bind applyAndSaveEvents | |
|> TaskResult.bind (fun e -> this.SaveState e |> TaskResult.map (fun _ -> e)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment