Storm WordCount Example (a Detailed Summary of Spout and Bolt)


Spout overview

A spout is a source of streams in a Storm topology. It usually reads from an external data source such as Kafka, MySQL, or Redis and emits the data into the topology. Spouts come in two flavors: reliable spouts, which replay a tuple when it fails downstream, and unreliable spouts, which may lose messages. A spout can declare several output streams through the declareStream method of OutputFieldsDeclarer, as long as the emit calls on its SpoutOutputCollector target those streams as well.

The main method of a spout is nextTuple, which either emits a new tuple into the topology or simply returns when there is nothing to emit. This method must not block, because Storm calls all spout methods on a single thread. The other important methods are ack and fail; with a reliable spout they tell you whether an emitted tuple was fully processed or needs to be resent (a sketch of such a reliable spout follows the list below).

Related classes:
IRichSpout: the interface every spout must implement
BaseRichSpout: base class for reliable spouts with ack support
BaseBasicSpout: base class for unreliable spouts
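
The example in this post uses an unreliable spout, so for reference here is a minimal sketch of what a reliable one could look like: it extends BaseRichSpout, emits every tuple with a message ID so that ack and fail are invoked, and replays failed tuples from an in-memory pending map. The class name, the hard-coded line, and the pending-map scheme are illustrative assumptions, not part of the original example.

ReliableLineSpout (sketch):
package storm;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class ReliableLineSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    // Tuples that were emitted but not yet acked, keyed by message ID
    private Map<String, String> pending;

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<>();
    }

    @Override
    public void nextTuple() {
        String line = "hello storm"; // stand-in for a read from Kafka, MySQL, Redis, etc.
        String msgId = UUID.randomUUID().toString();
        pending.put(msgId, line);
        // Emitting with a message ID switches on Storm's ack/fail tracking for this tuple
        collector.emit(new Values(line), msgId);
    }

    @Override
    public void ack(Object msgId) {
        // Fully processed downstream, safe to forget
        pending.remove(msgId);
    }

    @Override
    public void fail(Object msgId) {
        // Replay the failed tuple with the same message ID
        String line = pending.get(msgId);
        if (line != null) {
            collector.emit(new Values(line), msgId);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}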

1. Spout component: create the WordCountSpout, which generates the data and serves as the data source for the whole topology

WordCountSpout:
package storm;

import lombok.SneakyThrows;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author: wtl
 * @License: (C) Copyright 2020, wtl Corporation Limited.
 * @Contact: 1050100468@qq.com
 * @Date: 2020/11/27 5:14
 * @Version: 1.0
 * @Description:
 */
public class WordCountSpout implements IRichSpout {

    private SpoutOutputCollector spoutOutputCollector;
    // Sample sentences that stand in for an external data source such as Kafka, MySQL, or Redis
    private String[] lines = {"I love you", "hello world", "hello kitty", "nihao beijing"};
    private Random random = new Random();

    @Override
    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        // Called once when the spout task starts; keep a reference to the collector for emitting
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void close() {

    }

    @Override
    public void activate() {

    }

    @Override
    public void deactivate() {

    }

    @SneakyThrows
    @Override
    public void nextTuple() {
        // Pick a random sentence and emit it without a message ID, i.e. as an unreliable tuple
        int nextInt = random.nextInt(lines.length);
        spoutOutputCollector.emit(new Values(lines[nextInt]));
        // Throttle emission to one tuple per second; a production spout should avoid long blocking here
        TimeUnit.SECONDS.sleep(1);
    }

    @Override
    public void ack(Object o) {

    }

    @Override
    public void fail(Object o) {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Each emitted tuple carries a single field named "line"
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

Bolt overview

Bolts are the units where the business processing happens.
All of the processing in a topology is done in bolts. A bolt can perform any kind of ETL, such as filtering, functions, aggregations, joins, or writing to databases and caches. One bolt can handle a simple stream transformation; complex transformations usually need several bolts working together, each one carrying out a single piece of business logic, which is what stream computing looks like in practice. A bolt can also emit multiple streams to downstream bolts, declaring each output schema with the declareStream method.

The main method of a bolt is execute, which processes one input tuple at a time. A bolt can emit new tuples through the OutputCollector, and it must call ack for every tuple it processes so that Storm knows when each tuple has been fully handled. Storm's IBasicBolt interface calls ack automatically (see the sketch after the list below).

Related classes:
IRichBolt: the general interface for bolts
IBasicBolt: a convenience interface that handles acking automatically
OutputCollector: what a bolt uses to emit tuples to downstream bolts
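
To make the auto-ack behavior concrete, here is a minimal sketch (not from the original post) of the split logic written against BaseBasicBolt instead of IRichBolt. Emitted tuples are anchored and the input tuple is acked automatically when execute returns, so there is no explicit ack call; the class name is an assumption for illustration.

BasicSplitBolt (sketch):
package storm;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class BasicSplitBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Storm anchors these emits and acks the input tuple when this method returns normally;
        // throwing a FailedException instead marks the tuple as failed
        for (String word : tuple.getStringByField("line").split(" ")) {
            collector.emit(new Values(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}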

2. Bolt component 1: create the SplitBolt, which splits each line into words

SplitBolt:
package storm;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.stream.Stream;

/**
 * @author: wtl
 * @License: (C) Copyright 2020, wtl Corporation Limited.
 * @Contact: 1050100468@qq.com
 * @Date: 2020/11/27 5:41
 * @Version: 1.0
 * @Description:
 */
public class SplitBolt implements IRichBolt {

    private TopologyContext topologyContext;
    private OutputCollector outputCollector;

    @Override
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.topologyContext = topologyContext;
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Split each incoming line on spaces and emit one (word, 1) pair per word
        String line = tuple.getStringByField("line");
        String[] splits = line.split(" ");

        Stream.of(splits).forEach(word -> {
            outputCollector.emit(new Values(word, 1));
        });
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Each output tuple carries a word and an initial count of 1
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
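
Note that the execute method above emits unanchored tuples and never acks the input, which only works because the spout also emits without a message ID, so no tuples are tracked. If the spout were reliable, a sketch of the anchored version of execute would look like this (same field names assumed):

    @Override
    public void execute(Tuple tuple) {
        String line = tuple.getStringByField("line");
        for (String word : line.split(" ")) {
            // Anchor each output tuple to the input so a downstream failure replays it from the spout
            outputCollector.emit(tuple, new Values(word, 1));
        }
        // Tell Storm the input tuple has been fully processed
        outputCollector.ack(tuple);
    }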

3. Bolt component 2: create the TotalBolt, which keeps the running word counts

TotalBolt:
package storm;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

/**
 * @author: wtl
 * @License: (C) Copyright 2020, wtl Corporation Limited.
 * @Contact: 1050100468@qq.com
 * @Date: 2020/11/27 5:57
 * @Version: 1.0
 * @Description:
 */
public class TotalBolt implements IRichBolt {
    private TopologyContext topologyContext;
    private OutputCollector outputCollector;
    // Running word counts, kept in memory per bolt task
    private Map<String, Integer> map;

    @Override
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.topologyContext = topologyContext;
        this.outputCollector = outputCollector;
        this.map = new HashMap<>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Integer count = tuple.getIntegerByField("count");
        // Add the incoming count to this word's running total
        if (!map.containsKey(word)){
            map.put(word, count);
        }
        else{
            map.put(word, map.get(word) + count);
        }

        // Print the whole count table after every update (fine for a demo, very verbose in production)
        map.forEach((key, value) -> {
            System.out.println(key + ": " + value);
        });
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Declared for potential downstream bolts; this demo only prints and never emits these fields
        outputFieldsDeclarer.declare(new Fields("word", "total"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
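
TotalBolt declares a ("word", "total") schema but only prints to standard output. If you wanted to push the running totals to yet another bolt (for example one that writes to Redis), a sketch of the extra emit inside execute could look like the following; forwarding the totals is an assumption, not something the original example does:

        // Inside TotalBolt.execute(), after updating the running total for `word`:
        int total = map.get(word);
        outputCollector.emit(new Values(word, total)); // matches the declared ("word", "total") fields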

4. Topology main program:

WordCountTopology:
package storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * @author: wtl
 * @License: (C) Copyright 2020, wtl Corporation Limited.
 * @Contact: 1050100468@qq.com
 * @Date: 2020/11/27 6:06
 * @Version: 1.0
 * @Description:
 */
public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();

        // The spout feeds the split bolt, and the split bolt feeds the total bolt
        topologyBuilder.setSpout("wcSpout", new WordCountSpout(), 1);
        topologyBuilder.setBolt("wcSplitBolt", new SplitBolt(), 1).shuffleGrouping("wcSpout");

        // fieldsGrouping on "word" guarantees that the same word always goes to the same TotalBolt task
        topologyBuilder.setBolt("wcTotalBolt", new TotalBolt(), 1).fieldsGrouping("wcSplitBolt", new Fields("word"));

        // Build the topology
        StormTopology job = topologyBuilder.createTopology();

        Config config = new Config();

        // A topology can run in two modes:
        // 1. local mode   2. cluster mode

        // 1. Local mode
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("MyWordCount", config, job);
    }
}
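
The main program above only runs in local mode and returns from main right after submitting, so the local cluster keeps running until the process is killed. A common pattern (sketched here as an assumption, not taken from the original post) is to pick the mode from the command line, keep the local cluster alive for a while, and use StormSubmitter for cluster mode:

        if (args.length > 0) {
            // 2. Cluster mode: package the jar and submit it to the cluster under the given name
            org.apache.storm.StormSubmitter.submitTopology(args[0], config, job);
        } else {
            // 1. Local mode: run in-process for a minute, then shut down cleanly
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("MyWordCount", config, job);
            Thread.sleep(60_000);
            localCluster.shutdown();
        }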

That wraps up today's post on the Storm WordCount example (a detailed summary of spout and bolt). Thanks for reading, and if it helped you, feel free to pass it along to others.
