博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Storm实验 -- 单词计数3
阅读量:7005 次
发布时间:2019-06-27

本文共 8486 字,大约阅读时间需要 28 分钟。

hot3.png

在上一次单词计数的基础上做如下改动: 使用 Direct Grouping 分组策略,将首字母相同的单词发送给同一个task计数
数据源spout
package com.zhch.v3;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Spout that reads a text file (path taken from the "wordsFile" config entry)
 * and emits each line as a single-field tuple {@code ("sentence")}.
 *
 * <p>Reliability: every emitted tuple is anchored with a random UUID message id
 * and tracked in {@link #pending}; {@link #ack(Object)} drops it, while
 * {@link #fail(Object)} re-emits it with the same id for replay.
 */
public class SentenceSpout extends BaseRichSpout {

    private FileReader fileReader = null;
    // Set once the whole input file has been emitted; nextTuple() then idles.
    private boolean completed = false;
    // In-flight tuples keyed by message id, kept for replay on fail().
    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext,
                     SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
        try {
            // NOTE(review): FileReader uses the platform default charset —
            // confirm the input file matches it.
            this.fileReader = new FileReader(map.get("wordsFile").toString());
        } catch (Exception e) {
            // BUGFIX: keep the original exception as the cause instead of
            // discarding it.
            throw new RuntimeException(
                    "Error reading file [" + map.get("wordsFile") + "]", e);
        }
    }

    @Override
    public void nextTuple() {
        if (completed) {
            // BUGFIX: return after the idle sleep. The original fell through
            // and wrapped the already-exhausted FileReader in a fresh
            // BufferedReader on every nextTuple() call.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            return;
        }
        String line;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Emit the whole file in one pass; each line becomes one tuple.
            while ((line = reader.readLine()) != null) {
                Values values = new Values(line);
                UUID msgId = UUID.randomUUID();
                this.pending.put(msgId, values);
                this.collector.emit(values, msgId); // anchored for ack/fail
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        // Tuple fully processed downstream; stop tracking it.
        this.pending.remove(msgId);
    }

    @Override
    public void fail(Object msgId) {
        // Replay the failed tuple with its original message id.
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}

实现语句分割bolt

package com.zhch.v3;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
 * Bolt that splits each incoming sentence on single spaces and sends every
 * word directly ({@code emitDirect}) to a specific downstream count-bolt task,
 * chosen so that words starting with the same letter always land on the same
 * task — the Direct Grouping half of this example topology.
 */
public class SplitSentenceBolt extends BaseRichBolt {

    private OutputCollector collector;
    // Task ids of the downstream count bolt, resolved once at prepare() time.
    private List<Integer> numCounterTasks;

    @Override
    public void prepare(Map map, TopologyContext topologyContext,
                        OutputCollector outputCollector) {
        this.collector = outputCollector;
        // Look up the task id list of the downstream count bolt.
        this.numCounterTasks =
                topologyContext.getComponentTasks(WordCountTopology.COUNT_BOLT_ID);
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            Integer taskId = this.numCounterTasks.get(this.getWordCountIndex(word));
            // Direct grouping: this bolt — not Storm — picks the target task.
            // The emit is anchored on the input tuple for reliability.
            collector.emitDirect(taskId, tuple, new Values(word));
        }
        this.collector.ack(tuple);
    }

    /**
     * Maps a word to an index into the downstream task-id list. Words sharing
     * the same (upper-cased) first letter map to the same index.
     *
     * @param word raw token from the split sentence; may be empty
     * @return index in {@code [0, numCounterTasks.size())}
     */
    public Integer getWordCountIndex(String word) {
        // BUGFIX: Locale.ROOT keeps the routing stable regardless of the JVM's
        // default locale (e.g. Turkish dotless-i would otherwise upper-case
        // differently and change which task a word is sent to).
        word = word.trim().toUpperCase(Locale.ROOT);
        if (word.isEmpty()) {
            // Empty tokens (produced by consecutive spaces) all go to task 0.
            return 0;
        }
        // First letter modulo the number of downstream tasks.
        return word.charAt(0) % numCounterTasks.size();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

实现单词计数bolt 

package com.zhch.v3;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Bolt that keeps a per-task running count of the words it receives and, on
 * every tuple, rewrites the full tally to /home/grid/stormData/result.txt.
 *
 * <p>Each task writes to the same local path on its own worker host, which is
 * why the sample run shows one result.txt per machine. Rewriting the whole
 * file per tuple is O(n) per word — acceptable for a tutorial, not for volume.
 */
public class WordCountBolt extends BaseRichBolt {

    private OutputCollector collector;
    // Running word counts local to this task instance.
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext,
                       OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        // IDIOM: try-with-resources replaces the manual close() in finally and
        // cannot leak the writer; entrySet iteration avoids the second map
        // lookup per key; one flush after the loop replaces per-line flushes
        // (identical file contents).
        try (BufferedWriter writer = new BufferedWriter(
                new FileWriter("/home/grid/stormData/result.txt"))) {
            for (Map.Entry<String, Long> entry : this.counts.entrySet()) {
                writer.write(entry.getKey() + " : " + entry.getValue());
                writer.newLine();
            }
            writer.flush();
        } catch (Exception e) {
            // Best-effort persistence: log and keep processing tuples.
            e.printStackTrace();
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Declared for downstream compatibility; this bolt never emits.
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}

实现单词计数topology 

package com.zhch.v3;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.topology.TopologyBuilder;public class WordCountTopology {    public static final String SENTENCE_SPOUT_ID = "sentence-spout";    public static final String SPLIT_BOLT_ID = "split-bolt";    public static final String COUNT_BOLT_ID = "count-bolt";    public static final String TOPOLOGY_NAME = "word-count-topology-v3";    public static void main(String[] args) throws Exception {        SentenceSpout spout = new SentenceSpout();        SplitSentenceBolt spiltBolt = new SplitSentenceBolt();        WordCountBolt countBolt = new WordCountBolt();        TopologyBuilder builder = new TopologyBuilder();        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);        builder.setBolt(SPLIT_BOLT_ID, spiltBolt, 2).setNumTasks(4)                .shuffleGrouping(SENTENCE_SPOUT_ID);        builder.setBolt(COUNT_BOLT_ID, countBolt, 2)                .directGrouping(SPLIT_BOLT_ID); //使用 Direct Grouping 分组策略        Config config = new Config();        config.put("wordsFile", args[0]);        if (args != null && args.length > 1) {            config.setNumWorkers(2);            //集群模式启动            StormSubmitter.submitTopology(args[1], config, builder.createTopology());        } else {            LocalCluster cluster = new LocalCluster();            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());            try {                Thread.sleep(5 * 1000);            } catch (InterruptedException e) {            }            cluster.killTopology(TOPOLOGY_NAME);            cluster.shutdown();        }    }}

提交到Storm集群 

storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v3.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v3

运行结果:

[grid@hadoop5 stormData]$ cat result.txt second : 1can : 1set : 1simple : 1use : 2unbounded : 1used : 1It : 1Storm : 4online : 1cases: : 1open : 1Apache : 1of : 2over : 1more : 1clocked : 1easy : 2scalable : 1any : 1guarantees : 1ETL : 1million : 1continuous : 1is : 6with : 1it : 2makes : 1your : 1a : 4at : 1machine : 1analytics : 1up : 1and : 5many : 1system : 1source : 1what : 1operate : 1will : 1computation : 2streams : 1[grid@hadoop6 stormData]$ cat result.txt to : 3for : 2data : 2distributed : 2has : 1free : 1programming : 1reliably : 1fast: : 1processing : 2be : 2Hadoop : 1did : 1fun : 1learning : 1torm : 1process : 1RPC : 1node : 1processed : 2per : 2realtime : 3benchmark : 1batch : 1doing : 1lot : 1language : 1tuples : 1fault-tolerant : 1

转载于:https://my.oschina.net/zc741520/blog/410168

你可能感兴趣的文章
PhoneGap API帮助文档翻译Notification提醒
查看>>
LESS介绍及其与Sass的差异
查看>>
PForDelta索引压缩算法的实现
查看>>
在.NET开发中的单元测试工具之(2)——xUnit.Net
查看>>
微软BI 之SSRS 系列 - 使用带参数的 MDX 查询实现一个分组聚合功能的报表
查看>>
Direct基础学习系列3 绘制+实例
查看>>
myeclipse报异常:java.lang.OutOfMemoryError: Java heap space
查看>>
NSCopying简析
查看>>
解决This system is not registered with RHN
查看>>
python抓取网络内容
查看>>
nginx(四)初识nginx日志文件
查看>>
poj1026(置换找循环节)
查看>>
[leetcode]Search for a Range
查看>>
leetcode - Merge Intervals
查看>>
Freertos之系统配置
查看>>
四年前从这里开始,四年后从这里继续-回归博客园
查看>>
Linux命令--删除软连接
查看>>
Nginx 错误汇总
查看>>
Maven将jar包放入本地库
查看>>
SnowNLP:一个处理中文文本的 Python 类库
查看>>