package com.zhch.v3;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import java.io.BufferedReader;import java.io.FileReader;import java.util.Map;import java.util.UUID;import java.util.concurrent.ConcurrentHashMap;public class SentenceSpout extends BaseRichSpout { private FileReader fileReader = null; private boolean completed = false; private ConcurrentHashMappending; private SpoutOutputCollector collector; @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("sentence")); } @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.collector = spoutOutputCollector; this.pending = new ConcurrentHashMap (); try { this.fileReader = new FileReader(map.get("wordsFile").toString()); } catch (Exception e) { throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]"); } } @Override public void nextTuple() { if (completed) { try { Thread.sleep(1000); } catch (InterruptedException e) { } } String line; BufferedReader reader = new BufferedReader(fileReader); try { while ((line = reader.readLine()) != null) { Values values = new Values(line); UUID msgId = UUID.randomUUID(); this.pending.put(msgId, values); this.collector.emit(values, msgId); } } catch (Exception e) { throw new RuntimeException("Error reading tuple", e); } finally { completed = true; } } @Override public void ack(Object msgId) { this.pending.remove(msgId); } @Override public void fail(Object msgId) { this.collector.emit(this.pending.get(msgId), msgId); }}
实现语句分割bolt
package com.zhch.v3;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.List;import java.util.Map;public class SplitSentenceBolt extends BaseRichBolt { private OutputCollector collector; private ListnumCounterTasks; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; //获取下游bolt的taskId列表 this.numCounterTasks = topologyContext.getComponentTasks(WordCountTopology.COUNT_BOLT_ID); } @Override public void execute(Tuple tuple) { String sentence = tuple.getStringByField("sentence"); String[] words = sentence.split(" "); for (String word : words) { Integer taskId = this.numCounterTasks.get(this.getWordCountIndex(word)); collector.emitDirect(taskId, tuple, new Values(word)); } this.collector.ack(tuple); } public Integer getWordCountIndex(String word) { word = word.trim().toUpperCase(); if (word.isEmpty()) return 0; else { //单词首字母对下游 bolt taskId 列表长度取余 return word.charAt(0) % numCounterTasks.size(); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); }}
实现单词计数bolt
package com.zhch.v3;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import java.io.BufferedWriter;import java.io.FileWriter;import java.util.HashMap;import java.util.Iterator;import java.util.Map;public class WordCountBolt extends BaseRichBolt { private OutputCollector collector; private HashMapcounts = null; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.collector = outputCollector; this.counts = new HashMap (); } @Override public void execute(Tuple tuple) { String word = tuple.getStringByField("word"); Long count = this.counts.get(word); if (count == null) { count = 0L; } count++; this.counts.put(word, count); BufferedWriter writer = null; try { writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt")); Iterator keys = this.counts.keySet().iterator(); while (keys.hasNext()) { String w = keys.next(); Long c = this.counts.get(w); writer.write(w + " : " + c); writer.newLine(); writer.flush(); } } catch (Exception e) { e.printStackTrace(); } finally { if (writer != null) { try { writer.close(); } catch (Exception e) { e.printStackTrace(); } writer = null; } } this.collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word", "count")); }}
实现单词计数topology
package com.zhch.v3;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.topology.TopologyBuilder;public class WordCountTopology { public static final String SENTENCE_SPOUT_ID = "sentence-spout"; public static final String SPLIT_BOLT_ID = "split-bolt"; public static final String COUNT_BOLT_ID = "count-bolt"; public static final String TOPOLOGY_NAME = "word-count-topology-v3"; public static void main(String[] args) throws Exception { SentenceSpout spout = new SentenceSpout(); SplitSentenceBolt spiltBolt = new SplitSentenceBolt(); WordCountBolt countBolt = new WordCountBolt(); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SENTENCE_SPOUT_ID, spout, 2); builder.setBolt(SPLIT_BOLT_ID, spiltBolt, 2).setNumTasks(4) .shuffleGrouping(SENTENCE_SPOUT_ID); builder.setBolt(COUNT_BOLT_ID, countBolt, 2) .directGrouping(SPLIT_BOLT_ID); //使用 Direct Grouping 分组策略 Config config = new Config(); config.put("wordsFile", args[0]); if (args != null && args.length > 1) { config.setNumWorkers(2); //集群模式启动 StormSubmitter.submitTopology(args[1], config, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); try { Thread.sleep(5 * 1000); } catch (InterruptedException e) { } cluster.killTopology(TOPOLOGY_NAME); cluster.shutdown(); } }}
提交到Storm集群
storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v3.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v3
运行结果:
[grid@hadoop5 stormData]$ cat result.txt second : 1can : 1set : 1simple : 1use : 2unbounded : 1used : 1It : 1Storm : 4online : 1cases: : 1open : 1Apache : 1of : 2over : 1more : 1clocked : 1easy : 2scalable : 1any : 1guarantees : 1ETL : 1million : 1continuous : 1is : 6with : 1it : 2makes : 1your : 1a : 4at : 1machine : 1analytics : 1up : 1and : 5many : 1system : 1source : 1what : 1operate : 1will : 1computation : 2streams : 1[grid@hadoop6 stormData]$ cat result.txt to : 3for : 2data : 2distributed : 2has : 1free : 1programming : 1reliably : 1fast: : 1processing : 2be : 2Hadoop : 1did : 1fun : 1learning : 1torm : 1process : 1RPC : 1node : 1processed : 2per : 2realtime : 3benchmark : 1batch : 1doing : 1lot : 1language : 1tuples : 1fault-tolerant : 1