Skip to content

Instantly share code, notes, and snippets.

@Quantisan
Forked from richhickey/flow-scratch.clj
Last active February 10, 2025 16:13
Show Gist options
  • Save Quantisan/d79cb62a708dc4fba214f817c70f7c69 to your computer and use it in GitHub Desktop.
Save Quantisan/d79cb62a708dc4fba214f817c70f7c69 to your computer and use it in GitHub Desktop.
(require '[clojure.core.async :as async]
'[clojure.core.async.flow :as flow]
'[clojure.pprint :as pp])
;; Monitoring helper: drains the flow's report and error channels on a
;; separate thread and prints whatever arrives.
;; Takes a map containing :report-chan and :error-chan, and returns nil
;; right after launching the thread. The loop ends (printing a shutdown
;; banner) the first time a take from either channel yields nil.
(defn monitoring [{:keys [report-chan error-chan]}]
  (prn "========= monitoring start")
  (async/thread
    (loop []
      (let [[msg ch] (async/alts!! [report-chan error-chan])]
        (if (some? msg)
          ;; Label the message with the channel it was taken from
          (let [origin (if (= ch error-chan) :error-chan :report-chan)]
            (prn (str "======== message from " origin))
            (pp/pprint msg)
            (recur))
          (prn "========= monitoring shutdown")))))
  nil)
;; Stateful step function that drops consecutive duplicate values.
;; One function, three arities, selected by argument count:
(defn ddupe
  ;; 0 args - describe: names the single input and the single output
  ([]
   {:ins  {:in "stuff"}
    :outs {:out "stuff w/o consecutive dupes"}})
  ;; 1 arg - init: the argument is ignored; state starts with no previous value
  ([_]
   {:last nil})
  ;; 3 args - transform: the second argument is ignored.
  ;; Returns [new-state outputs]; outputs is nil when v equals the
  ;; previously seen value, otherwise v is emitted on :out.
  ([state _ v]
   (let [prev    (:last state)
         emitted (when-not (= prev v) {:out [v]})]
     [{:last v} emitted])))
;; Flow graph definition
;; Process logic lives under :procs; the wiring between processes lives
;; separately under :conns.
(def gdef
  {:procs
   {;; Source process: after a 200 ms sleep, emits one roll of two dice
    ;; (a vector of two numbers in 1..6) on :out
    :dice-source
    {:proc (flow/process
            {:describe  (fn [] {:outs {:out "roll the dice!"}})
             :introduce (fn [_]
                          (Thread/sleep 200)
                          (let [roll-die (fn [] (inc (rand-int 6)))]
                            [nil {:out [[(roll-die) (roll-die)]]}]))})}

    ;; Transformation process: a plain one-argument function turned into a
    ;; flow process. It returns the roll when its sum is 2, 3 or 12 ("craps")
    ;; and nil otherwise.
    :craps-finder
    {:proc (flow/step-process
            (flow/lift1->step
             (fn [roll]
               (when (#{2 3 12} (apply + roll))
                 roll))))}

    ;; Stateful process: removes consecutive duplicates using the
    ;; multi-arity ddupe function (passed as a var)
    :dedupe
    {:proc (flow/step-process #'ddupe)}

    ;; Sink process: prints each value it receives
    ;; Shows how to create an output process
    :prn-sink
    {:proc (flow/process
            {:describe  (fn [] {:ins {:in "gimme stuff to print!"}})
             :transform (fn [_state _in v] (prn v))})}}

   ;; Connections between processes, each as [[from-proc out] [to-proc in]]
   ;; Topology: dice-source -> dedupe -> craps-finder -> prn-sink
   :conns
   [[[:dice-source :out] [:dedupe :in]]
    [[:dedupe :out] [:craps-finder :in]]
    [[:craps-finder :out] [:prn-sink :in]]]})
;; Create the flow
;; Builds the flow object `g` from the graph definition `gdef`
(def g (flow/create-flow gdef))
;; Start the flow and set up monitoring
;; The value returned by flow/start is handed to `monitoring`, which
;; destructures :report-chan and :error-chan from it
(monitoring (flow/start g))
;; NOTE(review): the forms below look intended to be evaluated one at a time
;; at a REPL; loaded as a script, pause would follow resume immediately and
;; there would be no time for output -- confirm intended usage
(flow/resume g) ;; wait a bit for craps to print
(flow/pause g)
;; Inject custom data into the flow for testing
;; Places seven hand-written rolls directly on :craps-finder's :in, so
;; presumably they bypass :dice-source and :dedupe -- verify against the
;; flow/inject documentation
(flow/inject g [:craps-finder :in] [[1 2] [2 1] [2 1] [6 6] [6 6] [4 3] [1 1]])
(flow/ping g)
(flow/stop g)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment