Last active
August 29, 2015 14:23
-
-
Save edwinb/45f1b2cda00f20a47775 to your computer and use it in GitHub Desktop.
More concurrency in Idris
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Main | |
import System.Concurrency.Raw | |
%access public | |
-- Process IDs are parameterised by the type of request they are willing to | |
-- take (req, which is implicit), and a function which calculates the type of | |
-- response they'll send | |
abstract | |
data ProcID : (resp : req -> Type) -> Type where | |
MkPID : Ptr -> ProcID {req} resp | |
-- Processes we've sent a request to, but not got a response yet. | |
data ReqPending : Type where | |
None : ReqPending | |
Pending : (ProcID {req} resp, Type) -> ReqPending | |
-- Processes are parameterised by their request and response types, like | |
-- ProcID, and also their "responsiveness" - i.e. whether they run forever | |
-- and contain a 'Respond' so we can tell from the type if it's going to | |
-- respond to no requests (awake = False), | |
-- one request (awake = True, running = False) | |
-- or infinite requests (awake = True, running = True) | |
-- We also keep track of requests we've made that we haven't got the response | |
-- to yet - this allows us to send a message and do other stuff while the | |
-- server is computing a response. | |
data Process : (resp : req -> Type) -> | |
(running : Bool) -> -- running forever | |
(awake : Bool) -> -- contains a Respond | |
ReqPending -> ReqPending -> | |
Type -> Type where | |
-- Some plumbing | |
Lift' : IO a -> Process resp False False p p a | |
Pure : a -> Process resp False False p p a | |
(>>=) : Process resp r awake p p' a -> | |
(a -> Process resp r' awake' p' p'' b) -> | |
Process resp r' (awake || awake') p p'' b | |
-- Calculate the response to a request from some client, and send it. | |
-- The function returns a pair, the second element of which can be used | |
-- to update internal state, for example. | |
-- Blocks if there are no requests waiting. | |
Respond : ((x : req) -> Process {req} resp False False p p (resp x, t)) -> | |
Process resp False True p p t | |
-- As Respond, but with a default if no message waiting | |
TryRespond : t -> | |
((x : req) -> Process {req} resp False False p p (resp x, t)) -> | |
Process {req} resp False True p p t | |
-- As Respond, but with a default if timeout passes | |
TimeoutRespond : Int -> t -> | |
((x : req) -> Process {req} resp False False p p (resp x, t)) -> | |
Process {req} resp False True p p t | |
-- Send a request to a process. Only allowed if there are no other | |
-- requests (since we can't guarantee order of response). | |
Request : (proc : ProcID {req = req'} resp') -> (x : req') -> | |
Process resp False False None (Pending (proc, resp' x)) () | |
-- Get a response from a process, as long as we're waiting for one. | |
GetReply : (proc : ProcID {req = req'} resp') -> | |
Process resp False False (Pending (proc, ty)) None ty | |
-- Continue processing requests recursively. This is the only way to | |
-- have 'running' be True, so if we want to start a process which | |
-- processes requests forever, we have to use Loop. | |
Loop : Inf (Process resp True True p p t) -> | |
Process resp True False p p t | |
-- Start a new server process and return its ID. This ID can be shared | |
-- between multiple programs and threads. | |
StartServer : Process {req = req'} resp' True True p p () -> | |
Process resp False False p p (ProcID {req = req'} resp') | |
-- Start a new process, with no restrictions on it. We can't communicate | |
-- with this process since there are no guarantees about how long | |
-- it will run. | |
Fork : Process resp' r a None None () -> | |
Process resp False False p p () | |
-- Fork a child process which can send us messages on our interface, | |
-- which we are guaranteed to respond to because we're running | |
-- forever. | |
StartClient : (ProcID {req} resp -> Process resp' r a None None ()) -> | |
Process {req} resp True True p p () -> | |
Process {req} resp True True p p () | |
Server : (resp : req -> Type) -> Type -> Type | |
Server resp = Process resp True True None None | |
Program : Type -> Type | |
Program t = Process {req = Void} (const Void) False False None None t | |
implicit | |
Lift : IO a -> Process resp False False p p a | |
Lift = Lift' | |
-- There's no need to be in 'Process' to send a message to a server, once | |
-- that server process is running, because we know it's continuing to be | |
-- ready to respond! | |
send : ProcID {req} resp -> (x : req) -> IO (resp x) | |
send (MkPID pid) x = do sendToThread pid x | |
return !(getMsgFrom pid) | |
Send : ProcID {req = req'} resp' -> (x : req') -> | |
Process resp False False None None (resp' x) | |
Send proc x = do Request proc x | |
GetReply proc | |
-- Run the process DSL | |
mutual | |
respond : (req : Type) -> | |
((x : req) -> Process {req} resp False False p p (resp x, t)) -> | |
IO t | |
respond req f = do (pid, msg) <- recv | |
(resp, val) <- eval (f msg) | |
sendToThread pid resp | |
return val | |
where recv : IO (Ptr, req) | |
recv = getMsgWithSender | |
eval : Process {req} resp s s' p p' t -> IO t | |
eval (Lift' x) = x | |
eval (Pure x) = pure x | |
eval (x >>= f) = do x' <- eval x | |
eval (f x') | |
eval {req} (Respond f) = respond req f | |
eval {req} (TryRespond t f) = if !checkMsgs | |
then (respond req f) | |
else return t | |
eval {req} (TimeoutRespond delay t f) = if !(checkMsgsTimeout delay) | |
then (respond req f) | |
else return t | |
eval (Request (MkPID pid) x) = sendToThread pid x | |
eval (GetReply {ty} (MkPID pid)) = getMsgFrom pid | |
eval (Loop x) = eval x | |
eval (StartServer proc) = do ptr <- fork (eval proc) | |
return (MkPID ptr) | |
eval (Fork proc) = do ptr <- fork (eval proc) | |
return () | |
eval (StartClient f proc) = do ptr <- fork (eval (f (MkPID prim__vm))) | |
eval proc | |
run : Process resp running awake None None t -> IO t | |
run = eval | |
{------------ EXAMPLE ------------} | |
-- A simple server with two commands; add two numbers, or return the number | |
-- of operations already performed. | |
data Command = Add Int Int | |
| GetOpCount | |
-- Response type of each command | |
Resp : Command -> Type | |
Resp (Add x y) = Int | |
Resp GetOpCount = Int | |
-- The server itself keeps track of an internal state, which is the number | |
-- of commands performed, and responds to requests forever. | |
addServer : Int -> Server Resp () | |
addServer x = do putStrLn $ "Awaiting input" | |
x' <- TimeoutRespond 10 x | |
(\val => case val of | |
Add l r => Pure (l + r, x + 1) | |
GetOpCount => Pure (x, x)) | |
putStrLn $ "Response done, uptime " ++ show x' | |
Loop (addServer x') | |
-- FIXME: This ugly thing is to make the 'case' below know its type | |
simple : Program () -> Program () | |
simple x = x | |
-- A program which uses the server | |
countProg : ProcID Resp -> Program () | |
countProg proc = do putStr ": " | |
x <- getLine | |
simple $ case getNums (trim x) of | |
Nothing => do ops <- Send proc GetOpCount | |
putStrLn $ "Uptime: " ++ show ops | |
Just (x, y) => do sum <- Send proc (Add x y) | |
putStrLn $ "Sum is: " ++ show sum | |
countProg proc | |
where | |
getNums : String -> Maybe (Int, Int) | |
getNums xs with (words xs) | |
getNums xs | [l, r] = Just (cast l, cast r) | |
getNums xs | _ = Nothing | |
-- Finally, the main program starts the server | |
main : IO () | |
main = run (do server <- StartServer (addServer 0) | |
countProg server) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment