Created
July 12, 2015 14:39
-
-
Save edwinb/8374789251a45dc3d781 to your computer and use it in GitHub Desktop.
Mucking about with processes again
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Process | |
import System.Concurrency.Raw | |
import Data.List | |
import System | |
-- A process identifier, indexed by the interface the process answers:
-- a request of type 'iface t' is answered with a value of type 't'.
-- The single constructor wraps a 'Ptr'; the type is declared 'abstract'.
abstract
data ProcID : (iface : Type -> Type) -> Type where
     MkPID : Ptr -> ProcID iface
-- A process ID packaged without its interface index, so that IDs of
-- servers with different interfaces can share one list (see ProcState).
data ServerID : Type where
     MkServer : ProcID iface -> ServerID
-- Whether a request is outstanding: either none, or exactly one, stored
-- as a process ID paired with a type (pendingReq fills these with the
-- server the request went to and the reply type of the request).
data ReqPending : Type where
     None : ReqPending
     One  : (ProcID req, Type) -> ReqPending
-- The state of a process that is tracked in its type:
--   * the request, if any, whose response is still pending
--   * the servers it currently has an open connection to
--   * the number of clients currently connected to it
-- Process types can therefore make clear that a process cannot quit
-- while it is talking to a server, or while it still has clients
-- expecting to communicate with it.
data ProcState : Type where
     MkProcState : ReqPending -> (servers : List ServerID) ->
                   (clients : Nat) -> ProcState
{-- Some useful predicates on process state --} | |
-- Evidence that a state has a request pending on process 'p' whose
-- recorded reply type is 't'.
data Pending : ProcState -> ProcID req -> Type -> Type where
     IsPending : Pending (MkProcState (One (p, t)) s c) p t
-- Evidence that a state has no request pending.
data NoPending : ProcState -> Type where
     IsNoPending : NoPending (MkProcState None s c)
-- Evidence that a server appears in a state's list of servers; it is
-- built from an 'Elem' proof locating the server in that list.
data Connected : ServerID -> ProcState -> Type where
     IsConnected : Elem p servers -> Connected p (MkProcState r servers c)
-- Evidence that a state's client count is zero.
data NoClient : ProcState -> Type where
     IsNoClient : NoClient (MkProcState r servers 0)
-- Evidence that a state's client count is a successor ('S k'), i.e. that
-- at least one client is connected (not necessarily exactly one).
data OneClient : ProcState -> Type where
     IsOneClient : OneClient (MkProcState r servers (S k))
{-- Some useful operations on process state --} | |
-- Add one to the client count; the pending request and the server list
-- are carried over unchanged.
newClient : ProcState -> ProcState
newClient (MkProcState pending srvs n) = MkProcState pending srvs (S n)
-- Overwrite the client count with 'k'; the previous count is discarded
-- and everything else is carried over unchanged.
setClients : ProcState -> Nat -> ProcState
setClients (MkProcState pending srvs _) k = MkProcState pending srvs k
-- Record a connection to a server by putting it (wrapped as a ServerID)
-- at the front of the state's server list.
newServer : ProcID iface -> ProcState -> ProcState
newServer pid (MkProcState pending srvs n)
    = MkProcState pending (MkServer pid :: srvs) n
-- Remove a server from the state's server list. The 'Connected' proof
-- says where in the list the server sits, and 'dropElem' removes the
-- entry at exactly that position.
dropServer : (r : ProcID iface) -> (p : ProcState) ->
             Connected (MkServer r) p -> ProcState
dropServer pid (MkProcState pending srvs n) (IsConnected elemPrf)
    = MkProcState pending (dropElem srvs elemPrf) n
-- Mark a request as pending. Only possible from a state with no pending
-- request (witnessed by the 'NoPending' proof). The new state remembers
-- the target process and the reply type 't' of the request; the request
-- value itself is not inspected.
pendingReq : ProcID iface -> iface t -> (p : ProcState) -> NoPending p -> ProcState
pendingReq {t} pid _ (MkProcState None srvs n) IsNoPending
    = MkProcState (One (pid, t)) srvs n
-- Clear the pending request. The 'Pending' proof guarantees that the
-- state really does hold a request addressed to the given process.
doneReq : (r : ProcID iface) -> (p : ProcState) -> Pending p r ty -> ProcState
doneReq pid (MkProcState (One (pid, reply)) srvs n) IsPending
    = MkProcState None srvs n
-- State of a server that is up: no pending request, no server
-- connections, and 'S c' clients, so always at least one.
runningServer : Nat -> ProcState
runningServer others = MkProcState None [] (S others)
-- State of a server that may stop: no pending request, no server
-- connections and no clients.
doneServer : ProcState
doneServer = MkProcState None [] Z
-- State with no pending request and no clients, connected to exactly
-- the given servers.
init : List ServerID -> ProcState
init srvs = MkProcState None srvs Z
{-- Processes themselves.
    A process returns a value of some type 'a', responds to requests on
    the interface 'iface', and has an input state and an output state.
    The output state is a function of the value returned.
--}
data Process : (a : Type) -> (iface : Type -> Type) ->
               ProcState -> (a -> ProcState) ->
               Type where
     -- Some plumbing: embed an IO action, return a value, and sequence
     -- two processes (the output state of the first is the input state
     -- of the second). None of the first two change the state.
     Lift' : IO a -> Process a iface p (const p)
     Pure  : a -> Process a iface p (const p)
     bind  : Process a iface p p' ->
             ((x : a) -> Process b iface (p' x) p'') ->
             Process b iface p p''

     -- Start a server. The server must be typed as starting with
     -- 'runningServer 1' and finishing in 'doneServer'. The caller gets
     -- its ID back and ends up with that server in its server list.
     Fork : Process () serveri (runningServer 1) (const doneServer) ->
            Process (ProcID serveri) iface p (\res => newServer res p)

     -- Send request 'x' to server 'r'. Needs proofs that we are connected
     -- to 'r' and that nothing is pending; afterwards the request is
     -- recorded as pending.
     Request : (r : ProcID serveri) -> (x : serveri t) ->
               {auto connected : Connected (MkServer r) p} ->
               {auto nopend : NoPending p} ->
               Process () iface p (const (pendingReq r x p nopend))

     -- Collect the reply to the request pending on 'r'. The result type
     -- is the one recorded in the 'Pending' proof; the pending request
     -- is cleared.
     GetReply : (r : ProcID serveri) ->
                {auto pend : Pending p r ty} ->
                Process ty iface p (const (doneReq r p pend))

     -- Answer one request using the given handler, which produces the
     -- reply together with a result of type 'res'. 'def' is the result
     -- when the timeout expires without a request (see the evaluator).
     TimeoutRespond : (timeout : Int) ->
                      (def : res) ->
                      ({t : Type} -> (x : iface t) ->
                           Process (t, res) iface p (const p)) ->
                      Process res iface p (const p)

     -- As TimeoutRespond, but with no timeout and no default result.
     Respond : ({t : Type} -> (x : iface t) ->
                    Process (t, res) iface p (const p)) ->
               Process res iface p (const p)

     -- Try to connect to server 'r'. The server is added to the state
     -- only when the result is True.
     Connect : (r : ProcID serveri) ->
               Process Bool iface p (\ok => case ok of
                                                 True => newServer r p
                                                 False => p)

     -- Close the connection to 'r'; needs a proof that it is open.
     Disconnect : (r : ProcID serveri) ->
                  {auto connected : Connected (MkServer r) p} ->
                  Process () iface p (const (dropServer r p connected))

     -- Return the number of clients; the client count in the output
     -- state is set to the number returned.
     CountClients : Process Nat iface p (\n => setClients p n)

     -- Wrap an 'Inf' (delayed) process, as used for the recursive call
     -- in a server loop.
     Loop : Inf (Process () iface p p') -> Process () iface p p'
-- 'Running a iface' is the type of a process which is currently
-- responding to requests on 'iface': for any 'k' it starts in
-- 'runningServer k' (so it has at least one client connected) and it
-- can only finish in 'doneServer' (no clients connected).
Running : Type -> (iface : Type -> Type) -> Type
Running a iface
    = {k : Nat} -> Process a iface (runningServer k) (const doneServer)
-- 'Program a' is the type of a process which does not respond to any
-- requests (its interface is 'const Void'), and which begins and ends
-- in 'init s' for the same server list 's': whatever connections it
-- opens it must close again, with no request pending and no clients.
Program : Type -> Type
Program a
    = {s : List ServerID} -> Process a (const Void) (init s) (const (init s))
-- Embed an IO action in a process without changing the process state.
-- Declared 'implicit', so an IO action can be written directly where a
-- Process is expected (e.g. the 'putStrLn' calls in the test code).
implicit
Lift : IO a -> Process a iface p (const p)
Lift action = Lift' action
%no_implicit -- helps error messages, and speeds things up a bit
-- Sequencing for processes, so that 'do' notation can be used to build
-- them. This is just the 'bind' constructor under the operator name.
(>>=) : Process a iface p p' ->
        ((x : a) -> Process b iface (p' x) p'') ->
        Process b iface p p''
(>>=) first rest = bind first rest
{--- evaluator --} | |
-- The evaluator keeps track of the number of client connections open,
-- and deals with Connect/Disconnect requests whenever a 'Respond' or
-- 'TimeoutRespond' is encountered.
-- What is actually sent to a server with interface 'iface': a request
-- to open a connection, a request to close one, or a request proper
-- taken from the interface.
data Message : (Type -> Type) -> Type where
     ConnectMsg : Message iface
     CloseMsg   : Message iface
     RequestMsg : iface t -> Message iface
-- Fetch the next message, with its sender, if 'checkMsgs' reports that
-- one is available; otherwise return Nothing.
-- The message is read at type 'Message iface'; nothing here checks that
-- the sender really sent a value of that type.
readMsg : IO (Maybe (Ptr, Message iface))
readMsg {iface} = do
    available <- checkMsgs
    if available
       then do (sender, msg) <- getMsgWithSender {a = Message iface}
               return (Just (sender, msg))
       else return Nothing
-- As readMsg, but availability is tested with 'checkMsgsTimeout', which
-- is handed the given timeout (units are whatever 'checkMsgsTimeout'
-- expects -- presumably seconds; TODO confirm against the library).
readMsgTimeout : Int -> IO (Maybe (Ptr, Message iface))
readMsgTimeout {iface} timeout = do
    available <- checkMsgsTimeout timeout
    if available
       then do (sender, msg) <- getMsgWithSender {a = Message iface}
               return (Just (sender, msg))
       else return Nothing
-- Run a process in IO.
--
-- 'clients' is the evaluator's running count of connected clients. The
-- result pairs the value produced by the process with the updated
-- count. ConnectMsg and CloseMsg are consumed here, while the process
-- is in 'Respond' or 'TimeoutRespond'; only RequestMsg payloads are
-- passed on to the handler supplied by the process.
eval : (clients : Nat) -> Process t iface p p' -> IO (t, Nat)

-- Plumbing: the client count is threaded through unchanged.
eval clients (Lift' action) = do
    result <- action
    return (result, clients)
eval clients (Pure val) = return (val, clients)
eval clients (bind first rest) = do
    (val, clients') <- eval clients first
    eval clients' (rest val)

-- The forked process is evaluated with its own client count, starting
-- at 1; its final result is discarded.
eval clients (Fork proc) = do
    child <- fork (do _ <- eval 1 proc
                      return ())
    return (MkPID child, clients)

eval clients (Request (MkPID pid) req) = do
    sendToThread pid (RequestMsg req)
    return ((), clients)
eval clients (GetReply (MkPID pid)) = do
    reply <- getMsgFrom pid
    return (reply, clients)

eval {iface} clients (Respond f) = do
    msg <- readMsg {iface}
    case msg of
        -- Nothing read: try again. (The original note here says
        -- "readMsg blocks..."; NOTE(review): readMsg is written in
        -- terms of checkMsgs, so confirm that this retry cannot spin.)
        Nothing => eval clients (Respond f)
        -- Connection traffic only adjusts the count; keep waiting.
        Just (_, ConnectMsg) => eval (clients + 1) (Respond f)
        Just (_, CloseMsg) => eval (clients - 1) (Respond f)
        -- A real request: run the handler, send the reply back to the
        -- sender, and return the handler's result.
        Just (sender, RequestMsg {t} req) => do
            ((reply, val), clients') <- eval clients (f req)
            sendToThread sender reply
            return (val, clients')

eval {iface} clients (TimeoutRespond i def f) = do
    msg <- readMsgTimeout {iface} i
    case msg of
        -- Nothing arrived: give back the default result.
        Nothing => return (def, clients)
        -- Connection traffic adjusts the count and then waits again,
        -- with the same timeout value.
        Just (_, ConnectMsg) => eval (clients + 1) (TimeoutRespond i def f)
        Just (_, CloseMsg) => eval (clients - 1) (TimeoutRespond i def f)
        Just (sender, RequestMsg {t} req) => do
            ((reply, val), clients') <- eval clients (f req)
            sendToThread sender reply
            return (val, clients')

-- Connecting succeeds when sendToThread reports 1.
eval clients (Connect {serveri} (MkPID pid)) = do
    sent <- sendToThread pid (ConnectMsg {iface = serveri})
    return (sent == 1, clients)
-- The result of sending the close message is ignored.
eval clients (Disconnect {serveri} (MkPID pid)) = do
    _ <- sendToThread pid (CloseMsg {iface = serveri})
    return ((), clients)

eval clients CountClients = return (clients, clients)
eval clients (Loop body) = eval clients body
-- Run a top-level program: one that answers no requests and starts and
-- ends with no server connections. Evaluation starts with a client
-- count of 0; the final count is discarded.
run : Process () (const Void) (init []) (const (init [])) -> IO ()
run prog = do
    _ <- eval 0 prog
    return ()
{--- test ---} | |
-- The usual test, a simple program which responds to requests to add | |
-- numbers, and keeps track of the number of operations performed. | |
-- Interface of the test server. The index is the reply type: adding two
-- Doubles is answered with a Double, asking for the operation count is
-- answered with an Int.
data Cmd : Type -> Type where
     Add        : Double -> Double -> Cmd Double
     GetOpCount : Cmd Int
-- The test server. Its argument is a counter which goes up by one for
-- every 'Add' answered and which is what 'GetOpCount' reports. Each
-- round waits for one request (timeout argument 2); if none arrives
-- the counter is kept as it is.
addServer : Int -> Running () Cmd
addServer uptime = do
    uptime' <- TimeoutRespond 2 uptime
                   (\req => case req of
                                 Add x y => Pure (x + y, uptime + 1)
                                 GetOpCount => Pure (uptime, uptime))
    n <- CountClients
    putStrLn (show n ++ " clients connected")
    -- The type 'Running () Cmd' only lets us stop once the client count
    -- is known to be zero; with clients still connected we must loop.
    case n of
         Z => putStrLn "No more clients, quitting"
         S k => Loop (addServer uptime')
-- The test client. Each round connects to the server, reads a line,
-- sends one request, prints the reply and disconnects again. It stops
-- when connecting fails, or after an 'Add' whose arguments do not sum
-- to more than zero. A line which is not exactly two words asks for the
-- operation count instead, and always carries on.
-- The connection has to be opened and closed on every round because of
-- the type 'Program ()'.
addClient : ProcID Cmd -> Program ()
addClient pid = do
    True <- Connect pid
        | False => do putStrLn "Server died"
                      Pure ()
    putStr ": "
    line <- getLine
    again <- case getNums (trim line) of
                  Nothing => do
                      Request pid GetOpCount
                      count <- GetReply pid
                      putStrLn $ "Uptime: " ++ show count
                      Pure True
                  Just (x, y) => do
                      Request pid (Add x y)
                      total <- GetReply pid
                      putStrLn $ "Sum is: " ++ show total
                      Pure (x + y > 0)
    Disconnect pid
    if again then addClient pid
             else Pure ()
  where
    -- Two whitespace-separated words, each cast to Double; anything
    -- else is Nothing.
    getNums : String -> Maybe (Double, Double)
    getNums xs = case words xs of
                      [l, r] => Just (cast l, cast r)
                      _ => Nothing
-- Main program: fork the add server, run the client against it, then
-- drop the connection to the server that 'Fork' left in our state.
-- The final 'usleep' is an IO action lifted into the process
-- (NOTE(review): presumably there to give the server time to notice
-- that it has no clients left and shut down -- confirm).
sumProg : Program ()
sumProg = do
    server <- Fork (addServer 0)
    addClient server
    Disconnect server
    usleep 5000000
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment