Skip to content

Instantly share code, notes, and snippets.

@nathanmarz
Created September 20, 2011 04:01

Revisions

  1. nathanmarz created this gist Sep 20, 2011.
    32 changes: 32 additions & 0 deletions gistfile1.clj
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,32 @@
    (use 'backtype.storm.clojure)
    (use 'backtype.storm.config)
    (require '[backtype.storm [thrift :as thrift]])
    (import 'storm.starter.spout.RandomSentenceSpout)
    (import 'backtype.storm.LocalCluster)

    (defboltfull suffix-bolt ["word"]
    :params [suffix]
    :let [conf-state (atom nil)]
    :prepare ([conf context collector]
    (reset! conf-state conf))
    :execute ([tuple collector]
    (.emit collector tuple [(str (.getValue tuple 0) suffix)])
    (.ack collector tuple)
    ))

    (defbolt exclamation-bolt ["word"] [tuple collector]
    (.emit collector tuple [(str (.getValue tuple 0) "!!!")])
    (.ack collector tuple))

    (defn mk-topology []
    (thrift/mk-topology
    {1 (thrift/mk-spout-spec (RandomSentenceSpout.) :parallelism-hint 4)}
    {2 (thrift/mk-bolt-spec {1 :shuffle} (suffix-bolt "!!!") :parallelism-hint 3)
    3 (thrift/mk-bolt-spec {2 :shuffle} exclamation-bolt :parallelism-hint 3)}
    ))


    (def cluster (LocalCluster.))
    (.submitTopology cluster "test" {TOPOLOGY-DEBUG true} (mk-topology))
    (Thread/sleep 10000)
    (.shutdown cluster)