Spout introduction
A spout is the source of streams in a Storm topology. It usually reads from an external data source such as Kafka, MySQL, or Redis and emits tuples into the topology. Spouts come in two flavors: reliable spouts, which replay a tuple when it fails to be processed, and unreliable spouts, which may lose messages. A spout can declare more than one output stream through the declareStream method of the OutputFieldsDeclarer class, provided the corresponding emit calls on the SpoutOutputCollector specify which stream to emit to.
The main method on a spout is nextTuple, which either emits a new tuple into the topology or simply returns when there is nothing to send. This method must not block, because Storm calls all of a spout's methods on the same thread. The other important methods are ack and fail: for a reliable spout, Storm invokes them to report whether a previously emitted tuple was fully processed or failed.
Related classes:
IRichSpout: the interface every spout must implement
BaseRichSpout: a convenience base class with empty default implementations of the rarely used IRichSpout methods; override ack and fail for reliable delivery
(Note: the auto-ack "basic" convenience variant exists only on the bolt side, as IBasicBolt/BaseBasicBolt; there is no BaseBasicSpout.)
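The reliable-spout contract described above can be sketched without any Storm dependency: the spout remembers each emitted message by id until it is acked, and re-queues it on fail. The class and method names here (PendingTracker, emitNext) are hypothetical illustrations, not Storm APIs; in real Storm this bookkeeping is triggered by emitting with a message id through SpoutOutputCollector.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Minimal sketch of the bookkeeping behind a reliable spout.
class PendingTracker {
    private final Map<Long, String> pending = new HashMap<>(); // msgId -> payload
    private final Queue<String> source = new ArrayDeque<>();
    private long nextId = 0;

    PendingTracker(String... messages) {
        for (String m : messages) source.add(m);
    }

    // analogous to nextTuple: emit the next message and remember it by id
    Long emitNext() {
        String msg = source.poll();
        if (msg == null) return null; // nothing to emit; just return
        long id = nextId++;
        pending.put(id, msg);
        return id;
    }

    // analogous to ack: the tuple was fully processed, so forget it
    void ack(long msgId) {
        pending.remove(msgId);
    }

    // analogous to fail: re-queue the payload so it is emitted again
    void fail(long msgId) {
        String msg = pending.remove(msgId);
        if (msg != null) source.add(msg);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

An unreliable spout is simply this sketch without the pending map: once emitted, a message is forgotten, so a failure downstream loses it.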
1. Spout component: create the Spout (WordCountSpout) that produces the data feeding the whole topology.
WordCountSpout:
package storm;

import lombok.SneakyThrows;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author: wtl
 * @License: (C) Copyright 2020, wtl Corporation Limited.
 * @Contact: 1050100468@qq.com
 * @Date: 2020/11/27 5:14
 * @Version: 1.0
 */
public class WordCountSpout implements IRichSpout {

    private SpoutOutputCollector spoutOutputCollector;
    // sample data; a real spout would read from an external source such as Kafka
    private String[] lines = {"I love you", "hello world", "hello kitty", "nihao beijing"};
    private Random random = new Random();

    @Override
    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void close() {
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @SneakyThrows
    @Override
    public void nextTuple() {
        // emit a randomly chosen line as a one-field tuple
        int nextInt = random.nextInt(lines.length);
        spoutOutputCollector.emit(new Values(lines[nextInt]));
        // throttle to one tuple per second; this blocks the spout thread,
        // which is acceptable only in a demo
        TimeUnit.SECONDS.sleep(1);
    }

    @Override
    public void ack(Object o) {
    }

    @Override
    public void fail(Object o) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
Bolt introduction
Bolts: the business-processing units
All of a topology's processing happens in bolts. A bolt can do any kind of ETL work: filtering, functions, aggregation, joins, writing to a database or cache, and so on. A single bolt can perform a simple stream transformation; complex transformations usually require chaining several bolts, each handling one piece of the business logic. That is what stream computation looks like. A bolt can also emit multiple streams downstream, declaring the schema of each with the declareStream method.
The main method on a bolt is execute, which processes one input tuple at a time. A bolt can emit new tuples through the OutputCollector class, and it must call ack for every tuple it processes so that Storm knows when that tuple is complete. The IBasicBolt interface calls ack automatically.
Related classes:
IRichBolt: the general interface for bolts
IBasicBolt: a convenience interface that handles acking automatically
OutputCollector: used by a bolt to emit tuples to downstream bolts
2. Bolt component 1: create the Bolt (SplitBolt) that splits each line into words.
SplitBolt:
package storm;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.stream.Stream;

public class SplitBolt implements IRichBolt {

    private TopologyContext topologyContext;
    private OutputCollector outputCollector;

    @Override
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.topologyContext = topologyContext;
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        // split the incoming line on spaces and emit a (word, 1) pair per word
        String line = tuple.getStringByField("line");
        String[] splits = line.split(" ");
        Stream.of(splits).forEach(word -> outputCollector.emit(new Values(word, 1)));
        // ack the input so Storm knows this tuple has been processed
        outputCollector.ack(tuple);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
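The core of SplitBolt, independent of any Storm class, is just the split-and-emit-one step. It can be exercised in isolation (a plain-Java sketch; SplitLogic and splitLine are hypothetical names for illustration, not part of the bolt):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class SplitLogic {
    // mirrors SplitBolt.execute: one (word, 1) pair per space-separated word
    static List<Map.Entry<String, Integer>> splitLine(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.split(" ")) {
            out.add(new SimpleEntry<>(word, 1));
        }
        return out;
    }
}
```

Each pair corresponds to one emit of `new Values(word, 1)` on the collector.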
3. Bolt component 2: create the Bolt (TotalBolt) that accumulates the word counts.
TotalBolt:
package storm;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class TotalBolt implements IRichBolt {

    private TopologyContext topologyContext;
    private OutputCollector outputCollector;
    // running totals, keyed by word (held in memory, so lost on restart)
    private Map<String, Integer> map;

    @Override
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.topologyContext = topologyContext;
        this.outputCollector = outputCollector;
        this.map = new HashMap<>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Integer count = tuple.getIntegerByField("count");
        // add this count to the word's running total
        map.merge(word, count, Integer::sum);
        // print the running totals (demo only; a real bolt would emit downstream)
        map.forEach((key, value) -> System.out.println(key + ": " + value));
        outputCollector.ack(tuple);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // declared for potential downstream bolts; this demo only prints
        outputFieldsDeclarer.declare(new Fields("word", "total"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
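TotalBolt's aggregation step can likewise be checked on its own: the running-total update is the standard Map.merge idiom. This is a plain-Java sketch with no Storm dependency; WordTotals is a hypothetical name, not a class from the example.

```java
import java.util.HashMap;
import java.util.Map;

class WordTotals {
    private final Map<String, Integer> totals = new HashMap<>();

    // mirrors TotalBolt.execute: add count to the running total for word
    void add(String word, int count) {
        totals.merge(word, count, Integer::sum);
    }

    int totalFor(String word) {
        return totals.getOrDefault(word, 0);
    }
}
```

merge inserts the count for a new word and sums with the existing total otherwise, which is exactly the containsKey/put branch written out by hand.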
4. Topology main program:
WordCountTopology:
package storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {

    public static void main(String[] args) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("wcSpout", new WordCountSpout(), 1);
        topologyBuilder.setBolt("wcSplitBolt", new SplitBolt(), 1).shuffleGrouping("wcSpout");
        topologyBuilder.setBolt("wcTotalBolt", new TotalBolt(), 1).fieldsGrouping("wcSplitBolt", new Fields("word"));
        // build the topology
        StormTopology job = topologyBuilder.createTopology();
        Config config = new Config();
        // a topology can run in one of two modes: local mode or cluster mode
        // 1. local mode
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("MyWordCount", config, job);
        // keep the local cluster alive long enough to see some output, then shut down
        Thread.sleep(10000);
        localCluster.shutdown();
    }
}
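The fieldsGrouping on "word" is what makes the totals correct when TotalBolt runs with parallelism greater than one: every tuple carrying the same word is routed to the same task. The routing can be sketched as hash partitioning (a simplification of Storm's actual task-assignment logic; FieldsGroupingSketch is a hypothetical name):

```java
class FieldsGroupingSketch {
    // route a word to one of numTasks downstream tasks by hashing it;
    // equal words always map to the same task index
    static int taskFor(String word, int numTasks) {
        // floorMod keeps the result in [0, numTasks) even for negative hash codes
        return Math.floorMod(word.hashCode(), numTasks);
    }
}
```

By contrast, shuffleGrouping (used between the spout and SplitBolt) distributes tuples randomly, which is fine there because splitting a line does not depend on any per-key state.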