Last active
January 26, 2025 14:18
-
-
Save Horusiath/0464167035b5a2d3b9c9d540fdac421b to your computer and use it in GitHub Desktop.
Toy implementation of the SWIM membership protocol in Akkling (the F# API for Akka.NET)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open System | |
open System.Threading | |
open Akkling | |
open DemoFs | |
/// Demo entry point: starts three SWIM members in one actor system (B and C join through A),
/// waits, then stops node B and blocks until a line is read from the console.
[<EntryPoint>]
let main argv =
    let config = """
akka.loglevel = DEBUG
"""
    use sys = Configuration.parse config |> System.create "swim-cluster"
    // every node runs the same membership behavior, seeded with its own contact list
    let startNode (name: string) seeds = spawn sys name (props (Swim.membership seeds))
    let nodeA = startNode "node-a" []
    Thread.Sleep(TimeSpan.FromSeconds 1.0)
    let nodeB = startNode "node-b" [ nodeA ]
    Thread.Sleep(TimeSpan.FromSeconds 1.0)
    let _nodeC = startNode "node-c" [ nodeA ]
    Thread.Sleep(TimeSpan.FromSeconds 30.0)
    // stop node B 30 seconds after the last node was spawned
    sys.Stop(untyped nodeB)
    ignore (Console.ReadLine())
    0 // return an integer exit code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module DemoFs.Swim | |
open Akka.Actor | |
open Akka.Util | |
open System | |
open Akkling | |
/// Typed reference to a cluster member's membership actor.
type Endpoint = IActorRef<SwimProtocol>
/// Messages exchanged between SWIM membership actors (some are also sent to self via timers).
and SwimProtocol =
    /// Triggers the next ping round on the current member.
    | NextRound
    /// Ping message sent from `replyTo` to another member with `status` piggybacked on.
    | Ping of status:GossipStatus * replyTo:Endpoint
    /// If `Ping` or `PingReq` were not replied to in time, `PingTimeout` will happen.
    | PingTimeout of suspect:Member
    /// Reply to a `Ping` request.
    | PingAck of who:Member * status:GossipStatus
    /// Indirect ping request made by a member who issued `Ping` but didn't receive `PingAck` before the timeout.
    | PingReq of suspect:Member * status:GossipStatus * replyTo:Endpoint
    /// Message broadcast among all nodes once a suspected member didn't respond to a `Ping` request in time.
    /// NOTE(review): nothing in this file currently sends `Suspected`; it is only handled on receipt.
    | Suspected of Member
    /// Message that overrides `Suspected` and marks the suspected node as alive.
    | Alive of Member
    /// Message that confirms that a node is dead. It overrides `Suspected` and can be issued manually
    /// for a graceful cluster leave.
    | Left of Member
    /// Request issued by a node trying to join the cluster; it should be answered with a `Joined`
    /// broadcast among all cluster members (including the new node).
    | Join of Member
    /// If a `Join` request was not answered in time, a timeout is issued and the next node from the
    /// contact list will receive the join request.
    | JoinTimeout of remaining:Endpoint list
    /// Positive response to a `Join` request, broadcast among all of the nodes.
    | Joined of GossipStatus
/// A cluster member, identified by the endpoint of its membership actor.
and Member = { Endpoint: Endpoint }
/// Piggybacked membership view: the set of members the sender currently considers active.
and GossipStatus = Set<Member>
/// An endpoint paired with a payload of type `'Payload`.
/// Not referenced elsewhere in this file.
type Message<'Payload> = Endpoint * 'Payload
/// What this node is waiting for about a member tracked in `MembershipState.Suspects`.
[<Struct>]
type SuspectType =
    /// On the actor which issued the original `Ping`. The set lists members that must not be asked
    /// to perform an indirect ping: this node, the suspect, and every peer already sent a `PingReq`.
    | WaitPingAck of Set<Member>
    /// On the actor which issues a `Ping` on behalf of another actor that sent it a `PingReq`.
    | WaitPingReqAck
    /// On actors who received a `Suspected` notice.
    | WaitConfirm
/// Pending scheduled message (cancelled once the wait is resolved) paired with what is being awaited.
type SuspectState = ICancelable * SuspectType
/// Local view of the cluster held by one membership actor.
type MembershipState =
    {
        /// This node's own member entry.
        Myself: Member
        /// Members this node currently considers part of the cluster.
        Active: Set<Member>
        /// Members with an outstanding wait (ping ack, indirect ping ack, or confirmation),
        /// together with the timer to cancel when that wait is resolved.
        Suspects: Map<Member, SuspectState>
    }

    /// Initial state for the actor at `endpoint`: it knows itself as `Myself`,
    /// and has no active members and no suspects yet.
    static member Create (endpoint: Endpoint) =
        { Myself = { Endpoint = endpoint }
          Active = Set.empty
          Suspects = Map.empty }
/// How long a node waits for `PingAck` after its own `Ping`, and after each escalating `PingReq`.
let pingTimeout = TimeSpan.FromSeconds(10.0)
/// How long a node pinging on behalf of another (after a `PingReq`) waits for the suspect's `PingAck`.
let indirectPingTimeout = TimeSpan.FromSeconds(5.0)
/// Delay between consecutive `NextRound` ping rounds.
let pingInterval = TimeSpan.FromSeconds(30.0)
/// How long a joining node waits for `Joined` before contacting the next seed.
let joinTimeout = TimeSpan.FromSeconds(30.0)
/// How long a `Suspected` notice is tracked before the scheduled `Alive` clears it.
let suspicionTimeout = TimeSpan.FromSeconds(30.0)
/// Behavior of a single SWIM cluster member.
///
/// `seeds` is the contact list used to join an existing cluster. The first seed is sent `Join`
/// immediately; whenever `joinTimeout` elapses without a `Joined` reply, the next seed is tried,
/// and the actor stops once every seed has failed. With an empty `seeds` list the actor starts a
/// new cluster containing only itself.
///
/// Once joined, a `NextRound` tick every `pingInterval` pings one random peer that is neither this
/// node nor currently tracked in `Suspects`. An unanswered ping escalates to indirect `PingReq`s via
/// other peers. A peer that nobody manages to reach is removed and announced to the rest with `Left`.
let membership (seeds: Endpoint list) (ctx: Actor<_>) =
    let state = MembershipState.Create ctx.Self

    /// Returns a uniformly random element of `peers`, or `None` when `peers` is empty.
    let pick peers =
        match Set.count peers with
        | 0 -> None
        | count ->
            let idx = ThreadLocalRandom.Current.Next count
            Some (Seq.item idx peers)

    /// Drops `peer` from the active set and the suspects map, and broadcasts `Left peer` to every
    /// remaining active member except this node. It does not cancel `peer`'s timer: it is only
    /// called from the `PingTimeout` handler, i.e. after that timer has already fired.
    let leave peer state =
        let active = Set.remove peer state.Active
        let left = Left peer
        active
        |> Set.remove state.Myself
        |> Set.iter (fun other -> other.Endpoint <! left)
        { state with Suspects = Map.remove peer state.Suspects; Active = active }

    /// Unions the piggybacked `gossip` into the active set. Members missing from `gossip` are only
    /// logged (as "removed"); they are never dropped here — removal happens through `Left`.
    let merge gossip state =
        let added = gossip - state.Active |> Set.map (fun peer -> peer.Endpoint.Path.Name)
        let removed = state.Active - gossip |> Set.map (fun peer -> peer.Endpoint.Path.Name)
        if not (Set.isEmpty added && Set.isEmpty removed) then
            logDebugf ctx "Received gossip - added: %O, removed: %O" added removed
        { state with Active = state.Active + gossip }

    /// Steady-state behavior of a joined cluster member.
    let rec ready state = actor {
        match! ctx.Receive() with
        | NextRound ->
            ctx.Schedule pingInterval ctx.Self NextRound |> ignore
            let suspects = state.Suspects |> Map.toSeq |> Seq.map fst |> Set.ofSeq
            let others = (Set.remove state.Myself state.Active) - suspects
            // pick one member at random, other than self and not marked as suspected
            match pick others with
            | None -> return! ready state
            | Some peer ->
                peer.Endpoint <! Ping(state.Active, ctx.Self)
                let cancel = ctx.Schedule pingTimeout ctx.Self (PingTimeout peer)
                // neither this node nor the suspect itself may be asked for an indirect ping
                let skipList = Set.ofArray [| state.Myself; peer |]
                return! ready { state with Suspects = Map.add peer (cancel, WaitPingAck skipList) state.Suspects }
        | Ping (gossip, sender) ->
            logDebugf ctx "Received PING from '%s'. Sending ACK." sender.Path.Name
            sender <! PingAck(state.Myself, state.Active)
            return! ready (merge gossip state)
        | PingAck (who, gossip) ->
            logDebugf ctx "Received ACK from '%s'." who.Endpoint.Path.Name
            // gossip is merged exactly once on every path
            let newState =
                match Map.tryFind who state.Suspects with
                | None -> merge gossip state
                | Some (cancel, status) ->
                    cancel.Cancel()
                    let acked = { merge gossip state with Suspects = Map.remove who state.Suspects }
                    match status with
                    | WaitPingReqAck ->
                        // we pinged on someone else's behalf: notify everyone the member is alive
                        let msg = Alive who
                        Set.remove acked.Myself acked.Active
                        |> Set.iter (fun peer -> peer.Endpoint <! msg)
                    | _ -> ()
                    acked
            return! ready newState
        | PingReq (suspect, gossip, sender) ->
            logDebugf ctx "Received PING REQ: '%s' -> '%s'." sender.Path.Name suspect.Endpoint.Path.Name
            // An entry may already exist (e.g. from this node's own ping round). Cancel its timer so
            // it cannot fire against the WaitPingReqAck entry that replaces it below and declare the
            // suspect dead before `indirectPingTimeout` has elapsed.
            state.Suspects |> Map.tryFind suspect |> Option.iter (fun (previous, _) -> previous.Cancel())
            let cancel = ctx.Schedule indirectPingTimeout ctx.Self (PingTimeout suspect)
            suspect.Endpoint <! Ping(state.Active, ctx.Self)
            return! ready { merge gossip state with Suspects = Map.add suspect (cancel, WaitPingReqAck) state.Suspects }
        | Suspected suspect when suspect = state.Myself ->
            // refute the suspicion about ourselves
            (Set.remove state.Myself state.Active)
            |> Set.iter (fun peer -> peer.Endpoint <! Alive state.Myself)
            return! ready state
        | Suspected suspect ->
            match Map.tryFind suspect state.Suspects with
            | None ->
                // Schedule only when an entry is created; a timer scheduled for an already-tracked
                // suspect would leak and later deliver a stray `Alive` that clears that entry.
                // NOTE(review): on timeout this clears the suspicion (`Alive`) instead of confirming
                // the member dead (`Left`) as the SWIM paper does — confirm this is intended.
                let cancel = ctx.Schedule suspicionTimeout ctx.Self (Alive suspect)
                return! ready { state with Suspects = Map.add suspect (cancel, WaitConfirm) state.Suspects }
            | Some _ -> return! ready state
        | Alive peer ->
            let newState =
                match Map.tryFind peer state.Suspects with
                | None -> state
                | Some (cancel, _) ->
                    cancel.Cancel()
                    { state with Suspects = Map.remove peer state.Suspects }
            return! ready newState
        | PingTimeout suspect ->
            match Map.tryFind suspect state.Suspects with
            | None -> return! ready state // duplicate or invalidated
            | Some (_, WaitPingReqAck) ->
                logDebugf ctx "REQ ACK timeout. '%s' confirmed dead." suspect.Endpoint.Path.Name
                return! ready (leave suspect state)
            | Some (_, WaitPingAck skipList) ->
                let peers = state.Active - skipList
                logDebugf ctx "ACK timeout from '%s'. Picking from: %O" suspect.Endpoint.Path.Name (peers |> Set.map (fun p -> p.Endpoint.Path.Name))
                match pick peers with
                | Some other -> // ask someone else to make a ping to confirm suspect is dead or alive
                    other.Endpoint <! PingReq(suspect, state.Active, ctx.Self)
                    let cancel = ctx.Schedule pingTimeout ctx.Self (PingTimeout suspect)
                    return! ready { state with Suspects = Map.add suspect (cancel, WaitPingAck(Set.add other skipList)) state.Suspects }
                | None -> return! ready (leave suspect state) // no one left to ask, mark suspect as dead
            | Some (_, WaitConfirm) -> return Unhandled
        | Join peer ->
            logDebugf ctx "Peer '%s' requests to join the cluster." peer.Endpoint.Path.Name
            let gossip = Set.add peer state.Active
            let msg = Joined gossip
            // broadcast to every member, including the newcomer, but not to ourselves
            gossip
            |> Set.remove state.Myself
            |> Set.iter (fun other -> other.Endpoint <! msg)
            return! ready { state with Active = gossip }
        | Joined gossip ->
            return! ready (merge gossip state)
        | Left peer ->
            logDebugf ctx "Peer '%s' left the cluster." peer.Endpoint.Path.Name
            match Map.tryFind peer state.Suspects with
            | None -> return! ready { state with Active = Set.remove peer state.Active }
            | Some (cancel, _) ->
                cancel.Cancel()
                return! ready { state with Active = Set.remove peer state.Active; Suspects = Map.remove peer state.Suspects }
        | _ -> return Unhandled
    }

    /// Logs the join, schedules the first ping round and switches to `ready` with `gossip` merged in.
    let becomeReady state gossip =
        logInfof ctx "%A successfully joined SWIM cluster: %A" state.Myself.Endpoint.Path.Name (gossip |> Set.map (fun p -> p.Endpoint.Path.Name))
        ctx.Schedule pingInterval ctx.Self NextRound |> ignore
        ready (merge gossip state)

    /// Waits for `Joined`; `cancel` is the pending `JoinTimeout` for the seed contacted last.
    let rec joining state cancel = actor {
        match! ctx.Receive() with
        | JoinTimeout [] ->
            logErrorf ctx "Failed to join any of the members. Shutting down"
            return Stop
        | JoinTimeout (next::remaining) ->
            logDebugf ctx "Failed to join to a member. Try to join %O" next.Path.Name
            next <! Join state.Myself
            let cancel = ctx.Schedule joinTimeout ctx.Self (JoinTimeout remaining)
            return! joining state cancel
        | Joined gossip ->
            cancel.Cancel() // cancel join timeout
            return! becomeReady state gossip
        | Join peer when peer = state.Myself -> // establish new cluster
            cancel.Cancel() // cancel join timeout
            return! becomeReady state (Set.singleton state.Myself)
        | _ -> return Unhandled
    }

    match seeds with
    | [] -> becomeReady state (Set.singleton state.Myself)
    | seed::remaining ->
        seed <! Join state.Myself
        let cancel = ctx.Schedule joinTimeout ctx.Self (JoinTimeout remaining)
        joining state cancel
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment