Last active
May 7, 2020 14:45
-
-
Save Horusiath/5972445107c14c9a9ce66cec42f06211 to your computer and use it in GitHub Desktop.
CASPaxos implementation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Toy implementation of CASPaxos (see: https://github.com/rystsov/caspaxos/blob/master/latex/caspaxos.pdf).
module DemoFs.CASPaxos | |
open Akka.Actor | |
open Akkling | |
open System | |
/// Identifier of a node taking part in the protocol.
type NodeId = string

/// Ballot number: a sequence number paired with the id of the node that issued it.
/// As an F# record it gets structural comparison (fields compared in declaration
/// order: `SeqNr` first, then `Id`), which the `>` checks on ballots rely on.
[<Struct>]
type Ballot =
    {
        SeqNr: int64
        Id: NodeId
    }
    /// Renders the ballot as `(seqNr:id)`.
    override ballot.ToString() = sprintf "(%i:%s)" ballot.SeqNr ballot.Id
/// Messages exchanged between clients, proposers and acceptors.
type Protocol<'state> =
    /// Ask for the current state. Handled by both proposers and acceptors, which
    /// reply to the sender with their local state.
    | Query
    /// Request for a change, sent to a proposer. The change is a lambda, so it requires
    /// a serializer which is able to serialize these. The proposer replies with a
    /// `Result<'state, string>`; the `Error` case carries the failure reason, e.g. when
    /// the deadline was reached.
    /// The deadline is a UTC time (similar to gRPC deadlines), so that a proposal can be
    /// retried automatically after a conflict was detected. Using a non-monotonic clock
    /// can lead to "funny" results (somewhat solvable with hybrid logical clocks), so
    /// keep that in mind.
    | Propose of deadline: DateTime * change: ('state -> 'state)
    /// 1st phase: the proposer sends `Prepare` to all acceptors. After a majority
    /// confirms with `PrepareReply`, the proposer sends `Accept`.
    | Prepare of proposal: Ballot * replyTo: Endpoint<'state>
    /// Acceptor's answer to `Prepare`. `None` means the acceptor confirmed the prepare
    /// and echoes the proposed ballot. `Some state` means it refused: the reply then
    /// carries the acceptor's own accepted ballot together with its state.
    | PrepareReply of Ballot * 'state option
    /// Timeout for the entire `Propose` (calculated from its deadline). If the prepare
    /// phase failed because an acceptor had a higher ballot, the proposal is retried
    /// while the deadline still allows it.
    | ProposeTimeout
    /// 2nd phase: the proposer sends `Accept` to all acceptors. After a majority
    /// responds with `AcceptReply`, the updated state is sent to the original
    /// `Propose` sender.
    | Accept of Ballot * 'state * replyTo: Endpoint<'state>
    /// Acceptor's confirmation of an `Accept` request.
    | AcceptReply

/// Typed actor reference that understands `Protocol` messages.
and Endpoint<'s> = IActorRef<Protocol<'s>>
/// State a proposer actor carries from one message to the next.
type ProposerState<'state> =
    {
        /// Ballot most recently issued (or adopted after a conflict) by this proposer.
        Ballot: Ballot
        /// The proposer's local copy of the replicated value.
        State: 'state
        /// Every acceptor this proposer sends `Prepare` / `Accept` requests to.
        Acceptors: Set<Endpoint<'state>>
    }
module ProposerState =
    /// Builds the initial proposer state: ballot `(0:nodeId)`, the given starting
    /// value and the set of acceptors to talk to.
    let create nodeId state acceptors =
        let initialBallot = { SeqNr = 0L; Id = nodeId }
        { Ballot = initialBallot
          State = state
          Acceptors = acceptors }
/// State an acceptor actor carries from one message to the next.
type AcceptorState<'state> =
    {
        /// Ballot the acceptor has promised to in a prepare phase, if any.
        /// Reset to `None` once a value is accepted.
        Promise: Ballot option
        /// Ballot under which the current `State` was accepted.
        Accepted: Ballot
        /// The value currently stored by this acceptor.
        State: 'state
    }
module AcceptorState =
    /// Builds the initial acceptor state: no outstanding promise, accepted ballot
    /// `(0:nodeId)` and the given starting value.
    let create nodeId state =
        let initialBallot = { SeqNr = 0L; Id = nodeId }
        { Promise = None
          Accepted = initialBallot
          State = state }
/// Book-keeping for a single in-flight `Propose` request on the proposer side.
type Proposal<'state> =
    {
        /// The change function requested by the client.
        Change: 'state -> 'state
        /// Initialised with the proposal's ballot when the proposal is created.
        /// NOTE(review): not read anywhere in the visible code.
        HighestBallot: Ballot
        /// Initialised with the proposer's state when the proposal is created.
        /// NOTE(review): not read anywhere in the visible code.
        HighestState: 'state
        /// Original sender of the `Propose`, which receives the final `Ok` / `Error`.
        Client: IActorRef<Result<'state, string>>
        /// Acceptors that have not replied yet in the current phase.
        Pending: Set<Endpoint<'state>>
        /// UTC deadline the client gave for the whole operation.
        Deadline: DateTime
        /// Handle of the scheduled `ProposeTimeout` message.
        Timeout: ICancelable
    }
/// Tells whether a strict majority of `acceptors` has already replied.
///
/// `acceptors` is the full acceptor set; `remaining` is the subset that has NOT
/// replied yet. An empty acceptor set is treated as trivially satisfied.
/// A strict majority means more than half: 3 of 5, 3 of 4, 2 of 2.
let inline isMajority acceptors remaining =
    if Set.isEmpty acceptors then true
    else
        let total = Set.count acceptors
        let replied = total - Set.count remaining
        // Integer division rounds down, so `replied > total / 2` is a strict
        // majority for both odd and even cluster sizes.
        replied > total / 2
/// Proposer in its idle state.
///
/// * `Query` - replies to the sender with the proposer's local state.
/// * `Propose` - if the deadline is still in the future, issues a new ballot
///   (sequence number + 1), sends `Prepare` to every acceptor, schedules a
///   `ProposeTimeout` for the deadline and switches to `proposing`. Otherwise
///   replies with an `Error` straight away.
/// * anything else is reported as unhandled.
let rec proposer (state: ProposerState<'s>) (ctx: Actor<Protocol<'s>>) = actor {
    match! ctx.Receive() with
    | Query ->
        ctx.Sender() <! state.State
        return! proposer state ctx
    | Propose(deadline, change) ->
        // Read the clock once: the same value decides whether the deadline has passed
        // and how long the timeout timer runs, so the delay is always positive.
        let timeLeft = deadline - DateTime.UtcNow
        if timeLeft > TimeSpan.Zero then
            let ballot = { state.Ballot with SeqNr = state.Ballot.SeqNr + 1L }
            let msg = Prepare(ballot, ctx.Self)
            for acceptor in state.Acceptors do acceptor <! msg
            let cancel = ctx.Schedule timeLeft ctx.Self ProposeTimeout
            let proposal =
                { Client = ctx.Sender()
                  HighestBallot = ballot
                  HighestState = state.State
                  Change = change
                  Pending = state.Acceptors
                  Deadline = deadline
                  Timeout = cancel }
            logDebugf ctx "created proposal %O" ballot
            return! proposing { state with Ballot = ballot } proposal ctx
        else
            // deadline reached
            ctx.Sender() <! Error "didn't finished operation before deadline"
            return! proposer state ctx
    | _ -> return Unhandled }

/// Proposer waiting for `PrepareReply` messages (1st phase) for `proposal`.
///
/// New `Propose` requests are stashed until the current one finishes.
/// A reply carrying a higher ballot aborts the round: the proposer adopts the
/// acceptor's state and ballot number and re-sends the `Propose` to itself on
/// behalf of the original client. Only `None` replies for the current ballot
/// count towards the majority; once it is reached, the change is applied and
/// `Accept` is sent to every acceptor.
and private proposing (state: ProposerState<'s>) (proposal: Proposal<'s>) (ctx: Actor<Protocol<'s>>) = actor {
    match! ctx.Receive() with
    | Query ->
        ctx.Sender() <! state.State
        return! proposing state proposal ctx
    | Propose _ ->
        ctx.Stash()
        return! proposing state proposal ctx
    | PrepareReply(ballot, Some data) when ballot > state.Ballot ->
        logDebugf ctx "prepare - received higher ballot number %O" ballot
        proposal.Timeout.Cancel()
        // Retry under the same deadline, keeping the original client as the sender.
        ctx.Self.Tell(Propose(proposal.Deadline, proposal.Change), untyped proposal.Client)
        ctx.UnstashAll()
        return! proposer { state with State = data; Ballot = { ballot with Id = state.Ballot.Id } } ctx
    | PrepareReply(ballot, None) when ballot = state.Ballot ->
        let remaining = Set.remove (ctx.Sender()) proposal.Pending
        if isMajority state.Acceptors remaining then
            logDebugf ctx "prepare - received confirm from majority of nodes"
            // NOTE(review): the change is applied to the proposer's local state; values
            // reported by acceptors are only adopted on the higher-ballot path above.
            let modified = proposal.Change state.State
            // Always send our own ballot, the one the acceptors just confirmed.
            let msg = Accept(state.Ballot, modified, ctx.Self)
            for acceptor in state.Acceptors do acceptor <! msg
            return! accepting { state with State = modified } { proposal with Pending = state.Acceptors } ctx
        else
            logDebugf ctx "prepare - received confirm"
            return! proposing state { proposal with Pending = remaining } ctx
    | PrepareReply(ballot, _) ->
        // Either a refusal (`Some _`) whose accepted ballot is not higher than ours,
        // which the acceptor sends when it holds a promise for a higher ballot, or a
        // confirmation for a different ballot. Neither may count towards the
        // majority; the deadline timer bounds how long we keep waiting.
        logDebugf ctx "prepare - ignored reply for ballot %O" ballot
        return! proposing state proposal ctx
    | ProposeTimeout ->
        proposal.Client <! Error "failed to gather prepare confirmations before the deadline"
        ctx.UnstashAll()
        return! proposer state ctx
    | _ -> return Unhandled }

/// Proposer waiting for `AcceptReply` messages (2nd phase) for `proposal`.
///
/// New `Propose` requests are stashed. Once a majority of acceptors has
/// confirmed, the deadline timer is cancelled, the client receives `Ok` with the
/// updated state and the proposer goes back to idle. `ProposeTimeout` fails the
/// proposal with an `Error`.
and private accepting (state: ProposerState<'s>) (proposal: Proposal<'s>) (ctx: Actor<Protocol<'s>>) = actor {
    match! ctx.Receive() with
    | Query ->
        ctx.Sender() <! state.State
        return! accepting state proposal ctx
    | Propose _ ->
        ctx.Stash()
        return! accepting state proposal ctx
    | AcceptReply ->
        let remaining = Set.remove (ctx.Sender()) proposal.Pending
        if isMajority state.Acceptors remaining then
            logDebugf ctx "accept - received confirm from majority of nodes"
            // The proposal is finished: cancel its timer so the ProposeTimeout
            // cannot fire later and abort an unrelated, subsequent proposal.
            proposal.Timeout.Cancel()
            proposal.Client <! Ok state.State
            ctx.UnstashAll()
            return! proposer state ctx
        else
            logDebugf ctx "accept - received confirm"
            return! accepting state { proposal with Pending = remaining } ctx
    | ProposeTimeout ->
        proposal.Client <! Error "prepare phase completed, but didn't received accept from majority of nodes on time"
        ctx.UnstashAll()
        return! proposer state ctx
    | _ -> return Unhandled }
/// Acceptor actor.
///
/// * `Query` - replies to the sender with the stored state.
/// * `Prepare` - refused (replying with the accepted ballot and `Some state`) when a
///   higher ballot was already promised or accepted; otherwise confirmed with
///   `PrepareReply(ballot, None)` and the ballot is remembered as the promise.
/// * `Accept` - applied only when its ballot is higher than the accepted one and no
///   higher ballot is promised; the acceptor then stores the new state, clears the
///   promise and answers `AcceptReply`. A rejected `Accept` gets no reply.
/// * anything else is reported as unhandled.
let rec acceptor (state: AcceptorState<'state>) (ctx: Actor<_>) = actor {
    // True when a promise for a ballot strictly higher than `ballot` is outstanding.
    let promisedAbove ballot =
        state.Promise |> Option.exists (fun promised -> promised > ballot)
    match! ctx.Receive() with
    | Query ->
        ctx.Sender() <! state.State
        return! acceptor state ctx
    | Prepare(ballot, replyTo) ->
        if promisedAbove ballot || state.Accepted > ballot then
            replyTo <! PrepareReply(state.Accepted, Some state.State)
            return! acceptor state ctx
        else
            logDebugf ctx "sending prepare confirmation"
            replyTo <! PrepareReply(ballot, None)
            return! acceptor { state with Promise = Some ballot } ctx
    | Accept(ballot, modified, sender) when ballot > state.Accepted ->
        if promisedAbove ballot then
            //NOTE: this is always the "funniest" part of every 2-phase commit protocol:
            // what happens when the 2nd phase fails?
            return Unhandled
        else
            logDebugf ctx "sending accept confirmation"
            sender <! AcceptReply
            return! acceptor { state with Accepted = ballot; State = modified; Promise = None } ctx
    | _ -> return Unhandled }
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Program =
    /// Demo: starts an actor system with five acceptors ("A".."E") and one proposer
    /// ("P"), all starting from an empty set, then proposes two changes. The reply to
    /// the second proposal is printed, after which the program waits for a line of
    /// console input. Returns 0.
    let main () =
        let config = Configuration.parse "akka.loglevel = DEBUG"
        let sys = System.create "sys" config
        let acceptors =
            set [ for letter in 'A' .. 'E' do
                    let nodeId = string letter
                    let initial = AcceptorState.create nodeId Set.empty
                    yield spawn sys nodeId (props (acceptor initial)) ]
        let proposerRef =
            let initial = ProposerState.create "P" Set.empty acceptors
            spawn sys "P" (props (proposer initial))
        async {
            // Fire-and-forget: the reply to this first proposal is not awaited.
            proposerRef <! Propose(DateTime.UtcNow.AddSeconds(10.), Set.add "hello")
            let! reply = proposerRef <? Propose(DateTime.UtcNow.AddSeconds(10.), Set.add "world")
            printfn "Reply: %A" reply
            Console.ReadLine() |> ignore
        } |> Async.RunSynchronously
        0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment