Created
September 20, 2022 20:46
-
-
Save d4hines/471505f82cffabd9e9bc3f85c2cf4323 to your computer and use it in GitHub Desktop.
Domainslib and Lwt Gotchas
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open Domainslib | |
(* Shared sizing constant for the examples below: it is passed as
   [~num_additional_domains] when a pool is created, and [parallel_map]
   divides the input length by it to pick a chunk size. *)
let domains = 8
(* First, the map function we were using when we discovered the problem.

   [parallel_map pool f l] cuts [l] into chunks of
   [max (List.length l / domains) 1] elements, starts one [Task.async] task
   per chunk that maps [f] over it, then awaits the tasks in the order they
   were created and concatenates their results. *)
let parallel_map pool f l =
  let chunk_size = max (List.length l / domains) 1 in
  let spawn chunk = Task.async pool (fun () -> List.map f chunk) in
  let await promise = Task.await pool promise in
  l
  |> Base.List.chunks_of ~length:chunk_size
  |> List.map spawn
  |> List.concat_map await
(* This succeeds: [parallel_map] is called directly inside the [Task.run]
   callback, so its [Task.async] / [Task.await] calls execute within the
   dynamic scope of [Task.run].

   Creates a pool with [domains] additional domains, maps a one-second
   blocking sleep over [domains] elements, and discards the results. *)
let parallel_vanilla_case () =
  let pool = Task.setup_pool ~num_additional_domains:domains () in
  let l = List.init domains Fun.id in
  let work _ =
    print_endline "sleeping 1 sec";
    Unix.sleep 1
  in
  let run () =
    (* The ascription makes sure we only ever discard a fully applied
       result, never an accidental partial application. *)
    ignore (Task.run pool (fun () -> parallel_map pool work l) : unit list)
  in
  (* Release the pool even if the run raises; without the teardown the pool
     created above is never shut down. *)
  Fun.protect ~finally:(fun () -> Task.teardown_pool pool) run
(* This succeeds: eight Lwt threads are started with [Lwt.async], each one
   printing, sleeping for one second via [Lwt_unix.sleep], and printing
   again; [Lwt_main.run] is then given a two-second sleep to run. *)
let lwt_vanilla_case () =
  let sleeper () =
    print_endline "starting lwt sleep";
    Lwt.Infix.(
      Lwt_unix.sleep 1. >>= fun () ->
      print_endline "finished lwt sleep";
      Lwt.return_unit)
  in
  for _i = 1 to 8 do
    Lwt.async sleeper
  done;
  Lwt_main.run (Lwt_unix.sleep 2.)
(* This fails.

   Kept as a deliberate reproduction of the gotcha: the [Lwt.bind] is set up
   inside [Task.run], but the parallel map only happens in the bind's
   callback. *)
let lwt_and_parallel () =
  let pool = Task.setup_pool ~num_additional_domains:domains () in
  let work _ =
    print_endline "sleeping 1 sec";
    Unix.sleep 1
  in
  let after_sleep () =
    print_endline "finished lwt sleep";
    print_endline "starting parallel map";
    (* The problem is that Task.async (called by [parallel_map])
       must be in the same dynamic scope as Task.run.
       In this case, the [Lwt.bind] actually puts us in a different
       scope, because this callback is executed by Lwt.
       As soon as [parallel_map] hits [Task.await], it blows up. *)
    let _ : unit list = parallel_map pool work (List.init 8 Fun.id) in
    Lwt.return_unit
  in
  let promise =
    Task.run pool (fun () ->
        print_endline "starting lwt sleep";
        Lwt.bind (Lwt_unix.sleep 1.) after_sleep)
  in
  Lwt_main.run promise
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment