Skip to content

Instantly share code, notes, and snippets.

@henryw374
Last active April 11, 2023 10:34
Show Gist options
  • Save henryw374/cd37c7fd182fedf538340899c4e0913b to your computer and use it in GitHub Desktop.
Save henryw374/cd37c7fd182fedf538340899c4e0913b to your computer and use it in GitHub Desktop.
(ns async-play
(:require [manifold.stream :as ms]
[clojure.core.async :as a]))
(comment ; memory leak. puts are queued up indefinitely
(def s (ms/stream))
(dotimes [n 100]
(ms/put! s n))
(ms/consume println s)
(def s (ms/stream))
(dotimes [n 100]
(ms/try-put! s n 1.0))
(ms/consume println s) ;-> prints nothing, bc puts failed bc there was nothing there to take them
)
(comment ;multiple consumer cbs
(def up (ms/stream))
(ms/consume (fn [x] (println "d1 " x)) up)
(ms/consume (fn [x] (println "d2 " x)) up)
(ms/put! up "a")
)
(comment ; leak with multiple consumers
; put! not accepted until all consumers accept.
; more puts will queue up
(def up (ms/stream))
(def d1 (ms/stream))
(def d2 (ms/stream))
(ms/connect up d1)
(ms/connect up d2)
(ms/consume (fn [x] (println "d1 " x)) d1)
(ms/put! up "a")
; starts to get backed up after here
(ms/put! up "b")
(ms/put! up "c")
(ms/consume (fn [x] (println "d2 " x)) d2)
)
(comment
(def c (a/chan))
(def cc (ms/->sink c))
(-> cc
(ms/on-closed (fn [] (println "closa"))))
(a/close! c) ; on-closed not called
(ms/close! cc) ; on-closed is called
@(ms/put! cc 1) ; false. c is closed
@(ms/take! cc) ; err. cc is a sink
)
(comment ;downstream closing
(def s (ms/stream))
(def a (ms/map inc s))
(def b (ms/map inc s))
(println "s closed at start:" (ms/closed? s))
(ms/close! a)
(ms/close! b)
(println "s closed immediately after closing downstreams:" (ms/closed? s))
(let [put-attempt @(ms/put! s 42)]
(println "s closed after a put attempt:" (ms/closed? s)))
(def s (ms/stream))
(def s (ms/stream* {:permanent? true}))
(def c (a/chan))
;(def cc (ms/->sink c))
(ms/connect s c)
(println "s closed at start:" (ms/closed? s))
(a/close! c)
(println "s closed immediately after closing downstreams:" (ms/closed? s))
(let [put-attempt @(ms/put! s 42)]
(println "s closed after a put attempt:" (ms/closed? s)))
)
(comment ;batching
;
(do
(def keep-going? (atom true))
(def s (ms/stream))
(->> s
; deliver batch with at most 1000 items and at least every 2 seconds
(ms/batch 1000 2000)
(ms/consume (fn [batch]
(println "got " (count batch)))))
(def waits (cycle [0 0 2100]))
(first (rest (rest waits)))
(future
(loop [ws waits]
(when @keep-going?
(ms/put-all! s (mapv str (range 1500)))
(println "sleeping for " (first ws))
(Thread/sleep (first ws))
(println "looping")
(recur (rest ws)))))
)
(reset! keep-going? false)
(reset! keep-going? true)
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment