Last active
September 1, 2016 13:13
-
-
Save mavnn/57e39ce97c7de1aa3f24da77478412fc to your computer and use it in GitHub Desktop.
Hopac supervisor using proc
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env fsharpi --exec | |
#I "packages/Hopac/lib/net45" | |
#r "Hopac.Core.dll" | |
#r "Hopac.dll" | |
#r "Hopac.Platform.dll" | |
open System | |
open Hopac | |
open Hopac.Infixes | |
type Policy = | |
| Restart | |
| Terminate | |
| Delayed of TimeSpan | |
type MinionInfo = | |
{ | |
name : string | |
policy : Policy | |
job : Ch<IVar<unit>> -> (obj -> Job<unit>) -> obj option -> Job<unit> | |
} | |
type MinionState = | |
{ | |
info : MinionInfo | |
state : obj option | |
shutdown : Ch<IVar<unit>> | |
} | |
type JobId = private JobId of int | |
type private SupervisorState = | |
{ | |
ident : int | |
minions : Map<JobId, MinionState> | |
processes : Map<JobId, Alt<JobId>> | |
delayed : Map<string, Alt<string * (SupervisorState -> Job<SupervisorState>)>> | |
} | |
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>] | |
module private SupervisorState = | |
let removeMinion jobId state = | |
{ state with | |
processes = Map.remove jobId state.processes | |
minions = Map.remove jobId state.minions } | |
let addMinion jobId minionState (p : Proc) will state = | |
{ state with | |
ident = state.ident + 1 | |
processes = | |
Map.add jobId (p ^-> (fun () -> jobId)) state.processes | |
minions = | |
Map.add jobId minionState state.minions } | |
let addDelayed name promise state = | |
{ state with | |
delayed = Map.add name promise state.delayed } | |
let removeDelayed name state = | |
{ state with | |
delayed = Map.remove name state.delayed } | |
let updateWill jobId will state = | |
{ state with | |
minions = Map.map (fun k v -> if k = jobId then { v with state = Some will } else v) state.minions } | |
let jobNames state = | |
let running = | |
state.minions | |
|> Map.toList | |
|> List.map (fun (_, { info = { name = n } }) -> n) | |
let delayed = | |
state.delayed | |
|> Map.toList | |
|> List.map fst | |
List.concat [running;delayed] | |
let jobState state name = | |
state.minions | |
|> Map.toSeq | |
|> Seq.tryFind (fun (jobId, minionState) -> minionState.info.name = name) | |
type Supervisor = | |
{ | |
shutdown : Ch<IVar<unit>> | |
register : Ch<MinionInfo> | |
unregister : Ch<string> | |
} | |
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>] | |
module Supervisor = | |
let create logger = | |
let shutdownCh = Ch() | |
let registerCh = Ch() | |
let unregisterCh = Ch() | |
let lastWillCh = Ch() | |
let startMinion minionInfo will state = | |
logger <| sprintf "Starting minion %A" minionInfo.name | |
if SupervisorState.jobNames state |> List.contains minionInfo.name then | |
logger <| sprintf "New minion not started; %A already supervised" minionInfo.name | |
Job.result state | |
else | |
let jobId = JobId state.ident | |
let minionShutdown = Ch() | |
let minionState = { info = minionInfo; state = will; shutdown = minionShutdown } | |
Proc.start (minionInfo.job minionShutdown (fun o -> Ch.send lastWillCh (jobId, o)) will) | |
>>- fun p -> | |
logger <| sprintf "Minion %A (%A) started" minionInfo.name jobId | |
SupervisorState.addMinion jobId minionState p will state | |
let unregisterMinion name state = | |
logger <| sprintf "Unregistering %A started" name | |
match SupervisorState.jobState state name with | |
| Some (jobId, minionState) -> | |
logger <| sprintf "Shutting down %A (%A)" name jobId | |
minionState.shutdown *<-=>- id | |
>>-. SupervisorState.removeMinion jobId state | |
| None -> | |
logger <| sprintf "Received request to unregister unknown job %A" name | |
Job.result state | |
let handleTermination state jobId = | |
let minionState = Map.find jobId state.minions | |
match minionState.info.policy with | |
| Terminate -> | |
logger <| sprintf "%A (%A) terminated; removing from supervision" minionState.info.name jobId | |
SupervisorState.removeMinion jobId state | |
|> Job.result | |
| Restart -> | |
logger <| sprintf "%A (%A) terminated; restarting" minionState.info.name jobId | |
SupervisorState.removeMinion jobId state | |
|> startMinion minionState.info minionState.state | |
| Delayed delay -> | |
logger <| sprintf "%A (%A) terminated; restarting in %A" minionState.info.name jobId delay | |
let promise = | |
timeOut delay | |
>>-. (minionState.info.name, startMinion minionState.info minionState.state) | |
|> memo | |
state | |
|> SupervisorState.removeMinion jobId | |
|> SupervisorState.addDelayed minionState.info.name (Promise.read promise) | |
|> Job.result | |
let replaceLastWill state (jobId, will) = | |
logger <| sprintf "(%A) sent new will" jobId | |
SupervisorState.updateWill jobId will state | |
|> Job.result | |
let rec loop state = | |
// Sanity check; should fail hard if these fail... | |
// These should be removed after testing. | |
let processIds = state.processes |> Map.toList |> List.map fst | |
if processIds <> (state.minions |> Map.toList |> List.map fst) then | |
failwithf "Unmatched process and minion maps\nprocesses: %A\n minions: %A" | |
processIds | |
(state.minions |> Map.toList |> List.map fst) | |
if processIds <> List.distinct processIds then | |
failwithf "duplicate process ids?" | |
let minionNames = state.minions |> Map.toList |> List.map snd |> List.map (fun ms -> ms.info.name) | |
if minionNames <> List.distinct minionNames then | |
failwithf "duplicate minion names detected\n%A" minionNames | |
Alt.choose [ | |
// shutdown | |
shutdownCh ^=> | |
fun ack -> | |
let shutdownMinion (minionState : MinionState) = | |
minionState.shutdown *<-=>- id | |
let shutdownAll = | |
Job.seqIgnore (state.minions |> Map.toSeq |> Seq.map (snd >> shutdownMinion)) | |
|> Job.map (fun () -> logger "All minions shutdown") | |
|> memo | |
logger "Shutting down minions!" | |
Alt.choose [ | |
shutdownAll |> Promise.read | |
timeOutMillis 1000 |> Alt.afterFun (fun () -> logger "Minion shutdown timed out without all minions shutting down cleanly") | |
] >>=. ack *<= () | |
// anything else will create a new state and then recurse into the loop | |
Alt.choose [ | |
// process delayed restarts | |
state.delayed | |
|> Map.toSeq | |
|> Seq.map snd | |
|> Alt.choose | |
|> Alt.afterJob | |
(fun (delayName, restart) -> | |
state | |
|> SupervisorState.removeDelayed delayName | |
|> restart) | |
// register new minion | |
registerCh ^=> | |
fun minionInfo -> | |
startMinion minionInfo None state | |
// unregister minion | |
unregisterCh ^=> | |
fun name -> unregisterMinion name state | |
// new last will | |
lastWillCh ^=> replaceLastWill state | |
// handle termination | |
state.processes | |
|> Map.toSeq | |
|> Seq.map snd | |
|> Alt.choose | |
|> Alt.afterJob (fun jid -> handleTermination state jid) | |
] |> Alt.afterJob loop | |
] | |
loop { ident = 0; minions = Map.empty; processes = Map.empty; delayed = Map.empty } | |
|> start | |
{ | |
shutdown = shutdownCh | |
register = registerCh | |
unregister = unregisterCh | |
} | |
// interactive testing... | |
let testMinion name failIf shutdown (sendWill : obj -> Job<unit>) (lastWill : obj option) : Job<unit> = | |
let rec loop state = | |
Alt.choose [ | |
shutdown ^=> fun ack -> ack *<= () | |
timeOutMillis 500 | |
|> Alt.afterFun (fun () -> if failIf state then failwith "boom" else printfn "teeeeeeeest minion! [%s - %d]" name state) | |
|> Alt.afterJob (fun () -> sendWill <| state + 1) | |
|> Alt.afterJob (fun () -> loop <| state + 1) | |
] :> Job<unit> | |
match lastWill with | |
| None -> loop 0 | |
| Some will -> | |
match will with | |
| :? int -> loop (will :?> int) | |
| _ -> loop 0 | |
let sup = Supervisor.create (printfn "%s") | |
let rand = System.Random() | |
let test1 = { name = "test1"; policy = Restart; job = testMinion "test1" (fun _ -> false) } | |
let test2 = { name = "delayedTest"; policy = Delayed <| TimeSpan.FromSeconds 1.; job = testMinion "delayedTest" (fun _ -> rand.Next(0, 2) = 1) } | |
sup.register *<+ test1 |> start | |
sup.register *<+ test2 |> start | |
sup.unregister *<- "test1" |> run | |
sup.unregister *<- "delayedTest" |> run | |
sup.shutdown *<-=>- id |> run |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The
Reason
type would be nice - in particular whether the job just finished or whether it Raised an exception. I agree with you that it should not give access to the value returned though.