Forked from anonymous/DIYWordCountTopologyForLocalCluster.java
Created August 10, 2013 10:30
-
-
Save HeartSaVioR/6199943 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.regex.Pattern;
public class DIYWordCountTopologyForLocalCluster { | |
public static class CountWordBolt extends BaseRichBolt { | |
OutputCollector _collector; | |
private Map<String, Integer> mapWordCount = new HashMap<String, Integer>(); | |
@Override | |
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { | |
_collector = collector; | |
} | |
@Override | |
public void execute(Tuple input) { | |
String word = input.getString(0); | |
int wordCount = increaseWordCount(word); | |
_collector.emit(input, new Values(word, wordCount)); | |
_collector.ack(input); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("word", "count")); | |
} | |
private int increaseWordCount(String word) { | |
int wordCount = 0; | |
if (mapWordCount.containsKey(word)) | |
wordCount = mapWordCount.get(word); | |
wordCount++; | |
mapWordCount.put(word, wordCount); | |
return wordCount; | |
} | |
} | |
public static class SplitBolt extends BaseRichBolt { | |
private OutputCollector _collector; | |
@Override | |
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { | |
_collector = collector; | |
} | |
@Override | |
public void execute(Tuple input) { | |
String sentence = input.getString(0); | |
for (String word : sentence.split(" ")) { | |
word = word.replaceAll(",|\\.|:|\"", ""); | |
_collector.emit(input, new Values(word)); | |
} | |
_collector.ack(input); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("word")); | |
} | |
} | |
public static class RandomSentenceSpout extends BaseRichSpout { | |
private SpoutOutputCollector _collector; | |
private Random rand; | |
private String[] sentences = new String[]{ | |
"Once when I was six years old I saw a magnificent picture in a book, called True Stories from Nature, about the primeval forest.", | |
"It was a picture of a boa constrictor in the act of swallowing an animal.", | |
"Here is a copy of the drawing.", | |
"In the book it said: \"Boa constrictors swallow their prey whole, without chewing it. After that they are not able to move, and they sleep through the six months that they need for digestion.\"", | |
"I pondered deeply, then, over the adventures of the jungle.", | |
"And after some work with a colored pencil I succeeded in making my first drawing.", | |
"My Drawing Number One." | |
}; | |
@Override | |
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { | |
_collector = collector; | |
rand = new Random(); | |
} | |
@Override | |
public void nextTuple() { | |
String selectedSentence = sentences[rand.nextInt(sentences.length)]; | |
_collector.emit(new Values(selectedSentence)); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("sentence")); | |
} | |
@Override | |
public void ack(Object msgId) { | |
} | |
@Override | |
public void fail(Object msgId) { | |
} | |
} | |
public static void main(String[] args) { | |
TopologyBuilder builder = new TopologyBuilder(); | |
builder.setSpout("sentence", new RandomSentenceSpout(), 1); | |
builder.setBolt("split", new SplitBolt(), 20) | |
.shuffleGrouping("sentence"); | |
builder.setBolt("count", new CountWordBolt(), 10) | |
.fieldsGrouping("split", new Fields("word")); | |
Config config = new Config(); | |
config.setDebug(true); | |
// Local Cluster test | |
LocalCluster cluster = new LocalCluster(); | |
cluster.submitTopology("diy", config, builder.createTopology()); | |
Utils.sleep(10000); | |
cluster.killTopology("diy"); | |
cluster.shutdown(); | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.