Created
August 22, 2013 16:24
-
-
Save halgari/6309500 to your computer and use it in GitHub Desktop.
Load balancer using core.async
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns async-examples.load-balancer | |
(:require [clojure.core.async :refer :all])) | |
;; Let's assume we have a DB, and it holds the following pairs of name/ages | |
(def db-store
  "Stand-in for a database table: maps a person's name (keyword) to an age."
  ;; array-map keeps the entries in the order written, like the small map
  ;; literal it replaces, so (keys db-store) lists people in this order.
  (array-map :john  42
             :amy   33
             :jill  3
             :james 4))
;; Now let's assume that we have two operations, one of which takes an
;; operand
(defn query
  "Runs a single operation against db-store.

  op is one of:
    :get-people - returns the keys of db-store
    :get-age    - returns the age stored for the first extra argument,
                  or nil when that person is not in db-store

  Any other op falls through the case with no default clause and throws."
  [op & args]
  (let [[person] args]
    (case op
      :get-age    (get db-store person)
      :get-people (keys db-store))))
;; Most DBs have a connection limit, so let's simulate that | |
(def MAX-CONNECTIONS
  "Maximum number of db-operation calls allowed to be in flight at once."
  4)

(def access-count
  "Number of db-operation calls currently in flight."
  (atom 0))

(defn db-operation
  "Runs (apply query args) while tracking how many operations are in flight.

  Asserts that the number of concurrent calls never exceeds MAX-CONNECTIONS,
  sleeps 100 ms to simulate a slow database, and returns the query result.

  The in-flight counter is always decremented on the way out, including when
  the assertion fails or query throws, so a single failure cannot
  permanently inflate access-count."
  [& args]
  ;; Use the value returned by swap! rather than dereferencing the atom
  ;; again: a second deref could observe another thread's update.
  (let [in-flight (swap! access-count inc)]
    (try
      (assert (<= in-flight MAX-CONNECTIONS))
      (let [result (apply query args)]
        (Thread/sleep 100) ;; simulates a slower db
        result)
      (finally
        (swap! access-count dec)))))
;; So what we want to do, is some connection pooling / load balancing. This is normally | |
;; pretty hard, but with core.async it's rather trivial: | |
(def db-chan
  "Request channel served by MAX-CONNECTIONS worker threads.

  Each message is a pair [command ret-chan]; a worker runs
  (apply db-operation command) and puts the result on ret-chan.

  When there is no value to deliver (the operation returned nil, which
  cannot be put on a channel, or it threw an exception) ret-chan is closed
  instead, so the requester's take returns nil rather than waiting forever.
  Workers exit once db-chan itself is closed and drained."
  (let [c (chan MAX-CONNECTIONS)]
    (dotimes [_ MAX-CONNECTIONS]
      (thread
        (loop []
          ;; <!! returns nil only when c is closed and empty; stop then.
          (when-let [[command ret-chan] (<!! c)]
            (let [result (try
                           (apply db-operation command)
                           (catch Exception e
                             ;; Report and keep the worker alive; a dead
                             ;; worker would silently shrink the pool.
                             (binding [*out* *err*]
                               (println "db-chan worker: command failed:"
                                        command (.getMessage e)))
                             nil))]
              (if (nil? result)
                (close! ret-chan)
                (>!! ret-chan result)))
            (recur)))))
    c))
;; Now let's write a helper function that will make querying the DB a
;; bit simpler from inside an async block
(defn async-query
  "Submits operations to db-chan and returns the channel on which the
  result will be delivered.

  The request is sent from a go block, so this function returns the reply
  channel without waiting for the request to be accepted."
  [& operations]
  (let [reply   (chan 1)
        request [operations reply]]
    (go (>! db-chan request))
    reply))
;; Now we have everything we need to create 100 gos and do some | |
;; queries. Notice how the 100 threads are balanced over the | |
;; MAX-CONNECTIONS db threads. In this way we don't overload the DB | |
;; server, while still taking advantage of the large number of go threads. | |
(def println-chan
  "Channel whose values are printed, one per line, by a single dedicated
  thread.

  The printer thread exits when the channel is closed and drained instead
  of looping forever on the nil that a closed channel returns."
  (let [c (chan MAX-CONNECTIONS)]
    (thread
      (loop []
        (let [v (<!! c)]
          ;; nil means closed-and-empty; anything else (including false)
          ;; is a real value to print.
          (when-not (nil? v)
            (println v)
            (recur)))))
    c))
(defn join!!
  "Takes a sequence of channels (such as those returned by go blocks) and
  blocks until a value has been taken from each one, in order.
  Returns nil."
  [args]
  ;; doall realizes the whole sequence up front, so a lazy sequence of go
  ;; blocks is fully started before the first blocking take.
  (loop [pending (seq (doall args))]
    (when pending
      (<!! (first pending))
      (recur (next pending)))))
(defn -main
  "Starts 100 go blocks and waits for all of them to finish. Each block
  fetches the list of people, sends it to println-chan, then looks up
  every person's age and sends a [person age] pair to println-chan."
  []
  (let [run-client (fn [_]
                     (go
                       (let [names (<! (async-query :get-people))]
                         (>! println-chan names)
                         (doseq [who names]
                           (let [age (<! (async-query :get-age who))]
                             (>! println-chan [who age]))))))]
    (join!! (map run-client (range 100)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for sharing. Here is an alternative implementation of the same behavior using core.async/pipeline-blocking.
Your approach is more straightforward when you need a reply for your asynchronous command. (my result chan proxies to the sender to achieve similar behavior)