Created
August 10, 2013 10:30
Revisions
-
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,146 @@ package storm.starter; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import java.util.HashMap; import java.util.Map; import java.util.Random; public class DIYWordCountTopologyForLocalCluster { public static class CountWordBolt extends BaseRichBolt { OutputCollector _collector; private Map<String, Integer> mapWordCount = new HashMap<String, Integer>(); @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { _collector = collector; } @Override public void execute(Tuple input) { String word = input.getString(0); int wordCount = increaseWordCount(word); _collector.emit(input, new Values(word, wordCount)); _collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } private int increaseWordCount(String word) { int wordCount = 0; if (mapWordCount.containsKey(word)) wordCount = mapWordCount.get(word); wordCount++; mapWordCount.put(word, wordCount); return wordCount; } } public static class SplitBolt extends BaseRichBolt { private OutputCollector _collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { _collector = collector; } @Override public void execute(Tuple input) { String sentence = input.getString(0); for (String word : sentence.split(" ")) { word = word.replaceAll(",|\\.|:|\"", ""); _collector.emit(input, new Values(word)); } _collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } public static class RandomSentenceSpout extends BaseRichSpout { private SpoutOutputCollector _collector; private Random rand; private String[] sentences = new String[]{ "Once when I was six years old I saw a magnificent picture in a book, called True Stories from Nature, about the primeval forest.", "It was a picture of a boa constrictor in the act of swallowing an animal.", "Here is a copy of the drawing.", "In the book it said: \"Boa constrictors swallow their prey whole, without chewing it. After that they are not able to move, and they sleep through the six months that they need for digestion.\"", "I pondered deeply, then, over the adventures of the jungle.", "And after some work with a colored pencil I succeeded in making my first drawing.", "My Drawing Number One." }; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; rand = new Random(); } @Override public void nextTuple() { String selectedSentence = sentences[rand.nextInt(sentences.length)]; _collector.emit(new Values(selectedSentence)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } @Override public void ack(Object msgId) { } @Override public void fail(Object msgId) { } } public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentence", new RandomSentenceSpout(), 1); builder.setBolt("split", new SplitBolt(), 20) .shuffleGrouping("sentence"); builder.setBolt("count", new CountWordBolt(), 10) .fieldsGrouping("split", new Fields("word")); Config config = new Config(); config.setDebug(true); // Local Cluster test LocalCluster cluster = new LocalCluster(); cluster.submitTopology("diy", config, builder.createTopology()); Utils.sleep(10000); cluster.killTopology("diy"); cluster.shutdown(); } }